【开源项目源码分析】EventBus 源码分析

0. 基础

EventBus 是一个基于事件的“发布-订阅”模式的事件总线框架,有以下基础的概念:

  • 订阅者(Subscriber):订阅消息的角色;
  • 发布者(Publisher):发布事件的角色;
  • 事件(Event):代表一个可以被发布的消息;
  • 事件总线(EventBus):用于管理订阅关系和消息的发布的总线;

引用 EventBus官网上关于“发布-订阅” 模式的经典示意图:
EventBus

从上图中可以看出,整个模式是围着着“事件”的传递进行的,一个事件从发布者,经由总线调度后,被分发给多个订阅者,然后完成整个过程。这个也被称为“生产者-消费者” 模式。

可见该模式的步骤如下:

  1. 预先定义一些事件;
  2. 订阅者总线那里订阅一些自己关注的事件;
  3. 当某些条件满足后,发布者通过总线 发布事件;
  4. 订阅者收到事件后进行各自相应的处理;
  5. 如果订阅者不再对事件感兴趣的话,就去总线处取消订阅,后续的事件也将不会再通知到这个订阅者。

1. 源码分析

“订阅-发布” 模式中各个角色的实现:

角色 实现 说明
订阅者(Subscriber) SubscriberMethod 订阅者是一个个普通的类,实际上是以方法为单位作为真正的订阅者的。SubscriberMethod订阅方法,代表在EventBus中注册的一个方法
发布者(Publisher) EventBus 在EventBus的框架中,是由EventBus这个类来“兼任“了消息发布者的角色
事件(Event) 一些预先定义的POJO格式的java类,也是普通的Java类 在EventBus中是用一个一个的类来描述事件的,根据类的Class对象类区分
事件总线(EventBus) EventBus类 主要的API类,包含有总线的职责

其他一些关键类:

  • Subscription:订阅信息,内部包含有订阅者和订阅方法的信息;
  • EventBusBuilder:用于构造一个EventBus 对象的Builder类;
  • SubscriberMethodFinder:用于在类中通过反射来查找某个注解的方法;

EventBus类是整个框架的接口调用类,是使用Builder 设计模式来进行对象的构造的,通过EventBusBuilder类来传入各种参数。各个参数参考官方的说明:EventBus配置

1.1 订阅过程

EventBus框架是使用Java注解语法来实现的,这样做是为了使用的简单,只需要在订阅者类的内部给一些方法加上注解,如下:

1
2
3
4
5
// This method will be called when a MessageEvent is posted (in the UI thread for Toast)
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
}

在注解添加之后,需要给该方法(订阅者)进行注册,在注册之后,一个订阅者的订阅过程就完成了。事件的订阅过程就是从EventBus.register(Object subscriber) 订阅方法开始的。register 这个方法的内容很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 注册方法的过程是:
* 1. 通过调用 SubscriberMethodFinder 辅助类的 findSubscriberMethods 方法查找到这个订阅者里面有哪些方法是有 @Subscribe 这个注解的,并且把找到的方法放在了一个List里面;
* 2. 然后对List中的每一个元素调用 subscribe(Object subscriber, SubscriberMethod subscriberMethod) 方法来进行订阅。
*/
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 这里使用通过块,保证在多线程环境中调用 register 方法也是线程安全的
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

