-
-
Notifications
You must be signed in to change notification settings - Fork 139
/
Copy pathmormot.soa.server.pas
2493 lines (2361 loc) · 91.7 KB
/
mormot.soa.server.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/// Interface-based SOA Process Types and Classes for Server-Side
// - this unit is a part of the Open Source Synopse mORMot framework 2,
// licensed under a MPL/GPL/LGPL three license - see LICENSE.md
unit mormot.soa.server;
{
*****************************************************************************
Server-Side Interface-based Service Oriented Architecture (SOA) Process
- TInjectableObjectRest Service Implementation Parent Class
- TServiceFactoryServer Service Provider
- TServiceContainerServer Services Holder
- Asynchronous REST Synchronisation Classes
*****************************************************************************
}
interface
{$I ..\mormot.defines.inc}
uses
sysutils,
classes,
variants,
mormot.core.base,
mormot.core.os,
mormot.core.buffers,
mormot.core.unicode,
mormot.core.text,
mormot.core.datetime,
mormot.core.variants,
mormot.core.data,
mormot.core.perf,
mormot.core.rtti,
mormot.core.json,
mormot.core.threads,
mormot.core.interfaces,
mormot.db.core,
mormot.orm.base,
mormot.orm.core,
mormot.orm.rest,
mormot.soa.core,
mormot.soa.client,
mormot.rest.core,
mormot.rest.client,
mormot.rest.server;
{ ***************** TInjectableObjectRest Service Implementation Parent Class }
type
TServiceFactoryServer = class;
/// service implementation class, with direct access on the associated
// TServiceFactoryServer/TRestServer instances
// - allows dependency injection aka SOLID DI/IoC by the framework using
// inherited TInjectableObject.Resolve() methods
// - allows direct access to the underlying ORM using its Server property
// - this class will allow Server instance access outside the scope of
// remote SOA execution, e.g. when a DI is performed on server side: it
// is therefore a better alternative to ServiceRunningContext.Factory,
// ServiceRunningContext.Factory.RestServer or ServiceRunningContext.Request.Server
TInjectableObjectRest = class(TInjectableObject)
protected
// both fields are assigned by CreateWithResolverAndRest() before it calls
// CreateWithResolver(), and stay nil if another constructor is used
fFactory: TServiceFactoryServer;
fServer: TRestServer;
public
/// initialize an instance, defining associated dependencies
// - the resolver may be e.g. a TServiceContainer
// - once the DI/IoC is defined, will call the AutoResolve() protected method
// - as called by TServiceFactoryServer.CreateInstance
// - aFactory and aServer are stored as the Factory and Server properties
constructor CreateWithResolverAndRest(aResolver: TInterfaceResolver;
aFactory: TServiceFactoryServer; aServer: TRestServer;
aRaiseEServiceExceptionIfNotFound: boolean = true); virtual;
/// access to the associated interface factory
// - this property will be injected by TServiceFactoryServer.CreateInstance,
// so may be nil if the instance was created outside the SOA context
property Factory: TServiceFactoryServer
read fFactory;
/// access to the associated REST Server, e.g. to its ORM methods
// - slightly faster than Factory.RestServer
// - this value will be injected by TServiceFactoryServer.CreateInstance,
// so may be nil if the instance was created outside the SOA context
property Server: TRestServer
read fServer;
end;
/// class-reference type (metaclass) of a TInjectableObjectRest type
TInjectableObjectRestClass = class of TInjectableObjectRest;
{ ***************** TServiceFactoryServer Service Provider }
/// server-side service provider uses this to store one internal instance
// - used by TServiceFactoryServer in sicClientDriven, sicPerSession,
// sicPerUser or sicPerGroup mode
TServiceFactoryServerInstance = record
/// the internal Instance ID, as remotely sent in "id":1
InstanceID: TID;
/// GetTickCount64() shr 10 timestamp corresponding to the last access of
// this instance
// - shr 10 divides the millisecond tick by 1024, so the unit is roughly
// one second
LastAccess: cardinal;
/// the associated client session
Session: cardinal;
/// the implementation instance itself
Instance: TInterfacedObject;
end;
PServiceFactoryServerInstance = ^TServiceFactoryServerInstance;
/// server-side service provider uses this to store its internal instances
// - used by TServiceFactoryServer in sicClientDriven, sicPerSession,
// sicPerUser or sicPerGroup mode
TServiceFactoryServerInstanceDynArray = array of TServiceFactoryServerInstance;
/// callback invoked before any interface-method service execution, to allow
// or deny this execution
// - see Ctxt.Service, Ctxt.ServiceMethodIndex and Ctxt.ServiceParameters
// to identify the executed method context
// - Method parameter will help identify easily the corresponding method, and
// will contain in fact PServiceMethod(Ctxt.ServiceMethod)^
// - should return TRUE if the method can be executed
// - should return FALSE if the method should not be executed, and set the
// corresponding error to the supplied context e.g.
// ! Ctxt.Error('Unauthorized method',HTTP_NOTALLOWED);
// - i.e. called by TRestServerUriContext.InternalExecuteSoaByInterface
TOnServiceCanExecute = function(Ctxt: TRestServerUriContext;
const Method: TInterfaceMethod): boolean of object;
/// callback used by TServiceFactoryServer.RunOnAllInstances method
// - Sender is the factory owning the Instance being visited, and Opaque is
// an untyped caller-supplied variable
TOnServiceFactoryServerOne = function(Sender: TServiceFactoryServer;
var Instance: TServiceFactoryServerInstance; var Opaque): integer of object;
/// a service provider implemented on the server side
// - each registered interface has its own TServiceFactoryServer instance,
// available as one TServiceContainerServer item from TRest.Services property
// - will handle the implementation class instances of a given interface
// - by default, all methods are allowed to execute: you can call AllowAll,
// DenyAll, Allow or Deny in order to specify your exact security policy
TServiceFactoryServer = class(TServiceFactoryServerAbstract)
protected
fRestServer: TRestServer; // just a transtyped fResolver value
fInstance: TServiceFactoryServerInstanceDynArray;
fInstances: TDynArrayLocked;
fInstanceCurrentID: TID;
fInstanceCounter: cardinal;
fInstanceTimeOut: cardinal;
fInstanceDeprecatedTix32, fInstanceGCDeprecatedTix32: cardinal;
fInstanceGC: TSynObjectListLocked; // release refcnt>1 in separated lock
fStats: TSynMonitorInputOutputObjArray;
fImplementationClass: TInterfacedClass;
fImplementationClassKind: (
ickBlank, ickPersistent, ickInjectable, ickInjectableRest,
ickFromInjectedResolver, ickFake);
fImplementationClassInterfaceEntry: PInterfaceEntry;
fSharedInterface: IInterface;
fBackgroundThread: TSynBackgroundThreadMethod;
fOnMethodExecute: TOnServiceCanExecute;
fOnExecute: TInterfaceMethodExecuteEventDynArray;
fExecuteLock: TOSLock;
fExecuteCached: TInterfaceMethodExecuteCachedDynArray;
procedure SetServiceLogByIndex(const aMethods: TInterfaceFactoryMethodBits;
const aLogRest: IRestOrm; aLogClass: TOrmServiceLogClass);
procedure SetTimeoutSecInt(value: cardinal);
function GetTimeoutSec: cardinal;
function GetStat(const aMethod: RawUtf8): TSynMonitorInputOutput;
function GetStats(
Ctxt: TRestServerUriContext; MethodIndex: PtrInt): TSynMonitorInputOutput;
function GetInstanceGCCount: integer;
procedure InstanceFree(Obj: TInterfacedObject);
procedure InstanceFreeGC(Obj: TInterfacedObject);
function DoInstanceGC(Force: boolean): PtrInt;
function DoInstanceGCSession(aSessionID: cardinal): integer;
/// called by ExecuteMethod to append input/output params to Sender.TempTextWriter
procedure OnLogRestExecuteMethod(Sender: TInterfaceMethodExecuteRaw;
Step: TInterfaceMethodExecuteEventStep);
/// this method will create an implementation instance
// - reference count will be set to one, in order to allow safe passing
// of the instance into an interface, if AndIncreaseRefCount is TRUE
// - will handle TInterfacedPersistent and TInjectableObject
// as expected, if necessary
function CreateInstance(AndIncreaseRefCount: boolean): TInterfacedObject;
public
/// initialize the service provider on the server side
// - expects a direct server-side implementation class, which may inherit
// from plain TInterfacedClass, TInterfacedPersistent if you
// need an overridden constructor, or TInjectableObject to support DI/IoC
// - for sicClientDriven, sicPerSession, sicPerUser or sicPerGroup modes,
// a time out (in seconds) can be defined (default is 30 minutes) - if the
// specified aTimeOutSec is 0, interface will be forced in sicSingle mode
// - you should usually call the TRestServer.ServiceRegister()
// method instead of calling this constructor directly
constructor Create(aRestServer: TRestServer; aInterface: PRttiInfo;
aInstanceCreation: TServiceInstanceImplementation;
aImplementationClass: TInterfacedClass; const aContractExpected: RawUtf8;
aTimeOutSec: cardinal; aSharedInstance: TInterfacedObject); reintroduce;
/// release all used memory
// - e.g. any internal TServiceFactoryServerInstance instances (any shared
// instance, and all still living instances in sicClientDriven mode)
destructor Destroy; override;
/// you can define here an event to allow/deny execution of any method
// of this service, at runtime
property OnMethodExecute: TOnServiceCanExecute
read fOnMethodExecute write fOnMethodExecute;
/// allow to hook the methods execution
// - several events could be registered, and will be called directly
// before and after method execution
// - if optInterceptInputOutput is defined in Options, then Sender.Input/Output
// fields will contain the execution data context when Hook is called
// - see OnMethodExecute if you want to implement security features
procedure AddInterceptor(const Hook: TOnInterfaceMethodExecute);
/// retrieve an instance of this interface from the server side
// - sicShared mode will retrieve the shared instance
// - sicPerThread mode will retrieve the instance corresponding to the
// current running thread
// - all other kind of instance creation will behave the same as sicSingle
// when accessed directly from this method, i.e. from server side: in fact,
// on the server side, there is no notion of client, session, user nor group
// - if ServiceRunningContext.Factory is nil (i.e. if there is no other
// service context currently associated), this method will also update
// ServiceRunningContext.Factory, so that the implementation method will be able
// to access the associated TRestServer instance if needed
function Get(out Obj): boolean; override;
/// retrieve the published signature of this interface
// - is always available on TServiceFactoryServer, but TServiceFactoryClient
// will be able to retrieve it only if TServiceContainerServer.PublishSignature
// is set to TRUE (which is not the default setting, for security reasons)
function RetrieveSignature: RawUtf8; override;
/// call a given method of this service provider
// - here Ctxt.ServiceMethod points to the corresponding fInterface.Methods[]
// (i.e. excluding _free_/_contract_/_signature_ pseudo-methods)
// - Ctxt.ServiceMethodIndex=0=ord(imFree) will free/release
// the corresponding aInstanceID - as called e.g. from
// $ {"method":"_free_", "params":[], "id":1234}
// - Ctxt.ServiceParameters is e.g. '[1,2]' i.e. a true JSON array, which
// will contain the incoming parameters in the same exact order than the
// corresponding implemented interface method
// - Ctxt.ID is an optional number, to be used in case of sicClientDriven
// kind of Instance creation to identify the corresponding client session
// - returns 200/HTTP_SUCCESS on success, or an HTTP error status, with an
// optional error message in aErrorMsg
// - NOTE(review): this is declared as a procedure with a single Ctxt
// parameter, so the status and error message are presumably reported
// through Ctxt and not via a result/aErrorMsg - confirm
// - on success, Ctxt.Call.OutBody shall contain a serialized JSON object
// with one nested result property, which may be a JSON array, containing
// all "var" or "out" parameters values, and then the method main result -
// for instance, ExecuteMethod(..,'[1,2]') over ICalculator.Add will return:
// $ {"result":[3],"id":0}
// the returned "id" number is the Instance identifier to be used for any later
// sicClientDriven remote call - or just 0 in case of sicSingle or sicShared
procedure ExecuteMethod(Ctxt: TRestServerUriContext);
/// call the supplied aEvent callback for all class instances implementing
// this service
// - aEvent should be quick because it is executed with a ReadOnlyLock
function RunOnAllInstances(const aEvent: TOnServiceFactoryServerOne;
var aOpaque): integer;
/// low-level get an implementation Inst.Instance for the given Inst.InstanceID
// - is called by ExecuteMethod() in sicClientDriven mode
// - returns -1 on error, or aMethodIndex for successful execution,
// e.g. 0 after {"method":"_free_".. call
// - otherwise, fill Inst.Instance with the matching implementation (or nil)
function RetrieveInstance(Ctxt: TRestServerUriContext;
var Inst: TServiceFactoryServerInstance; aMethodIndex, aSession: integer): integer;
/// define the instance life time-out, in seconds
function SetTimeoutSec(value: cardinal): TServiceFactoryServerAbstract; override;
/// log method execution information to a TOrmServiceLog table
function SetServiceLog(const aMethod: array of RawUtf8;
const aLogRest: IRestOrm;
aLogClass: TOrmServiceLogClass = nil): TServiceFactoryServerAbstract; override;
/// low-level method called from client CacheFlush/_ping_ URI
function RenewSession(Ctxt: TRestServerUriContext): integer;
/// make some garbage collection when session is finished
// - return the number of instances released during this process
function OnCloseSession(aSessionID: cardinal): integer;
/// the associated TRestServer instance
property RestServer: TRestServer
read fRestServer;
/// direct access to per-method detailed process statistics
// - this Stats[] array follows Interface.Methods[] order
// - see Stat[] property to retrieve information about a method by name
property Stats: TSynMonitorInputOutputObjArray
read fStats;
/// retrieve detailed statistics about a method use
// - will return a reference to the actual item in Stats[]: caller should
// not free the returned instance
property Stat[const aMethod: RawUtf8]: TSynMonitorInputOutput
read GetStat;
published
/// the class type used to implement this interface
property ImplementationClass: TInterfacedClass
read fImplementationClass;
/// the instance life time-out, in seconds
// - for sicClientDriven, sicPerSession, sicPerUser or sicPerGroup modes
// - raise an exception for other kind of execution
// - you can also use the SetTimeOutSec() fluent function instead
property TimeoutSec: cardinal
read GetTimeoutSec write SetTimeoutSecInt;
/// how many instances are currently hosted by this interface
property InstanceCount: integer
read fInstances.Count;
/// how many instances have been created for this interface since startup
property InstanceCounter: cardinal
read fInstanceCounter;
/// how many deprecated instances are currently pending for this interface
// - is usually 0, unless some pending references are used elsewhere on
// server side (please ensure your code release interfaces ASAP)
property InstanceGCCount: integer
read GetInstanceGCCount;
end;
var
/// global mutex used by optExecGlobalLocked and optFreeGlobalLocked
GlobalInterfaceExecuteMethod: TOSLock;
{ ***************** TServiceContainerServer Services Holder }
type
/// service definition used to subscribe to master/slave replication
// notifications
// - implemented by TServiceRecordVersion, as used by
// TRestServer.RecordVersionSynchronizeMasterStart(), and expected by
// TRestServer.RecordVersionSynchronizeSlaveStart()
IServiceRecordVersion = interface(IInvokable)
['{06A355CA-19EB-4CC6-9D87-7B48967D1D9F}']
/// will register the supplied callback for the given table
// - SqlTableName identifies the table, revision is a TRecordVersion value
// and callback is the IServiceRecordVersionCallback to register
function Subscribe(const SqlTableName: RawUtf8;
const revision: TRecordVersion;
const callback: IServiceRecordVersionCallback): boolean;
end;
/// event signature triggered when a callback instance is released
// - used by TServiceContainerServer.OnCallbackReleasedOnClientSide
// and TServiceContainerServer.OnCallbackReleasedOnServerSide event properties
// - the supplied Instance will be a TInterfacedObjectFakeServer, and the
// Callback will be a pointer to the corresponding interface value
// - assigned implementation should be as fast as possible, since this event
// will be executed in a global lock for all server-side callbacks
TOnCallbackReleased = procedure(Sender: TServiceContainer;
Instance: TInterfacedObject; Callback: pointer) of object;
/// how TServiceContainerServer will handle SOA callbacks
// - by default, a callback released on the client side will log a warning
// and continue the execution (relying e.g. on a CallbackReleased() method to
// unsubscribe the event), but coRaiseExceptionIfReleasedByClient can be
// defined to raise an EInterfaceFactoryException in this case
TServiceCallbackOptions = set of (
coRaiseExceptionIfReleasedByClient);
/// a services provider class to be used on the server side
// - this will maintain a list of true implementation classes
// - inherits from TServiceContainerClientAbstract to allow remote access
TServiceContainerServer = class(TServiceContainerClientAbstract)
protected
  fRestServer: TRestServer; // set by Create := fOwner as TRestServer
  fPublishSignature: boolean;
  fConnectionID: TRestConnectionID;
  fFakeCallbacks: TSynObjectListLocked; // TInterfacedObjectFakeServer instances
  fOnCallbackReleasedOnClientSide: TOnCallbackReleased;
  fOnCallbackReleasedOnServerSide: TOnCallbackReleased;
  fCallbackOptions: TServiceCallbackOptions;
  fCallbacks: array of record
    Service: TInterfaceFactory;
    Arg: PInterfaceMethodArgument;
  end;
  fRecordVersionCallback: array of IServiceRecordVersionCallbackDynArray;
  fCallbackNamesSorted: TRawUtf8DynArray;
  fSessionTimeout: cardinal;
  procedure ClearServiceList; override;
  function AddServiceInternal(aService: TServiceFactory): PtrInt; override;
  // here aFakeInstance are TInterfacedObjectFakeServer instances (not owned)
  procedure FakeCallbackAdd(aFakeInstance: TObject);
  procedure FakeCallbackRemove(aFakeInstance: TObject);
  function GetFakeCallbacksCount: integer;
  procedure SetPublishSignature(value: boolean);
  procedure RecordVersionCallbackNotify(TableIndex: integer;
    Occasion: TOrmOccasion; const DeletedID: TID;
    const DeletedRevision: TRecordVersion; const AddUpdateJson: RawUtf8);
