0%

Android-EventBus修改纪实(一)-必达事件

Android-EventBus修改纪实

持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第 3 天,点击查看活动详情


背景

笔者在使用 EventBus 的过程中发现有时只能收到最后一次的黏性 Event ,导致业务逻辑出现混乱,下面是笔者的使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Event.java
public final class Event {

private final int code;

public Event(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Example.java
public class Example {

// 调用多次
public void test(int code) {
EventBus.getDefault().postSticky(new Event(code));
}

// 调用多次 `test(int code)` 后再注册订阅者
public void register() {
EventBus.getDefault().register(this);
}

@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void receiveEvent(Event event) {
// 发现只能收到最后一次的黏性事件
System.out.println(event.getCode());
}
}

所以去查看了 EventBus 的源码,接下来我们分析下 EventBus 发送黏性事件的流程。

分析

黏性事件

以下源码基于 EventBus 3.3.1 版本

下面是发送黏性事件的源码:

1
2
3
4
5
6
7
8
9
private final Map<Class<?>, Object> stickyEvents;

public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

postSticky 代码比较简单,首先对 stickyEvents 进行加锁,接下来把 event 事件的 Class 对象作为 Key,event 事件本身作为 value 放进 Map 中,其中stickyEvents 是 Map 对象,实例是 ConcurrentHashMap, 其 Key 和 Value 的泛型形参分别是 Class<?>Object, 它的作用就是用来存储黏性事件;然后调用 post(event) 把黏性事件当作普通事件发送一下。

首先我们看下最后为什么要调用下 post(event)

虽然 post(evnet) 上面有注释,简单翻译下:“在放进 Map 后应该再发送一次,以防止订阅者想立即删除此事件”,读完注释后,可能还是不太明白,这里笔者认为:在前面存储完黏性事件后,这里调用 post 把黏性事件当作普通事件发送出去,或许是因为现在已经有注册的黏性事件订阅者,此时把已经注册的黏性事件订阅者当作普通事件的订阅者,这样已经注册的黏性事件订阅者可以立即收到相应的事件,只是此时事件不再是黏性的。

postSticky 中我们并没有看到黏性事件是在哪里发送的,想一想我们使用黏性事件的目的是什么?当注册订阅者时可以收到之前发送的事件,这样来看,黏性事件的发送是在注册订阅者时,下面是注册订阅者的源码,删除了一些无关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void register(Object subscriber) {

// 省略无关代码

Class<?> subscriberClass = subscriber.getClass();

// 查找订阅者所有的Event接收方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

register 代码也比较简单,首先通过订阅者的 Class 对象查找订阅者所有的Event事件接收方法,然后对 EventBus 对象加锁,遍历所有的Event事件接收方法 subscriberMethods 调用 subscribe 方法,以下是 subscribe 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Key 为 Event Class 对象,Value 为存储 Event 的订阅者和接收 Event 方法对象的集合 
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

// Key 为订阅者对象,Value 为订阅者中的 Event Class对象集合
private final Map<Object, List<Class<?>>> typesBySubscriber;

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// Event Class对象
Class<?> eventType = subscriberMethod.eventType;

// 订阅者和接收 Event 方法对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

// 根据 Event Class对象,获取订阅者和接收 Event 方法对象的集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

// 判断订阅者和接收 Event 方法对象是否为空
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 判断是否已经包含了新的订阅者和接收 Event 方法对象,若是包含则认为是重复注册
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

// 这里是按优先级排序插入到集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

// 这里是把 Event Class对象添加进对应订阅者的 Event Class对象集合中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}

// 上面已经判断了是否重复注册,所以这里直接添加
subscribedEvents.add(eventType);

// 接下来就是黏性事件的发送逻辑了
// 判断 Event 接收方法是否可以处理黏性事件
if (subscriberMethod.sticky) {
// 这里判断是否考虑 Event 事件类的继承关系,默认为 Ture
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

在上面的源码中,增加了不少注释有助于我们读懂源码,在源码的最后就是黏性事件的发送逻辑了,其中有两个分支,其中一个分支根据 Event 事件的继承关系发送事件,另外一个分支根据接收 Event 方法中的 Event Class 对象从 stickyEvents 中直接查找黏性事件,最后两个分支殊途同归,都调用了 checkPostStickyEventToSubscription 方法:

1
2
3
4
5
6
7
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}

checkPostStickyEventToSubscription 方法很简单,对黏性事件做下判空处理,继续调用 postToSubscription 方法,传入订阅者与接收 Event 方法对象,黏性事件和是否是主线程布尔值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;A
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

postToSubscription 方法比较长,但是比较好理解,就是根据接收 Event 方法上的 @Subscribe 注解中传入的线程模型进行事件的分发,具体的事件分发流程,有空再分析,本文就先不分析了,现在我们只需知道最后都会调用 invokeSubscriber(Subscription subscription, Object event) 方法即可:

1
2
3
4
5
6
7
8
9
10
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 反射调用 Event 接收方法传入 Event 事件
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

终于在 invokeSubscriber 方法中找到调用 Event 接收方法的地方了,原来 EventBus 最后是通过反射调用 Event 接收方法并传入相应 Event 事件的。

分析完 Event 事件的发送流程,好像没有发现为什么有时收不到黏性事件。

我们回过头来再看下笔者的使用示例,为了方便查看,下面贴出使用示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Example.java
public class Example {

// 调用多次
public void test(int code) {
EventBus.getDefault().postSticky(new Event(code));
}

// 调用多次 `test(int code)` 后再注册订阅者
public void register() {
EventBus.getDefault().register(this);
}

@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void receiveEvent(Event event) {
// 发现只能收到最后一次的黏性事件
System.out.println(event.getCode());
}
}

可能细心的读者已经发现 test 方法调用了,问题应该出在 postSticky 方法中,让我们再次查看 postSticky 方法:

1
2
3
4
5
6
7
8
9
private final Map<Class<?>, Object> stickyEvents;

public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

根据前面分析 postSticky 方法的结果,stickyEvents 用于存储黏性事件,它是个 Map 结构,而 stickyEvents 的 Key 正是 Event 的 Class 对象,根据 Map 结构的存储原理:如果存在相同的 Key,则覆盖 Value 的值,而 stickyEvents 的 Value 正是 Event 本身。

终于真相大白,多次调用 test 方法发送黏性事件,EventBus 只会存储最后一次的黏性事件。

小结

EventBus 针对同一个黏性 Event 事件只会存储最后一次发送的黏性事件。

EventBus 的上述实现可能是因为多次发送同一个黏性事件,则认为之前的事件是过期事件应该抛弃,因此只传递最新的黏性事件。

EventBus 的这种实现无法满足笔者的业务逻辑需求,笔者希望多次发送的黏性事件,订阅者都能接收到,而不是只接收最新的黏性事件,可以理解为黏性事件必达订阅者,下面让我们修改 EventBus 的源码来满足需求吧。

修改

上一节我们分析了黏性事件的发送流程,为了满足黏性事件必达的需求,基于现有黏性事件流程,我们可以仿照黏性事件的发送来提供一个发送必达消息的方法。

Subscribe

首先我们定义 Event 接收方法可以接收黏性事件是在 @Subscribesticky = true , 所以我们可以修改 Subscribe 注解,增加黏性事件必达的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;

/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false;

// 增加消息必达的方法
boolean rendezvous() default false;

/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0;
}

rendezvous 以为约会、约定的意思,可以理解为不见不散,在这里它有两层作用,其一是标记方法可以接收黏性事件,其二是标记方法接收的事件是必达的。

findSubscriberMethods

接下来就需要解析 rendezvous 了,我们先看看 sticky 是如何解析的,在上一节我们分析了 register 方法,方便查看,下面再贴出 register 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void register(Object subscriber) {

// 省略无关代码

Class<?> subscriberClass = subscriber.getClass();

// 查找订阅者所有的Event接收方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

上一节分析中,我们没有分析查找订阅者中所有的 Event 接收方法 findSubscriberMethods ,接下来我们分析下在 findSubscriberMethods 方法是如何查找 Event 接收方法的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从缓存中查找
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

// 是否忽略生成索引,默认为False,所以这里走else分支
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 查找Event接收方法
subscriberMethods = findUsingInfo(subscriberClass);
}

// 如果订阅者和订阅者父类中没有Event接收方法则抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 添加进缓存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

调用 findSubscriberMethods 方法需要传入订阅者 Class 对象,通过笔者在源码中增加的注释分析发现默认调用 findUsingInfo 方法查找 Event 接收方法,我们继续跟踪 findUsingInfo 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// FindState对订阅者Class对象和Event接收方法进行了一层封装
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass); // ①
while (findState.clazz != null) {

// 查找订阅者信息,包含订阅者Class对象、 订阅者父类、Event接收方法等
findState.subscriberInfo = getSubscriberInfo(findState); // ②

// 在 ① initForSubscriber中会把subscriberInfo置为null,
// 在 ② getSubscriberInfo中没有Index对象,
// 所以第一次时这里会走else分支
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 查找Event接收方法
findUsingReflectionInSingleClass(findState);
}

// 查找父类的Event接收方法
findState.moveToSuperclass();
}

