Python RQ 任务队列中的工作进程 ( 二 )

yufei       4 年, 2 月 前       1408

在上一章节 Python RQ 任务队列中的工作进程 ( 一 ) 章节中我们讲述了 RQ 工作进程的日常使用的一些知识

本章节我们我们来做一些深入了解,也借此机会一窥工作进程的全貌

工作进程

默认情况下,rq worker shell 脚本是一个简单的 fetch-fork-execute 循环

如果我们的作业任务需要进行冗长的设置,或者都依赖于同一组模块时,你每次运行一个作业任务时就要消耗大量的时间。因为这些配置或模块只有在 fork 之后才进行了导入

从某些方面说,这很干净,因为 RQ 不会以这种方式泄漏内存

但也会变慢

可用于提高这类作业的吞吐量性能的方式是 fork 之前导入必要的模块

但 RQ 并没有提供相关的选项用于配置此设置,不过,我们可以自定义脚本,并在开始工作循环之前自己完成此操作

为此,我们需要提供自己的工作脚本 ( 而不是使用 rq worker )

一个简单的实现范例

#!/usr/bin/env python
import sys
from rq import Connection, Worker

# 预先载入的模块
import library_that_you_want_preloaded

# 使用一个队列名作为参数来来监听该队列
# 类似于 rq worker
with Connection():
    qs = sys.argv[1:] or ['default']

    w = Worker(qs)
    w.work()

工作进程名称

工作进程在注册到系统时需要使用一个名字

默认情况下,这个名字由 rq:worker: 、当前的主机名和当前的进程名组成,例如

rq:worker:lie.16080

当然了,我们可以自定义这个名称,只需要在启动工作进程时使用 --name 选项即可

rq worker --name rq:worker:lie:1

获取工作进程信息

这一特性在 0.10.0 版本中被实现

工作进程会将它们的运行时信息存储在 Redis 中,我们可以使用下面的代码来获取这些信息

from redis import Redis
from rq import Queue, Worker

# 返回该连接中注册的所有工作进程
redis = Redis()
workers = Worker.all(connection=redis)

# 返回监听某个队列的所有工作进程 ( 版本 0.10.0 中新增 )
queue = Queue('queue_name')
workers = Worker.all(queue=queue)

处于监控目的,如果我们只想知道有多少工作进程,使用 Worker.count() 函数会更高效

from redis import Redis
from rq import Worker

redis = Redis()

# 统计该 Redis 连接中注册的所有工作进程
workers = Worker.count(connection=redis)

# 统计监听某个队列的所有工作进程
queue = Queue('queue_name', connection=redis)
workers = Worker.all(queue=queue)

工作进程统计信息

这一特性在 0.9.0 中被实现

为了检查队列的利用率,工作进程会存储一些有用的信息

from rq.worker import Worker

worker = Worker.find_by_key('rq:worker:name')

worker.successful_job_count  # 成功执行的作业任务数
worker.failed_job_count. # 当前工作进程执行失败的作业任务数
worker.total_working_time  # 执行作业任务的总耗时

下线工作进程

无论何时,只要工作进程接收到 SIGINT 信号 ( 通过 Ctrl + C ) 或 SIGTERM 信号 ( 通过 kill ) ,则工作进程会等待当前正在运行的任务完成,然后停止循环并优雅地注册自己的死亡

如果在此下线阶段,工作进程再次收到 SIGINTSIGTERM ,工作进程将强制终止子进程 ( 发送 SIGKILL ) ,但仍会尝试注册自己的死亡

也就是俗称两次 CTRL + C 退出进程

使用配置文件

这一特性在 0.3.2 版本中被实现

如果我们想通过配置文件而不是通过命令行参数配置 rq worker

可以通过创建类似 settings.pyPython 文件来完成此操作

REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'normal', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:secret@example.com/1'

这个配置文件显示了当前支持的所有选项

需要注意的是 QUEUESREDIS_PASSWORD 设置从 0.3.3 版本开始才被支持

创建完配置文件后,我们就可以使用 -c 参数来指定工作进程启动时使用的配置文件

$ rq worker -c settings

自定义工作进程类

这一特性在 0.4.0 版本中被实现

比如有时候我们需要自定义工作进程的行为

到目前为止,需要这么做的情况一般是

  1. 在运行作业任务前管理数据库连接
  2. 使用自定义的作业执行模型,而不是使用 os.fork
  3. 使用不同的并发模型,例如 multiprocessinggevent

不管处于何种目的,我们都可以通过 -w 选项来自定义工作进程类

$ rq worker -w 'path.to.GeventWorker'

自定义作业任务类和队列类

这是一个还未实现的特性,据说下一个版本会实现

简单来说,就是在启动 rq worker 时通过选项 --job-class 和/或 --queue-class 告诉 worker 使用自定义的作业类和队列类

$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'

当然,如果使用了自定义的作业类和队列类,那么在将作业任务加入队列时也必须使用自定义的类

例如

from rq import Queue
from rq.job import Job

class CustomJob(Job):
    pass

class CustomQueue(Queue):
    job_class = CustomJob

queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)

自定义异常处理器

上一章节提到工作进程启动参数的时候,我们说过可以使用 --exception-handler 选项来自定义异常处理器

这个特性在 0.5.5 版本中被添加

如果我们需要针对不同类型的作业以不同方式处理错误,或者只是想要自定义 RQ 的默认错误处理行为,可以使用 --exception-handler 选项运行 rq worker

$ rq worker --exception-handler 'path.to.my.ErrorHandler'

RQ 还支持多个异常处理器,使用方式也简单,就是多次使用 --exception-handler

$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.