Skip to content

ProConcepts Stream Layers

UmaHarano edited this page Nov 6, 2024 · 11 revisions

Stream layers are feature layers with a stream service as their data source. They reference real-time datasets where the observations are live. Observations can include changes to location, attributes, or both. Stream layers can contain point, polyline, or polygon features. Even though the title of this ProConcepts is "Stream Layers", Stream layers and real-time datasets usage relevant to the API is discussed. The terms "events" and "observations" as relates to "events" streamed from a stream server are used interchangeably. They mean the same thing.

Language:      C#
Subject:       Map Authoring
Contributor:   ArcGIS Pro SDK Team <arcgisprosdk@esri.com>
Organization:  Esri, http://www.esri.com
Date:          10/06/2024
ArcGIS Pro:    3.4
Visual Studio: 2022

In this topic

Background

Stream layers are a special kind of feature-based layer designed to work with stream services. Stream services, similar to other services, are hosted on portal or online and require the GeoEvent Server extension. GeoEvent Server consumes incoming "raw" event data (using a "connector") and maps it to json for the stream service to consume (and also publishes the stream services). The stream service broadcasts the json data to clients such as ArcGIS web maps and ArcGIS Pro stream layers. Unlike feature services, stream services do not need to persist the incoming data before it can be visualized but broadcast it to clients directly.

Stream layers are connected to stream services using LayerFactory and the stream service URI. When the stream layer is created, it subscribes to the stream service and can immediately begin receiving (real time) data. Whereas a feature layer "pulls" features from the feature service, a stream layer receives features from the stream service (without polling). Stream services can also have one or both of the following associated feature services present:

  • A feature service which retains the latest observations (if snapshot archiving was enabled during publishing of the stream service)
  • feature service to store related attribute information (feature location, vehicle or aircraft characteristics, etc) if a related features service URL was specified during publishing of the stream service).

Consult the Service capabilities section of this ProConcepts for more information. Note: Stream services are published from GeoEvent Server, not from Pro.

Using Stream Layer vs Realtime FeatureClass

Both stream layer and Realtime feature class expose similar capabilities though they do have certain differences. For instance, the feature class provides details on the schema and service capabilities not available on the layer and likewise, the layer controls rendering and event display filtering options not relevant for the feature class. However, in most cases the same options can be manipulated on both.

One important consideration to keep in mind when choosing whether to code against the stream layer or its feature class is that settings changed via the layer are persisted in the project (if it is saved) whereas the same changes made via the feature class are not. Feature class setting changes are retained in-memory only for the lifetime of the session (or until they are overwritten by another change). Additionally, changes made via the layer will get reflected in the layer properties, ribbon, and symbology UI. Changes made via the feature class will not. Therefore, if maintaining consistency between the Pro UI and settings made via the API is a requirement, the stream layer should be used, and not its feature class, to manipulate equivalent settings.

For these reasons it is assumed add-in developers will be working mostly with the layer though, for completeness, when there is an equivalent setting, the ProConcepts document will cover both. Manipulating the Realtime feature class is required, for example, when operating in a headless configuration such as a console program using the "CoreHost" pattern.

Stream Layer Types

Stream layers and their underlying Realtime feature class can be one of two types (consult Stream layers):

  • Spatial
  • Non-spatial (often referred to as "attribute only")

Spatial stream layers receive broadcast events that do include a location (eg x, y). The location can either vary with each event as is the case when a vehicle or airplane is being tracked, or the location can be fixed as is the case when an event, in the literal sense, "happens" at a particular location such as an accident, crime, meeting event, or tweet.
Non-spatial stream layers receive broadcast events that do not include a location. The associated location must come from a related feature service. In many cases, non-spatial stream layers are used to receive broadcast events related to stationary sensors such as water gauges, weather stations, or air quality sensors. From the standpoint of the API the "type" of the layer (or its feature class) - spatial or non-spatial - is mostly irrelevant. They both support the same set of properties and functions albeit the intended use cases for each may be different.

