17
17
18
18
package org .apache .shardingsphere .infra .instance ;
19
19
20
- import com .google .common .base .Preconditions ;
21
20
import lombok .AccessLevel ;
22
21
import lombok .Getter ;
23
22
import org .apache .shardingsphere .infra .config .mode .ModeConfiguration ;
24
- import org .apache .shardingsphere .infra .instance .metadata .InstanceMetaData ;
25
- import org .apache .shardingsphere .infra .instance .metadata .InstanceType ;
23
+ import org .apache .shardingsphere .infra .exception .core .ShardingSpherePreconditions ;
26
24
import org .apache .shardingsphere .infra .instance .workerid .WorkerIdGenerator ;
27
25
import org .apache .shardingsphere .infra .lock .LockContext ;
28
26
import org .apache .shardingsphere .infra .state .instance .InstanceState ;
29
27
import org .apache .shardingsphere .infra .util .eventbus .EventBusContext ;
30
28
31
29
import javax .annotation .concurrent .ThreadSafe ;
32
30
import java .util .Collection ;
33
- import java .util .LinkedHashMap ;
34
- import java .util .Map ;
35
31
import java .util .Optional ;
36
32
import java .util .Properties ;
37
33
import java .util .concurrent .CopyOnWriteArrayList ;
@@ -51,17 +47,15 @@ public final class ComputeNodeInstanceContext {
51
47
52
48
private final ModeConfiguration modeConfiguration ;
53
49
54
- @ SuppressWarnings ("rawtypes" )
55
50
@ Getter (AccessLevel .NONE )
56
- private final AtomicReference <LockContext > lockContext = new AtomicReference <>();
51
+ private final AtomicReference <LockContext <?> > lockContext = new AtomicReference <>();
57
52
58
53
private final EventBusContext eventBusContext ;
59
54
60
55
private final Collection <ComputeNodeInstance > allClusterInstances = new CopyOnWriteArrayList <>();
61
56
62
- @ SuppressWarnings ("rawtypes" )
63
57
public ComputeNodeInstanceContext (final ComputeNodeInstance instance , final WorkerIdGenerator workerIdGenerator ,
64
- final ModeConfiguration modeConfig , final LockContext lockContext , final EventBusContext eventBusContext ) {
58
+ final ModeConfiguration modeConfig , final LockContext <?> lockContext , final EventBusContext eventBusContext ) {
65
59
this .instance = instance ;
66
60
this .workerIdGenerator .set (workerIdGenerator );
67
61
this .modeConfiguration = modeConfig ;
@@ -79,86 +73,76 @@ public ComputeNodeInstanceContext(final ComputeNodeInstance instance, final Mode
79
73
* @param workerIdGenerator worker id generator
80
74
* @param lockContext lock context
81
75
*/
82
- @ SuppressWarnings ("rawtypes" )
83
- public void init (final WorkerIdGenerator workerIdGenerator , final LockContext lockContext ) {
76
+ public void init (final WorkerIdGenerator workerIdGenerator , final LockContext <?> lockContext ) {
84
77
this .workerIdGenerator .set (workerIdGenerator );
85
78
this .lockContext .set (lockContext );
86
79
}
87
80
88
81
/**
89
82
* Update instance status.
90
83
*
91
- * @param id instance ID
84
+ * @param instanceId instance ID
92
85
* @param status status
93
86
*/
94
- public void updateStatus (final String id , final String status ) {
87
+ public void updateStatus (final String instanceId , final String status ) {
95
88
Optional <InstanceState > instanceState = InstanceState .get (status );
96
89
if (!instanceState .isPresent ()) {
97
90
return ;
98
91
}
99
- if (instance .getMetaData ().getId ().equals (id )) {
92
+ if (instance .getMetaData ().getId ().equals (instanceId )) {
100
93
instance .switchState (instanceState .get ());
101
94
}
102
- updateRelatedComputeNodeInstancesStatus (id , instanceState .get ());
103
- }
104
-
105
- private void updateRelatedComputeNodeInstancesStatus (final String instanceId , final InstanceState instanceState ) {
106
- for (ComputeNodeInstance each : allClusterInstances ) {
107
- if (each .getMetaData ().getId ().equals (instanceId )) {
108
- each .switchState (instanceState );
109
- }
110
- }
95
+ allClusterInstances .stream ().filter (each -> each .getMetaData ().getId ().equals (instanceId )).forEach (each -> each .switchState (instanceState .get ()));
111
96
}
112
97
113
98
/**
114
- * Update instance worker id .
99
+ * Update instance labels .
115
100
*
116
- * @param instanceId instance id
117
- * @param workerId worker id
101
+ * @param instanceId instance ID
102
+ * @param labels labels
118
103
*/
119
- public void updateWorkerId (final String instanceId , final Integer workerId ) {
104
+ public void updateLabels (final String instanceId , final Collection < String > labels ) {
120
105
if (instance .getMetaData ().getId ().equals (instanceId )) {
121
- instance . setWorkerId ( workerId );
106
+ updateLabels ( instance , labels );
122
107
}
123
- allClusterInstances .stream ().filter (each -> each .getMetaData ().getId ().equals (instanceId )).forEach (each -> each .setWorkerId (workerId ));
108
+ allClusterInstances .stream ().filter (each -> each .getMetaData ().getId ().equals (instanceId )).forEach (each -> updateLabels (each , labels ));
109
+ }
110
+
111
+ private void updateLabels (final ComputeNodeInstance computeNodeInstance , final Collection <String > labels ) {
112
+ computeNodeInstance .getLabels ().clear ();
113
+ computeNodeInstance .getLabels ().addAll (labels );
124
114
}
125
115
126
116
/**
127
- * Update instance label .
117
+ * Update instance worker ID .
128
118
*
129
- * @param instanceId instance id
130
- * @param labels collection of label
119
+ * @param instanceId instance ID
120
+ * @param workerId worker ID
131
121
*/
132
- public void updateLabel (final String instanceId , final Collection < String > labels ) {
122
+ public void updateWorkerId (final String instanceId , final Integer workerId ) {
133
123
if (instance .getMetaData ().getId ().equals (instanceId )) {
134
- instance .getLabels ().clear ();
135
- instance .getLabels ().addAll (labels );
136
- }
137
- for (ComputeNodeInstance each : allClusterInstances ) {
138
- if (each .getMetaData ().getId ().equals (instanceId )) {
139
- each .getLabels ().clear ();
140
- each .getLabels ().addAll (labels );
141
- }
124
+ instance .setWorkerId (workerId );
142
125
}
126
+ allClusterInstances .stream ().filter (each -> each .getMetaData ().getId ().equals (instanceId )).forEach (each -> each .setWorkerId (workerId ));
143
127
}
144
128
145
129
/**
146
- * Get worker id .
130
+ * Get worker ID .
147
131
*
148
- * @return worker id
132
+ * @return worker ID
149
133
*/
150
134
public int getWorkerId () {
151
135
return instance .getWorkerId ();
152
136
}
153
137
154
138
/**
155
- * Generate worker id .
139
+ * Generate worker ID .
156
140
*
157
141
* @param props properties
158
- * @return worker id
142
+ * @return worker ID
159
143
*/
160
144
public int generateWorkerId (final Properties props ) {
161
- Preconditions . checkArgument (workerIdGenerator .get () != null , "Worker id generator is not initialized." );
145
+ ShardingSpherePreconditions . checkNotNull (workerIdGenerator .get (), () -> new IllegalArgumentException ( "Worker id generator is not initialized." ) );
162
146
int result = workerIdGenerator .get ().generate (props );
163
147
instance .setWorkerId (result );
164
148
return result ;
@@ -184,26 +168,9 @@ public void deleteComputeNodeInstance(final ComputeNodeInstance instance) {
184
168
}
185
169
186
170
/**
187
- * Get compute node instances by instance type and labels.
188
- *
189
- * @param instanceType instance type
190
- * @param labels collection of contained label
191
- * @return compute node instances
192
- */
193
- public Map <String , InstanceMetaData > getAllClusterInstances (final InstanceType instanceType , final Collection <String > labels ) {
194
- Map <String , InstanceMetaData > result = new LinkedHashMap <>(allClusterInstances .size (), 1F );
195
- for (ComputeNodeInstance each : allClusterInstances ) {
196
- if (each .getMetaData ().getType () == instanceType && labels .stream ().anyMatch (each .getLabels ()::contains )) {
197
- result .put (each .getMetaData ().getId (), each .getMetaData ());
198
- }
199
- }
200
- return result ;
201
- }
202
-
203
- /**
204
- * Get compute node instance by instance id.
171
+ * Get compute node instance.
205
172
*
206
- * @param instanceId instance id
173
+ * @param instanceId instance ID
207
174
* @return compute node instance
208
175
*/
209
176
public Optional <ComputeNodeInstance > getComputeNodeInstanceById (final String instanceId ) {
@@ -216,8 +183,7 @@ public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final String ins
216
183
* @return lock context
217
184
* @throws IllegalStateException if lock context is not initialized
218
185
*/
219
- @ SuppressWarnings ("rawtypes" )
220
- public LockContext getLockContext () throws IllegalStateException {
186
+ public LockContext <?> getLockContext () throws IllegalStateException {
221
187
return Optional .ofNullable (lockContext .get ()).orElseThrow (() -> new IllegalStateException ("Lock context is not initialized." ));
222
188
}
223
189
}
0 commit comments