Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Aug 14, 2023
1 parent 1f2607e commit a13fb4d
Showing 1 changed file with 66 additions and 80 deletions.
146 changes: 66 additions & 80 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4316,91 +4316,77 @@ pub fn serve<T: BeaconChainTypes>(
.then(
|topics_res: Result<api_types::EventQuery, warp::Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| async {
let res = task_spawner
.blocking_response_task(Priority::P0, move || {
let topics = topics_res?;
// for each topic subscribed spawn a new subscription
let mut receivers = Vec::with_capacity(topics.topics.len());

if let Some(event_handler) = chain.event_handler.as_ref() {
for topic in topics.topics {
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
api_types::EventTopic::VoluntaryExit => {
event_handler.subscribe_exit()
}
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
api_types::EventTopic::ContributionAndProof => {
event_handler.subscribe_contributions()
}
api_types::EventTopic::PayloadAttributes => {
event_handler.subscribe_payload_attributes()
}
api_types::EventTopic::LateHead => {
event_handler.subscribe_late_head()
}
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
};
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_response_task(Priority::P0, move || {
let topics = topics_res?;
// for each topic subscribed spawn a new subscription
let mut receivers = Vec::with_capacity(topics.topics.len());

if let Some(event_handler) = chain.event_handler.as_ref() {
for topic in topics.topics {
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
api_types::EventTopic::VoluntaryExit => {
event_handler.subscribe_exit()
}
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
api_types::EventTopic::ContributionAndProof => {
event_handler.subscribe_contributions()
}
api_types::EventTopic::PayloadAttributes => {
event_handler.subscribe_payload_attributes()
}
api_types::EventTopic::LateHead => {
event_handler.subscribe_late_head()
}
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
};

receivers.push(
BroadcastStream::new(receiver)
.map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.unwrap_or_else(|e| {
Event::default().comment(format!(
"error - bad json: {e:?}"
))
}),
// Do not terminate the stream if the channel fills
// up. Just drop some messages and send a comment to
// the client.
Err(BroadcastStreamRecvError::Lagged(n)) => {
Event::default().comment(format!(
"error - dropped {n} messages"
))
}
receivers.push(
BroadcastStream::new(receiver)
.map(|msg| {
match msg {
Ok(data) => Event::default()
.event(data.topic_name())
.json_data(data)
.unwrap_or_else(|e| {
Event::default()
.comment(format!("error - bad json: {e:?}"))
}),
// Do not terminate the stream if the channel fills
// up. Just drop some messages and send a comment to
// the client.
Err(BroadcastStreamRecvError::Lagged(n)) => {
Event::default().comment(format!(
"error - dropped {n} messages"
))
}
})
.map(|event| Ok::<_, std::convert::Infallible>(event)),
);
}
} else {
return Err(warp_utils::reject::custom_server_error(
"event handler was not initialized".to_string(),
));
}
})
.map(Ok::<_, std::convert::Infallible>),
);
}
} else {
return Err(warp_utils::reject::custom_server_error(
"event handler was not initialized".to_string(),
));
}

let s = futures::stream::select_all(receivers);
let s = futures::stream::select_all(receivers);

Ok(warp::sse::reply(warp::sse::keep_alive().stream(s)))
})
.await;
match res {
Ok(response) => response,
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
StatusCode::INTERNAL_SERVER_ERROR,
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
Ok(warp::sse::reply(warp::sse::keep_alive().stream(s)))
})
},
);

Expand Down

0 comments on commit a13fb4d

Please sign in to comment.