If it is required that you need to identify the stream layer type (or are just curious), it can be inferred from the tracking information on the layer (something we haven't discussed yet). Essentially, spatial stream layers must have either a spatial track or no track. Non spatial layers must have an "attribute only" track. Therefore we can write some logic as follows:

  //spatial or non-spatial?
  if (streamLayer.TrackType == TrackType.AttributeOnly)
  {
     //this is a non-spatial stream layer
  }
  else
  {
      //this must be a spatial stream layer
  }

For a RealtimeFeatureClass access its RealtimeFeatureClassDefinition and GetTrackType() using the same logic pattern.

Connecting to a Stream Service

To connect to a stream service (to create a stream layer) use any of the LayerFactory overloads that consume a URI. The most basic form of a request is either via LayerFactory.Instance.Createlayer(Uri dataUri,....) or LayerFactory.Instance.FeatureCreatelayer(Uri dataUri,....) casting the result to a StreamLayer.

  var air_traffic_url = 
@"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
  await QueuedTask.Run(() => {
     var uri = new Uri(air_traffic_url, UriKind.Absolute);
     var streamLayer = LayerFactory.Instance.CreateLayer(
                             uri, MapView.Active.Map) as StreamLayer;
     //TODO work with the stream layer
  });

Uris can be either service connections (as shown above) or portal item Uris. If a portal item uri is provided then that portal must be the currently active portal or the connection request will fail.
As a derived type of FeatureLayer, stream layers can also be created with additional options specified upfront using the T CreateLayer<T>(LayerCreationParams layerDef, ...) overload and a FeatureLayerCreationParams instance as the LayerCreationParams argument. In this example, a stream layer is created with a definition query and renderer specified upfront:

 var air_traffic_url = 
  @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
  await QueuedTask.Run(() => {
     var uri = new Uri(air_traffic_url, UriKind.Absolute);
     var symbol = SymbolFactory.Instance.ConstructPointSymbol(
          ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
                                                    .MakeSymbolReference();
     var sl_params = new FeatureLayerCreationParams(new Uri(air_traffic_url)) {
          IsVisible = false,
          DefinitionQuery = new DefinitionQuery("RWY = '29L'", "Runway"),
          RendererDefinition = new SimpleRendererDefinition(symbol)
        };
     var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                                                sl_params, MapView.Active.Map);
     //TODO work with the stream layer
  });

When a stream layer is created it is automatically streaming so streaming does not need to be explicitly started. Refer to the Streaming section for more details.

Connecting via the Realtime Datastore

ArcGIS.Core.Data.Realtime data stores follow the same pattern for establishing a connection with a stream service as with other data stores:

  • Create the relevant (Realtime) connection property class using the uri of the service
  • Pass the connection properties as the argument to the Realtime datastore constructor
  • Access a feature class or table from the datastore (dispose and clean up when it is no longer needed)

When establishing a connection to a feature class via the data store, the feature class will not be streaming by default (opposite of the layer). Instead, call "StartStreaming" on the feature class to explicitly open the connection.

 //Within a Queued Task
 var props= new RealtimeServiceConnectionProperties(new Uri(@"--stream_service_url--"),
                           RealtimeDatastoreType.StreamService
                   );
  using (var rt_datastore = new RealtimeDatastore(realtimeServiceConProp))
  {
     var name = rt_datastore.GetTableNames().First();
     using (var rfc = rt_datastore.OpenTable(name) as RealtimeFeatureClass)
     {
        //feature class, by default, is not streaming (opposite of the stream layer)
         rfc.StartStreaming();
        //TODO use the feature class
        //...
     }
  }

Note: A Realtime data store of type RealtimeDatastoreType.StreamService only contains one Realtime feature class (or table).

Streaming

Stream layers operate in two modes: Streaming vs not streaming (or streaming started vs streaming stopped). This is reflected in the StreamLayer streamLayer.IsStreamingConnectionOpen property. If streamLayer.IsStreamingConnectionOpen == true; then the stream layer is streaming and is receiving events from the stream service. If streamLayer.IsStreamingConnectionOpen == false; then the stream layer is not streaming and the connection is closed (technically the connection could also be transitioning between open and closed or vice versa). When the connection is closed, the stream layer is not receiving events. Switching modes between streaming on and off is accomplished via StartStreaming() and StopStreaming() methods on both the layer and feature class.

Start Streaming

To start streaming, call streamLayer.StartStreaming() (or realtimeFc.StartStreaming()). When a stream layer is created, it is automatically placed into streaming mode so there is no need to explicitly call StartStreaming on a newly created layer. When a Realtime datastore connection is created it is not automatically streaming. The Realtime feature class must be retrieved and streaming explicitly started. When starting streaming on the layer, its streamLayer.IsStreamingConnectionOpen property is set to true once streaming has started. There can be some latency involved in establishing the layer connection to the stream service so streamLayer.IsStreamingConnectionOpen state can return false after a StartStreaming call (or layer creation) until the connection is established. For a finer-grained granularity of the connection state, the RealtimeFeatureClass exposes a public StreamingConnectionState GetStreamingConnectionState() method. The StreamingConnectionState includes Unknown, Ready, Connecting, Open, Failed, and Closed. "Open" is equivalent to IsStreamingConnectionOpen == true. All other states are equivalent to IsStreamingConnectionOpen == false.

When streaming is started on the layer (or feature class if calling realTimeFc.StartStreaming()), all existing observations in the Realtime feature class are deleted before new events are retrieved. Existing observations retrieved from a previously open connection are never persisted beyond the next start streaming call (or between Pro sessions). If the stream service has an associated feature service (for snapshot archiving or location information) then those records are retrieved from the service first to provide an initial display of data, otherwise nothing will draw until new events are received.

Stop Streaming

To stop streaming on the layer, call streamLayer.StopStreaming() (or realtimeFc.StopStreaming() if using the feature class). The stream layer connection will be closed and streamLayer.IsStreamingConnectionOpen will be set to false. While streaming is stopped, event rows in the feature class are not expired and no new events will be received until streaming is re-started. Therefore, turning streaming off can be advantageous when you wish to access the set of events currently stored, in-memory, in the feature class, on the client (eg to perform a lengthy analysis) without being impacted by updates to the events. Search(...) and Select(...) methods can also safely be executed against the stream layer and client add-in logic can be guaranteed to process all returned rows (refer to the next section for details).

Search and Select

When the stream layer (or feature class) is streaming, Search(...) and Select(...) behavior can be affected if the stream service has a particularly high velocity. Events selected via the Search(...) orSelect(...) can be expired before the client add-in logic can process them (via the traditional RowCursor.MoveNext()). For example, a selection may return 100 rows but only 10 of them get processed by the add-in logic before the other 90 are expired. In other words, following this same example, querying GetCount() on the selection returns a count of "100" but executing MoveNext() on the cursor only iterates 10 times (not 100) because by the time the 10th MoveNext call is made the other 90 rows have been expired. For this reason, it is generally recommended that streaming be stopped before any Search(...) or Select(...) operation.

 //assume we ~are streaming~
 var select = streamLayer.Select(....);
 var count = select.GetCount();//eg, 100 rows are selected
 int num_processed = 0;
 var rc = select.Search();
 while(rc.MoveNext()) {
  //add-in logic to process the result
  num_processed++;
 }
 if (num_processed != count) {
  //stream velocity ended up expiring previously selected rows
  ...

To process rows during streaming, use Subscribe() and/or SearchAndSubscribe() in conjunction with a RealtimeCursor and rtCursor.WaitForRowsAsync(...). Consult Row Events for more details.

Service Capabilities

Service capabilities refers to the presence of:

  • An "archive" feature service used to record the latest observations (on the server).
  • A "related" feature service used to capture the spatial location of stationary events and/or static or non-changing event attributes such as vehicle make, manufacturer, plane type, passenger capacity, and so on.

Consult stream service json response keepLatestArchive and relatedFeatures properties.

Stream layers with an associated feature service (archive and/or related) will automatically display event locations when a connection is established. Stream layers without a service capability will only display events when they are received via a broadcast from the stream service. This can be a little disconcerting in cases where the stream service velocity is slow or infrequent. It is possible that streaming can be started but no data initially draws on the layer because no new events have been broadcast (since the connection was established) and so the layer remains "blank".

Although service capability does not affect the functionality of the stream layer, if add-ins wish to determine the service capabilities of a given stream layer they can interrogate the layer's RealtimeFeatureClass for its StreamServiceFeatureClassDefinition (derived from RealtimeFeatureClassDefinition) for an archive and/or related feature service url.

 //Define an enum to represent stream layer service capabilities
 internal enum StreamLayerServiceCapabilities {
    None = 0,
    Archive,
    Related,
    ArchiveAndRelated
  }
 //Extension method to determine the capabilities
 internal static class StreamLayerExtensions {

    public static StreamLayerServiceCapabilities GetServiceCapabilities(
                                                  this StreamLayer streamLayer) {
      //must be on the QueuedTask
      using (var fc = streamLayer.GetFeatureClass())
      using (var sfcdef = fc.GetDefinition() as StreamServiceFeatureClassDefinition)
      {
        var url = sfcdef?.GetArchiveFeatureServiceLayerURL()?.ToString() ?? "";
        var url2 = sfcdef?.GetRelatedFeatureServiceLayerURL()?.ToString() ?? "";
        var has_archive = !string.IsNullOrEmpty(url);
        var has_related = !string.IsNullOrEmpty(url2);
        if (has_archive && has_related)
          return StreamLayerServiceCapabilities.ArchiveAndRelated;
        if (has_archive)
          return StreamLayerServiceCapabilities.Archive;
        if (has_related)
          return StreamLayerServiceCapabilities.Related;
      }
      return StreamLayerServiceCapabilities.None;
    }
    ...

   //usage elsewhere
   //On QueuedTask...
   var streamLayer ... ;
   var capabilities = streamLayer.GetServiceCapabilities()
  

Service capabilities are also visible on the "Source" tab of the layer properties within the Pro UI.

StreamLayerProps.png

Track Aware

Track Aware or "tracking awareness" refers to the presence of a common identifier, a "track id" used to relate events together with the same id into "tracks". A track can represent the changing location of a vehicle (spatial track) or the changing value of a sensor (non-spatial track) depending on the nature of the stream service and its connected layer(s). In any case, the track id groups together all related observations distinguishing them from other observations for different objects. A stream layer (or Realtime feature class) cannot be "set" track aware, it is a characteristic it inherits from its source stream service. For spatial stream layers track-awareness is optional - some services are track aware, some are not - whereas for non-spatial stream layers it is required - all non-spatial stream services are track aware. To determine if a layer is track aware, query its IsTrackAware property. To retrieve the track id field name, query streamLayer.TrackIdFieldName.

  var streamLayer = ... ;
  if (streamLayer.IsTrackAware)
  {
     //layer is track aware
     var trackField = streamLayer.TrackIdFieldName;
  }

If dealing with the Realtime feature class, access the track information off the RealtimeFeatureClassDefinition via HasTrackIDField() and GetTrackIDField().

  //Must be on QueuedTask
  using (var rfc = streamLayer.GetFeatureClass())
  using (var rfc_def = rfc.GetDefinition())
  {
     if (rfc_def.HasTrackIDField()) {
        //Feature class is track aware
        var trackField = rfc_def.GetTrackIDField();

     }
   }

For spatial layers (and feature classes) that are track aware, the tracking type is always TrackType.Spatial as the track represents the changing position of an object being "tracked". For non-spatial layers (and their feature class), they must be track aware and the tracking type is always TrackType.AttributeOnly. AttributeOnly tracks relate changing observation values for a fixed location (eg pressure, temperature, water level, or some other telemetry) rather than a changing location. The type of tracking the stream layer is using can be queried off either the stream layer via TrackType or via the Realtime feature class RealtimeFeatureClassDefinition and GetTrackType().

 //Track type from the layer
 var streamLayer = ...;//else where
 var trackType  = streamLayer.TrackType;
 //TrackType.None, TrackType.AttributeOnly, TrackType.Spatial

 ...
 //Track type from the feature class
 //On the QueuedTask...
 using (var fc = streamLayer.GetFeatureClass())
 using (var fcdef = fc.GetDefinition() as RealtimeFeatureClassDefinition) {
   var trackType = fcdef.GetTrackType();
   //etc.
   

Expiring Events

During streaming, new events are inserted into the in-memory Realtime feature class whilst previous event rows "get" expired and removed. The rate at which previous observations are expired and flushed from the table can be controlled by one of two settings:

  • Either set a maximum count above which events are deleted
  • Or set a maximum age (i.e. duration) after which events are deleted

If the layer is track aware, the maximum count or age is set per track otherwise it (count or age) applies to all the events combined. Maximum count is set via SetExpirationMaxCount(count). Maximum age is set via SetExpirationMaxAge(timespan). These methods exist on both the layer and feature class. As per Using Stream Layer vs Realtime FeatureClass remember that changing these settings via the feature class will not get persisted.

Which setting to apply is controlled via SetExpirationMethod(...), again available on both the layer and feature class. Call SetExpirationMethod with a value of RowExpirationMethod.MaxCount to apply the maximum count limit whereas a value of RowExpirationMethod.MaxAge applies the maximum age.

Maximum count:

 //Must be on QueuedTask
 if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureCount)
        streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureCount);
 streamLayer.SetExpirationMaxCount(15);
 //FYI
 if (streamLayer.IsTrackAware)
 {
    //MaxCount is per track! otherwise for the entire layer
  }

A similar pattern is followed for max age:

 //Must be on QueuedTask
 if (streamLayer.GetExpirationMethod() != FeatureExpirationMethod.MaximumFeatureAge)
        streamLayer.SetExpirationMethod(FeatureExpirationMethod.MaximumFeatureAge);
 //set to 12 hours (max is 24 hours)
 streamLayer.SetExpirationMaxAge(new TimeSpan(12,0,0));
 //FYI
 if (streamLayer.IsTrackAware)
 {
    //MaxAge is per track! otherwise for the entire layer
  }

Definition Queries

Definition query expressions can be applied in three different ways:

  • Use LayerFactory and the FeatureLayerCreationParams.DefinitionFilter parameter to apply the filter expression at the same time the stream layer is created.
  • Use the (inherited) SetDefinitionQuery(query) method on the stream layer to apply the query on the layer.
  • Use the SetFilterWhereClause(query) method on the RealtimeFeatureClass to apply the query on the feature class.

Definition queries (whether set on the layer or feature class) are applied on the connection to the stream service. If the stream layer is currently streaming its underlying connection is closed and a new connection is established with the query string from the definition query applied. In other words, if a layer or feature class is streaming when the query is applied, streaming is automatically stopped and restarted. When streaming is (re)started all previously retrieved observations are always automatically deleted from the feature class (see Start Streaming). Therefore, applying a definition query to a streaming layer (or feature class) truncates all existing rows. This is why, in the UI, the definition query tab shows a warning to the user if the layer is streaming.

LABus_properties.png

If a definition query is applied to the feature class, via realTimeFC.SetFilterWhereClause(query), the query is not persisted in the layer and is not reflected in the layer UI (same as changing any setting on the feature class). Assuming the feature class was retrieved via a GetFeatureClass() call off a stream layer, and a query applied: when the Pro project is closed and reopened, the layer connection will be reestablished either with the layer's definition query, if it had one (persisted in the project) or with no definition query. It will not be reestablished with the query that was set on its feature class.

Stream layer and Realtime feature class definition queries follow the same rules as do GeoEvent Server attribute filters. For a list of the supported operators consult the GeoEvent Server attribute filter documentation.

Editing

Stream layers and Realtime feature class are read-only. They do not support editing.

Rendering

Stream layers can have up to three different renderers, one each to display current observations, previous observations and track lines (note: only spatial track aware stream layers can specifiy renderers for previous observations and track lines). When selecting renderers to apply, stream layers can use the same set of renderer choices as do "regular" feature layers with the exception of the track line renderer. The track line renderer can only use a simple renderer in the current release. Renderers are retrieved and set via GetRenderer(...) and SetRenderer(...) calls on the layer respectively.

Current Observations

The current observation renderer is the default renderer for the layer. For spatial stream layers, the current observations can be considered the latest event or observation received from the service. For non-spatial stream layers, the current observation location does not change and so the same location (for a given observation) is always drawn. Creating a renderer for the current observations follows the same basic pattern as for any feature layer:

  • The renderer can be specified upfront as part of the LayerFactory CreateLayer FeatureLayerCreationParams.
  • The renderer can be applied after the layer has been created via a SetRenderer(renderer) call.
    (The renderer can also be set on the CIM directly using the underlying CIMFeatureLayer layer definition and the CIMFeatureLayer.Renderer property.)

For example, here a connection is established to a point stream layer and a simple renderer is applied for the current observations. The layer visibility is set to false:

  var labus = 
     @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/LABus/StreamServer";
  var uri = new Uri(labus, UriKind.Absolute);

  QueuedTask.Run(() => {
     var createParams = new FeatureLayerCreationParams(uri){
        IsVisible = false,
        RendererDefinition = new SimpleRendererDefinition() {
            SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
                        ColorFactory.Instance.BlueRGB,
                        12,
                        SimpleMarkerStyle.Pushpin).MakeSymbolReference()
     } 
  };
  var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                            createParams, MapView.Active.Map);
  ...
 });

