欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

【RPC项目】3.增加注册中心

时间:2023-04-16

增加注册中心工作内容主要是服务注册、服务发现、服务下线,负载均衡。并且将服务端提供的服务用 ZkServiceProviderImpl 保存。

注册中心使用 ZooKeeper 实现,使用 Curator 框架对 ZooKeeper 进行调用。
服务注册包括注册服务的方法,将服务写到 ZooKeeper 中。
服务发现包括发现服务的方法,通过负载均衡选择服务地址。
服务下线包括删除所有服务端的服务。
负载均衡这里只实现了最简单的随机分配。
ZkServiceProviderImpl 需要使用单例模式,增加了 SingletonFactory 类。

为了方便不同的服务端之间进行区分,增加了 RpcServiceConfig 类,主要用 version 和 group 来区分。同时将之间的 Service 和 Client 类优化,提取了启动的关键代码,将其他过程放到 Client 和 Server 文件夹内。

以下是部分重要代码:

package common.dto;import lombok.*;@AllArgsConstructor@NoArgsConstructor@Getter@Setter@Builder@ToStringpublic class RpcServiceConfig { private String version = ""; private String group = ""; private Object service; public String getRpcServiceName() { return this.getServiceName() + this.getGroup() + this.getVersion(); } public String getServiceName() { return this.service.getClass().getInterfaces()[0].getCanonicalName(); }}

package common.provider.impl;import common.Exceptions.RpcException;import common.dto.RpcServiceConfig;import common.provider.ServiceProvider;import common.register.ServiceRegistry;import common.register.zookeeper.ZkServiceRegistryImpl;import demo4.Server.SocketRpcServer;import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;@Slf4jpublic class ZkServiceProviderImpl implements ServiceProvider { private final Map serviceMap; private final Set registeredService; private final ServiceRegistry serviceRegistry; public ZkServiceProviderImpl() { serviceMap = new ConcurrentHashMap<>(); registeredService = ConcurrentHashMap.newKeySet(); serviceRegistry = new ZkServiceRegistryImpl(); } @Override public void addService(RpcServiceConfig rpcServiceConfig) { String rpcServiceName = rpcServiceConfig.getRpcServiceName(); if (registeredService.contains(rpcServiceName)) { return; } registeredService.add(rpcServiceName); serviceMap.put(rpcServiceName, rpcServiceConfig.getService()); log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces()); } @Override public Object getService(String rpcServiceName) { Object service = serviceMap.get(rpcServiceName); if (null == service) { throw new RpcException("SERVICE_CAN_NOT_BE_FOUND"); } return service; } @Override public void publishService(RpcServiceConfig rpcServiceConfig) { try { String host = InetAddress.getLocalHost().getHostAddress(); this.addService(rpcServiceConfig); serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, SocketRpcServer.port)); } catch (UnknownHostException e) { log.error("occur exception when getHostAddress", e); } }}

package common.register.zookeeper;import common.register.ServiceRegistry;import lombok.extern.slf4j.Slf4j;import org.apache.curator.framework.Curatorframework;import java.net.InetSocketAddress;@Slf4jpublic class ZkServiceRegistryImpl implements ServiceRegistry { @Override public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) { String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString(); Curatorframework zkClient = CuratorUtils.getZkClient(); CuratorUtils.createPersistentNode(zkClient, servicePath); }}

