內容簡述
類似Binder機制,MessageQueue、Looper也有底層的C++實現,涉及文件管道和驅動等。
以下僅從Java層的Looper、Handler和MessageQueue等相關類型的源碼來分析線程消息處理的機制。
MessageQueue的創建
Looper用來創建和啟動消息隊列。
Looper對象和線程綁定是通過ThreadLocal實現的。
它提供了各種getter方便獲取線程關聯的MessageQueue和Looper對象。
Looper.prepare();
Looper.prepareMainLooper();
線程通過執行Looper.prepare()來創建關聯的MessageQueue。
主線程則調用prepareMainLooper()來創建主線程關聯的Looper,方便其它線程向主線程發消息。
Looper.prepare()
它新建了Looper對象,以ThreadLocal存儲它,所以和當前線程綁定。
構造函數中創建了消息隊列:
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
MessageQueue的創建
// MessageQueue.java
private long mPtr;
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
nativeInit()調用C++代碼創建底層的C++MessageQueue對象。
有了MessageQueue對象以后,接著需要開啟消息循環,使用關聯的Handler來發送、處理消息了。
消息循環
MessageQueue有點像一個阻塞隊列,它提供MessageQueue.next()用來從隊列中取出一個Message對象。
若沒有消息則調用會阻塞。
Looper.loop()
Looper.loop()用來開啟對MessageQueue的取消息操作的無限循環。
public static void loop() {
final Looper me = myLooper();
...
final MessageQueue queue = me.mQueue;
...
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
msg.target.dispatchMessage(msg);
...
msg.recycleUnchecked();
}
}
可以看到,loop()的操作就是無限從queue中調用next()獲取一個新Message對象,然后執行dispatchMessage來處理它。
MessageQueue.next()
nativePollOnce()用來檢查隊列中是否有新的消息。
參數nextPollTimeoutMillis表示在暫無消息時此次檢查過程需要休眠的時間:
- 等于-1:表示無限等待,直到被其它線程喚醒。
- 等于 0:繼續循環檢查隊列。
- 大于 0:以nextPollTimeoutMillis的毫米數等待。
使用for (;;) 無限循環檢查是否有消息,直到返回一個Message對象。
若當前MessageQueue正在退出,則返回null作為標識。
Message next() {
...
int nextPollTimeoutMillis = 0;
for (;;) {
...
nativePollOnce(ptr, nextPollTimeoutMillis);
...
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message msg = mMessages;
...
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
...
mMessages = msg.next;
msg.next = null;
...
return msg;
}
} else {
// 繼續循環等待消息
nextPollTimeoutMillis = -1;
}
...
// 每次執行next()時,若當前沒有消息可處理,那么就找到所有的idle handlers,回調它們
...
}
// 回調idle handlers. 每次調用next()只會執行一次
...
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
每次調用next()時,第一次檢查如果發現沒有要出來的消息,就一次性調用所有在注冊了的IdleHandler回調對象。
發送消息
MessageQueue和Looper對象都是和某個線程關聯的。
向一個線程發送消息,通過和它綁定的一個Handler對象進行。
執行創建Handler對象的的線程,Handler對象和此線程綁定。
Handler的創建
Handler對象的構造函數中,將當前線程關聯的Looper和MessageQueue保存到其字段中。
public Handler(Callback callback, boolean async) {
...
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
Handler.sendMessageDelayed
每個發送到線程關聯的MessageQueue中的Message對象,主要字段有:
- long when字段表示消息預期被處理的時間;
- int what表示消息的動作;
- Object obj可以攜帶額外的數據。
發送消息的一般形式為:
boolean sendMessageAtTime(Message msg, long uptimeMillis);
boolean sendMessageDelayed(Message msg, long delayMillis);
boolean sendEmptyMessageAtTime(int what, long uptimeMillis);
boolean sendEmptyMessageDelayed(int what, long delayMillis);
若不指定when那么它為當前時間,之后被Looper取出后立即執行;
sendMessage()中將Handler Message.target
設置為自身,最執行Handler綁定的隊列的MessageQueue.enqueue()方法。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
MessageQueue.enqueueMessage
字段Message Message.next
用來指向Message鏈表中的下一個對象。
MessageQueue.mMessages字段指向了它擁有的消息鏈表的頭結點。
mMessages中的消息根據其when的時間排列,時間近的在前面。
enqueueMessage()在添加消息時將它放到mMessages的合適的位置。
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
mBlocked記錄當前隊列是否處于休眠?
在next()時若最近要處理的消息的when還未到或隊列空時,則在檢查隊列是否空時會主動休眠一段時間。
若新的Message被添加到鏈表頭,且它的when時間到了,那么就喚醒Looper繼續執行next(),獲取此Message然后處理它。
if (needWake) {
nativeWake(mPtr);
}
消息的處理
在Looper.loop()中,每當獲取新的消息后:
Message msg = queue.next(); // might block
...
msg.target.dispatchMessage(msg);
target就是發送Message的Hander。
Hander發送消息到其綁定的MessageQueue中。
可見,相關的Looper、MessageQueue、Handler都是和同一個線程關聯的。
從next()的執行在Looper.loop()的循環中進行可知:
Hander.dispatchMessage()的調用就是在Looper關聯的線程中進行。
Hander.dispatchMessage
public class Handler {
final Callback mCallback;
/**
* Callback interface you can use when instantiating a Handler to avoid
* having to implement your own subclass of Handler.
*
* @param msg A {@link android.os.Message Message} object
* @return True if no further handling is desired
*/
public interface Callback {
public boolean handleMessage(Message msg);
}
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
/**
* Subclasses must implement this to receive messages.
*/
public void handleMessage(Message msg) {
}
}
上面的Message.callback是一個Runnable。
Handler.mCallback是創建它時指定的一個回調對象。
handleMessage()就是子類重寫處理自定義消息的地方。
循環和退出
Looper.loop()執行后,線程“阻塞”,不斷從關聯的MessageQueue中取出消息并處理。
其它線程或在處理某個消息的邏輯中,可以調用Looper.quit()退出當前線程的消息循環。
// Looper.java
final MessageQueue mQueue;
...
public void quit() {
mQueue.quit(false);
}
它執行了MessageQueue.quit()。
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
這里設置mQuitting為true。
之后下次調用next()時,返回null告知Looper.loop()退出循環。
線程的消息循環結束。
(本文使用Atom編寫)
文章列表