public
  /// initialize the list
  constructor Create(aOwner: TInterfaceResolver); override;
  /// finalize the service container
  destructor Destroy; override;
  /// method called on the server side to register a service via its
  // interface(s) and a specified implementation class or a shared
  // instance (for sicShared mode)
  // - will add a TServiceFactoryServer instance to the internal list
  // - will raise an exception on error
  // - will return the first of the registered TServiceFactoryServer created
  // on success (i.e. the one corresponding to the first item of the aInterfaces
  // array), or nil if registration failed (e.g. if any of the supplied interfaces
  // is not implemented by the given class)
  // - the same implementation class can be used to handle several interfaces
  // (just as Delphi allows to do natively)
  function AddImplementation(aImplementationClass: TInterfacedClass;
    const aInterfaces: array of PRttiInfo;
    aInstanceCreation: TServiceInstanceImplementation;
    aSharedImplementation: TInterfacedObject;
    const aContractExpected: RawUtf8): TServiceFactoryServer;
  /// initialize and register a server-side interface callback instance
  procedure GetFakeCallback(Ctxt: TRestServerUriContext;
    ParamInterfaceInfo: PRttiInfo; FakeID: PtrInt; out Obj);
  /// low-level function called from TRestServer.CacheFlush URI method
  procedure ReleaseFakeCallback(Ctxt: TRestServerUriContext);
  /// purge a fake callback from the internal list
  // - called e.g. by ReleaseFakeCallback() or
  // RemoveFakeCallbackOnConnectionClose()
  procedure RemoveFakeCallback(FakeObj: TObject; {TInterfacedObjectFakeServer}
    Ctxt: TRestServerUriContext);
  /// purge all fake callbacks on a given connection
  procedure RemoveFakeCallbackOnConnectionClose(aConnectionID: TRestConnectionID;
    aConnectionOpaque: PRestServerConnectionOpaque);
  /// class method able to check if a given server-side callback event fake
  // instance has been released on the client side
  // - may be used to automatically purge a list of subscribed callbacks,
  // e.g. before triggering the interface instance, and avoid an exception
  // - can optionally append the callback class instance information to
  // a local ShortString variable, e.g. for logging/debug purposes
  class function CallbackReleasedOnClientSide(const callback: IInterface;
    callbacktext: PShortString = nil): boolean; overload;
  /// class method able to associate an Opaque pointer to a fake callback
  // - allow to avoid a lookup in an array e.g. when a callback is released
  class procedure CallbackSetOpaque(const callback: IInterface;
    Opaque: pointer);
  /// class method able to retrieve the Opaque pointer associated to a fake
  // callback
  // - presumably returns the value previously supplied to CallbackSetOpaque()
  class function CallbackGetOpaque(const callback: IInterface): pointer;
  /// replace the connection ID of callbacks after a reconnection
  // - returns the number of callbacks changed
  function FakeCallbackReplaceConnectionID(
    aConnectionIDOld, aConnectionIDNew: TRestConnectionID): integer;
  /// register a callback interface which will be called each time a write
  // operation is performed on a given TOrm with a TRecordVersion field
  // - called e.g. by TRestServer.RecordVersionSynchronizeSubscribeMaster
  function RecordVersionSynchronizeSubscribeMaster(
    TableIndex: integer; RecordVersion: TRecordVersion;
    const SlaveCallback: IServiceRecordVersionCallback): boolean;
  /// notify any TRecordVersion callback for a table Add/Update from a
  // TDocVariant content
  // - used e.g. by TRestStorageMongoDB.DocFromJson()
  procedure RecordVersionNotifyAddUpdate(Occasion: TOrmOccasion;
    TableIndex: integer; const Document: TDocVariantData); overload;
  /// notify any TRecordVersion callback for a table Add/Update from a
  // TJsonObjectDecoder content
  // - used e.g. by TRestStorageMongoDB.DocFromJson()
  procedure RecordVersionNotifyAddUpdate(Occasion: TOrmOccasion;
    TableIndex: integer; const Decoder: TJsonObjectDecoder); overload;
  /// notify any TRecordVersion callback for a table Delete
  procedure RecordVersionNotifyDelete(TableIndex: integer;
    const ID: TID; const Revision: TRecordVersion);
  /// make some garbage collection when session is finished
  // - return the number of instances released during this process
  function OnCloseSession(aSessionID: cardinal): integer; virtual;
  /// log method execution information to a TOrmServiceLog table
  // - TServiceFactoryServer.SetServiceLog() will be called for all registered
  // interfaced-based services of this container
  // - will write to the specified aLogRest instance, and will disable
  // writing if aLogRest is nil
  // - will write to a (inherited) TOrmServiceLog table, as available in
  // TRest's model, unless a dedicated table is specified as aLogClass
  // - you could specify a CSV list of method names to be excluded from logging
  // (containing e.g. a password or a credit card number), containing either
  // the interface name (as 'ICalculator.Add'), or not (as 'Add')
  procedure SetServiceLog(const aLogRest: IRestOrm;
    aLogClass: TOrmServiceLogClass = nil;
    const aExcludedMethodNamesCsv: RawUtf8 = '');
  /// defines if the "method":"_signature_" or /root/Interface._signature
  // pseudo method is available to retrieve the whole interface signature,
  // encoded as a JSON object
  // - is set to FALSE by default, for security reasons: only "_contract_"
  // pseudo method is available - see TServiceContainer.ContractExpected
  property PublishSignature: boolean
    read fPublishSignature write SetPublishSignature;
  /// the default TServiceFactoryServer.TimeoutSec value
  // - default is 30 minutes
  // - you can customize each service using its corresponding TimeoutSec property
  property SessionTimeout: cardinal
    read fSessionTimeout write fSessionTimeout;
  /// this event will be launched when a callback interface is notified as
  // released on the Client side
  // - writable, so that a handler can actually be assigned by the caller
  // - as an alternative, you may define the following method on the
  // registration service interface type, which will be called when a
  // callback registered via this service is released (e.g. to unsubscribe
  // the callback from an interface list, via InterfaceArrayDelete):
  // ! procedure CallbackReleased(const callback: IInvokable; const interfaceName: RawUtf8);
  property OnCallbackReleasedOnClientSide: TOnCallbackReleased
    read fOnCallbackReleasedOnClientSide write fOnCallbackReleasedOnClientSide;
  /// this event will be launched when a callback interface is released on
  // the Server side
  // - writable, so that a handler can actually be assigned by the caller
  property OnCallbackReleasedOnServerSide: TOnCallbackReleased
    read fOnCallbackReleasedOnServerSide write fOnCallbackReleasedOnServerSide;
  /// defines how SOA callbacks will be handled
  property CallbackOptions: TServiceCallbackOptions
    read fCallbackOptions write fCallbackOptions;
