diff --git a/src/prime_server.cpp b/src/prime_server.cpp index 1aee966..3231675 100644 --- a/src/prime_server.cpp +++ b/src/prime_server.cpp @@ -144,7 +144,7 @@ server_t::server_t( uint32_t request_timeout, const health_check_matcher_t& health_check_matcher, const std::string& health_check_response) - : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_SUB), + : client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_PULL), interrupt(context, ZMQ_PUB), log(log), max_request_size(max_request_size), request_timeout(request_timeout), request_id(0), health_check_matcher(health_check_matcher), health_check_response(health_check_response.size(), health_check_response.data()) { @@ -158,9 +158,6 @@ server_t::server_t( proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); proxy.connect(proxy_endpoint.c_str()); - // TODO: consider making this into a pull socket so we dont lose any results due to timing - loopback.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled)); - loopback.setsockopt(ZMQ_SUBSCRIBE, "", 0); loopback.bind(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); @@ -468,7 +465,7 @@ worker_t::worker_t(zmq::context_t& context, const cleanup_function_t& cleanup_function, const std::string& heart_beat) : upstream_proxy(context, ZMQ_DEALER), downstream_proxy(context, ZMQ_DEALER), - loopback(context, ZMQ_PUB), interrupt(context, ZMQ_SUB), work_function(work_function), + loopback(context, ZMQ_PUSH), interrupt(context, ZMQ_SUB), work_function(work_function), cleanup_function(cleanup_function), heart_beat_interval(5000), heart_beat(heart_beat), job(std::numeric_limits::max()) { @@ -482,7 +479,6 @@ worker_t::worker_t(zmq::context_t& context, downstream_proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); downstream_proxy.connect(downstream_proxy_endpoint.c_str()); - loopback.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled)); loopback.connect(result_endpoint.c_str()); interrupt.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled));