Skip to content

Commit 07d7526

Browse files
authored
Support returning user subject with variables for AuthenticationProviderMTls (#1470)
1 parent 26eec37 commit 07d7526

File tree

6 files changed

+97
-30
lines changed

6 files changed

+97
-30
lines changed

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/MQTTAuthenticationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import static io.streamnative.pulsar.handlers.mqtt.Constants.AUTH_TOKEN;
1919
import io.netty.handler.codec.mqtt.MqttConnectMessage;
2020
import io.netty.handler.codec.mqtt.MqttConnectPayload;
21-
import io.streamnative.pulsar.handlers.mqtt.identitypool.AuthenticationProviderMTls;
21+
import io.streamnative.pulsar.handlers.mqtt.authentication.AuthenticationProviderMTls;
2222
import io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils;
2323
import java.util.HashMap;
2424
import java.util.List;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.streamnative.pulsar.handlers.mqtt.authentication;
15+
16+
import java.util.Map;
17+
import lombok.AllArgsConstructor;
18+
import lombok.Data;
19+
20+
@Data
21+
@AllArgsConstructor
22+
public final class AuthRequest {
23+
private String subject;
24+
private Map<String, String> variables;
25+
}
Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.identitypool;
15-
16-
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN;
17-
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.DN_KEYS;
18-
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SAN;
19-
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SHA1;
20-
import static io.streamnative.pulsar.handlers.mqtt.identitypool.ExpressionCompiler.SNID;
14+
package io.streamnative.pulsar.handlers.mqtt.authentication;
15+
16+
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.DN;
17+
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SAN;
18+
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SHA1;
19+
import static io.streamnative.pulsar.handlers.mqtt.authentication.ExpressionCompiler.SNID;
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
2122
import com.google.common.annotations.VisibleForTesting;
2223
import com.google.common.base.Joiner;
2324
import io.streamnative.oidc.broker.common.OIDCPoolResources;
@@ -43,10 +44,12 @@
4344
import lombok.extern.slf4j.Slf4j;
4445
import org.apache.commons.codec.binary.Hex;
4546
import org.apache.commons.lang.StringUtils;
47+
import org.apache.commons.lang.text.StrBuilder;
4648
import org.apache.pulsar.broker.ServiceConfiguration;
4749
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
4850
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
4951
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
52+
import org.apache.pulsar.common.util.ObjectMapperFactory;
5053
import org.apache.pulsar.metadata.api.MetadataStore;
5154
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
5255
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -64,6 +67,8 @@ public class AuthenticationProviderMTls implements AuthenticationProvider {
6467
@VisibleForTesting
6568
private OIDCPoolResources poolResources;
6669

70+
private final ObjectMapper objectMapper = ObjectMapperFactory.create();
71+
6772
@Getter
6873
@VisibleForTesting
6974
private final ConcurrentHashMap<String, ExpressionCompiler> poolMap = new ConcurrentHashMap<>();
@@ -212,7 +217,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
212217
final X509Certificate certificate = (X509Certificate) certs[0];
213218

214219
// parse DN
215-
Map<String, Object> params;
220+
Map<String, String> params;
216221
try {
217222
String subject = certificate.getSubjectX500Principal().getName();
218223
params = parseDN(subject);
@@ -228,20 +233,26 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
228233
// parse SHA1
229234
params.put(SHA1, parseSHA1FingerPrint(certificate));
230235

231-
String principal = matchPool(params);
232-
if (principal.isEmpty()) {
236+
String poolName = matchPool(params);
237+
if (poolName.isEmpty()) {
233238
errorCode = ErrorCode.NO_MATCH_POOL;
234239
throw new AuthenticationException("No matched identity pool from the client certificate");
235240
}
241+
AuthRequest authRequest = new AuthRequest(poolName, params);
242+
String authRequestJson = objectMapper.writeValueAsString(authRequest);
236243
metrics.recordSuccess();
237-
return principal;
244+
return authRequestJson;
238245
} catch (AuthenticationException e) {
239246
metrics.recordFailure(errorCode);
240247
throw e;
248+
} catch (JsonProcessingException e) {
249+
log.error("Failed to serialize the auth request", e);
250+
metrics.recordFailure(errorCode);
251+
throw new AuthenticationException(e.getMessage());
241252
}
242253
}
243254

244-
public String matchPool(Map<String, Object> params) throws AuthenticationException {
255+
public String matchPool(Map<String, String> params) throws AuthenticationException {
245256
List<String> principals = new ArrayList<>();
246257
poolMap.forEach((poolName, compiler) -> {
247258
Boolean matched = false;
@@ -284,32 +295,38 @@ static String parseSHA1FingerPrint(X509Certificate certificate) {
284295
}
285296
}
286297

287-
static Map<String, Object> parseDN(String dn) throws InvalidNameException {
288-
Map<String, Object> params = new HashMap<>();
298+
static Map<String, String> parseDN(String dn) throws InvalidNameException {
299+
Map<String, String> params = new HashMap<>();
289300
if (StringUtils.isEmpty(dn)) {
290301
return params;
291302
}
292303
params.put(DN, dn);
293304
LdapName ldapName = new LdapName(dn);
294305
for (Rdn rdn : ldapName.getRdns()) {
295306
String rdnType = rdn.getType().toUpperCase();
296-
if (DN_KEYS.contains(rdnType)) {
297-
String value = Rdn.escapeValue(rdn.getValue());
298-
value = value.replace("\r", "\\0D");
299-
value = value.replace("\n", "\\0A");
300-
params.put(rdnType, value);
301-
}
307+
String value = Rdn.escapeValue(rdn.getValue());
308+
value = value.replace("\r", "\\0D");
309+
value = value.replace("\n", "\\0A");
310+
params.put(rdnType, value);
302311
}
303312

304313
return params;
305314
}
306315

307-
static void parseSAN(X509Certificate certificate, @NotNull Map<String, Object> map) {
316+
static void parseSAN(X509Certificate certificate, @NotNull Map<String, String> map) {
308317
try {
318+
// byte[] extensionValue = certificate.getExtensionValue("2.5.29.17");
319+
// TODO How to get the original extension name
309320
Collection<List<?>> subjectAlternativeNames = certificate.getSubjectAlternativeNames();
310321
if (subjectAlternativeNames != null) {
311322
List<String> formattedSANList = subjectAlternativeNames.stream()
312-
.map(list -> getSanName((int) list.get(0)) + ":" + list.get(1))
323+
.map(list -> {
324+
String sanName = getSanName((int) list.get(0));
325+
String sanValue = (String) list.get(1);
326+
map.put(sanName, sanValue);
327+
sanName = mapSANNames(sanName, sanValue, map);
328+
return sanName + ":" + sanValue;
329+
})
313330
.collect(Collectors.toList());
314331
String formattedSAN = String.join(",", formattedSANList);
315332
map.put(SAN, formattedSAN);
@@ -319,10 +336,27 @@ static void parseSAN(X509Certificate certificate, @NotNull Map<String, Object> m
319336
}
320337
}
321338

339+
static String mapSANNames(String sanName, String sanValue, @NotNull Map<String, String> map) {
340+
String newSanName = sanName;
341+
// "RFC822NAME:aaa" -> "EMAIL:aaa,DEVICE_ID:aaa,RFC822NAME:aaa"
342+
if (sanName.equals("DNS")) {
343+
StrBuilder strBuilder = new StrBuilder();
344+
strBuilder.append("EMAIL:").append(sanValue).append(",");
345+
map.put("EMAIL", sanValue);
346+
347+
// strBuilder.append("DEVICE_ID:").append(sanValue).append(",");
348+
// map.put("DEVICE_ID", sanValue);
349+
350+
strBuilder.append(sanName);
351+
newSanName = strBuilder.toString();
352+
}
353+
return newSanName;
354+
}
355+
322356
private static String getSanName(int type) {
323357
return switch (type) {
324358
case 0 -> "OTHERNAME";
325-
case 1 -> "EMAIL";
359+
case 1 -> "RFC822NAME";
326360
case 2 -> "DNS";
327361
case 3 -> "X400";
328362
case 4 -> "DIR";

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/ExpressionCompiler.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/ExpressionCompiler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.identitypool;
14+
package io.streamnative.pulsar.handlers.mqtt.authentication;
1515

1616

1717
import dev.cel.common.CelAbstractSyntaxTree;
@@ -73,7 +73,7 @@ private void compile() throws CelValidationException, CelEvaluationException {
7373
this.program = runtime.createProgram(ast);
7474
}
7575

76-
public Boolean eval(Map<String, Object> mapValue) throws Exception {
76+
public Boolean eval(Map<String, String> mapValue) throws Exception {
7777
final Object eval = program.eval(mapValue);
7878
if (eval instanceof Boolean) {
7979
return (Boolean) eval;

mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/identitypool/package-info.java renamed to mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/authentication/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.identitypool;
14+
package io.streamnative.pulsar.handlers.mqtt.authentication;
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
package io.streamnative.pulsar.handlers.mqtt.identitypool;
14+
package io.streamnative.pulsar.handlers.mqtt.authentication;
1515

16+
import com.fasterxml.jackson.databind.ObjectMapper;
1617
import io.netty.channel.local.LocalAddress;
1718
import io.streamnative.oidc.broker.common.pojo.Pool;
1819
import java.io.File;
@@ -25,9 +26,11 @@
2526
import javax.net.ssl.SSLPeerUnverifiedException;
2627
import javax.net.ssl.SSLSession;
2728
import javax.net.ssl.SSLSessionContext;
29+
import lombok.extern.slf4j.Slf4j;
2830
import org.apache.commons.io.FileUtils;
2931
import org.apache.pulsar.broker.ServiceConfiguration;
3032
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
33+
import org.apache.pulsar.common.util.ObjectMapperFactory;
3134
import org.apache.pulsar.common.util.SecurityUtility;
3235
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
3336
import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
@@ -38,13 +41,15 @@
3841
import org.testng.annotations.DataProvider;
3942
import org.testng.annotations.Test;
4043

44+
@Slf4j
4145
public class AuthenticationProviderMTlsTest {
4246

4347
private static final String SUPER_USER = "superUser";
4448
private static final String CLUSTER = "mtls-test";
4549

4650
private ServiceConfiguration serviceConfiguration;
4751
private LocalMemoryMetadataStore metadataStore;
52+
private final ObjectMapper objectMapper = ObjectMapperFactory.create();
4853

4954
@SuppressWarnings("UnstableApiUsage")
5055
@BeforeClass
@@ -73,7 +78,8 @@ public Object[][] reuseMetadata() {
7378
public void testExpression() throws Exception {
7479
String dn = FileUtils.readFileToString(new File(getResourcePath("mtls/cel-test.txt")), "UTF-8");
7580

76-
Map<String, Object> params = AuthenticationProviderMTls.parseDN(dn);
81+
Map<String, String> params = AuthenticationProviderMTls.parseDN(dn);
82+
params.put("O2", "StreamNative, Inc.");
7783

7884
ExpressionCompiler compiler = new ExpressionCompiler("DN.contains(\"CN=streamnative.io\")");
7985
Boolean eval = compiler.eval(params);
@@ -121,7 +127,9 @@ public void testAuthenticationProviderMTls(boolean reuseMetadata) throws Excepti
121127
SSLSession sslSession = new MockSSLSession(x509Certificates);
122128
AuthenticationDataCommand authData = new AuthenticationDataCommand("", LocalAddress.ANY, sslSession);
123129
String principal = authenticationProvider.authenticate(authData);
124-
Assert.assertEquals(principal, poolName);
130+
log.info("Principal: {}", principal);
131+
AuthRequest authRequest = objectMapper.readValue(principal, AuthRequest.class);
132+
Assert.assertEquals(authRequest.getSubject(), poolName);
125133
authenticationProvider.close();
126134
}
127135

0 commit comments

Comments
 (0)