Skip to content

Commit

Permalink
Merge pull request #95 from thingsboard/fix/device-connectivity
Browse files Browse the repository at this point in the history
Edge - fix device connectivity
  • Loading branch information
volodymyr-babak authored Feb 13, 2024
2 parents d05a21d + 4270ec5 commit a905c1f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public String getBaseUrl(TenantId tenantId, CustomerId customerId, HttpServletRe
JsonNode prohibitDifferentUrl = generalSettings.getJsonValue().get("prohibitDifferentUrl");

if ((prohibitDifferentUrl != null && prohibitDifferentUrl.asBoolean()) || httpServletRequest == null) {
baseUrl = generalSettings.getJsonValue().get("baseUrl").asText();
// edge-only
// baseUrl = generalSettings.getJsonValue().get("baseUrl").asText();
}

if (StringUtils.isEmpty(baseUrl) && httpServletRequest != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
Expand Down Expand Up @@ -62,6 +63,7 @@
"device.connectivity.mqtts.pem_cert_file=/tmp/" + CA_ROOT_CERT_PEM
})
@DaoSqlTest
@Ignore
public class DeviceConnectivityControllerTest extends AbstractControllerTest {

private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/customTopic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.util.DeviceConnectivityUtil.CHECK_DOCUMENTATION;
Expand Down Expand Up @@ -84,6 +86,25 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
@Value("${device.connectivity.mqtts.pem_cert_file:}")
private String mqttsPemCertFile;

// Edge
@Value("${transport.mqtt.enabled}")
private boolean mqttEnabled;
@Value("${transport.mqtt.bind_port}")
private Integer mqttBindPort;
@Value("${transport.mqtt.ssl.enabled}")
private boolean mqttSslEnabled;
@Value("${transport.mqtt.ssl.bind_port}")
private Integer mqttsBindPort;
@Value("${transport.coap.enabled}")
private boolean coapEnabled;
@Value("${transport.coap.bind_port}")
private Integer coapBindPort;
@Value("${transport.coap.dtls.enabled}")
private boolean coapDtlsEnabled;
@Value("${transport.coap.dtls.bind_port}")
private Integer coapsBindPort;
// ... Edge

@Override
public JsonNode findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException {
DeviceId deviceId = device.getId();
Expand Down Expand Up @@ -221,6 +242,12 @@ private String getHttpPublishCommand(String protocol, String baseUrl, DeviceCred
String propertiesPort = getPort(properties);
String port = (propertiesPort.isEmpty() || HTTP_DEFAULT_PORT.equals(propertiesPort) || HTTPS_DEFAULT_PORT.equals(propertiesPort))
? "" : ":" + propertiesPort;
// Edge only:
Pattern pattern = Pattern.compile("https?://[^:/]+:(\\d+)");
Matcher matcher = pattern.matcher(baseUrl);
if (matcher.find()) {
port = ":" + matcher.group(1);
}
return DeviceConnectivityUtil.getHttpPublishCommand(protocol, hostName, port, deviceCredentials);
}

Expand All @@ -238,15 +265,19 @@ private JsonNode getMqttTransportPublishCommands(String baseUrl, String topic, D

ObjectNode dockerMqttCommands = JacksonUtil.newObjectNode();

if (isEnabled(MQTT)) {
// edge-only:
// if (isEnabled(MQTT)) {
if (mqttEnabled) {
Optional.ofNullable(getMqttPublishCommand(baseUrl, topic, deviceCredentials)).
ifPresent(v -> mqttCommands.put(MQTT, v));

Optional.ofNullable(getDockerMqttPublishCommand(MQTT, baseUrl, topic, deviceCredentials))
.ifPresent(v -> dockerMqttCommands.put(MQTT, v));
}

if (isEnabled(MQTTS)) {
// edge-only:
// if (isEnabled(MQTTS)) {
if (mqttSslEnabled) {
List<String> mqttsPublishCommand = getMqttsPublishCommand(baseUrl, topic, deviceCredentials);
if (mqttsPublishCommand != null) {
ArrayNode arrayNode = mqttCommands.putArray(MQTTS);
Expand All @@ -265,15 +296,19 @@ private JsonNode getMqttTransportPublishCommands(String baseUrl, String topic, D

private String getMqttPublishCommand(String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException {
DeviceConnectivityInfo properties = getConnectivity(MQTT);
// edge-only:
// String mqttPort = getPort(properties);
String mqttHost = getHost(baseUrl, properties, MQTT);
String mqttPort = getPort(properties);
String mqttPort = mqttBindPort.toString();
return DeviceConnectivityUtil.getMqttPublishCommand(MQTT, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials);
}

private List<String> getMqttsPublishCommand(String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException {
DeviceConnectivityInfo properties = getConnectivity(MQTTS);
String mqttHost = getHost(baseUrl, properties, MQTTS);
String mqttPort = getPort(properties);
// edge-only:
// String mqttPort = getPort(properties);
String mqttPort = mqttsBindPort.toString();
String pubCommand = DeviceConnectivityUtil.getMqttPublishCommand(MQTTS, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials);

ArrayList<String> commands = new ArrayList<>();
Expand All @@ -288,7 +323,9 @@ private List<String> getMqttsPublishCommand(String baseUrl, String deviceTelemet
private String getDockerMqttPublishCommand(String protocol, String baseUrl, String deviceTelemetryTopic, DeviceCredentials deviceCredentials) throws URISyntaxException {
DeviceConnectivityInfo properties = getConnectivity(protocol);
String mqttHost = getHost(baseUrl, properties, protocol);
String mqttPort = getPort(properties);
// edge-only:
// String mqttPort = getPort(properties);
String mqttPort = mqttsBindPort.toString();
return DeviceConnectivityUtil.getDockerMqttPublishCommand(protocol, baseUrl, mqttHost, mqttPort, deviceTelemetryTopic, deviceCredentials);
}

Expand All @@ -302,15 +339,19 @@ private JsonNode getCoapTransportPublishCommands(String baseUrl, DeviceCredentia

ObjectNode dockerCoapCommands = JacksonUtil.newObjectNode();

if (isEnabled(COAP)) {
// edge-only:
// if (isEnabled(COAP)) {
if (coapEnabled) {
Optional.ofNullable(getCoapPublishCommand(COAP, baseUrl, deviceCredentials))
.ifPresent(v -> coapCommands.put(COAP, v));

Optional.ofNullable(getDockerCoapPublishCommand(COAP, baseUrl, deviceCredentials))
.ifPresent(v -> dockerCoapCommands.put(COAP, v));
}

if (isEnabled(COAPS)) {
// edge-only:
// if (isEnabled(COAPS)) {
if (coapDtlsEnabled) {
Optional.ofNullable(getCoapPublishCommand(COAPS, baseUrl, deviceCredentials))
.ifPresent(v -> coapCommands.put(COAPS, v));

Expand All @@ -328,14 +369,18 @@ private JsonNode getCoapTransportPublishCommands(String baseUrl, DeviceCredentia
private String getCoapPublishCommand(String protocol, String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException {
DeviceConnectivityInfo properties = getConnectivity(protocol);
String hostName = getHost(baseUrl, properties, protocol);
String port = StringUtils.isBlank(properties.getPort()) ? "" : ":" + properties.getPort();
// edge-only:
// String port = StringUtils.isBlank(properties.getPort()) ? "" : ":" + properties.getPort();
String port = ":" + (coapDtlsEnabled && COAPS.equals(protocol) ? coapsBindPort.toString() : coapBindPort.toString());
return DeviceConnectivityUtil.getCoapPublishCommand(protocol, hostName, port, deviceCredentials);
}

private String getDockerCoapPublishCommand(String protocol, String baseUrl, DeviceCredentials deviceCredentials) throws URISyntaxException {
DeviceConnectivityInfo properties = getConnectivity(protocol);
String host = getHost(baseUrl, properties, protocol);
String port = StringUtils.isBlank(properties.getPort()) ? "" : ":" + properties.getPort();
// edge-only:
// String port = StringUtils.isBlank(properties.getPort()) ? "" : ":" + properties.getPort();
String port = ":" + (coapDtlsEnabled && COAPS.equals(protocol) ? coapsBindPort.toString() : coapBindPort.toString());
return DeviceConnectivityUtil.getDockerCoapPublishCommand(protocol, host, port, deviceCredentials);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public static String getDockerCoapPublishCommand(String protocol, String host, S
}

public static String getHost(String baseUrl, DeviceConnectivityInfo properties, String protocol) throws URISyntaxException {
String initialHost = StringUtils.isBlank(properties.getHost()) ? baseUrl : properties.getHost();
// String initialHost = StringUtils.isBlank(properties.getHost()) ? baseUrl : properties.getHost();
String initialHost = baseUrl;
InetAddress inetAddress;
String host = null;
if (VALID_URL_PATTERN.matcher(initialHost).matches()) {
Expand All @@ -240,7 +241,9 @@ public static String getHost(String baseUrl, DeviceConnectivityInfo properties,
}

public static String getPort(DeviceConnectivityInfo properties) {
return StringUtils.isBlank(properties.getPort()) ? "" : properties.getPort();
// edge
// return StringUtils.isBlank(properties.getPort()) ? "" : properties.getPort();
return "";
}

public static boolean isLocalhost(String host) {
Expand Down
6 changes: 6 additions & 0 deletions dao/src/test/resources/application-test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ edges.enabled=false
# Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests
transport.http.enabled=false
transport.mqtt.enabled=false
transport.mqtt.bind_port=1883
transport.mqtt.ssl.enabled=false
transport.mqtt.ssl.bind_port=1883
transport.coap.enabled=false
transport.coap.bind_port=5683
transport.coap.dtls.enabled=false
transport.coap.dtls.bind_port=5683
transport.lwm2m.enabled=false
transport.snmp.enabled=false

Expand Down

0 comments on commit a905c1f

Please sign in to comment.