Skip to content

Commit

Permalink
Merge pull request #11139 from AndriiLandiak/feature/edge-proxy-rpc
Browse files Browse the repository at this point in the history
[Edge] Proxy for grpc client
  • Loading branch information
ViacheslavKlimov authored Jul 4, 2024
2 parents b42fcf3 + 7a25009 commit f5a85ca
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,16 @@ private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
initRuleChains();
}
}
if (msg.getEntityId().getEntityType() == EntityType.EDGE) {
if (msg.getEntityId().getEntityType() == EntityType.EDGE && isCore) {
EdgeId edgeId = new EdgeId(msg.getEntityId().getId());
EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService();
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
edgeRpcService.deleteEdge(tenantId, edgeId);
} else if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
edgeRpcService.updateEdge(tenantId, edge);
if (edgeRpcService != null) {
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
edgeRpcService.deleteEdge(tenantId, edgeId);
} else if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
edgeRpcService.updateEdge(tenantId, edge);
}
}
}
if (msg.getEntityId().getEntityType() == EntityType.DEVICE && ComponentLifecycleEvent.DELETED == msg.getEvent() && isMyPartition(msg.getEntityId())) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@
package org.thingsboard.server.controller;

import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.exception.ThingsboardException;
Expand Down Expand Up @@ -62,8 +58,7 @@ public class EdgeEventController extends BaseController {
notes = "Returns a page of edge events for the requested edge. " +
PAGE_DATA_PARAMETERS)
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/edge/{edgeId}/events", method = RequestMethod.GET)
@ResponseBody
@GetMapping(value = "/edge/{edgeId}/events")
public PageData<EdgeEvent> getEdgeEvents(
@Parameter(description = EDGE_ID_PARAM_DESCRIPTION, required = true)
@PathVariable(EDGE_ID) String strEdgeId,
Expand All @@ -88,4 +83,5 @@ public PageData<EdgeEvent> getEdgeEvents(
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package org.thingsboard.server.service.edge.instructions;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.AttributeScope;
Expand Down Expand Up @@ -164,4 +162,5 @@ private String convertEdgeVersionToDocsFormat(String edgeVersion) {
protected String getBaseDirName() {
return UPGRADE_DIR;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
*/
package org.thingsboard.server.service.edge.instructions;

import jakarta.servlet.http.HttpServletRequest;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeInstructions;

import jakarta.servlet.http.HttpServletRequest;

public interface EdgeInstallInstructionsService {

EdgeInstructions getInstallInstructions(Edge edge, String installationMethod, HttpServletRequest request);

void setAppVersion(String version);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand Down Expand Up @@ -55,9 +58,6 @@
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;

import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
Expand Down Expand Up @@ -106,4 +106,5 @@ private String getScopeOfDefault(JsonObject data) {
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg;
Expand All @@ -37,22 +36,6 @@ public TenantUpdateMsg constructTenantUpdateMsg(UpdateMsgType msgType, Tenant te

@Override
public TenantProfileUpdateMsg constructTenantProfileUpdateMsg(UpdateMsgType msgType, TenantProfile tenantProfile, EdgeVersion edgeVersion) {
tenantProfile = JacksonUtil.clone(tenantProfile);
// clear all config
var configuration = tenantProfile.getDefaultProfileConfiguration();
configuration.setRpcTtlDays(0);
configuration.setMaxJSExecutions(0);
configuration.setMaxREExecutions(0);
configuration.setMaxDPStorageDays(0);
configuration.setMaxTbelExecutions(0);
configuration.setQueueStatsTtlDays(0);
configuration.setMaxTransportMessages(0);
configuration.setDefaultStorageTtlDays(0);
configuration.setMaxTransportDataPoints(0);
configuration.setRuleEngineExceptionsTtlDays(0);
configuration.setMaxRuleNodeExecutionsPerMessage(0);
tenantProfile.getProfileData().setConfiguration(configuration);

return TenantProfileUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(tenantProfile)).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
Expand All @@ -37,4 +36,5 @@ public interface DeviceProcessor extends EdgeProcessor {
DownlinkMsg convertDeviceEventToDownlink(EdgeEvent edgeEvent, EdgeId edgeId, EdgeVersion edgeVersion);

ListenableFuture<Void> processDeviceRpcCallFromEdge(TenantId tenantId, Edge edge, DeviceRpcCallMsg deviceRpcCallMsg);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.thingsboard.edge.rpc;

import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;

import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand All @@ -67,6 +69,16 @@ public class EdgeGrpcClient implements EdgeRpcClient {
private String certResource;
@Value("${cloud.rpc.max_inbound_message_size:4194304}")
private int maxInboundMessageSize;
@Value("${cloud.rpc.proxy.enabled}")
private boolean proxyEnabled;
@Value("${cloud.rpc.proxy.host:}")
private String proxyHost;
@Value("${cloud.rpc.proxy.port:0}")
private int proxyPort;
@Value("${cloud.rpc.proxy.username:}")
private String proxyUsername;
@Value("${cloud.rpc.proxy.password:}")
private String proxyPassword;
@Getter
private int serverMaxInboundMessageSize;

Expand All @@ -88,6 +100,7 @@ public void connect(String edgeKey,
.keepAliveTime(keepAliveTimeSec, TimeUnit.SECONDS)
.keepAliveTimeout(keepAliveTimeoutSec, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true);

if (sslEnabled) {
try {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
Expand All @@ -102,6 +115,18 @@ public void connect(String edgeKey,
} else {
builder.usePlaintext();
}

if (proxyEnabled && StringUtils.isNotEmpty(proxyHost) && proxyPort > 0) {
InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort);
InetSocketAddress targetAddress = new InetSocketAddress(rpcHost, rpcPort);
builder.proxyDetector(socketAddress -> HttpConnectProxiedSocketAddress.newBuilder()
.setTargetAddress(targetAddress)
.setProxyAddress(proxyAddress)
.setUsername(proxyUsername)
.setPassword(proxyPassword)
.build());
}

channel = builder.build();
EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
log.info("[{}] Sending a connect request to the TB!", edgeKey);
Expand Down

0 comments on commit f5a85ca

Please sign in to comment.