Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #31: RxROS operators should use subscribe_on rather than observe_on #32

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

henrik7264
Copy link
Collaborator

The observe_on call has been changed with the subscribe_on to ensure that the operations are executed in a dedicated thread.

@gavanderhoorn
Copy link
Member

@henrik7264: you should rebase your PR once #33 gets merged.

@gavanderhoorn
Copy link
Member

gavanderhoorn commented Oct 7, 2019

@henrik7264: could you please rebase this to we can run CI on it?

nevermind, just noticed that observe_on_blocks_operators is actually a branch in this repository, so I've rebased it.

@gavanderhoorn gavanderhoorn removed their request for review October 7, 2019 10:39
@gavanderhoorn
Copy link
Member

Unassigned myself as I believe @wasowski would be in a better position to review this one.

@gavanderhoorn
Copy link
Member

@wasowski: would you have time to take a quick look at this before ROSCon?

@wasowski
Copy link
Member

I missed the original deadline, and I need a bit more time for this (need to read up...). I will really try this week ...

@@ -301,7 +301,7 @@ namespace rxros
auto publish_to_topic(const std::string &topic, const uint32_t queue_size = 10) {
return [=](auto&& source) {
ros::Publisher publisher(rxros::node::get_handle().advertise<T>(topic, queue_size));
source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
source.subscribe_on(rxcpp::synchronize_new_thread()).subscribe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been reading about this in a book and online, and as far as I can see there should be no difference between the two operators in this case. Observe_on immediately spawns a new thread for all subsequent lambda and the subscription (so the subscribed callbacks all called in the parallel thread to the current thread). Subscribe_on has the same effect only for the subsequent subscription, but not for the subsequent lambdas (map, filter, etc). In this case we only have the subscribe afterwards, so the effect should be identical. Have you observed any difference in testing @henrik7264 ?

There is some background on this here: https://stackoverflow.com/questions/44984730/rxandroid-whats-the-difference-between-subscribeon-and-observeon (two top answers are worth reading).

Thinking longer about it, I believe that we should remove observe_on (and subscribe_on), not replace it. This gives the programmer maximum flexibility. This way publish_to_topic will not spawn any new threads, unless the programmer explicitly asks for it by inserting observe_on or subscribe_on.

@@ -77,15 +77,15 @@ namespace rxros
auto send_transform() {
return [=](auto&& source) {
tf::TransformBroadcaster transformBroadcaster;
source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
source.subscribe_on(rxcpp::synchronize_new_thread()).subscribe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest removing observe_on here as well (not just replacing it with subscribe_on).

[&](const tf::StampedTransform& stf) {transformBroadcaster.sendTransform(stf);});
return source;};}


auto send_transform(const std::string &parent_frameId, const std::string &child_frameId) {
return [=](auto&& source) {
tf::TransformBroadcaster transformBroadcaster;
source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
source.subscribe_on(rxcpp::synchronize_new_thread()).subscribe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest removing observe_on here as well (not just replacing it with subscribe_on).

Basically, when we remove them, everything seems to become synchronous, based on the input source (typically but not necessarily a topic subscription). If the programmer wants to parallelize more they should insert observe_on (if they want to parallelize the pipeline) or subscribe_on (if they want to just parallelize the sending). Also we should probably explain subscribe_on and observe_on in the documentation.

@gavanderhoorn
Copy link
Member

Thanks for the review @wasowski.

@gavanderhoorn
Copy link
Member

I'd like to release an updated version of rxros soon. @henrik7264: would you have time to take a look at @wasowski's comments?

@henrik7264
Copy link
Collaborator Author

henrik7264 commented Nov 18, 2019 via email

@wasowski
Copy link
Member

We like long stories. I also looked further in the terrible book I am reading, and I found contradictory information on this issue. So let's look at your side of things. I think understanding this is key for controlling concurrency in rxros.

@henrik7264
Copy link
Collaborator Author

OK, so here is my view of this subject. First of all, here are a couple of books I have used in the past:

None of the books are really great, but lack of good literature on reactive programming and especially in the area of threading, schedulers, the special operators subscribe_on and observe_on they have been my main source of inspiration. I tend to like the RxPY book slightly better than RxCPP book.

I have also not been able to find any good explanations on the internet - they repeat over and over again the same information about threading, schedulers, and how to use subscribe_on and observe_on. But the explanation is very thin - I afraid it is because the subject is rather complex.

@gavanderhoorn
Copy link
Member

So, what's the way forward?

@henrik7264
Copy link
Collaborator Author

henrik7264 commented Nov 18, 2019

So, a typically ROS program has the following form:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");
    //...
    ros::spin();
}

The ros::spin() function blocks the main thread until it is shutdown or terminated. The other purpose of the rxros::spin function is to dispatch messages from ROS topics to the appropriate RxROS observables. It is therefore important that this function is called right after initialization of variables, function definitions etc.

The following (uninteresting) program is therefore not desirable as the ros::spint() never will be reached. The interval observable will emit an infinite number of values in the main thread and the map and subscribe functions will likewise be executed in the main thread.

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

By default, a message stream and the chain of operators that is apply to it works on the same thread. It is actually bad to use the word thread. The right word is scheduler. However, a scheduler is typically executed in a dedicated thread.

The ObserveOn operator changes the scheduler of the operators and observers below it. This means that the ObserveOn can be called multiple times at various points during the chain of operators in order to change on which thread the operators should operate.

The SubscribeOn operator affects the operators and observers above and below it. In other words, the SubscribeOn operator specifies which scheduler the observable message stream will begin operating on, no matter at what point in the chain the SubscribeOn is called.

This means that the following (uninteresting) program

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | observe_on(scheduler1)
    | map ([] (int i) {return i*2;})
    | observe_on(scheduler2)
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

behaves as follows: The interval observable will emit values every second in the main thread. The first map function will be executed in the context of scheduller1. The second map function and the subscribe function will be executed in the context of scheduller2. The program is still not desirable as the ros::spin() function is not reached.

Now we make a final change to the program:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | observe_on(scheduler1)
    | map ([] (int i) {return i*2;})
    | observe_on(scheduler2)
    | map ([] (int j) {return j*3;})
    | subscribe_on(scheduler3).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

This program behaves as follows: Due to the subscribe_on call all observables and operations will execute in the context of scheduler3. However, the two observe_on calls will overrule this behavior so that the first map is executed in the context of scheduler1 and the second map is executed in the context of the scheduler2. If scheduler3 is executed in a thread different from the main thread then the ros::spin() function will execute in parallel with the interval observable.

To be continued.

@henrik7264
Copy link
Collaborator Author

The interval observable can take a scheduler as argument as follows:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000), scheduler)
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

