Alpaca Streaming API
All WebSocket-based streaming messages described in the Alpaca API documentation are available via events of the Alpaca.Markets.IAlpacaStreamingClient, Alpaca.Markets.IAlpacaDataStreamingClient, and Alpaca.Markets.IAlpacaCryptoDataStreamingClient interfaces. These events expose the related JSON objects through read-only interfaces.
To create an instance of each client interface, we recommend using the extension methods of the IEnvironment interface designed for this task.
For Alpaca.Markets.IAlpacaStreamingClient interface:
var client = Environments.Paper.GetAlpacaStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));
For Alpaca.Markets.IAlpacaDataStreamingClient interface:
var client = Environments.Paper.GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));
For Alpaca.Markets.IAlpacaCryptoDataStreamingClient interface:
var client = Environments.Paper.AlpacaCryptoDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));
You can read more about these extension methods on this Wiki page.
Call the ConnectAsync method to connect to the stream and the DisconnectAsync method to disconnect. It is also good practice to release the resources held by an Alpaca.Markets.IAlpacaStreamingClient or Alpaca.Markets.IAlpacaDataStreamingClient instance using the standard .NET System.IDisposable pattern. Another helpful method is ConnectAndAuthenticateAsync: use it if you do not want to track the connected and authenticated states separately.
The Alpaca.Markets.IAlpacaStreamingClient interface provides a single OnTradeUpdate event, which is raised when a new trade (order) update is received from the stream.
You start receiving these events as soon as the client connects and authenticates successfully. You do not need to call any special methods to enable or disable them: just subscribe to them like any other .NET event.
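As a minimal sketch of the lifecycle and event handling described above, the following console program attaches an OnTradeUpdate handler, connects, and disconnects in a finally block. KEY_ID and SECRET_KEY are placeholders for your own credentials. The Event and Order.Symbol members used in the handler are assumptions about the trade update object and may differ between SDK versions.

```csharp
using System;
using System.Threading.Tasks;
using Alpaca.Markets;

internal static class TradeUpdatesSketch
{
    // Placeholders: substitute your own API credentials.
    private const string KEY_ID = "your-key-id";
    private const string SECRET_KEY = "your-secret-key";

    public static async Task Main()
    {
        // 'using' releases the client's resources when Main exits.
        using var client = Environments.Paper
            .GetAlpacaStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

        // Attach the handler before connecting so that no update is missed.
        // Assumption: the update object exposes Event and Order members.
        client.OnTradeUpdate += update =>
            Console.WriteLine($"{update.Event} for {update.Order.Symbol}");

        // Connects and authenticates in one step.
        await client.ConnectAndAuthenticateAsync();
        try
        {
            Console.WriteLine("Listening for trade updates; press Enter to stop.");
            Console.ReadLine();
        }
        finally
        {
            // Close the stream even if the code above throws.
            await client.DisconnectAsync();
        }
    }
}
```

The try/finally arrangement is one reasonable choice rather than a requirement of the SDK; it simply ensures that DisconnectAsync runs before the client is disposed.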
The Alpaca.Markets.IAlpacaDataStreamingClient interface takes a different approach and manages active subscriptions explicitly. Each active subscription is represented by an instance of the Alpaca.Markets.IAlpacaDataSubscription generic interface, instantiated with a specific update type.
- Obtain a subscription object using the GetTradeSubscription, GetQuoteSubscription, GetMinuteBarSubscription, or GetDailyBarSubscription method.
- Each subscription object exposes the Received event with a specific update type (ITrade for a trade subscription, IQuote for a quote subscription, IBar for minute or daily bar subscriptions). Subscribe to this .NET event to receive notifications from the streaming client.
- All subscription objects of a newly created Alpaca.Markets.IAlpacaDataStreamingClient instance are in the unsubscribed state, regardless of how many handlers are attached to their .NET events.
- You can change the state of a single subscription object or of a list of subscriptions using the SubscribeAsync and UnsubscribeAsync methods of the Alpaca.Markets.IAlpacaDataStreamingClient interface, respectively.
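As a compact illustration of the list above, the sketch below combines a trade subscription with a minute bar subscription and changes the state of both in a single call. It makes several assumptions that should be checked against your SDK version: that SubscribeAsync and UnsubscribeAsync accept a collection of the non-generic IAlpacaDataSubscription interface, that the generic subscription interface derives from it, and that IBar exposes a Close property. KEY_ID and SECRET_KEY are placeholders.

```csharp
using System;
using System.Threading.Tasks;
using Alpaca.Markets;

internal static class BarSubscriptionSketch
{
    // Placeholders: substitute your own API credentials.
    private const string KEY_ID = "your-key-id";
    private const string SECRET_KEY = "your-secret-key";

    public static async Task Main()
    {
        using var client = Environments.Paper
            .GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));
        await client.ConnectAndAuthenticateAsync();

        // Both objects start in the unsubscribed state.
        var trades = client.GetTradeSubscription("AAPL");
        trades.Received += trade =>
            Console.WriteLine($"Trade: {trade.Symbol}");

        var bars = client.GetMinuteBarSubscription("AAPL");
        // Assumption: IBar exposes a Close property.
        bars.Received += bar =>
            Console.WriteLine($"Bar: {bar.Symbol} close={bar.Close}");

        // Assumption: a collection overload of SubscribeAsync exists.
        var subscriptions = new IAlpacaDataSubscription[] { trades, bars };
        await client.SubscribeAsync(subscriptions);

        // Minute bars arrive at most once per minute during market hours.
        await Task.Delay(TimeSpan.FromSeconds(90));

        await client.UnsubscribeAsync(subscriptions);
        await client.DisconnectAsync();
    }
}
```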
The following code snippet demonstrates this process in detail: create the client, create the subscriptions, attach event handlers, wait for events, and close the connection:
using var client = Environments.Live
    .GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

// Connect and authenticate in a single call.
await client.ConnectAndAuthenticateAsync();

// One wait handle per subscription; each is signaled when an event arrives.
var waitObjects = new []
{
    new System.Threading.AutoResetEvent(false),
    new System.Threading.AutoResetEvent(false)
};

// Subscription objects start in the unsubscribed state.
var tradeSubscription = client.GetTradeSubscription("AAPL");
tradeSubscription.Received += (trade) =>
{
    Console.WriteLine($"Trade received for the {trade.Symbol} contract");
    waitObjects[0].Set();
};

var quoteSubscription = client.GetQuoteSubscription("AAPL");
quoteSubscription.Received += (quote) =>
{
    Console.WriteLine($"Quote received for the {quote.Symbol} contract");
    waitObjects[1].Set();
};

// Start receiving data for both subscriptions.
await client.SubscribeAsync(tradeSubscription);
await client.SubscribeAsync(quoteSubscription);

// Block for up to 10 seconds until both a trade and a quote have arrived.
System.Threading.WaitHandle.WaitAll(waitObjects, TimeSpan.FromSeconds(10));

await client.UnsubscribeAsync(tradeSubscription);
await client.UnsubscribeAsync(quoteSubscription);

await client.DisconnectAsync();
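WaitHandle.WaitAll in the snippet above blocks the calling thread, which is fine for a short console sample but may be undesirable in asynchronous code. One possible non-blocking alternative waits on a TaskCompletionSource with a timeout. The sketch below is independent of the SDK: FakeSubscription and WaitForFirstAsync are hypothetical names, and FakeSubscription only stands in for an object with a Received event so that the example runs without credentials.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a subscription object exposing a Received event.
public sealed class FakeSubscription
{
    public event Action<string>? Received;

    public void Raise(string symbol) => Received?.Invoke(symbol);
}

public static class AsyncWaitSketch
{
    // Completes with true when the first event arrives, or false on timeout.
    public static async Task<bool> WaitForFirstAsync(
        FakeSubscription subscription, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        void Handler(string _) => tcs.TrySetResult(true);

        // The handler is attached before the first await, so events raised
        // right after this method returns its task are not missed.
        subscription.Received += Handler;
        try
        {
            var finished = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
            return finished == tcs.Task;
        }
        finally
        {
            subscription.Received -= Handler;
        }
    }

    public static async Task Main()
    {
        var trades = new FakeSubscription();
        var quotes = new FakeSubscription();

        var tradeSeen = WaitForFirstAsync(trades, TimeSpan.FromSeconds(10));
        var quoteSeen = WaitForFirstAsync(quotes, TimeSpan.FromSeconds(10));

        // Simulate incoming events.
        trades.Raise("AAPL");
        quotes.Raise("AAPL");

        var results = await Task.WhenAll(tradeSeen, quoteSeen);
        Console.WriteLine($"Trade seen: {results[0]}, quote seen: {results[1]}");
    }
}
```

With a real client, the same helper shape could be applied to each subscription's Received event, with the handler signature adjusted to the update type.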
The IAsyncEnumerable Support
The Alpaca.Markets.Extensions package contains helper interfaces (with related implementations) and methods that simplify consuming real-time data events in asynchronous code:
- The AsAsyncEnumerable extension method of the IAlpacaDataSubscription interface converts an instance of this interface into an instance of the IAsyncEnumerable interface, so that you can consume it with the C# await foreach statement.
- The IDisposableAlpacaDataSubscription interface and its internal implementation combine the IAlpacaDataSubscription, System.IAsyncDisposable, and System.IDisposable interfaces, which allows you to consume subscription events in the scope of a C# using or await using statement.
The next code snippet demonstrates both features in action:
using var client = Environments.Live
    .GetAlpacaDataStreamingClient(new SecretKey(KEY_ID, SECRET_KEY));

await client.ConnectAndAuthenticateAsync();

// Stop consuming events after 15 seconds.
using var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(15));

// Symbol is a placeholder for the ticker string, for example "AAPL".
// The subscription is unsubscribed when it is disposed.
await using var subscription = await client.SubscribeTradeAsync(Symbol);

await foreach (var trade in subscription
    .AsAsyncEnumerable(cancellationTokenSource.Token)
    .ConfigureAwait(false))
{
    Console.WriteLine($"Trade received for the {trade.Symbol} contract");
}

await client.DisconnectAsync();
If the C# compiler reports that the SubscribeTradeAsync method does not exist, you have most likely forgotten to add the following line to the using section of your C# file:
using Alpaca.Markets.Extensions;
You should not call the SubscribeAsync and UnsubscribeAsync methods manually in this case: they are called for you automatically (in the implementation class constructor and inside the DisposeAsync() method). Also keep in mind that the C# await foreach statement does not execute any code after the loop body until the enumerable completes. In this example that happens after 15 seconds, but you can instead use a cancellation token received from the outer code.
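To illustrate why code after the loop runs only once the enumerable completes, here is a self-contained sketch of how an event source might be bridged to IAsyncEnumerable using System.Threading.Channels. This is not the implementation used by Alpaca.Markets.Extensions. FakeEventSource and ToAsyncEnumerable are hypothetical names, and ending the sequence quietly on cancellation is a design choice of this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical event source standing in for a subscription object.
public sealed class FakeEventSource
{
    public event Action<string>? Received;

    public void Raise(string value) => Received?.Invoke(value);
}

public static class EventBridgeSketch
{
    // Buffers events in an unbounded channel and exposes them as a sequence.
    // Cancelling the token detaches the handler and completes the sequence.
    public static IAsyncEnumerable<string> ToAsyncEnumerable(
        FakeEventSource source, CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<string>();
        void Handler(string value) => channel.Writer.TryWrite(value);

        source.Received += Handler;
        // For brevity the registration is not disposed in this sketch.
        cancellationToken.Register(() =>
        {
            source.Received -= Handler;
            channel.Writer.TryComplete();
        });

        // Yields buffered items, then ends once the channel is completed.
        return channel.Reader.ReadAllAsync();
    }

    public static async Task Main()
    {
        var source = new FakeEventSource();
        using var cts = new CancellationTokenSource();
        var stream = ToAsyncEnumerable(source, cts.Token);

        source.Raise("AAPL");
        source.Raise("MSFT");
        cts.Cancel();           // completes the sequence
        source.Raise("IGNORED"); // handler is already detached

        await foreach (var symbol in stream)
        {
            Console.WriteLine($"Received {symbol}");
        }

        // Reached only after the sequence has completed.
        Console.WriteLine("Loop finished");
    }
}
```

In this sketch the two buffered symbols are printed before "Loop finished". Without the cancellation, the await foreach loop would keep waiting for further events and the final line would never run.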