19 changes: 0 additions & 19 deletions .github/workflows/pr_checks.yml
@@ -25,22 +25,3 @@ jobs:

- name: Run integration tests
run: cargo test --test integration_test --verbose

version-check:
name: Version Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Check if version was bumped
run: |
git fetch origin main
MAIN_VERSION=$(git show origin/main:Cargo.toml | grep -m 1 'version = ' | cut -d '"' -f 2)
PR_VERSION=$(grep -m 1 'version = ' Cargo.toml | cut -d '"' -f 2)
if [ "$MAIN_VERSION" = "$PR_VERSION" ]; then
echo "Error: Version in Cargo.toml has not been incremented"
exit 1
fi
echo "Version was bumped from $MAIN_VERSION to $PR_VERSION"
57 changes: 0 additions & 57 deletions .github/workflows/publish.yml

This file was deleted.

52 changes: 52 additions & 0 deletions .github/workflows/release.yml
@@ -0,0 +1,52 @@
name: Release

on:
push:
branches:
- main # Or your default branch

jobs:
release:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
# Need full history and tags for cargo-release
fetch-depth: 0
# Use a token with write access to push back the version commit/tag
# You might need a Personal Access Token (PAT) if branch protection is strict
token: ${{ secrets.PAT_FOR_RELEASE || secrets.GITHUB_TOKEN }}

- name: Setup Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true

- name: Install cargo-release
run: cargo install cargo-release

- name: Configure Git user
run: |
git config user.name "GitHub Actions Bot"
git config user.email "actions@github.com"

- name: Run cargo release (Patch Bump)
env:
# Token needed by cargo-release to push commit/tag
GITHUB_TOKEN: ${{ secrets.PAT_FOR_RELEASE || secrets.GITHUB_TOKEN }}
# Token needed for publishing to crates.io
CARGO_REGISTRY_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
run: |
# --workspace handles bumping version for all members if needed
# --no-confirm skips interactive prompts
# --execute performs the actions (remove for dry-run)
# 'patch' specifies the version bump level (can be minor, major, etc.)
# cargo-release will bump version, commit, tag, push, and publish
cargo release --workspace patch --no-confirm --execute

# If you prefer to separate push and publish:
# cargo release --workspace patch --no-publish --no-push --no-tag --no-confirm --execute # Just bumps version and commits locally
# git push --follow-tags # Push commit and generated tag
# cargo publish --token ${CARGO_REGISTRY_TOKEN}
2 changes: 1 addition & 1 deletion benches/engine_bench.rs
@@ -136,7 +136,7 @@ fn bench_concurrent_processing(c: &mut Criterion) {
});
}

while let Some(_) = futures::StreamExt::next(&mut futures).await {}
while (futures::StreamExt::next(&mut futures).await).is_some() {}

assert_eq!(
executor
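The one-line change above swaps `while let Some(_) = ...` for an `.is_some()` loop, the form clippy's `redundant_pattern_matching` lint asks for. A small self-contained sketch of the same drain pattern (standalone illustration, not code from this repo):

// Drain a FuturesUnordered stream, discarding results, using the
// `.is_some()` form instead of `while let Some(_) = ...`.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..4 {
        futures.push(async move { i * 2 });
    }

    // Poll every queued future to completion; results are ignored here,
    // just as the benchmark only cares that all tasks finished.
    while (futures.next().await).is_some() {}

    assert!(futures.is_empty());
}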
60 changes: 25 additions & 35 deletions src/engine.rs
@@ -127,6 +127,7 @@ impl<E, A> Engine<E, A> {
///
/// Use [`Engine::with_event_channel_capacity`] and [`Engine::with_action_channel_capacity`]
/// to customize these values.
#[must_use]
pub fn new() -> Self {
Self {
collectors: vec![],
@@ -145,6 +146,7 @@
/// # Arguments
///
/// * `capacity` - The maximum number of events that can be buffered
#[must_use]
pub fn with_event_channel_capacity(mut self, capacity: usize) -> Self {
self.event_channel_capacity = capacity;
self
@@ -158,6 +160,7 @@
/// # Arguments
///
/// * `capacity` - The maximum number of actions that can be buffered
#[must_use]
pub fn with_action_channel_capacity(mut self, capacity: usize) -> Self {
self.action_channel_capacity = capacity;
self
@@ -202,27 +205,14 @@ where
self.executors.push(executor);
}

/// Starts the engine and returns a set of tasks that can be awaited.
///
/// This method:
/// 1. Creates channels for events and actions
/// 2. Spawns tasks for each collector, strategy, and executor
/// 3. Returns a [`JoinSet`] containing all spawned tasks
///
/// The engine will continue running until one of:
/// - A collector's event stream ends
/// - A fatal error occurs
/// - The returned [`JoinSet`] is dropped
///
/// # Returns
///
/// A [`JoinSet`] containing all spawned tasks. The caller should await this
/// set to keep the engine running.
///
/// Runs the bot engine, starting all components and returning a `JoinSet` for task management.
///
/// This function consumes the `Engine` instance.
///
/// # Errors
///
/// This method will return an error if any strategy fails to sync its initial
/// state.
///
/// Returns an error if any strategy fails its initial `sync_state` call.
#[allow(clippy::too_many_lines)]
pub async fn run(self) -> Result<JoinSet<()>> {
let (event_sender, _): (Sender<E>, _) = broadcast::channel(self.event_channel_capacity);
let (action_sender, _): (Sender<A>, _) = broadcast::channel(self.action_channel_capacity);
@@ -231,13 +221,13 @@

// Spawn executors in separate threads.
for (idx, executor) in self.executors.into_iter().enumerate() {
let mut receiver = action_sender.subscribe();
let executor_label = format!("executor_{}", idx);
let mut action_receiver = action_sender.subscribe();
let executor_label = format!("executor_{idx}");

set.spawn(async move {
info!("starting executor... ");
loop {
match receiver.recv().await {
match action_receiver.recv().await {
Ok(action) => {
let start = Instant::now();
if let Err(e) = executor.execute(action).await {
@@ -256,7 +246,7 @@
Err(e) => {
METRICS.record_error(&executor_label, "channel_error");
error!(
error = %BotError::channel_error(format!("Failed to receive action: {}", e)),
error = %BotError::channel_error(format!("Failed to receive action: {e}")),
"channel error"
);
}
@@ -269,7 +259,7 @@
for (idx, mut strategy) in self.strategies.into_iter().enumerate() {
let mut event_receiver = event_sender.subscribe();
let action_sender = action_sender.clone();
let strategy_label = format!("strategy_{}", idx);
let strategy_label = format!("strategy_{idx}");

strategy.sync_state().await.map_err(|e| {
METRICS.record_error(&strategy_label, "sync_error");
@@ -291,19 +281,19 @@
if let Err(e) = action_sender.send(action) {
METRICS.record_error(&strategy_label, "channel_error");
error!(
error = %BotError::channel_error(format!("Failed to send action: {}", e)),
error = %BotError::channel_error(format!("Failed to send action: {e}")),
"channel error"
);
}
}

// Update queue size metrics
METRICS.update_action_queue_size(&strategy_label, action_sender.len() as i64);
METRICS.update_action_queue_size(&strategy_label, action_sender.len().try_into().unwrap_or(i64::MAX));
}
Err(e) => {
METRICS.record_error(&strategy_label, "channel_error");
error!(
error = %BotError::channel_error(format!("Failed to receive event: {}", e)),
error = %BotError::channel_error(format!("Failed to receive event: {e}")),
"channel error"
);
}
@@ -315,7 +305,7 @@
// Spawn collectors in separate threads.
for (idx, collector) in self.collectors.into_iter().enumerate() {
let event_sender = event_sender.clone();
let collector_label = format!("collector_{}", idx);
let collector_label = format!("collector_{idx}");

set.spawn(async move {
info!("starting collector... ");
@@ -335,14 +325,14 @@
if let Err(e) = event_sender.send(event) {
METRICS.record_error(&collector_label, "channel_error");
error!(
error = %BotError::channel_error(format!("Failed to send event: {}", e)),
error = %BotError::channel_error(format!("Failed to send event: {e}")),
"channel error"
);
} else {
METRICS.inc_events_processed(&collector_label);

// Update queue size metrics
METRICS.update_event_queue_size(&collector_label, event_sender.len() as i64);
METRICS.update_event_queue_size(&collector_label, event_sender.len().try_into().unwrap_or(i64::MAX));
}
}
});
@@ -464,7 +454,7 @@ mod tests {
let result = engine.run().await;
assert!(result.is_err());
if let Err(BotError::StrategyError { message, .. }) = result {
assert!(message.contains("Failed to sync strategy state"), "Unexpected error message: {}", message);
assert!(message.contains("Failed to sync strategy state"), "Unexpected error message: {message}");
} else {
panic!("Expected StrategyError");
}
@@ -489,7 +479,7 @@
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// The executor should have logged errors but continued running
assert!(join_set.len() > 0);
assert!(!join_set.is_empty());
}

#[tokio::test]
@@ -520,7 +510,7 @@
assert!(!actions.is_empty());

// The engine should still be running
assert!(join_set.len() > 0);
assert!(!join_set.is_empty());
}

#[tokio::test]
@@ -576,6 +566,6 @@
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// The strategy should continue running but not produce any actions
assert!(join_set.len() > 0);
assert!(!join_set.is_empty());
}
}
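Taken together, the engine changes describe a builder-style API: a `#[must_use]` `new()`, the two `with_*_channel_capacity` setters, and a consuming `run()` that returns a `JoinSet<()>` whose tasks keep the collectors, strategies, and executors alive. A rough usage sketch follows; the `add_collector`/`add_strategy`/`add_executor` calls and the `My*` component types are assumptions for illustration only and are not taken from this crate:

// Hypothetical wiring; only Engine::new, the with_* builders, and run()
// appear in the diff above — every other name is assumed.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut engine: Engine<MyEvent, MyAction> = Engine::new()
        .with_event_channel_capacity(1024)   // events buffered by the broadcast channel
        .with_action_channel_capacity(1024); // actions buffered by the broadcast channel

    engine.add_collector(Box::new(MyCollector::default()));
    engine.add_strategy(Box::new(MyStrategy::default()));
    engine.add_executor(Box::new(MyExecutor::default()));

    // run() consumes the engine, spawns one task per component, and hands
    // back a JoinSet; awaiting the set keeps the engine running.
    let mut tasks = engine.run().await?;
    while let Some(joined) = tasks.join_next().await {
        joined?;
    }
    Ok(())
}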
15 changes: 15 additions & 0 deletions src/metrics.rs
@@ -7,6 +7,8 @@ mod metrics_impl {
register_histogram_vec, register_int_counter_vec, register_int_gauge_vec, HistogramVec,
IntCounterVec, IntGaugeVec,
};
use prometheus_client::metrics::counter::Counter;
use std::sync::atomic::AtomicU64;

/// Metrics collection and reporting for the bot engine.
///
@@ -163,6 +165,12 @@
}
}

impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}

/// The global metrics instance used throughout the crate.
///
/// This instance provides access to all Prometheus metrics collectors:
@@ -188,8 +196,15 @@ mod metrics_stub {
/// to facilitate testing without requiring a real metrics backend.
pub struct Metrics;

impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}

impl Metrics {
/// Creates a new mock metrics instance.
#[must_use]
pub fn new() -> Self {
Metrics
}
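The two `Default` impls added above both delegate to `new()`, the usual answer to clippy's `new_without_default` lint for types with a zero-argument constructor. A standalone illustration of the pattern (hypothetical `Registry` type, not part of this crate):

// Delegating Default to new() keeps a single canonical constructor and
// lets generic code that bounds on `Default` construct the type.
pub struct Registry;

impl Registry {
    #[must_use]
    pub fn new() -> Self {
        Registry
    }
}

impl Default for Registry {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    let _a = Registry::new();
    let _b = Registry::default(); // both spellings now work
}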
2 changes: 1 addition & 1 deletion tests/integration_test.rs
@@ -259,7 +259,7 @@ async fn test_concurrent_processing() {
}

// Wait for all processing to complete
while let Some(_) = futures.next().await {}
while (futures.next().await).is_some() {}

// Verify all trades were executed
let trades = executed_trades.lock().await;