In this case the interval observable will emit vales in the context of the scheduler. The map operators and subscribe function will likewise execute in the context of the scheduler and the ros::spin() function is called if the scheduler is executing in the a thread that is different from the main thread.

I have actually not tried this, but I shall try it out this weekend. The interface provided by the interval observable is a common way to construct observables. The interface for the interval observable looks something like this:

template<class Duration>
auto interval(Duration period)
...
template<class Duration, class Scheduler>
auto interval(Duration period, Scheduler scheduler)
...

This is actually the interface I would like to end up with in RxROS. We should have one interface that will execute in the context of the current scheduler (defined by observe_on or schedule_on) and an interface that specifies the scheduler to be used.

template<class T>
static auto from_topic(const std::string& topic)
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)
...

But, it is unfortunately not that easy. To be continued.

@henrik7264
Copy link
Collaborator Author

So, suppose we had an interface (our goal) like

template<class T>
static auto from_topic(const std::string& topic)
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)
...

and an implementaion of from_topic (simplified):

template<class T>
static auto from_topic(const std::string& topic)
{
    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
            auto callback = [=](const T &val) {subscriber.on_next(val);};
            ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
            ros::waitForShutdown();});
    return observable;
}
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler& scheduler)
{
    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
            auto callback = [=](const T &val) {subscriber.on_next(val);};
            ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
            ros::waitForShutdown();});
    return observable.subscribe_on(scheduler);
}

The from_topic then works as follows. The following code is

    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
           // Here goes the code that should be executed when we subscribe to the topic
        });
    return observable;

standard way of creating an observable. So when we execute something like

    auto obs = rxros::observable::from_topoic<int> ("/some_topic")

we actually get a (standard) observable as a result. This means we can subscribe to it as follows:

    auto obs = rxros::observable::from_topoic<int> ("/some_topic")
    obs.subscribe (
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

When we subscribe to it the "// Here goes the code that should be executed when we subscribe to the topic" is executed or more precisly the following code:

auto callback = [=](const T &val) {subscriber.on_next(val);};
ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
ros::waitForShutdown();});

What happens here is that we create a callback function and setup a ROS subscriber that calls the callback function each time the topic is updated. The callback function is rather simple it just forwards the value to the subscriber's on_next call, i.e. we are emitting a value.

So lets now look at the following code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

This almost looks perfect, but it will not work because the following code

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

is blocking the main thread due to the ros::waitForShutdown() call. The consequence is that the ros::spin() is never called.

OK, then we just remove the ros::waitForShutdown() call! This will also not work as the following code

auto callback = [=](const T &val) {subscriber.on_next(val);};
ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
//ros::waitForShutdown();});

would simply terminate immediately as soon as we subscribe to it.

OK, then we just call observe_on and execute the code in another thread

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | observe_on(scheduler).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

This will also not work. The only part that will execute in the schedulers context is the handler part, i.e the following code

[](int i){std::cout << "OnNext: " << v << std::endl;},
[](){std::cout << "OnCompleted" << std::endl;});

The only way to solve the problem is use subscribe_on:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe_on(scheduler).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

or to use the other interface of from_topic:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic", scheduler)
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

In both these cases both the emitter and the handlers of the subscribe operator will be
executed in the context of the scheduler.

To be continued...

@henrik7264
Copy link
Collaborator Author

henrik7264 commented Nov 25, 2019

The current implementation of from_topic has the following interface (simplified)

