24
24
import jakarta .inject .Inject ;
25
25
import jakarta .inject .Singleton ;
26
26
import java .net .URI ;
27
+ import java .net .URLEncoder ;
28
+ import java .nio .charset .StandardCharsets ;
27
29
import java .util .Arrays ;
28
30
import java .util .List ;
29
31
import java .util .Optional ;
@@ -71,9 +73,14 @@ public Flux<String> getSubjects(String kafkaCluster) {
71
73
*/
72
74
public Mono <SchemaResponse > getSubject (String kafkaCluster , String subject , String version ) {
73
75
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
76
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
77
+ String encodedVersion = URLEncoder .encode (version , StandardCharsets .UTF_8 );
78
+
74
79
HttpRequest <?> request = HttpRequest .GET (
75
- URI .create (StringUtils .prependUri (config .getUrl (), SUBJECTS + subject + VERSIONS + version )))
80
+ URI .create (StringUtils .prependUri (config .getUrl (),
81
+ SUBJECTS + encodedSubject + VERSIONS + encodedVersion )))
76
82
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
83
+
77
84
return Mono .from (httpClient .retrieve (request , SchemaResponse .class ))
78
85
.onErrorResume (HttpClientResponseException .class ,
79
86
ex -> ex .getStatus ().equals (HttpStatus .NOT_FOUND ) ? Mono .empty () : Mono .error (ex ));
@@ -88,15 +95,18 @@ public Mono<SchemaResponse> getSubject(String kafkaCluster, String subject, Stri
88
95
*/
89
96
public Flux <SchemaResponse > getAllSubjectVersions (String kafkaCluster , String subject ) {
90
97
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
98
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
99
+
91
100
HttpRequest <?> request = HttpRequest .GET (
92
- URI .create (StringUtils .prependUri (config .getUrl (), SUBJECTS + subject + "/versions" )))
101
+ URI .create (StringUtils .prependUri (config .getUrl (),
102
+ SUBJECTS + encodedSubject + "/versions" )))
93
103
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
94
104
95
105
return Flux .from (httpClient .retrieve (request , Integer [].class ))
96
106
.flatMap (ids -> Flux .fromIterable (Arrays .asList (ids ))
97
107
.flatMap (id -> {
98
- HttpRequest <?> requestVersion = HttpRequest .GET (
99
- URI . create ( StringUtils . prependUri ( config . getUrl (), SUBJECTS + subject + VERSIONS + id )))
108
+ HttpRequest <?> requestVersion = HttpRequest .GET (URI . create ( StringUtils . prependUri ( config . getUrl (),
109
+ SUBJECTS + encodedSubject + VERSIONS + id )))
100
110
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
101
111
102
112
return httpClient .retrieve (requestVersion , SchemaResponse .class );
@@ -115,10 +125,13 @@ public Flux<SchemaResponse> getAllSubjectVersions(String kafkaCluster, String su
115
125
*/
116
126
public Mono <SchemaResponse > register (String kafkaCluster , String subject , SchemaRequest body ) {
117
127
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
118
- HttpRequest <?> request =
119
- HttpRequest .POST (URI .create (StringUtils .prependUri (config .getUrl (), SUBJECTS + subject + "/versions" )),
120
- body )
121
- .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
128
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
129
+
130
+ HttpRequest <?> request = HttpRequest .POST (
131
+ URI .create (StringUtils .prependUri (config .getUrl (),
132
+ SUBJECTS + encodedSubject + "/versions" )), body )
133
+ .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
134
+
122
135
return Mono .from (httpClient .retrieve (request , SchemaResponse .class ));
123
136
}
124
137
@@ -132,9 +145,13 @@ public Mono<SchemaResponse> register(String kafkaCluster, String subject, Schema
132
145
*/
133
146
public Mono <Integer []> deleteSubject (String kafkaCluster , String subject , boolean hardDelete ) {
134
147
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
148
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
149
+
135
150
MutableHttpRequest <?> request = HttpRequest .DELETE (
136
- URI .create (StringUtils .prependUri (config .getUrl (), SUBJECTS + subject + "?permanent=" + hardDelete )))
151
+ URI .create (StringUtils .prependUri (config .getUrl (),
152
+ SUBJECTS + encodedSubject + "?permanent=" + hardDelete )))
137
153
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
154
+
138
155
return Mono .from (httpClient .retrieve (request , Integer [].class ));
139
156
}
140
157
@@ -147,13 +164,16 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
147
164
* @param hardDelete Should the subject be hard deleted or not
148
165
* @return The version of the deleted subject
149
166
*/
150
- public Mono <Integer > deleteSubjectVersion (String kafkaCluster , String subject , String version ,
151
- boolean hardDelete ) {
167
+ public Mono <Integer > deleteSubjectVersion (String kafkaCluster , String subject , String version , boolean hardDelete ) {
152
168
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
169
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
170
+ String encodedVersion = URLEncoder .encode (version , StandardCharsets .UTF_8 );
171
+
153
172
MutableHttpRequest <?> request = HttpRequest .DELETE (
154
- URI .create (StringUtils .prependUri (config .getUrl (), SUBJECTS + subject + VERSIONS + version
155
- + "?permanent=" + hardDelete )))
173
+ URI .create (StringUtils .prependUri (config .getUrl (),
174
+ SUBJECTS + encodedSubject + VERSIONS + encodedVersion + "?permanent=" + hardDelete )))
156
175
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
176
+
157
177
return Mono .from (httpClient .retrieve (request , Integer .class ));
158
178
}
159
179
@@ -165,14 +185,17 @@ public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, S
165
185
* @param body The request
166
186
* @return The schema compatibility validation
167
187
*/
168
- public Mono <SchemaCompatibilityCheckResponse > validateSchemaCompatibility (String kafkaCluster , String subject ,
188
+ public Mono <SchemaCompatibilityCheckResponse > validateSchemaCompatibility (String kafkaCluster ,
189
+ String subject ,
169
190
SchemaRequest body ) {
170
191
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
171
- HttpRequest <?> request = HttpRequest .POST (URI .create (
172
- StringUtils .prependUri (config .getUrl (), "/compatibility/subjects/" + subject
173
- + "/versions?verbose=true" )),
174
- body )
192
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
193
+
194
+ HttpRequest <?> request = HttpRequest .POST (
195
+ URI .create (StringUtils .prependUri (config .getUrl (),
196
+ "/compatibility/subjects/" + encodedSubject + "/versions?verbose=true" )), body )
175
197
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
198
+
176
199
return Mono .from (httpClient .retrieve (request , SchemaCompatibilityCheckResponse .class ))
177
200
.onErrorResume (HttpClientResponseException .class ,
178
201
ex -> ex .getStatus ().equals (HttpStatus .NOT_FOUND ) ? Mono .empty () : Mono .error (ex ));
@@ -186,12 +209,17 @@ public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String
186
209
* @param body The schema compatibility request
187
210
* @return The schema compatibility update
188
211
*/
189
- public Mono <SchemaCompatibilityResponse > updateSubjectCompatibility (String kafkaCluster , String subject ,
212
+ public Mono <SchemaCompatibilityResponse > updateSubjectCompatibility (String kafkaCluster ,
213
+ String subject ,
190
214
SchemaCompatibilityRequest body ) {
191
215
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
192
- HttpRequest <?> request =
193
- HttpRequest .PUT (URI .create (StringUtils .prependUri (config .getUrl (), CONFIG + subject )), body )
194
- .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
216
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
217
+
218
+ HttpRequest <?> request = HttpRequest .PUT (
219
+ URI .create (StringUtils .prependUri (config .getUrl (),
220
+ CONFIG + encodedSubject )), body )
221
+ .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
222
+
195
223
return Mono .from (httpClient .retrieve (request , SchemaCompatibilityResponse .class ));
196
224
}
197
225
@@ -204,8 +232,13 @@ public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafka
204
232
*/
205
233
public Mono <SchemaCompatibilityResponse > getCurrentCompatibilityBySubject (String kafkaCluster , String subject ) {
206
234
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
207
- HttpRequest <?> request = HttpRequest .GET (URI .create (StringUtils .prependUri (config .getUrl (), CONFIG + subject )))
235
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
236
+
237
+ HttpRequest <?> request = HttpRequest .GET (
238
+ URI .create (StringUtils .prependUri (config .getUrl (),
239
+ CONFIG + encodedSubject )))
208
240
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
241
+
209
242
return Mono .from (httpClient .retrieve (request , SchemaCompatibilityResponse .class ))
210
243
.onErrorResume (HttpClientResponseException .class ,
211
244
ex -> ex .getStatus ().equals (HttpStatus .NOT_FOUND ) ? Mono .empty () : Mono .error (ex ));
@@ -220,9 +253,13 @@ public Mono<SchemaCompatibilityResponse> getCurrentCompatibilityBySubject(String
220
253
*/
221
254
public Mono <SchemaCompatibilityResponse > deleteCurrentCompatibilityBySubject (String kafkaCluster , String subject ) {
222
255
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
223
- MutableHttpRequest <?> request =
224
- HttpRequest .DELETE (URI .create (StringUtils .prependUri (config .getUrl (), CONFIG + subject )))
225
- .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
256
+ String encodedSubject = URLEncoder .encode (subject , StandardCharsets .UTF_8 );
257
+
258
+ MutableHttpRequest <?> request = HttpRequest .DELETE (
259
+ URI .create (StringUtils .prependUri (config .getUrl (),
260
+ CONFIG + encodedSubject )))
261
+ .basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
262
+
226
263
return Mono .from (httpClient .retrieve (request , SchemaCompatibilityResponse .class ));
227
264
}
228
265
@@ -235,11 +272,12 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
235
272
*/
236
273
public Mono <List <TagTopicInfo >> associateTags (String kafkaCluster , List <TagTopicInfo > tagSpecs ) {
237
274
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
238
- HttpRequest <?> request = HttpRequest
239
- .POST ( URI . create ( StringUtils . prependUri (
240
- config .getUrl (),
275
+
276
+ HttpRequest <?> request = HttpRequest .POST (
277
+ URI . create ( StringUtils . prependUri ( config .getUrl (),
241
278
"/catalog/v1/entity/tags" )), tagSpecs )
242
279
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
280
+
243
281
return Mono .from (httpClient .retrieve (request , Argument .listOf (TagTopicInfo .class )));
244
282
}
245
283
@@ -252,9 +290,12 @@ public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopic
252
290
*/
253
291
public Mono <List <TagInfo >> createTags (String kafkaCluster , List <TagInfo > tags ) {
254
292
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
255
- HttpRequest <?> request = HttpRequest .POST (URI .create (StringUtils .prependUri (
256
- config .getUrl (), "/catalog/v1/types/tagdefs" )), tags )
293
+
294
+ HttpRequest <?> request = HttpRequest .POST (
295
+ URI .create (StringUtils .prependUri (config .getUrl (),
296
+ "/catalog/v1/types/tagdefs" )), tags )
257
297
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
298
+
258
299
return Mono .from (httpClient .retrieve (request , Argument .listOf (TagInfo .class )));
259
300
}
260
301
@@ -268,11 +309,12 @@ public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
268
309
*/
269
310
public Mono <HttpResponse <Void >> dissociateTag (String kafkaCluster , String entityName , String tagName ) {
270
311
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
271
- HttpRequest <?> request = HttpRequest
272
- .DELETE ( URI . create ( StringUtils . prependUri (
273
- config .getUrl (),
312
+
313
+ HttpRequest <?> request = HttpRequest .DELETE (
314
+ URI . create ( StringUtils . prependUri ( config .getUrl (),
274
315
"/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName )))
275
316
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
317
+
276
318
return Mono .from (httpClient .exchange (request , Void .class ));
277
319
}
278
320
@@ -284,11 +326,12 @@ public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entity
284
326
*/
285
327
public Mono <TopicListResponse > getTopicWithCatalogInfo (String kafkaCluster , int limit , int offset ) {
286
328
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
287
- HttpRequest <?> request = HttpRequest
288
- .GET ( URI . create ( StringUtils . prependUri (
289
- config .getUrl (), "/catalog/v1/search/basic?type=kafka_topic&limit="
290
- + limit + "&offset=" + offset )))
329
+
330
+ HttpRequest <?> request = HttpRequest .GET (
331
+ URI . create ( StringUtils . prependUri ( config .getUrl (),
332
+ "/catalog/v1/search/basic?type=kafka_topic&limit=" + limit + "&offset=" + offset )))
291
333
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
334
+
292
335
return Mono .from (httpClient .retrieve (request , TopicListResponse .class ));
293
336
}
294
337
@@ -302,10 +345,12 @@ public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int
302
345
public Mono <HttpResponse <TopicDescriptionUpdateResponse >> updateDescription (String kafkaCluster ,
303
346
TopicDescriptionUpdateBody body ) {
304
347
ManagedClusterProperties .SchemaRegistryProperties config = getSchemaRegistry (kafkaCluster );
305
- HttpRequest <?> request = HttpRequest
306
- .PUT (URI .create (StringUtils .prependUri (
307
- config .getUrl (), "/catalog/v1/entity" )), body )
348
+
349
+ HttpRequest <?> request = HttpRequest .PUT (
350
+ URI .create (StringUtils .prependUri (config .getUrl (),
351
+ "/catalog/v1/entity" )), body )
308
352
.basicAuth (config .getBasicAuthUsername (), config .getBasicAuthPassword ());
353
+
309
354
return Mono .from (httpClient .exchange (request , TopicDescriptionUpdateResponse .class ));
310
355
}
311
356
0 commit comments