首頁>技術>

1.什麼是celery

celery是一個簡單,靈活、可靠的分散式任務執行框架,可以支援大量任務的併發執行。celery採用典型生產者和消費者模型。生產者提交任務到任務佇列,眾多消費者從任務佇列中取任務執行。

1.1 celery架構

Celery由以下三部分構成:訊息中介軟體(Broker)、任務執行單元Worker、結果儲存(Backend)

任務呼叫提交任務執行請求給Broker佇列如果是非同步任務,worker會立即從佇列中取出任務並執行,執行結果儲存在Backend中如果是定時任務,任務由Celery Beat程序週期性地將任務發往Broker佇列,Worker實時監視訊息佇列獲取佇列中的任務執行1.2 應用場景大量的長時間任務的非同步執行, 如上傳大檔案大規模實時任務執行,支援叢集部署,如支援高併發的機器學習推理定時任務執行,如定時傳送郵件,定時掃描機器執行情況2.安裝

celery安裝非常簡單, 除了安裝celery,本文中使用redis作為訊息佇列即Broker

# celery 安裝pip install celery# celery 監控 flowerpip install flowerpip install redis
# redis 安裝yum install redis# redis啟動redis-server /etc/redis.conf
3. 完整例子

celery的應用開發涉及四個部分

celery 例項初始化任務的定義(定時和實時任務)任務worker的啟動任務的呼叫3.1 專案目錄
# 專案目錄wedo.├── config.py├── __init__.py├── period_task.py└── tasks.py
3.2 celery 例項初始化

celery的例項化,主要包括執行Broker和backend的訪問方式,任務模組的申明等

# celery 例項初始化 # __init__.pyfrom celery import Celeryapp = Celery('wedo')  # 建立 Celery 例項app.config_from_object('wedo.config') # 配置 wedo.config# config.pyBROKER_URL = 'redis://10.8.238.2:6379/0' # Broker配置,使用Redis作為訊息中介軟體CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0' # BACKEND配置,這裡使用redisCELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置CELERY_IMPORTS = (     # 指定匯入的任務模組,可以指定多個    'wedo.tasks',    'wedo.period_task')
3.3 任務的定義

celery中通過@task的裝飾器來進行申明celery任務,其他操作無任何差別

# 任務的定義# 簡單任務  tasks.pyimport celeryimport timefrom celery.utils.log import get_task_loggerfrom wedo import app@app.taskdef sum(x, y):    return x + y@app.taskdef mul(x, y):    time.sleep(5)    return x * y

定時任務和實時任務的區別主要是要申明何時執行任務,任務本身也是通過task裝飾器來申明 何時執行任務有2種

指定頻率執行:sender.add_periodic_task(時間頻率單位s, 任務函式, name='to_string')crontab方式:分鐘/小時/天/月/周粒度, 可以支援多種排程
# 任務的定義# 定時任務  period_task.pyfrom wedo import appfrom celery.schedules import crontab@app.on_after_configure.connectdef setup_periodic_tasks(sender, **kwargs):    sender.add_periodic_task(5.0, to_string.s("celery peroid task"), name='to_string') # 每5秒執行add    sender.add_periodic_task(        crontab(minute='*/10'),      #每10分鐘執行一次        send_mail.s('hello, this is a celery'), name='send_mail'    )@app.taskdef send_mail(content):    print('send mail, content is %s' % content)@app.taskdef to_string(text):    return 'this is a %s' % text
3.4 任務worker的啟動

任務啟動分為worker啟動和定時任務beat啟動

# -A wedo為應用模組# -l為日誌level# -c 為程序數celery worker -A wedo  -l debug -c 4# 後臺啟動nohup celery worker -A wedo -l debug -c 4 > ./log.log  2>&1# 從下面的日誌可以看出啟動了4個任務#   . wedo.period_task.send_mail#   . wedo.period_task.to_string#   . wedo.tasks.mul#   . wedo.tasks.sum -------------- celery@localhost.localdomain v4.4.2 (cliffs)--- ***** ----- -- ******* ---- Linux-3.10.0-327.28.3.el7.x86_64-x86_64-with-centos-7.2.1511-Core 2020-04-25 23:35:26- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         wedo:0x7f05af30d320- ** ---------- .> transport:   redis://10.8.238.2:6379/0- ** ---------- .> results:     redis://10.8.238.2:6379/0- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery                [tasks]  . celery.accumulate  . celery.backend_cleanup...  . wedo.period_task.send_mail  . wedo.period_task.to_string  . wedo.tasks.mul  . wedo.tasks.sum...[2020-04-25 23:35:27,617: INFO/MainProcess] celery@localhost.localdomain ready.[2020-04-25 23:35:27,617: DEBUG/MainProcess] basic.qos: prefetch_count->16[2020-04-25 23:35:27,655: DEBUG/MainProcess] celery@12103675 joined the party
celery beat -A wedo.period_taskcelery beat v4.4.2 (cliffs) is starting.__    -    ... __   -        _LocalTime -> 2020-04-25 23:37:08Configuration ->    . broker -> redis://10.8.238.2:6379/0    . loader -> celery.loaders.app.AppLoader    . scheduler -> celery.beat.PersistentScheduler    . db -> celerybeat-schedule    . logfile -> [stderr]@%WARNING    . maxinterval -> 5.00 minutes (300s)
# worker啟動是4個程序\\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4        \\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    \\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    \\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4    \\_  /root/anaconda3/envs/post/bin/celery worker -A wedo -l debug -c 4

