Skip to content

Commit

Permalink
improved usage
Browse files Browse the repository at this point in the history
  • Loading branch information
MFlisar committed May 2, 2016
1 parent 665fc24 commit e54ac71
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 19 deletions.
16 changes: 2 additions & 14 deletions demo/src/main/java/com/michaelflisar/rxbus/demo/DemoActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,8 @@ public void call(String s) {
.withOnNext(new Action1<String>() {
@Override
public void call(String s) {
// security check: it may happen that the valve evaluates the is resumed state while activity is resumed and that the event may be emited
// when the activity is already paused => we just repost the event, this will only happen once, as the activity is currently paused
// this is only the current workaround!!!

// Solution for projects:
// if (RXUtil.safetyQueueCheck(s, DemoActivity.this))
// Log.d(TAG, "QUEUED BUS: " + s + " | " + getIsResumedMessage());

// for demo purposes, we adjust the event to see the effect in the logs
// this will happen very rarely!!!
if (isRXBusResumed())
Log.d(TAG, "QUEUED BUS: " + s + " | " + getIsResumedMessage());
else
RXBus.get().sendEvent(s + "POSTPONED");
// activity IS resumed, you can safely update your UI for example
Log.d(TAG, "QUEUED BUS: " + s + " | " + getIsResumedMessage());
}
})
.buildSubscription();
Expand Down
76 changes: 76 additions & 0 deletions lib/src/main/java/com/michaelflisar/rxbus/InternalRXBusUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.michaelflisar.rxbus;

import com.michaelflisar.rxbus.interfaces.IRXBusIsResumedProvider;
import com.michaelflisar.rxbus.rx.RXUtil;

import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;

/**
* Created by flisar on 02.05.2016.
*/
public class InternalRXBusUtil
{
protected static <T> Action1<T> wrapQueueAction(Action1<T> action, IRXBusIsResumedProvider isResumedProvider)
{
return new Action1<T>()
{
@Override
public void call(T t)
{
if (RXUtil.safetyQueueCheck(t, isResumedProvider))
action.call(t);
}
};
}

protected static <T> Observer<T> wrapObserver(Observer<T> observer, IRXBusIsResumedProvider isResumedProvider)
{
return new Observer<T>()
{
@Override
public void onCompleted()
{
observer.onCompleted();
}

@Override
public void onError(Throwable e)
{
observer.onError(e);
}

@Override
public void onNext(T t)
{
if (RXUtil.safetyQueueCheck(t, isResumedProvider))
observer.onNext(t);
}
};
}

protected static <T> Subscriber<T> wrapSubscriber(Subscriber<T> subscriber, IRXBusIsResumedProvider isResumedProvider)
{
return new Subscriber<T>()
{
@Override
public void onCompleted()
{
subscriber.onCompleted();
}

@Override
public void onError(Throwable e)
{
subscriber.onError(e);
}

@Override
public void onNext(T t)
{
if (RXUtil.safetyQueueCheck(t, isResumedProvider))
subscriber.onNext(t);
}
};
}}
28 changes: 23 additions & 5 deletions lib/src/main/java/com/michaelflisar/rxbus/RXBusBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class RXBusBuilder<T>
private Observable<Boolean> mObservableIsResumed;
private IRXBusIsResumedProvider mIsResumedProvider;
private int mValvePrefetch = 1000;
private boolean mQueueSubscriptionSafetyCheckEnabled = true;

private Action1<? super T> mActionNext;
private Action1<Throwable> mActionError;
Expand Down Expand Up @@ -60,6 +61,13 @@ public RXBusBuilder<T> queue(Observable<Boolean> isResumedObservable, IRXBusIsRe
return this;
}

public RXBusBuilder<T> withQueueSubscriptionSafetyCheckEnabled(boolean enabled)
{
mQueueSubscriptionSafetyCheckEnabled = enabled;
return this;
}


public RXBusBuilder<T> withValvePrefetch(int prefetch)
{
mValvePrefetch = prefetch;
Expand Down Expand Up @@ -123,19 +131,29 @@ public Subscription buildSubscription()

if (mSubscriber == null && mSubscriptionObserver == null)
{
Action1<? super T> actionNext = mActionNext;
if (mQueueSubscriptionSafetyCheckEnabled)
actionNext = InternalRXBusUtil.wrapQueueAction(mActionNext, mIsResumedProvider);

if (mActionError != null && mActionOnComplete != null)
return observable.subscribe(mActionNext, mActionError, mActionOnComplete);
return observable.subscribe(actionNext, mActionError, mActionOnComplete);
else if (mActionError != null)
return observable.subscribe(mActionNext, mActionError);
return observable.subscribe(mActionNext);
return observable.subscribe(actionNext, mActionError);
return observable.subscribe(actionNext);
}
else if (mSubscriber == null)
{
return observable.subscribe(mSubscriptionObserver);
Observer<? super T> subscriptionObserver = mSubscriptionObserver;
if (mQueueSubscriptionSafetyCheckEnabled)
subscriptionObserver = InternalRXBusUtil.wrapObserver(mSubscriptionObserver, mIsResumedProvider);
return observable.subscribe(subscriptionObserver);
}
else if (mSubscriptionObserver == null)
{
return observable.subscribe(mSubscriber);
Subscriber<? super T> subscriber = mSubscriber;
if (mQueueSubscriptionSafetyCheckEnabled)
subscriber = InternalRXBusUtil.wrapSubscriber(mSubscriber, mIsResumedProvider);
return observable.subscribe(subscriber);
}
else
throw new RuntimeException("Subscription can't be build, because you have set more than one of following: nnext action, subscriber or observable!");
Expand Down
1 change: 1 addition & 0 deletions lib/src/main/java/com/michaelflisar/rxbus/rx/RXUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,5 @@ public static <T> boolean safetyQueueCheck(T event, IRXBusIsResumedProvider isRe
return false;
}
}

}

0 comments on commit e54ac71

Please sign in to comment.