Skip to content

Commit

Permalink
修复RxBus多线程下可能出现的ConcurrentModificationException问题
Browse files Browse the repository at this point in the history
  • Loading branch information
xuexiangjys committed Apr 26, 2021
1 parent 640a4f7 commit 7cd4b9e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
20 changes: 9 additions & 11 deletions rxutil2/src/main/java/com/xuexiang/rxutil2/rxbus/RxBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

import androidx.annotation.NonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
Expand All @@ -33,12 +32,12 @@
* @author xuexiang
* @since 2018/3/1 上午10:30
*/
public class RxBus {
public final class RxBus {

/**
* 事件订阅的注册池, Key:事件名, value:事件的订阅者(事件的消费者、目标)
*/
private ConcurrentHashMap<Object, List<Subject>> maps = new ConcurrentHashMap<>();
private ConcurrentHashMap<Object, CopyOnWriteArrayList<Subject>> maps = new ConcurrentHashMap<>();

private static RxBus sInstance;

Expand All @@ -58,7 +57,6 @@ public static RxBus get() {
return sInstance;
}


/**
* 注册事件的订阅
*
Expand All @@ -76,10 +74,10 @@ public <T> Flowable<T> register(@NonNull Object eventName, Class<T> clazz) {
* @param eventName 事件名
* @return 订阅者
*/
private <T> Subject<T> register(@NonNull Object eventName) {
List<Subject> subjects = maps.get(eventName);
private <T> Subject<T> register(@NonNull Object eventName) {
CopyOnWriteArrayList<Subject> subjects = maps.get(eventName);
if (subjects == null) {
subjects = new ArrayList<>();
subjects = new CopyOnWriteArrayList<>();
maps.put(eventName, subjects);
}
Subject<T> subject = PublishSubject.<T>create().toSerialized();
Expand All @@ -94,7 +92,7 @@ private <T> Subject<T> register(@NonNull Object eventName) {
* @param flowable 需要取消的订阅者
*/
public void unregister(@NonNull Object eventName, @NonNull Flowable flowable) {
List<Subject> subjects = maps.get(eventName);
CopyOnWriteArrayList<Subject> subjects = maps.get(eventName);
if (subjects != null) {
subjects.remove(flowable);
if (subjects.isEmpty()) {
Expand All @@ -109,7 +107,7 @@ public void unregister(@NonNull Object eventName, @NonNull Flowable flowable) {
* @param eventName 事件名
*/
public void unregisterAll(@NonNull Object eventName) {
List<Subject> subjects = maps.get(eventName);
CopyOnWriteArrayList<Subject> subjects = maps.get(eventName);
if (subjects != null) {
maps.remove(eventName);
}
Expand All @@ -131,7 +129,7 @@ public void post(@NonNull Object eventName) {
* @param content 发送的内容
*/
public void post(@NonNull Object eventName, @NonNull Object content) {
List<Subject> subjects = maps.get(eventName);
CopyOnWriteArrayList<Subject> subjects = maps.get(eventName);
if (subjects != null && !subjects.isEmpty()) {
for (Subject s : subjects) {
s.onNext(content);
Expand Down
18 changes: 12 additions & 6 deletions rxutil2/src/main/java/com/xuexiang/rxutil2/rxbus/RxBusUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public <T> SubscribeInfo<T> onMainThread(@NonNull Object eventName, Class<T> cla
* @param errorConsumer 错误订阅
*/
public <T> SubscribeInfo<T> onMainThread(@NonNull Object eventName, Class<T> clazz, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
Flowable<T> flowable = register(eventName, clazz); //注册后,返回订阅者
// 注册后,返回订阅者
Flowable<T> flowable = register(eventName, clazz);
/* 订阅管理 */
SubscribeInfo<T> info = new SubscribeInfo<>(flowable);
info.setDisposable(add(eventName, flowable.observeOn(AndroidSchedulers.mainThread()).subscribe(consumer, errorConsumer)));
Expand Down Expand Up @@ -135,7 +136,8 @@ public <T> SubscribeInfo<T> on(@NonNull Object eventName, Class<T> clazz, Consum
* @param errorConsumer 错误订阅
*/
public <T> SubscribeInfo<T> on(@NonNull Object eventName, Class<T> clazz, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
Flowable<T> Observable = register(eventName, clazz);//注册后,返回订阅者
// 注册后,返回订阅者
Flowable<T> Observable = register(eventName, clazz);
/* 订阅管理 */
SubscribeInfo<T> info = new SubscribeInfo<>(Observable);
info.setDisposable(add(eventName, Observable.subscribe(consumer, errorConsumer)));
Expand Down Expand Up @@ -177,7 +179,8 @@ public <T> SubscribeInfo<T> on(@NonNull Object eventName, Class<T> clazz, Schedu
* @param errorConsumer 错误订阅
*/
public <T> SubscribeInfo<T> on(@NonNull Object eventName, Class<T> clazz, Scheduler scheduler, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
Flowable<T> flowable = register(eventName, clazz);//注册后,返回订阅者
// 注册后,返回订阅者
Flowable<T> flowable = register(eventName, clazz);
/* 订阅管理 */
SubscribeInfo<T> info = new SubscribeInfo<>(flowable);
info.setDisposable(add(eventName, flowable.observeOn(scheduler).subscribe(consumer, errorConsumer)));
Expand Down Expand Up @@ -227,13 +230,16 @@ public void unregisterAll(@NonNull Object eventName) {
public void unregister(@NonNull Object eventName, Disposable disposable, Flowable flowable) {
CompositeDisposable compositeDisposable = maps.get(eventName);
if (compositeDisposable != null) {
compositeDisposable.remove(disposable); //先取消特定的事件订阅
// 先取消特定的事件订阅
compositeDisposable.remove(disposable);
if (compositeDisposable.size() == 0) {
maps.remove(eventName);
RxBus.get().unregisterAll(eventName); //没有订阅信息了,直接注销事件
// 没有订阅信息了,直接注销事件
RxBus.get().unregisterAll(eventName);
}
}
RxBus.get().unregister(eventName, flowable); //取消事件的订阅者
// 取消事件的订阅者
RxBus.get().unregister(eventName, flowable);
}

/**
Expand Down

0 comments on commit 7cd4b9e

Please sign in to comment.