阿布云

你所需要的,不仅仅是一个好用的代理。

wechat-admin:Celery使用篇

阿布云 发表于

11.png

Celery是一个专注于实时处理和任务调度分布式任务队列。通过RabbitMQ、Redis、MongoDB等消息代理,把任务发给执行任务的Worker以达到异步执行。

我写的那本《Python Web开发实战》的样章就是 《使用Celery》 ,建议看下面内容之前先读一下这篇文章。

接下来的内容假设你已经对Celery有了一定的了解。对 wechat-admin 项目来说,使用Celery要做如下事情:

  1. 更新项目数据库中的联系人、群聊和公众号等相关内容
  2. 监听wxpy进程,处理自动加群、接受消息、踢人以及各种插件功能等
  3. 自动重启上述的监听进程
  4. 发送新消息数量提醒

首先我们创建一个目录(wechat),专门用来存放celery任务相关的内容,目录下文件列表如下:

❯ tree wechat

wechat

├── __init__.py

├── celery.py # 名为celery.py是主程序,启动的时候可以直接`celery -A wechat worker -l info -B`

├── celeryconfig.py # 配置文件

└── tasks.py # 存放任务逻辑

 

0 directories, 4 files

我们挨个看看

celeryconfig.py

看文件名字就知道了,这个是放配置的文件:

❯ cat celeryconfig.py

from config import REDIS_URL

BROKER_URL = REDIS_URL

CELERY_RESULT_BACKEND = REDIS_URL

CELERY_TASK_SERIALIZER = 'msgpack'

CELERY_RESULT_SERIALIZER = 'json'

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

指定消息代理和执行结果都使用Redis,任务(消息)使用msgpack序列化,结果使用json序列化,任务结果保存时间24小时等

celery.py

主程序有点Flask的app.py的感觉:

❯ cat celery.py

from celery import Celery

from celery.signals import worker_ready

 

from models.redis import db, LISTENER_TASK_KEY

 

app = Celery('wechat', include=['wechat.tasks'])

app.config_from_object('wechat.celeryconfig')

 

@worker_ready.connect

def at_start(sender, **k):

       with sender.app.connection() as conn: # noqa

             task_id = sender.app.send_task('wechat.tasks.listener')

             db.set(LISTENER_TASK_KEY, task_id)

 

if __name__ == '__main__':

       app.start()

这段代码有2点需要解释一下:

  1. 调用send_task会返回任务id,存在LISTENER_TASK_KEY里面用于未来重启时直接通过这个任务id
  2. 使用了Celery的信号系统,listener这个异步任务需要在worker启动之后就运行,使用worker_ready这个信号就可以。

tasks.py

tasks.py这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:

❯ cat tasks.py

from datetime import timedelta

from celery.task import periodic_task

from celery.task.control import revoke

 

from wechat.celery import app

from wxpy.exceptions import ResponseError

from itchat.signals import logged_out

 

def restart_listener(sender, **kw):

      # 重启tasks.listener这个任务

      task_id = r.get(LISTENER_TASK_KEY)

      if task_id:

          revoke(str(task_id, 'utf-8'))

      task_id = app.send_task('wechat.tasks.listener')

      r.set(LISTENER_TASK_KEY, task_id)

 

logged_out.connect(restart_listener)

 

from wxpy.signals import stopped

from libs.wx import get_bot

from views.api import json_api

from models.redis import db as r, LISTENER_TASK_KEY

from app import app as sse_api

 

stopped.connect(restart_listener)

bot = get_bot()

 

def _retrieve_data(update=False):

      _update_contact(bot, update)

      _update_group(bot, update)

      _update_mp(bot, update)

 

@app.task

def listener():

      # 不用全局的bot,因为在import listener的过程中会

      # 注册各种函数(处理自动加群、接受消息、踢人以及各种插件功能)

      from libs.listener import bot

      with json_api.app_context():

           bot.join()

 

@app.task

def retrieve_data():

      # 使用Flask应用中的方法都需要放在对应的上下文内

      with json_api.app_context():

           _retrieve_data(True)

 

@app.task

def update_contact(update=False):

       # 都是业务逻辑,就省略了,这样分开写是可以单独的更新一种类型的数据

       ...

 

@app.task

def update_group(update=False):

       ...

 

@app.task

def update_mp(update=False):

       ...

 

# periodic_task就是定时任务,表示周期性的执行某任务

@periodic_task(run_every=timedelta(seconds=60), time_limit=5)

def send_notify():

       # 发送新消息数量提醒

       ...

 

上一篇我说SSE的时候忘说了一点,就是更新消息提醒。在Web页面标记已读的时候,会POST到/readall接口,后端清空新通知数量。这是由于SSE的单向特点造成的,如果使用socketio(WebSocket)的话可以直接emit到后端,就不用HTTP这种方案了