python job queue 処理のフレームワーク、Celeryを使ったときのメモ。
Celeryとは
Celeryを使えばタスクを別のスレッドやマシンに分散することができる。このタスクを分散させる仕組みをタスクキューといい、Celeryのプロセスは新しいタスクが入ってきていないかタスクキューを監視する。タスクの実行は1つ以上のワーカーサーバー上でmultiprocessing(プロセスベースで並列処理を行えるPythonの標準ライブラリのパッケージ)等を使用して、同時に行うことができる。 タスクは、非同期に(バックグラウンドで)実行することも、同期して実行することもできる。一日に何百万ものタスクを処理することができる。
ワーカーとのやりとりについて、タスクを開始する際にクライアントはメッセージをキューに追加し、Redisなどのブローカーがそのメッセージをワーカーに持って行く。ブローカーというのは「仲介人」という意味通り、メッセージをクライアントとワーカー間で送受信させるソリューションのこと。(少しだけ後述)
参考文献:http://www.celeryproject.org/
Django First ステップ をやってみた
以下、Django First ステップ — Celery 3.1.18 ドキュメントを行ったメモ。ソースはgitにあるとのこと
treeはこんな内容。
demoapp ├── __init__.py ├── models.py ├── tasks.py ├── tests.py └── views.py manage.py proj ├── __init__.py ├── celery.py ├── settings.py ├── urls.py └── wsgi.py
proj/celery.pyが追加されたもの。
Django と Celery の連携
proj/__init__.py
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app
celery.pyからappがインポートされることによってDjangoの起動時にappがロードされ
tasks.pyの@shared_taskデコレータがcelery.pyのappを参照できるようになる。
※デコレータというのは、関数に付加機能をつけるようなもの。
proj/celery.py
from __future__ import absolute_import import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') from django.conf import settings app = Celery('proj') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
app = Celery('proj')
appに対してDjangoの設定モジュールをCeleryの設定として使う。
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
この一文により、Celeryはtasks.pyの規約に則った再利用可能なアプリケーション内のタスクを自動的に検知する。
autodiscover_tasks(packages=None, related_name=u'tasks', force=False)
"tasks.py"モジュールのパッケージのリストを検索します(またはrelated_name引数を使用します)。
参照:
celery — Distributed processing — Celery 4.1.0 documentation
@app.task(bind=True) def debug_task(self):
bindオプションにより、関数がバインドされtaskのインスタンス(の属性とメソッド)にアクセスできるようになる。
参照:http://docs.celeryproject.org/en/latest/userguide/tasks.html#example
demoapp/tasks.py
from __future__ import absolute_import from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
前述した通り、@shared_taskによってcelery.pyのapp = Celery('proj')を参照できるようになり、複数のタスクが同時に処理されるようになる。
proj/settings.py
#Celery設定箇所 from __future__ import absolute_import BROKER_URL = 'redis://localhost:6379/0'#'amqp://guest:guest@localhost//' #: Only add pickle to this list if your broker is secured #: from unwanted access (see userguide/security.html) CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
BROKERとは?
Celery はメッセージの送信と受信を行うソリューションを必要とします。通常これは メッセージブローカー と呼ばれる独立したサービスの形で提供されます。
Celery ファーストステップ — Celery 3.1.18 ドキュメント
BROKER_URLはRedis データベースのロケーションを意味している。コメントアウトしているのはRabbitMQの場合のもの。
URLのフォーマットは下記の通り。
redis://:password@hostname:port/db_number
スキーム(scheme)以下の全フィールドはオプションで、デフォルトは
・ホスト名:localhost
・ポート番号:6379
・データベース番号:0
UNIX ソケット接続を使う場合、URL のフォーマットは次のようになる
redis+socket:///path/to/redis.sock
今回実装していないが、タスクのステートや戻り値を Redis に保存したい場合は、次の設定を追加すべし。
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
Redis の使用 — Celery 3.1.18 ドキュメント
結果バックエンドの記事
その他の設定についてはこちらを参照
Configuration and defaults — Celery 3.1.18 ドキュメント
ワーカープロセスの起動
本番環境ではワーカーをデーモンとしてバックグラウンドで稼動させるだろうが、テストや開発時においてはcelery workerコマンドを使ってワーカーインスタンスを起動する方が便利だろうとのこと。デーモンとは、Unix系のOSで使用される常駐ソフトのこと。
$ celery -A proj worker -l info
このコマンドでworkerプロセスを起動。Ctrl+C で停止する。-lはログレベルのオプション。
ログレベルでは、DEBUG、INFO、WARNING、ERROR、CRITICAL、またはFATALのいずれかを選択します。
ワーカーについての記事:ワーカー — Celery 3.1.18 ドキュメント
オプションについてはこのページ:celery.bin.worker — Celery 3.1.18 ドキュメント
実行結果
$ celery -A proj worker -l info -------------- celery@hoge-VirtualBox v4.0.2 (latentcall) ---- **** ----- --- * *** * -- Linux-4.4.0-57-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-27 10:55:36 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x7f86e2254590 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . demoapp.tasks.add . demoapp.tasks.mul . demoapp.tasks.xsum . proj.celery.debug_task [2016-12-27 10:55:36,647: INFO/MainProcess] Connected to redis://localhost:6379/0 [2016-12-27 10:55:36,658: INFO/MainProcess] mingle: searching for neighbors [2016-12-27 10:55:37,684: INFO/MainProcess] mingle: all alone /usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2016-12-27 10:55:39,233: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never '
以下に実行結果に表示されたバナーのメッセージの説明を
ネクストステップ — Celery 3.1.18 ドキュメント
より抜粋する。
Concurrency
プリフォークされるワーカープロセスの数で、同時にタスクを処理する数になります。すべてのワーカープロセスが処理中の場合、新しいタスクは他のタスクが完了するまで待たなくてはいけません。デフォルトの concurrency 数はマシンの CPU (Core も含む) 数です。-c オプションでこの数を指定できます。オプション:-c, --concurrency
Events
ワーカーで発生したアクションの監視メッセージ(イベント)を Celery に送信させるオプションです。イベントは、celery events や Flower - リアルタイムのCelery モニター - といった監視プログラムで使われます。オプション:-E, --events
Queues
ワーカーがタスクを取得してくるキューのリストです。一度にいくつかのキューからタスクを取得するようにワーカーに指示することもできます。これは Quality of Service や separation of concerns、優先度のエミュレートを実現する手段として特定のワーカーにメッセージを送るのに使用されます。オプション:-Q, --queues
複数のworkerをバックグラウンドで起動する方法
$ celery multi start w1 -A proj -l info
再起動
$ celery multi restart w1 -A proj -l info
停止
$ celery multi stop w1 -A proj -l info または $ celery multi stopwait w1 -A proj -l info
stop コマンドは非同期なのでワーカーが停止するまで待ちません。stopwait コマンドを使えば現在実行中の全タスクが完了するのを待って終了します。
以下の記事も参考になりました。
DjangoでCeleryのメモ - Qiita