对于协程(coroutine),你必须知道事情都在这里了(内附代码)

MLTalks
8 min readMar 21, 2020

--

对于协程(coroutine),你必须知道事情都在这里了(内附代码)

什么是协程

协程(coroutine)的概念根据Donald Knuth的说法早在1958年就由Melvin Conway提出了,对应wikipedia的定义如下:

Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.

这里子例程(subroutine)是一个概括性的术语,子例程可以是整个程序中的一个代码区块,当它被主程序调用的时候就会进入运行。例如函数就是子例程中的一种。

c = max(a,b);

从wikipedia定义可以看出协程相比子例程更加的灵活,允许执行过程中被挂起恢复,多个协程可以一起相互协作执行任务。从协程(co + routine)名字上来拆解为支持协作(cooperate)的例程。

协程与子例程的执行区别

图中左边是子例程的执行过程,右边是协程的执行过程,可以很明显的看出来执行过程中的区别。

  • 先说左边,子例程可以看成是某个函数,子例程的执行过程中通常是嵌套顺序执行的过程,子例程1和子例程2的关系(调用和被调用)不是完全平等的,子例程1能调用子例程2,但子例程2不能反过来调用子例程1。
  • 再说右边,协程1和协程2的关系是完全对等的,协程1执行过程中可以中断挂起执行另外一个协程2,反之也是可以的,直到最终两个协程都执行完以后再返回回到主程序中,即协程1和协程2相互协作完成了整个任务。

接下来举一个协程实现生产者和消费者的例子:

var q := new queuecoroutine produce
loop
while q is not full
create some new items
add the items to q
yield to consume
coroutine consume
loop
while q is not empty
remove some items from q
use the items
yield to produce

这里有一个队列queue,一个生产者produce,一个消费者consume,yield代表中断挂起当前协程,并恢复其他协程的操作。生产者生产物品以后加入到队列以后,中断挂起自身并恢复消费者,消费者从队列中消费完物品以后中断挂起自身并恢复生产者,不断来回切换直到达到最终条件(比如所有原料都生产成物品并全都被消费完成),程序终止。

进程、线程、协程的关系和比较

通常会提到进程是资源分配的最小单位,线程是CPU调度的最小单位, 一个进程里可以有多个线程,这里直接画了个图来说明三者关系。

  • 进程是资源分配的最小单位,会拥有独立的地址空间以及对应的内存空间,还有网络和文件资源等,不同进程之间资源都是独立的,可以通过进程间通信(管道、共享内存、信号量等方式)来进行交互。
  • 线程为CPU调度的基本单位,除了拥有运行中必不可少的信息(如程序计数器、一组寄存器和栈)以外,本身并不拥有系统资源,所有线程会共享进程的资源,比如会共享堆资源。
  • 协程可以认为是运行在线程上的代码块,协程提供的挂起操作会使协程暂停执行,而不会导致线程阻塞。一个线程内部可以创建几千个协程都没有任何问题。
  • 进程的切换和线程切换中都包含了对应上下文的切换,这块都涉及到了内核来完成,即一次用户态到内核态的切换和一次内核态到用户态的切换。因为进程上下文切换保存的信息更多,所以进程切换代价会比线程切换代价更大。
  • 协程是一个纯用户态的并发机制,同一时刻只会有一个协程在运行,其他协程挂起等待;不同协程之间的切换不涉及内核,只用在用户态切换即可,所以切换代价更小,更轻量级,适合IO密集型的场景。

coroutine的python实现

  1. Python最初的版本里是包含了yield/send关键字,通过yield/send可以方便的实现一个协程的例子,这里还是以为生产者和消费者为例,具体实现方式如下:如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
def consumer():
"""consumer定义"""
ret = ''
while True:
# 挂起consumer,恢复producer, ret会传回给producer
item_id = yield ret
if not item_id:
return
print('consume item_id:{}'.format(item_id))
ret = 'success'
def producer(consumer):
"""producer定义"""
# send一个None可以看成是启动consumer
# consumer这里包含了yield关键字相当于是一个generator
consumer.send(None)
item_id = 0
# 生产的item的总数
item_total_count = 3
while item_id < item_total_count:
item_id = item_id + 1
print('produce item:{}'.format(item_id))
# 挂起producer,恢复consumer, item_id会传回给consumer
ret = c.send(item_id)
print('consumer return:{}'.format(ret))
consumer.close()
if __name__ == "__main__":
c = consumer()
producer(c)

结果:

produce item:1
consume item_id:1
consumer return:success
produce item:2
consume item_id:2
consumer return:success
produce item:3
consume item_id:3
consumer return:success

2. python 3.5版本开始引入了async/await关键字给了我们另外一种实现的方法,还是以为生产者和消费者为例,具体实现方式如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
import random

async def producer(queue, item_total_count):
"""producer 定义"""
for item_id in range(0, item_total_count):
# 生产一个新的item
print('produce item_id:{}'.format(item_id))
# 模拟IO操作, 挂起producer,恢复consumer
#await asyncio.sleep(random.random())
# 把item放入队列, 挂起producer,恢复consumer
await queue.put(item_id)
# 放入一个None到队列表示生产全部结束
await queue.put(None)


async def consumer(queue):
"""consumer 定义"""
while True:
# 等待从队列中获得一个新的item, 等待过程中会挂起consumer,恢复producer
item_id = await queue.get()
if item_id is None:
# 表示生产者都生产完了,没有待消费的请求了
break
print('consume item_id:{}'.format(item_id))
# 模拟IO操作, 挂起consumer,恢复producer
#await asyncio.sleep(random.random())

if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = producer(queue, 3)
consumer_coro = consumer(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()

结果:

produce item_id:0
produce item_id:1
produce item_id:2
consume item_id:0
consume item_id:1
consume item_id:2

文章声明: 本文属于个人原创,欢迎转载,请注明出处,谢谢

--

--