Python爬虫进阶教程(二):线程、协程
简介
线程
线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。
多线程类似同时执行多个不同程序,python的标准库提供了两个模块thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装,大多情况下,我们只使用threading模块即可。
threading模块
此模块在较低级别的thread模块之上构建的更高级别的线程接口,一般通过两种方式实现多线程,第一种方式是把一个函数传入并创建实例,然后调用start方法执行;第二种方式是直接从threading.Thread继承并创建线程类,然后重写init方法和run方法。
第一种方式代码示例:
import time import random import threading ? def t_run(urls): """ 线程执行代码 """ # threading.current_thread()返回当前的Thread对象,对应于调用者控制的线程。 # 如果调用者控制的线程不是通过threading模块创建的,则返回一个只有有限功能的虚假线程对象。 print('Current %s is running...' % threading.current_thread().name) for url in urls: print(' threading %s -----> %s ' % (threading.current_thread().name, url)) time.sleep(random.random()) print('%s ended.' % threading.current_thread().name) ? if __name__ == '__main__': # 创建两个线程实例 t1 = threading.Thread(target=t_run, name='Thread_1', args=(['url1', 'url2'],)) t2 = threading.Thread(target=t_run, name='Thread_2', args=(['url3', 'url4'],)) # 启动线程 t1.start() t2.start() # 等待线程结束 t1.join() t2.join() print('%s ended.' % threading.current_thread().name)
运行结果如下:
Current Thread_1 is running... threading Thread_1 -----> url1 Current Thread_2 is running... threading Thread_2 -----> url3 threading Thread_1 -----> url2 threading Thread_2 -----> url4 Thread_2 ended. Thread_1 ended. MainThread ended.
第二种方式用threading.Thread继承创建线程类
import time import random import threading ? class MyThread(threading.Thread): """ 定义线程类 """ ? def __init__(self, name, urls): """ 初始化,重写线程 """ threading.Thread.__init__(self, name=name) self.urls = urls ? def run(self): """ 执行函数 """ # 打印当前线程名 print('Current %s is running...' % threading.current_thread().name) for url in self.urls: print('Thread %s ------> %s' % (threading.current_thread().name, url)) time.sleep(random.random()) print('%s ended.' % threading.current_thread().name) ? if __name__ == '__main__': print('%s is running...' % threading.current_thread().name) t1 = MyThread(name='Thread_1', urls=['url1', 'url2']) t2 = MyThread(name='Thread_2', urls=['url3', 'url4']) t1.start() t2.start() t1.join() t2.join() print('%s ended.' % threading.current_thread().name)
结果如下:
MainThread is running... Current Thread_1 is running... Thread Thread_1 ------> url1 Current Thread_2 is running... Thread Thread_2 ------> url3 Thread Thread_1 ------> url2 Thread Thread_2 ------> url4 Thread_1 ended. Thread_2 ended. MainThread ended.
线程同步
如果多个线程共同对某个数据进行修改,就有可能会造成不可预料的结果,为了防止这种情况发生,需要对线程进行同步,使用Lock和Rlock可以实现简单线程同步。
Lock 对象
一个可重入锁处于“locked”或者“unlocked”状态中的一种。它创建时处于unlocked状态。它有两个基本方法,acquire()和release()。当状态是unlocked时,acquire()改变该状态为locked并立即返回。当状态被锁定时,acquire()阻塞,直到在另一个线程中对release()的调用将其改为unlocked,然后acquire()执行,release()方法只应在锁定状态下调用;它将状态更改为已解锁并立即返回。如果尝试释放已解锁的锁,将会引发RuntimeError。
Rlock 对象
一个可重入锁必须由获得它的线程释放。一旦线程获得了可重入锁,同一线程可以再次获取它而不阻塞;在所有的release操作完成后,别的线程才能申请Rlock对象,见下面例子:
import threading # 创建Rlock实例 lock = threading.RLock() # 定义变量 num = 0 ? class MyThread(threading.Thread): """ 定义线程类 """ ? def __init__(self, name): """ 重新定义name """ threading.Thread.__init__(self, name=name) ? def run(self): """ 执行函数 """ # 全局变量num global num while True: # 加锁 lock.acquire() print('%s locked, Number: %d' % (threading.current_thread().name, num)) if num >= 4: # 解锁 lock.release() print('%s released, Number: %d' % (threading.current_thread().name, num)) break num += 1 print('%s released, Number: %d' % (threading.current_thread().name, num)) lock.release() ? if __name__ == '__main__': thread1 = MyThread('Thread_1') thread2 = MyThread('Thread_2') thread3 = MyThread('Thread_3') thread1.start() thread2.start() thread3.start()
运行结果如下:
Thread_1 locked, Number: 0 Thread_1 released, Number: 1 Thread_1 locked, Number: 1 Thread_1 released, Number: 2 Thread_1 locked, Number: 2 Thread_1 released, Number: 3 Thread_1 locked, Number: 3 Thread_1 released, Number: 4 Thread_1 locked, Number: 4 Thread_1 released, Number: 4 Thread_2 locked, Number: 4 Thread_2 released, Number: 4 Thread_3 locked, Number: 4 Thread_3 released, Number: 4
可以看出Rlock锁只有线程1的num为4时,调用release方法,全部解锁后,线程2才可以调用,线程2开始时num就是4,所以也直接到if判断结束,调用release后,线程3开始执行。
全局解释器锁(GIL)
首先说的一点是GIL并不是Python的特性,它是Python解析器(CPython)引入的一个概念。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
GIL全称Global Interpreter Lock,是一个互斥锁,它可以防止多个本地线程同时执行Python的某个值,毫无疑问全局锁的存在会对多线程的效率有不小影响。几乎等于Python是个单线程的程序。(这也是大家吐槽python多线程慢的槽点)
协程
协程又称微线程、纤程,就好比同时开启多个任务,但一次只顺序执行一个。等到所执行的任务遭遇阻塞,就切换到下一个任务继续执行,以期节省下阻塞所占用的时间。
协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。对CPU来说协程就是单线程,不必考虑切换开销。
那么python如何实现协程呢?Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。见下面简单生产者消费者示例:
def consumer(): r = '' while True: # 这个地方注意,到达这个yield后,就会抛出n的值,暂停等待next或send继续 n = yield r if not n: return print('[CONSUMER] Consuming %s ...' % n) r = '200 OK' ? def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print('[PRODUCER] Porducing %s ...' % n) r = c.send(n) print('[PRODUCER] Consumer return: %s...' % r) c.close() ? c = consumer() produce(c)
结果如下:
[PRODUCER] Porducing 1 ... [CONSUMER] Consuming 1 ... [PRODUCER] Consumer return: 200 OK... [PRODUCER] Porducing 2 ... [CONSUMER] Consuming 2 ... [PRODUCER] Consumer return: 200 OK... [PRODUCER] Porducing 3 ... [CONSUMER] Consuming 3 ... [PRODUCER] Consumer return: 200 OK... [PRODUCER] Porducing 4 ... [CONSUMER] Consuming 4 ... [PRODUCER] Consumer return: 200 OK... [PRODUCER] Porducing 5 ... [CONSUMER] Consuming 5 ... [PRODUCER] Consumer return: 200 OK...
注意到consumer函数是一个generator,把一个consumer传入produce后:
首先调用c.send(None)启动生成器;
然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
consumer通过yield拿到消息,处理,又通过yield把结果传回;
produce拿到consumer处理的结果,继续生产下一条消息;
produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
最后套用Donald Knuth的一句话总结协程的特点:
“子程序就是协程的一种特例。”
asyncio
asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。用asyncio实现Hello world代码如下:
import asyncio # 把一个generator标记为协程类型 @asyncio.coroutine def hello(): """ 定义一个生成器 """ print('Hello world!') # 通过yield from调用另一个标记为协程的生成器 r = yield from asyncio.sleep(2) print('Hello again!') # 生成实例 loop = asyncio.get_event_loop() # 将标记为协程的生成器执行 loop.run_until_complete(hello()) loop.close()
执行结果:
Hello world! # 此处等待了2秒 Hello again!
从结果可以看出hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
我们用Task封装两个coroutine试试:
import asyncio import threading @asyncio.coroutine def hello(): print('Hello world! %s' % threading.current_thread()) r = yield from asyncio.sleep(5) print('Hello again! %s' % threading.current_thread()) loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
输出结果如下:
Hello world! <_MainThread(MainThread, started 22668)> Hello world! <_MainThread(MainThread, started 22668)> # 此处暂停5秒 Hello again! <_MainThread(MainThread, started 22668)> Hello again! <_MainThread(MainThread, started 22668)>
由打印的当前线程名称可以看出,两个coroutine是由同一个线程并发执行的。
如果把asyncio.sleep()换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。
我们用asyncio的异步网络连接来获取sina、sohu和163的网站首页:
import asyncio @asyncio.coroutine def wget(host): print('wget %s ...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s ' % (host, line.decode('utf-8').rstrip())) writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.baidu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
输出结果如下:
wget www.163.com ... wget www.sina.com.cn ... wget www.baidu.com ... www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 www.163.com header > Date: Sat, 06 Jan 2018 14:14:58 GMT www.163.com header > Content-Length: 0 www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html www.163.com header > Connection: close www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Server: nginx www.sina.com.cn header > Date: Sat, 06 Jan 2018 14:12:15 GMT www.sina.com.cn header > Content-Type: text/html www.sina.com.cn header > Content-Length: 605048 www.sina.com.cn header > Connection: close www.sina.com.cn header > Last-Modified: Sat, 06 Jan 2018 14:09:06 GMT www.sina.com.cn header > Vary: Accept-Encoding www.sina.com.cn header > Expires: Sat, 06 Jan 2018 14:13:12 GMT www.sina.com.cn header > Cache-Control: max-age=60 www.sina.com.cn header > X-Powered-By: shci_v1.03 www.sina.com.cn header > Age: 3 www.sina.com.cn header > Via: http/1.1 cnc.beixian.ha2ts4.205 (ApacheTrafficServer/6.2.1 [cMsSf ]), http/1.1 gwbn.beijing.ha2ts4.23 (ApacheTrafficServer/6.2.1 [cHs f ]) www.sina.com.cn header > X-Via-Edge: 15152479356296c6422730904eedb7d91845d www.sina.com.cn header > X-Cache: HIT.23 www.sina.com.cn header > X-Via-CDN: f=edge,s=gwbn.beijing.ha2ts4.21.nb.sinaedge.com,c=115.34.100.108;f=Edge,s=gwbn.beijing.ha2ts4.23,c=219.238.4.21 www.baidu.com header > HTTP/1.1 200 OK www.baidu.com header > Date: Sat, 06 Jan 2018 14:12:15 GMT www.baidu.com header > Content-Type: text/html www.baidu.com header > Content-Length: 14613 www.baidu.com header > Last-Modified: Fri, 29 Dec 2017 03:29:00 GMT www.baidu.com header > Connection: Close www.baidu.com header > Vary: Accept-Encoding www.baidu.com header > Set-Cookie: BAIDUID=BEA2CAC2706F8386AAC50DEEC6287BD9:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com www.baidu.com header > Set-Cookie: BIDUPSID=BEA2CAC2706F8386AAC50DEEC6287BD9; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com www.baidu.com header > Set-Cookie: PSTM=1515247935; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com www.baidu.com header > P3P: CP=" OTI DSP COR IVA OUR IND COM " www.baidu.com header > Server: BWS/1.1 www.baidu.com header > X-UA-Compatible: IE=Edge,chrome=1 www.baidu.com header > Pragma: no-cache www.baidu.com header > Cache-control: no-cache www.baidu.com header > Accept-Ranges: bytes
async/await
asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
请注意,async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
- 把@asyncio.coroutine替换为async;
- 把yield from替换为await。
那么6部分的hello代码就可以这样替换了
# 把一个generator标记为协程类型 @asyncio.coroutine def hello(): """ 定义一个生成器 """ print('Hello world!') # 通过yield from调用另一个标记为协程的生成器 r = yield from asyncio.sleep(2) print('Hello again!')
替换为
async def hello(): """ 定义一个生成器 """ print('Hello world!') # 通过await调用另一个标记为协程的生成器 r = await asyncio.sleep(2) print('Hello again!')
其它位置的代码均不变化。