Python Celery
プロジェクトウェブサイト より:
- Celery は分散されたメッセージ受け渡しに基づく非同期のタスクキュー・ジョブキューです。リアルタイム処理に焦点が当てられていますが、スケジューリングもサポートしています。タスクは非同期 (バックグラウンド) または同期的 (準備ができるまで待機) に実行できます。
目次
インストール
python-celery パッケージをインストールしてください。他の python パッケージと同じように Python 3.x と互換性のあるパッケージが入手できます。Python 2.x と互換性のあるパッケージが必要な場合は python2-celery[リンク切れ: アーカイブ: aur-mirror] をインストールしてください。
Celery のドキュメント より: "Celery はメッセージを送信・受信するための手段を必要とします"。選択肢のひとつとして RabbitMQ が存在し、公式リポジトリからインストールできます。
設定
Celery
設定ファイルを保存するためのディレクトリ /etc/celery/
を作成する必要があります。サンプル設定ファイルが Celery のドキュメント に記載されています。
celery@celery.service
を起動・有効化してください。
RabbitMQ
RabbitMQ は設定を /etc/rabbitmq/rabbitmq-env.conf
に保存します。
デフォルト設定:
NODENAME=rabbit@rakieta NODE_IP_ADDRESS=0.0.0.0 NODE_PORT=5672 LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia
RabbitMQ は Unix ソケットをサポートしていないため 0.0.0.0
を 127.0.0.1
で置き換えると良いでしょう。
設定をシンプルにするために HOME=/var/lib/rabbitmq
も追加すると良いでしょう。環境変数について詳しくは RabbitMQ のドキュメント を参照してください。
rabbitmq.service
を起動・有効化してください。
RabbitMQ のドキュメント に従ってユーザーとバーチャルホストを追加:
$ cd /var/lib/rabbitmq $ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword' $ su rabbitmq -c 'rabbitmqctl add_vhost myvhost' $ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag' $ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'
詳しくは RabbitMQ の管理者ガイド を読んでください。
su rabbitmq -c "rabbitmqctl status"
を実行したときに badrpc,nodedown
となる場合、問題の解決方法が こちらの記事 に載っています。
セキュリティ
Celery ドキュメント のセキュリティセクションを読んでください。
サンプルタスク
Celery アプリケーション
Celery ドキュメント に従って python のサンプルタスクを作成:
test.py
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost') @app.task def add(x, y): return x + y
amqp://myuser:mypassword@localhost:5672/myvhost
は RabbitMQ を設定したときに作成したユーザー・バーチャルホストと同じにしてください。
Celery によって使われるデフォルトのブローカーは RabbitMQ であるため backend='amqp'
パラメータは任意です。
テスト
test.py
のあるディレクトリで以下のコマンドを実行:
$ celery -A task worker --loglevel=info
そして (同一ディレクトリに) 以下のファイルを作成:
call.py
from test import add add.delay(4, 4)
実行:
$ python call.py
コンソールからワーカーの情報が出力されます:
Received task: task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] Task task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8
Celery サービスのモジュールの準備
Celery ドキュメント に書かれていることとは少し異なる手順で説明しています。
test_task
モジュールを作成:
# mkdir /lib/python3.5/site-packages/test_task # touch /lib/python3.5/site-packages/test_task/__init__.py # touch /lib/python3.5/site-packages/test_task/test_task.py # touch /lib/python3.5/site-packages/test_task/celery.py
/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost') if __name__ == '__main__': app.start()
/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import from test_task.celery import app @app.task def add(x, y): return x + y
コンソールで python
を起動して以下のコマンドを実行できるはずです:
>>> from test_task import celery
/etc/celery/celery.conf
の中の以下の行を:
CELERY_APP="proj"
以下のように置き換えてください:
CELERY_APP="test_task"
celery@celery.service
を再起動してください。
タスクを定期的に実行
Celery Beat を使うことでタスクを定期実行することができます。基本的なセットアップは Celery のドキュメントページ で説明されています。
celery.py
の中で CELERYBEAT_SCHEDULE
を指定したい場合、Celery がタスクを認識できるように app.conf
プレフィックスを追加する必要があります。その後 Celery デーモンを起動するときに --beat --schedule=/var/lib/celery/celerybeat-schedule
パラメータを追加してください。さらに、/var/lib/celery
ディレクトリが存在している必要があり所有者は celery を実行するユーザーでなければなりません。
トラブルシューティング
#systemd の chroot ユニットが特に問題を報告しないのに celery サービスが機能しない場合、コンソールから chroot の中の celery を起動して詳しくログを確認することができます。例:
# /usr/bin/chroot --userspec=root:root /srv/http/apps/celery /usr/bin/env -i HOME=/ /usr/bin/python -m celery worker -A package_name --uid=33 --gid=33 --pidfile=/run/celery.pid --logfile=/var/log/celery/celery.log --loglevel="INFO"