那么,继续跟踪subscribe(Object subscriber, SubscriberMethod subscriberMethod),能够看到最终的订阅过程,是把一个新的订阅放在类EventBus 的 subscriptionsByEventType 成员中了,该成员是一个Map对象,里面存放的是EventType作为Key,List 作为Value 的映射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 用于存放 “事件-订阅关系列表” 的数据,是一个Map对象
// 存放订阅关系的列表是一个 CopyOnWriteArrayList<Subscription> 的集合类,这是一个基于Copy-On-Write机制实现的列表,能够保证多线程环境中的线程安全
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 存放订阅者对象和它注册的所有的Event的列表之间的映射
private final Map<Object, List<Class<?>>> typesBySubscriber;
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 事件类型,EventType 是事件类的Class对象
Class<?> eventType = subscriberMethod.eventType;
// 把订阅者和订阅方法合成一个 Subscription 对象,用于后续的处理
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 检查当前的 subscriptionsByEventType 集合里面是否已经存在了同样的订阅信息
// 存在的话,就直接抛出异常了,由此可见,不允许一个订阅者中注册多个Event相同的方法
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 把将要添加Subscription 对象按照优先级添加到List里面,List顺序是按照 SubscriberMethod.priority 的数值
// 由大到小排列的,这个顺序决定了在分发事件的时候的顺序。
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 把新的订阅者添加到 typesBySubscriber 变量中,该变量保存的是订阅者和它内部所注册的事件的列表的映射
// 保存到typesBySubscriber 中的作用是为了后续的取消注册的流程,这个后续回讲到。
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件的处理(Sticky Event),关于粘性事件的详情,参考 1.4 章节
if (subscriberMethod.sticky) {
// eventInheritance 属性表示是否针对事件Event的父类也进行匹配,如果设为true,则在发送一个Event的时候
// 注册该Event 的父类也可以收到该事件。
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

以上就是订阅过程的源码,总的来说,分为以下几个步骤(在调用 EventBus.register 之后):

  1. 通过反射查找注册的订阅者对象中的订阅方法(SubscriberMethod)的列表;
  2. 把订阅方法列表中的 SubscriberMethod 对象逐个调用subscribe 方法;
    2.1. 在subscribe 方法中,先通过 SubscriberMethod 对象构造出一个Subscription 对象来;
    2.2. 在subscribe 方法中,把构造出的 Subscription 对象依次正确地放在EventBus 的 subscriptionsByEventTypesubscribedEvents 两个成员中,这两个成员都是Map类型的集合,存放的是已订阅的订阅信息;
    2.3. 在subscribe 方法中,如果注册的是粘性事件,处理粘性事件的特殊逻辑;

总的来说,就是收集订阅信息,把他们放在EventBus 的subscriptionsByEventTypesubscribedEvents 两个成员中。

以上,就完成了订阅的流程。

1.2 取消订阅过程

取消发布的流程,是通过调用EventBus 的public synchronized void unregister(Object subscriber) 方法来完成的。和订阅的原理类似,即是通过操作typesBySubscriber 变量的内容来完成的,不同的是订阅过程是往集合类里面添加数据,而取消注册正好相反,是从集合中移除数据。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
// typesBySubscriber 变量的内容,是在注册的时候添加的进去的,Map的Key就是订阅者对象
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 调用 unsubscribeByEventType,内部实现是把订阅信息从 subscriptionsByEventType 中移除
unsubscribeByEventType(subscriber, eventType);
}
// 把订阅信息从 typesBySubscriber 中移除了
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
// 找到集合中存储的和目标订阅者一样的引用
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

从上面的源码中可以看出,取消订阅的过程是和订阅的正好相反的,订阅的过程是把订阅信息从EventBus 的subscriptionsByEventTypesubscribedEvents 两个成员中移除。

1.3 发布事件过程

事件的发布过程,是通过 EventBus.post(Object event) 方法来发布一个事件,post方法的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void post(Object event) {
// currentPostingThreadState 是EventBus 的一个成员,是ThreadLocal类型的,
// 内部包含的是一个 PostingThreadState 类型的对象,PostingThreadState 内部包含有一个待发送事件的队列
// 由于postingState 是ThreadLocal类型的对象持有的,所以每一个线程都有各自的postingState 对象
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
// 把需要发布的事件,添加到队列中
eventQueue.add(event);
// isPosting 标志着是否正在添加
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环取出队列中的Event事件,来执行 postSingleEvent 方法进行事件的发布
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

首先,我们能够看到,事件的发布,是由一个队列控制着的,保证了事件的发生顺序是按照post 方法的调用顺序进行发布的。EventBus 中的currentPostingThreadState 是一个ThreadLocal对象,里面包含有一个 PostingThreadState 类型的对象,这个PostingThreadState 是EventBus 的静态内部类,代表着当前线程正在发布事件的状态,内部仅仅定义了一些成员,没有逻辑在内部,可以理解为一个数据的集合:

1
2
3
4
5
6
7
8
9
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}

