Gevent 手记

鼎鼎大名的 Gevent 。

协程核心思想是通过再次切分时间片,进行细粒度的上下文切换达到“并行”的效果,这种行为与线程类似,但是不在系统层面而是在应用的层面开销很小,所以称为协程。

– 所以某种程度上来说,协程的调度是应用主动进行的。

– 可以通过调用 gevent.sleep 来主动的进行协程调度

– gevent.select 也可以触发协程调度。

– 理想中的状态是,在发生 IO 和网络等待时触发协程调度。

– 相较于系统的线程调度,greenlet 的调度是可以预期的,但是这往往并不能代表结果就是可预期的,因为我们往往面对着随机的网络等待和IO等待。

– 可以使用 Greenlet.spawn 和 gevent.spawn 直接使用函数发起 greenlet ,使用 gevent.joinall 来同步协程的运行结果。

– 如果想用类的方式制作 greenlet 的执行体,需要继承 Greenlet 类,并且实现 _run 方法作为,执行的启动调用。

– Greenlet 的状态可以如下操作:

    started — Boolean, 协程是否启动
    ready() — Boolean, 协程是否被挂起
    successful() — Boolean, 协程是否执行完毕并且没有异常发生
    value — arbitrary, 协程的返回值
    exception — exception, 协程发生的异常,但是不会从协程中抛出到主逻辑进程中

– 使用 gevent.signal(signal.SIGQUIT, gevent.kill) 来时 SIGQUIT 信号与 gevent.kill 关联起来,防止僵尸协程的出现。

– gevent 中有很多种处理超时的方法。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
  gevent.sleep(10)

try:
  gevent.spawn(wait).join()
  except Timeout:
  print('Could not complete')
    也可以使用with
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
  pass

with Timeout(time_to_wait, TooLong):
  gevent.sleep(10)

– 猴子补丁

使用猴子补丁以后,gevent 替换了标准库的 socket 实现。

猴子补丁通过使用 gevent.monkey.patch_all 调用,也可以调用具体某个方面的补丁:

    gevent.monkey.patch_socket(dns=True, aggressive=True)
    gevent.monkey.patch_ssl()
    gevent.monkey.patch_os() # 替换了 fork
    gevent.monkey.patch_time() # 替换了 sleep
    gevent.monkey.patch_select(aggressive=True) # 替换了 select
    gevent.monkey.patch_thread(threading=True, _threading_local=True, Event=False)
    gevent.monkey.patch_subprocess()
    gevent.monkey.patch_sys(stdin=True, stdout=True, stderr=True)

– Gevent.event 相当于信号锁。event 的 set 方法可以通知所有的 wait 调用

– AsyncResult 可以在不同的协程中传递信息,可以有多个接收,(这点与 go 中的 channel 不同,channel 是有个数缓冲的, gevent 中类似广播机制)

– Gevent 中的 Queue 可以在不同协程中使用。

– 注意 Queue 中的 get put 与 get_nowait put_nowait 的区别

– 队列为空之后进行 get 将会阻塞,如果设置了超时,则超时后以后会抛出 Empty 异常

– 可以用 group 管理一组协程,方便 join 等操作

– group 也提供很多方法方便处理协程,如 map imap imap_unordered 等

– pool 与 group 作用类似,但是pool会限制并发数。

– gevent 提供了 local 作为保存协程内部变量使用,(效率更高?)不能跨协程使用。

– gevent 也提供 Subprocess 相关操作,可以直接用来替换标准库中的相关函数,这些都可以在协程环境下工作。

– 利用 gevent.socket.wait_read 与 wait_write 与标准库 multiprocessing 一起工作。

import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
  for i in xrange(10):
  msg = b.recv()
  c.send(msg + " in " + str(i))

def put_msg():
  for i in xrange(10):
  wait_write(a.fileno())
  a.send('hi')

def get_msg():
  for i in xrange(10):
  wait_read(d.fileno())
  print(d.recv())

if __name__ == '__main__':
  proc = Process(target=relay)
  proc.start()

g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)

– 利用 Gevent 实现 Actor 模式。

import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

def __init__(self):
  self.inbox = Queue()
  Greenlet.__init__(self)

def receive(self, message):
  """
  Define in your subclass.
  """
  raise NotImplemented()

def _run(self):
  self.running = True

while self.running:
  message = self.inbox.get()
  self.receive(message)
    使用
import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
  def receive(self, message):
    print(message)
    pong.inbox.put('ping')
    gevent.sleep(0)

class Ponger(Actor):
  def receive(self, message):
    print(message)
    ping.inbox.put('pong')
    gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])