Asynchronous Programming in Python, From Scratch (Part 1)


This article uses a handful of examples to walk through the approaches Python developers commonly reach for when writing asynchronous programs, and shares some personal views on async programming along the way. Corrections are welcome if anything is wrong.

Let's start with an example.

Xiao Liang is a devoted movie lover. One day he came across the Douban site, found plenty of things he liked, and, being a programmer, decided on a whim to write a program to scrape Douban's Top 250 movie list. Xiao Liang is a Python enthusiast, so this sort of thing comes naturally to him; he happily rolled up his sleeves and got to work. Sure enough, in under ten minutes he had his first program.

#-*- coding:utf-8 -*-

import urllib.request

import ssl

from lxml import etree

 

url = 'https://movie.douban.com/top250'

context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)  # pins TLS 1.1; newer servers may reject this, so ssl.create_default_context() is generally the safer choice

 

def fetch_page(url):

    response = urllib.request.urlopen(url, context=context)

    return response

 

def parse(url):

    response = fetch_page(url)

    page = response.read()

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    for url in fetch_list:

        response = fetch_page(url)

        page = response.read()

        html = etree.HTML(page)

        for element_movie in html.xpath(xpath_movie):

            result.append(element_movie)

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text         

        print(i, title)

 

 

def main():

    parse(url)

 

if __name__ == '__main__':

    main()

The program ran correctly, as expected.


But it felt rather slow. How slow exactly? Xiao Liang added the following snippet to the main function to time it.

def main():

    from time import time

    start = time()    

    for i in range(5):

        parse(url)

    end = time()

    print ('Cost {} seconds'.format((end - start) / 5))

The whole scrape took a full 7.6 seconds!!

 

python movie.py

Cost 7.619797945022583 seconds

Xiao Liang couldn't help sinking into thought...


Then Xiao Liang remembered a library his classmate Xiao Zhang had recommended to him two days earlier, called requests, supposedly leagues ahead of urllib, urllib2, urllib3, urllibn... Full of enthusiasm, he revised the program, replacing the standard-library urllib with requests.

import requests

from lxml import etree

from time import time

 

url = 'https://movie.douban.com/top250'

 

def fetch_page(url):

    response = requests.get(url)

    return response

 

def parse(url):

    response = fetch_page(url)

    page = response.content

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    for url in fetch_list:

        response = fetch_page(url)

        page = response.content

        html = etree.HTML(page)

        for element_movie in html.xpath(xpath_movie):

            result.append(element_movie)

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text

        # print(i, title)

A quick test: 6.5 seconds! That's a bit over a second faster than the urllib version, but overall the two are still in the same ballpark; the program isn't much faster, and the small gap probably comes from optimizations that requests applies under the hood.

 

python movie_requests.py

Cost 6.540304231643677 seconds
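
As an aside, not part of the original code: one optimization requests offers that the snippets above don't use is connection reuse. A bare requests.get() sets up a fresh connection every time, while a requests.Session keeps the TCP/TLS connection to the same host alive across calls. A minimal sketch, reusing the fetch_page name from above:

import requests

# a single Session reuses the underlying connection (HTTP keep-alive) across requests
session = requests.Session()

def fetch_page(url):
    # same interface as before; repeated calls to the same host skip connection setup
    return session.get(url)

Even with keep-alive, though, the pages are still fetched one after another, which is the real bottleneck the later versions go after.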

Xiao Liang couldn't help wondering: is my code just badly written? Could lxml be parsing too slowly? Would regular expressions do better?

 

So he swapped lxml for the standard re module.

 

#-*- coding:utf-8 -*-

import requests

from time import time

import re

 

url = 'https://movie.douban.com/top250'

 

def fetch_page(url):

    response = requests.get(url)

    return response

 

def parse(url):

    response = fetch_page(url)

    page = response.content

    

    fetch_list = set()

    result = []

 

    for title in re.findall(rb'<a href=.*\s.*<span class="title">(.*)</span>', page):

        result.append(title)

 

    for postfix in re.findall(rb'<a href="(\?start=.*?)"', page):

        fetch_list.add(url + postfix.decode())

 

    for url in fetch_list:

        response = fetch_page(url)

        page = response.content

        for title in re.findall(rb'<a href=.*\s.*<span class="title">(.*)</span>', page):

            result.append(title)

 

    for i, title in enumerate(result, 1):

        title = title.decode()

        # print(i, title)

Another run and, sure enough, nearly another full second gained!

 

python movie_regex.py

Cost 5.578997182846069 seconds

Xiao Liang was quietly pleased: the program had gotten shorter and faster, and success felt closer than ever. But he soon frowned, realizing a problem: written this way the code may look shorter, but all it does is blindly chase speed, with no extensibility to speak of. It's fine for simple needs, but once the logic grows more complex, a program that relies on raw regular expressions becomes much harder to maintain; leaning on libraries built for parsing is what keeps a program clear. Secondly, the bottleneck in this kind of networked application is usually at the I/O layer: attacking the time spent waiting on reads and writes pays off far better than speeding up text parsing. Xiao Liang remembered the multiprocessing and multithreading concepts from yesterday's operating systems class, just the thing for solving a real problem.


#-*- coding:utf-8 -*-

import requests

from lxml import etree

from time import time

from threading import Thread

 

url = 'https://movie.douban.com/top250'

 

def fetch_page(url):

    response = requests.get(url)

    return response

 

def parse(url):

    response = fetch_page(url)

    page = response.content

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    def fetch_content(url):

        response = fetch_page(url)

        page = response.content

        html = etree.HTML(page)

        for element_movie in html.xpath(xpath_movie):

            result.append(element_movie)

 

    threads = []

    for url in fetch_list:

        t = Thread(target=fetch_content, args=[url])

        t.start()

        threads.append(t)

 

    for t in threads:

        t.join()

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text

        # print(i, title)

The effect was immediate. Multithreading neatly deals with the time spent blocked waiting on I/O: this program is a full 80% faster than the previous one, finishing the whole movie list in just 1.4 seconds.

 

python movie_multithread.py

Cost 1.451986598968506 seconds
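
One caveat worth a side note here, not raised in the original: the threads above all append to the shared result list, which happens to be safe because list.append is a single atomic operation under CPython's GIL, but any richer shared state needs explicit synchronization. A minimal sketch of the usual pattern, with a hypothetical counter:

from threading import Lock

counter = 0           # hypothetical shared state updated by several threads
lock = Lock()

def record_success():
    global counter
    with lock:        # counter += 1 is a read-modify-write, not atomic, so guard it
        counter += 1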

But Xiao Liang still wasn't satisfied. Since Python threads are limited by the GIL anyway, why not use multiple processes? Without further ado he knocked out a multiprocess version, using a pool of 4 worker processes to handle the network work in parallel.

 

#-*- coding:utf-8 -*-

import requests

from lxml import etree

from time import time

from concurrent.futures import ProcessPoolExecutor

 

url = 'https://movie.douban.com/top250'

 

def fetch_page(url):

    response = requests.get(url)

    return response

 

def fetch_content(url):

    response = fetch_page(url)

    page = response.content

    return page

 

def parse(url):

    page = fetch_content(url)

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    with ProcessPoolExecutor(max_workers=4) as executor:

        for page in executor.map(fetch_content, fetch_list):

            html = etree.HTML(page)

            for element_movie in html.xpath(xpath_movie):

                result.append(element_movie)

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text

        # print(i, title)

The result: 2 seconds, not even as good as the multithreaded version.

 

python movie_multiprocess.py

Cost 2.029435634613037 seconds

(Note: ThreadPoolExecutor and ProcessPoolExecutor were introduced in Python 3.2 as wrappers around thread pools and process pools respectively; on Python 2.x you need to install the futures backport to use them.)

Xiao Liang was instantly dumbfounded; this was nothing like what he had expected.


The advantage of multiple processes (parallel CPU work) never shows up here; instead, the overhead of creating and scheduling the processes far outweighs its positive effect and drags things down. Even so, the multiprocess version is still a big improvement over the original single-process, single-threaded model.
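
Since the note above mentions ThreadPoolExecutor, here is a sketch (mine, not from the article) of the same fetch step using a bounded pool of threads instead of processes: it keeps threads' low creation cost while capping how many run at once. It assumes the same fetch_content, fetch_list, xpath_movie and result names as in the multiprocess example, and replaces only the executor block inside parse():

from concurrent.futures import ThreadPoolExecutor
from lxml import etree

# drop-in replacement for the ProcessPoolExecutor block: threads are cheap to start
# and release the GIL while they wait on the network
with ThreadPoolExecutor(max_workers=4) as executor:
    for page in executor.map(fetch_content, fetch_list):
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)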


Just as Xiao Liang was racking his brain for other ways to squeeze out performance, he stumbled on an article describing the advantages of coroutines over processes and threads: besides the cost of creating them, processes and threads have a hard-to-cure flaw, namely coordinating work between them; a program that relies on multiple processes or threads is usually uncontrollable without locks, whereas coroutines solve the coordination problem cleanly by letting the user decide how the coroutines are scheduled. Never one to be left behind, Xiao Liang dug up some material and started thinking about how coroutines could strengthen his program.
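
To make "the user decides how coroutines are scheduled" concrete before moving on, here is a tiny self-contained sketch (mine, not from the article) built on plain generators, the primitive Python's coroutines grew out of. Each task suspends itself voluntarily at yield, keeps its local state, and ordinary user code at the bottom decides who runs next, with no locks and no preemption:

def task(name, steps):
    # a toy coroutine: do a bit of work, then hand control back to the scheduler
    for i in range(steps):
        print(name, 'step', i)
        yield              # voluntary suspension point; the local variable i is preserved

# the "scheduler" is just user code choosing, round-robin, which coroutine to resume
tasks = [task('A', 3), task('B', 2)]
while tasks:
    current = tasks.pop(0)
    try:
        next(current)      # resume until the next yield
        tasks.append(current)
    except StopIteration:  # this coroutine is done
        pass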

Before long he found a coroutine-based networking library called gevent, and, even better, he heard that after applying gevent's monkey patch the whole program becomes asynchronous!

Could it really be that magical? Xiao Liang couldn't wait to see what kind of black magic this was, and immediately wrote a gevent-based example:

 

#-*- coding:utf-8 -*-

import requests

from lxml import etree

from time import time

import gevent

from gevent import monkey

monkey.patch_all()  # swap blocking stdlib modules (socket, ssl, threading, ...) for cooperative ones; gevent recommends doing this as early as possible, before other imports

 

url = 'https://movie.douban.com/top250'

 

def fetch_page(url):

    response = requests.get(url)

    return response

 

def fetch_content(url):

    response = fetch_page(url)

    page = response.content

    return page

 

def parse(url):

    page = fetch_content(url)

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    jobs = [gevent.spawn(fetch_content, url) for url in fetch_list]

    gevent.joinall(jobs)


 

    for page in [job.value for job in jobs]:

        html = etree.HTML(page)

        for element_movie in html.xpath(xpath_movie):

            result.append(element_movie)

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text

        # print(i, title)

Only 1.2 seconds, fast indeed! And looking over the program, there is hardly a trace of asynchronous handling to be seen: gevent gives us the ability to write asynchronous programs with synchronous-looking logic. The key is the monkey.patch_all() line, the piece of black magic that makes the whole program asynchronous: once the monkey patch is applied, Python dynamically replaces a number of blocking modules (socket, thread and so on) at runtime with asynchronous versions, so all of the program's network operations are carried out asynchronously and efficiency naturally goes way up.

 

python movie_gevent.py

Cost 1.2647549629211425 seconds
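
If you want to see the patching with your own eyes, a quick check (my sketch, not from the article) is to look at socket.socket before and after the patch; after monkey.patch_all() it should refer to gevent's cooperative socket class rather than the standard library's blocking one:

from gevent import monkey
import socket

print(socket.socket)   # the standard library's blocking socket class

monkey.patch_all()     # rebind blocking modules to gevent's cooperative versions

print(socket.socket)   # now gevent's socket class; existing code picks it up automatically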

The program was much faster, but Xiao Liang was left somewhat bewildered: gevent's magic confused him, and he felt the library wasn't easy to learn and was still some way from the clear, elegant, Pythonic style he had in mind. The Python community also realized that Python needed a standard library of its own for coroutines, and that later became asyncio.

 

Xiao Liang swapped the synchronous requests library for aiohttp, which supports asyncio, and used the async/await syntax introduced in Python 3.5 (before 3.5, @asyncio.coroutine and yield from played the same role) to write a coroutine version of the example.

#-*- coding:utf-8 -*-

from lxml import etree

from time import time

import asyncio

import aiohttp

 

url = 'https://movie.douban.com/top250'

 

async def fetch_content(url):

    async with aiohttp.ClientSession() as session:

        async with session.get(url) as response:

            return await response.text()

 

async def parse(url):

    page = await fetch_content(url)

    html = etree.HTML(page)

 

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'

    xpath_title = './/span[@class="title"]'

    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'

 

    pages = html.xpath(xpath_pages)

    fetch_list = []

    result = []

 

    for element_movie in html.xpath(xpath_movie):

        result.append(element_movie)

 

    for p in pages:

        fetch_list.append(url + p.get('href'))

 

    tasks = [fetch_content(url) for url in fetch_list]

    pages = await asyncio.gather(*tasks)

 

    for page in pages:

        html = etree.HTML(page)

        for element_movie in html.xpath(xpath_movie):

            result.append(element_movie)

 

    for i, movie in enumerate(result, 1):

        title = movie.find(xpath_title).text

        # print(i, title)

 

 

def main():

    loop = asyncio.get_event_loop()    

    start = time()    

    for i in range(5):

        loop.run_until_complete(parse(url))

    end = time()

    print ('Cost {} seconds'.format((end - start) / 5))

    loop.close()

1.7 seconds, not bad either. And the async/await syntax makes the program noticeably more readable.

 

python movie_asyncio.py

Cost 1.713043785095215 seconds
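
A small side note, not from the original: the main() above uses the event-loop API that asyncio first shipped with. On Python 3.7 and later the same thing can be written with asyncio.run, which creates and closes the loop for you. A minimal sketch reusing the parse coroutine and url defined above:

import asyncio
from time import time

def main():
    # asyncio.run (Python 3.7+) replaces the get_event_loop / run_until_complete / close dance
    start = time()
    for i in range(5):
        asyncio.run(parse(url))
    end = time()
    print('Cost {} seconds'.format((end - start) / 5))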

After this baptism, Xiao Liang came away with a much deeper understanding of asynchrony. There are many ways to go asynchronous, and the ones shown here are only the most common; in practice you should pick whichever approach best fits the scenario. Many factors affect a program's efficiency, and the approaches above will behave differently in different situations, so don't cling to a single tree: use synchronous code where synchronous fits and asynchronous code where asynchronous fits, and you'll end up with far more flexible networked applications.

 

By this point you have probably realized it too: clear, elegant coroutines are arguably one of the best ways to implement asynchrony.

 

The coroutine mechanism lets us write asynchronously-running code in a synchronous style.

 

As everyone knows, because of the GIL (Global Interpreter Lock) Python cannot have truly parallel threads, so in many cases concurrency is achieved with multiprocessing instead; and using threads in Python also means synchronizing the critical sections yourself, which is inconvenient. Replacing threads and processes with coroutines is therefore a very attractive choice, thanks to their appealing properties: voluntary yielding and exit, state preservation, avoidance of CPU context switches, and so on...
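
To end with those properties made concrete, here is one last self-contained sketch (mine, not from the article): two coroutines interleave on a single thread, each suspending itself voluntarily at await and resuming exactly where it left off, with no locks and no OS-level context switch between them:

import asyncio

async def worker(name, delay):
    # every await is a voluntary suspension point; the loop variable i survives across them
    for i in range(3):
        print(name, 'step', i)
        await asyncio.sleep(delay)   # hand control back to the event loop

async def main():
    # both workers run concurrently on one thread, scheduled cooperatively by the loop
    await asyncio.gather(worker('A', 0.1), worker('B', 0.15))

asyncio.run(main())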