Skip to content

Commit

Permalink
Merge branch 'master' into feat_sync_namespace_change
Browse files Browse the repository at this point in the history
  • Loading branch information
xcsnx authored Sep 15, 2024
2 parents 8088148 + ea27db8 commit e4d3e4b
Show file tree
Hide file tree
Showing 20 changed files with 103 additions and 71 deletions.
10 changes: 2 additions & 8 deletions .github/workflows/e2e-k8s.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,16 @@ jobs:
java-version: '17'
distribution: 'temurin'

- uses: dorny/paths-filter@v2
id: filter
with:
filters: '.github/filters.yml'
list-files: json

- name: Build with Maven
if: steps.filter.outputs.changed == 'true'
run: ./mvnw -B clean install -Prelease,docker -Dmaven.javadoc.skip=true -B -Drat.skip=true -Dmaven.test.skip=true -Djacoco.skip=true -DskipITs -DskipTests package -T1C

- name: Save ShenYu Maven Repos
if: steps.filter.outputs.changed == 'true' && steps.restore-maven-cache.outputs.cache-hit != 'true'
uses: actions/cache/save@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
# - name: Build shenyu-e2e-engine with Maven
# run: ./mvnw -B -f ./shenyu-e2e/pom.xml -pl shenyu-e2e-engine -am clean install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -102,7 +103,7 @@ public void onDiscoveryUpstreamChanged(final List<DiscoverySyncData> changed, fi
}
// create or update
createOrUpdate(upstreamPath, data);
LOG.info("[DataChangedListener] change discoveryUpstream path={}|data={}", upstreamPath, data);
LOG.info("[DataChangedListener] change discoveryUpstream path={}|data={}", upstreamPath, GsonUtils.getInstance().toJson(data));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public Boolean exists(final String key) {
List<KeyValue> keyValues = client.getKVClient().get(ByteSequence.from(key, StandardCharsets.UTF_8), option).get().getKvs();
return !keyValues.isEmpty();
} catch (Exception e) {
LOG.error("check node exists error. {}", e.getMessage());
throw new ShenyuException(e.getMessage());
LOG.error("check node exists error", e);
throw new ShenyuException(e);
}
}

Expand All @@ -78,7 +78,7 @@ public void put(final String key, final String value) {
client.getKVClient().put(ByteSequence.from(key, StandardCharsets.UTF_8), ByteSequence.from(value, StandardCharsets.UTF_8)).get();
} catch (Exception e) {
LOG.error("update value of node error.", e);
throw new ShenyuException(e.getMessage());
throw new ShenyuException(e);
}
}

Expand All @@ -102,7 +102,7 @@ public void deleteEtcdPathRecursive(final String path) {
client.getKVClient().delete(ByteSequence.from(path, StandardCharsets.UTF_8), option).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("delete node of recursive error.", e);
throw new ShenyuException(e.getMessage());
throw new ShenyuException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class ZookeeperClient {

private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperClient.class);
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperClient.class);

private final ZookeeperConfig config;