Certain of the more complex renderers, such as unique value or class breaks renderers, depend on statistics derived from the underlying layer (via streamLayer.CreateRenderer(...)) to determine unique value classes, class breaks, and so forth. In the case of a stream layer, depending on its velocity and/or associated service capabilities, the derived unique values or class breaks computed by CreateRenderer(...) can vary depending on when a CreateRenderer call is made - the layer may have received only a few records vs having received many records which can affect the range of values present in the table. Add-in developers should keep this in mind if they are using CreateRenderer(...) to "auto" assign unique value or class break ranges from the layer statistics vs. defining them by hand.

In this example, a unique value renderer is defined for a stream layer and the value classes are assigned by hand using the CIM rather than being derived from layer statistics with a CreateRenderer(...) call:

 var airTraffic = 
 @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
 var uri = new Uri(airTraffic, UriKind.Absolute);
 //Must be on QueuedTask!
 var createParams = new FeatureLayerCreationParams(uri) {
      IsVisible = false
 };
 var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(
                          createParams, map);

 //Define the unique values by hand
 var uvr = new CIMUniqueValueRenderer() {
      Fields = new string[] { "ACTYPE" },
      UseDefaultSymbol = true,
      DefaultLabel = "Others",
      DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
                      CIMColor.CreateRGBColor(185, 185, 185), 
                              8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
  };

  var classes = new List<CIMUniqueValueClass>();
  //add in classes - one for ACTYPE of 727, one for DC 9. We know the data can contain
  //these values even if records containing those values may not have been broadcast yet
  classes.Add(
     new CIMUniqueValueClass() {
        Values = new CIMUniqueValue[] {
                      new CIMUniqueValue() { FieldValues = new string[] { "B727" } } },
        Visible = true,
        Label = "Boeing 727",
        Symbol = SymbolFactory.Instance.ConstructPointSymbol(
                      ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon)
                                                                .MakeSymbolReference()
   });
   classes.Add(
     new CIMUniqueValueClass() {
        Values = new CIMUniqueValue[] {
                   new CIMUniqueValue() { FieldValues = new string[] { "DC9" } } },
        Visible = true,
        Label = "DC 9",
        Symbol = SymbolFactory.Instance.ConstructPointSymbol(
                      ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon)
                                                                .MakeSymbolReference()
   });
   //add the classes to a group
   var groups = new List<CIMUniqueValueGroup>() {
          new CIMUniqueValueGroup() {
             Classes = classes.ToArray()
          }
   };
   //add the groups to the renderer
   uvr.Groups = groups.ToArray();
   //Apply the renderer
   streamLayer.SetRenderer(uvr);
   streamLayer.SetVisibility(true);//turn on the layer