每一个被post 的事件,都会被放在当前线程的PostingThreadState 对象的 eventQueue 里面,这个 eventQueue 是一个列表(当作队列使用),在post 方法里面循环遍历 eventQueue,依次调用postSingleEvent方法,它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// eventInheritance 是EventBus 的一个成员
// 设置为 true的话,表示在发布一个Event的时候,所有定于了Event类型以及它的父类的订阅者,都可以收到该事件
if (eventInheritance) {
// 循环遍历找到eventClass 的父类们
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
// 针对事件的各个层级的父类来调用 postSingleEventForEventType 来完成事件的发布
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 不设置 eventInheritance 的话,直接调用postSingleEventForEventType 来完成事件的发布
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 没有找到订阅者的话,会输出一些日志
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
// 如果设置了sendNoSubscriberEvent 参数的话,会在找不到订阅者的时候,发布一个 NoSubscriberEvent 事件
// 该参数用于处理异常
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

在postSingleEvent 方法的内部,会处理 eventInheritance 参数的逻辑,分别针对 eventInheritance设为true 和false两种情况进行了不同的处理,但是相同的是,最终都会调用到 postSingleEventForEventType 方法,它的逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 从subscriptionsByEventType 中获取到订阅信息,这个信息是在订阅者进行订阅的时候添加的
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 循环遍历,调用 postToSubscription
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

接下来,是到了 postToSubscription 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 这里就是处理多个线程参数的情况,针对不同的目标线程,是决定直接调用 invokeSubscriber 执行发布
// 还是添加到目标线程的队列里面,然后在队列里面再执行 invokeSubscriber
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
// 需要在主线程执行,会先check下当前是不是主线程,不是的话,就会先添加到队列里面
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
// 最终执行发布事件的方法,发布事件,就是反射订阅者的订阅方法Method对象,执行这方法,并把参数传进去
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

从上面的分析,可以看到发布事件的调用顺序是:

1
2
post() --> postSingleEvent() --> postSingleEventForEventType() --> postToSubscription() --> invokeSubscriber()

调用的步骤是:

  1. 调用 post() 方法开始发布事件;
  2. 把需要发布的事件,放在当前线程的发布队列里面;
  3. 循环把发布队列中的事件挨个进行处理;
  4. 处理 eventInheritance 参数的逻辑;
  5. 按照需要发布事件的线程类型(ThreadMode),来确定是直接调用invokeSubscriber方法来发布,还是先加到目标线程的队列中;
  6. 最终,调用 invokeSubscriber 方法,执行订阅方法(Method) 的invoke方法,通过反射来执行订阅方法,完成事件的发布;

以上,就是普通事件的发布过程。

1.4 发布粘性事件(Sticky Event)

粘性事件,是相对于普通事件来说的,对于普通的事件来说:先订阅、再发布事件是一个标准的步骤,也就是说,如果想要能接收到一个事件的话,那前提是得先订阅了这个事件,如果说在事件发布的时候,还没订阅这个事件,那么这个订阅者是收不到的这个事件的。

然而,在某些特殊的场景中,是能够希望订阅者的订阅过程哪怕是晚一点也能够收到事件的,能够实现的是,只要一个订阅者订阅了事件,那哪怕是在订阅前发生的这个事件,这个订阅者也照样能收得到。对于这样的特殊的事件,被称为粘性事件(Sticky Event)。粘性这个概念,指的就是这样的场景,这个概念在Android系统中也有,就是粘性广播,和这里的粘性事件类似的含义。

粘性事件的发布流程,大体上和普通事件的一样,是通过postSticky 方法实现的:

1
2
3
4
5
6
7
8
public void postSticky(Object event) {
// 先加入到Eventbus 的 stickyEvents 对象中,这也是一个Map对象
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// 然后再调用普通事件的post
post(event);
}

从源码中可以看出,Sticky Event 的发布,其实就是先把Sticky Event加入到stickyEvents集合里面,然后又调用了 post 方法。也就是说比普通的事件的发布多了一个加入到 stickyEvents 的过程。

这个 stickyEvents 是EventBus 的一个成员,是一个Map类型的集合对象,这个对象在 1.3 发布事件过程 里面已经提到过,在订阅的时候,当订阅的事件是Sticky 的时候,会check当前的 stickyEvents 里面是否存在该类型的Event,有的话,就直接调用到 postToSubscription 执行事件的发布了。

由此就可以明白,所谓的Sticky Event,其实就是在postSticky之后,把事件在 stickyEvents 成员中缓存起来,当后续有订阅者来订阅该类型的Event 的时候,则直接从 stickyEvents 缓存中读取事件并直接发布事件。这样,就做到了“先发布、后订阅”也能让订阅者收到粘性事件了。

1.5 线程调度

EventBus中的事件发布,支持多线程的调度,多线程的调度主要是用在事件的发布过程中的postToSubscription方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 直接执行
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
// 不直接执行,调用Poster接口装载到队列里面
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}

