diff --git a/.github/workflows/pr_checks.yml b/.github/workflows/pr_checks.yml
index 45dc8a3..ae4d2eb 100644
--- a/.github/workflows/pr_checks.yml
+++ b/.github/workflows/pr_checks.yml
@@ -25,22 +25,3 @@ jobs:
 
       - name: Run integration tests
         run: cargo test --test integration_test --verbose
-
-  version-check:
-    name: Version Check
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-        with:
-          fetch-depth: 0
-
-      - name: Check if version was bumped
-        run: |
-          git fetch origin main
-          MAIN_VERSION=$(git show origin/main:Cargo.toml | grep -m 1 'version = ' | cut -d '"' -f 2)
-          PR_VERSION=$(grep -m 1 'version = ' Cargo.toml | cut -d '"' -f 2)
-          if [ "$MAIN_VERSION" = "$PR_VERSION" ]; then
-            echo "Error: Version in Cargo.toml has not been incremented"
-            exit 1
-          fi
-          echo "Version was bumped from $MAIN_VERSION to $PR_VERSION"
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
deleted file mode 100644
index f0721b3..0000000
--- a/.github/workflows/publish.yml
+++ /dev/null
@@ -1,57 +0,0 @@
-name: Publish
-
-on:
-  push:
-    branches: [main]
-
-# Add these permissions
-permissions:
-  contents: write
-
-env:
-  CARGO_TERM_COLOR: always
-
-jobs:
-  publish:
-    name: Publish to crates.io
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-
-      - name: Install Rust toolchain
-        uses: dtolnay/rust-toolchain@stable
-
-      - name: Rust Cache
-        uses: Swatinem/rust-cache@v2
-
-      - name: Run tests
-        run: cargo test --verbose
-
-      - name: Run integration tests
-        run: cargo test --test integration_test --verbose
-
-      - name: Get version
-        id: get_version
-        run: |
-          VERSION=$(grep -m 1 'version = ' Cargo.toml | cut -d '"' -f 2)
-          echo "version=$VERSION" >> $GITHUB_OUTPUT
-
-      - name: Create Release
-        id: create_release
-        uses: actions/create-release@v1
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        with:
-          tag_name: v${{ steps.get_version.outputs.version }}
-          release_name: Release v${{ steps.get_version.outputs.version }}
-          body: |
-            Release of version ${{ steps.get_version.outputs.version }}
-
-            See [CHANGELOG.md](./CHANGELOG.md) for details.
-          draft: false
-          prerelease: false
-
-      - name: Publish to crates.io
-        run: cargo publish --token ${CRATES_TOKEN}
-        env:
-          CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }}
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
new file mode 100644
index 0000000..2f762c9
--- /dev/null
+++ b/.github/workflows/release.yml
@@ -0,0 +1,52 @@
+name: Release
+
+on:
+  push:
+    branches:
+      - main # Or your default branch
+
+jobs:
+  release:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+        with:
+          # Need full history for cargo-release and fetch tags
+          fetch-depth: 0
+          # Use a token with write access to push back the version commit/tag
+          # You might need a Personal Access Token (PAT) if branch protection is strict
+          token: ${{ secrets.PAT_FOR_RELEASE || secrets.GITHUB_TOKEN }}
+
+      - name: Setup Rust toolchain
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+          override: true
+
+      - name: Install cargo-release
+        run: cargo install cargo-release
+
+      - name: Configure Git user
+        run: |
+          git config user.name "GitHub Actions Bot"
+          git config user.email "actions@github.com"
+
+      - name: Run cargo release (Patch Bump)
+        env:
+          # Token needed by cargo-release to push commit/tag
+          GITHUB_TOKEN: ${{ secrets.PAT_FOR_RELEASE || secrets.GITHUB_TOKEN }}
+          # Token needed for publishing to crates.io
+          CARGO_REGISTRY_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
+        run: |
+          # --workspace handles bumping version for all members if needed
+          # --no-confirm skips interactive prompts
+          # --execute performs the actions (remove for dry-run)
+          # 'patch' specifies the version bump level (can be minor, major, etc.)
+          # cargo-release will bump version, commit, tag, push, and publish
+          cargo release --workspace patch --no-confirm --execute
+
+          # If you prefer to separate push and publish:
+          # cargo release --workspace patch --no-publish --no-push --no-tag --no-confirm --execute # Just bumps version and commits locally
+          # git push --follow-tags # Push commit and generated tag
+          # cargo publish --token ${CARGO_REGISTRY_TOKEN}
diff --git a/benches/engine_bench.rs b/benches/engine_bench.rs
index 4ee5d8d..36be712 100644
--- a/benches/engine_bench.rs
+++ b/benches/engine_bench.rs
@@ -136,7 +136,7 @@ fn bench_concurrent_processing(c: &mut Criterion) {
         });
     }
 