Previous Observation Renderer

Note: Previous observation renderer applies to spatial track aware stream layers only. Previous observations can be rendered along with the current observations. When a track aware stream layer is created it is always assigned a default previous observation (simple) renderer. To assign a different previous observations renderer use the stream layer CreateRenderer(RendererDefinition, FeatureRendererTarget) and SetRenderer(CIMRenderer, FeatureRendererTarget)overloads. Specify a target value of FeatureRendererTarget.PreviousObservations. The existing previous observation renderer can be retrieved with streamLayer.GetRenderer(FeatureRendererTarget.PreviousObservations).

 var streamLayer = .... ;
 
 //The layer must be track aware and spatial
 if (streamLayer.TrackType != TrackType.Spatial)
     return;
 
 var prevRenderer = new SimpleRendererDefinition() {
          SymbolTemplate = SymbolFactory.Instance.ConstructPointSymbol(
              ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon)
                .MakeSymbolReference()
  };
  streamLayer.SetRenderer(
         streamLayer.CreateRenderer(prevRenderer), 
               FeatureRendererTarget.PreviousObservations);

Previous observations visibility can be controlled via SetPreviousObservationsVisibility(true -or- false). The maximum number of previous observations to display can be controlled via SetPreviousObservationsCount(count). The count should be set to a value less than the current "GetExpirationMaxCount" setting. Setting the previous observation count equal to or greater than the current "GetExpirationMaxCount" will have no effect. It will always be capped at the max expiration count, whatever that is. Refer to Expiring Events for more information on GetExpirationMaxCount.

 //The layer must be track aware and spatial for these setting to have an effect
 if (streamLayer.TrackType != TrackType.Spatial)
          return;//these setting will be ignored
 //On the QueuedTask
 var max = streamLayer.GetExpirationMaxCount();

 //assume "max" is greater than 1
 streamLayer.SetPreviousObservationsCount(max -1);
 if (!streamLayer.ArePreviousObservationsVisible)
    streamLayer.SetPreviousObservationsVisibility(true);

