Python自動化 【第九篇】:Python基礎-線程、進程及python GIL全局解釋器鎖
本節內容:
- 進程與線程區別
- 線程
- a) 語法
- b) join
- c) 線程鎖之Lock\Rlock\信號量
- d) 將線程變為守護進程
- e) Event事件
- f) queue隊列
- g) 生產者消費者模型
3. python GIL全局解釋器鎖
1. 進程與線程區別
線程:是操作系統能夠進行運算和調度的最小單位,是一堆指令的集合。線程被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務。線程就是cpu執行時所需要的一堆上下文關系。
進程:以一個整體的形式暴露給操作系統管理,里面包含對各種資源的調用,內存的管理,網絡接口的調用等,對各種資源管理的集合就可稱為 進程。進程要操作cpu, 必須先創建一個線程。
進程和線程的區別:
- 線程共享內存空間,進程的內存是獨立的
- 線程共用數據,進程數據獨立
- 同一個進程的線程之間可以直接交流,兩個進程必須通過中間代理實現通信
- 新線程容易創建,新進程需要克隆父進程
- 一個線程可以控制和操作同一進程里的其他線程, 進程只能操作子進程
修改主線程有可能影響到其他線程的行為,對父進程修改不會影響子進程。
2. 線程(threading模塊)
a) 語法
先寫一個簡單的線程:


import threading import time def run(n): print("task" ,n) time.sleep(2) t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",)) t1.start() t2.start() print(t1.getName) #獲取線程名 print(t2.getName)
繼承式調用:


import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每個線程要運行的函數 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
啟動多個線程:


import threading import time def run(n): print("task" ,n) time.sleep(2) for i in range(50): t = threading.Thread(target=run, args=("t_%s" % i,)) t.start()
b) join
join & Daemon用法:
默認情況主線程不會等子線程執行完畢,但是join可以做到


import threading import time def run(n): print("task" ,n) time.sleep(2) print("task done ", n) start_time = time.time() t_objs = [] for i in range(50): t = threading.Thread(target=run, args=("t_%s" % i,)) t.start() t_objs.append(t) for t in t_objs: t.join() print("cost_time:",time.time()-start_time)
主線程是程序本身
threading.current_thread()和threading.active_count()用法:


import threading import time def run(n): print("task" ,n) time.sleep(2) print("task done ", n,threading.current_thread()) start_time = time.time() t_objs = [] for i in range(50): t = threading.Thread(target=run, args=("t_%s" % i,)) t.start() t_objs.append(t) # for t in t_objs: # t.join() print("===all threads has finished", threading.current_thread(), threading.active_count()) print("cost_time:",time.time()-start_time)
c) 信號量:
threading.BoundedSemaphore(n)
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 。

d) 把子線程變成守護線程setDaemod()方法:


#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong import threading import time def run(n): print("task" ,n) time.sleep(2) print("task done ", n,threading.current_thread()) start_time = time.time() for i in range(50): t = threading.Thread(target=run, args=("t_%s" % i,)) t.setDaemon(True) # 把當前線程設置為守護線程 , 在start之前 t.start() print("===all threads has finished", threading.current_thread(), threading.active_count()) print("cost_time:",time.time()-start_time)
e) 事件:
事件是一個簡單地同步對象
event = threading.event()
event.wait() 等待標志位被設定
event.set() 設置標志位
event.clear() 清除標志位
event.is_set() 判斷標志位是否設定
通過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。


#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: zhoujunlong import threading import time event = threading.Event() def lighter(): count = 0 event.set() while True: if 10>=count > 5:#change_to_red_light event.clear() # 清空標志位 print("\033[41;1m紅燈了\033[0m") elif count > 10: event.set() #change_to_green_light count = 0 else:print("\033[42;1m綠燈了\033[0m") time.sleep(1) count += 1 def car(name): while True: if event.is_set():#代表綠燈 print("[%s] running..." % name) time.sleep(1) else: print("[%s] sees red light, waiting" % name) event.wait() print("\033[34;1m[%s] 綠燈了,gogogo\033[0m"%name) light = threading.Thread(target=lighter) light.start() car1 = threading.Thread(target=car,args=("QQ",)) car2 = threading.Thread(target=car,args=("TT",)) car1.start() car2.start()
這里還有一個event使用的例子,員工進公司門要刷卡, 我們這里設置一個線程是“門”, 再設置幾個線程為“員工”,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就可以通過。


import threading import time import random def door(): door_open_time_counter = 0 while True: if door_swiping_event.is_set(): print("\033[32;1mdoor opening....\033[0m") door_open_time_counter +=1 else: print("\033[31;1mdoor closed...., swipe to open.\033[0m") door_open_time_counter = 0 #清空計時器 door_swiping_event.wait() if door_open_time_counter > 3:#門開了已經3s了,該關了 door_swiping_event.clear() time.sleep(0.5) def staff(n): print("staff [%s] is comming..." % n ) while True: if door_swiping_event.is_set(): print("\033[34;1mdoor is opened, passing.....\033[0m") break else: print("staff [%s] sees door got closed, swipping the card....." % n) print(door_swiping_event.set()) door_swiping_event.set() print("after set ",door_swiping_event.set()) time.sleep(0.5) door_swiping_event = threading.Event() #設置事件 door_thread = threading.Thread(target=door) door_thread.start() for i in range(5): p = threading.Thread(target=staff,args=(i,)) time.sleep(random.randrange(3)) p.start()
f) queue(隊列)
class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #先入先出 #last in first out
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列
- 實現程序的解耦
- 提高運行效率


>>> import queue >>> q = queue.Queue >>> q.put("disk1") >>> q.put("disk2") >>> q.put("disk3") >>q.qsize() 3 >>>q.get() 'disk1' >>>q.get() 'disk2' >>>q.get() 'disk3' >>>q.get_nowait()
拋異常,不會卡住
maxsize()方法:


>>>q = queue.Queue(maxsize=3) >>>q.put(1) >>>q.put(2) >>>q.put(3) >>>q.put(4)
再put時會卡住
Lifo


import queue q = queue.LifoQueue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
程序輸出:


3 2 1
Priority方法:


import queue q = queue.PriorityQueue() q.put(("-1a1")) q.put(("5, a2")) q.put(("2, a3")) print(q.get()) print(q.get()) print(q.get())
程序輸出:


-1, a1 2, a3 5, a2
Queue.task_done() 以下為解釋:
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
g) 生產者消費者模型
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式?
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式?
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
下面來學習一個最基本的生產者消費者模型的例子:


import threading import queue def producer(): for i in range(10): q.put("骨頭 %s" % i ) print("開始等待所有的骨頭被取走...") q.join() print("所有的骨頭被取完了...") def consumer(n): while q.qsize() >0: print("%s 取到" %n, q.get()) q.task_done() #告知這個任務執行完了 q = queue.Queue() p = threading.Thread(target=producer,) p.start() c1 = consumer("Jack")
再來一個:


import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <20: time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 def Consumer(name): count = 0 while count <20: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) p1.start() c1.start()
3. GIL全局解釋器鎖(面試必會)
Python中無論幾核,同一時間只有一個線程在執行。
線程鎖(互斥鎖)
一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味著每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什么狀況?


import threading import time def run(n): global num time.sleep(2) num += 1 num = 0 t_objs = [] for i in range(1000): t = threading.Thread(target=run, args=("t_%s" % i,)) t.start() t_objs.append(t) for t in t_objs: t.join() print("===all threads has finished") print("num:", num)
正常來講,這個num結果應該是1000, 但在python 2.7上多運行幾次,會發現,最后打印出來的num結果不總是1000,為什么每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由于2個線程是并發同時運行的,所以2個線程很有可能同時拿走了num=0這個初始變量交給cpu去運算,當A線程去處完的結果是1,但此時B線程運算完的結果也是1,兩個線程同時CPU運算的結果再賦值給num變量后,結果就都是1。那怎么辦呢? 很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數據。
為了讓上邊代碼每次都輸出num是1000的話,必須再加一把鎖:


import threading, time def run1(): print("grab the first part data") lock.acquire() global num num += 1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2 += 1 lock.release() return num2 def run3(): lock.acquire() res = run1() print('--------between run1 and run2-----') res2 = run2() lock.release() print(res, res2) if __name__ == '__main__': num, num2 = 0, 0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count()) else: print('----all threads done---') print(num, num2)
加上鎖后程序就變串行了,為了避免程序變慢,別在子線程里加sleep等類似操作!
GIL VS Lock :
既然Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock?注意,這里的lock是用戶級的lock,跟那個GIL沒關系 ,具體我們通過下圖來看一下:
遞歸鎖:
threading.RLock()不會出現鎖死情況,說白了就是在一個大鎖中還要再包含子鎖。
文章列表