django-celeryで非同期処理クイックスタートガイド

django-celery で非同期処理をやる。

サクッと非同期処理を試せちゃうような、クイックスタートガイド をメモがてら書いていく。

インストール

まずはインストールから。

pip を使って Djangodjango-celery をインストールする。

% pip install Django
% pip install django-celery

おわり。

バージョンは以下のようになった

% pip freeze

Django==1.5
amqp==1.0.10
anyjson==0.3.3
billiard==2.7.3.23
celery==3.0.16
django-celery==3.0.11
kombu==2.5.8
python-dateutil==1.5
pytz==2013b
wsgiref==0.1.2

設定

インストールできたら Django のプロジェクト asynctest と、 アプリケーション caculator を作ってみる。

% django-admin.py startproject asynktest
% cd asynktest
% python manage.py startapp caculator

まずは設定ファイル (asynctest/asynctest/settings.py) に django-celery と先ほど作った caculator 、それとあと 裏方の kombu.transport.django をいれてやる。

INSTALLED_APPS = (
    ...
    'djcelery',
    'kombu.transport.django',
    'caculator',
)

こんなかんじに。

さらに、 django-celery に関する設定を追記する。

###### django-celery configuations ######
from djcelery import setup_loader
setup_loader()
BROKER_URL = 'django://'
# Tasks will be executed asynchronously.
CELERY_ALWAYS_EAGER = False

あとはDBと同期すればOK。

Note

django-kombu をインストールしなくても最近の kombu と上記の設定で Django のDBをブローカーとして使えるよう。 django-celery インストールの段階で kombu もインストール されてくれるので、とくに kombu を意識する必要がなくなって しまった。

非同期処理させるもの

さて肝心の処理をおこなう関数を書く。 caculator/tasks.py を以下のように書いた。

import time

from celery import task

@task
def add(a, b):
    time.sleep(10)
    return a + b

task でデコレートしてあげるだけで良い。 add 関数は10秒スリープしてくれるという親切設計なので、 存分に非同期を味わうことができる。

あと、モジュール名は tasks.py にしましょうね。

実行してみる

準備ができたので非同期を味わってみる。 まずは Celery 氏を起動。

% python manage.py celeryd -l info

別のシェルから manage.py shell を起動。

% python manage.py shell

打っていく。

>>> from caculator.tasks import add
>>> add(1, 2) # 10秒かかる
3
>>> add.run(1, 2) # 10秒かかる
3
>>> result = add.delay(1, 2) # 非同期で実行するには delay
>>> result.ready() # ready で終了したかがわかる
False
>>> result.ready() # マダァ?(・∀・ )っ/凵⌒☆チンチン
True
>>> result.get() # 終わってたら get でとる
3
>>> # 処理終わってないのに get すると、
>>> # 値が返るまで待ってしまう
>>> # タイムアウトしたい
>>>
>>> result = add.delay(3, 4) # もっかい計算
>>> result.get(timeout=3) # 3秒間だけ待つ
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/path/to/celery/result.py", line 109, in get
    interval=interval)
  File "/path/to/celery/backends/base.py", line 186, in wait_for
    raise TimeoutError('The operation timed out.')
TimeoutError: The operation timed out.
>>> # 3秒以内に終わらなかったら TimeoutError
>>> result.get(timeout=3)
7
>>> # 3秒以内なら結果が返る

こんなかんじで使えます。 やったね。非同期処理ができたよ。

重い処理があれば非同期でやらせて、ポーリングして結果待つとか サクッと書けてしまう。 Django 内で完結してできるから分かりやすくて楽。 django-celery 覚えていて損ないかと思う。

参考:

django-celeryでモジュール名をtasks.pyにせずにハマった

django-celery でハマった。 django-celery の場合、 manage.py celeryd を読んでやるだけで、 とくにモジュールの指定や celery インスタンスの生成が要らない。

そういった設定は setting.py に書いておいてやってあとは勝手にやってくれる。 だけど、今回はこれにハマった。

非同期処理を行わせる処理はすべて tasks.py というモジュールにいれてやる 必要がある:

PROJECT_NAME/APP_NAME/tasks.py

これを知らずに runner.py だか workflow.py だかにいれてやっててエラーになっていた。 状況としては以下の様な関数を tasks.py 以外の モジュールに書いていた。

@task(name='test')
def hoge():
    return 'hoge'

これで

>>> from hogeapp.hoge import hoge
>>> hoge.delay()

実行。

そうすると Celery 氏が泣く。

[2013-03-21 23:04:47,126: INFO/MainProcess] consumer: Connected to django://localhost//.
[2013-03-21 23:05:12,227: ERROR/MainProcess] Received unregistered task of type 'test'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
More: http://docs.celeryq.org/en/latest/userguide/tasks.html#names

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'test', 'callbacks': None, 'errbacks': None, 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '5d887f1a-e72c-46d2-8f64-16da02cf6780'} (184b)

Traceback (most recent call last):
  File "/home/hirokiky/dev/hogehogeenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 592, in receive_message
    self.strategies[name](message, body, message.ack_log_error)
KeyError: 'test'

tasks.py に入れておいてやらないと、読み込んで登録しておいてくれないっぽい。

ためしに worker/consumer.py の592行目に潜り込んでみたら、たしかに self.strategies には無かった。

ただ手元のシェルで見てみると登録されているように見えたので、 「ちゃんと登録されてるしー」と思っていたのが罠。

>>> from celery import registry
>>> registry.tasks
{'celery.chain': <@task: celery.chain>, ... 'hoge': <@task: hoge>}

あると思ったけど、結局はCeleryd側では読めてないっぽかった。

これの解答になりそうなものはあった。

でも celery.registry.TaskRegistry なんてクラスなかったのでスルーした。

まぁ、ドキュメントちゃんと読んだら書いてあったので、解決:

Tasks are defined by wrapping functions in the @task decorator.
It is a common practice to put these in their own module named tasks.py,
and the worker will automatically go through the apps in INSTALLED_APPS
to import these modules.

http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

何かにつけて思うけど、ドキュメントまじワイフ。