Track Lines Renderer

Note: Track lines renderer applies to spatial track aware stream layers only. Track lines can be drawn to connect previous observations together. This can be useful for showing the path that a moving feature has traveled. Track lines can only be rendered using a simple renderer and the line style must be solid, otherwise the renderer symbology is ignored. To change the default track lines renderer use the stream layer CreateRenderer(RendererDefinition, FeatureRendererTarget) and SetRenderer(CIMRenderer, FeatureRendererTarget)overloads same as can be used for the other renderers. Specify a target value of FeatureRendererTarget.TrackLines. Retrieve the existing track lines renderer with streamLayer.GetRenderer(FeatureRendererTarget.TrackLines).

  //The layer must be track aware and spatial
  if (streamLayer.TrackType != TrackType.Spatial)
     return;//these setting will be ignored
  //Must be on QueuedTask!
  //Note: only a simple renderer with solid line symbol is supported for track 
  //line renderering
  var trackRenderer = new SimpleRendererDefinition() {
      SymbolTemplate = SymbolFactory.Instance.ConstructLineSymbol(
           ColorFactory.Instance.BlueRGB, 2, SimpleLineStyle.Solid)
                .MakeSymbolReference()
   };
   streamLayer.SetRenderer(
       streamLayer.CreateRenderer(trackRenderer), 
               FeatureRendererTarget.TrackLines);

Track line visibility is controlled via SetTrackLinesVisibility(true -or- false).

 //The layer must be track aware and spatial for this setting to have an effect
 if (streamLayer.TrackType != TrackType.Spatial)
          return;//these setting will be ignored
 //On the QueuedTask
 if (!streamLayer.AreTrackLinesVisible)
    streamLayer.SetTrackLinesVisibility(true);

Previous observation count, visibility, and track line visibility can also be set via the CIM. This may be useful if you are already manipulating the CIM definition to apply or modify the layer renderers and want to set everything as a single transaction:

  //we are within a queued task...
  //acquire a reference to the relevant stream layer
  var streamLayer = ...;

 //get the CIM Definition
 var def = streamLayer.GetDefinition() as CIMFeatureLayer;
 //... other CIM changes here ...

 //set the number of previous observations, 
 //set show previous observations and track lines to true
 def.PreviousObservationsCount = (int)streamLayer.GetExpirationMaxCount() - 1;
 def.ShowPreviousObservations = true;
 def.ShowTracks = true;
 //apply the changes back
 streamLayer.SetDefinition(def);

Row Events

Row events occur whenever rows are inserted or deleted to or from the Realtime feature class/table. Row events can have 3 possible sources identified by the RealtimeRowSource enum:

  • RealtimeRowSource.PreExisting: Previously retrieved rows were already in the table when the subscription was made. We will explain "subscription" shortly.
  • RealtimeRowSource.EventInsert: New events were broadcast from the service during streaming.
  • RealtimeRowSource.EventDelete: Events were deleted from the table. This could be because rows were truncated when streaming was started, rows were explicitly truncated via the API (whether streaming is started or stopped), or previous observations expired during streaming.

