Django, Celery、Flowerによる分散タスク処理
James Reed
Infrastructure Engineer · Leapcell

はじめに
現代のWebアプリケーションの世界では、応答性とスケーラビリティが最優先事項です。ユーザーインタラクションは、画像処理、データ分析、一括メール送信、レポート生成などの複雑な操作をトリガーすることがよくあります。これらのタスクをメインのリクエスト・レスポンスサイクル内で同期的に実行すると、ユーザーエクスペリエンスが低下し、タイムアウトが発生し、アプリケーションが不安定になる可能性があります。そこで、バックグラウンドタスクの概念が重要になります。時間のかかる操作を別のプロセスにオフロードすることで、ユーザーに迅速に応答を返すことができ、知覚されるパフォーマンスが向上し、アプリケーションがより多くの同時リクエストを処理できるようになります。この記事では、Django、Celery、Flowerの強力な組み合わせを使用してこれを実現する方法を掘り下げ、分散バックグラウンドタスクの構築、実行、監視におけるそれらの相乗効果を実証します。
主要な概念の説明
実装の詳細に入る前に、関係する主要なテクノロジーについて明確に理解しましょう。
- Django:迅速な開発とクリーンで実践的な設計を奨励する、ハイレベルなPython Webフレームワークです。アプリケーションのフロントエンドを提供し、バックグラウンドタスクをトリガーするエンティティとなります。
- Celery:分散メッセージパッシングに基づく非同期タスクキュー/ジョブキューです。Djangoアプリケーションからタスクをオフロードし、別のワーカープロセスで実行できるようにします。非常に柔軟で、さまざまなメッセージブローカーをサポートし、タスクスケジューリング、リトライ、レート制限などの機能を提供します。
- Broker:Djangoアプリケーション(プロデューサー)とCeleryワーカー(コンシューマー)間の通信を容易にするメッセージキューです。タスクが送信されると、ブローカーに配置され、ワーカーがそこから取得します。一般的な選択肢には、RabbitMQやRedisがあります。
- Celery Worker:ブローカーで新しいタスクを継続的に監視する別のプロセスです。タスクを取得すると、ワーカーはそのタスクのロジックを実行します。並行してタスクを処理するために、複数のワーカーを実行できます。
- Celery Beat:Celeryキューに定期的にタスクをディスパッチするスケジューラです。これは、日次データバックアップや夜間レポート生成のようなスケジュールされたジョブに便利です。
- Celery Flower:CeleryのリアルタイムWebベースモニターです。タスクの状態(保留中、開始済み、成功、失敗)、ワーカーアクティビティを検査するためのユーザーフレンドリーなインターフェースを提供し、ワーカーのリモート制御も可能にします。
分散タスクの構築、実行、監視
画像リサイズを実践的な例として、これらのツールを統合するプロセスを段階的に見ていきましょう。ユーザーが画像をアップロードし、アップロードプロセスをブロックする代わりに、リサイズをCeleryワーカーにオフロードします。
プロジェクトのセットアップとインストール
まず、Djangoプロジェクトがセットアップされていることを確認します。次に、必要なパッケージをインストールします。
pip install Django celery redis flower Pillow
画像操作のために、RedisをCeleryブローカーおよび結果バックエンドとして、そのシンプルさとパフォーマンスから使用します。Pillowは画像操作用です。
Djangoプロジェクトの設定
settings.py
ファイルを変更してCeleryを設定します。
# myproject/settings.py # Celery Configuration CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # またはお住まいのタイムゾーン
次に、Djangoプロジェクトのルートディレクトリ(settings.py
と同じレベル)にcelery.py
ファイルを作成します。
# myproject/celery.py import os from celery import Celery # 'celery' プログラムのデフォルトのDjango設定モジュールを設定します。 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') app = Celery('myproject') # ここで文字列を使用することは、ワーカーが設定オブジェクトを子プロセスにシリアライズする必要がないことを意味します。 # - namespace='CELERY' は、すべてのCelery関連の設定キーに `CELERY_` というプレフィックスが付くことを意味します。 app.config_from_object('django.conf:settings', namespace='CELERY') # 登録済みのすべてのDjangoアプリ設定からタスクモジュールをロードします。 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
最後に、DjangoアプリがCeleryをロードすることを確認します。プロジェクトの__init__.py
でこれを行います。
# myproject/__init__.py # これにより、Djangoが起動したときにアプリが常にインポートされるようになり、shared_taskがこのアプリを使用するようになります。 from .celery import app as celery_app __all__ = ('celery_app',)
Celeryタスクの定義
images
という名前のDjangoアプリを作成し、その中の画像リサイズタスクを定義しましょう。
python manage.py startapp images
images
アプリ内にtasks.py
ファイルを作成します。
# images/tasks.py import os from PIL import Image from io import BytesIO from django.core.files.base import ContentFile from django.conf import settings from .models import UploadedImage # 画像を保存するためのモデルを想定 from celery import shared_task @shared_task def resize_image_task(image_id, max_width=800, max_height=600): try: uploaded_image = UploadedImage.objects.get(id=image_id) original_image_path = uploaded_image.image.path # 元の画像を開く img = Image.open(original_image_path) img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS) # リサイズされた画像をバッファに保存 output_buffer = BytesIO() # 元のフォーマットを保持するか、JPEGのような一般的なフォーマットを選択します img_format = img.format if img.format else 'JPEG' quality = 85 if img_format == 'JPEG' else None img.save(output_buffer, format=img_format, quality=quality) output_buffer.seek(0) # リサイズされた画像の新しいファイル名を構築する original_filename_base, original_filename_ext = os.path.splitext(uploaded_image.image.name) resized_filename = f"{original_filename_base}_resized{original_filename_ext}" # リサイズされた画像でモデルを更新する uploaded_image.resized_image.save( resized_filename, ContentFile(output_buffer.read()), save=False # モデルはまだ保存しません。明示的に保存します。 ) uploaded_image.is_processed = True uploaded_image.save() return f"Image {image_id} resized successfully to {max_width}x{max_height}." except UploadedImage.DoesNotExist: return f"Error: Image with ID {image_id} not found." except Exception as e: return f"Error resizing image {image_id}: {str(e)}"
画像保存のためのシンプルなDjangoモデルが必要です。
# images/models.py from django.db import models class UploadedImage(models.Model): image = models.ImageField(upload_to='original_images/') resized_image = models.ImageField(upload_to='resized_images/', blank=True, null=True) uploaded_at = models.DateTimeField(auto_now_add=True) is_processed = models.BooleanField(default=False) def __str__(self): return f"Image {self.id}: {self.image.name}"
マイグレーションを実行することを忘れないでください: python manage.py makemigrations images
および python manage.py migrate
。また、画像保存のためにsettings.py
でMEDIA_ROOT
とMEDIA_URL
が設定されていることを確認してください。
# myproject/settings.py # ... MEDIA_ROOT = os.path.join(BASE_DIR, 'media') MEDIA_URL = '/media/' # ...
Djangoからタスクをトリガーする
画像アップロードを処理し、Celeryタスクをトリガーする簡単なビューをimages
アプリに作成しましょう。
# images/views.py from django.shortcuts import render, redirect from .forms import ImageUploadForm from .models import UploadedImage from .tasks import resize_image_task def upload_image(request): if request.method == 'POST': form = ImageUploadForm(request.POST, request.FILES) if form.is_valid(): uploaded_image = form.save() # タスクをキューに追加する resize_image_task.delay(uploaded_image.id) return redirect('image_list') else: form = ImageUploadForm() return render(request, 'images/upload.html', {'form': form}) def image_list(request): images = UploadedImage.objects.all().order_by('-uploaded_at') return render(request, 'images/list.html', {'images': images})
そして、対応するフォームです。
# images/forms.py from django import forms from .models import UploadedImage class ImageUploadForm(forms.ModelForm): class Meta: model = UploadedImage fields = ['image']
URLを設定します。
# myproject/urls.py from django.contrib import admin from django.urls import path from django.conf import settings from django.conf.urls.static import static from images.views import upload_image, image_list urlpatterns = [ path('admin/', admin.site.urls), path('upload/', upload_image, name='upload_image'), path('images/', image_list, name='image_list'), ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
そして、upload.html
とlist.html
の基本的なテンプレートです。
<!-- images/templates/images/upload.html --> <h1>Upload Image</h1> <form method="post" enctype="multipart/form-data"> {% csrf_token %} {{ form.as_p }} <button type="submit">Upload</button> </form> <a href="{% url 'image_list' %}">View Images</a>
<!-- images/templates/images/list.html --> <h1>Uploaded Images</h1> <ul> {% for image in images %} <li> <p>Original: <img src="{{ image.image.url }}" width="100"></p> {% if image.is_processed %} <p>Resized: <img src="{{ image.resized_image.url }}" width="100"></p> {% else %} <p>Processing...</p> {% endif %} <p>Uploaded at: {{ image.uploaded_at }}</p> </li> {% empty %} <li>No images uploaded yet.</li> {% endfor %} </ul> <a href="{% url 'upload_image' %}">Upload New Image</a>
コンポーネントの実行
すべてがセットアップされたので、さまざまな部分を実行しましょう。
-
Redisの起動(まだ実行されていない場合):
redis-server
-
Celery Workerの起動:「プロジェクトのルートディレクトリで新しいターミナルを開きます。
celery -A myproject worker -l info
ワーカーがRedisに接続され、タスクを処理する準備ができたことを示す出力が表示されるはずです。
-
Django開発サーバーの起動:「別のターミナルを開きます。
python manage.py runserver
-
Flowerの起動(監視用):「3番目のターミナルを開きます。
celery -A myproject flower
Flowerは通常
http://localhost:5555
でアクセスできます。
これで、ブラウザで http://localhost:8000/upload/
にアクセスし、画像をアップロードして、次のことを監視します。
- Djangoビューはすぐにリダイレクトされ、タスクがオフロードされたことを示します。
- Celeryワーカーターミナルは、
resize_image_task
を受信して処理しているログを表示します。 - Flower (
http://localhost:5555
) は、タスクの状態(PENDING、STARTED、SUCCESS)をリアルタイムで表示します。タスクIDをクリックすると、詳細情報、引数、戻り値を確認できます。 http://localhost:8000/images/
をリフレッシュすると、処理が完了すると元の画像とリサイズされた画像のすべてが表示されます。
高度な概念とアプリケーションシナリオ
- タスクの連鎖/ワークフロー:Celeryは、
chord
、group
、chain
操作を使用して複雑なタスクワークフローをサポートします。たとえば、画像をリサイズした後、クラウドストレージにアップロードするタスク、その後データベースレコードを更新するタスクを連鎖させたい場合があります。 - リトライ:タスクは、カスタマイズ可能なリトライ遅延と最大試行回数で障害発生時に自動的にリトライするように構成できます。これにより、アプリケーションはより回復力のあるものになります。
- レート制限:ワーカーが特定の時間枠内で実行できるタスクの数を制御することで、サードパーティAPIや機密性の高いサービスのリソース枯渇を防ぎます。
- Celery Beatによるスケジュールされたタスク:特定の時間間隔(例:毎晩午前3時に古いデータをクリーンアップする)でタスクを実行するには、Celery Beatを使用します。
次に、別のターミナルからCelery Beatを実行します:# myproject/settings.py CELERY_BEAT_SCHEDULE = { 'clean-old-images-every-day': { 'task': 'images.tasks.cleanup_old_images_task', 'schedule': crontab(hour=3, minute=0), # 毎日午前3時に実行 }, }
celery -A myproject beat -l info
- エラー処理:
try-except
ブロックを使用してタスク内で適切なエラー処理を実装し、検査のためにエラーをエミットしたり、ログに記録したりします。 - 同時実行性:高負荷のタスクを処理するために、複数のワーカープロセスを実行するか、異なるマシンで複数のワーカーノードを実行して、同時実行性をスケールアップします。
結論
DjangoとCeleryとFlowerを統合することで、バックグラウンドタスクを管理するための堅牢でスケーラブルなソリューションが提供されます。Celeryは、時間のかかる操作をオフロードすることにより、Djangoアプリケーションの応答性を維持できるようにし、Flowerはタスク実行のリアルタイムの可視性と制御を提供します。この強力なトリオは、アプリケーションのパフォーマンス、ユーザーエクスペリエンス、および運用の効率を大幅に向上させ、最新のWeb開発において不可欠なパターンとなっています。同期処理と長時間実行される操作を分離することで、分散アーキテクチャの可能性を最大限に引き出すことができます。