本文是我看了http://python.jobbole.com/88291/后加上自己的一些感想所创
我们都知道,IO比CPU慢很多个数量级,而传统的IO,是阻塞型的,CPU花在等待IO上的时间很多,那么想要提高并发量,选择解决CPU在等待IO上花费的大量时间是一个比较好的出路。
先来看一个阻塞型IO的例子。
同步阻塞下载程序
该脚本的功能是下载10个网页。
import socket
urls = ['/'+str(i) for i in range(10)]
host = 'www.baidu.com'
def fn(url):
sock = socket.socket()
sock.connect((host,80)) # 这里会阻塞直到发送成功
#send不会阻塞
sock.send(('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url,host)).encode('utf-8'))
response = b''
while True:
chunk = sock.recv(4096) #这里会阻塞直到接收数据
if chunk:
response += chunk
else:
break
# print(response.decode('utf-8'))
sock.close()
if __name__ == '__main__':
import time
start = time.time()
for i in urls:
fn(i)
print(time.time()-start)
花费的时间大概是7-10秒。
我们知道,程序会在connect和recv上阻塞,等待数据,这就是阻塞式IO的缺点,时间都花费在这里了
那我们是不是可以通过改变套接字为非阻塞来解决这个问题呢?
非阻塞式下载
import socket
urls = ['/'+str(i) for i in range(10)]
host = 'www.baidu.com'
def fn(url):
sock = socket.socket()
sock.setblocking(False) # 设置为非阻塞套接字
try:
sock.connect(('www.baidu.com',80))
except BlockingIOError:
# connect无法立即完成,会有异常
pass
while True:
# 这个循环是为了确保connect完成了
try:
sock.send(('GET %s HTTP/1.0\r\nHost: %s\r\n\r\n' % (url,host)).encode('utf-8'))
break
except IOError:
pass
response = b''
while True:
while True:
# 这个循环确保recv收到信息了
try:
chunk = sock.recv(4096)
break
except IOError:
pass
if chunk:
response += chunk
else:
break
# print(response.decode('utf-8'))
sock.close()
if __name__ == '__main__':
import time
start = time.time()
for i in urls:
fn(i)
print(time.time()-start)
运行时间实际上和阻塞式IO时间近似,虽然IO 操作不在等待了,但是我们不知道具体何时IO操作会完成,所以还是得不停的去检测到底是不是完成了,故而效率并没有提高。
那么如果我们有办法不用不停的去检测何时IO就绪,有一个东西可以告诉我们,那是不是就可以提高效率呢?对的这个就是IO多路复用
IO多路复用的下载
# coding=utf-8
import socket
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
import time
'''
相比于select模块,官方更推荐使用selectors这个高级IO多路复用库
'''
# 自动选择最高效的IO多路复用,Linux上默认是epoll
selector = DefaultSelector()
stopped = False
urls_todo = {'/'+str(i) for i in range(10)}
class Crawler:
def __init__(self,url):
self.url = url
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('flycold.cn',80))
except BlockingIOError:
pass
# 注册sock的可写事件(connect完成时)
selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)
# 当connect完成时发request
def connected(self,key,mask):
# 取消注册
selector.unregister(key.fd)
get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
self.sock.send(get.encode('utf-8'))
# 注册可读事件(当服务器给予回应时)
selector.register(key.fd,EVENT_READ,self.read_response)
# 当服务器回应后触发
def read_response(self,key,mask):
global stopped
chunk = self.sock.recv(4096)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
urls_todo.remove(self.url)
self.sock.close()
if not urls_todo:
stopped = True
print(chunk.decode('utf-8'))
# 注册事件后,需要获取哪个套接字触发了事件,并去处理
def loop():
while not stopped:
# 阻塞,直到一个事件发生
events = selector.select()
for event_key,event_mask in events:
callback = event_key.data
callback(event_key,event_mask)
if __name__ == '__main__':
start = time.time()
for i in urls_todo:
x = Crawler(i)
x.fetch()
loop()
print(time.time()-start)
运行时间提高了很多,在网络情况良好时,只花费了不到0.5秒。
这样虽然提高了效率,但是问题还是蛮多的:
回调层次过多时代码可读性差
破坏代码结构
共享状态管理困难
错误处理困难
所以,我们可以使用协程来优化代码。
我们知道,协程可以暂停去执行别的部分,并且不会忘记刚才的状态
协程版
我们需要几个新的对象
未来对象(Future)
异步执行的结果,放在这个对象里。不用回调,我们不知道何时IO操作完成,那么用这个未来对象,存储未来结果。
# 未来对象,result存放未来的执行结果
class Future:
# 初始化结果和回调函数为空
def __init__(self):
self.result = None
self._callbacks = []
# 添加回调函数
def add_done_callback(self,fn):
self._callbacks.append(fn)
# 设置结果并执行回调
def set_result(self,result):
self.result = result
for fn in self._callbacks:
fn(self)
重构Crawler
# 爬虫类
class Crawler:
def __init__(self,url):
self.url = url
self.sock = None
self.response = b''
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('flycold.cn',80))
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
epoller.register(sock.fileno(),EVENT_WRITE,on_connected)
# connect后会跳出协程
yield f
epoller.unregister(sock.fileno())
get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
sock.send(get.encode('utf-8'))
global stopped
while True:
f = Future()
# sock可读时的事件
def on_readable():
f.set_result(sock.recv(4096))
epoller.register(sock.fileno(),EVENT_READ,on_readable)
# send后跳出协程
chunk = yield f
epoller.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
break
任务对象Task
我们何时回复爬虫对象继续执行了,需要有人去管理,就是Task
# 任务对象
class Task:
# 初始化,设置爬虫任务对象,执行step
# coro是fetch生成器
def __init__(self,coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
# 单步执行
def step(self,future):
try:
# send 会进入到coro执行,即fetch,直到下次yield
# next_future 为yield返回的对象
# 第一次发送None启动生成器,返回空的未来对象
# 之后
next_future = self.coro.send(future.result)
except StopIteration:
return
# 新的未来对象添加回调
next_future.add_done_callback(self.step)
循环
def loop():
while not stopped:
# 阻塞,直到一个事件发生
events = epoller.select()
for fd,mask in events:
# 收到的是send事件和recv事件
callback = fd.data
callback()
总览
# coding=utf-8
import socket
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
epoller = DefaultSelector()
stopped = False
urls_todo = {'/'+str(i) for i in range(10)}
# 未来对象,result存放未来的执行结果
class Future:
# 初始化结果和回调函数为空
def __init__(self):
self.result = None
self._callbacks = []
# 添加回调函数
def add_done_callback(self,fn):
self._callbacks.append(fn)
# 设置结果并执行回调
def set_result(self,result):
self.result = result
for fn in self._callbacks:
fn(self)
# 爬虫类
class Crawler:
def __init__(self,url):
self.url = url
self.sock = None
self.response = b''
def fetch(self):
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('flycold.cn',80))
except BlockingIOError:
pass
f = Future()
def on_connected():
f.set_result(None)
epoller.register(sock.fileno(),EVENT_WRITE,on_connected)
yield f
epoller.unregister(sock.fileno())
get = 'GET {0} HTTP/1.0\r\nHost:flycold.cn\r\n\r\n'.format(self.url)
sock.send(get.encode('utf-8'))
global stopped
while True:
f = Future()
# sock可读时的事件
def on_readable():
f.set_result(sock.recv(4096))
epoller.register(sock.fileno(),EVENT_READ,on_readable)
chunk = yield f
epoller.unregister(sock.fileno())
if chunk:
self.response += chunk
else:
urls_todo.remove(self.url)
if not urls_todo:
stopped = True
break
# 任务对象
class Task:
# 初始化,设置爬虫任务对象,执行step
def __init__(self,coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
# 单步执行
def step(self,future):
try:
# send 会进入到coro执行,即fetch,直到下次yield
# next_future 为yield返回的对象
# 第一次发送None启动生成器,返回空的未来对象
# 之后
next_future = self.coro.send(future.result)
except StopIteration:
return
# 新的未来对象添加回调
next_future.add_done_callback(self.step)
def loop():
while not stopped:
# 阻塞,直到一个事件发生
events = epoller.select()
for fd,mask in events:
# 收到的是send事件和recv事件
callback = fd.data
callback()
if __name__ == '__main__':
import time
start = time.time()
# 循环创建爬虫任务
for i in urls_todo:
x = Crawler(i)
# 启动爬虫协程,之后全部暂停在connect上
Task(x.fetch())
loop()
print(time.time()-start)