Skip to content

Alpaca Streaming API

Oleg Rakhmatulin edited this page Nov 29, 2021 · 18 revisions

All WebSocket-based streaming messages described here, here, and here is available for you via events of Alpaca.Markets.IAlpacaStreamingClient, Alpaca.Markets.IAlpacaDataStreamingClient, and Alpaca.Markets.IAlpacaCryptoDataStreamingClient interfaces respectively. All these events provide you read-only interfaces for related JSON objects.

For creating an instance of each client interface we recommend using special extension methods of the IEnvironment interface designed for this task.

For Alpaca.Markets.IAlpacaStreamingClient interface:

var client = Environments.Paper.GetAlpacaStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

For Alpaca.Markets.IAlpacaDataStreamingClient interface:

var client = Environments.Paper.AlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

For Alpaca.Markets.IAlpacaCryptoDataStreamingClient interface:

var client = Environments.Paper.AlpacaCryptoDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

You can read more about these extension methods on this Wiki page.

General Usage Patterns

You have to call the ConnectAsync method for connecting the stream and the DisconnectAsync method for disconnecting. It also would be very helpful to free resources used by the instance of Alpaca.Markets.IAlpacaStreamingClient or Alpaca.Markets.IAlpacaDataStreamingClient objects using standard .NET System.IDisposable pattern. Another helpful method is ConnectAndAuthenticateAsync - you can use it if you do not want to track connected and authenticated states separately.

Handling Trading Events

The Alpaca.Markets.IAlpacaStreamingClient interface provides single OnTradeUpdate event occurred when a new trade (order) update received from the stream.

As soon as a client will connect and successfully authenticate you will receive these events. You do not need to call any special methods for enabling/disabling these events - just subscribe to them as usual .NET events.

Subscribing Realtime Data

The Alpaca.Markets.IAlpacaDataStreamingClient interface uses another approach for active subscription management. Each active subscription represented as an instance of the Alpaca.Markets.IAlpacaDataSubscription generic interface, instantiated with a specific update type.

The next code snippet demonstrates this process in detail - create client, create subscriptions, subscribe events, wait for events, close connection:

using var client = Environments.Live
    .GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

await client.ConnectAndAuthenticateAsync();

var waitObjects = new []
{
    new System.Threading.AutoResetEvent(false),
    new System.Threading.AutoResetEvent(false)
};

var tradeSubscription = client.GetTradeSubscription("AAPL");
tradeSubscription.Received += (trade) =>
{
    Console.WriteLine($"Trade received for the {trade.Symbol} contract");
    waitObjects[0].Set();
};

var quoteSubscription = client.GetQuoteSubscription("AAPL");
quoteSubscription.Received += (quote) =>
{
    Console.WriteLine($"Quote received for the {quote.Symbol} contract");
    waitObjects[1].Set();
};

await client.SubscribeAsync(tradeSubscription);
await client.SubscribeAsync(quoteSubscription);

System.Threading.WaitHandle.WaitAll(waitObjects, TimeSpan.FromSeconds(10));

await client.UnsubscribeAsync(tradeSubscription);
await client.UnsubscribeAsync(quoteSubscription);

await client.DisconnectAsync();

The IAsyncEnumerable Support

The Alpaca.Markets.Extensions package contains helper interfaces (and related implementations) and methods for more simple real-time data events consuming in the asynchronous code:

The next code snippet demonstrates both features in action:

using var client = Environments.Live
    .GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

await client.ConnectAndAuthenticateAsync();

var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(15));

await using var subscription = await client.SubscribeTradeAsync(Symbol);

await foreach (var trade in subscription
    .AsAsyncEnumerable(cancellationTokenSource.Token)
    .ConfigureAwait(false))
{
    Console.WriteLine($"Trade received for the {trade.Symbol} contract");
}

await client.DisconnectAsync();

If the C# compiler tells you that the SubscribeTradeAsync method doesn't exist it means that you've just forgotten to add the next line into your C# file using section:

using Alpaca.Markets.Extensions;

You shouldn't call Subscribe and Unsubscribe methods manually - they will be called for you (in the implementation class constructor and inside the DisposeAsync() method) automatically. And keep in mind that C# await foreach statement will not execute any code behind the loop body until enumerable completion - in this example, it will happen after 15 seconds, but you can use the cancellation token received from the outer code.