To process row events, clients should follow a three step process:

  1. Clients subscribe for events on the feature class via the layer or feature class. Subscription returns a RealtimeCursor.
  2. Clients retrieve the row events via WaitForRowsAsync(...) called on the RealtimeCursor.
  3. Clients unsubscribe from row events either explicitly via RealtimeCursor Unsubscribe() or implicitly via RealtimeCursor Dispose().

Clients should also be prepared to handle cancellation of row retrieval.

Subscribe

To subscribe for row events, add-ins call either Subscribe(query_filter) or SearchAndSubscribe(query_filter) on the stream layer or Realtime feature class. An optional query filter parameter can be provided to filter row events otherwise all row events are returned. Use SearchAndSubscribe in lieu of Subscribe(...) to query all pre-existing rows in the Realtime feature class first before subscribing*. Subscription is independent of streaming connection state. It does not matter if the layer/feature class is streaming or not. On subscription, a RealtimeCursor is returned to the client which is used to retrieve incoming row events.

Note: multiple subscriptions can be made to a stream layer or Realtime feature class. Each subscription results in its own RealtimeCursor that is monitored for rows. If callers choose to use the RealtimeFeatureClass to subscribe they have the additional flexibility of using a System.Threading.Tasks.Task to perform the subscription, in addition to the QueuedTask, which may be useful in various scenarios.

