@@ -137,7 +137,7 @@ public async Task<bool> InitAsync()
137
137
}
138
138
else
139
139
{
140
- Trace ( $ "Attempting to register ourselves with IoT Hub using owner connection string: { _iotHubOwnerConnectionString } ") ;
140
+ Trace ( $ "Attempting to register ourselves with IoT Hub using owner connection string. ") ;
141
141
RegistryManager manager = RegistryManager . CreateFromConnectionString ( _iotHubOwnerConnectionString ) ;
142
142
143
143
// remove any existing device
@@ -154,13 +154,13 @@ public async Task<bool> InitAsync()
154
154
{
155
155
string hostname = _iotHubOwnerConnectionString . Substring ( 0 , _iotHubOwnerConnectionString . IndexOf ( ";" ) ) ;
156
156
deviceConnectionString = hostname + ";DeviceId=" + ApplicationName + ";SharedAccessKey=" + newDevice . Authentication . SymmetricKey . PrimaryKey ;
157
- Trace ( $ "Device connection string is: { deviceConnectionString } ") ;
157
+ Trace ( $ "Generated device connection string. ") ;
158
158
Trace ( $ "Adding it to device cert store.") ;
159
159
await SecureIoTHubToken . WriteAsync ( ApplicationName , deviceConnectionString , IotDeviceCertStoreType , IotDeviceCertStorePath ) ;
160
160
}
161
161
else
162
162
{
163
- Trace ( $ "Could not register ourselves with IoT Hub using owner connection string: { _iotHubOwnerConnectionString } ") ;
163
+ Trace ( $ "Could not register ourselves with IoT Hub using owner connection string. ") ;
164
164
Trace ( "exiting..." ) ;
165
165
return false ;
166
166
}
@@ -171,7 +171,7 @@ public async Task<bool> InitAsync()
171
171
deviceConnectionString = await SecureIoTHubToken . ReadAsync ( ApplicationName , IotDeviceCertStoreType , IotDeviceCertStorePath ) ;
172
172
if ( ! string . IsNullOrEmpty ( deviceConnectionString ) )
173
173
{
174
- Trace ( $ "Create Publisher IoTHub client with device connection string: ' { deviceConnectionString } ' using '{ IotHubProtocol } ' for communication.") ;
174
+ Trace ( $ "Create Publisher IoTHub client with device connection string using '{ IotHubProtocol } ' for communication.") ;
175
175
_iotHubClient = DeviceClient . CreateFromConnectionString ( deviceConnectionString , IotHubProtocol ) ;
176
176
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff ( int . MaxValue , TimeSpan . FromMilliseconds ( 2 ) , TimeSpan . FromMilliseconds ( 1024 ) , TimeSpan . FromMilliseconds ( 3 ) ) ;
177
177
_iotHubClient . SetRetryPolicy ( exponentialRetryPolicy ) ;
@@ -270,16 +270,12 @@ public void Enqueue(string json)
270
270
/// </summary>
271
271
private async Task MonitoredItemsProcessor ( CancellationToken ct )
272
272
{
273
- string contentPropertyKey = "content-type" ;
274
- string contentPropertyValue = "application/opcua+uajson" ;
275
- string devicenamePropertyKey = "devicename" ;
276
- string devicenamePropertyValue = ApplicationName ;
277
- int userPropertyLength = contentPropertyKey . Length + contentPropertyValue . Length + devicenamePropertyKey . Length + devicenamePropertyValue . Length ;
273
+ uint jsonSquareBracketLength = 2 ;
278
274
Microsoft . Azure . Devices . Client . Message tempMsg = new Microsoft . Azure . Devices . Client . Message ( ) ;
279
275
// the system properties are MessageId (max 128 byte), Sequence number (ulong), ExpiryTime (DateTime) and more. ideally we get that from the client.
280
276
int systemPropertyLength = 128 + sizeof ( ulong ) + tempMsg . ExpiryTimeUtc . ToString ( ) . Length ;
281
277
// if batching is requested the buffer will have the requested size, otherwise we reserve the max size
282
- uint iotHubMessageBufferSize = ( _iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax ) - ( uint ) userPropertyLength - ( uint ) systemPropertyLength ;
278
+ uint iotHubMessageBufferSize = ( _iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax ) - ( uint ) systemPropertyLength - ( uint ) jsonSquareBracketLength ;
283
279
byte [ ] iotHubMessageBuffer = new byte [ iotHubMessageBufferSize ] ;
284
280
MemoryStream iotHubMessage = new MemoryStream ( iotHubMessageBuffer ) ;
285
281
DateTime nextSendTime = DateTime . UtcNow + TimeSpan . FromSeconds ( _defaultSendIntervalSeconds ) ;
@@ -295,6 +291,7 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
295
291
296
292
iotHubMessage . Position = 0 ;
297
293
iotHubMessage . SetLength ( 0 ) ;
294
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "[" ) , 0 , 1 ) ;
298
295
while ( true )
299
296
{
300
297
// sanity check the send interval, compute the timeout and get the next monitored item message
@@ -327,7 +324,7 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
327
324
// sanity check that the user has set a large enough IoTHub messages size
328
325
if ( ( _iotHubMessageSize > 0 && jsonMessageSize > _iotHubMessageSize ) || ( _iotHubMessageSize == 0 && jsonMessageSize > iotHubMessageBufferSize ) )
329
326
{
330
- Trace ( Utils . TraceMasks . Error , $ "There is a telemetry message (size: { jsonMessageSize } ), which will not fit into an IoTHub message (max size: { _iotHubMessageSize } ].") ;
327
+ Trace ( Utils . TraceMasks . Error , $ "There is a telemetry message (size: { jsonMessageSize } ), which will not fit into an IoTHub message (max size: { iotHubMessageBufferSize } ].") ;
331
328
Trace ( Utils . TraceMasks . Error , $ "Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(") ;
332
329
_tooLargeCount ++ ;
333
330
continue ;
@@ -338,10 +335,12 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
338
335
if ( _iotHubMessageSize > 0 || ( _iotHubMessageSize == 0 && _defaultSendIntervalSeconds > 0 ) )
339
336
{
340
337
// if there is still space to batch, do it. otherwise send the buffer and flag the message for later buffering
341
- if ( iotHubMessage . Position + jsonMessageSize <= iotHubMessage . Capacity )
338
+ if ( iotHubMessage . Position + jsonMessageSize + 1 <= iotHubMessage . Capacity )
342
339
{
340
+ // add the message and a comma to the buffer
343
341
iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( jsonMessage . ToString ( ) ) , 0 , jsonMessageSize ) ;
344
- Trace ( Utils . TraceMasks . OperationDetail , $ "Added new message with size { jsonMessageSize } to IoTHub message (size is now { iotHubMessage . Position } ).") ;
342
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "," ) , 0 , 1 ) ;
343
+ Trace ( Utils . TraceMasks . OperationDetail , $ "Added new message with size { jsonMessageSize } to IoTHub message (size is now { ( iotHubMessage . Position - 1 ) } ).") ;
345
344
continue ;
346
345
}
347
346
else
@@ -368,25 +367,28 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
368
367
Microsoft . Azure . Devices . Client . Message encodedIotHubMessage = null ;
369
368
370
369
// if we reached the send interval, but have nothing to send, we continue
371
- if ( ! gotItem && iotHubMessage . Position == 0 )
370
+ if ( ! gotItem && iotHubMessage . Position == 1 )
372
371
{
373
372
nextSendTime += TimeSpan . FromSeconds ( _defaultSendIntervalSeconds ) ;
374
373
iotHubMessage . Position = 0 ;
375
374
iotHubMessage . SetLength ( 0 ) ;
375
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "[" ) , 0 , 1 ) ;
376
376
continue ;
377
377
}
378
378
379
379
// if there is no batching and not interval configured, we send the JSON message we just got, otherwise we send the buffer
380
380
if ( _iotHubMessageSize == 0 && _defaultSendIntervalSeconds == 0 )
381
381
{
382
- encodedIotHubMessage = new Microsoft . Azure . Devices . Client . Message ( Encoding . UTF8 . GetBytes ( jsonMessage . ToString ( ) ) ) ;
382
+ // we use also an array for a single message to make backend processing more consistent
383
+ encodedIotHubMessage = new Microsoft . Azure . Devices . Client . Message ( Encoding . UTF8 . GetBytes ( "[" + jsonMessage . ToString ( ) + "]" ) ) ;
383
384
}
384
385
else
385
386
{
387
+ // remove the trailing comma and add a closing square bracket
388
+ iotHubMessage . SetLength ( iotHubMessage . Length - 1 ) ;
389
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "]" ) , 0 , 1 ) ;
386
390
encodedIotHubMessage = new Microsoft . Azure . Devices . Client . Message ( iotHubMessage . ToArray ( ) ) ;
387
391
}
388
- encodedIotHubMessage . Properties . Add ( contentPropertyKey , contentPropertyValue ) ;
389
- encodedIotHubMessage . Properties . Add ( devicenamePropertyKey , devicenamePropertyValue ) ;
390
392
if ( _iotHubClient != null )
391
393
{
392
394
nextSendTime += TimeSpan . FromSeconds ( _defaultSendIntervalSeconds ) ;
@@ -406,11 +408,14 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
406
408
// reset the messaage
407
409
iotHubMessage . Position = 0 ;
408
410
iotHubMessage . SetLength ( 0 ) ;
411
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "[" ) , 0 , 1 ) ;
409
412
410
413
// if we had not yet buffered the last message because there was not enough space, buffer it now
411
414
if ( needToBufferMessage )
412
415
{
416
+ // add the message and a comma to the buffer
413
417
iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( jsonMessage . ToString ( ) ) , 0 , jsonMessageSize ) ;
418
+ iotHubMessage . Write ( Encoding . UTF8 . GetBytes ( "," ) , 0 , 1 ) ;
414
419
}
415
420
}
416
421
else
@@ -459,23 +464,5 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
459
464
private static CancellationTokenSource _tokenSource ;
460
465
private static Task _monitoredItemsProcessorTask ;
461
466
private static DeviceClient _iotHubClient ;
462
-
463
- /// <summary>
464
- /// Classes for the telemetry message sent to IoTHub.
465
- /// </summary>
466
- private class OpcUaMessage
467
- {
468
- public string ApplicationUri ;
469
- public string DisplayName ;
470
- public string NodeId ;
471
- public OpcUaValue Value ;
472
- }
473
-
474
- private class OpcUaValue
475
- {
476
- public string Value ;
477
- public string SourceTimestamp ;
478
- }
479
-
480
467
}
481
468
}
0 commit comments