文章出處

線程

使用Instruments的CPU strategy view查看代碼如何在多核CPU中執行。創建線程可以使用POSIX 線程API,或者NSThread(封裝POSIX 線程API)。下面是并發4個線程在一百萬個數字中找最小值和最大值的pthread例子:

#import <pthread.h>

struct threadInfo {
     uint32_t * inputValues;
     size_t count;
};

struct threadResult {
     uint32_t min;
     uint32_t max;
};

void * findMinAndMax(void *arg)
{
     struct threadInfo const * const info = (struct threadInfo *) arg;
     uint32_t min = UINT32_MAX;
     uint32_t max = 0;
     for (size_t i = 0; i < info->count; ++i) {
          uint32_t v = info->inputValues[i];
          min = MIN(min, v);
          max = MAX(max, v);
     }
     free(arg);
     struct threadResult * const result = (struct threadResult *) malloc(sizeof(*result));
     result->min = min;
     result->max = max;
     return result;
}

int main(int argc, const char * argv[])
{
     size_t const count = 1000000;
     uint32_t inputValues[count];

     // 使用隨機數字填充 inputValues
     for (size_t i = 0; i < count; ++i) {
          inputValues[i] = arc4random();
     }

     // 開始4個尋找最小值和最大值的線程
     size_t const threadCount = 4;
     pthread_t tid[threadCount];
     for (size_t i = 0; i < threadCount; ++i) {
          struct threadInfo * const info = (struct threadInfo *) malloc(sizeof(*info));
          size_t offset = (count / threadCount) * i;
          info->inputValues = inputValues + offset;
          info->count = MIN(count - offset, count / threadCount);
          int err = pthread_create(tid + i, NULL, &findMinAndMax, info);
          NSCAssert(err == 0, @"pthread_create() failed: %d", err);
     }
     // 等待線程退出
     struct threadResult * results[threadCount];
     for (size_t i = 0; i < threadCount; ++i) {
          int err = pthread_join(tid[i], (void **) &(results[i]));
          NSCAssert(err == 0, @"pthread_join() failed: %d", err);
     }
     // 尋找 min 和 max
     uint32_t min = UINT32_MAX;
     uint32_t max = 0;
     for (size_t i = 0; i < threadCount; ++i) {
          min = MIN(min, results[i]->min);
          max = MAX(max, results[i]->max);
          free(results[i]);
          results[i] = NULL;
     }

     NSLog(@"min = %u", min);
     NSLog(@"max = %u", max);
     return 0;
}

 

使用NSThread來寫

@interface FindMinMaxThread : NSThread
@property (nonatomic) NSUInteger min;
@property (nonatomic) NSUInteger max;
- (instancetype)initWithNumbers:(NSArray *)numbers;
@end

@implementation FindMinMaxThread {
     NSArray *_numbers;
}

- (instancetype)initWithNumbers:(NSArray *)numbers
{
     self = [super init];
     if (self) {
          _numbers = numbers;
     }
     return self;
}

- (void)main
{
     NSUInteger min;
     NSUInteger max;
     // 進行相關數據的處理
     self.min = min;
     self.max = max;
}
@end

//啟動一個新的線程,創建一個線程對象
SMutableSet *threads = [NSMutableSet set];
NSUInteger numberCount = self.numbers.count;
NSUInteger threadCount = 4;
for (NSUInteger i = 0; i < threadCount; i++) {
     NSUInteger offset = (count / threadCount) * i;
     NSUInteger count = MIN(numberCount - offset, numberCount / threadCount);
     NSRange range = NSMakeRange(offset, count);
     NSArray *subset = [self.numbers subarrayWithRange:range];
     FindMinMaxThread *thread = [[FindMinMaxThread alloc] initWithNumbers:subset];
     [threads addObject:thread];
     [thread start];
}

 

Grand Central Dispatch

GCD概要

  • 和operation queue一樣都是基于隊列的并發編程API,他們通過集中管理大家協同使用的線程池。
  • 公開的5個不同隊列:運行在主線程中的main queue,3個不同優先級的后臺隊列(High Priority Queue,Default Priority Queue,Low Priority Queue),以及一個優先級更低的后臺隊列Background Priority Queue(用于I/O)
  • 可創建自定義隊列:串行或并列隊列。自定義一般放在Default Priority Queue和Main Queue里。