published
  /// how many interface callbacks are currently registered
  property FakeCallbacksCount: integer
    read GetFakeCallbacksCount;
end;
{ ***************** Asynchronous REST Synchronisation Classes }
type
/// this class implements a service, which may be called to push notifications
// for master/slave replication
// - as used by TRestServer.RecordVersionSynchronizeMasterStart(), and
// expected by TRestServer.RecordVersionSynchronizeSlaveStart()
// - inherits from TInjectableObjectRest, so has Factory and Server properties
TServiceRecordVersion = class(TInjectableObjectRest, IServiceRecordVersion)
public
/// will register the supplied callback for the given table
// - implements IServiceRecordVersion.Subscribe
function Subscribe(const SqlTableName: RawUtf8;
const revision: TRecordVersion;
const callback: IServiceRecordVersionCallback): boolean;
end;
/// this class implements a callback interface, able to write all remote ORM
// notifications to the local DB
// - could be supplied as callback parameter, possibly via WebSockets
// transmission, to TRestServer.RecordVersionSynchronizeSubscribeMaster()
TServiceRecordVersionCallback = class(TInterfacedCallback,
IServiceRecordVersionCallback)
protected
fTable: TOrmClass;
fTableIndex: PtrInt;
fRecordVersionField: TOrmPropInfoRttiRecordVersion;
fBatch: TRestBatch;
fSlave: TRestServer; // fRest is master remote access
fOnNotify: TOnBatchWrite;
// local TOrmTableDeleted.ID follows current Model -> pre-compute offset
fTableDeletedIDOffset: Int64;
procedure SetCurrentRevision(const Revision: TRecordVersion;
Event: TOrmOccasion);
public
/// initialize the instance able to apply callbacks for a given table on
// a local slave REST server from a remote master REST server
// - the optional low-level aOnNotify callback will be triggered for each
// incoming notification, to track the object changes in real-time
constructor Create(aSlave: TRestServer; aMaster: TRestClientUri;
aTable: TOrmClass; const aOnNotify: TOnBatchWrite); reintroduce;
/// finalize this callback instance
destructor Destroy; override;
/// this event will be raised on any Add on a versioned record
procedure Added(const NewContent: RawJson); virtual;
/// this event will be raised on any Update on a versioned record
procedure Updated(const ModifiedContent: RawJson); virtual;
/// this event will be raised on any Delete on a versioned record
procedure Deleted(const ID: TID; const Revision: TRecordVersion); virtual;
/// match TInterfaceFactory.MethodIndexCurrentFrameCallback signature,
// so that TRestHttpClientWebsockets.CallbackRequest will call it
// - it will create a temporary TRestBatch for the whole "jumbo frame"
procedure CurrentFrame(isLast: boolean); virtual;
/// low-level event handler triggered by Added/Updated/Deleted methods
property OnNotify: TOnBatchWrite
read fOnNotify write fOnNotify;
end;
const
ORMVERSION_DELETEID_SHIFT = 58;
ORMVERSION_DELETEID_RANGE = Int64(1) shl ORMVERSION_DELETEID_SHIFT;
implementation
uses
mormot.orm.server;
{ ***************** TInjectableObjectRest Service Implementation Parent Class }
{ TInjectableObjectRest }
constructor TInjectableObjectRest.CreateWithResolverAndRest(
  aResolver: TInterfaceResolver;
  aFactory: TServiceFactoryServer;
  aServer: TRestServer;
  aRaiseEServiceExceptionIfNotFound: boolean);
