Android-EventBus修改纪实(三)
持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第 5 天,点击查看活动详情
前言
在上一篇 Android-EventBus修改纪实(二) 中笔者简单分析了 EventBus 提供的 5 种线程模型的作用及各模型的使用场景与注意事项,特别是在 POSTING
线程模型下,要谨慎的使用黏性事件和必达事件。
上一篇中未对线程模型的具体实现做分析,本篇文章分析下线程模型是如何做线程调度的。
本篇文章只讨论发布普通事件的线程调度,粘性事件和必达事件的线程调度与普通事件略有不同。
线程模型
先简单回顾下上篇文章中关于线程模型的分析:
- POSTING:对于普通事件来说,事件发布与订阅方法将在同一个线程,这是该线程调度模型的本意
- MAIN:在主线程来分发事件,根据是否在 Android 上使用,处理逻辑不同
- MAIN_ORDER:在主线程依次分发事件
- BACKGROUND:在后台线程来分发事件,根据是否在 Android 上使用,处理逻辑不同。使用单线程处理,尽量不要进行耗时操作以免阻塞后台线程
- ASYNC:在异步线程来分发事件,使用线程池处理
事件分发器
EventBus 使用 事件分发器 对事件进行线程调度,除 POSTING
线程模型外,其他 4 种线程模型都有 事件分发器 的身影,事件分发器 主要在postToSubscription
方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
|
在 postToSubscription
方法中可以看出各线程模型对应分别对应哪种 事件分发器:
- MAIN,MAIN_ORDERED:mainThreadPoster,
mainThreadPoster
在Android 平台上的类型是 HandlerPoster
- BACKGROUND:backgroundPoster,
backgroundPoster
的类型是 BackgroundPoster
- ASYNC:asyncPoster,
asyncPoster
的类型是 AsyncPoster
以上事件发布器都实现了 Poster
接口:
1 2 3 4 5 6 7 8 9 10
| public interface Poster {
void enqueue(Subscription subscription, Object event); }
|
Poster
接口是对事件分发器行为的抽象。
从 Poster
接口中唯一的接口方法名称 enqueue
来看,可以大概猜测到事件分发器的实现思想为:事件循环机制。
既然是 事件循环,接口方法名称又是 enqueue
,猜测有队列相关的实现。
待发布事件队列
每个事件分发器都有一个待发布事件队列,将待发布的事件存储在队列中,待发布事件队列是通过一种先进先出(FIFO)的单向链表实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| final class PendingPostQueue { private PendingPost head; private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) { if (pendingPost == null) { throw new NullPointerException("null cannot be enqueued"); } if (tail != null) { tail.next = pendingPost; tail = pendingPost; } else if (head == null) { head = tail = pendingPost; } else { throw new IllegalStateException("Head present, but no tail"); } notifyAll(); }
synchronized PendingPost poll() { PendingPost pendingPost = head; if (head != null) { head = head.next; if (head == null) { tail = null; } } return pendingPost; }
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException { if (head == null) { wait(maxMillisToWait); } return poll(); } }
|
待发布事件队列中有两个字段,其中 head
是真链表,标识链表的头部元素,tail
是假链表,它是真链表尾部元素的引用,标识链表的尾部元素,它的作用是方便在链表尾部插入元素,元素数据中有 next
字段指向下一个元素,下图展示了元素入队时真假链表的变化情况:
箭头向右表示元素入队
- 初始时,
head
和 tail
均为 null
,
- 当第一个元素入队时,
head
和 tail
均为 event1
,
- 当第二个元素入队时,
event2
插入 head
的尾部,tail
的引用更新为 event2
- 当第三个元素入队时,
event3
插入 head
的尾部,tail
的引用更新为 event3
当插入更多元素时依次类推,head
为链表的头部元素,tail
为链表的尾部元素。
元素出队的情况,正好与元素入队相反,下图展示了元素出队时真假链表的变化情况:
箭头向左表示元素出队
- 当链表中有三个元素时,出队一个元素,即取出
head
头部元素 event1
,然后把 head
的 next
元素 event2
指向为 head
,判断 head
是否为 null
,如果是,则认为链表中没有数据,将 head
和 tail
均置为 null
, 否则 tail
尾部元素不变
- 当链表中有两个元素时,出队一个元素,即取出
head
头部元素 event2
,然后把 head
的 next
元素 event3
指向为 head
,判断 head
是否为 null
,如果是,则认为链表中没有数据,将 head
和 tail
均置为 null
, 否则 tail
尾部元素不变
- 当链表中有一个元素时,出队一个元素,即取出
head
头部元素 event3
,然后把 head
的 next
元素 null
指向为 head
,判断 head
是否为 null
,如果是,则认为链表中没有数据,将 head
和 tail
均置为 null
, 否则 tail
尾部元素不变
待发布事件队列元素
待发布事件队列的元素是 PendingPost
,它是单向链表结构,也是订阅者和订阅方法与事件的包装类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| final class PendingPost { private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event; Subscription subscription; PendingPost next;
private PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; }
static PendingPost obtainPendingPost(Subscription subscription, Object event) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.event = event; pendingPost.subscription = subscription; pendingPost.next = null; return pendingPost; } } return new PendingPost(event, subscription); }
static void releasePendingPost(PendingPost pendingPost) { pendingPost.event = null; pendingPost.subscription = null; pendingPost.next = null; synchronized (pendingPostPool) { if (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); } } } }
|
看了 PendingPost
的源码,笔者想到了 Android 中 Message
的实现,我们在 Android 中使用 Handler 发消息时,是不是也经常使用 handler.obtainMessage()
或者使用 Message.obtain()
去获取一个 Message 的实例,Handler 使用完 Message 后会自动调用 Message#recycle()
方法回收利用这个 Message 实例,正好对应 PendingPost#releasePendingPost()
方法。Message
使用了「享元模式」达到循环利用对象,避免重复创建的目的,看来 PendingPost
也是使用了「享元模式」,在第一篇 Android-EventBus修改纪实 中我们也提到 FindState
也是使用了「享元模式」。
HandlerPoster
HandlerPoster
用于 MAIN
和 MAIN_ORDERED
线程模型,将事件分发到主线程处理。
HandlerPoster
继承自 Handler
传入 MainLooper
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive;
public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); }
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } }
|
HandlerPoster
实现 Poster
接口中的 enqueue
方法用于事件入队,在 handleMessage
方法中调用 EventBus#invokeSubscriber
方法传入待发布事件队列元素分发事件:
1 2 3 4 5 6 7 8 9 10 11 12 13
| void invokeSubscriber(PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } }
|
HandlerPoster
为避免阻塞主线程,默认单轮事件分发最多执行 10 毫秒,即 10 毫秒内可以分发多个事件,多个订阅方法的执行耗时之和超时 10 毫秒后,就会停止本轮事件分发,重新发送一个空消息给 Handler 等待执行下轮事件分发。这里的处理思路,我们可以借鉴。
BackgroundPoster
BackgroundPoster
用于 BACKGROUND
线程模型,将事件分发到后台线程。
BackgroundPoster
实现了 Runnable
接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue; private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); }
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } }
@Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } }
|
BackgroundPoster
同样实现了 Poster
接口中的 enqueue
方法用于事件入队,不过 BackgroundPoster
使用线程池进行事件分发,所以在 run
方法中调用 EventBus#invokeSubscriber
方法传入待发布事件队列元素分发事件。
BackgroundPoster
在处理时间分发时,第一次获取元素时调用了 poll(int)
方法,如果队列为空,最长等待1000毫秒后再次取头部元素,为何要这样实现?
这里笔者猜测是想提高事件分发效率与线程利用率,因为 BackgroundPoster
是在子线程中分发事件,如果队列为空,可以接收最长等待1000毫秒后再次取头部元素,减小在第二次加锁取头部元素还为空的几率,及减小线程切换导致的资源开销,充分利用线程资源,提高事件分发效率。
BackgroundPoster
默认使用 EventBus 中的线程池 CachedThreadPool
,在事件分发时,BackgroundPoster
忽略了 InterruptedException
,即没有响应线程中断,如果我们使用自定义的线程池,我们在外边关闭线程池后,这可能导致事件分发还在继续。
最后,BACKGROUND
线程模型描述中说使用单线程进行事件分发,为何在 BackgroundPoster
中却是使用的线程池呢?
其实这个问题我们很容易就解答了。因为在 BackgroundPoster
中使用了 executorRunning
变量,这个变量保证执行单轮事件分发时,都使用线程池中的一个线程,单轮事件分发完成后,新的一轮事件分发可能就使用其他的线程了。
AsyncPoster
AsyncPoster
用于 ASYNC
线程模型,将事件分发到异步线程。
AsyncPoster
实现了 Runnable
和 Poster
接口,使用线程池实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue; private final EventBus eventBus;
AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); }
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); }
@Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); } }
|
AsyncPoster
的实现比较简单,调用 enqueue
方法将事件入队并交由线程池进行事件分发,run
方法处理事件分发。
AsyncPoster
默认使用 EventBus 中的线程池 CachedThreadPool
,CachedThreadPool
没有核心线程,允许创建 Integer.MAX_VALUE
个工作线程且线程空闲时的存活时长为 60 秒,如果发布多个耗时较短的事件或多个耗时较长的时间,都会导致创建多个工作线程而浪费资源。
至此,EventBus 的线程调度分析完成。
总结
本文对 EventBus 的线程调度进行了分析,最后我们做下总结:
PendingPostQueue
在 PendingPostQueue
中,我们学到了如何使用单链表实现先进先出(FIFO)队列。
PendingPost
在 PendingPost
中,我们学到了「享元模式」的使用以及如何定义单链表数据结构。
HandlerPoster
在 HandlerPoster
中判断单轮事件分发最大处理时长机制,及时让出主线程的执行权,避免阻塞主线程;不过这个机制适用于耗时短的订阅方法,如果订阅方法耗时较长,一样会阻塞主线程。
BackgroundPoster
在 BackgroundPoster
中调用 poll(int)
提高事件分发效率和线程利用率,不过 BackgroundPoster
忽略了线程中断异常,使用自定义线程池时需要注意这里,最后我们解答了 BACKGROUND
线程模型描述中说使用单线程进行事件分发,而 BackgroundPoster
却是使用的线程池问题。
AsyncPoster
AsyncPoster
比较简单,一般简单的地方反而意味着是比较危险的,我们通常会看不到它的危险性,因为我们认为它足够简单。
Poster
Poster
说一下吧,设计框架时尽量使用接口或抽象类。
最后,还是那句话:无论使用哪种线程模型,在订阅方法中都应该尽量避免进行耗时操作。
happy~,希望可以帮你更好的使用 EventBus