现代操作系统基本上都是支持“多任务”的。
现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?
答案就是 操作系统轮流让各个任务交替执行 ,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是, 由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样 。
真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会 自动把很多任务轮流调度到每个核心上执行 。
对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。
有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们 把进程内的这些“子任务”称为线程(Thread) 。
由于每个进程至少要干一件事,所以, 一个进程至少有一个线程 。
多任务的实现有3种方式:
多进程和多线程的程序涉及到同步、数据共享的问题,编写起来更复杂。
参考 廖雪峰Python教程 。
先了解操作系统关于进程的相关知识。
Unix/Linux操作系统提供了一个 fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是 fork()
调用一次,返回两次 ,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后, 分别在父进程和子进程内返回 。
子进程永远返回 0
,而父进程返回子进程的ID。这样做的理由是,一个父进程可以 fork
出很多子进程,所以,父进程要 记下每个子进程的ID ,而子进程只需要调用 getppid()
就可以拿到父进程的ID。
Python的 os
模块封装了常见的系统调用,其中就包括 fork
,可以在Python程序中轻松创建子进程:
import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
由于Windows没有fork调用,上面的代码在Windows上无法运行。
关于 Windows 下创建进程,WINAPI 中有 CreateProcess
。至于具体内容,那就是另外一回事了。
有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。
Python 的 multiprocessing
模块是跨平台版本的多进程模块。
multiprocessing
模块提供了一个 Process
类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.')
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个 Process
实例,用 start()
方法启动,这样创建进程比 fork()
还要简单。
join()
方法可以 等待子进程结束后再继续往下运行 ,通常用于 进程间的同步 。
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) # 对Pool对象调用join()方法会等待所有子进程执行完毕 # 调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。 # 由于Pool的默认大小是CPU的核数,如果创建进程数量超过核数,需要等前面的进程执行完毕再创建新进程 if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.')
multiprocessing.Pool
有一个 map()
方法可以自动映射参数列表,比如我们想批量处理文件的时候,可以将文件名列表映射给不同的进程。
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
下面的例子演示了如何在Python代码中运行命令 nslookup www.python.org
,这和命令行直接运行的效果是一样的:
import subprocess print('$ nslookup www.python.org') # subprocess.call 直接调用命令行 r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r)
如果子进程还需要输入,则可以通过 communicate()
方法输入:
import subprocess print('$ nslookup') # 先执行 nslookup p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 然后再输入 nslookup 指令 output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode)
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的 multiprocessing
模块包装了底层的机制,提供了 Queue
、 Pipes
等多种方式来交换数据。
我们以 Queue
为例,在父进程中创建两个子进程,一个往 Queue
里写数据,一个从 Queue
里读数据:
multiprocessing.Queue
是专门用于进程间通信的队列,和 Python 标准库中的 Queue
有区别。
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate()
在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。
进程是由若干线程组成的,一个进程至少有一个线程。
由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix Thread,而不是模拟出来的线程。
Python的标准库提供了两个模块: _thread
和 threading
,_thread
是低级模块, threading
是高级模块,对 _thread
进行了封装。绝大多数情况下,我们只需要使用 threading
这个高级模块。
启动一个线程就是把一个函数传入并创建 Thread
实例,然后调用 start()
开始执行:
import time, threading # 新线程执行的代码: def loop(): print('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print('thread %s ended.' % threading.current_thread().name)
由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的 threading
模块有个 current_thread()
函数,它永远返回当前线程的实例。主线程实例的名字叫 MainThread
,子线程的名字在创建时指定,我们用 LoopThread
命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为 Thread-1
, Thread-2
……
多线程和多进程最大的不同在于, 多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响 ,而多线程中, 所有变量都由所有线程共享 ,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
实用 threading.Lock()
加锁:
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要获取锁: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要释放锁: lock.release()
当多个线程同时执行 lock.acquire()
时, 只有一个线程能成功地获取锁 ,然后继续执行代码,其他线程就继续等待直到获得锁为止。
无论多少线程,同一时刻最多只有一个线程持有该锁。
获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用 try...finally
来确保锁一定会被释放。
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是 阻止了多线程并发执行 ,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次, 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁 ,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
我们可以监控到一个死循环线程会100%占用一个CPU。
如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。
要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。
试试用Python写个死循环:
import threading, multiprocessing def loop(): x = 0 while True: x = x ^ 1 for i in range(multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start()
启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。 这个GIL全局锁实际上把所有线程的执行代码都给上了锁 ,所以, 多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核 。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但 不要指望能有效利用多核 。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:
def process_student(name): std = Student(name) # std是局部变量,但是每个函数都要用它,因此必须传进去: do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std)
每个函数一层一层调用都这么传参数那还得了?用全局变量?也不行,因为每个线程处理不同的 Student
对象,不能共享。
如果用一个全局 dict
存放所有的 Student
对象,然后以 thread
自身作为 key
获得线程对应的 Student
对象如何?
global_dict = {} def std_thread(name): std = Student(name) # 把std放到全局变量global_dict中: global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1(): # 不传入std,而是根据当前线程查找: std = global_dict[threading.current_thread()] ... def do_task_2(): # 任何函数都可以查找出当前线程的std变量: std = global_dict[threading.current_thread()] ...
这种方式理论上是可行的,它最大的优点是消除了std对象在每层函数中的传递问题,但是,每个函数获取std的代码有点丑。
ThreadLocal
应运而生,不用查找 dict
, ThreadLocal
帮你自动做这件事:
import threading # 创建全局ThreadLocal对象: # 可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。 local_school = threading.local() def process_student(): # 获取当前线程关联的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 绑定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join()
ThreadLocal
最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
要实现多任务,通常我们会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。
如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。
如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。
多进程模式:
多线程模式:
无论是多进程还是多线程,只要数量一多,效率就会下降。
操作系统在切换进程或者线程时,需要先保存当前执行的现场环境(CPU寄存器状态、内存页等),然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行 大量的计算 ,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
第二种任务的类型是 IO密集型 ,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。
现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为 事件驱动模型 ,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。
对应到Python语言,单线程的异步编程模型称为 协程 ,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。我们会在后面讨论如何编写协程。
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的 multiprocessing
模块不但支持多进程,其中 managers
子模块还支持 把多进程分布到多台机器上 。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于 managers
模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:
# task_master.py import random, time, queue from multiprocessing.managers import BaseManager # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue() # 从BaseManager继承的QueueManager: class QueueManager(BaseManager): pass # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 处理结束: print('worker exit.')
这个简单的Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算 n*n
的代码换成发送邮件,就实现了邮件队列的异步发送。
因为一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。
多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。
由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。
另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。
异步IO模型需要一个 消息循环 ,在消息循环中,主线程不断地重复 “读取消息-处理消息” 这一过程:
loop = get_event_loop() while True: event = loop.get_event() process_event(event)
消息模型其实早就应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。
由于GUI线程处理键盘、鼠标等消息的速度非常快,所以用户感觉不到延迟。某些时候,GUI线程在一个消息处理的过程中遇到问题导致一次消息处理时间过长,此时,用户会感觉到整个GUI程序停止响应了,敲键盘、点鼠标都没有反应。这种情况说明 在消息模型中,处理一个消息必须非常迅速,否则,主线程将无法及时处理消息队列中的其他消息,导致程序看上去停止响应 。
消息模型是如何解决同步IO必须等待IO操作这一问题的呢?当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。
在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。
协程,又称微线程,纤程。英文名Coroutine。
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行 。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def A(): print('1') print('2') print('3') def B(): print('x') print('y') print('z')
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1 2 x y 3 z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。
看起来A、B的执行有点像多线程,但 协程的特点在于是一个线程执行 ,那和多线程比,协程有何优势?
最大的优势就是 协程极高的执行效率 。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是 不需要多线程的锁机制 ,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
Python对协程的支持是通过 generator
实现的。
在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。
但是 Python的yield不但可以返回一个值,它还可以接收调用者发出的参数 。
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
def consumer(): r = '' while True: # 右边是 yield 语句,所以yield语句执行完以后,进入暂停, # 而赋值语句在下一次启动生成器的时候首先被执行 n = yield r if not n: return print('[CONSUMER] Consuming %s...' % n) r = '200 OK' def produce(c): c.send(None) # c.send(None),其功能类似于next(c) # 在使用c.send(n)之前,必须先使用c.send(None)或者next(c)来返回生成器的第一个值。 n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) # send 在接受None参数的情况下,等同于next(generator)的功能, # 但send同时也可接收其他参数 r = c.send(n) print('[PRODUCER] Consumer return: %s' % r) c.close() c = consumer() produce(c)
注意到 consumer
函数是一个 generator
,把一个 consumer
传入 produce
后:
c.send(None)
启动生成器;c.send(n)
切换到 consumer
执行;consumer
通过 yield
拿到消息,处理,又通过 yield
把结果传回;produce
拿到 consumer
处理的结果,继续生产下一条消息;produce
决定不生产了,通过 c.close()
关闭 consumer
,整个过程结束。整个流程无锁,由一个线程执行, produce
和 consumer
协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
asyncio
是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio
的编程模型就是一个消息循环。我们从 asyncio
模块中直接获取一个 EventLoop
的引用,然后把需要执行的协程扔到 EventLoop
中执行,就实现了异步IO。
import asyncio @asyncio.coroutine def hello(): print("Hello world!") # 异步调用asyncio.sleep(1): r = yield from asyncio.sleep(1) print("Hello again!") # 获取EventLoop: loop = asyncio.get_event_loop() # 执行coroutine loop.run_until_complete(hello()) loop.close()
@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。
hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
我们用Task封装两个coroutine试试:
import threading import asyncio @asyncio.coroutine def hello(): print('Hello world! (%s)' % threading.currentThread()) yield from asyncio.sleep(1) print('Hello again! (%s)' % threading.currentThread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() # Hello world! (<_MainThread(MainThread, started 140735195337472)>) # Hello world! (<_MainThread(MainThread, started 140735195337472)>) # (暂停约1秒) # Hello again! (<_MainThread(MainThread, started 140735195337472)>) # Hello again! (<_MainThread(MainThread, started 140735195337472)>) # 如果把asyncio.sleep()换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。
import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
关于协程这一块内容的理解:
协程是程序调度的方式,多线程和多进程由操作系统调度,而协程由程序自身即程序员调度。协程,虽说是单线程,但也实现了各代码块的快速交替执行。