Python 任务队列 RQ 中的连接 ( connections )

yufei       5 年, 8 月 前       1073

早期的版本,为了方便起见,RQ 提供了 use_connection() 函数,但这个函数在后来的版本中被 弃用

原因是它会污染全局命名空间,取而代之的是,使用 with Connection(...): 上下文管理器或或直接传递 Redis 连接引用给队列来提供显式的连接管理功能

单个 Redis 连接 ( 简单 )

注意

use_connection() 函数已经被弃用,你不应该在脚本中再使用 use_connection(),而是使用显式的连接管理器

开发模式下,我们一般都是连接到本地的,默认的 Redis 服务上,这种场合,使用 use_connection() 是最方便的

from rq import use_connection
use_connection()

但在生产环境下,这样做虽然也提供了方便,但对于未来的更改则提供了诸多的不变,因此建议显式的指定一个 Redis 服务

from redis import Redis
from rq import use_connection

redis = Redis('my.host.org', 6789, password='secret')
use_connection(redis)

即便如此,我们还是希望你注意到两个事实

  1. use_connection() 会污染全局命名空间
  2. 同样的,使用 use_connection() 意味着只能连接到单一的 Redis 服务

多个 Redis 连接

单一连接模式在某些场景下是非常有用的,比如适用于连接到单个 Redis 实例的情况,以及希望影响全局上下文的情况(通过使用 use_connection() 调用替换现有连接)

但也仅仅只有这两个好处,也只能在在完全能够控制 Web 堆栈时才能使用此模式

其它情况下,或者想要使用多个连接时,使用 Connection 上下文或者显式传递 Redis 连接是个更好的注意

显式指定连接 ( 精准但乏味 )

每一个 RQ 对象实例 ( 队列 、工作进程、作业任务 ) 在初始化时都提供了一个 connection 形参用于指定 Redis 连接

所以,我们就可以抛弃 use_connection() ,转而使用下面这种方式

from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)

这种方式下,队列中的每个作业任务都可以明确的知道它属于哪个连接,工作进程中也一样

这种使用方式相当精准,当也相当冗余,因此也枯燥乏味

连接上下文 ( 精准且简洁 )

如果要使用多个连接,可以使用更好的方法,就是使用连接上下文

每个 RQ 对象实例在创建时将使用 RQ 连接堆栈上最顶层的 Redis 连接

这是临时的,用于替换将要使用的默认连接的机制

纯文字版看起来很难理解,我们直接来一段代码吧:

from rq import Queue, Connection
from redis import Redis

with Connection(Redis('localhost', 6379)):
    q1 = Queue('foo')
    with Connection(Redis('remote.host.org', 9836)):
        q2 = Queue('bar')
    q3 = Queue('qux')

assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection

对于这种方式,你可以将此视为在 Connection 上下文中,每个新创建的 RQ 对象实例都将隐式设置连接参数

也就是说使用 q2 将作业任务排入队列会将其保存到第二个 ( 远程 ) Redis 后端,即使在连接上下文之外也是如此

压入或弹出连接

如果我们的代码中不允许使用 with 语句,例如在单元测试中,则可以使用 push_connection()pop_connection() 来代替上下文管理器

import unittest
from rq import Queue
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
    def setUp(self):
        push_connection(Redis())

    def tearDown(self):
        pop_connection()

    def test_foo(self):
        """Any queues created here use local Redis."""
        q = Queue()
        ...

哨兵支持

RQ 提供了 Redis 连接的哨兵功能,哨兵功能,也就是防止单机故障而导致整个应用不能正常工作

为了使用哨兵功能,我们必须在配置文件中添加一些配置选项,从而让 systemd 或者 docker 容器能够在某个 Redis 连接不可用的情况下自动重启

SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
           'SOCKET_TIMEOUT': None,
           'PASSWORD': 'secret',
           'DB': 2,
           'MASTER_NAME': 'master'}
目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.