dispatch_once用法

+ (UIColor *)boringColor;
{
     static UIColor *color;
     //只運行一次
     static dispatch_once_t onceToken;
     dispatch_once(&onceToken, ^{
          color = [UIColor colorWithRed:0.380f green:0.376f blue:0.376f alpha:1.000f];
     });
     return color;
}

  

延后執行

使用dispatch_after

- (void)foo
{
     double delayInSeconds = 2.0;
     dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t) (delayInSeconds * NSEC_PER_SEC));
     dispatch_after(popTime, dispatch_get_main_queue(), ^(void){
          [self bar];
     });
}

 

GCD隊列

隊列默認是串行的,只能執行一個單獨的block,隊列也可以是并行的,同一時間執行多個block

- (id)init;
{
     self = [super init];
     if (self != nil) {
          NSString *label = [NSString stringWithFormat:@"%@.isolation.%p", [self class], self];
          self.isolationQueue = dispatch_queue_create([label UTF8String], 0);

          label = [NSString stringWithFormat:@"%@.work.%p", [self class], self];
          self.workQueue = dispatch_queue_create([label UTF8String], 0);
     }
     return self;
}

 

多線程并發讀寫同一個資源

//創建隊列
self.isolationQueue = dispatch_queue_create([label UTF8String], DISPATCH_QUEUE_CONCURRENT);
//改變setter
- (void)setCount:(NSUInteger)count forKey:(NSString *)key
{
     key = [key copy];
     //確保所有barrier都是async異步的
     dispatch_barrier_async(self.isolationQueue, ^(){
          if (count == 0) {
               [self.counts removeObjectForKey:key];
          } else {
               self.counts[key] = @(count);
          }
     });
}

 

都用異步處理避免死鎖,異步的缺點在于調試不方便,但是比起同步容易產生死鎖這個副作用還算小的。

異步API寫法

設計一個異步的API調用dispatch_async(),這個調用放在API的方法或函數中做。讓API的使用者設置一個回調處理隊列

- (void)processImage:(UIImage *)image completionHandler:(void(^)(BOOL success))handler;
{
     dispatch_async(self.isolationQueue, ^(void){
          // do actual processing here
          dispatch_async(self.resultQueue, ^(void){
               handler(YES);
          });
     });
}

 

dispatch_apply進行快速迭代

for (size_t y = 0; y < height; ++y) {
     for (size_t x = 0; x < width; ++x) {
          // Do something with x and y here
     }
}
//使用dispatch_apply可以運行的更快
dispatch_apply(height, dispatch_get_global_queue(0, 0), ^(size_t y) {
     for (size_t x = 0; x < width; x += 2) {
          // Do something with x and y here
     }
});

 

Block組合
dispatch_group_t group = dispatch_group_create();

dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_group_async(group, queue, ^(){
     // 會處理一會
     [self doSomeFoo];
     dispatch_group_async(group, dispatch_get_main_queue(), ^(){
          self.foo = 42;
     });
});
dispatch_group_async(group, queue, ^(){
     // 處理一會兒
     [self doSomeBar];
     dispatch_group_async(group, dispatch_get_main_queue(), ^(){
          self.bar = 1;
     });
});

// 上面的都搞定后這里會執行一次
dispatch_group_notify(group, dispatch_get_main_queue(), ^(){
     NSLog(@"foo: %d", self.foo);
     NSLog(@"bar: %d", self.bar);
});

 

如何對現有API使用dispatch_group_t

//給Core Data的-performBlock:添加groups。組合完成任務后使用dispatch_group_notify來運行一個block即可。
- (void)withGroup:(dispatch_group_t)group performBlock:(dispatch_block_t)block
{
     if (group == NULL) {
          [self performBlock:block];
     } else {
          dispatch_group_enter(group);
          [self performBlock:^(){
               block();
               dispatch_group_leave(group);
          }];
     }
}

