booleanenqueueMessage(Message msg, long when) { if (msg.target == null) { thrownewIllegalArgumentException("Message must have a target."); }
// 上锁,保证没有两条消息同时进行入队操作产生并发问题 synchronized (this) { // 如果这个 messgae 正在队列中,当然不能再次入队 // 说一点小小的感悟吧,rust语言的移动语义可以让多次入队成为不可能,而在java中为了防范这种边界情况要写大量的检查代码。移动语义这个设计确实高明 if (msg.isInUse()) { thrownewIllegalStateException(msg + " This message is already in use."); }
// 如果这个线程正在退出,当然不能给一条死掉的线程上的handler发消息 if (mQuitting) { IllegalStateExceptione=newIllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); returnfalse; }
// 标记这个消息正在使用 msg.markInUse(); // 设置这条信息应当从队列取出时的时间 msg.when = when; // 拿到消息队列的头节点,没错,消息队列是一个链表结构 Messagep= mMessages; boolean needWake; // 如果头节点为空 或者 when == 0 (意味着这个消息必须放置在头节点)或者 新插入的消息出队时间比头节点早 // 就将插入的消息设置为新的头节点 // 这里可以看出消息队列是一个优先队列 if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; // 如果事件队列现在正处于等待状态就之后唤醒他 (其实就是唤醒epoll等待的事件线程) needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. // 在队列的中间插入。通常我们不会唤醒这个事件队列除非队列的头部有一个同步屏障 // 且这条消息是队列中最早的异步消息
// We can assume mPtr != 0 because mQuitting is false. // 如果需要唤醒就调用 nativeWake 进到 native 层对事件循环进行唤醒 if (needWake) { nativeWake(mPtr); } } returntrue; }
voidLooper::rebuildEpollLocked(){ // Close old epoll instance if we have one. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif mEpollFd.reset(); }
// Allocate the new epoll instance and register the WakeEventFd. // 分配新的 epoll instance 并且注册 WakeEventFd mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC)); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
// 创建 epoll 事件 wakeEvent epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ); // 注册文件描述符 int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno));
// NOTE(review): this is an excerpt from the body of the per-iteration routine that loop()
// invokes as loopOnce(me, ident, thresholdOverride). The method header, the declarations of
// traceTag, needStartTime, needEndTime, logSlowDelivery, logSlowDispatch,
// slowDeliveryThresholdMs and slowDispatchThresholdMs, and the end of the method are not
// part of the excerpt.

// Key step: take the next message off the queue.
Message msg = me.mQueue.next(); // might block
if (msg == null) {
    // No message indicates that the message queue is quitting.
    return false;
}

// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
    logging.println(">>>>> Dispatching to " + msg.target + " "
            + msg.callback + ": " + msg.what);
}
// Make sure the observer won't change while processing a transaction.
// The observer gets callbacks before and after a message is dispatched, and when dispatch
// throws.
final Observer observer = sObserver;

if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
    Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}

final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
Object token = null;
if (observer != null) {
    token = observer.messageDispatchStarting();
}
long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
try {
    // This is where the message is handed to its Handler.
    msg.target.dispatchMessage(msg);
    if (observer != null) {
        observer.messageDispatched(token, msg);
    }
    dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} catch (Exception exception) {
    if (observer != null) {
        observer.dispatchingThrewException(token, msg, exception);
    }
    throw exception;
} finally {
    // Always undo the work-source change and close the trace section, even on failure.
    ThreadLocalWorkSource.restore(origWorkSource);
    if (traceTag != 0) {
        Trace.traceEnd(traceTag);
    }
}
if (logSlowDelivery) {
    if (me.mSlowDeliveryDetected) {
        // Already logged a slow delivery; stay quiet until delivery lag is back to <= 10
        // (same unit as msg.when / uptimeMillis).
        if ((dispatchStart - msg.when) <= 10) {
            Slog.w(TAG, "Drained");
            me.mSlowDeliveryDetected = false;
        }
    } else {
        if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
                msg)) {
            // Once we write a slow delivery log, suppress until the queue drains.
            me.mSlowDeliveryDetected = true;
        }
    }
}
if (logSlowDispatch) {
    showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}

if (logging != null) {
    logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}

// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
    Log.wtf(TAG, "Thread identity changed from 0x"
            + Long.toHexString(ident) + " to 0x"
            + Long.toHexString(newIdent) + " while dispatching to "
            + msg.target.getClass().getName() + " "
            + msg.callback + " what=" + msg.what);
}
publicstaticvoidloop() { finalLooperme= myLooper(); if (me == null) { thrownewRuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } if (me.mInLoop) { Slog.w(TAG, "Loop again would have the queued messages be executed" + " before this one completed."); }
me.mInLoop = true;
// Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); finallongident= Binder.clearCallingIdentity();
// Allow overriding a threshold with a system prop. e.g. // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start' finalintthresholdOverride= SystemProperties.getInt("log.looper." + Process.myUid() + "." + Thread.currentThread().getName() + ".slow", 0);
me.mSlowDeliveryDetected = false;
for (;;) { if (!loopOnce(me, ident, thresholdOverride)) { return; } } }
Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. finallongptr= mPtr; if (ptr == 0) { returnnull; }
// 这里出现了 IdleHandler 的字眼 intpendingIdleHandlerCount= -1; // -1 only during first iteration intnextPollTimeoutMillis=0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); }
synchronized (this) { // Try to retrieve the next message. Return if found. finallongnow= SystemClock.uptimeMillis(); MessageprevMsg=null; Messagemsg= mMessages;
// 消息队列的头部有同步屏障 if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. // 找出队列中的首个异步消息 do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } // 如果有消息 if (msg != null) { // 这个消息还没有到时间,设置一下下轮循环执行的 pollOnce 的超时时间 if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // 这个消息已经到时间了,从消息队列中取出返回 // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // 队列空了,进入无限期的 epoll_wait 等待 // No more messages. nextPollTimeoutMillis = -1; }
// Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); returnnull; }
// 这里是 IdleHandler 相关的内容,先去看看 IdleHandler 这个东西怎么用 // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. // 第一次空闲的时候,记录要运行的 idlehandler 的数量。 // 空闲处理只在队列为空或第一条消息需要等待一段时间的时候执行 // 说白了就是在 pollOnce 之前执行 if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // 没有 IdleHandler 的情况下单次循环在这里结束 // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; }
// Run the idle handlers. // We only ever reach this code block during the first iteration. for (inti=0; i < pendingIdleHandlerCount; i++) { finalIdleHandleridler= mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler
// 重制数量 // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0;
// 执行了idleHandler就不等待了,因为可能在idleHandler中已经发送了新的消息,重走一遍流程 // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
// Rebuild epoll set if needed. if (mEpollRebuildRequired) { mEpollRebuildRequired = false; rebuildEpollLocked(); goto Done; }
// Check for poll error. // 如果发生的事件小于0,说明 epoll_wait 出异常了,设置 result 为 POLL_ERROR 后跳转到 Done if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error: %s", strerror(errno)); result = POLL_ERROR; goto Done; }
// Check for poll timeout. // epoll_wait 超时返回,跳转到 Done if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = POLL_TIMEOUT; goto Done; }
// Handle all events. #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif
// 走到这里说明有事件 for (int i = 0; i < eventCount; i++) { // 挨个取出事件进行响应 const SequenceNumber seq = eventItems[i].data.u64; uint32_t epollEvents = eventItems[i].events; // 是 wake event if (seq == WAKE_EVENT_FD_SEQ) { if (epollEvents & EPOLLIN) { // 清除 wakeEventFd 的事件循环计数器,以便接收下一次事件 // 事件文件描述符(如 eventfd)被设置为边缘触发(ET)模式。 // 这意味着只有在状态发生变化时,epoll 才会返回这个文件描述符的事件。 // 如果你不读取这个事件,状态就不会改变,所以 epoll 可能不会再次返回这个事件,即使有新的唤醒事件发生。 awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { // 响应其他事件 constauto& request_it = mRequests.find(seq); if (request_it != mRequests.end()) { constauto& request = request_it->second; int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; mResponses.push({.seq = seq, .events = events, .request = request}); } else { ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64 " that is no longer registered.", epollEvents, seq); } } } Done: ;
// Invoke pending message callbacks. // 这里就是处理 native 层的消息,跟 java 层 handler 的逻辑差不多 // native 层的消息是用 vector 存的 mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); // 消息到期 if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock();
mLock.lock(); mSendingMessage = false; // 这里 result 就是把message回调给了handler result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. // 设置下条消息到期的时间 并跳出循环等待Java层的下一次轮询 mNextMessageUptime = messageEnvelope.uptime; break; } }
// Release lock. mLock.unlock();
// Invoke all response callbacks. for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { AutoMutex _l(mLock); removeSequenceNumberLocked(response.seq); }
// Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } } return result; }