begin
  // store the REST context first: an overridden Create may already need
  // the Server/Factory values when CreateWithResolver() runs below
  fServer := aServer;
  fFactory := aFactory;
  // then continue with the resolver-based construction
  CreateWithResolver(aResolver, aRaiseEServiceExceptionIfNotFound);
end;
{ ***************** TServiceFactoryServer Service Provider }
{ TServiceFactoryServer }
/// initialize the server-side factory of an interface-based service
// - aRestServer is the owner REST server, stored in fRestServer
// - aInterface is the RTTI of the interface to be published
// - aInstanceCreation selects the instance lifetime (sicShared, sicPerThread...)
// - aImplementationClass is the class implementing the interface
// - aContractExpected is forwarded as such to the inherited constructor
// - aTimeOutSec is stored as fInstanceTimeOut for the modes which maintain
// a list of instances
// - aSharedInstance is an optional existing instance used for sicShared
// - raise an EServiceException if the interface URI is already a published
// method of the server, if the class (or shared instance) does not implement
// the interface, or if a CallbackReleased method is not run as sicShared
constructor TServiceFactoryServer.Create(aRestServer: TRestServer;
aInterface: PRttiInfo; aInstanceCreation: TServiceInstanceImplementation;
aImplementationClass: TInterfacedClass; const aContractExpected: RawUtf8;
aTimeOutSec: cardinal; aSharedInstance: TInterfacedObject);
begin
// setup the fields which do not depend on the interface RTTI
fExecuteLock.Init;
fInstanceGC := TSynObjectListLocked.Create({ownobjects=}false);
fRestServer := aRestServer;
// extract RTTI from the interface
inherited Create(aRestServer, aInterface, aInstanceCreation, aContractExpected);
// refuse an interface URI which matches a published method of the server
if fRestServer.MethodAddress(ShortString(InterfaceUri)) <> nil then
EServiceException.RaiseUtf8(
'%.Create: I% URI already exposed by %.% published method',
[self, InterfaceUri, fRestServer, InterfaceUri]);
fImplementationClass := aImplementationClass;
if fImplementationClass.InheritsFrom(TInterfacedObjectFake) then
begin
// fake class: a shared instance is mandatory, and its own factory should
// have been created for the very same interface
fImplementationClassKind := ickFake;
if aSharedInstance = nil then
EServiceException.RaiseUtf8('%.Create: no Shared Instance for %/I%',
[self, fImplementationClass, fInterfaceUri]);
if (aSharedInstance as TInterfacedObjectFake).
Factory.InterfaceTypeInfo <> aInterface then
EServiceException.RaiseUtf8(
'%.Create: shared % instance does not implement I%',
[self, fImplementationClass, fInterfaceUri]);
end
else
begin
// regular class: identify its kind from the first matching condition
// - fImplementationClassKind is left untouched if none of them matches
if aRestServer.Services.Implements(fInterface.InterfaceTypeInfo) then
fImplementationClassKind := ickFromInjectedResolver
else if fImplementationClass.InheritsFrom(TInjectableObjectRest) then
fImplementationClassKind := ickInjectableRest
else if fImplementationClass.InheritsFrom(TInjectableObject) then
fImplementationClassKind := ickInjectable
else if fImplementationClass.InheritsFrom(TInterfacedPersistent) then
fImplementationClassKind := ickPersistent;
// the class should implement the interface: store its interface entry,
// which is later supplied to GetInterfaceFromEntry()
fImplementationClassInterfaceEntry := fImplementationClass.
GetInterfaceEntry(fInterface.InterfaceIID);
if fImplementationClassInterfaceEntry = nil then
EServiceException.RaiseUtf8('%.Create: % does not implement I%',
[self, fImplementationClass, fInterfaceUri]);
end;
// an interface with a CallbackReleased method is only allowed as sicShared
if (fInterface.MethodIndexCallbackReleased >= 0) and
(InstanceCreation <> sicShared) then
EServiceException.RaiseUtf8(
'%.Create: I%() should be run as sicShared for CallbackReleased method', [self,
fInterface.Methods[fInterface.MethodIndexCallbackReleased].InterfaceDotMethodName]);
// initialize the shared instance or client driven parameters
case InstanceCreation of
sicShared:
begin
// use the supplied instance if it inherits from the implementation
// class, or create a new one if none was supplied
if aSharedInstance = nil then
fSharedInstance := CreateInstance(false)
else if aSharedInstance.InheritsFrom(fImplementationClass) then
fSharedInstance := aSharedInstance
else
EServiceException.RaiseUtf8(
'%.Create: % shared instance does not inherit from %',
[self, aSharedInstance, fImplementationClass]);
// retrieve the interface of the shared instance once (but for ickFake)
if fImplementationClassKind <> ickFake then
if (fSharedInstance = nil) or
not GetInterfaceFromEntry(fSharedInstance,
fImplementationClassInterfaceEntry, fSharedInterface) then
EServiceException.RaiseUtf8(
'%.Create: % is no implementation of I%',
[self, fSharedInstance, fInterfaceUri]);
end;
sicClientDriven,
sicPerSession,
sicPerUser,
sicPerGroup,
sicPerThread:
// with no timeout, the mode is changed into sicSingle - but for
// sicPerThread which always maintains its list of instances
if (aTimeOutSec = 0) and
(InstanceCreation <> sicPerThread) then
fInstanceCreation := sicSingle
else
begin
// only instances list is protected, since client calls shall be pipelined
fInstances.DynArray.InitSpecific(
TypeInfo(TServiceFactoryServerInstanceDynArray),
fInstance, ptQWord, @fInstances.Count);
// fInstance[] are compared/sorted by InstanceID: TID/ptQWord
fInstanceTimeOut := aTimeOutSec;
end;
end;
// one statistics slot per interface method
SetLength(fStats, fInterface.MethodsCount);
// prepare some reusable execution context (avoid most memory allocations)
TInterfaceMethodExecuteCached.Prepare(fInterface, fExecuteCached);
end;
/// change the instance timeout, stored in fInstanceTimeOut
// - value is the new timeout (in seconds, according to the method name)
// - raise an EServiceException if self is nil, or if InstanceCreation is one
// of the SERVICE_IMPLEMENTATION_NOID modes
procedure TServiceFactoryServer.SetTimeoutSecInt(value: cardinal);
begin
  // a nil factory is rejected on its own, before any field is read: building
  // the error message from InstanceCreation would otherwise access self=nil
  // and fail with an access violation instead of the expected exception
  if self = nil then
    EServiceException.RaiseUtf8('%.SetTimeoutSecInt(%) with self=nil',
      [TServiceFactoryServer, value]);
  if InstanceCreation in SERVICE_IMPLEMENTATION_NOID then
    EServiceException.RaiseUtf8('%.SetTimeoutSecInt(%) with %',
      [self, value, ToText(InstanceCreation)^]);
  fInstanceTimeOut := value;
