Skip to content

Commit

Permalink
修改线程切换SchedulerTransformer不起作用的bug
Browse files Browse the repository at this point in the history
  • Loading branch information
xuexiangjys committed Jun 20, 2018
1 parent 43d9f4c commit 6d438ba
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import butterknife.OnClick;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.subscribers.SafeSubscriber;

/**
* RxJavaUtils演示示例
Expand Down Expand Up @@ -103,9 +104,35 @@ public void doInUIThread(String s) {
// }, new SimpleSubscriber<Integer>() {
// @Override
// public void onSuccess(Integer integer) {
// int a = 100/0;
// Log.e(TAG, "[doInUIThread] " + getLooperStatus() + ", 入参:" + integer);
// }
// });
// RxJavaUtils.executeAsyncTask(new Function<Integer, Integer>() {
// @Override
// public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "[doInIOThread] " + getLooperStatus());
// return 12345;
// }
// }).subscribe(new Consumer<Integer>() {
// @Override
// public void accept(Integer integer) throws Exception {
// Log.e(TAG, "[doInUIThread] " + getLooperStatus() + ", 入参:" + integer);
// }
// });

// RxJavaUtils.executeAsyncTask2(new Function<Integer, Integer>() {
// @Override
// public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "[doInIOThread] " + getLooperStatus());
// return 12345;
// }
// }, new SimpleSubscriber<Integer>() {
// @Override
// public void onSuccess(Integer integer) {
// Log.e(TAG, "[doInUIThread] " + getLooperStatus() + ", 入参:" + integer);
// }
// });

RxJavaUtils.executeAsyncTask(new RxAsyncTask<String, Integer>("我是入参789") {
@Override
public Integer doInIOThread(String s) {
Expand Down
60 changes: 48 additions & 12 deletions rxutil2/src/main/java/com/xuexiang/rxutil2/rxjava/RxJavaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ public static Disposable polling(long initialDelay, long interval, @NonNull Cons
/**
* 轮询操作
*
* @param initialDelay 初始延迟
* @param interval 轮询间期
* @param unit 轮询间期时间单位
* @param initialDelay 初始延迟
* @param interval 轮询间期
* @param unit 轮询间期时间单位
*/
public static Flowable<Long> polling(long initialDelay, long interval, @NonNull TimeUnit unit) {
return Flowable.interval(initialDelay, interval, unit)
Expand All @@ -174,6 +174,7 @@ public static Disposable polling(long initialDelay, long interval, @NonNull Time
}

//=================倒计时=================//

/**
* 倒计时操作【间隔1秒】
*
Expand Down Expand Up @@ -245,8 +246,8 @@ public static Disposable delay(@NonNull long delayTime, @NonNull TimeUnit unit,
/**
* 延迟操作
*
* @param delayTime 延迟时间
* @param unit 延迟时间单位
* @param delayTime 延迟时间
* @param unit 延迟时间单位
*/
public static Observable<Long> delay(long delayTime, @NonNull TimeUnit unit) {
return Observable.timer(delayTime, unit)
Expand All @@ -269,9 +270,9 @@ public static Disposable delay(long delayTime, @NonNull TimeUnit unit, @NonNull
/**
* 延迟操作
*
* @param t 发射源
* @param delayTime 延迟时间
* @param unit 延迟时间单位
* @param t 发射源
* @param delayTime 延迟时间
* @param unit 延迟时间单位
*/
public static <T> Observable<T> delay(@NonNull T t, long delayTime, @NonNull TimeUnit unit) {
return Observable.just(t)
Expand Down Expand Up @@ -359,14 +360,13 @@ public static <T, R> Flowable<R> executeAsyncTask(@NonNull T t, @NonNull Functio
/**
* 执行异步任务(IO线程处理,UI线程显示)
*
* @param t 处理入参
* @param func1 动作
* @return
*/
public static <T, R> Observable<R> executeAsyncTask2(@NonNull T t, @NonNull Function<T, R> func1) {
return Observable.just(t)
public static <R> Flowable<R> executeAsyncTask(@NonNull Function<Integer, R> func1) {
return Flowable.just(1)
.map(func1)
.compose(RxSchedulerUtils.<R>_io_main_o());
.compose(RxSchedulerUtils.<R>_io_main_f());
}

/**
Expand Down Expand Up @@ -395,6 +395,31 @@ public static <T, R> Disposable executeAsyncTask(@NonNull T t, @NonNull Function
return executeAsyncTask(t, func1).subscribe(consumer, errorConsumer);
}

/**
* 执行异步任务(IO线程处理,UI线程显示)
*
* @param t 处理入参
* @param func1 动作
* @return
*/
public static <T, R> Observable<R> executeAsyncTask2(@NonNull T t, @NonNull Function<T, R> func1) {
return Observable.just(t)
.map(func1)
.compose(RxSchedulerUtils.<R>_io_main_o());
}

/**
* 执行异步任务(IO线程处理,UI线程显示)
*
* @param func1 动作
* @return
*/
public static <R> Observable<R> executeAsyncTask2(@NonNull Function<Integer, R> func1) {
return Observable.just(1)
.map(func1)
.compose(RxSchedulerUtils.<R>_io_main_o());
}

/**
* 执行异步任务(IO线程处理,UI线程显示)
*
Expand All @@ -407,6 +432,17 @@ public static <T, R> Disposable executeAsyncTask2(@NonNull T t, @NonNull Functio
return executeAsyncTask2(t, func1).subscribeWith(subscriber);
}

/**
* 执行异步任务(IO线程处理,UI线程显示)
*
* @param func1 动作
* @param subscriber 订阅事件
* @return
*/
public static <R> Disposable executeAsyncTask2(@NonNull Function<Integer, R> func1, @NonNull BaseSubscriber<R> subscriber) {
return executeAsyncTask2(func1).subscribeWith(subscriber);
}

//==================Transformer=====================//

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

import com.xuexiang.rxutil2.rxjava.scheduler.SchedulerType;

import org.reactivestreams.Publisher;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @author xuexiang
* @since 2018/6/12 下午11:25
*/
public class SchedulerTransformer<T> implements ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>, MaybeTransformer<T, T> , CompletableTransformer {
public class SchedulerTransformer<T> implements ObservableTransformer<T, T>, FlowableTransformer<T, T>, SingleTransformer<T, T>, MaybeTransformer<T, T>, CompletableTransformer {

/**
* 线程类型
Expand All @@ -42,23 +42,19 @@ public SchedulerTransformer(SchedulerType schedulerType) {
public ObservableSource<T> apply(Observable<T> upstream) {
switch (mSchedulerType) {
case _main:
upstream.observeOn(AndroidSchedulers.mainThread());
break;
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
upstream.observeOn(Schedulers.io());
break;
return upstream.observeOn(Schedulers.io());
case _io_main:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
break;
case _io_io:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
break;
default:
break;
}
Expand All @@ -69,23 +65,19 @@ public ObservableSource<T> apply(Observable<T> upstream) {
public Publisher<T> apply(Flowable<T> upstream) {
switch (mSchedulerType) {
case _main:
upstream.observeOn(AndroidSchedulers.mainThread());
break;
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
upstream.observeOn(Schedulers.io());
break;
return upstream.observeOn(Schedulers.io());
case _io_main:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
break;
case _io_io:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
break;
default:
break;
}
Expand All @@ -96,23 +88,19 @@ public Publisher<T> apply(Flowable<T> upstream) {
public MaybeSource<T> apply(Maybe<T> upstream) {
switch (mSchedulerType) {
case _main:
upstream.observeOn(AndroidSchedulers.mainThread());
break;
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
upstream.observeOn(Schedulers.io());
break;
return upstream.observeOn(Schedulers.io());
case _io_main:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
break;
case _io_io:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
break;
default:
break;
}
Expand All @@ -123,23 +111,19 @@ public MaybeSource<T> apply(Maybe<T> upstream) {
public SingleSource<T> apply(Single<T> upstream) {
switch (mSchedulerType) {
case _main:
upstream.observeOn(AndroidSchedulers.mainThread());
break;
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
upstream.observeOn(Schedulers.io());
break;
return upstream.observeOn(Schedulers.io());
case _io_main:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
break;
case _io_io:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
break;
default:
break;
}
Expand All @@ -150,23 +134,19 @@ public SingleSource<T> apply(Single<T> upstream) {
public CompletableSource apply(Completable upstream) {
switch (mSchedulerType) {
case _main:
upstream.observeOn(AndroidSchedulers.mainThread());
break;
return upstream.observeOn(AndroidSchedulers.mainThread());
case _io:
upstream.observeOn(Schedulers.io());
break;
return upstream.observeOn(Schedulers.io());
case _io_main:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
break;
case _io_io:
upstream
return upstream
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(Schedulers.io());
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ public ProgressDialogLoader(Context context, String msg) {

@Override
public boolean isLoading() {
if (mDialog != null) {
return mDialog.isShowing();
} else {
return false;
}
return mDialog != null && mDialog.isShowing();
}

@Override
Expand Down

0 comments on commit 6d438ba

Please sign in to comment.