*Preexisting rows that were searched via SearchAndSubscribe will be returned immediately in the initial WaitForRowsAsync call on the Realtime row cursor.

 //retrieve the stream layer and its feature class
 //subscribe. 

 var stream_layer = ...;
 var filter = new SpatialQueryFilter() { .... };

 //Feature class must always be retrieved on the QueuedTask
 var rfc = await QueuedTask.Run(() => stream_layer .GetFeatureClass());

 //Note: we can use a System Task -not- just a QueuedTask to Subscribe
 //with the feature class
 await System.Threading.Tasks.Task.Run(async () => {
    //Search and subscribe..
    using(var rc = rfc.SearchAndSubscribe(filter, true)) {
        //or...    rfc.Subscribe(filter, true) //No search...

        //We must still call await WaitForRowsAsync to actually ~get~ any rows.

In this snippet, because we are subscribing on the stream layer, we ~must~ use the QueuedTask.

 var stream_layer = ...;
 var filter = new SpatialQueryFilter() { .... };

 await QueuedTask.Run(async () => {  
     using(var rc = stream_layer.SearchAndSubscribe(filter, true)) {
          //or...   stream_layer.Subscribe(filter, true) //No search...

       //etc

WaitForRowsAsync

Once we are subscribed, we use the returned RealtimeCursor and its WaitForRowsAsync to (asynchronously) retrieve row events as they happen. The awaited task will complete when rows become ready in the cursor. The RealtimeCursor follows the same semantic as a "regular" row cursor once rows are ready - namely, call MoveNext() to iterate over the returned rows in a forward-only fashion. When the rows provided in the cursor are exhausted rowCursor.MoveNext() returns false and the next row event(s) can be awaited by renewing the WaitForRowsAsync() call. Clients repeat this pattern of making a WaitForRowsAsync call to listen for; and MoveNext to process; incoming events. Client code can process all rows retrieved in the cursor without any interference from other row events occurring back on the table. Further row events are handled via additional WaitForRowsAsync calls.

RealtimeCursor can either be recycling or non-recycling. A recycling cursor will overwrite the rowCursor.Current member with each retrieved row whereas a non-recycling will maintain a unique row for each row retrieved. This is the same recycling behavior as for the ArcGIS.Core.Data.RowCursor. Add-ins can stop listening for additional row events by not renewing the WaitForRowsAsync call after they have exhausted the current cursor rows (via MoveNext) at which point they should unsubscribe, the topic of the next section.

 //As before, retrieve the stream layer and/or its feature class
 //and subscribe. We are use a System Task with the feature class...

 await System.Threading.Tasks.Task.Run(async () => {
    //Search and subscribe..
    using(var rc = rfc.SearchAndSubscribe(filter, true)) {
      //get the rows for incoming events now that we are subscribed
      while (await rc.WaitForRowsAsync(CancellationToken.None)) {
        //process in a forward-only fashion
        while (rc.MoveNext()) {
           //check RealtimeRowSource  if we want to know the origin of the event
           RealtimeRowSource source = rc.Current.GetRowSource();
           ...
           //TODO, process the rows as needed
           var some_val = rc.Current[field_index];//access row values as normal
           ...
        }
      }
      ...

Unsubscribe

As long as a RealtimeCursor is subscribed to a Realtime feature class, it will continue to receive row events until it is unsubscribed (whether the events are being processed via WaitForRowsAsync calls or not). RealtimeCursors can either be explicitly unsubscribed by calling realtimeCursor.Unsubscribe() (unusual) or implicitly unsubscribed by allowing the cursor to be disposed when it goes out of scope of an enclosing using(...) { } statement (most common) [though Dispose can also be explicitly called].

If the cursor is currently awaiting rows in a WaitForRowsAsync call and the Unsubscribe is explicitly called, the task will complete and calling MoveNext will return false. No special logic is required to handle unsubscription if it should occur while the row cursor is listening for events though typically it would occur after WaitForRowsAsync calls have completed and the row cursor has gone out of scope (and is being disposed). Not unsubscribing the cursor can lead to the cursor's internal queue becoming full. Remember, even though the add-in may no longer be processing rows, it the cursor is still subscribed it is still receiving rows. If the cursor's queue becomes full, the RealtimeCursor will be automatically unsubscribed to prevent an internal buffer overflow condition.

Once a cursor has been unsubscribed it cannot be re-subscribed and calling WaitForRowsAsync will result in undefined behavior - probably an exception. The same is true if WaitForRowsAsync is called and the cursor has already been disposed.

Note: The connection state of the cursor can be tested with rc.GetState(). This returns a RealtimeCursorState indicating if the cursor is subscribed, unsubscribed, or was automatically unsubscribed because of an overrun.

 //implicit unsubscribe (preferred)
 //Search and subscribe..or subscribe
 using(var rc = rfc.SearchAndSubscribe(filter, true)) {
      //get the rows for incoming events now that we are subscribed
      while (await rc.WaitForRowsAsync(CancellationToken.None)) {
         ...
         //process rows here
         break;//some exit condition causes us to conclude processing
      }
  }//<-- cursor goes out of scope. It is disposed and unsubscribed


  //explicit unsubscribe and/or disposal
  var rc = rfc.SearchAndSubscribe(filter, true)) {
    //TODO listen for rows
  }

  //unsubscribe
  rc.Unsubscribe();
  //do something before dispose - eg check state, etc
  ...
  rc.Dispose();

  //-- or simply --
  rc.Dispose();//also unsubscribes if still subscribed

Cancellation

An ongoing WaitForRowsAsync call can be cancelled via its CancellationToken parameter. Use of a CancellationToken is a standard Microsoft pattern for cancelling Tasks. The CancellationToken can be cancelled explicitly by calling its parent CancellationTokenSource's Cancel() method or by specifying a timeout in the constructor of the CancellationTokenSource which cancels the token once the duration has expired. When a Task is cancelled, it completes immediately and a TaskCanceledException is thrown indicating that the awaited method is cancelled. The task.IsCanceled property can also be checked to determine if the task was canceled or completed successfully. Add-ins are responsible for catching the TaskCanceledException.

 //handle cancellation
 using(var rc = rfc.Subscribe(filter, true)) {
   //auto-cancel after 20 seconds
   var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
   //Handle TaskCanceledException
   try
   {
     //pass in the cancellation token to WaitForRowsAsync
      while (await rc.WaitForRowsAsync(cancel.Token)) {
        //Process rows
      }
   }
   //we must catch TaskCanceledException 
   catch(TaskCanceledException tce)
   {
      //TODO - we were cancelled
   }
   cancel.Dispose();
   ...
   //to explicitly cancel....
   var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20));
   
   //else where...
   if (_cancelWaitingForRows)
     cancel.Cancel();//explicit cancellation

Developing with ArcGIS Pro

    Migration


Framework

    Add-ins

    Configurations

    Customization

    Styling


Arcade


Content


CoreHost


DataReviewer


Editing


Geodatabase

    3D Analyst Data

    Plugin Datasources

    Topology

    Linear Referencing

    Object Model Diagram


Geometry

    Relational Operations


Geoprocessing


Knowledge Graph


Layouts

    Reports


Map Authoring

    3D Analyst

    CIM

    Graphics

    Scene

    Stream

    Voxel


Map Exploration

    Map Tools


Networks

    Network Diagrams


Parcel Fabric


Raster


Sharing


Tasks


Workflow Manager Classic


Workflow Manager


Reference

Clone this wiki locally