阿布云

你所需要的,不仅仅是一个好用的代理。

Python开发(1):协程、异步IO

阿布云 发表于

6.png

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法,进入上一次离开时所处逻辑流的位置。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后A执行完毕。

所以子程序调用时通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。

协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

  1. def a():

  2. print(" 1 ")

  3. print(" 2 ")

  4. print(" 3 ")

  5.  
  6. def b():

  7. print(" x ")

  8. print(" y ")

  9. print(" z ")

假设由程序执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

  1. 1

  2. 2

  3. x

  4. y

  5. 3

  6. z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来A、B的执行有点像多线程,但协程的特点在是一个线程执行,和多线程比协程有何优势?

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是有程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

因为协程是一个线程执行,那么怎么利用多核CPU呢?最简单的方法是多进程加协程,既充分利用多核,有充分发挥协程的高效率,可获得极高的性能。

协程的优点:

无需线程上下文切换的开销。

无需原子操作锁定及同步的开销。原子操作(atomic operation)是不需要synchronized,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

方便切换控制流,简化编程模型。

高并发+高扩展性+低成本。一个CPU支持上万的协程都不是问题,所以很适合用于高并发处理。

协程的缺点:

无法利用多核资源。协程的本质是个单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。

进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序。

使用yield实现协程操作。

  1. import time,queue

  2.  
  3. def consumer(name):

  4. print(" -->starting eating xoxo ")

  5. while True:

  6. new_xo = yield

  7. print(" %s is eating xoxo %s "%(name,new_xo))

  8.  
  9. def producer():

  10. r = con.__next__()

  11. r = con2.__next__()

  12. n = 0

  13. while n < 5:

  14. n += 1

  15. con.send(n)

  16. con2.send(n)

  17. print(" \033[32;1mproducer\033[0m is making xoxo %s "%n)

  18.  
  19. if __name__ == " __main__ ":

  20. con = consumer(" c1 ")

  21. con2 = consumer(" c2 ")

  22. p = producer()

  23. 输出:

  24. -->starting eating xoxo

  25. -->starting eating xoxo

  26. c1 is eating xoxo 1

  27. c2 is eating xoxo 1

  28. producer is making xoxo 1

  29. c1 is eating xoxo 2

  30. c2 is eating xoxo 2

  31. producer is making xoxo 2

  32. c1 is eating xoxo 3

  33. c2 is eating xoxo 3

  34. producer is making xoxo 3

  35. c1 is eating xoxo 4

  36. c2 is eating xoxo 4

  37. producer is making xoxo 4

  38. c1 is eating xoxo 5

  39. c2 is eating xoxo 5

  40. producer is making xoxo 5

协程的特点:

1、必须在只有一个单线程里实现并发。

2、修改共享数据不需加锁。

3、用户程序里自己保持多个控制流的上下文栈。

4、一个协程遇到IO操作自动切换到其它协程。

刚才yield实现的不能算是合格的协程。

Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回到下一个值。但是python的yield不但可以返回一个值,它可以接收调用者发出的参数。

Greenlet

greenlet是一个用C实现的协程模块,相比于Python自带的yield,它可以在任意函数之间随意切换,而不需把这个函数声明为generator。

  1. from greenlet import greenlet

  2.  
  3. def f1():

  4. print(11)

  5. gr2. switch ()

  6. print(22)

  7. gr2. switch ()

  8.  
  9. def f2():

  10. print(33)

  11. gr1. switch ()

  12. print(44)

  13.  
  14. gr1 = greenlet(f1)

  15. gr2 = greenlet(f2)

  16. gr1. switch ()

  17. 输出:

  18. 11

  19. 33

  20. 22

  21. 44

以上例子还有一个问题没有解决,就是遇到IO操作自动切换。

Gevent

Gevent是一个第三方库,可以轻松提供gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

  1. import gevent

  2.  
  3. def foo():

  4. print(" Running in foo ")

  5. gevent.sleep()

  6. print(" Explicit contenxt switch to foo agin ")

  7.  
  8. def bar():

  9. print(" Explicit context to bar ")

  10. gevent.sleep(1)

  11. print(" Implict context switch back to bar ")

  12.  
  13. def func3():

  14. print(" running func3 ")

  15. gevent.sleep(0)

  16. print(" running func3 again ")

  17.  
  18. gevent.joinall([

  19. gevent.spawn(foo),

  20. gevent.spawn(bar),

  21. gevent.spawn(func3),

  22. ])

  23. 输出:

  24. Running in foo

  25. Explicit context to bar

  26. running func3

  27. Explicit contenxt switch to foo agin

  28. running func3 again

  29. Implict context switch back to bar

同步与异步的性能区别

  1. import gevent

  2.  
  3. def f1(pid):

  4. gevent.sleep(0.5)

  5. print(" F1 %s done "%pid)

  6.  
  7. def f2():

  8. for i in range(10):

  9. f1(i)

  10.  
  11. def f3():

  12. threads = [gevent.spawn(f1,i) for i in range(10)]

  13. gevent.joinall(threads)

  14.  
  15. print(" f2 ")

  16. f2()

  17. print(" f3 ")

  18. f3()

  19. 输出:

  20. f2

  21. F1 0 done

  22. F1 1 done

  23. F1 2 done

  24. F1 3 done

  25. F1 4 done

  26. F1 5 done

  27. F1 6 done

  28. F1 7 done

  29. F1 8 done

  30. F1 9 done

  31. f3

  32. F1 0 done

  33. F1 4 done

  34. F1 8 done

  35. F1 7 done

  36. F1 6 done

  37. F1 5 done

  38. F1 1 done

  39. F1 3 done

  40. F1 2 done

  41. F1 9 done

上面程序的重要部分是将f1函数封装到Greenlet内部线程的gevent.spawn。初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在所有greenlet执行完后才会继续向下走。

IO阻塞自动切换任务

  1. from urllib import request

  2. import gevent,time

  3. from gevent import monkey

  4.  
  5. # 把当前程序的所有的id操作给单独的做上标记

  6. monkey.patch_all()

  7. def f(url):

  8. print(" GET:%s "%url)

  9. resp = request.urlopen(url)

  10. data = resp.read()

  11. f = open(" load.txt "," wb ")

  12. f.write(data)

  13. f.close()

  14. print(" %d bytes received from %s. "%(len(data),url))

  15.  
  16. urls = ['https: //www.python.org/',

  17. 'http: //www.cnblogs.com/yinshoucheng-golden/',

  18. 'https: //github.com/']

  19. time_start = time.time()

  20. for url in urls:

  21. f(url)

  22. print(" 同步cost ",time.time() - time_start)

  23.  
  24. async_time_start = time.time()

  25. gevent.joinall([

  26. gevent.spawn(f,'https: //www.python.org/'),

  27. gevent.spawn(f,'http: //www.cnblogs.com/yinshoucheng-golden/'),

  28. gevent.spawn(f,'https: //github.com/'),

  29. ])

  30. print(" 异步cost ",time.time() - async_time_start)

通过gevent实现单线程下的多socket并发

server side

  1. import sys,socket,time,gevent

  2.  
  3. from gevent import socket,monkey

  4. monkey.patch_all()

  5.  
  6. def server(port):

  7. s = socket.socket()

  8. s.bind((" 0.0.0.0 ",port))

  9. s.listen(500)

  10. while True:

  11. cli,addr = s.accept()

  12. gevent.spawn(handle_request,cli)

  13.  
  14. def handle_request(conn):

  15. try :

  16. while True:

  17. data = conn.recv(1024)

  18. print(" recv: ",data)

  19. if not data:

  20. conn.shutdown(socket.SHUT_WR)

  21. conn.send(data)

  22. except Exception as ex:

  23. print(ex)

  24. finally :

  25. conn.close()

  26.  
  27. if __name__ == " __main__ ":

  28. server(6969)

client side

  1. import socket

  2.  
  3. HOST = " localhost "

  4. PORT = 6969

  5. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

  6. s.connect((HOST,PORT))

  7. while True:

  8. msg = bytes(input(" >>: "),encoding=" utf8 ")

  9. s.sendall(msg)

  10. data = s.recv(1024)

  11. # print(data)

  12. print(" Received ",repr(data))

  13.  
  14. s.close()

socket并发

  1. import socket,threading

  2.  
  3. def sock_conn():

  4. client = socket.socket()

  5. client.connect((" localhost ",6969))

  6. count = 0

  7.  
  8. while True:

  9. client.send((" hello %s "%count).encode(" utf-8 "))

  10. data = client.recv(1024)

  11. print(" %s from server:%s "%(threading.get_ident(),data.decode()))

  12. count += 1

  13. client.close()

  14.  
  15. for i in range(100):

  16. t = threading.Thread(target=sock_conn)

  17. t.start()