16
16
// limitations under the License.
17
17
18
18
using System ;
19
+ using System . Collections . Concurrent ;
19
20
using System . Collections . Generic ;
20
21
using System . Diagnostics ;
21
22
using System . Diagnostics . Contracts ;
22
23
using System . Linq ;
23
- using System . Net ;
24
24
using System . Reflection ;
25
25
using System . Text . Json ;
26
26
using System . Text . RegularExpressions ;
@@ -199,6 +199,7 @@ static ResourceManager()
199
199
private Task watcherTask ;
200
200
private CancellationTokenSource watcherTcs ;
201
201
private EventQueue < TEntity , TController > eventQueue ;
202
+ private ConcurrentDictionary < string , CancellationTokenSource > reconcileTokens ;
202
203
203
204
/// <summary>
204
205
/// Default constructor.
@@ -684,16 +685,6 @@ private async Task WatchAsync(CancellationToken cancellationToken)
684
685
{
685
686
await SyncContext . Clear ;
686
687
687
- //-----------------------------------------------------------------
688
- // We're going to use this dictionary to keep track of the [Status]
689
- // property of the resources we're watching so we can distinguish
690
- // between changes to the status vs. changes to anything else in
691
- // the resource.
692
- //
693
- // The dictionary simply holds the status property serialized to
694
- // JSON, with these keyed by resource name. Note that the resource
695
- // entities might not have a [Status] property.
696
-
697
688
var entityType = typeof ( TEntity ) ;
698
689
var statusGetter = entityType . GetProperty ( "Status" ) ? . GetMethod ;
699
690
@@ -747,9 +738,24 @@ private async Task WatchAsync(CancellationToken cancellationToken)
747
738
await finalizerManager . RegisterAllFinalizersAsync ( resource , cancellationToken : cancellationToken ) ;
748
739
}
749
740
741
+ var cts = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
742
+
743
+ reconcileTokens . AddOrUpdate (
744
+ key : resource . Uid ( ) ,
745
+ addValueFactory : ( id ) =>
746
+ {
747
+ return cts ;
748
+
749
+ } ,
750
+ updateValueFactory : ( id , token ) =>
751
+ {
752
+ return cts ;
753
+
754
+ } ) ;
755
+
750
756
using ( metrics . ReconcileTimeSeconds . NewTimer ( ) )
751
757
{
752
- result = await CreateController ( scope . ServiceProvider ) . ReconcileAsync ( resource , cancellationToken : cancellationToken ) ;
758
+ result = await CreateController ( scope . ServiceProvider ) . ReconcileAsync ( resource , cancellationToken : cts . Token ) ;
753
759
}
754
760
}
755
761
catch ( RequeueException e )
@@ -782,6 +788,13 @@ await eventQueue.RequeueAsync(
782
788
return ;
783
789
784
790
}
791
+ catch ( OperationCanceledException e )
792
+ {
793
+ logger ? . LogErrorEx ( e ) ;
794
+ metrics . ReconcileErrorsTotal . Inc ( ) ;
795
+
796
+ return ;
797
+ }
785
798
catch ( Exception e )
786
799
{
787
800
metrics . ReconcileErrorsTotal . Inc ( ) ;
@@ -804,6 +817,10 @@ await eventQueue.RequeueAsync(
804
817
return ;
805
818
}
806
819
}
820
+ finally
821
+ {
822
+ reconcileTokens . TryRemove ( resource . Uid ( ) , out _ ) ;
823
+ }
807
824
808
825
break ;
809
826
@@ -813,6 +830,11 @@ await eventQueue.RequeueAsync(
813
830
{
814
831
metrics . DeleteEventsTotal ? . Inc ( ) ;
815
832
833
+ if ( reconcileTokens . TryGetValue ( resource . Uid ( ) , out var token ) )
834
+ {
835
+ await token . CancelAsync ( ) ;
836
+ }
837
+
816
838
using ( metrics . DeleteTimeSeconds . NewTimer ( ) )
817
839
{
818
840
await CreateController ( scope . ServiceProvider ) . DeletedAsync ( resource , cancellationToken : cancellationToken ) ;
@@ -838,9 +860,24 @@ await eventQueue.RequeueAsync(
838
860
{
839
861
metrics . ReconcileEventsTotal ? . Inc ( ) ;
840
862
863
+ var cts = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
864
+
865
+ reconcileTokens . AddOrUpdate (
866
+ key : resource . Uid ( ) ,
867
+ addValueFactory : ( id ) =>
868
+ {
869
+ return cts ;
870
+
871
+ } ,
872
+ updateValueFactory : ( id , token ) =>
873
+ {
874
+ return cts ;
875
+
876
+ } ) ;
877
+
841
878
using ( metrics . ReconcileTimeSeconds . NewTimer ( ) )
842
879
{
843
- result = await CreateController ( scope . ServiceProvider ) . ReconcileAsync ( resource , cancellationToken : cancellationToken ) ;
880
+ result = await CreateController ( scope . ServiceProvider ) . ReconcileAsync ( resource , cancellationToken : cts . Token ) ;
844
881
}
845
882
}
846
883
catch ( RequeueException e )
@@ -872,6 +909,13 @@ await eventQueue.RequeueAsync(
872
909
873
910
return ;
874
911
}
912
+ catch ( OperationCanceledException e )
913
+ {
914
+ logger ? . LogErrorEx ( e ) ;
915
+ metrics . ReconcileErrorsTotal . Inc ( ) ;
916
+
917
+ return ;
918
+ }
875
919
catch ( Exception e )
876
920
{
877
921
metrics . ReconcileErrorsTotal ? . Inc ( ) ;
@@ -894,6 +938,11 @@ await eventQueue.RequeueAsync(
894
938
return ;
895
939
}
896
940
}
941
+ finally
942
+ {
943
+ reconcileTokens . TryRemove ( resource . Uid ( ) , out _ ) ;
944
+ }
945
+
897
946
break ;
898
947
899
948
case ModifiedEventType . Finalizing :
@@ -961,16 +1010,35 @@ await eventQueue.RequeueAsync(
961
1010
{
962
1011
metrics . StatusModifiedTotal ? . Inc ( ) ;
963
1012
1013
+ var cts = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
1014
+
1015
+ reconcileTokens . AddOrUpdate (
1016
+ key : resource . Uid ( ) ,
1017
+ addValueFactory : ( id ) =>
1018
+ {
1019
+ return cts ;
1020
+
1021
+ } ,
1022
+ updateValueFactory : ( id , token ) =>
1023
+ {
1024
+ return cts ;
1025
+
1026
+ } ) ;
1027
+
964
1028
using ( metrics . StatusModifiedTimeSeconds . NewTimer ( ) )
965
1029
{
966
- await CreateController ( scope . ServiceProvider ) . StatusModifiedAsync ( resource , cancellationToken : cancellationToken ) ;
1030
+ await CreateController ( scope . ServiceProvider ) . StatusModifiedAsync ( resource , cancellationToken : cts . Token ) ;
967
1031
}
968
1032
}
969
1033
catch ( Exception e )
970
1034
{
971
1035
metrics . StatusModifiedErrorsTotal ? . Inc ( ) ;
972
1036
logger ? . LogErrorEx ( e ) ;
973
1037
}
1038
+ finally
1039
+ {
1040
+ reconcileTokens . TryRemove ( resource . Uid ( ) , out _ ) ;
1041
+ }
974
1042
}
975
1043
else
976
1044
{
0 commit comments