文章出處
文章列表
鎖是高性能程序的殺手,但是為了保證數據的一致性,在多線程的應用環境下又不得不加鎖。但是在某些特殊的場景下, 是可以通過優化數據結構來達到無鎖的目的。那么我們就來看一下如何實現一個無鎖隊列。
隊列:眾所周知,就是先進先出。 出隊列的時候從隊列頭取出一個結點;入隊列的時候,將結點添加到隊列尾部。當多線程同時操作一個隊列讀寫時,顯然就需要加鎖。但是在單讀單寫的這種多線程應用時,是可以做到無鎖的。直接上代碼
#ifndef _QUEUE_H_ #define _QUEUE_H_ template<class T> class node { public: T* pdata; node* next; }; template<class T> class queue { private: node<T> *front; node<T> *tear; public: queue(); ~queue(); bool isempty(); T* pop(); void push(T *p); }; template<class T> queue<T>::queue() { node<T> *pnode = new node<T>(); front = pnode; tear = pnode; } template<class T> queue<T>::~queue() { node<T> *cur = front; node<T> *next = NULL; while(cur) { next = cur->next; delete cur->pdata; delete cur; cur = next; } } template<class T> bool queue<T>::isempty() { if(front == tear) return true; return false; } template<class T> void queue<T>::push(T *p) { node<T> *pnode = new node<T>(); tear->pdata = p; tear->next = pnode; tear = pnode; } template<class T> T* queue<T>::pop() { if(isempty()) return NULL; node<T>* pnode = front; T* p = pnode->pdata; front = front->next; delete pnode; return p; } #endif
原理其實很簡單,就是在隊列的末尾添加一個空節點。這樣在隊列非空的情況下,front結點與tear結點中間至少間隔一個結點。那么當兩個線程同時插入與取出結點時,就不會存在同時操作front與tear的情況,從而保證不會出現race condition
下面是測試代碼:
#include <stdio.h> #include <pthread.h> #include <sys/time.h> #include "queue.h" #define MAX 100000 void* fun1(void *p) { queue<int> *q = (queue<int>*)p; int i = 0; while(1) { int *tmp = q->pop(); if(tmp) { if(*tmp != i++) printf("err\n"); delete tmp; } if(i == MAX) break; } } void* fun2(void *p) { queue<int> *q = (queue<int>*)p; int i = 0; while(i < MAX) { int *tmp = new int(i++); q->push(tmp); } } int main() { queue<int> q; struct timeval tv1; struct timeval tv2; pthread_t t1,t2; gettimeofday(&tv1,NULL); pthread_create(&t1,NULL ,fun1 ,&q); pthread_create(&t2,NULL, fun2 ,&q ); pthread_join(t1,NULL); pthread_join(t2,NULL); gettimeofday(&tv2,NULL); long delta = tv2.tv_sec*1000000+tv2.tv_usec - ( tv1.tv_sec*1000000+tv1.tv_usec) ; printf("time : %lu\n",delta); return 0; }
在我的機器上,測試結果為327730us
下面再給出加鎖的版本,并使用相同的測試方法,進行對比
#ifndef _QUEUE_H_ #define _QUEUE_H_ #include <pthread.h> template<class T> class node { public: T* pdata; node* next; node(T* p1, node* p2):pdata(p1),next(p2){} }; template<class T> class queue { private: node<T> *front; node<T> *tear; pthread_mutex_t mutex; public: queue(); ~queue(); bool isempty(); T* pop(); void push(T *p); }; template<class T> queue<T>::queue() { front = NULL; tear = NULL; pthread_mutex_init(&mutex,NULL); } template<class T> queue<T>::~queue() { node<T> *cur = front; node<T> *next = NULL; while(cur) { next = cur->next; delete cur->pdata; delete cur; cur = next; } } template<class T> void queue<T>::push(T *p) { node<T> *pnode = new node<T>(p,NULL); pthread_mutex_lock(&mutex); if(front == NULL) { front = pnode; tear = pnode; } else { tear->next = pnode; tear = pnode; } pthread_mutex_unlock(&mutex); } template<class T> T* queue<T>::pop() { T * pdata = NULL; node<T> *pnode = NULL; pthread_mutex_lock(&mutex); if(front == NULL) { pthread_mutex_unlock(&mutex); return NULL; } pnode = front; if(front == tear) tear = NULL; front = front->next; pthread_mutex_unlock(&mutex); pdata = pnode->pdata; delete pnode; return pdata; } #endif
加鎖的版本,測試結果為497987us。
可見,加鎖版本所耗時間,差不多為無鎖版本的1.5倍以上。
文章列表
全站熱搜