增加注册中心,工作内容主要是服务注册、服务发现、服务下线和负载均衡,并且将服务端提供的服务用 ZkServiceProviderImpl 保存。
注册中心使用 ZooKeeper 实现,使用 Curator 框架对 ZooKeeper 进行调用。
服务注册包括注册服务的方法,将服务写到 ZooKeeper 中。
服务发现包括发现服务的方法,通过负载均衡选择服务地址。
服务下线是指在服务端关闭时,删除该服务端在注册中心注册的所有服务。
负载均衡这里只实现了最简单的随机分配。
ZkServiceProviderImpl 需要使用单例模式,增加了 SingletonFactory 类。
为了方便区分不同的服务端,增加了 RpcServiceConfig 类,主要用 version 和 group 来区分。同时对之前的 Service 和 Client 类进行了优化:提取出启动的关键代码,并将其他过程放到 Client 和 Server 文件夹内。
以下是部分重要代码:
package common.dto;import lombok.*;@AllArgsConstructor@NoArgsConstructor@Getter@Setter@Builder@ToStringpublic class RpcServiceConfig { private String version = ""; private String group = ""; private Object service; public String getRpcServiceName() { return this.getServiceName() + this.getGroup() + this.getVersion(); } public String getServiceName() { return this.service.getClass().getInterfaces()[0].getCanonicalName(); }}
package common.provider.impl;import common.Exceptions.RpcException;import common.dto.RpcServiceConfig;import common.provider.ServiceProvider;import common.register.ServiceRegistry;import common.register.zookeeper.ZkServiceRegistryImpl;import demo4.Server.SocketRpcServer;import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;@Slf4jpublic class ZkServiceProviderImpl implements ServiceProvider { private final Map
package common.register.zookeeper;

import common.register.ServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;

import java.net.InetSocketAddress;

/**
 * {@link ServiceRegistry} implementation that registers services by creating
 * a node under {@link CuratorUtils#ZK_REGISTER_ROOT_PATH} through Curator.
 */
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {

    /**
     * Registers a service address by creating a node for it.
     *
     * @param rpcServiceName    full RPC service name used as the parent path segment
     * @param inetSocketAddress address of the server that provides the service
     */
    @Override
    public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
        // The path is root + "/" + service name + address.toString(); no extra
        // separator is inserted before the address.
        // NOTE(review): this appears to rely on InetSocketAddress.toString()
        // starting with "/" (the case when the address was built from an IP
        // literal) — confirm against how callers construct the address.
        String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
        CuratorFramework zkClient = CuratorUtils.getZkClient();
        CuratorUtils.createPersistentNode(zkClient, servicePath);
    }
}
package common.register.zookeeper;import common.Exceptions.RpcException;import common.dto.RpcRequest;import common.loadbalance.LoadBalance;import common.loadbalance.loadbalancer.RandomLoadBalance;import common.register.ServiceDiscovery;import lombok.extern.slf4j.Slf4j;import org.apache.curator.framework.Curatorframework;import java.net.InetSocketAddress;import java.util.List;@Slf4jpublic class ZkServiceDiscoveryImpl implements ServiceDiscovery { private final LoadBalance loadBalance; public ZkServiceDiscoveryImpl() { //this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance"); this.loadBalance = new RandomLoadBalance(); } @Override public InetSocketAddress lookupService(RpcRequest rpcRequest) { String rpcServiceName = rpcRequest.getRpcServiceName(); Curatorframework zkClient = CuratorUtils.getZkClient(); List
package common.register.zookeeper;import demo4.Server.SocketRpcServer;import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;@Slf4jpublic class CustomShutdownHook { private static final CustomShutdownHook CUSTOM_SHUTDOWN_HOOK = new CustomShutdownHook(); public static CustomShutdownHook getCustomShutdownHook() { return CUSTOM_SHUTDOWN_HOOK; } public void clearAll() { log.info("addShutdownHook for clearAll"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), SocketRpcServer.port); CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress); } catch (UnknownHostException ignored) { } //ThreadPoolFactoryUtils.shutDownAllThreadPool(); })); }}
package common.register.zookeeper;import lombok.extern.slf4j.Slf4j;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.Curatorframework;import org.apache.curator.framework.CuratorframeworkFactory;import org.apache.curator.framework.imps.CuratorframeworkState;import org.apache.curator.framework.recipes.cache.PathChildrenCache;import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import java.net.InetSocketAddress;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.TimeUnit;@Slf4jpublic final class CuratorUtils { private static final int base_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; public static final String ZK_REGISTER_ROOT_PATH = "/rpc-demo"; private static final Map
package common.singletonfactory;import java.lang.reflect.InvocationTargetException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;public final class SingletonFactory { private static final Map
package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public
package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public
package demo4.Client;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.transport.RpcRequestTransport;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.UUID;@Slf4jpublic class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } @SuppressWarnings("unchecked") public
package demo4.Server;import common.dto.RpcRequest;import common.dto.RpcResponse;import common.dto.RpcServiceConfig;import common.provider.ServiceProvider;import common.provider.impl.ZkServiceProviderImpl;import common.register.zookeeper.CustomShutdownHook;import common.singletonfactory.SingletonFactory;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;@Slf4jpublic class SocketRpcServer { public static final int port = 9999; //private final ExecutorService threadPool; private final ServiceProvider serviceProvider; private final RpcServerHandler rpcRequestHandler; public SocketRpcServer() { //threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool"); serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class); rpcRequestHandler = new RpcServerHandler(); } public void registerService(RpcServiceConfig rpcServiceConfig) { serviceProvider.publishService(rpcServiceConfig); } public void start() { try (ServerSocket server = new ServerSocket()) { String host = InetAddress.getLocalHost().getHostAddress(); server.bind(new InetSocketAddress(host, port)); CustomShutdownHook.getCustomShutdownHook().clearAll(); Socket socket; while ((socket = server.accept()) != null) { log.info("client connected [{}]", socket.getInetAddress()); process(socket); //threadPool.execute(new SocketRpcRequestHandlerRunnable(socket)); } //threadPool.shutdown(); } catch (IOException e) { log.error("occur IOException:", e); } catch (Exception e) { log.error("occur Exception:", e); } } public void process(Socket socket) throws Exception { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) 
objectInputStream.readObject(); Object result = rpcRequestHandler.handle(rpcRequest); objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId())); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { log.error("occur exception:", e); } }}
package demo4;