worker和beat的停止

ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9ps auxww | awk '/celery beat/ {print $2}' | xargs kill -9
3.5 任務的呼叫

任務worker已經啟動好了,通過任務呼叫傳遞給broker(redis),並返回任務執行結果 任務呼叫主要有兩種,本質是一致的,delay是apply_async的封裝,apply_async可以支援更多的任務呼叫配置

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

apply_async和delay會返回一個非同步的任務結果,AsyncResult中儲存了任務的執行狀態和結果,常用的操作

value = result.get() # 任務返回值print(result.__dict__) # 結果資訊print(result.successful()) # 是否成功print(result.fail()) # 是否失敗print(result.ready()) # 是否執行完成print(result.state) # 狀態 PENDING -> STARTED -> SUCCESS/FAIL

常規任務:

from celery.utils.log import get_loggerfrom wedo.tasks import sum, mul, post_filefrom celery import group, chain, chordlogger = get_logger(__name__)try:    result = mul.apply_async(args=(2, 2))    value = result.get() # 等待任務執行完畢後,才會返回任務返回值    print(value)except mul.OperationalError as exc: # 任務異常處理    logger.exception('Sending task raised: %r', exc)

組合任務:

多個任務並行執行, group多個任務鏈式執行,chain:第一個任務的返回值作為第二個的輸入引數,以此類推
result = group(sum.s(i, i) for i in range(5))()result.get()# [0, 2, 4, 6, 8]result = chain(sum.s(1,2), sum.s(3), mul.s(3))()result.get()# ((1+2)+3)*3=18
4. 分散式叢集部署

celery作為分散式的任務佇列框架,worker是可以執行在不同的伺服器上的。部署過程和單機上啟動是一樣。只要把專案程式碼copy到其他伺服器,使用相同命令就可以了。可以思考下,這個是怎麼實現的?對了,就是通過共享Broker佇列。使用合適的佇列,如redis,單程序單執行緒的方式可以有效的避免同個任務被不同worker同時執行的情況。

5. 進階使用

在前面已經了解了celery的主要的功能了。celery還為一些特別的場景提供了需要擴充套件的功能

5.1 任務狀態跟蹤和日誌

有時候我們需要對任務的執行情況做一些監控,比如失敗後報警通知。

celery在裝飾器@app.task中提供了base引數,傳入重寫的Task模組,重新on_*函式就可以控制不同的任務結果在@app.task提供bind=True,可以通過self獲取Task中各種引數self.request:任務的各種引數self.update_state: 自定義任務狀態, 原有的任務狀態:PENDING -> STARTED -> SUCCESS, 如果你想了解STARTED -> SUCCESS之間的一個狀態,比如執行的百分比之類,可以通過自定義狀態來實現self.retry: 重試
import celeryimport timefrom celery.utils.log import get_task_loggerfrom wedo import applogger = logger = get_task_logger(__name__)class TaskMonitor(celery.Task):    def on_failure(self, exc, task_id, args, kwargs, einfo):        """failed callback"""        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))    def on_success(self, retval, task_id, args, kwargs):        """success callback"""        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))    def on_retry(self, exc, task_id, args, kwargs, einfo):        """retry callback"""        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))@app.task(base=TaskMonitor, bind=True, name='post_file')def post_file(self, file_names):    logger.info(self.request.__dict__)    try:        for i, file in enumerate(file_names):            print('the file %s is posted' % file)            if not self.request.called_directly:                self.update_state(state='PROGRESS',                    meta={'current': i, 'total': len(file_names)})            time.sleep(2)    except Exception as exec:        raise self.retry(exc=exec, countdown=3, max_retries=5)
5.2 任務指定特定的worker執行

celery做為支援分散式,理論上可以無限擴充套件worker。預設情況下celery提交任務後,任務會放入名為celery的佇列,所有線上的worker都會從任務佇列中獲取任務,任一個worker都有可能執行這個任務。有時候,有時候任務的特殊性或者機器本身的限制,某些任務只能跑在某些worker上。celery提供了queue在區別不同的worker,很好的支援這種情況。

啟動worker時,-Q 指定worker支援的任務列隊名, 可以支援多個佇列名哦
celery worker -A wedo  -l debug -c 4 -Q celery,hipri
任務呼叫時, queue=*來指定需要執行worker
result = mul.apply_async(args=(2, 2), queue='hipri')
6. 任務佇列監控

如果你想通過視覺化的方式,檢視celery的一切。flower提供可行的解決方案,十分的方便

celery為分散式佇列, 通過訊息佇列連線任務提交和執行者worker, 鬆耦合模式,可擴充套件celery訊息佇列建議為rediscelery通過@app.task裝飾把普通任務變成celery Taskcelery worker 通過不同queue支援特定的worker消費特定的任務@app.task中可以同步base和bind引數獲取更過的控制任務生命週期flower監控celery全過程celery doc:https://docs.celeryproject.org/en/master/getting-started/index.html

感謝大家支援!!!

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 跨平臺解決方案(Flutter)實現效果原始碼,快學習吧