//NSURLConnection也可以這樣做
+ (void)withGroup:(dispatch_group_t)group
     sendAsynchronousRequest:(NSURLRequest *)request
     queue:(NSOperationQueue *)queue
     completionHandler:(void (^)(NSURLResponse*, NSData*, NSError*))handler
{
     if (group == NULL) {
          [self sendAsynchronousRequest:request
               queue:queue
               completionHandler:handler];
     } else {
          dispatch_group_enter(group);
          [self sendAsynchronousRequest:request
                    queue:queue
                    completionHandler:^(NSURLResponse *response, NSData *data, NSError *error){
               handler(response, data, error);
               dispatch_group_leave(group);
          }];
     }
}

 

注意事項
  • dispatch_group_enter() 必須運行在 dispatch_group_leave() 之前。
  • dispatch_group_enter() 和 dispatch_group_leave() 需要成對出現的

用GCD監視進程

NSRunningApplication *mail = [NSRunningApplication runningApplicationsWithBundleIdentifier:@"com.apple.mail"];
if (mail == nil) {
     return;
}
pid_t const pid = mail.processIdentifier;
self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, pid, DISPATCH_PROC_EXIT, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_event_handler(self.source, ^(){
     NSLog(@"Mail quit.");
});
//在事件源傳到你的事件處理前需要調用dispatch_resume()這個方法
dispatch_resume(self.source);

 

監視文件夾內文件變化

NSURL *directoryURL; // assume this is set to a directory
int const fd = open([[directoryURL path] fileSystemRepresentation], O_EVTONLY);
if (fd < 0) {
     char buffer[80];
     strerror_r(errno, buffer, sizeof(buffer));
     NSLog(@"Unable to open "%@": %s (%d)", [directoryURL path], buffer, errno);
     return;
}
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fd,
DISPATCH_VNODE_WRITE | DISPATCH_VNODE_DELETE, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_event_handler(source, ^(){
     unsigned long const data = dispatch_source_get_data(source);
     if (data & DISPATCH_VNODE_WRITE) {
          NSLog(@"The directory changed.");
     }
     if (data & DISPATCH_VNODE_DELETE) {
          NSLog(@"The directory has been deleted.");
     }
});
dispatch_source_set_cancel_handler(source, ^(){
     close(fd);
});
self.source = source;
dispatch_resume(self.source);
//還要注意需要用DISPATCH_VNODE_DELETE 去檢查監視的文件或文件夾是否被刪除,如果刪除了就停止監聽

 

GCD版定時器

dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,0, 0, DISPATCH_TARGET_QUEUE_DEFAULT);
dispatch_source_set_event_handler(source, ^(){
     NSLog(@"Time flies.");
});
dispatch_time_t start
dispatch_source_set_timer(source, DISPATCH_TIME_NOW, 5ull * NSEC_PER_SEC,100ull * NSEC_PER_MSEC);
self.source = source;
dispatch_resume(self.source);

 

GCD深入操作

  • 緩沖區:dispatch_data_t基于零碎的內存區域,使用dispatch_data_apply來遍歷,還可以用dispatch_data_create_subrange來創建一個不做任何拷貝的子區域
  • I/O調度:使用GCD提供的dispatch_io_read,dispatch_io_write和dispatch_io_close
  • 測試:使用dispatch_benchmark小工具
  • 原子操作: libkern/OSAtomic.h里可以查看那些函數,用于底層多線程編程。

Operation Queues

  • Operation Queue是在GCD上實現了一些方便的功能。
  • NSOperationQueue有主隊列和自定義隊列兩種類型隊列。主隊列在主線程上運行,自定義隊列在后臺。
  • 重寫main方法自定義自己的operations。較簡單,不需要管理isExecuting和isFinished,main返回時operation就結束了。
@implementation YourOperation
     - (void)main
     {
          // 進行處理 ...
     }
@end

 

  • 重寫start方法能夠獲得更多的控制權,還可以在一個操作中執行異步任務
@implementation YourOperation
     - (void)start
     {
          self.isExecuting = YES;
          self.isFinished = NO;
          // 開始處理,在結束時應該調用 finished ...
     }

     - (void)finished
     {
          self.isExecuting = NO;
          self.isFinished = YES;
     }