end;
/// change the instance timeout via SetTimeoutSecInt()
// - returns self, so that such calls could be chained
// - any exception raised by SetTimeoutSecInt() is propagated to the caller
function TServiceFactoryServer.SetTimeoutSec(
  value: cardinal): TServiceFactoryServerAbstract;
begin
  result := self;
  SetTimeoutSecInt(value);
end;
/// retrieve the current instance timeout
// - returns 0 if self is nil, or if InstanceCreation is one of the
// SERVICE_IMPLEMENTATION_NOID modes
function TServiceFactoryServer.GetTimeoutSec: cardinal;
begin
  result := 0;
  if (self <> nil) and
     not (InstanceCreation in SERVICE_IMPLEMENTATION_NOID) then
    result := fInstanceTimeout;
end;
/// retrieve the statistics slot of a method, from its name
// - the index is resolved by fInterface.CheckMethodIndex()
// - the returned slot may still be nil, since slots are only allocated on
// demand by GetStats()
function TServiceFactoryServer.GetStat(
  const aMethod: RawUtf8): TSynMonitorInputOutput;
var
  ndx: PtrInt;
begin
  ndx := fInterface.CheckMethodIndex(aMethod);
  result := fStats[ndx];
end;
/// retrieve - and allocate if needed - the statistics slot of a method
// - returns nil if MethodIndex is negative, or if mlInterfaces is not part of
// fRestServer.StatLevels
// - a missing slot is created within fRestServer.Stats.Lock, and named after
// Ctxt.ServiceMethod^.InterfaceDotMethodName
// - the returned slot has its Processing property set to true
function TServiceFactoryServer.GetStats(
  Ctxt: TRestServerUriContext; MethodIndex: PtrInt): TSynMonitorInputOutput;
