Python Celery

提供: ArchWiki
ナビゲーションに移動 検索に移動

プロジェクトウェブサイト より:

Celery は分散されたメッセージ受け渡しに基づく非同期のタスクキュー・ジョブキューです。リアルタイム処理に焦点が当てられていますが、スケジューリングもサポートしています。タスクは非同期 (バックグラウンド) または同期的 (準備ができるまで待機) に実行できます。

インストール

python-celery パッケージをインストールしてください。他の python パッケージと同じように Python 3.x と互換性のあるパッケージが入手できます。Python 2.x と互換性のあるパッケージが必要な場合は python2-celery[リンク切れ: アーカイブ: aur-mirror] をインストールしてください。

Celery のドキュメント より: "Celery はメッセージを送信・受信するための手段を必要とします"。選択肢のひとつとして RabbitMQ が存在し、公式リポジトリからインストールできます。

設定

Celery

設定ファイルを保存するためのディレクトリ /etc/celery/ を作成する必要があります。サンプル設定ファイルが Celery のドキュメント に記載されています。

celery@celery.service起動有効化してください。

RabbitMQ

RabbitMQ は設定を /etc/rabbitmq/rabbitmq-env.conf に保存します。

デフォルト設定:

NODENAME=rabbit@rakieta
NODE_IP_ADDRESS=0.0.0.0
NODE_PORT=5672

LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia

RabbitMQ は Unix ソケットをサポートしていないため 0.0.0.0127.0.0.1 で置き換えると良いでしょう。

設定をシンプルにするために HOME=/var/lib/rabbitmq も追加すると良いでしょう。環境変数について詳しくは RabbitMQ のドキュメント を参照してください。

rabbitmq.service起動有効化してください。

ノート: rabbitmq-service は rabbitmq ユーザーとして実行され、ホームフォルダは /var/lib/rabbitmq の中に保存されます。フォルダとサブフォルダの所有者が rabbitmq ユーザーになっていることを確認してください。

RabbitMQ のドキュメント に従ってユーザーとバーチャルホストを追加:

$ cd /var/lib/rabbitmq
$ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword'
$ su rabbitmq -c 'rabbitmqctl add_vhost myvhost'
$ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag'
$ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'

詳しくは RabbitMQ の管理者ガイド を読んでください。

su rabbitmq -c "rabbitmqctl status" を実行したときに badrpc,nodedown となる場合、問題の解決方法が こちらの記事 に載っています。

ノート: su rabbitmq -c "erl" を実行することで erlang のプロンプトをエラーなく表示できます。

セキュリティ

Celery ドキュメント のセキュリティセクションを読んでください。

サンプルタスク

Celery アプリケーション

Celery ドキュメント に従って python のサンプルタスクを作成:

test.py
from celery import Celery
    
    app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')
    
    @app.task
    def add(x, y):
        return x + y

amqp://myuser:mypassword@localhost:5672/myvhost は RabbitMQ を設定したときに作成したユーザー・バーチャルホストと同じにしてください。

Celery によって使われるデフォルトのブローカーは RabbitMQ であるため backend='amqp' パラメータは任意です。

テスト

test.py のあるディレクトリで以下のコマンドを実行:

$ celery -A task worker --loglevel=info

そして (同一ディレクトリに) 以下のファイルを作成:

call.py
from test import add
    
    add.delay(4, 4)

実行:

$ python call.py

コンソールからワーカーの情報が出力されます:

Received task: task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task task.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8

Celery サービスのモジュールの準備

Celery ドキュメント に書かれていることとは少し異なる手順で説明しています。

test_task モジュールを作成:

# mkdir /lib/python3.5/site-packages/test_task
# touch /lib/python3.5/site-packages/test_task/__init__.py
# touch /lib/python3.5/site-packages/test_task/test_task.py
# touch /lib/python3.5/site-packages/test_task/celery.py
/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
 app.start()
/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import

from test_task.celery import app

@app.task
def add(x, y):
 return x + y

コンソールで python を起動して以下のコマンドを実行できるはずです:

>>> from test_task import celery

/etc/celery/celery.conf の中の以下の行を:

CELERY_APP="proj"

以下のように置き換えてください:

CELERY_APP="test_task"

celery@celery.service再起動してください。

タスクを定期的に実行

Celery Beat を使うことでタスクを定期実行することができます。基本的なセットアップは Celery のドキュメントページ で説明されています。

celery.py の中で CELERYBEAT_SCHEDULE を指定したい場合、Celery がタスクを認識できるように app.conf プレフィックスを追加する必要があります。その後 Celery デーモンを起動するときに --beat --schedule=/var/lib/celery/celerybeat-schedule パラメータを追加してください。さらに、/var/lib/celery ディレクトリが存在している必要があり所有者は celery を実行するユーザーでなければなりません。

トラブルシューティング

#systemd の chroot ユニットが特に問題を報告しないのに celery サービスが機能しない場合、コンソールから chroot の中の celery を起動して詳しくログを確認することができます。例:

# /usr/bin/chroot --userspec=root:root /srv/http/apps/celery /usr/bin/env -i HOME=/ /usr/bin/python -m celery worker -A package_name --uid=33 --gid=33 --pidfile=/run/celery.pid --logfile=/var/log/celery/celery.log --loglevel="INFO"