Expand Down Expand Up @@ -75,7 +75,7 @@ public void start() {
try {
this.client.blockUntilConnected();
} catch (InterruptedException e) {
LOGGER.warn("Interrupted during zookeeper client starting.");
LOG.warn("Interrupted during zookeeper client starting.");
Thread.currentThread().interrupt();
}
}
Expand Down Expand Up @@ -111,7 +111,6 @@ public boolean isExist(final String key) {
try {
return null != client.checkExists().forPath(key);
} catch (Exception e) {
LOGGER.error("check if key exist error", e);
return false;
}
}
Expand All @@ -138,7 +137,7 @@ public String getDirectly(final String key) {
* @return value.
*/
public String get(final String key) {
TreeCache cache = findFromcache(key);
TreeCache cache = findFromCache(key);
if (Objects.isNull(cache)) {
return getDirectly(key);
}
Expand All @@ -159,8 +158,17 @@ public String get(final String key) {
public void createOrUpdate(final String key, final String value, final CreateMode mode) {
String val = StringUtils.isEmpty(value) ? "" : value;
try {
client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));
synchronized (ZookeeperClient.class) {
if (Objects.nonNull(client.checkExists()) && Objects.nonNull(client.checkExists().forPath(key))) {
LOG.debug("path exists, update zookeeper key={} with value={}", key, val);
client.setData().forPath(key, val.getBytes(StandardCharsets.UTF_8));
return;
}
LOG.debug("path not exists, set zookeeper key={} with value={}", key, val);
client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
LOG.error("create or update key with value error, key:{} value:{}", key, value, e);
throw new ShenyuException(e);
}
}
Expand Down Expand Up @@ -244,7 +252,7 @@ public TreeCache addCache(final String path, final TreeCacheListener... listener
* @param key key.
* @return cache.
*/
private TreeCache findFromcache(final String key) {
private TreeCache findFromCache(final String key) {
for (Map.Entry<String, TreeCache> cache : caches.entrySet()) {
if (key.startsWith(cache.getKey())) {
return cache.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*/
public class DiscoveryDataChangedEventSyncListener implements DataChangedEventListener {

private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryDataChangedEventSyncListener.class);
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryDataChangedEventSyncListener.class);

private final KeyValueParser keyValueParser;

Expand Down Expand Up @@ -82,7 +82,7 @@ public void onChange(final DiscoveryDataChangedEvent event) {
DiscoverySyncData discoverySyncData = buildProxySelectorData(event.getValue());
final List<DiscoveryUpstreamData> upstreamDataList = discoverySyncData.getUpstreamDataList();
if (CollectionUtils.isEmpty(upstreamDataList)) {
LOGGER.warn("shenyu proxySelectorData#discoveryUpstreamList is empty");
LOG.warn("shenyu proxySelectorData#discoveryUpstreamList is empty");
return;
}
switch (currentEvent) {
Expand All @@ -95,25 +95,25 @@ public void onChange(final DiscoveryDataChangedEvent event) {
d.setDateCreated(new Timestamp(System.currentTimeMillis()));
d.setDateUpdated(new Timestamp(System.currentTimeMillis()));
discoveryUpstreamMapper.insert(DiscoveryTransfer.INSTANCE.mapToDo(d));
LOGGER.info("shenyu [DiscoveryDataChangedEventSyncListener] ADDED Upstream {}", d.getUrl());
LOG.info("shenyu [DiscoveryDataChangedEventSyncListener] ADDED Upstream {}", d.getUrl());
}
} catch (DuplicateKeyException ex) {
LOGGER.info("shenyu [DiscoveryDataChangedEventSyncListener] Upstream {} exist", d.getUrl());
LOG.info("shenyu [DiscoveryDataChangedEventSyncListener] Upstream {} exist", d.getUrl());
}
});
break;
case UPDATED:
upstreamDataList.stream().map(DiscoveryTransfer.INSTANCE::mapToDo).forEach(discoveryUpstreamDO -> {
discoveryUpstreamDO.setDiscoveryHandlerId(discoveryHandlerId);
int effect = discoveryUpstreamMapper.updateDiscoveryHandlerIdAndUrl(discoveryUpstreamDO);
LOGGER.info("shenyu [DiscoveryDataChangedEventSyncListener] UPDATE Upstream {}, effect = {} ", discoveryUpstreamDO.getUrl(), effect);
LOG.info("shenyu [DiscoveryDataChangedEventSyncListener] UPDATE Upstream {}, effect = {} ", discoveryUpstreamDO.getUrl(), effect);
});
break;
case DELETED:
if (CollectionUtils.isNotEmpty(upstreamDataList)) {
upstreamDataList.forEach(up -> {
discoveryUpstreamMapper.deleteByUrl(discoveryHandlerId, up.getUrl());
LOGGER.info("shenyu [DiscoveryDataChangedEventSyncListener] DELETE Upstream {}", up.getUrl());
LOG.info("shenyu [DiscoveryDataChangedEventSyncListener] DELETE Upstream {}", up.getUrl());
});
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public String get(final String key) {
}
return Objects.isNull(data.getData()) ? null : new String(data.getData(), StandardCharsets.UTF_8);
}

/**
* create or update key with value.
*
Expand All @@ -159,8 +159,17 @@ public String get(final String key) {
public void createOrUpdate(final String key, final String value, final CreateMode mode) {
String val = StringUtils.isEmpty(value) ? "" : value;
try {
client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));
synchronized (ClusterZookeeperClient.class) {
if (Objects.nonNull(client.checkExists()) && Objects.nonNull(client.checkExists().forPath(key))) {
LOGGER.debug("path exists, update zookeeper key={} with value={}", key, val);
client.setData().forPath(key, val.getBytes(StandardCharsets.UTF_8));
return;
}
LOGGER.debug("path not exists, set zookeeper key={} with value={}", key, val);
client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
LOGGER.error("create or update key with value error, key:{} value:{}", key, value, e);
throw new ShenyuException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ private void updateSelectorHandler(final String selectorId, final List<CommonUps

// publish discovery change event.
List<DiscoveryUpstreamData> discoveryUpstreamDataList = discoveryUpstreamService.findBySelectorId(selectorId);

if (CollectionUtils.isEmpty(discoveryUpstreamDataList)) {
discoveryUpstreamDataList = aliveList.stream().map(DiscoveryTransfer.INSTANCE::mapToDiscoveryUpstreamData).collect(Collectors.toList());
}

discoveryUpstreamDataList.removeIf(u -> {
for (CommonUpstream alive : aliveList) {
if (alive.getUpstreamUrl().equals(u.getUrl())) {
Expand All @@ -421,11 +426,13 @@ private void updateSelectorHandler(final String selectorId, final List<CommonUps
LOG.info("change alive selectorId={}|url={}", selectorId, upstream.getUrl());
discoveryUpstreamService.changeStatusBySelectorIdAndUrl(selectorId, upstream.getUrl(), Boolean.TRUE);
});

DiscoverySyncData discoverySyncData = new DiscoverySyncData();
discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList);
discoverySyncData.setPluginName(pluginName);
discoverySyncData.setSelectorId(selectorId);
discoverySyncData.setSelectorName(selectorDO.getName());
LOG.debug("UpstreamCacheManager update selectorId={}|json={}", selectorId, GsonUtils.getGson().toJson(discoverySyncData));
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE, Collections.singletonList(discoverySyncData)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* Abstract strategy.
*/
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

/**
* The Event publisher.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*/
public abstract class FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {

private final Logger logger = LoggerFactory.getLogger(FallbackShenyuClientRegisterService.class);
private static final Logger LOG = LoggerFactory.getLogger(FallbackShenyuClientRegisterService.class);

private final Map<String, FallbackHolder> fallsRegisters = new ConcurrentHashMap<>();

Expand All @@ -64,9 +64,9 @@ public String registerURI(final String selectorName, final List<URIRegisterDTO>
try {
this.removeFallBack(key);
result = this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
LOG.info("Register success: {},{}", selectorName, uriList);
} catch (Exception ex) {
logger.warn("Register exception: cause:{}", ex.getMessage());
LOG.error("Register exception: cause:", ex);
result = "";
this.addFallback(key, new FallbackHolder(selectorName, uriList));
}
Expand All @@ -81,7 +81,7 @@ private void addFallback(final String key, final FallbackHolder holder) {
FallbackRegisterTask registryTask = new FallbackRegisterTask(key, this);
fallsRegisters.put(key, holder);
timer.add(registryTask);
logger.info("Add to Fallback and wait for execution, {}:{}", holder.getSelectorName(), holder.getUriList());
LOG.info("Add to Fallback and wait for execution, {}:{}", holder.getSelectorName(), holder.getUriList());
}

private void removeFallBack(final String key) {
Expand All @@ -94,7 +94,7 @@ private void recover(final String key) {
List<URIRegisterDTO> uriList = fallbackHolder.getUriList();
String selectorName = fallbackHolder.getSelectorName();
this.doRegisterURI(selectorName, uriList);
logger.info("Register success: {},{}", selectorName, uriList);
LOG.info("Register success: {},{}", selectorName, uriList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shenyu.admin.model.entity.SelectorDO;
import org.apache.shenyu.admin.service.MetaDataService;
import org.apache.shenyu.admin.service.SelectorService;
import org.apache.shenyu.admin.service.converter.DivideSelectorHandleConverter;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.DiscoverySyncData;
Expand All @@ -41,7 +40,6 @@
import org.apache.shenyu.register.common.enums.EventType;
import org.springframework.stereotype.Service;

import jakarta.annotation.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -56,9 +54,6 @@
@Service
public class ShenyuClientRegisterDivideServiceImpl extends AbstractContextPathRegisterService {

@Resource
private DivideSelectorHandleConverter divideSelectorHandleConverter;

@Override
public String rpcType() {
return RpcTypeEnum.HTTP.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.shenyu.admin.service.register;

import jakarta.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.admin.listener.DataChangedEvent;
import org.apache.shenyu.admin.model.entity.MetaDataDO;
import org.apache.shenyu.admin.model.entity.SelectorDO;
import org.apache.shenyu.admin.service.MetaDataService;
import org.apache.shenyu.admin.service.SelectorService;
import org.apache.shenyu.admin.service.converter.GrpcSelectorHandleConverter;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.DiscoverySyncData;
Expand Down Expand Up @@ -58,9 +56,6 @@ public class ShenyuClientRegisterGrpcServiceImpl extends AbstractShenyuClientReg

private static final Logger LOG = LoggerFactory.getLogger(ShenyuClientRegisterGrpcServiceImpl.class);

@Resource
private GrpcSelectorHandleConverter grpcSelectorHandleConverter;

@Override
public String rpcType() {
return RpcTypeEnum.GRPC.getName();
Expand Down Expand Up @@ -116,7 +111,6 @@ protected String buildHandle(final List<URIRegisterDTO> uriList, final SelectorD
canAddList.addAll(diffStatusList);
}
}

if (doSubmit(selectorDO.getId(), canAddList)) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.shenyu.admin.model.vo.DiscoveryRelVO;
import org.apache.shenyu.admin.model.vo.DiscoveryUpstreamVO;
import org.apache.shenyu.admin.model.vo.DiscoveryVO;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
Expand Down Expand Up @@ -329,5 +330,13 @@ public DiscoveryUpstreamDTO mapToDTO(DiscoveryUpstreamDO discoveryUpstreamDO) {
return discoveryUpstreamDTO;
}).orElse(null);
}


/**
* mapToDiscoveryUpstreamData.
* @param commonUpstream commonUpstream
* @return DiscoveryUpstreamData
*/
public DiscoveryUpstreamData mapToDiscoveryUpstreamData(CommonUpstream commonUpstream) {
return mapToData(CommonUpstreamUtils.buildDefaultDiscoveryUpstreamDTO(commonUpstream.getUpstreamUrl().split(":")[0], Integer.valueOf(commonUpstream.getUpstreamUrl().split(":")[1]), commonUpstream.getProtocol()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.admin.model.entity.MetaDataDO;
import org.apache.shenyu.admin.model.entity.SelectorDO;
import org.apache.shenyu.admin.service.converter.DivideSelectorHandleConverter;
import org.apache.shenyu.admin.service.impl.MetaDataServiceImpl;
import org.apache.shenyu.common.dto.convert.rule.impl.DivideRuleHandle;
import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
Expand All @@ -38,7 +37,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.test.util.ReflectionTestUtils;

import java.lang.reflect.Method;
import java.util.ArrayList;
Expand Down Expand Up @@ -69,8 +67,6 @@ public final class ShenyuClientRegisterDivideServiceImplTest {

@BeforeEach
public void setUp() {
DivideSelectorHandleConverter divideSelectorHandleConverter = new DivideSelectorHandleConverter();
ReflectionTestUtils.setField(shenyuClientRegisterDivideService, "divideSelectorHandleConverter", divideSelectorHandleConverter);
}

@Test
Expand Down
Loading

0 comments on commit e4d3e4b

Please sign in to comment.