begin
  result := nil;
  if (MethodIndex >= 0) and
     (mlInterfaces in fRestServer.StatLevels) then
  begin
    result := fStats[MethodIndex];
    if result = nil then
    begin
      // not allocated yet: check again once the lock is acquired
      fRestServer.Stats.Lock;
      try
        result := fStats[MethodIndex];
        if result = nil then
        begin
          result := TSynMonitorInputOutput.Create(
            Ctxt.ServiceMethod^.InterfaceDotMethodName);
          fStats[MethodIndex] := result;
        end;
      finally
        fRestServer.Stats.UnLock;
      end;
    end;
    result.Processing := true;
  end;
end;
/// returns how many instances are currently stored in the fInstanceGC list,
// i.e. instances which have been added for a delayed release
function TServiceFactoryServer.GetInstanceGCCount: integer;
begin
result := fInstanceGC.Count;
end;
/// finalize the factory and all its pending implementation instances
// - first wait up to 5 seconds for the fInstanceGC list to become empty,
// then force the release of the remaining instances
// - then release the instances still stored in fInstance[], the background
// thread, the statistics and the execution contexts
destructor TServiceFactoryServer.Destroy;
var
endtix: Int64;
i: PtrInt;
begin
// clean up any pending implementation instances
endtix := GetTickCount64 + 5000; // paranoid wait for refcnt=1 from services
repeat
// release the instances which are not referenced any more
DoInstanceGC({force=}false);
if fInstanceGC.Count = 0 then
break;
SleepHiRes(1);
if GetTickCount64 > endtix then
begin
// timeout: log a warning, then release whatever the reference count is
fRestServer.InternalLog('%.Destroy: I% InstanceGC.Count=% timeout - ' +
'you should fix your implementation to release its dependencies',
[ClassType, InterfaceUri, fInstanceGC.Count], sllWarning);
DoInstanceGC({force=}true); // may GPF but at least won't leak memory
break;
end;
until false;
fInstanceGC.Free;
// release the instances which are still in the fInstance[] list
if fInstances.Count > 0 then
begin
fRestServer.InternalLog('%.Destroy: finalize % I% %',
[ClassType, fInstances.Count, InterfaceUri, ToText(fInstanceCreation)^]);
for i := 0 to fInstances.Count - 1 do
InstanceFree(fInstance[i].Instance);
end;
// finalize service execution context
FreeAndNil(fBackgroundThread);
ObjArrayClear(fStats, true);
ObjArrayClear(fExecuteCached);
inherited Destroy;
// the lock initialized first in Create is finalized last
fExecuteLock.Done;
end;
/// release the instances of the fInstanceGC list which can be released
// - an instance is released if its RefCount equals 1, or always if Force is true
// - returns the number of instances removed from the list and released
// - the list is only locked while the instances are collected: the actual
// InstanceFree() calls are made once the lock has been released
function TServiceFactoryServer.DoInstanceGC(Force: boolean): PtrInt;
var
obj: TInterfacedObject;
pending: TPointerDynArray;
i: PtrInt;
begin
// delete when RefCount = 1 (for optFreeInMainThread/PerInterfaceThread)
result := 0;
// quick check with no lock
if fInstanceGC.Count = 0 then
exit;
fInstanceGC.Safe.WriteLock;
try
for i := fInstanceGC.Count - 1 downto 0 do // downto for proper Delete(i)
begin
obj := fInstanceGC.List[i];
if Force or
(obj.RefCount = 1) then
begin
// allocated once, on the first match: since the loop is running downto 0,
// no more than i + 1 instances could be collected from this point
if pending = nil then
SetLength(pending, i + 1);
pending[result] := obj; // free outside GC lock
inc(result);
fInstanceGC.Delete(i); // remove from list
end;
end;
finally
fInstanceGC.Safe.WriteUnLock;
end;
if result = 0 then
exit;
// the instances are actually released outside of fInstanceGC.Safe lock
for i := 0 to result - 1 do
InstanceFree(pending[i]); // may run in a background thread
fRestServer.Internallog('%.DoInstanceGC=% for I% %',
[ClassType, result, InterfaceUri, ToText(fInstanceCreation)^]);
end;
/// release one implementation instance, following the fAnyOptions settings
// - does nothing if Obj is nil
// - the first matching option of fAnyOptions selects how _Release is called:
// via BackgroundExecuteInstanceRelease(), within the global lock, within the
// per-interface lock, or directly
// - any exception raised during the release is logged as sllDebug and ignored
procedure TServiceFactoryServer.InstanceFree(Obj: TInterfacedObject);
// call Obj._Release, and log a warning if optFreeTimeout is set and the call
// took more than fRestServer.ServiceReleaseTimeoutMicrosec
procedure DoRelease;
var
start, stop: Int64;
timeout: boolean;
begin
timeout := (optFreeTimeout in fAnyOptions) and
(fRestServer.ServiceReleaseTimeoutMicrosec > 0);
if timeout then // release should be fast enough
QueryPerformanceMicroSeconds(start);
IInterface(Obj)._Release;
if not timeout then
exit;
// compute the elapsed time of the _Release call
QueryPerformanceMicroSeconds(stop);
dec(stop, start{%H-});
if stop > fRestServer.ServiceReleaseTimeoutMicrosec then
fRestServer.Internallog('%.InstanceFree: I%._Release took %',
[ClassType, InterfaceUri, MicroSecToString(stop)], sllWarning);
end;
begin
if Obj <> nil then
try
// only if the current thread is not the main thread
// NOTE(review): the nil thread parameter presumably means "run in the main
// thread" - confirm in BackgroundExecuteInstanceRelease
if (optFreeInMainThread in fAnyOptions) and
(GetCurrentThreadID <> MainThreadID) then
BackgroundExecuteInstanceRelease(Obj, nil)
// only if the per-interface background thread has been created
else if (optFreeInPerInterfaceThread in fAnyOptions) and
Assigned(fBackgroundThread) then
BackgroundExecuteInstanceRelease(Obj, fBackgroundThread)
else if optFreeGlobalLocked in fAnyOptions then
begin
// release within the lock shared by all interfaces
GlobalInterfaceExecuteMethod.Lock;
try
DoRelease;
finally
GlobalInterfaceExecuteMethod.UnLock;
end;
end
else if optFreeLockedPerInterface in fAnyOptions then
begin
// release within the lock owned by this factory
fExecuteLock.Lock;
try
DoRelease;
finally
fExecuteLock.UnLock;
end;
end
else
// no option matched: direct release from the current thread
DoRelease;
except
// best effort: a failing release is logged, then ignored
on E: Exception do
fRestServer.Internallog('%.InstanceFree: ignored % exception ' +
'during I%._Release', [PClass(self)^, PClass(E)^, InterfaceUri], sllDebug);
end;
end;
/// release an implementation instance, immediately or in a delayed way
// - does nothing if Obj is nil
// - the instance is added to the fInstanceGC list if optFreeDelayed is set,
// or if its RefCount is greater than 1
// - otherwise, it is released at once by InstanceFree()
procedure TServiceFactoryServer.InstanceFreeGC(Obj: TInterfacedObject);
begin
  if Obj = nil then
    exit;
  if (optFreeDelayed in fAnyOptions) or
     (Obj.RefCount > 1) then
    // still referenced on server side, or delay requested: release later
    fInstanceGC.Add(Obj)
  else
    // nothing prevents the release - which may occur on a specific thread
    InstanceFree(Obj);
