芹菜错误:result.get超时

我已经安装了芹菜,我试图用芹菜的第一步文件进行testing。

我尝试使用Redis和RabbitMQ作为中间商和后端,但我无法得到结果:

result.get(timeout = 10) 

每一次,我得到这个错误:

  Traceback (most recent call last): File "<input>", line 11, in <module> File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/result.py", line 169, in get no_ack=no_ack, File "/home/mehdi/.virtualenvs/python3/lib/python3.4/site-packages/celery/backends/base.py", line 225, in wait_for raise TimeoutError('The operation timed out.') celery.exceptions.TimeoutError: The operation timed out. 

经纪人部分似乎工作得很好:当我运行这个代码

 from celery import Celery app = Celery('tasks', backend='redis://localhost/', broker='amqp://') @app.task def add(x, y): return x + y result = add.delay(4,4) 

我得到(如预期)

[2015-08-04 12:05:44,910:INFO / MainProcess]收到的任务:tasks.add [741160b8-cb7b-4e63-93c3-f5e43f8f8a02]

[2015-08-04 12:05:44,911:INFO / MainProcess]任务tasks.add [741160b8-cb7b-4e63-93c3-f5e43f8f8a02]成功在0.0004287530000510742s:8

PS:我正在使用Xubuntu 64位

编辑:

我的app.conf

 {'CELERY_RESULT_DB_TABLENAMES': None, 'BROKER_TRANSPORT_OPTIONS': {}, 'BROKER_USE_SSL': False, 'CELERY_BROADCAST_QUEUE': 'celeryctl', 'EMAIL_USE_TLS': False, 'CELERY_STORE_ERRORS_EVEN_IF_IGNORED': False, 'CELERY_CREATE_MISSING_QUEUES': True, 'CELERY_DEFAULT_QUEUE': 'celery', 'CELERY_SEND_TASK_SENT_EVENT': False, 'CELERYD_TASK_TIME_LIMIT': None, 'BROKER_URL': 'amqp://', 'CELERY_EVENT_QUEUE_EXPIRES': None, 'CELERY_DEFAULT_EXCHANGE_TYPE': 'direct', 'CELERYBEAT_SCHEDULER': 'celery.beat:PersistentScheduler', 'CELERY_MAX_CACHED_RESULTS': 100, 'CELERY_RESULT_PERSISTENT': None, 'CELERYD_POOL': 'prefork', 'CELERYD_AGENT': None, 'EMAIL_HOST': 'localhost', 'CELERY_CACHE_BACKEND_OPTIONS': {}, 'BROKER_HEARTBEAT': None, 'CELERY_RESULT_ENGINE_OPTIONS': None, 'CELERY_RESULT_SERIALIZER': 'pickle', 'CELERYBEAT_SCHEDULE_FILENAME': 'celerybeat-schedule', 'CELERY_REDIRECT_STDOUTS_LEVEL': 'WARNING', 'CELERY_IMPORTS': (), 'SERVER_EMAIL': 'celery@localhost', 'CELERYD_TASK_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'CELERY_SECURITY_CERTIFICATE': None, 'CELERYD_LOG_COLOR': None, 'CELERY_RESULT_EXCHANGE': 'celeryresults', 'CELERY_TRACK_STARTED': False, 'CELERY_REDIS_PASSWORD': None, 'BROKER_USER': None, 'CELERY_COUCHBASE_BACKEND_SETTINGS': None, 'CELERY_RESULT_EXCHANGE_TYPE': 'direct', 'CELERY_REDIS_DB': None, 'CELERYD_TIMER_PRECISION': 1.0, 'CELERY_REDIS_PORT': None, 'BROKER_TRANSPORT': None, 'CELERYMON_LOG_FILE': None, 'CELERYD_CONCURRENCY': 0, 'CELERYD_HIJACK_ROOT_LOGGER': True, 'BROKER_VHOST': None, 'CELERY_DEFAULT_EXCHANGE': 'celery', 'CELERY_DEFAULT_ROUTING_KEY': 'celery', 'CELERY_ALWAYS_EAGER': False, 'EMAIL_TIMEOUT': 2, 'CELERYD_TASK_SOFT_TIME_LIMIT': None, 'CELERY_WORKER_DIRECT': False, 'CELERY_REDIS_HOST': None, 'CELERY_QUEUE_HA_POLICY': None, 'BROKER_PORT': None, 'CELERYD_AUTORELOADER': 'celery.worker.autoreload:Autoreloader', 'BROKER_CONNECTION_TIMEOUT': 4, 'CELERY_ENABLE_REMOTE_CONTROL': True, 'CELERY_RESULT_DB_SHORT_LIVED_SESSIONS': False, 'CELERY_EVENT_SERIALIZER': 'json', 'CASSANDRA_DETAILED_MODE': False, 'CELERY_REDIS_MAX_CONNECTIONS': None, 'CELERY_CACHE_BACKEND': None, 'CELERYD_PREFETCH_MULTIPLIER': 4, 'BROKER_PASSWORD': None, 'CELERY_BROADCAST_EXCHANGE_TYPE': 'fanout', 'CELERY_EAGER_PROPAGATES_EXCEPTIONS': False, 'CELERY_IGNORE_RESULT': False, 'CASSANDRA_KEYSPACE': None, 'EMAIL_HOST_PASSWORD': None, 'CELERYMON_LOG_LEVEL': 'INFO', 'CELERY_DISABLE_RATE_LIMITS': False, 'CELERY_TASK_PUBLISH_RETRY_POLICY': {'interval_start': 0, 'interval_max': 1, 'max_retries': 3, 'interval_step': 0.2}, 'CELERY_SECURITY_KEY': None, 'CELERY_MONGODB_BACKEND_SETTINGS': None, 'CELERY_DEFAULT_RATE_LIMIT': None, 'CELERYBEAT_SYNC_EVERY': 0, 'CELERY_EVENT_QUEUE_TTL': None, 'CELERYD_POOL_PUTLOCKS': True, 'CELERY_TASK_SERIALIZER': 'pickle', 'CELERYD_WORKER_LOST_WAIT': 10.0, 'CASSANDRA_SERVERS': None, 'CELERYD_POOL_RESTARTS': False, 'CELERY_TASK_PUBLISH_RETRY': True, 'CELERY_ENABLE_UTC': True, 'CELERY_SEND_EVENTS': False, 'BROKER_CONNECTION_MAX_RETRIES': 100, 'CELERYD_LOG_FILE': None, 'CELERYD_FORCE_EXECV': False, 'CELERY_CHORD_PROPAGATES': True, 'CELERYD_AUTOSCALER': 'celery.worker.autoscale:Autoscaler', 'CELERYD_STATE_DB': None, 'CELERY_ROUTES': None, 'CELERYD_TIMER': None, 'ADMINS': (), 'BROKER_HEARTBEAT_CHECKRATE': 3.0, 'CELERY_ACCEPT_CONTENT': ['json', 'pickle', 'msgpack', 'yaml'], 'BROKER_LOGIN_METHOD': None, 'BROKER_CONNECTION_RETRY': True, 'CELERY_TIMEZONE': None, 'CASSANDRA_WRITE_CONSISTENCY': None, 'CELERYBEAT_MAX_LOOP_INTERVAL': 0, 'CELERYD_LOG_LEVEL': 'WARN', 'CELERY_REDIRECT_STDOUTS': True, 'BROKER_POOL_LIMIT': 10, 'CELERY_SECURITY_CERT_STORE': None, 'CELERYD_CONSUMER': 'celery.worker.consumer:Consumer', 'CELERY_INCLUDE': (), 'CELERYD_MAX_TASKS_PER_CHILD': None, 'CELERYD_LOG_FORMAT': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'CELERY_ANNOTATIONS': None, 'CELERY_MESSAGE_COMPRESSION': None, 'CASSANDRA_READ_CONSISTENCY': None, 'EMAIL_USE_SSL': False, 'CELERY_SEND_TASK_ERROR_EMAILS': False, 'CELERY_QUEUES': None, 'CELERY_ACKS_LATE': False, 'CELERYMON_LOG_FORMAT': '[%(asctime)s: %(levelname)s] %(message)s', 'CELERY_TASK_RESULT_EXPIRES': datetime.timedelta(1), 'BROKER_HOST': None, 'EMAIL_PORT': 25, 'BROKER_FAILOVER_STRATEGY': None, 'CELERY_RESULT_BACKEND': 'rpc://', 'CELERY_BROADCAST_EXCHANGE': 'celeryctl', 'CELERYBEAT_LOG_FILE': None, 'CELERYBEAT_SCHEDULE': {}, 'CELERY_RESULT_DBURI': None, 'CELERY_DEFAULT_DELIVERY_MODE': 2, 'CELERYBEAT_LOG_LEVEL': 'INFO', 'CASSANDRA_COLUMN_FAMILY': None, 'EMAIL_HOST_USER': None} 

最后使用这个项目布局

 proj/celery_proj/__init__.py /celery.py /tasks.py /test.py 

哪里

celery.py

 from __future__ import absolute_import from celery import Celery app = Celery('celery_proj', broker='amqp://', backend='amqp://', include=['celery_proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( CELERY_TASK_RESULT_EXPIRES=3600, ) if __name__ == '__main__': app.start() 

tasks.py

 from __future__ import absolute_import from celery_proj.celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers) 

test.py

 __author__ = 'mehdi' path = '/home/mehdi/PycharmProjects' import sys sys.path.append(path) from celery_proj.tasks import add r = add.delay(4,4) print(r.status) print(r.result) 

启动工作人员:

 cd proj celery -A celery_proj worker -l info 

然后运行test.py:

 python test.py 

“后端”配置不能作为可选参数传递给Celery对象,而应该通过python配置参数CELERY_RESULT_BACKEND传递(请参阅https://github.com/celery/celery/issues/2146 )。

所以(来自Celery教程的)tasks.py应该是这样的:

 from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//') app.config_from_object('celeryconfig') @app.task def add(x, y): print '[' + str(x) + '][' + str(y) + ']=' + str(x+y) return x + y 

使用以下内容在与tasks.py相同的目录中创建一个文件celeryconfig.py:

 CELERY_RESULT_BACKEND='amqp://' 

修改任务之后,需要重新启动芹菜重新读取更改。

使用app.backend.get_result(result.id)来代替AsyncResult.get()因为AsyncResult.get()将阻塞,直到任务状态变为就绪,但是任务已经运行完成