文章出處

Python自動化 【第十篇】:Python進階-多進程/協程/事件驅動與Select\Poll\Epoll異步IO

本節內容:

  1. 多進程
  2. 協程
  3. 事件驅動與Select\Poll\Epoll異步IO

 

1.  多進程

  啟動多個進程

  進程中啟進程

  父進程與子進程

 

  進程間通信

  不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:

a)   queues

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process, Queue

import queue

import threading

def f(qq):

    qq.put("hahaha123")

if __name__ == '__main__':

    #q = queue.Queue() #  線程queue不能直接傳給子進程

    q = Queue()

    p = Process(target=f, args=(q,))

    #p = threading.Thread(target=f, args=(q,))

    p.start()

    print(q.get())

    p.join()
queues

  父進程克隆了一個Queue,將克隆的Queue交給了子進程,當一個Queue對數據進行修改時,會將修改后的Queue數據序列化到某一位置,另一個Queue會從這個位置反序列化獲取數據,實現進程間的通信

b)   Pipes

  
#!/usr/bin/env python

# -*- coding:utf-8 -*- 

from multiprocessing import Process, Pipe

def f(conn):

    conn.send("qqqqqq")

    conn.send("qqqqqq2")

    print("from parent:", conn.recv())

    conn.close()

if __name__ == '__main__':

    parent_conn, chile_conn = Pipe()

    p = Process(target=f, args=(chile_conn,))

    p.start()

    print(parent_conn.recv())

    print(parent_conn.recv())

    parent_conn.send("hehehhe")

    p.join()
pipes

c)       Managers 實現進程間數據的共享,可以同時修改,而不是數據的傳遞

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from multiprocessing import Process, Manager
import os
def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict() # 生成一個字典可在多個進程間共享和傳遞
        l = manager.list()# 生成一個列表可在多個進程間共享和傳遞
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l,))
            p.start()
            p_list.append(p)
        for res in p_list:# 等待結果
            res.join()
        print(d)
        print(l)
managers  

 

  進程同步

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process,Lock

def f(l, i):

    l.acquire()

    print("hello world", i)

    l.release()

if __name__ == '__main__':

    lock = Lock()

    for num in range(10):

        Process(target=f, args=(lock, num)).start()
進程同步 

 

  進程池(生產中常用)

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

from multiprocessing import Process, Pool, freeze_support

import time, os

def Foo(i):

    time.sleep(2)

    print("in process ", os.getpid())

    return i

def Bar(args):

    print("--->", args, os.getpid())

if __name__ == '__main__':

    #freeze_support()

    pool = Pool(processes=5)

    print("main_process:", os.getpid())

    for i in range(10):

        #pool.apply(func=Foo, args=(i,))

        #pool.apply_async(func=Foo, args=(i,))

        pool.apply_async(func=Foo, args=(i,),callback=Bar)

    print('end')

    pool.close()

    pool.join() #進程池中進程執行完畢后再關閉,注釋后程序不等進程執行我那后就直接關閉了
進程池

 

2.  協程

  協程,微線程

  協程的好處:

  • 無需線程上下文切換的開銷
  • 無需原子操作的鎖定及同步的開銷
  • 方便切換控制流,簡化編程模型
  • 高并發+高擴展+低成本(一個cpu可支持上萬個協程)

     缺點:

  • 無法利用多核資源,需要和進程配合才能運行在多CPU上
  • 運行阻塞(blocking)操作會阻塞整個程序

 

  通過yield實現簡單的協程(單線程實現多并發效果):

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
def consumer(name):
    print("----->starting")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
        con.send(n) #
        con2.send(n)
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
yield

  greenlet實現協程手動切換

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from greenlet import greenlet
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()
    print(78)
    gr1.switch()
gr1 = greenlet(test1)#啟動一個協程
gr2 = greenlet(test2)
gr1.switch()
greenlet 

  gevent實現協程自動切換

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import  gevent
def foo():
    print("1")
    gevent.sleep(1) # 模仿IO切換
    print("2")
def bar():
    print("3")
    gevent.sleep(0)
    print("4")
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar)
])
gevent

 

  協程大并發下載網頁(urllib模塊):

  通過gevent調用urllib默認是阻塞的,加入monkey模塊,把所有的io操作加上標記實現并行操作

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
from gevent import monkey
from urllib import request
import gevent
monkey.patch_all() #把當前程序的所有的io操作單獨做上標記
def f(url):
    print("Get: %s" % url)
    res = request.urlopen(url)
    data = res.read()
    print("%d bytes received from %s" % (len(data), url))