end;
/// move all the instances of a given session into the fInstanceGC list
// - each fInstance[] item with a matching Session is removed from the
// instances list, within fInstances.Safe.WriteLock
// - returns the number of instances which have been moved
// - if any, DoInstanceGC() is then called once the lock has been released
function TServiceFactoryServer.DoInstanceGCSession(aSessionID: cardinal): integer;
var
  ndx: PtrInt;
begin
  result := 0;
  fInstances.Safe.WriteLock;
  try
    // iterate from the last item down to the first, for a proper Delete(ndx)
    ndx := fInstances.Count;
    while ndx > 0 do
    begin
      dec(ndx);
      if fInstance[ndx].Session <> aSessionID then
        continue;
      fInstanceGC.Add(fInstance[ndx].Instance);
      fInstances.DynArray.Delete(ndx);
      inc(result);
    end;
  finally
    fInstances.Safe.WriteUnLock;
  end;
  if result = 0 then
    exit;
  // release now outside of the lock
  DoInstanceGC({force=}false);
end;
/// retrieve an implementation of the interface into Obj, on server side
// - returns false if self is nil, or if no interface could be retrieved
// - sicShared returns the shared interface, sicPerThread the instance
// associated with the current thread ID, and any other mode a new instance
// - on success, the Factory of the per-thread running context is set to self,
// if it was not already set
function TServiceFactoryServer.Get(out Obj): boolean;
var
  Inst: TServiceFactoryServerInstance;
  ctx: PServiceRunningContext;
