ShenYu Gateway Data Synchronization: Source Code Analysis

Date: 2023-07-10
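Contents

admin data synchronization
    Starting from SelectorController
    Publishing the event
    ConfigGroupEnum
    DataEventTypeEnum
    DataChangedEventDispatcher
    Configuration parsing
gateway data synchronization
    ShenyuWebsocketClient
    WebsocketDataHandler
    AbstractDataHandler
    CommonPluginDataSubscriber
    BaseDataCache
    Configuration parsing
Verifying the guesses
    Incremental data synchronization
    Full data synchronization
Summary
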
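When data is updated in the admin console, how does the updated data reach the gateway?

ShenYu supports several synchronization methods. This article analyzes the WebSocket one.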

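admin data synchronization

Starting from SelectorController

An admin system like this one usually synchronizes all data once at startup and then synchronizes incrementally whenever something is modified. So, among the admin controllers, we can find the createSelector method of SelectorController and start the analysis from there.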

@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) {
    Integer createCount = selectorService.createOrUpdate(selectorDTO);
    return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}

The service implementation is as follows:

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {

    // Publisher responsible for emitting change events
    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // Build the data object: DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // An empty id means insert, otherwise update
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // Insert the selector
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // Insert the selector's conditions
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            // Data permission check
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }
        } else {
            // Update the selector; its conditions are deleted and then re-inserted
            selectorCount = selectorMapper.updateSelective(selectorDO);
            // delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // Publish the change event
        publishEvent(selectorDO, selectorConditionDTOs);
        // Update the upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }

    // ......
}

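Looking through the code above, the call most likely related to synchronization is the one that publishes the event:

publishEvent(selectorDO, selectorConditionDTOs);

Publishing the event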

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList = selectorConditionDTOs.stream()
            .map(ConditionTransfer.INSTANCE::mapToSelectorDTO)
            .collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

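The eventPublisher object is an org.springframework.context.ApplicationEventPublisher, so the publication must go through Spring's application event mechanism.

Spring's event mechanism needs a publisher, a listener, and an event. We have already found the publisher, and the source above tells us that the event is DataChangedEvent.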

public class DataChangedEvent extends ApplicationEvent {

    private final DataEventTypeEnum eventType;

    private final ConfigGroupEnum groupKey;

    public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
        super(source.stream().filter(Objects::nonNull).collect(Collectors.toList()));
        this.eventType = type;
        this.groupKey = groupKey;
    }
}

ConfigGroupEnum

ConfigGroupEnum tells us which kinds of data admin may need to synchronize with the gateway: plugins, rules, selectors, metadata, and authentication information.

public enum ConfigGroupEnum {

    APP_AUTH,
    PLUGIN,
    RULE,
    SELECTOR,
    META_DATA;

    public static ConfigGroupEnum acquireByName(final String name) {
        return Arrays.stream(ConfigGroupEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this ConfigGroupEnum can not support %s", name)));
    }
}

DataEventTypeEnum

DataEventTypeEnum tells us which events may trigger synchronization: delete, create, update, refresh, and the initial full synchronization (we guess that MYSELF is the full-sync marker).

public enum DataEventTypeEnum {

    DELETE,
    CREATE,
    UPDATE,
    REFRESH,
    MYSELF;

    public static DataEventTypeEnum acquireByName(final String name) {
        return Arrays.stream(DataEventTypeEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this DataEventTypeEnum can not support %s", name)));
    }
}

DataChangedEventDispatcher

A listener normally implements the ApplicationListener interface, so a global search turns up the following class:

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private final ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        // Every registered listener receives the event; the group key selects the callback
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        // Collect every DataChangedListener bean registered in the Spring context
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
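
To make the dispatching step easier to follow outside of Spring, here is a minimal, framework-free sketch of the same idea. All names in it (Group, ChangeListener, SketchDispatcher, DispatcherDemo) are hypothetical stand-ins invented for illustration and are not ShenYu classes. It only shows that every listener sees every event and that the group key picks the callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for ConfigGroupEnum and DataChangedListener.
enum Group { PLUGIN, SELECTOR, RULE }

interface ChangeListener {
    default void onPluginChanged(List<String> data) { }
    default void onSelectorChanged(List<String> data) { }
    default void onRuleChanged(List<String> data) { }
}

final class SketchDispatcher {
    private final List<ChangeListener> listeners = new ArrayList<>();

    void register(ChangeListener listener) {
        listeners.add(listener);
    }

    // Same shape as onApplicationEvent: loop over listeners, switch on the group key.
    void dispatch(Group group, List<String> data) {
        for (ChangeListener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}

final class DispatcherDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchDispatcher dispatcher = new SketchDispatcher();
        // Two listeners that only care about selector changes.
        dispatcher.register(new ChangeListener() {
            @Override
            public void onSelectorChanged(List<String> data) {
                log.add("websocket:" + data);
            }
        });
        dispatcher.register(new ChangeListener() {
            @Override
            public void onSelectorChanged(List<String> data) {
                log.add("http:" + data);
            }
        });
        dispatcher.dispatch(Group.SELECTOR, Collections.singletonList("selector-1"));
        // Neither listener overrides onRuleChanged, so nothing is logged for this event.
        dispatcher.dispatch(Group.RULE, Collections.singletonList("rule-1"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the sketch the listeners are registered by hand. In the class above they are collected from the Spring context in afterPropertiesSet, which is why the choice of synchronization method comes down to which DataChangedListener beans are registered.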

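Configuration parsing

Going further, we find that DataChangedListener has several implementations.

With so many implementations, how does admin choose among them? Recall that when starting the project we configured the synchronization properties under the shenyu.sync prefix.

Searching for "shenyu.sync.websocket" leads us to WebsocketSyncProperties: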

@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public class WebsocketSyncProperties {

    private boolean enabled = true;

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(final boolean enabled) {
        this.enabled = enabled;
    }
}

This properties class only says whether WebSocket is enabled. It does not register WebsocketDataChangedListener or any other communication-related bean. Following the usages of WebsocketSyncProperties, we find the class below, which is exactly what we are looking for:

@Configuration
// This configuration is active when shenyu.sync.websocket.enabled=true.
// matchIfMissing = true makes it active by default.
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

Back to the listeners: here we follow WebsocketDataChangedListener.

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        // Assemble the WebsocketData payload
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        // Send it to the connected gateways
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}

Now let us look at WebsocketCollector.send (WebsocketCollector is also one of the beans registered above).

@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

    private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static final String SESSION_KEY = "sessionKey";

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // This indirectly supports our guess that DataEventTypeEnum.MYSELF marks a full sync:
        // the message goes only to the session stored in the thread-local,
        // that is, the session that asked for the data
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            // Any other event type is broadcast to every connected session
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }
    }

    private static synchronized void sendMessageBySession(final Session session, final String message) {
        try {
            // Send the message through the websocket session
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            LOG.error("websocket send result is exception: ", e);
        }
    }

    // ......
}
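
The difference between the two branches of send can be illustrated with a small stand-alone sketch. FakeSession, CollectorSketch and CollectorDemo are hypothetical names, and onFullSyncRequest is an assumption about how the elided part of WebsocketCollector places the requesting session into the thread-local before calling send. Treat it as a sketch of the routing rule, not as the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a websocket session: it only records what it was sent.
final class FakeSession {
    final List<String> received = new ArrayList<>();

    void sendText(String message) {
        received.add(message);
    }
}

final class CollectorSketch {
    private static final Set<FakeSession> SESSIONS = new CopyOnWriteArraySet<>();
    // Session whose request is currently being handled on this thread.
    private static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

    static void open(FakeSession session) {
        SESSIONS.add(session);
    }

    // Assumed flow: a gateway asks for a full sync, so only that gateway gets the answer.
    static void onFullSyncRequest(FakeSession requester, String fullData) {
        CURRENT.set(requester);
        try {
            send(fullData, "MYSELF");
        } finally {
            CURRENT.remove();
        }
    }

    static void send(String message, String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if ("MYSELF".equals(eventType)) {
            FakeSession session = CURRENT.get();
            if (session != null) {
                session.sendText(message);   // reply to the requester only
            }
        } else {
            SESSIONS.forEach(s -> s.sendText(message));   // broadcast to every gateway
        }
    }
}

final class CollectorDemo {
    static List<List<String>> run() {
        FakeSession a = new FakeSession();
        FakeSession b = new FakeSession();
        CollectorSketch.open(a);
        CollectorSketch.open(b);
        CollectorSketch.send("selector-update", "UPDATE");
        CollectorSketch.onFullSyncRequest(b, "full-data");
        return Arrays.asList(a.received, b.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here session a receives only the broadcast, while session b receives the broadcast and then the full data it asked for.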

gateway data synchronization

ShenyuWebsocketClient

Where data is sent, something must receive it. Under shenyu-sync-data-websocket we can find the ShenyuWebsocketClient class, which extends WebSocketClient.

public final class ShenyuWebsocketClient extends WebSocketClient {

    private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);

    private volatile boolean alreadySync = Boolean.FALSE;

    private final WebsocketDataHandler websocketDataHandler;

    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers,
                                 final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // If no sync has happened yet, request one when the connection opens
        if (!alreadySync) {
            // Further evidence that MYSELF is the full-sync marker
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }

    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }

    @Override
    public void onError(final Exception e) {
        this.close();
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        LOG.info("handleResult({})", result);
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

WebsocketDataHandler

Reading through ShenyuWebsocketClient, the key call is websocketDataHandler.executor(groupEnum, json, eventType). Its implementation is as follows:

public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

This class registers five handlers in an EnumMap, one for each ConfigGroupEnum value. It then looks up the handler for the incoming group and calls:

ENUM_MAP.get(type).handle(json, eventType);
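
As a stand-alone illustration of this lookup, the sketch below builds an EnumMap registry with one handler per group and dispatches through it. GroupKey, Handler and HandlerRegistry are hypothetical names, and the handlers simply return a description string so the routing is visible.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for ConfigGroupEnum.
enum GroupKey { PLUGIN, SELECTOR, RULE, APP_AUTH, META_DATA }

// Hypothetical stand-in for DataHandler; it returns a string instead of mutating a cache.
interface Handler {
    String handle(String json, String eventType);
}

final class HandlerRegistry {
    // EnumMap is an array-backed map keyed by enum constants.
    private final Map<GroupKey, Handler> handlers = new EnumMap<>(GroupKey.class);

    HandlerRegistry() {
        // One handler per group, mirroring the five put(...) calls.
        for (GroupKey key : GroupKey.values()) {
            handlers.put(key, (json, eventType) -> key + " handler got " + eventType + " " + json);
        }
    }

    String executor(GroupKey type, String json, String eventType) {
        return handlers.get(type).handle(json, eventType);
    }

    public static void main(String[] args) {
        System.out.println(new HandlerRegistry().executor(GroupKey.SELECTOR, "[]", "UPDATE"));
    }
}
```

One difference is deliberate: the sketch keeps the map in an instance field, whereas WebsocketDataHandler fills a static map from its constructor, so every new WebsocketDataHandler overwrites the handlers shared by all instances.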

AbstractDataHandler

The dispatch by event type happens in the abstract class AbstractDataHandler:

public abstract class AbstractDataHandler<T> implements DataHandler {

    // convert, doRefresh, doUpdate and doDelete are implemented by the subclasses (declarations omitted here)

    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}
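
This is the template method pattern: the base class decides which operation an event type maps to, and each subclass decides what that operation means for its data. The sketch below shows the pattern in isolation. SketchDataHandler and InMemoryHandler are hypothetical names, and treating a refresh as "replace the whole cache" is an assumption made for the example, not something taken from the ShenYu handlers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

abstract class SketchDataHandler<T> {

    // Template method: maps the event type to one of three operations.
    final void handle(List<T> dataList, String eventType) {
        if (dataList == null || dataList.isEmpty()) {
            return;
        }
        switch (eventType) {
            case "REFRESH":
            case "MYSELF":
                doRefresh(dataList);
                break;
            case "UPDATE":
            case "CREATE":
                doUpdate(dataList);
                break;
            case "DELETE":
                doDelete(dataList);
                break;
            default:
                break;
        }
    }

    protected abstract void doRefresh(List<T> dataList);

    protected abstract void doUpdate(List<T> dataList);

    protected abstract void doDelete(List<T> dataList);
}

// Hypothetical subclass that keeps its data in a plain list.
final class InMemoryHandler extends SketchDataHandler<String> {
    final List<String> cache = new ArrayList<>();

    @Override
    protected void doRefresh(List<String> dataList) {
        // Assumption of this sketch: a refresh replaces the whole cache.
        cache.clear();
        cache.addAll(dataList);
    }

    @Override
    protected void doUpdate(List<String> dataList) {
        for (String item : dataList) {
            if (!cache.contains(item)) {
                cache.add(item);
            }
        }
    }

    @Override
    protected void doDelete(List<String> dataList) {
        cache.removeAll(dataList);
    }

    public static void main(String[] args) {
        InMemoryHandler handler = new InMemoryHandler();
        handler.handle(Arrays.asList("a", "b"), "CREATE");
        handler.handle(Arrays.asList("b"), "DELETE");
        handler.handle(Arrays.asList("x", "y"), "MYSELF");
        System.out.println(handler.cache);
    }
}
```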

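We created a new selector, and the event admin published for it carries the UPDATE type, which is routed to doUpdate just like CREATE. So the call ends up in SelectorDataHandler.doUpdate:

@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}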

CommonPluginDataSubscriber

Looking for the onSelectorSubscribe method leads us to the CommonPluginDataSubscriber class:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }
}

The implementation of subscribeDataHandler:

// Handles subscribed data
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        // Plugin data
        if (data instanceof PluginData) {
            PluginData pluginData = (PluginData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName()))
                        .ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName()))
                        .ifPresent(handler -> handler.removePlugin(pluginData));
            }
        // Selector data
        } else if (data instanceof SelectorData) {
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                // Cache the data in memory
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                // If the plugin has its own handler, let it run its own logic
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName()))
                        .ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName()))
                        .ifPresent(handler -> handler.removeSelector(selectorData));
            }
        // Rule data
        } else if (data instanceof RuleData) {
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName()))
                        .ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName()))
                        .ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}

BaseDataCache

The call BaseDataCache.getInstance().cacheSelectData(selectorData) stores the data in BaseDataCache, a singleton in-memory cache.

public final class BaseDataCache {

    private static final BaseDataCache INSTANCE = new BaseDataCache();

    // plugin name -> plugin
    private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();

    // plugin name -> selectors of that plugin
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

    // selector id -> rules of that selector
    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();

    private BaseDataCache() {
    }

    public static BaseDataCache getInstance() {
        return INSTANCE;
    }

    public void cachePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
    }

    public void removePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.remove(data.getName()));
    }

    public void removePluginDataByPluginName(final String pluginName) {
        PLUGIN_MAP.remove(pluginName);
    }

    public void cleanPluginData() {
        PLUGIN_MAP.clear();
    }

    public void cleanPluginDataSelf(final List<PluginData> pluginDataList) {
        pluginDataList.forEach(this::removePluginData);
    }

    public PluginData obtainPluginData(final String pluginName) {
        return PLUGIN_MAP.get(pluginName);
    }

    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    public void removeSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(data -> {
            final List<SelectorData> selectorDataList = SELECTOR_MAP.get(data.getPluginName());
            Optional.ofNullable(selectorDataList).ifPresent(list -> list.removeIf(e -> e.getId().equals(data.getId())));
        });
    }

    public void removeSelectDataByPluginName(final String pluginName) {
        SELECTOR_MAP.remove(pluginName);
    }

    public void cleanSelectorData() {
        SELECTOR_MAP.clear();
    }

    public void cleanSelectorDataSelf(final List<SelectorData> selectorDataList) {
        selectorDataList.forEach(this::removeSelectData);
    }

    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }

    public void cacheRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
    }

    public void removeRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(data -> {
            final List<RuleData> ruleDataList = RULE_MAP.get(data.getSelectorId());
            Optional.ofNullable(ruleDataList).ifPresent(list -> list.removeIf(rule -> rule.getId().equals(data.getId())));
        });
    }

    public void removeRuleDataBySelectorId(final String selectorId) {
        RULE_MAP.remove(selectorId);
    }

    public void cleanRuleData() {
        RULE_MAP.clear();
    }

    public void cleanRuleDataSelf(final List<RuleData> ruleDataList) {
        ruleDataList.forEach(this::removeRuleData);
    }

    public List<RuleData> obtainRuleData(final String selectorId) {
        return RULE_MAP.get(selectorId);
    }

    // Replaces the rule with the same id (if any), then keeps the list ordered by sort
    private void ruleAccept(final RuleData data) {
        String selectorId = data.getSelectorId();
        synchronized (RULE_MAP) {
            if (RULE_MAP.containsKey(selectorId)) {
                List<RuleData> existList = RULE_MAP.get(selectorId);
                final List<RuleData> resultList = existList.stream()
                        .filter(r -> !r.getId().equals(data.getId()))
                        .collect(Collectors.toList());
                resultList.add(data);
                final List<RuleData> collect = resultList.stream()
                        .sorted(Comparator.comparing(RuleData::getSort))
                        .collect(Collectors.toList());
                RULE_MAP.put(selectorId, collect);
            } else {
                RULE_MAP.put(selectorId, Lists.newArrayList(data));
            }
        }
    }

    // Replaces the selector with the same id (if any), then keeps the list ordered by sort
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        synchronized (SELECTOR_MAP) {
            if (SELECTOR_MAP.containsKey(key)) {
                List<SelectorData> existList = SELECTOR_MAP.get(key);
                final List<SelectorData> resultList = existList.stream()
                        .filter(r -> !r.getId().equals(data.getId()))
                        .collect(Collectors.toList());
                resultList.add(data);
                final List<SelectorData> collect = resultList.stream()
                        .sorted(Comparator.comparing(SelectorData::getSort))
                        .collect(Collectors.toList());
                SELECTOR_MAP.put(key, collect);
            } else {
                SELECTOR_MAP.put(key, Lists.newArrayList(data));
            }
        }
    }
}
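
The "replace by id, then re-sort" step in selectorAccept and ruleAccept is the heart of the cache, so here is a small stand-alone sketch of it. Item and SelectorCacheSketch are hypothetical names. Note that the sketch swaps the synchronized block for ConcurrentHashMap.merge, which performs the read-modify-write atomically per key; that is a substitution made for brevity, not what BaseDataCache does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for SelectorData with only the fields the cache needs.
final class Item {
    final String id;
    final String pluginName;
    final int sort;

    Item(String id, String pluginName, int sort) {
        this.id = id;
        this.pluginName = pluginName;
        this.sort = sort;
    }
}

final class SelectorCacheSketch {
    private final ConcurrentMap<String, List<Item>> byPlugin = new ConcurrentHashMap<>();

    void cache(Item item) {
        // First item for a plugin: store a one-element list.
        // Otherwise: drop the old entry with the same id, add the new one, sort by "sort".
        byPlugin.merge(item.pluginName, Collections.singletonList(item), (existing, added) -> {
            List<Item> result = existing.stream()
                    .filter(e -> !e.id.equals(item.id))
                    .collect(Collectors.toCollection(ArrayList::new));
            result.add(item);
            result.sort(Comparator.comparingInt((Item e) -> e.sort));
            return result;
        });
    }

    List<String> ids(String pluginName) {
        return byPlugin.getOrDefault(pluginName, Collections.<Item>emptyList())
                .stream()
                .map(e -> e.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SelectorCacheSketch cache = new SelectorCacheSketch();
        cache.cache(new Item("1", "divide", 2));
        cache.cache(new Item("2", "divide", 1));
        cache.cache(new Item("1", "divide", 0));   // same id: replaces the first entry
        System.out.println(cache.ids("divide"));
    }
}
```

Because an update replaces the entry with the same id rather than appending, repeated UPDATE events for one selector never produce duplicates, and the gateway always iterates selectors in their configured order.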

Configuration parsing

Next, we look at the gateway's configuration file, where the admin WebSocket address is set under shenyu.sync.websocket.urls.

From this configuration we can find the following configuration class:

public class WebsocketConfig {

    private String urls;

    public String getUrls() {
        return urls;
    }

    public void setUrls(final String urls) {
        this.urls = urls;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebsocketConfig that = (WebsocketConfig) o;
        return Objects.equals(urls, that.urls);
    }

    @Override
    public int hashCode() {
        return Objects.hash(urls);
    }

    @Override
    public String toString() {
        return "WebsocketConfig{" + "urls='" + urls + '\'' + '}';
    }
}

Following WebsocketConfig, we find WebsocketSyncDataConfiguration, which registers two beans:

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);

    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig,
                                                    final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                    final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new),
                pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList),
                authSubscribers.getIfAvailable(Collections::emptyList));
    }

    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}

Next, let us look at the WebsocketSyncDataService class:

public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(WebsocketSyncDataService.class);

    private final List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;

    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // The admin urls are separated by commas
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // Create a scheduled thread pool with one thread per url
        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
        // Create one client per url
        for (String url : urls) {
            try {
                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber),
                        metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                LOG.error("websocket url({}) is error", url, e);
            }
        }
        try {
            // Connect every client
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    LOG.info("websocket connection is successful.....");
                } else {
                    LOG.error("websocket connection is error.....");
                }
                // Scheduled task that runs every 10 seconds.
                // It checks whether the websocket connection has been closed and, if so, tries to reconnect.
                // If the connection is still open, it sends a ping (ping-pong check).
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                LOG.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
                            } else {
                                LOG.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
                            }
                        } else {
                            client.sendPing();
                            LOG.debug("websocket send to [{}] ping message successful", client.getURI().toString());
                        }
                    } catch (InterruptedException e) {
                        LOG.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            LOG.info("websocket connection...exception....", e);
        }
    }

    @Override
    public void close() {
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}
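
The periodic "reconnect if closed, otherwise ping" task can be tried out without a real WebSocket server. In the sketch below, FakeClient and HealthCheck are hypothetical names, FakeClient only records which action was taken, and the 100 ms period is chosen so the demo finishes quickly (the class above uses 10 seconds).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a websocket client; it records actions instead of doing I/O.
final class FakeClient {
    volatile boolean closed;
    final List<String> actions = new CopyOnWriteArrayList<>();

    boolean isClosed() {
        return closed;
    }

    boolean reconnect() {
        closed = false;
        actions.add("reconnect");
        return true;
    }

    void sendPing() {
        actions.add("ping");
    }
}

final class HealthCheck {

    // One round of the scheduled task: reconnect a closed client, ping an open one.
    static void tick(FakeClient client) {
        if (client.isClosed()) {
            client.reconnect();
        } else {
            client.sendPing();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Run the check periodically, as the real service does for each client.
        executor.scheduleAtFixedRate(() -> tick(client), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(250);
        client.closed = true;   // simulate a dropped connection
        Thread.sleep(250);
        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(client.actions);
    }
}
```

Keeping the check in its own tick method makes the decision logic testable without waiting on a timer; the scheduler only decides how often it runs.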

Verifying the guesses

Incremental data synchronization

Java Line Breakpoints

CommonPluginDataSubscriber.java:154
CommonPluginDataSubscriber.java:141
CommonPluginDataSubscriber.java:96
SelectorDataHandler.java:50
AbstractDataHandler.java:77
WebsocketDataHandler.java:59
ShenyuWebsocketClient.java:89
WebsocketCollector.java:131
WebsocketDataChangedListener.java:48
DataChangedEventDispatcher.java:64
SelectorServiceImpl.java:318
SelectorServiceImpl.java:152
SelectorController.java:92
AbstractCircuitBreaker.java:123

[Figure: breakpoints hit on the admin side]

[Figure: breakpoints hit on the gateway side]


Full data synchronization

Java Line Breakpoints

WebsocketCollector.java:86
ShenyuWebsocketClient.java:66
At startup:

[Figure: breakpoints hit on the gateway side]

[Figure: breakpoints hit on the admin side]

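Summary

Debugging confirms our guesses. Incremental changes made in admin are published as a DataChangedEvent, pushed to the gateway over WebSocket, and stored in BaseDataCache. MYSELF is the marker for the full synchronization that the gateway requests when its connection opens.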