gevent.joinall([
    gevent.spawn(f, "https://www.python.org/"),
    gevent.spawn(f, "https://www.yahoo.com/"),
    gevent.spawn(f, "https://github.com/")
])
View Code

   通過gevent實現單線程下的socket并發

    Server:

    
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import sys
import socket
import time
import gevent
from gevent import socket, monkey
monkey.patch_all()
host = "0.0.0.0"
def server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(500)
    while True:
        conn, addr = s.accept()
        gevent.spawn(handle_request, conn) #前邊是函數,后邊是函數所需的參數
def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data.decode())
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as e:
        print("\033[31;1merr\033[0m", e)
    finally:
        conn.close()
if __name__ == '__main__':
    server(5566)
server

    Client:

    
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import socket
host = "localhost"
port = 5566
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    print("Recv:", repr(data))
s.close()
client

 

3.  事件驅動與異步IO

  通常,我們寫服務器處理模型的程序時,有以下幾種模型:

  (1)每收到一個請求,創建一個新的進程,來處理該請求;

  (2)每收到一個請求,創建一個新的線程,來處理該請求;

  (3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求

  上面的幾種方式,各有千秋,

  第(1)中方法,由于創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。

  第(2)種方式,由于要涉及到線程的同步,有可能會面臨死鎖等問題。

  第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。

  綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式

  事件驅動模型:

  事件驅動大體思路:

  a)有一個事件(消息)隊列

  b)鼠標按下時, 往這個隊列中增加一個點擊事件(消息)

  c)有個循環,部隊從隊列取出事件,根據不同的事件,調用不同的函數

  d)事件(消息)一般各自保存各自的處理函數指針,這樣,每個事件都有獨立的處理函數

  事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。

  當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:

  • 程序中有許多任務,而且…
  • 任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)而且…
  • 在等待事件到來時,某些任務會阻塞。

  當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。

  網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。

      

  Select\Poll\Epoll異步IO

  select  1)最多能維護1024個socket  2)不知道具體哪個socket反回了數據

  poll     去掉了默認文件鏈接數的限制

  epoll   依然是io多路復用,(tornado)、(twisted)都用epoll, windows不支持epoll  可以知道哪個socket有數據,省去循環,此時數據還存放在內核態,需要用戶主動調用接收數據操作,如果這次用戶沒取數據,下次還繼續通知數據來了(水平觸發)

 

(一) 概念說明

- 用戶空間和內核空間
- 進程的阻塞
- 緩存 I/O

  現在操作系統都是采用虛擬存儲器,那么對32位操作系統而言,它的尋址空間(虛擬存儲空間)為4G(2的32次方)。操作系統的核心是內核,獨立于普通的應用程序,可以訪問受保護的內存空間,也有訪問底層硬件設備的所有權限。為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操心系統將虛擬空間劃分為兩部分,一部分為內核空間,一部分為用戶空間。針對linux操作系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱為內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱為用戶空間。

 

  為了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,并恢復以前掛起的某個進程的執行。這種行為被稱為進程切換。因此可以說,任何進程都是在操作系統內核的支持下運行的,是與內核緊密相關的。

 

保存處理機上下文,包括程序計數器和其他寄存器。

  • 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
  • 更新內存管理的數據結構。
  •   進程的阻塞

  文件描述符fd

  文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫往往會圍繞著文件描述符展開。但是文件描述符這一概念往往只適用于UNIX、Linux這樣的操作系統。

  緩存 I/O 又被稱作標準 I/O,大多數文件系統的默認 I/O 操作都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操作系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。

 

  數據在傳輸過程中需要在應用程序地址空間和內核進行多次數據拷貝操作,這些數據拷貝操作所帶來的 CPU 以及內存開銷是非常大的。

剛才說了,對于一次IO訪問(以read舉例),數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。所以說,當一個read操作發生時,它會經歷兩個階段:

 

  • 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

  - 阻塞 I/O(blocking IO)
  - I/O 多路復用( IO multiplexing)
  - 異步 I/O(asynchronous IO)

  阻塞 I/O(blocking IO)

 

  所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

  linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

 

  當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,并不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,于是它可以再次發送read操作。一旦kernel中的數據準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。

  I/O 多路復用( IO multiplexing)

 

  所以,I/O 多路復用的特點是通過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就可以返回。

  所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢并不是對于單個連接能處理得更快,而是在于能處理更多的連接。)

 

1

select(rlist, wlist, xlist, timeout=None)

  select 函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用后select函數會阻塞,直到有描述副就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,如果立即返回設為null即可),函數返回。當select函數返回后,可以 通過遍歷fdset,來找到就緒的描述符。

  select目前幾乎在所有的平臺上支持,其良好跨平臺支持也是它的一個優點。select的一 個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,可以通過修改宏定義甚至重新編譯內核的方式提升這一限制,但 是這樣也會造成效率的降低。

  poll

  int poll (struct pollfd *fds, unsigned int nfds, int timeout);

  不同與select使用三個位圖來表示三個fdset的方式,poll使用一個 pollfd的指針實現。

  struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

  pollfd結構包含了要監視的event和發生的event,不再使用select“參數-值”傳遞的方式。同時,pollfd并沒有最大數量限制(但是數量過大后性能也是會下降)。 和select函數一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符。

  從上面看,select和poll都需要在返回后,通過遍歷文件描述符來獲取已經就緒的socket。事實上,同時連接的大量客戶端在一時刻可能只有很少的處于就緒狀態,因此隨著監視的描述符數量的增長,其效率也會線性下降。

  epoll

  epoll是在2.6內核中提出的,是之前的select和poll的增強版本。相對于select和poll來說,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關系的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

  一 epoll操作過程

  epoll操作過程需要三個接口,分別如下:

  int epoll_create(int size);//創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
  int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

  1)int epoll_create(int size);
  創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不同于select()中的第一個參數,給出最大監聽的fd+1的值,參數size并不是限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當創建好epoll句柄后,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。

  2)int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  函數是對指定描述符fd執行op操作。
  - epfd:是epoll_create()的返回值。
  - op:表示op操作,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
  - fd:是需要監聽的fd(文件描述符)
  - epoll_event:是告訴內核需要監聽什么事

  3)int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  等待epfd上的io事件,最多返回maxevents個事件。
  參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大于創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。

 

  select方法(實現socketserver)

  傳遞給  select  的參數是這么幾個列表,分別表示讀事件、寫事件和錯誤事件。 select  方法返回三個列表,其中包含滿足條件的對象(讀、寫和異常)。      

  select 多并發socket 例子

        server

    
          #!/usr/bin/python

# -*- coding:utf-8 -*-

# Author:zhoujunlong

import select

import socket

import queue

server = socket.socket()

server.setblocking(0)

server_addr = ('localhost',10000)

print('starting up on %s port %s' % server_addr)

server.bind(server_addr)

server.listen(5)

inputs = [server, ] #自己也要監測呀,因為server本身也是個fd

outputs = []

message_queues = {}

while True:

    print("waiting for next event...")

    readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程序就會一直阻塞在這里

    for s in readable: #每個s就是一個socket

        if s is server: #別忘記,上面我們server自己也當做一個fd放在了inputs列表里,傳給了select,如果這個s是server,代表server這個fd就緒了,

            #就是有活動了, 什么情況下它才有活動? 當然 是有新連接進來的時候 呀

            #新連接進來了,接受這個連接

            conn, client_addr = s.accept()

            print("new connection from",client_addr)

            conn.setblocking(0)

            inputs.append(conn) #為了不阻塞整個程序,我們不會立刻在這里開始接收客戶端發來的數據, 把它放到inputs里, 下一次loop時,這個新連接

            #就會被交給select去監聽,如果這個連接的客戶端發來了數據 ,那這個連接的fd在server端就會變成就續的,select就會把這個連接返回,返回到

            #readable 列表里,然后你就可以loop readable列表,取出這個連接,開始接收數據了, 下面就是這么干 的

            message_queues[conn] = queue.Queue() #接收到客戶端的數據后,不立刻返回 ,暫存在隊列里,以后發送

        else: #s不是server的話,那就只能是一個 與客戶端建立的連接的fd了

            #客戶端的數據過來了,在這接收

            data = s.recv(1024)

            if data:

                print("收到來自[%s]的數據:" % s.getpeername()[0], data)

                message_queues[s].put(data) #收到的數據先放到queue里,一會返回給客戶端

                if s not  in outputs:

                    outputs.append(s) #為了不影響處理與其它客戶端的連接 , 這里不立刻返回數據給客戶端

            else:#如果收不到data代表什么呢? 代表客戶端斷開了呀

                print("客戶端斷開了",s)

                if s in outputs:

                    outputs.remove(s) #清理已斷開的連接

                inputs.remove(s) #清理已斷開的連接

                del message_queues[s] ##清理已斷開的連接

    for s in writeable:

        try :

            next_msg = message_queues[s].get_nowait()

        except queue.Empty:

            print("client [%s]" %s.getpeername()[0], "queue is empty..")

            outputs.remove(s)

        else:

            print("sending msg to [%s]"%s.getpeername()[0], next_msg)

            s.send(next_msg.upper())

    for s in exeptional:

        print("handling exception for ",s.getpeername())

        inputs.remove(s)

        if s in outputs:

            outputs.remove(s)

        s.close()

        del message_queues[s]
server

    client

    
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
client

  selectors模塊

  
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
sock = socket.socket()
sock.bind(('localhost', 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
selectors模塊

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()