-    while let Some(_) = futures::StreamExt::next(&mut futures).await {}
+    while (futures::StreamExt::next(&mut futures).await).is_some() {}
 
     assert_eq!(
         executor
diff --git a/src/engine.rs b/src/engine.rs
index 7fdd17c..69a34ee 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -127,6 +127,7 @@ impl Engine {
     ///
     /// Use [`Engine::with_event_channel_capacity`] and [`Engine::with_action_channel_capacity`]
     /// to customize these values.
+    #[must_use]
     pub fn new() -> Self {
         Self {
             collectors: vec![],
@@ -145,6 +146,7 @@ impl Engine {
     /// # Arguments
     ///
     /// * `capacity` - The maximum number of events that can be buffered
+    #[must_use]
     pub fn with_event_channel_capacity(mut self, capacity: usize) -> Self {
         self.event_channel_capacity = capacity;
         self
@@ -158,6 +160,7 @@ impl Engine {
     /// # Arguments
     ///
     /// * `capacity` - The maximum number of actions that can be buffered
+    #[must_use]
     pub fn with_action_channel_capacity(mut self, capacity: usize) -> Self {
         self.action_channel_capacity = capacity;
         self
@@ -202,27 +205,14 @@ where
         self.executors.push(executor);
     }
 
-    /// Starts the engine and returns a set of tasks that can be awaited.
-    ///
-    /// This method:
-    /// 1. Creates channels for events and actions
-    /// 2. Spawns tasks for each collector, strategy, and executor
-    /// 3. Returns a [`JoinSet`] containing all spawned tasks
-    ///
-    /// The engine will continue running until one of:
-    /// - A collector's event stream ends
-    /// - A fatal error occurs
-    /// - The returned [`JoinSet`] is dropped
-    ///
-    /// # Returns
-    ///
-    /// A [`JoinSet`] containing all spawned tasks. The caller should await this
-    /// set to keep the engine running.
-    ///
+    /// Runs the bot engine, starting all components and returning a `JoinSet` for task management.
+    ///
+    /// This function consumes the `Engine` instance.
+    ///
     /// # Errors
-    ///
-    /// This method will return an error if any strategy fails to sync its initial
-    /// state.
+    ///
+    /// Returns an error if any strategy fails its initial `sync_state` call.
+    #[allow(clippy::too_many_lines)]
     pub async fn run(self) -> Result<JoinSet<()>> {
         let (event_sender, _): (Sender<E>, _) = broadcast::channel(self.event_channel_capacity);
         let (action_sender, _): (Sender<A>, _) = broadcast::channel(self.action_channel_capacity);
@@ -231,13 +221,13 @@ where
 
         // Spawn executors in separate threads.
         for (idx, executor) in self.executors.into_iter().enumerate() {
-            let mut receiver = action_sender.subscribe();
-            let executor_label = format!("executor_{}", idx);
+            let mut action_receiver = action_sender.subscribe();
+            let executor_label = format!("executor_{idx}");
 
             set.spawn(async move {
                 info!("starting executor... ");
                 loop {
-                    match receiver.recv().await {
+                    match action_receiver.recv().await {
                         Ok(action) => {
                             let start = Instant::now();
                             if let Err(e) = executor.execute(action).await {
@@ -256,7 +246,7 @@ where
                         Err(e) => {
                             METRICS.record_error(&executor_label, "channel_error");
                             error!(
-                                error = %BotError::channel_error(format!("Failed to receive action: {}", e)),
+                                error = %BotError::channel_error(format!("Failed to receive action: {e}")),
                                 "channel error"
                             );
                         }
@@ -269,7 +259,7 @@ where
         for (idx, mut strategy) in self.strategies.into_iter().enumerate() {
             let mut event_receiver = event_sender.subscribe();
             let action_sender = action_sender.clone();
-            let strategy_label = format!("strategy_{}", idx);
+            let strategy_label = format!("strategy_{idx}");
 
             strategy.sync_state().await.map_err(|e| {
                 METRICS.record_error(&strategy_label, "sync_error");
@@ -291,19 +281,19 @@ where
                                 if let Err(e) = action_sender.send(action) {
                                     METRICS.record_error(&strategy_label, "channel_error");
                                     error!(
-                                        error = %BotError::channel_error(format!("Failed to send action: {}", e)),
+                                        error = %BotError::channel_error(format!("Failed to send action: {e}")),
                                         "channel error"
                                     );
                                 }
                             }
 
                             // Update queue size metrics
-                            METRICS.update_action_queue_size(&strategy_label, action_sender.len() as i64);
+                            METRICS.update_action_queue_size(&strategy_label, action_sender.len().try_into().unwrap_or(i64::MAX));
                         }
                         Err(e) => {
                             METRICS.record_error(&strategy_label, "channel_error");
                             error!(
-                                error = %BotError::channel_error(format!("Failed to receive event: {}", e)),
+                                error = %BotError::channel_error(format!("Failed to receive event: {e}")),
                                 "channel error"
                             );
                         }
@@ -315,7 +305,7 @@ where
         // Spawn collectors in separate threads.
         for (idx, collector) in self.collectors.into_iter().enumerate() {
             let event_sender = event_sender.clone();
-            let collector_label = format!("collector_{}", idx);
+            let collector_label = format!("collector_{idx}");
 
             set.spawn(async move {
                 info!("starting collector... ");
@@ -335,14 +325,14 @@ where
                     if let Err(e) = event_sender.send(event) {
                         METRICS.record_error(&collector_label, "channel_error");
                         error!(
-                            error = %BotError::channel_error(format!("Failed to send event: {}", e)),
+                            error = %BotError::channel_error(format!("Failed to send event: {e}")),
                             "channel error"
                         );
                     } else {
                         METRICS.inc_events_processed(&collector_label);
 
                         // Update queue size metrics
-                        METRICS.update_event_queue_size(&collector_label, event_sender.len() as i64);
+                        METRICS.update_event_queue_size(&collector_label, event_sender.len().try_into().unwrap_or(i64::MAX));
                     }
                 }
             });
@@ -464,7 +454,7 @@ mod tests {
         let result = engine.run().await;
         assert!(result.is_err());
         if let Err(BotError::StrategyError { message, .. }) = result {
-            assert!(message.contains("Failed to sync strategy state"), "Unexpected error message: {}", message);
+            assert!(message.contains("Failed to sync strategy state"), "Unexpected error message: {message}");
         } else {
             panic!("Expected StrategyError");
         }
@@ -489,7 +479,7 @@ mod tests {
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
         // The executor should have logged errors but continued running
-        assert!(join_set.len() > 0);
+        assert!(!join_set.is_empty());
     }
 
     #[tokio::test]
@@ -520,7 +510,7 @@ mod tests {
         assert!(!actions.is_empty());
 
         // The engine should still be running
-        assert!(join_set.len() > 0);
+        assert!(!join_set.is_empty());
     }
 
     #[tokio::test]
@@ -576,6 +566,6 @@ mod tests {
         tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
         // The strategy should continue running but not produce any actions
-        assert!(join_set.len() > 0);
+        assert!(!join_set.is_empty());
     }
 }
diff --git a/src/metrics.rs b/src/metrics.rs
index 5b40cda..62701b1 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -7,6 +7,8 @@ mod metrics_impl {
         register_histogram_vec, register_int_counter_vec, register_int_gauge_vec, HistogramVec,
         IntCounterVec, IntGaugeVec,
     };
+    use prometheus_client::metrics::counter::Counter;
+    use std::sync::atomic::AtomicU64;
 
     /// Metrics collection and reporting for the bot engine.
     ///
@@ -163,6 +165,12 @@ mod metrics_impl {
         }
     }
 
+    impl Default for Metrics {
+        fn default() -> Self {
+            Self::new()
+        }
+    }
+
     /// The global metrics instance used throughout the crate.
     ///
     /// This instance provides access to all Prometheus metrics collectors:
@@ -188,8 +196,15 @@ mod metrics_stub {
     /// to facilitate testing without requiring a real metrics backend.
     pub struct Metrics;
 
+    impl Default for Metrics {
+        fn default() -> Self {
+            Self::new()
+        }
+    }
+
     impl Metrics {
         /// Creates a new mock metrics instance.
+        #[must_use]
         pub fn new() -> Self {
             Metrics
         }
diff --git a/tests/integration_test.rs b/tests/integration_test.rs
index 50828f0..ef600e1 100644
--- a/tests/integration_test.rs
+++ b/tests/integration_test.rs
@@ -259,7 +259,7 @@ async fn test_concurrent_processing() {
     }
 
     // Wait for all processing to complete
-    while let Some(_) = futures.next().await {}
+    while (futures.next().await).is_some() {}
 
     // Verify all trades were executed
    let trades = executed_trades.lock().await;