// 通过findState返回Event接收方法,并回收findState
return getMethodsAndRelease(findState);
}

根据笔者在源码中的注释分析,在 findUsingInfo 方法中使用「享元模式」对 FindState 进行回收利用,避免创建大量临时的 FindState 对象占用内存,最后再次调用 findUsingReflectionInSingleClass 方法查找 Event 接收方法,看方法名字应该是使用反射查找,findUsingReflectionInSingleClass 源码较长,删减一些不关心的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 通过反射获取当前类中声明的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 删减不关心的代码
}

// 遍历所有方法
for (Method method : methods) {

// 获取方法的修饰符
int modifiers = method.getModifiers();

// 判断方法是否是public的;是否是抽象方法,是否是静态方法,是否是桥接方法,是否是合成方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {

// 获取方法的形参Class对象数组
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {

// 获取方法上的Subscribe注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];

// 检测是否已经添加了相同签名的方法,考虑子类复写父类方法的情况
if (findState.checkAdd(method, eventType)) {

// 获取注解的参数
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky(),

// 这里我们添加rendezvous参数 ①
subscribeAnnotation.rendezvous()));
}
}
}
// 删减不关心的代码
}
// 删减不关心的代码
}
}

findUsingReflectionInSingleClass 方法中通过反射获取订阅者中声明的所有方法,然后遍历所有方法:

  1. 首先判断方法的修饰符是否符合,
  2. 其次判断方法是否只有一个形参,
  3. 再次判断方法是否有 Subscribe 注解,
  4. 然后检测是否已经添加了相同签名的方法,主要是考虑子类复写父类方法这种情况,
  5. 最后获取 Subscribe 注解的参数,在这里我们解析 rendezvous,封装进 SubscriberMethod 中。

