spider - hand-written

Synchronous

1. Synchronous

import requests


def fetch_async(url):
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.baidu.com']

for url in url_list:
    fetch_async(url)
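
To see why this serial version is the slow baseline, it helps to time it; a minimal sketch (reusing the URL list above) that prints the total elapsed time:

import time
import requests


def fetch_async(url):
    return requests.get(url)


url_list = ['http://www.github.com', 'http://www.baidu.com']

start = time.time()
for url in url_list:
    fetch_async(url)
# with serial requests, elapsed time is roughly the sum of the individual latencies
print('serial elapsed: %.2fs' % (time.time() - start))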

2. Multithreading

from concurrent.futures import ThreadPoolExecutor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
    pool.submit(fetch_async, url)
pool.shutdown(wait=True)
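
submit returns a Future, so if you also want the responses back (not just the side effects), you can collect them as they finish with as_completed; a sketch using the same fetch_async:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests


def fetch_async(url):
    return requests.get(url)


url_list = ['http://www.github.com', 'http://www.bing.com']
with ThreadPoolExecutor(5) as pool:
    futures = [pool.submit(fetch_async, url) for url in url_list]
    for future in as_completed(futures):  # yields futures in completion order
        print(future.result().status_code)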

3. Multithreading + callback

from concurrent.futures import ThreadPoolExecutor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


def callback(future):
    print(future.result())


url_list = ['http://www.github.com', 'http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
    v = pool.submit(fetch_async, url)
    v.add_done_callback(callback)
pool.shutdown(wait=True)
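
If you only need the results in input order and don't need per-future callbacks, Executor.map is a more compact equivalent; a sketch:

from concurrent.futures import ThreadPoolExecutor
import requests


def fetch_async(url):
    return requests.get(url)


url_list = ['http://www.github.com', 'http://www.bing.com']
with ThreadPoolExecutor(5) as pool:
    # map returns results in input order, blocking until each one is ready
    for response in pool.map(fetch_async, url_list):
        print(response.status_code)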

4. Multiprocessing

from concurrent.futures import ProcessPoolExecutor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


if __name__ == '__main__':  # guard is required on platforms that spawn worker processes
    url_list = ['http://www.github.com', 'http://www.bing.com']
    pool = ProcessPoolExecutor(5)
    for url in url_list:
        pool.submit(fetch_async, url)
    pool.shutdown(wait=True)

5. Multiprocessing + callback

from concurrent.futures import ProcessPoolExecutor
import requests


def fetch_async(url):
    response = requests.get(url)
    return response


def callback(future):
    print(future.result())


if __name__ == '__main__':  # guard is required on platforms that spawn worker processes
    url_list = ['http://www.github.com', 'http://www.bing.com']
    pool = ProcessPoolExecutor(5)
    for url in url_list:
        v = pool.submit(fetch_async, url)
        v.add_done_callback(callback)
    pool.shutdown(wait=True)

All of the approaches above improve request throughput, but multithreading and multiprocessing share a drawback: while a request is blocked on IO, its thread or process sits idle and is wasted. Asynchronous IO is therefore the preferred choice:

Asynchronous

asyncio

1. asyncio.sleep

import asyncio


@asyncio.coroutine
def func1():
    print('before...func1......')
    yield from asyncio.sleep(5)
    print('end...func1......')


tasks = [func1(), func1()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
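
Note: the @asyncio.coroutine / yield from style above is the legacy pre-Python-3.5 syntax; the decorator was removed in Python 3.11. A minimal sketch of the same program in the current async/await style:

import asyncio


async def func1():
    print('before...func1......')
    await asyncio.sleep(5)
    print('end...func1......')


async def main():
    # run both coroutines concurrently and wait for both to finish
    await asyncio.gather(func1(), func1())

asyncio.run(main())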

2. asyncio.open_connection

import asyncio


@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()


tasks = [
    fetch_async('www.cnblogs.com', '/wupeiqi/'),
    fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
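
Note that reader.read() with no size argument reads until EOF. That works here only because the request is HTTP/1.0 without keep-alive, so the server closes the connection once the full response has been sent.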

3. asyncio + aiohttp

import aiohttp
import asyncio


@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)
    # data = yield from response.read()
    # print(url, data)
    print(url, response)
    response.close()


tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
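
yield from aiohttp.request('GET', url) is the old aiohttp 1.x style; in current aiohttp (3.x) requests go through a ClientSession used as an async context manager. A sketch of the equivalent, assuming a recent aiohttp:

import aiohttp
import asyncio


async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()
            print(url, response.status, len(data))


async def main():
    await asyncio.gather(
        fetch_async('http://www.google.com/'),
        fetch_async('http://www.chouti.com/'),
    )

asyncio.run(main())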

4. asyncio + requests

import asyncio
import requests


@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, response.content)


tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
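
run_in_executor(None, func, *args) hands the blocking requests.get call to the loop's default thread pool, so this is really the thread-pool approach behind an asyncio interface rather than true non-blocking IO. To bound the number of concurrent blocking calls you can pass your own executor; a sketch:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests

executor = ThreadPoolExecutor(5)  # cap concurrent blocking calls at 5


async def fetch_async(url):
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(executor, requests.get, url)
    print(response.url, len(response.content))


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    fetch_async('http://www.cnblogs.com/wupeiqi/'),
    fetch_async('http://www.bing.com/'),
))
loop.close()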

5. gevent + requests

from gevent import monkey

monkey.patch_all()  # patch the standard library before requests is imported

import gevent
import requests


def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)


# ##### send the requests #####
gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
])

# ##### send the requests (use a coroutine pool to cap concurrency) #####
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
# ])

6. grequests

import grequests


request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]

# ##### execute and collect the response list #####
# response_list = grequests.map(request_list)
# print(response_list)

# ##### execute and collect the response list (with exception handling) #####
# def exception_handler(request, exception):
#     print(request, exception)
#     print("Request failed")

# response_list = grequests.map(request_list, exception_handler=exception_handler)
# print(response_list)
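
grequests also provides imap, which yields responses as they arrive instead of waiting for the whole batch; size caps the number of concurrent requests. A sketch:

import grequests

request_list = [
    grequests.get('http://httpbin.org/delay/1'),
    grequests.get('http://httpbin.org/status/500'),
]

# responses are yielded in completion order, not submission order
for response in grequests.imap(request_list, size=2):
    print(response.status_code)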

7. Twisted example

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
reactor.run()
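
getPage has long been deprecated and is removed from recent Twisted releases; the replacement is twisted.web.client.Agent. A minimal sketch of the same fan-out with Agent (assuming a reasonably recent Twisted):

from twisted.internet import defer, reactor
from twisted.web.client import Agent, readBody


def print_body(body):
    print(body[:100])


def fetch(url):
    agent = Agent(reactor)
    d = agent.request(b'GET', url)
    d.addCallback(readBody)  # readBody returns a Deferred that fires with the body bytes
    d.addCallback(print_body)
    return d


dlist = defer.DeferredList([fetch(b'http://www.bing.com/'), fetch(b'http://www.baidu.com/')])
dlist.addBoth(lambda _: reactor.stop())
reactor.run()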

8. tornado

from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop


def handle_response(response):
    """
    Handle the response content. (To stop the IO loop you need to keep a counter
    of outstanding requests and call ioloop.IOLoop.current().stop() when it hits zero.)
    :param response:
    :return:
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
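
The docstring above mentions that stopping the IO loop requires a counter of outstanding requests; a sketch of that bookkeeping (this assumes a pre-6.0 Tornado that still accepts the callback-style fetch, as the example above does):

from tornado.httpclient import AsyncHTTPClient
from tornado import ioloop

COUNT = 0


def handle_response(response):
    global COUNT
    COUNT -= 1
    print(response.error or response.body)
    if COUNT == 0:  # all requests done: stop the loop
        ioloop.IOLoop.current().stop()


def func():
    global COUNT
    url_list = ['http://www.baidu.com', 'http://www.bing.com']
    COUNT = len(url_list)
    http_client = AsyncHTTPClient()
    for url in url_list:
        http_client.fetch(url, handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()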

More on Twisted

from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse


def one_done(arg):
    print(arg)
    reactor.stop()


post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
                   method=bytes('POST', encoding='utf8'),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()

All of the above are async IO request modules, built into Python or provided by third parties; they are simple to use and greatly improve efficiency. At its core, an async IO request is just [non-blocking sockets] + [IO multiplexing]:

Asynchronous IO

import select
import socket
import time


class AsyncTimeoutException(TimeoutError):
    """
    Raised when a request times out.
    """

    def __init__(self, msg):
        self.msg = msg
        super(AsyncTimeoutException, self).__init__(msg)


class HttpContext(object):
    """Bundles the basic data of a request and its response."""

    def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
        """
        sock: client socket for this request
        host: host to request
        port: port to request
        method: HTTP method
        url: URL to request
        data: request body data
        callback: function called when the request completes
        timeout: request timeout in seconds
        """
        self.sock = sock
        self.callback = callback
        self.host = host
        self.port = port
        self.method = method
        self.url = url
        self.data = data

        self.timeout = timeout

        self.__start_time = time.time()
        self.__buffer = []

    def is_timeout(self):
        """Whether this request has already timed out."""
        current_time = time.time()
        if (self.__start_time + self.timeout) < current_time:
            return True

    def fileno(self):
        """File descriptor of the request socket, so select can watch this object."""
        return self.sock.fileno()

    def write(self, data):
        """Append a chunk of response data to the buffer."""
        self.__buffer.append(data)

    def finish(self, exc=None):
        """The response is complete (or failed); run the request's callback."""
        if not exc:
            response = b''.join(self.__buffer)
            self.callback(self, response, exc)
        else:
            self.callback(self, None, exc)

    def send_request_data(self):
        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (
            self.method.upper(), self.url, self.host, self.data,)

        return content.encode(encoding='utf8')


class AsyncRequest(object):
    def __init__(self):
        self.fds = []
        self.connections = []

    def add_request(self, host, port, method, url, data, callback, timeout):
        """Create a new request."""
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect((host, port))
        except BlockingIOError as e:
            pass
            # print('connect request has been sent to the remote host')
        req = HttpContext(client, host, port, method, url, data, callback, timeout)
        self.connections.append(req)
        self.fds.append(req)

    def check_conn_timeout(self):
        """Check all requests and abort any that have exceeded their timeout."""
        timeout_list = []
        for context in self.connections:
            if context.is_timeout():
                timeout_list.append(context)
        for context in timeout_list:
            context.finish(AsyncTimeoutException('request timed out'))
            self.fds.remove(context)
            self.connections.remove(context)

    def running(self):
        """Event loop: watch the request sockets for readiness and act on them."""
        while True:
            r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)

            if not self.fds:
                return

            for context in r:
                sock = context.sock
                while True:
                    try:
                        data = sock.recv(8096)
                        if not data:
                            self.fds.remove(context)
                            context.finish()
                            break
                        else:
                            context.write(data)
                    except BlockingIOError as e:
                        break
                    except TimeoutError as e:
                        self.fds.remove(context)
                        self.connections.remove(context)
                        context.finish(e)
                        break

            for context in w:
                # connected to the remote server: send the request data
                if context in self.fds:
                    data = context.send_request_data()
                    context.sock.sendall(data)
                    self.connections.remove(context)

            self.check_conn_timeout()


if __name__ == '__main__':
    def callback_func(context, response, ex):
        """
        :param context: HttpContext object wrapping the request details
        :param response: response content of the request
        :param ex: exception, if any (an exception object on failure; None otherwise)
        :return:
        """
        print(context, response, ex)

    obj = AsyncRequest()
    url_list = [
        {'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
        {'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
        {'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
         'callback': callback_func},
    ]
    for item in url_list:
        print(item)
        obj.add_request(**item)

    obj.running()

Note: this article is a repost.