@end
//使操作隊列有取消功能,需要不斷檢查isCancelled屬性
- (void)main
{
     while (notDone && !self.isCancelled) {
          // 進行處理
     }
}

 

  • 定義好operation類以后,將一個operation加到隊列里:
NSOperationQueue *queue = [[NSOperationQueue alloc] init];
YourOperation *operation = [[YourOperation alloc] init];
[queue addOperation:operation];

 

  • 如果是在主隊列中進行一個一次性任務,可以將block加到操作隊列
[[NSOperationQueue mainQueue] addOperationWithBlock:^{
     // 代碼...
}];

 

  • 通過maxConcurrentOperationCount屬性控制一個特定隊列中并發執行操作的數量。設置為1就是串行隊列。
  • 對operation優先級排序,指定operation之間的依賴關系。
//確保operation1和operation2是在intermediateOperation和finishOperation之前執行
[intermediateOperation addDependency:operation1];
[intermediateOperation addDependency:operation2];
[finishedOperation addDependency:intermediateOperation];

 

Run Loops

  • Run loop比GCD和操作隊列要容易,不必處理并發中復雜情況就能異步執行。
  • 主線程配置main run loop,其它線程默認都沒有配置run loop。一般都在主線程中調用后分配給其它隊列。如果要在其它線程添加run loop至少添加一個input source,不然一運行就會退出。

在后臺操作UI

使用操作隊列處理

//weak引用參照self避免循環引用,及block持有self,operationQueue retain了block,而self有retain了operationQueue。
__weak id weakSelf = self;
[self.operationQueue addOperationWithBlock:^{
     NSNumber* result = findLargestMersennePrime();
     [[NSOperationQueue mainQueue] addOperationWithBlock:^{
          MyClass* strongSelf = weakSelf;
          strongSelf.textLabel.text = [result stringValue];
     }];
}];

 

drawRect在后臺繪制

drawRect:方法會影響性能,所以可以放到后臺執行。

//使用UIGraphicsBeginImageContextWithOptions取代UIGraphicsGetCurrentContext:方法
UIGraphicsBeginImageContextWithOptions(size, NO, 0);
// drawing code here
UIImage *i = UIGraphicsGetImageFromCurrentImageContext();
UIGraphicsEndImageContext();
return i;

 

可以把這個方法運用到table view中,使table view的cell在滾出邊界時能在didEndDisplayingCell委托方法中取消。WWDC中有講解:Session 211 -- Building Concurrent User Interfaces on iOS https://developer.apple.com/videos/wwdc/2012/

還有個使用CALayer里drawsAsynchronously屬性的方法。不過有時work,有時不一定。

網絡異步請求

網絡都要使用異步方式,但是不要直接使用dispatch_async,這樣沒法取消這個網絡請求。dataWithContentsOfURL:的超時是30秒,那么這個線程需要干等到超時完。解決辦法就是使用NSURLConnection的異步方法,把所有操作轉化成operation來執行。NSURLConnection是通過run loop來發送事件的。AFNetworking是建立一個獨立的線程設置一個非main run loop。下面是處理URL連接重寫自定義operation子類里的start方法

- (void)start
{
     NSURLRequest* request = [NSURLRequest requestWithURL:self.url];
     self.isExecuting = YES;
     self.isFinished = NO;
     [[NSOperationQueue mainQueue] addOperationWithBlock:^
     {
          self.connection = [NSURLConnectionconnectionWithRequest:request
               delegate:self];
     }];
}

 

重寫start方法需要管理isExecuting和isFinished狀態。下面是取消操作的方法

- (void)cancel
{
     [super cancel];
     [self.connection cancel];
     self.isFinished = YES;
     self.isExecuting = NO;
}
//連接完成發送回調
- (void)connectionDidFinishLoading:(NSURLConnection *)connection
{
     self.data = self.buffer;
     self.buffer = nil;
     self.isExecuting = NO;
     self.isFinished = YES;
}

 

后臺處理I/O

異步處理文件可以使用NSInputStream。官方文檔:http://developer.apple.com/library/ios/#documentation/FileManagement/Conceptual/FileSystemProgrammingGUide/TechniquesforReadingandWritingCustomFiles/TechniquesforReadingandWritingCustomFiles.html 實例:https://github.com/objcio/issue-2-background-file-io