template<class T>
static auto from_topic(const std::string& topic)

It has similarities with the standard/ideal interface discussed above

template<class T>
static auto from_topic(const std::string& topic)

template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)

but the implementation is different. The current version of from_topic is exactly equal to executing the ideal version of from_topic with rxcpp::synchronize_new_thread() as scheduler.

The advantages of this is that we make the choice for the programmer. It is simple and easy to use. The from_topic will always execute in the context of the rxcpp::synchronize_new_thread() scheduler. This scheduler always execute in a thread that is different from main thread and we are sure to reach the ros::spin function at the end of the main function.

The disadvantages is that we do not give the programmer a chance to choose the scheduler he may like to impose on the program and we deviate from providing a standard interface to an observable.

We have three choices here:

  1. Go with the current implementation knowing that it has limited functionality.
  2. Go with the standard/ideal interface knowing that it may be confusing to use with the ros::spin() function.
  3. Go with standard interface, but a hybrid implementation where the from_topic(const std::string& topic)
    behaves as the current implementation.

To be continued (we are almost finished - can you follow me) ...

@henrik7264
Copy link
Collaborator Author

henrik7264 commented Nov 27, 2019

The last part is about operators. The publish_to_topic operator has the following interface (simplified):

template<typename T>
auto publish_to_topic(const std::string &topic);

and the following implementation (simplified):

template<typename T>
auto publish_to_topic(const std::string &topic) {
    return [=](auto&& source) {
        ros::Publisher publisher(rxros::node::get_handle().advertise<T>(topic, 10));
        source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
            [=](const T& msg) {publisher.publish(msg);});
        return source;};}

A standard opeator is constructed as follows:

auto myOperator(/* some arguments */) {
    return [=](auto&& source) {
        // Here goes the code that modifies the emitted values of the observable source.
        return source;};}

An operator is thus a higher order function or closure. The returned lambda expression/function takes as argument a source observable and return as result a new observable. Just for the record: The myOperator is a kind of identity function. It takes as input a source observable and returns it without any changes. Observe that this is how the publish_to_topic works.

Lets now look at what the publish_to_topic does with the source observable:

ros::Publisher publisher(rxros::node::get_handle().advertise<T>(topic, 10));
source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
    [=](const T& msg) {publisher.publish(msg);});

It first creates a ros::Publisher object and then we subscribe to the source/input observable. This means that each time a event is emmited by the source observable it is published on the created ros::Publisher object. Observe that the source is observed on the scheduler rxcpp::synchronize_new_thread()). This means that handler part, i.e. publisher.publish(msg) is executed in a thread different from the main thread.

Lets now look at a the following piece of code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")

    ros::spin();
}

Will this code execute as expected?

@wasowski
Copy link
Member

Guys, give me a few days to read this :)

@henrik7264
Copy link
Collaborator Author

henrik7264 commented Dec 3, 2019

Well, the answer depends on how from_topic and publish_to_topic is implemented:

  1. If neither from_topic nor publish_to_topic use the subscribe_on then the execution will block and we will never reach the ros::spin function.

  2. If either from_topic or publish_to_topic use the subscribe_on the execution will take place in the context of the specified scheduler which normally is executing in a thread different from the main thread. This means that the ros::spin function is reached and the code works as expected. The observe_on operator can be used anywhere as long as there is one subscribe_on call.

  3. If both from_topic and publish_to_topic use the subscribe_on. This situation is actually a bit strange and not really desirable as it becomes unclear which scheduler is executing what part of the code. There should in general only be one call of the subscribe_on. This is at least a goal we should try to reach. The code will work as ros::spin() is reached, but we may have used an unnecessary number of threads and the code is not easy to understand as it is unclear in which context/thread the code actually is executing.

The standard way of creating operators like publish_to_topic is:

template<typename T>
auto publish_to_topic(const std::string &topic);

template<typename T, typename Scheduler>
auto publish_to_topic(const std::string &topic, Scheduler& scheduler);

Observe that this interface is almost identical to the standard interface we defined for from_topic. In normal situations publish_to_topic(const std::string &topic) would execute in the context of the current scheduler and publish_to_topic(const std::string &topic, Scheduler& scheduler) would execute in the
context of the specified scheduler (argument). I think this is the preferred solution for the publish_to_topic operator.

So, this is my understanding of how the observe_on and subscribe_on operators work. Does it give any meaning? The reason for this long discussion of the two operators is actually not caused by the operators themselves, but rather by the ros::spin() function that need to be called in order for the from_topic and
publish_to_topic to work as expected.

I will add one more comment to this discussion. Look at the following example:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")
    | spin();
}

In this case the spin function has become an RxROS operator. I actually don't know if this is possible and what the implications of this is, but it is a taught that has been spinning around in my head for some time :-)

@henrik7264
Copy link
Collaborator Author

Well, the last idea is unfortunately nor realizable for some obvious reason. Look at this code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")
    | spin();

    rxros::observable::from_topic<int> ("/some_other_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/yet_another_topic")
    | spin();
}

This will not work at all.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants