Verteilte Aufgabenverarbeitung mit Django, Celery und Flower
James Reed
Infrastructure Engineer · Leapcell

Einleitung
In der Welt moderner Webanwendungen sind Reaktionsfähigkeit und Skalierbarkeit von größter Bedeutung. Benutzerinteraktionen lösen oft komplexe Operationen aus, wie z. B. Bildverarbeitung, Datenanalyse, Versenden von Massen-E-Mails oder Generieren von Berichten. Die synchrone Ausführung dieser Aufgaben im Haupt-Request-Response-Zyklus kann zu langsamen Benutzererlebnissen, Timeouts und einer instabilen Anwendung führen. Hier kommt das Konzept der Hintergrundaufgaben ins Spiel. Durch das Auslagern zeitaufwendiger Operationen an einen separaten Prozess können wir dem Benutzer schnell eine Antwort zurückgeben, die wahrgenommene Leistung verbessern und unserer Anwendung ermöglichen, mehr gleichzeitige Anfragen zu bearbeiten. Dieser Artikel befasst sich damit, wie dies mit der leistungsstarken Kombination aus Django, Celery und Flower erreicht werden kann, und demonstriert deren Synergie beim Erstellen, Ausführen und Überwachen verteilter Hintergrundaufgaben.
Kernkonzepte erklärt
Bevor wir uns mit den Implementierungsdetails befassen, wollen wir die beteiligten Schlüsseltechnologien klar verstehen:
- Django: Ein High-Level-Python-Webframework, das schnelle Entwicklung und sauberes, pragmatisches Design fördert. Es bietet das Frontend für unsere Anwendung und wird die Entität sein, die unsere Hintergrundaufgaben auslöst.
- Celery: Eine asynchrone Aufgabenwarteschlange/Jobwarteschlange, die auf verteilten Nachrichten basiert. Sie ermöglicht es uns, Aufgaben aus unserer Django-Anwendung auszulagern, die von separaten Worker-Prozessen ausgeführt werden. Sie ist äußerst flexibel, unterstützt verschiedene Message Broker und bietet Funktionen wie Aufgabenplanung, Wiederholungsversuche und Ratenbegrenzung.
- Broker: Eine Nachrichtenwarteschlange, die die Kommunikation zwischen unserer Django-Anwendung (Erzeuger) und den Celery-Workern (Verbrauchern) erleichtert. Wenn eine Aufgabe gesendet wird, wird sie auf dem Broker platziert, und die Worker holen sie von dort ab. Beliebte Optionen sind RabbitMQ und Redis.
- Celery Worker: Ein separater Prozess, der den Broker kontinuierlich auf neue Aufgaben überwacht. Sobald eine Aufgabe abgerufen wurde, führt der Worker die Logik der Aufgabe aus. Sie können mehrere Worker ausführen, um Aufgaben gleichzeitig zu verarbeiten.
- Celery Beat: Ein Scheduler, der Aufgaben periodisch an die Celery-Warteschlange dispatcht. Dies ist nützlich für geplante Jobs wie tägliche Datensicherungen oder nächtliche Berichtserstellung.
- Celery Flower: Ein webbasierter Echtzeit-Monitor für Celery. Er bietet eine benutzerfreundliche Oberfläche zur Inspektion des Status von Aufgaben (ausstehend, gestartet, erfolgreich, fehlgeschlagen), der Worker-Aktivität und ermöglicht sogar die Fernsteuerung von Workern.
Erstellen, Ausführen und Überwachen verteilter Aufgaben
Lassen Sie uns den Prozess der Integration dieser Tools anhand eines praktischen Beispiels durchgehen: Größenänderung von Bildern. Wir simulieren ein Szenario, in dem ein Benutzer ein Bild hochlädt, und anstatt den Upload-Prozess zu blockieren, lagern wir die Größenänderung an einen Celery-Worker aus.
Projektaufbau und Installation
Stellen Sie zunächst sicher, dass Sie ein Django-Projekt eingerichtet haben. Installieren Sie dann die erforderlichen Pakete:
pip install Django celery redis flower Pillow
Wir werden Redis als unseren Celery-Broker und Ergebnis-Backend verwenden, da es einfach und leistungsfähig ist. Pillow dient zur Bildbearbeitung.
Django-Projektkonfiguration
Ändern Sie Ihre settings.py
-Datei, um Celery zu konfigurieren:
# myproject/settings.py # Celery Configuration CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # Oder Ihre lokale Zeitzone
Erstellen Sie als Nächstes eine Datei celery.py
im Stammverzeichnis Ihres Django-Projekts (auf derselben Ebene wie settings.py
):
# myproject/celery.py import os from celery import Celery # Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
Stellen Sie schließlich sicher, dass Ihre Django-App Celery lädt. In der __init__.py
Ihres Projekts:
# myproject/__init__.py # This will make sure the app is always imported when Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
Definieren einer Celery-Aufgabe
Erstellen wir eine Django-App namens images
und definieren unsere Bildgrößenänderungsaufgabe darin.
python manage.py startapp images
Erstellen Sie eine Datei tasks.py
in Ihrer images
-App:
# images/tasks.py import os from PIL import Image from io import BytesIO from django.core.files.base import ContentFile from django.conf import settings from .models import UploadedImage # Annahme eines Modells zur Speicherung von Bildern from celery import shared_task @shared_task def resize_image_task(image_id, max_width=800, max_height=600): try: uploaded_image = UploadedImage.objects.get(id=image_id) original_image_path = uploaded_image.image.path # Öffnen Sie das Originalbild img = Image.open(original_image_path) img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS) # Speichern Sie das skalierte Bild in einem Puffer output_buffer = BytesIO() # Behalten Sie das ursprüngliche Format bei oder wählen Sie ein gängiges Format wie JPEG img_format = img.format if img.format else 'JPEG' quality = 85 if img_format == 'JPEG' else None img.save(output_buffer, format=img_format, quality=quality) output_buffer.seek(0) # Erstellen Sie einen neuen Dateinamen für das skalierte Bild original_filename_base, original_filename_ext = os.path.splitext(uploaded_image.image.name) resized_filename = f"{original_filename_base}_resized{original_filename_ext}" # Aktualisieren Sie das Modell mit dem skalierten Bild uploaded_image.resized_image.save( resized_filename, ContentFile(output_buffer.read()), save=False # Speichern Sie das Modell noch nicht, wir tun dies explizit ) uploaded_image.is_processed = True uploaded_image.save() return f