django-celeryで非同期処理クイックスタートガイド
django-celery で非同期処理をやる。
サクッと非同期処理を試せちゃうような、クイックスタートガイド をメモがてら書いていく。
インストール
まずはインストールから。
pip を使って Django と django-celery をインストールする。
% pip install Django
% pip install django-celery
おわり。
バージョンは以下のようになった
% pip freeze
Django==1.5
amqp==1.0.10
anyjson==0.3.3
billiard==2.7.3.23
celery==3.0.16
django-celery==3.0.11
kombu==2.5.8
python-dateutil==1.5
pytz==2013b
wsgiref==0.1.2
設定
インストールできたら Django のプロジェクト asynctest と、 アプリケーション caculator を作ってみる。
% django-admin.py startproject asynktest
% cd asynktest
% python manage.py startapp caculator
まずは設定ファイル (asynctest/asynctest/settings.py) に django-celery と先ほど作った caculator 、それとあと 裏方の kombu.transport.django をいれてやる。
INSTALLED_APPS = (
...
'djcelery',
'kombu.transport.django',
'caculator',
)
こんなかんじに。
さらに、 django-celery に関する設定を追記する。
###### django-celery configuations ######
from djcelery import setup_loader
setup_loader()
BROKER_URL = 'django://'
# Tasks will be executed asynchronously.
CELERY_ALWAYS_EAGER = False
あとはDBと同期すればOK。
Note
django-kombu をインストールしなくても最近の kombu と上記の設定で Django のDBをブローカーとして使えるよう。 django-celery インストールの段階で kombu もインストール されてくれるので、とくに kombu を意識する必要がなくなって しまった。
非同期処理させるもの
さて肝心の処理をおこなう関数を書く。 caculator/tasks.py を以下のように書いた。
import time
from celery import task
@task
def add(a, b):
time.sleep(10)
return a + b
task でデコレートしてあげるだけで良い。 add 関数は10秒スリープしてくれるという親切設計なので、 存分に非同期を味わうことができる。
あと、モジュール名は tasks.py にしましょうね。
実行してみる
準備ができたので非同期を味わってみる。 まずは Celery 氏を起動。
% python manage.py celeryd -l info
別のシェルから manage.py shell を起動。
% python manage.py shell
打っていく。
>>> from caculator.tasks import add
>>> add(1, 2) # 10秒かかる
3
>>> add.run(1, 2) # 10秒かかる
3
>>> result = add.delay(1, 2) # 非同期で実行するには delay
>>> result.ready() # ready で終了したかがわかる
False
>>> result.ready() # マダァ?(・∀・ )っ/凵⌒☆チンチン
True
>>> result.get() # 終わってたら get でとる
3
>>> # 処理終わってないのに get すると、
>>> # 値が返るまで待ってしまう
>>> # タイムアウトしたい
>>>
>>> result = add.delay(3, 4) # もっかい計算
>>> result.get(timeout=3) # 3秒間だけ待つ
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/path/to/celery/result.py", line 109, in get
interval=interval)
File "/path/to/celery/backends/base.py", line 186, in wait_for
raise TimeoutError('The operation timed out.')
TimeoutError: The operation timed out.
>>> # 3秒以内に終わらなかったら TimeoutError
>>> result.get(timeout=3)
7
>>> # 3秒以内なら結果が返る
こんなかんじで使えます。 やったね。非同期処理ができたよ。
重い処理があれば非同期でやらせて、ポーリングして結果待つとか サクッと書けてしまう。 Django 内で完結してできるから分かりやすくて楽。 django-celery 覚えていて損ないかと思う。
参考:
django-celeryでモジュール名をtasks.pyにせずにハマった
django-celery でハマった。 django-celery の場合、 manage.py celeryd を読んでやるだけで、 とくにモジュールの指定や celery インスタンスの生成が要らない。
そういった設定は setting.py に書いておいてやってあとは勝手にやってくれる。 だけど、今回はこれにハマった。
非同期処理を行わせる処理はすべて tasks.py というモジュールにいれてやる 必要がある:
PROJECT_NAME/APP_NAME/tasks.py
これを知らずに runner.py だか workflow.py だかにいれてやっててエラーになっていた。 状況としては以下の様な関数を tasks.py 以外の モジュールに書いていた。
@task(name='test')
def hoge():
return 'hoge'
これで
>>> from hogeapp.hoge import hoge
>>> hoge.delay()
実行。
そうすると Celery 氏が泣く。
[2013-03-21 23:04:47,126: INFO/MainProcess] consumer: Connected to django://localhost//.
[2013-03-21 23:05:12,227: ERROR/MainProcess] Received unregistered task of type 'test'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
More: http://docs.celeryq.org/en/latest/userguide/tasks.html#names
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'test', 'callbacks': None, 'errbacks': None, 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '5d887f1a-e72c-46d2-8f64-16da02cf6780'} (184b)
Traceback (most recent call last):
File "/home/hirokiky/dev/hogehogeenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 592, in receive_message
self.strategies[name](message, body, message.ack_log_error)
KeyError: 'test'
tasks.py に入れておいてやらないと、読み込んで登録しておいてくれないっぽい。
ためしに worker/consumer.py の592行目に潜り込んでみたら、たしかに self.strategies には無かった。
ただ手元のシェルで見てみると登録されているように見えたので、 「ちゃんと登録されてるしー」と思っていたのが罠。
>>> from celery import registry
>>> registry.tasks
{'celery.chain': <@task: celery.chain>, ... 'hoge': <@task: hoge>}
あると思ったけど、結局はCeleryd側では読めてないっぽかった。
これの解答になりそうなものはあった。
でも celery.registry.TaskRegistry なんてクラスなかったのでスルーした。
まぁ、ドキュメントちゃんと読んだら書いてあったので、解決:
Tasks are defined by wrapping functions in the @task decorator.
It is a common practice to put these in their own module named tasks.py,
and the worker will automatically go through the apps in INSTALLED_APPS
to import these modules.
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
何かにつけて思うけど、ドキュメントまじワイフ。