package common.register.zookeeper;import common.Exceptions.RpcException;import common.dto.RpcRequest;import common.loadbalance.LoadBalance;import common.loadbalance.loadbalancer.RandomLoadBalance;import common.register.ServiceDiscovery;import lombok.extern.slf4j.Slf4j;import org.apache.curator.framework.Curatorframework;import java.net.InetSocketAddress;import java.util.List;@Slf4jpublic class ZkServiceDiscoveryImpl implements ServiceDiscovery { private final LoadBalance loadBalance; public ZkServiceDiscoveryImpl() { //this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance"); this.loadBalance = new RandomLoadBalance(); } @Override public InetSocketAddress lookupService(RpcRequest rpcRequest) { String rpcServiceName = rpcRequest.getRpcServiceName(); Curatorframework zkClient = CuratorUtils.getZkClient(); List serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName); if (serviceUrlList == null || serviceUrlList.size() == 0) { throw new RpcException("SERVICE_CAN_NOT_BE_FOUN", rpcServiceName); } // load balancing String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); log.info("Successfully found the service address:[{}]", targetServiceUrl); String[] socketAddressArray = targetServiceUrl.split(":"); String host = socketAddressArray[0]; int port = Integer.parseInt(socketAddressArray[1]); return new InetSocketAddress(host, port); }}

package common.register.zookeeper;import demo4.Server.SocketRpcServer;import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;@Slf4jpublic class CustomShutdownHook { private static final CustomShutdownHook CUSTOM_SHUTDOWN_HOOK = new CustomShutdownHook(); public static CustomShutdownHook getCustomShutdownHook() { return CUSTOM_SHUTDOWN_HOOK; } public void clearAll() { log.info("addShutdownHook for clearAll"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), SocketRpcServer.port); CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress); } catch (UnknownHostException ignored) { } //ThreadPoolFactoryUtils.shutDownAllThreadPool(); })); }}

package common.register.zookeeper;import lombok.extern.slf4j.Slf4j;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.Curatorframework;import org.apache.curator.framework.CuratorframeworkFactory;import org.apache.curator.framework.imps.CuratorframeworkState;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import java.net.InetSocketAddress;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.TimeUnit;@Slf4jpublic final class CuratorUtils { private static final int base_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; public static final String ZK_REGISTER_ROOT_PATH = "/rpc-demo"; private static final Map> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>(); private static final Set REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet(); private static Curatorframework zkClient; private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181"; private CuratorUtils() { } public static void createPersistentNode(Curatorframework zkClient, String path) { try { if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) { log.info("The node already exists、The node is:[{}]", path); } else { //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("The node was created successfully、The node is:[{}]", path); } REGISTERED_PATH_SET.add(path); } catch (Exception e) { log.error("create persistent node for path [{}] fail", path); } } public static List getChildrenNodes(Curatorframework zkClient, String rpcServiceName) { if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) { return SERVICE_ADDRESS_MAP.get(rpcServiceName); } List result = null; String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName; try { result = zkClient.getChildren().forPath(servicePath); SERVICE_ADDRESS_MAP.put(rpcServiceName, result); registerWatcher(rpcServiceName, zkClient); } catch (Exception e) { log.error("get children nodes for path [{}] fail", servicePath); } return result; } public static void clearRegistry(Curatorframework zkClient, InetSocketAddress inetSocketAddress) { REGISTERED_PATH_SET.stream().parallel().forEach(p -> { try { if (p.endsWith(inetSocketAddress.toString())) { zkClient.delete().forPath(p); } } catch (Exception e) { log.error("clear registry for path [{}] fail", p); } }); log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString()); } public static Curatorframework getZkClient() { // check if user has set zk address //Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue()); //String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS; String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS; // if zkClient has been started, return directly if (zkClient != null && zkClient.getState() == CuratorframeworkState.STARTED) { return zkClient; } // Retry strategy、Retry 3 times, and will increase the sleep time between retries. RetryPolicy retryPolicy = new ExponentialBackoffRetry(base_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorframeworkFactory.builder() // the server to connect to (can be a server list) .connectString(zookeeperAddress) .retryPolicy(retryPolicy) .build(); zkClient.start(); try { // wait 30s until connect to the zookeeper if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) { throw new RuntimeException("Time out waiting to connect to ZK!"); } } catch (InterruptedException e) { e.printStackTrace(); } return zkClient; } private static void registerWatcher(String rpcServiceName, Curatorframework zkClient) throws Exception { String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName; PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorframework, pathChildrenCacheEvent) -> { List serviceAddresses = curatorframework.getChildren().forPath(servicePath); SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses); }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start(); }}

package common.singletonfactory;import java.lang.reflect.InvocationTargetException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public final class SingletonFactory { private static final Map OBJECT_MAP = new ConcurrentHashMap<>(); private SingletonFactory() { } public static T getInstance(Class c) { if (c == null) { throw new IllegalArgumentException(); } String key = c.toString(); if (OBJECT_MAP.containsKey(key)) { return c.cast(OBJECT_MAP.get(key)); } else { return c.cast(OBJECT_MAP.computeIfAbsent(key, k -> { try { return c.getDeclaredConstructor().newInstance(); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new RuntimeException(e.getMessage(), e); } })); } }}

package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public T getProxy(Class clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse rpcResponse = null; rpcResponse = (RpcResponse) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); }}

package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public T getProxy(Class clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse rpcResponse = null; rpcResponse = (RpcResponse) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); }}

package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public T getProxy(Class clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse rpcResponse = null; rpcResponse = (RpcResponse) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); }}

package demo4.Server;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.provider.ServiceProvider;import common.provider.impl.ZkServiceProviderImpl;import common.register.zookeeper.CustomShutdownHook;import common.singletonfactory.SingletonFactory;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;@Slf4jpublic class SocketRpcServer { public static final int port = 9999; //private final ExecutorService threadPool; private final ServiceProvider serviceProvider; private final RpcServerHandler rpcRequestHandler; public SocketRpcServer() { //threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool"); serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class); rpcRequestHandler = new RpcServerHandler(); } public void registerService(RpcServiceConfig rpcServiceConfig) { serviceProvider.publishService(rpcServiceConfig); } public void start() { try (ServerSocket server = new ServerSocket()) { String host = InetAddress.getLocalHost().getHostAddress(); server.bind(new InetSocketAddress(host, port)); CustomShutdownHook.getCustomShutdownHook().clearAll(); Socket socket; while ((socket = server.accept()) != null) { log.info("client connected [{}]", socket.getInetAddress()); process(socket); //threadPool.execute(new SocketRpcRequestHandlerRunnable(socket)); } //threadPool.shutdown(); } catch (IOException e) { log.error("occur IOException:", e); } catch (Exception e) { log.error("occur Exception:", e); } } public void process(Socket socket) throws Exception { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object result = rpcRequestHandler.handle(rpcRequest); objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId())); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { log.error("occur exception:", e); } }}

package demo4;import common.Hello;import common.HelloService;import common.dto.RpcServiceConfig;import common.transport.Socket.SocketRpcRequestTransport;import demo4.Client.RpcClientProxy;public class SocketClientMain { public static void main(String[] args) { RpcClientProxy rpcClientProxy = new RpcClientProxy(new SocketRpcRequestTransport(),new RpcServiceConfig()); HelloService helloService = rpcClientProxy.getProxy(HelloService.class); String hello = helloService.hello(new Hello("111", "222")); System.out.println(hello); }}

public class SocketServerMain { public static void main(String[] args) { SocketRpcServer socketRpcServer = new SocketRpcServer(); RpcServiceConfig rpcServiceConfig = new RpcServiceConfig(); rpcServiceConfig.setService(new HelloServiceImpl()); socketRpcServer.registerService(rpcServiceConfig); socketRpcServer.start(); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。