sql >> Databasteknik >  >> NoSQL >> Redis

Flaska efter exempel – Implementera en Redis-uppgiftskö

Den här delen av handledningen beskriver hur man implementerar en Redis-uppgiftskö för att hantera textbearbetning.

Uppdateringar:

  • 02/12/2020:Uppgraderad till Python version 3.8.1 samt de senaste versionerna av Redis, Python Redis och RQ. Se nedan för detaljer. Nämn en bugg i den senaste RQ-versionen och ge en lösning. Löste http före https-felet.
  • 03/22/2016:Uppgraderad till Python version 3.5.1 samt de senaste versionerna av Redis, Python Redis och RQ. Se nedan för mer information.
  • 02/22/2015:Lade till stöd för Python 3.

Gratis bonus: Klicka här för att få tillgång till en gratis självstudievideo för Flask + Python som visar hur du bygger en webbapp för Flask, steg för steg.

Kom ihåg:Det här är vad vi bygger – en Flask-app som beräknar ord-frekvenspar baserat på texten från en given webbadress.

  1. Del ett:Skapa en lokal utvecklingsmiljö och distribuera sedan både en iscensättning och en produktionsmiljö på Heroku.
  2. Del två:Konfigurera en PostgreSQL-databas tillsammans med SQLAlchemy och Alembic för att hantera migrering.
  3. Del tre:Lägg till back-end-logiken för att skrapa och bearbeta sedan ordantalet från en webbsida med hjälp av biblioteken för begäran, BeautifulSoup och Natural Language Toolkit (NLTK).
  4. Del fyra:Implementera en Redis-uppgiftskö för att hantera textbehandlingen. (aktuell )
  5. Del fem:Ställ in Angular på front-end för att kontinuerligt polla back-end för att se om begäran är klar.
  6. Del sex:Push till staging-servern på Heroku - ställ in Redis och beskriver hur man kör två processer (webb och worker) på en enda Dyno.
  7. Del sju:Uppdatera gränssnittet för att göra det mer användarvänligt.
  8. Del åtta:Skapa ett anpassat vinkeldirektiv för att visa ett frekvensfördelningsdiagram med JavaScript och D3.

Behöver du koden? Ta tag i det från arkivet.


Installationskrav

Använda verktyg:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - ett enkelt bibliotek för att skapa en uppgiftskö

Börja med att ladda ner och installera Redis från antingen den officiella webbplatsen eller via Homebrew (brew install redis ). När den är installerad, starta Redis-servern:

$ redis-server

Installera sedan Python Redis och RQ i ett nytt terminalfönster:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Konfigurera Worker

Låt oss börja med att skapa en arbetsprocess för att lyssna efter uppgifter i kö. Skapa en ny fil worker.py , och lägg till den här koden:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Här lyssnade vi efter en kö som heter default och upprättade en anslutning till Redis-servern på localhost:6379 .

Starta upp detta i ett annat terminalfönster:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Nu måste vi uppdatera vår app.py för att skicka jobb till kön...



Uppdatera app.py

Lägg till följande importer till app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Uppdatera sedan konfigurationsavsnittet:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) skapade en Redis-anslutning och initierade en kö baserat på den anslutningen.

Flytta textbehandlingsfunktionen från vår indexrutt och till en ny funktion som heter count_and_save_words() . Den här funktionen accepterar ett argument, en URL, som vi skickar till den när vi anropar den från vår indexrutt.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Notera följande kod:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Obs! Vi måste importera count_and_save_words funktion i vår funktion index eftersom RQ-paketet för närvarande har en bugg, där det inte hittar funktioner i samma modul.

Här använde vi kön som vi initierade tidigare och kallade enqueue_call() fungera. Detta lade till ett nytt jobb i kön och det jobbet körde count_and_save_words() funktion med URL:en som argument. result_ttl=5000 linjeargumentet talar om för RQ hur länge man ska hålla fast vid resultatet av jobbet i - 5 000 sekunder, i det här fallet. Sedan matade vi jobb-id till terminalen. Detta id behövs för att se om jobbet är färdigbearbetat.

Låt oss ställa in en ny rutt för det...



Få resultat

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Låt oss testa detta.

Starta servern, navigera till http://localhost:5000/, använd URL:en https://realpython.com och hämta jobb-id:t från terminalen. Använd sedan det id:t i '/results/'-slutpunkten - d.v.s. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Så länge det har gått mindre än 5 000 sekunder innan du kontrollerar statusen, bör du se ett id-nummer som genereras när vi lägger till resultaten i databasen:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Låt oss nu ändra rutten något för att returnera de faktiska resultaten från databasen i JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Se till att lägga till importen:

from flask import jsonify

Testa detta igen. Om allt gick bra bör du se något liknande i din webbläsare:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Vad är härnäst?

Gratis bonus: Klicka här för att få tillgång till en gratis självstudiekurs för Flask + Python som visar hur du bygger en webbapp för Flask, steg för steg.

I del 5 kommer vi att sammanföra klienten och servern genom att lägga till Angular i mixen för att skapa en poller, som skickar en begäran var femte sekund till /results/<job_key> slutpunkt som ber om uppdateringar. När informationen är tillgänglig lägger vi till den i DOM.

Skål!

Detta är ett samarbete mellan Cam Linke, medgrundare av Startup Edmonton, och folket på Real Python



  1. Faktorer att tänka på när du väljer MongoDB för Big Data-applikationer

  2. MongoDB:hur tolkar man datum i 3.6 mongoDb-versionen?

  3. Redis pub/sub on rails

  4. StackExchange Redis ChannelPrefix Inte omfattningsnycklar