SubscriberMethod 中增加 rendezvous 字段,删除不关心的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;

// 增加 `rendezvous` 字段
final boolean rendezvous;
/** Used for efficient comparison */
String methodString;

public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode,
int priority, boolean sticky,

// 增加 `rendezvous` 形参
boolean rendezvous) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
this.rendezvous = rendezvous;
}
}

postRendezvous

好的,rendezvous 已经解析出来了,接下来我们对外提供发送必达事件的接口:

1
2
3
4
5
6
7
8
9
10
// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;

public void postRendezvous(Object event) {
synchronized (rendezvousEvents) {
rendezvousEvents.add(Pair.create(event.getClass(), event));
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

上面的源码,我们通过仿照 postSticky 方法实现了 postRendezvous 方法,在 postSticky 方法中使用 Map 存储黏性事件,不过我们在 postRendezvous 方法中使用 List 存储必达事件,保证必达事件不会因为 Key 相同而被覆盖丢失,最后也是调用 post 方法尝试先发送一次必达事件。

register

在上一节中我们分析了黏性事件是在 register 中调用 subscribe 方法进行发送的,这里我们仿照黏性事件的发送逻辑,实现必达事件的发送逻辑,我们可以在 subscribe 方法最后增加发送必达事件的逻辑,以下源码省略了一些不关心的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private final List<Pair<Class<?>, Object>> rendezvousEvents;

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 省略不关心的代码

// 黏性事件发送逻辑
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}

// 新增必达事件发送逻辑
// 判断方法是否可以接收必达事件
if (subscriberMethod.rendezvous) {
if (eventInheritance) {
for (Pair<Class<?>, Object> next : rendezvousEvents) {
Class<?> candidateEventType = next.first;
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = next.second;
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object rendezvousEvent = getRendezvousEvent(eventType);
if (rendezvousEvent != null) {
checkPostStickyEventToSubscription(newSubscription, rendezvousEvent);
}
}
}
}

subscribe 方法中,我们通过仿照黏性事件的发送逻辑增加了必达事件的发送:

  1. 首先判断 Event 接收方法是否可以接收必达事件
  2. 其次考虑 Event 必达事件的继承关系,
  3. 最后两个分支都调用 checkPostStickyEventToSubscription 方法发送必达事件

happy~

总结

使用第三方库时,发现问题不要慌张,带着问题去查看源码总有一番收获,这也告诫我们在使用第三库时最好先搞明白它的实现原理,遇到问题时不至于束手无策。

通过分析 EventBus 的源码,我们有以下收获:

  1. 明白了我们注册订阅者时 EventBus 做了哪些事情
  2. 知晓了我们发送黏性事件时,EventBus 是如何处理及何时发送黏性事件的
  3. 了解到 EventBus 是通过反射调用 Event 事件的接收方法
  4. 学习了 EventBus 中的一些优化点,比如对 FindState 使用「享元模式」避免创建大量临时对象占用内存
  5. 进一步了解到对并发的处理

通过以上收获,我们成功修改 EventBus 源码实现了我们必达事件的需求。

到这里我们已经完成了必达事件的发送,不过我们还剩下获取必达事件,移除必达事件没有实现,最后 EventBus 中还有单元测试 module,我们还没有针对 rendezvous 编写单元测试,读者有兴趣的话,可以自己试着实现。

希望可以帮到你~

欢迎关注我的其它发布渠道