@interface Reader : NSObject
- (void)enumerateLines:(void (^)(NSString*))block
     completion:(void (^)())completion;
- (id)initWithFileAtPath:(NSString*)path;

//采用main run loop的事件將數據發到后臺操作線程去處理
- (void)enumerateLines:(void (^)(NSString*))block
completion:(void (^)())completion
{
     if (self.queue == nil) {
          self.queue = [[NSOperationQueue alloc] init];
          self.queue.maxConcurrentOperationCount = 1;
     }
     self.callback = block;
     self.completion = completion;
     self.inputStream = [NSInputStream inputStreamWithURL:self.fileURL];
     self.inputStream.delegate = self;
     [self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop]
     forMode:NSDefaultRunLoopMode];
          [self.inputStream open];
     }
@end

//input stream在主線程中發送代理消息,接著就可以在操作隊列加入block操作
- (void)stream:(NSStream*)stream handleEvent:(NSStreamEvent)eventCode
{
     switch (eventCode) {
          ...
          case NSStreamEventHasBytesAvailable: {
               NSMutableData *buffer = [NSMutableData dataWithLength:4 * 1024];
               NSUInteger length = [self.inputStream read:[buffer mutableBytes]
                    maxLength:[buffer length]];
               if (0 < length) {
                    [buffer setLength:length];
                    __weak id weakSelf = self;
                    [self.queue addOperationWithBlock:^{
                         [weakSelf processDataChunk:buffer];
                    }];
               }
               break;
          }
          ...
     }
}

//處理數據chunk,原理就是把數據切成很多小塊,然后不斷更新和處理buffer緩沖區,逐塊讀取和存入方式來處理大文件響應快而且內存開銷也小。
- (void)processDataChunk:(NSMutableData *)buffer;
{
     if (self.remainder != nil) {
          [self.remainder appendData:buffer];
     } else {
          self.remainder = buffer;
     }
     [self.remainder obj_enumerateComponentsSeparatedBy:self.delimiter
               usingBlock:^(NSData* component, BOOL last) {
          if (!last) {
               [self emitLineWithData:component];
          } else if (0 < [component length]) {
               self.remainder = [component mutableCopy];
          } else {
               self.remainder = nil;
          }
     }];
}

 

并發開發會遇到的困難問題

多個線程訪問共享資源

比如兩個線程都會把計算結果寫到一個整型數中。為了防止,需要一種互斥機制來訪問共享資源

互斥鎖

同一時刻只能有一個線程訪問某個資源。某線程要訪問某個共享資源先獲得共享資源的互斥鎖,完成操作再釋放這個互斥鎖,然后其它線程就能訪問這個共享資源。

還有需要解決無序執行問題,這時就需要引入內存屏障。

在Objective-C中如果屬性聲明為atomic就能夠支持互斥鎖,但是因為加解鎖會有性能代價,所以一般是聲明noatomic的。

死鎖

當多個線程在相互等待對方鎖結束時就會發生死鎖,程序可能會卡住。

void swap(A, B)
{
     lock(lockA);
     lock(lockB);
     int a = A;
     int b = B;
     A = b;
     B = a;
     unlock(lockB);
     unlock(lockA);
}

 


//一般沒問題,但是如果兩個線程使用相反的值同時調用上面這個方法就可能會死鎖。線程1獲得X的一個鎖,線程2獲得Y的一個鎖,它們會同時等待另一個鎖的釋放,但是卻是沒法等到的。
swap(X, Y); // 線程 1
swap(Y, X); // 線程 2

為了防止死鎖,需要使用比簡單讀寫鎖更好的辦法,比如write preferencehttp://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock,或read-copy-update算法http://en.wikipedia.org/wiki/Read-copy-update

優先級反轉

運行時低優先級任務由于先取得了釋放了鎖的共享資源而阻塞了高優先級任務,這種情況叫做優先級反轉

最佳安全實踐避免問題的方法

從主線程中取到數據,利用一個操作隊列在后臺處理數據,完后返回后臺隊列中得到的數據到主隊列中。這樣的操作不會有任何鎖操作。

并發測試

 

本文地址:http://www.starming.com/index.php?v=index&view=73


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()