Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
raviqqe committed Jul 16, 2024
1 parent 6d86764 commit 020e89d
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions src/prime_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ server_t<request_container_t, request_info_t>::server_t(
uint32_t request_timeout,
const health_check_matcher_t& health_check_matcher,
const std::string& health_check_response)
: client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_SUB),
: client(context, ZMQ_STREAM), proxy(context, ZMQ_DEALER), loopback(context, ZMQ_PULL),
interrupt(context, ZMQ_PUB), log(log), max_request_size(max_request_size),
request_timeout(request_timeout), request_id(0), health_check_matcher(health_check_matcher),
health_check_response(health_check_response.size(), health_check_response.data()) {
Expand All @@ -158,9 +158,6 @@ server_t<request_container_t, request_info_t>::server_t(
proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled));
proxy.connect(proxy_endpoint.c_str());

// TODO: consider making this into a pull socket so we dont lose any results due to timing
loopback.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled));
loopback.setsockopt(ZMQ_SUBSCRIBE, "", 0);
loopback.bind(result_endpoint.c_str());

interrupt.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled));
Expand Down Expand Up @@ -468,7 +465,7 @@ worker_t::worker_t(zmq::context_t& context,
const cleanup_function_t& cleanup_function,
const std::string& heart_beat)
: upstream_proxy(context, ZMQ_DEALER), downstream_proxy(context, ZMQ_DEALER),
loopback(context, ZMQ_PUB), interrupt(context, ZMQ_SUB), work_function(work_function),
loopback(context, ZMQ_PUSH), interrupt(context, ZMQ_SUB), work_function(work_function),
cleanup_function(cleanup_function), heart_beat_interval(5000), heart_beat(heart_beat),
job(std::numeric_limits<decltype(job)>::max()) {

Expand All @@ -482,7 +479,6 @@ worker_t::worker_t(zmq::context_t& context,
downstream_proxy.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled));
downstream_proxy.connect(downstream_proxy_endpoint.c_str());

loopback.setsockopt(ZMQ_SNDHWM, &disabled, sizeof(disabled));
loopback.connect(result_endpoint.c_str());

interrupt.setsockopt(ZMQ_RCVHWM, &disabled, sizeof(disabled));
Expand Down

0 comments on commit 020e89d

Please sign in to comment.