文章出處

Python自動化 【第九篇】:Python基礎-線程、進程及python GIL全局解釋器鎖

本節內容:

  1. 進程與線程區別
  2. 線程
  • a)  語法
  • b)  join
  • c)  線程鎖之Lock\Rlock\信號量
  • d)  將線程變為守護進程
  • e)  Event事件 
  • f)   queue隊列
  • g)  生產者消費者模型

  3. python GIL全局解釋器鎖

 

1. 進程與線程區別

  線程:是操作系統能夠進行運算和調度的最小單位,是一堆指令的集合。線程被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務。線程就是cpu執行時所需要的一堆上下文關系。

 

  進程:以一個整體的形式暴露給操作系統管理,里面包含對各種資源的調用,內存的管理,網絡接口的調用等,對各種資源管理的集合就可稱為 進程。進程要操作cpu, 必須先創建一個線程。

 

  進程和線程的區別:

  • 線程共享內存空間,進程的內存是獨立的
  • 線程共用數據,進程數據獨立
  • 同一個進程的線程之間可以直接交流,兩個進程必須通過中間代理實現通信
  • 新線程容易創建,新進程需要克隆父進程
  • 一個線程可以控制和操作同一進程里的其他線程, 進程只能操作子進程

  修改主線程有可能影響到其他線程的行為,對父進程修改不會影響子進程。

2. 線程(threading模塊)

a)  語法

  先寫一個簡單的線程:

  
import threading
import time
def run(n):
    print("task" ,n)
    time.sleep(2)
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

print(t1.getName) #獲取線程名

print(t2.getName)
View Code 

  繼承式調用:

  
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # 定義每個線程要運行的函數

        print("running on number:%s" % self.num)

        time.sleep(3)


if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
繼承式調用

  啟動多個線程:

  
import threading

import time

def run(n):

    print("task" ,n)

    time.sleep(2)

for i in range(50):

    t = threading.Thread(target=run, args=("t_%s" % i,))

    t.start()
啟用多線程

 

b)  join 

  join & Daemon用法:

  默認情況主線程不會等子線程執行完畢,但是join可以做到

  
import threading

import time

def run(n):

    print("task" ,n)

    time.sleep(2)

    print("task done ", n)

start_time = time.time()



t_objs = []

for i in range(50):

    t = threading.Thread(target=run, args=("t_%s" % i,))
   t.start()

    t_objs.append(t)

for t in t_objs:

    t.join()

print("cost_time:",time.time()-start_time)
join用法

  主線程是程序本身

  

     threading.current_thread()和threading.active_count()用法:

  
import threading

import time

def run(n):

    print("task" ,n)

    time.sleep(2)

    print("task done ", n,threading.current_thread())

start_time = time.time()



t_objs = []

for i in range(50):

    t = threading.Thread(target=run, args=("t_%s" % i,))

    t.start()

    t_objs.append(t)

# for t in t_objs:

#     t.join()

print("===all threads has finished", threading.current_thread(), threading.active_count())

print("cost_time:",time.time()-start_time)
threading.current_thread()和threading.active_count()

c)  信號量:

  threading.BoundedSemaphore(n)

  互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 。

  threading.BoundedSamphore(n)

  

d) 把子線程變成守護線程setDaemod()方法:

  
#!/usr/bin/env python

# -*- coding:utf-8 -*-

# Author: zhoujunlong

import threading

import time

def run(n):

    print("task" ,n)

    time.sleep(2)

    print("task done ", n,threading.current_thread())

start_time = time.time()

for i in range(50):

    t = threading.Thread(target=run, args=("t_%s" % i,))

    t.setDaemon(True) # 把當前線程設置為守護線程 , 在start之前

    t.start()

print("===all threads has finished", threading.current_thread(), threading.active_count())

print("cost_time:",time.time()-start_time)
set Daemod()

 

e)  事件:

  事件是一個簡單地同步對象

  event = threading.event()

  event.wait() 等待標志位被設定

  event.set() 設置標志位

  event.clear() 清除標志位

  event.is_set() 判斷標志位是否設定

  通過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。

  
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: zhoujunlong
import threading
import time
event = threading.Event()
def lighter():
    count = 0
    event.set()
    while True:
        if 10>=count > 5:#change_to_red_light
            event.clear() # 清空標志位
            print("\033[41;1m紅燈了\033[0m")
        elif count > 10:
            event.set() #change_to_green_light
            count = 0
        else:print("\033[42;1m綠燈了\033[0m")
        time.sleep(1)
        count += 1
def car(name):
    while True:
        if event.is_set():#代表綠燈
            print("[%s] running..." % name)
            time.sleep(1)
        else:
            print("[%s] sees red light, waiting" % name)
            event.wait()
            print("\033[34;1m[%s] 綠燈了,gogogo\033[0m"%name)
light = threading.Thread(target=lighter)
light.start()
car1 = threading.Thread(target=car,args=("QQ",))

car2 = threading.Thread(target=car,args=("TT",))
car1.start()
car2.start()
示例1-紅綠燈

  這里還有一個event使用的例子,員工進公司門要刷卡, 我們這里設置一個線程是“門”, 再設置幾個線程為“員工”,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就可以通過。

  
import threading
import time
import random


def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter +=1
        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0 #清空計時器
            door_swiping_event.wait()
        if door_open_time_counter > 3:#門開了已經3s了,該關了
            door_swiping_event.clear()
        time.sleep(0.5)


def staff(n):
    print("staff [%s] is comming..." % n )
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            print(door_swiping_event.set())
            door_swiping_event.set()
            print("after set ",door_swiping_event.set())
        time.sleep(0.5)
door_swiping_event  = threading.Event() #設置事件
door_thread = threading.Thread(target=door)
door_thread.start()
for i in range(5):
    p = threading.Thread(target=staff,args=(i,))
    time.sleep(random.randrange(3))
    p.start()
示例2-員工進門刷卡

 

f)  queue(隊列) 

  class queue.Queue(maxsize=0) #先入先出

  class queue.LifoQueue(maxsize=0) #先入先出 #last  in  first  out

  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列 

  • 實現程序的解耦
  • 提高運行效率
  
>>> import queue

>>> q = queue.Queue

>>> q.put("disk1")

>>> q.put("disk2")

>>> q.put("disk3")

>>q.qsize()

3

>>>q.get()

'disk1'

>>>q.get()

'disk2'

>>>q.get()

'disk3'

>>>q.get_nowait() 
queue

   拋異常,不會卡住

 

  maxsize()方法:

  
>>>q = queue.Queue(maxsize=3)

>>>q.put(1)

>>>q.put(2)

>>>q.put(3)

>>>q.put(4)
maxsize()方法

  再put時會卡住

 

  Lifo

  
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
Lifo

  程序輸出:

  
3

2

1
程序輸出

 

  Priority方法:

  
import queue
q = queue.PriorityQueue()
q.put(("-1a1"))
q.put(("5, a2"))
q.put(("2, a3"))
print(q.get())
print(q.get())
print(q.get())
Priority方法

  程序輸出:

  
-1, a1

2, a3

5, a2
程序輸出

 

  Queue.task_done() 以下為解釋:

  Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

  If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

  Raises a ValueError if called more times than there were items placed in the queue.

  

g)  生產者消費者模型

  在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

  為什么要使用生產者和消費者模式?

  在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。

  什么是生產者消費者模式?

  生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。

 

  下面來學習一個最基本的生產者消費者模型的例子:

  
import threading
import queue

def producer():
    for i in range(10):
        q.put("骨頭 %s" % i )

    print("開始等待所有的骨頭被取走...")
    q.join()
    print("所有的骨頭被取完了...")

def consumer(n):
    while q.qsize() >0:
        print("%s 取到" %n, q.get())
        q.task_done() #告知這個任務執行完了

q = queue.Queue()

p = threading.Thread(target=producer,)
p.start()

c1 = consumer("Jack")
示例1-生產者消費者模型

  再來一個:

  
import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
示例2

  

3.  GIL全局解釋器鎖(面試必會)

   Python中無論幾核,同一時間只有一個線程在執行。

  線程鎖(互斥鎖)

  一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味著每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什么狀況?

  
import threading

import time

def run(n):

    global num

    time.sleep(2)

    num += 1

num = 0

t_objs = []

for i in range(1000):

    t = threading.Thread(target=run, args=("t_%s" % i,))

    t.start()

    t_objs.append(t)

for t in t_objs:

    t.join()

print("===all threads has finished")

print("num:", num)
線程鎖

  正常來講,這個num結果應該是1000, 但在python 2.7上多運行幾次,會發現,最后打印出來的num結果不總是1000,為什么每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由于2個線程是并發同時運行的,所以2個線程很有可能同時拿走了num=0這個初始變量交給cpu去運算,當A線程去處完的結果是1,但此時B線程運算完的結果也是1,兩個線程同時CPU運算的結果再賦值給num變量后,結果就都是1。那怎么辦呢? 很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數據。 

 

  為了讓上邊代碼每次都輸出num是1000的話,必須再加一把鎖:

  
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':

    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)
輸出num為1000

 

  加上鎖后程序就變串行了,為了避免程序變慢,別在子線程里加sleep等類似操作!

  GIL VS Lock 

  既然Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock?注意,這里的lock是用戶級的lock,跟那個GIL沒關系 ,具體我們通過下圖來看一下:

  遞歸鎖:

  threading.RLock()不會出現鎖死情況,說白了就是在一個大鎖中還要再包含子鎖。


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()