Python RQ 的运行流程

yufei       4 年, 2 月 前       2747

在文章 Python RQ 背后的机制 中我们详细介绍了 RQ 如何将数据数据保存到 Redis

我们知道如果使用下面的代码则会创建一个默认的队列

q = Queue(connection=Redis())

队列在 Redis 中的键为 rq:queue:default 是一个列表类型

但我们在继续介绍 RQ 中的队列之前,我们先来简单了解下 RQ 中的作业任务

RQ 作业任务 ( job )

RQ 作业任务 ( job ) 是一个可执行的 Python 对象,是一个被处于后台的工作进程( works ) 异步调用的函数

RQ 中只需将对函数及其参数加入到队列中,就可以异步调用任何 Python 函数,这种操作也称之为 入队

创建作业任务

因为 RQ 中的作业任务是一个可执行的对象,例如函数和实现了 __call__ 方法的类的实例,所以创建一个作业任务和创建一个普通函数没啥区别

task.py

import requests

def length_of_url(url):
    resp = requests.get(url)
    return len(resp.text)

Python RQ 背后的机制 章节中我们知道作业任务的执行结果有好几种状态,执行成功 ( finished) 和执行失败 (failed)

正常情况下,不管这个可调用的对象有没有返回值,都认为是执行成功的

但如果执行中抛出了异常,任何异常,都会被视为执行失败

注意

作业任务所在的文件有一个特殊要求,就是不能处于 __main__ 入口文件中,否则会报错

比如下面的代码

from redis import Redis
from rq import Queue

import requests

def length_of_url(url):
    resp = requests.get(url)
    return len(resp.text)


q = Queue(connection=Redis())

rs = q.enqueue(
          length_of_url, 'https://www.twle.cn')

print(rs)

运行结果会报错

$ python3 task.py                                
Traceback (most recent call last):
  File "d.py", line 14, in <module>
    count_words_at_url, 'https://www.twle.cn')
  File "/Users/yufei/devops/python/demo/lib/python3.7/site-packages/rq/queue.py", line 279, in enqueue
    raise ValueError('Functions from the __main__ module cannot be processed '
ValueError: Functions from the __main__ module cannot be processed by workers
(demo)

作业任务入队

刚刚我们已经创建了一个作业任务 length_of_url(url),现在,是时候把作业任务和所需参数加入到任务队列中了

将一个作业任务入队的方式很简单,短短几行代码就解决了

main.py

from rq import Queue
from redis import Redis

import task

# 告诉 RQ 使用那个 Redis 服务
redis_conn = Redis()
q = Queue(connection=redis_conn)  # 没有传递任何参数则表示使用默认队列

# 延时异步执行 length_of_url('https://www.twle.cn/')
job = q.enqueue(task.length_of_url, 'https://www.twle.cn/')
print(job.result)   # => None

# 然后,我们等待个几秒,直到作业任务被执行完成,就可以查看执行的结果
time.sleep(2)
print(job.result)   # => 41039

打开一个 shell 并跳转到当前目录下,运行下面的命令开启后台工作进程

$ rq worker
10:17:57 RQ worker 'rq:worker:lie.16080' started, version 0.11.0
10:17:57 *** Listening on default...
10:17:57 Cleaning registries for queue: default

启动过程没啥亮点,唯一值得注意的就是 rq:worker:lie.16080 ,这是当前工作进程的 ID

然后打开一个新的 shell,跳转到当前目录下,运行 python main.py 查看结果

$ python main.py
None
41039

作业任务执行结果

回到 rq worker 的 shell,可以看到输出结果如下

$ rq worker
10:17:57 RQ worker 'rq:worker:lie.16080' started, version 0.11.0
10:17:57 *** Listening on default...
10:17:57 Cleaning registries for queue: default
10:20:03 default: task.length_of_url('https://www.twle.cn/') (c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36)
10:20:04 default: Job OK (c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36)
10:20:04 Result is kept for 500 seconds

日志输出没什么,但可以看到一个 500 数字,这个表示执行结果保存的时间,500 秒之后就会失效

如果我们在 redis-cli 中使用 keys * 命令,可以看到输出结果如下

127.0.0.1:6379> keys *
1) "rq:worker:lie.16080"
2) "rq:workers"
3) "rq:workers:default"
4) "rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36"
5) "rq:finished:default"
6) "rq:queues"

可以看到多了几个键,分别是 rq:worker:lie.16080rq:workersrq:workers:default 还有 rq:finished:default

  1. rq:finished:default

    rq:finished:default 用于保存默认队列的执行结果,是一个 zset 类型

    127.0.0.1:6379> type rq:finished:default
    zset
    

    保存的是执行成功的作业任务的 ID

    127.0.0.1:6379> zrange rq:finished:default 0 -1
    1) "c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36"
    
  2. rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36

    既然任务已经执行完成,我们就看看执行完成的任务是啥样的

     127.0.0.1:6379> hgetall rq:job:c7f1b4bd-13f1-4c38-98ae-e6754d2b3d36
     1) "description"
     2) "task.length_of_url('https://www.twle.cn/')"
     3) "timeout"
     4) "180"
     5) "started_at"
     6) "2018-07-13T02:29:10.524589Z"
     7) "enqueued_at"
     8) "2018-07-13T02:29:10.510681Z"
     9) "created_at"
    10) "2018-07-13T02:29:10.510287Z"
    11) "result"
    12) "\x80\x04\x95\x04\x00\x00\x00\x00\x00\x00\x00MO\xa0."
    13) "origin"
    14) "default"
    15) "ended_at"
    16) "2018-07-13T02:29:10.926991Z"
    17) "status"
    18) "finished"
    19) "data"
    20) "x\x9ck`\x99j\xca\x00\x01\x1a=B%\x89\xc5\xd9z9\xa9y\xe9%\x19\xf1\xf9i\xf1\xa5E9S\xfczD2JJ\n\x8a\xad\xf4\xf5\xcb\xcb\xcb\xf5J\xcasR\xf5\x92\xf3\xf4\xa7\xb4N\xa9\x9dR2E\x0f\x00r\xe7\x16\x1c"
    

    可以看到,新增加了几个键值对,同时 status 键的值已经改成了 finished

    说明
    started_at 2018-07-13T02:29:10.524589Z 开始执行任务的时间
    ended_at 2018-07-13T02:29:10.926991Z 结束执行任务的时间
    result "\x80\x04\x95\x04\x00\x00\x00\x00\x00\x00\x00MO\xa0." 执行的结果
  3. rq:workersrq:workers:default

    rq:workersrq:workers:default 不细说了,跟 rq:queuerq:queue:default 是一样的

  4. rq:worker:lie.16080

    rq:worker:lie.16080 是我们的后台工作进程的 ID,而这个键保存的是这个工作进程的状态

    127.0.0.1:6379> type rq:worker:lie.16080
    hash
    127.0.0.1:6379> hgetall rq:worker:lie.16080
     1) "birth"
     2) "2018-07-13T02:17:57.213198Z"
     3) "last_heartbeat"
     4) "2018-07-13T02:29:10.937285Z"
     5) "queues"
     6) "default"
     7) "state"
     8) "idle"
     9) "successful_job_count"
    10) "3"
    11) "total_working_time"
    12) "1421007"
    

    键值对一看就懂,就不细说了

目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.