begin
  result := false;
  if self = nil then
    exit;
  case fInstanceCreation of
    sicShared:
      begin
        if fSharedInterface = nil then
          exit;
        // copy implementation interface
        IInterface(Obj) := fSharedInterface;
        result := true;
      end;
    sicPerThread:
      begin
        // use SERVICE_PSEUDO_METHOD_COUNT to create an instance if none
        Inst.Instance := nil;
        Inst.InstanceID := PtrUInt(GetCurrentThreadId);
        if (RetrieveInstance(nil, Inst, SERVICE_PSEUDO_METHOD_COUNT, 0) >= 0) and
           (Inst.Instance <> nil) then
          result := GetInterfaceFromEntry(Inst.Instance,
            fImplementationClassInterfaceEntry, Obj);
      end;
  else
    begin
      // no user/group/session on pure server-side -> always sicSingle
      Inst.Instance := CreateInstance(false);
      if Inst.Instance <> nil then
        result := GetInterfaceFromEntry(Inst.Instance,
          fImplementationClassInterfaceEntry, Obj);
    end;
  end;
  if not result then
    exit;
  // register this factory in the running context of the current thread
  ctx := PServiceRunningContext(PerThreadRunningContextAddress);
  if ctx^.Factory = nil then
    ctx^.Factory := self;
end;
/// retrieve the contract of this service, as stored in fContract
// - returns '' if self is nil
function TServiceFactoryServer.RetrieveSignature: RawUtf8;
begin
  result := '';
  if self <> nil then
    result := fContract; // just return the current value
end;
/// refresh the LastAccess value of all the instances of the Ctxt session
// - returns the number of instances which have been refreshed
// - returns 0 with no change if self or Ctxt are nil, if there is no
// instance, if Ctxt.Session is not above CONST_AUTHENTICATION_NOT_USED, or if
// the mode is neither sicClientDriven nor sicPerSession
// - LastAccess is set to Ctxt.TickCount64 shr 10, within a read lock of the
// instances list
function TServiceFactoryServer.RenewSession(Ctxt: TRestServerUriContext): integer;
var
  tix, sess: cardinal;
  i: PtrInt;
begin
  result := 0;
  if (self = nil) or
     (Ctxt = nil) or
     (fInstances.Count = 0) or
     (Ctxt.Session <= CONST_AUTHENTICATION_NOT_USED) or
     not (fInstanceCreation in [sicClientDriven, sicPerSession]) then
    exit;
  sess := Ctxt.Session;
  tix := Ctxt.TickCount64 shr 10;
  fInstances.Safe.ReadLock;
  try
    // the list is not sorted by session: check every stored instance
    for i := 0 to fInstances.Count - 1 do
      if fInstance[i].Session = sess then
      begin
        fInstance[i].LastAccess := tix;
        inc(result);
      end;
  finally
    fInstances.Safe.ReadUnLock;
  end;
end;
/// release the instances associated with a session which is being closed
// - does nothing and returns 0 if there is no instance in the list
// - sicClientDriven returns the result of DoInstanceGCSession()
// - sicPerSession calls RetrieveInstance() with imFree for the instance whose
// InstanceID is aSessionID, and returns 0
// - any other mode is ignored, and returns 0
function TServiceFactoryServer.OnCloseSession(aSessionID: cardinal): integer;
var
  inst: TServiceFactoryServerInstance;
begin
  result := 0;
  if fInstances.Count <= 0 then
    exit;
  case InstanceCreation of
    sicClientDriven:
      // release ASAP if was not notified by client
      result := DoInstanceGCSession(aSessionID);
    sicPerSession:
      begin
        inst.InstanceID := aSessionID;
        RetrieveInstance(nil, inst, ord(imFree), aSessionID); // O(log(n))
      end;
  end;
end;