从源码中可以看出来,线程的调度,都是通过Poster 接口的enqueue 方法来实现的。Poster接口代表一个和线程关联的事件队列,包含一个像线程事件队列的接口enqueue() 方法。

Poster 接口的定义很简答,只有一个装载事件到队列的方法:

1
2
3
4
5
6
7
8
9
10
interface Poster {
/**
* Enqueue an event to be posted for a particular subscription.
*
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
}

Poster有多个子类的实现,分别对应了不同的线程处理模式:

  • AsyncPoster: 对应于ThreadMode.ASYNC 模式,该模式总是把事件在一个新的线程中执行。内部是使用了EventBus 的 executorService 成员(线程池对象)来执行的;
  • BackgroundPoster: 对应于 ThreadMode.BACKGROUND模式,该模式会把事件发布到后台线程中执行。内部也是使用了EventBus 的 executorService 成员(线程池对象)来执行的;
  • HandlerPoster: 对应于 ThreadMode.MAIN模式,该模式是把事件发布到Android的主线程中执行。HandlerPoster 是继承了Android 的Handler 类。

在3个Poster 的子类中,AsynPoster 和BackgroundPoster的原理是类似的,都是通过EventBus的线程池对象来执行的。以BackgroundPoster 为例,它的 enqueue 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 使用EventBus 的线程池对象执行,本身是一个Runnable 的接口
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
...
// run 方法中的主要逻辑是调用 invokeSubscriber 方法来执行事件的发送
eventBus.invokeSubscriber(pendingPost);
...
}

而HandlerPoster类,它的构造,是使用了一个外部的 Looper对象,关于Looper机制,是Android平台上面的Event-Loop 机制的实现,这里不再赘述(这个外部的Looper对象,针对Android平台上来说,默认就是Android的主线程的Looper):

1
2
3
4
5
6
7
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
// 调用父类的构造方法,关联到一个Looper
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

调用上面这个构造方法的地方,是在 MainThreadSupport.createPoster() 里面,而里面的Looper参数,则来自EventBus 的mainThreadSupport成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
} else if (AndroidLogger.isAndroidLogAvailable()) {
// 在Android平台上,是使用的
Object looperOrNull = getAndroidMainLooperOrNull();
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}
Object getAndroidMainLooperOrNull() {
try {
// 在Android平台上,返回的就是主线程的Looper
return Looper.getMainLooper();
} catch (RuntimeException e) {
// Not really a functional Android (e.g. "Stub!" maven dependencies)
return null;
}
}

HandlerPoster 也实现了 Poster 的enqueue 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 加到一个队列一个
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 调用Handler 的sendMessage,会触发handleMessahe
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
// 收到 enqueue 里面的sendMessage
@Override
public void handleMessage(Message msg) {
// 这个方法是执行在该HandlerPoster 关联的Looper所在的线程里面,在Android平台上,就是app的主线程。
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
// 从 queue 中取出消息,这是在 enqueue 方法中加入到队列的
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 调用 invokeSubscriber 方法完成事件的发布执行
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

从上面的源码中,能看出来对于ThreadMode.MAIN 模式的线程调度的实现,就是通过主线程的Looper关联一个Handler来实现消息的发布的。

以上,就是EventBus 的主要的源码的分析。

坚持原创技术分享,您的支持将鼓励我继续创作!