import common.Hello;
import common.HelloService;
import common.dto.RpcServiceConfig;
import common.transport.Socket.SocketRpcRequestTransport;
import demo4.Client.RpcClientProxy;

/**
 * Demo client entry point: obtains a {@link HelloService} proxy from
 * {@link RpcClientProxy}, invokes {@code hello} once and prints the reply.
 */
public class SocketClientMain {

    /**
     * Runs the demo call.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Socket transport plus a default (empty group/version) service config.
        SocketRpcRequestTransport transport = new SocketRpcRequestTransport();
        RpcServiceConfig config = new RpcServiceConfig();

        HelloService helloService = new RpcClientProxy(transport, config).getProxy(HelloService.class);

        Hello request = new Hello("111", "222");
        String reply = helloService.hello(request);
        System.out.println(reply);
    }
}
/**
 * Demo server entry point: creates a {@code SocketRpcServer}, registers a
 * {@code HelloServiceImpl} with a default service config and starts the server.
 */
public class SocketServerMain {

    /**
     * Registers the demo service and starts the server.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final SocketRpcServer server = new SocketRpcServer();

        // Default config: only the service implementation is set.
        final RpcServiceConfig helloConfig = new RpcServiceConfig();
        helloConfig.setService(new HelloServiceImpl());

        server.registerService(helloConfig);
        server.start();
    }
}