
OkHttp3 interceptor: round-robin URL load balancing with retry and failover from dead URLs

Date: 2023-06-09
Adding the interceptor to OkHttp3

OkHttpClient client = new OkHttpClient.Builder()
        .addInterceptor(new PollAndRetryInterceptor(urls))
        .build();

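A custom interceptor for round-robin selection and retry with URL switching

How the interceptor works:
1. A simple algorithm is needed to switch between URLs; this one reuses the algorithm from RoundRobinRule (Netflix Ribbon) directly.
2. A failed request must be retried: catch the exception, switch to another URL, and retry recursively.
3. A dead URL must not stay in use, and a URL that recovers must rejoin the rotation, so a scheduled task periodically checks the availability of every URL.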
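
The selection step in point 1 can be sketched on its own, without OkHttp. The following is a minimal illustration, not the article's code: the class names RoundRobinSketch and Node are hypothetical, and the sketch bounds the search by the number of nodes, whereas the interceptor further down uses a fixed limit of 10 attempts.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper for illustration: round-robin choice over nodes with an availability flag. */
class RoundRobinSketch {
    /** A URL plus a flag that a health check would flip. */
    static final class Node {
        final String url;
        volatile boolean alive;

        Node(String url, boolean alive) {
            this.url = url;
            this.alive = alive;
        }
    }

    private final List<Node> nodes;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSketch(List<Node> nodes) {
        this.nodes = nodes;
    }

    /** Advances the shared counter with a compare-and-set loop and returns the new index. */
    int nextIndex() {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % nodes.size();
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Tries each node at most once per call; returns null when no node is alive. */
    Node choose() {
        for (int attempts = 0; attempts < nodes.size(); attempts++) {
            Node candidate = nodes.get(nextIndex());
            if (candidate.alive) {
                return candidate;
            }
        }
        return null;
    }
}
```

Because the counter is shared and advanced atomically, concurrent callers are spread across the nodes, and a node flagged as dead is skipped until its flag is set back to true.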

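Main code

1. Model each URL as a stateful object. When switching, pick an available URL, trying several times so that a dead URL is not chosen.
2. If the chosen URL turns out to be unavailable, the retry acts as the fallback.
3. Create a ping task to maintain the state of each URL.

Defining the URL object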
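
The retry fallback in point 2 can also be written as a loop instead of recursion. This is a sketch under stated assumptions, not the article's implementation: FailoverSketch and its Call interface are hypothetical, with Call standing in for chain.proceed(request), and the candidate URLs are passed in as a plain list.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.List;

/** Hypothetical sketch: iterative failover across candidate URLs. */
class FailoverSketch {
    /** Stand-in for chain.proceed(request); an assumption made for illustration. */
    interface Call<T> {
        T execute(String url) throws IOException;
    }

    /** Tries candidates in order until one succeeds or the attempt limit is reached. */
    static <T> T callWithFailover(List<String> candidates, int maxAttempts, Call<T> call)
            throws IOException {
        IOException last = null;
        int attempts = Math.min(maxAttempts, candidates.size());
        for (int i = 0; i < attempts; i++) {
            try {
                return call.execute(candidates.get(i));
            } catch (IOException e) {
                last = e; // remember the failure and move on to the next URL
            }
        }
        // Same message as the interceptor uses when it gives up
        ConnectException failure = new ConnectException("no useful address");
        if (last != null) {
            failure.initCause(last);
        }
        throw failure;
    }
}
```

Keeping the last IOException as the cause makes the final error easier to diagnose than a bare "no useful address".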

public class Server {
    // URL address
    private final String url;
    // Whether the URL is currently available. Declared volatile because the
    // ping timer thread writes it while request threads read it.
    private volatile boolean isAlive;

    public Server(String url, boolean isAlive) {
        this.url = url;
        this.isAlive = isAlive;
    }

    public String getUrl() {
        return url;
    }

    public boolean isAlive() {
        return isAlive;
    }

    public void setAlive(boolean alive) {
        isAlive = alive;
    }
}

ShutdownEnabledTimer:

import java.util.Timer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// A Timer that cancels itself through a JVM shutdown hook
public class ShutdownEnabledTimer extends Timer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownEnabledTimer.class);
    private final Thread cancelThread;
    private final String name;

    public ShutdownEnabledTimer(String name, boolean daemon) {
        super(name, daemon);
        this.name = name;
        this.cancelThread = new Thread(new Runnable() {
            public void run() {
                ShutdownEnabledTimer.super.cancel();
            }
        });
        LOGGER.info("Shutdown hook installed for: {}", this.name);
        Runtime.getRuntime().addShutdownHook(this.cancelThread);
    }

    @Override
    public void cancel() {
        super.cancel();
        LOGGER.info("Shutdown hook removed for: {}", this.name);
        try {
            Runtime.getRuntime().removeShutdownHook(this.cancelThread);
        } catch (IllegalStateException ise) {
            LOGGER.info("Exception caught (might be ok if at shutdown)", ise);
        }
    }
}

Interceptor code:

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollAndRetryInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(PollAndRetryInterceptor.class);
    private static final String DEFAULT_NAME = "default";
    // Interval between ping runs, in seconds
    private static final int PING_INTERVAL_SECONDS = 10;
    // Ping connect timeout, in milliseconds
    private static final int PING_TIME_OUT = 3000;

    // All configured URLs
    private final List<Server> allServerList = new ArrayList<>();
    // Number of URLs
    private final int size;
    // Counter: index of the server used by the most recent round-robin pick
    private final AtomicInteger nextUrlCyclicCounter;
    // Timer that runs the ping task
    protected Timer lbTimer = null;
    // Whether a ping run is currently in progress
    protected AtomicBoolean pingInProgress = new AtomicBoolean(false);

    public PollAndRetryInterceptor(String[] urls) {
        for (String url : urls) {
            allServerList.add(new Server(url, true));
        }
        this.size = this.allServerList.size();
        this.nextUrlCyclicCounter = new AtomicInteger(0);
        setPingTask();
    }

    void setPingTask() {
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("PollAndRetryInterceptor-PingTimer-" + DEFAULT_NAME, true);
        lbTimer.schedule(new PingTask(), 0, PING_INTERVAL_SECONDS * 1000);
        forceQuickPing();
    }

    public void forceQuickPing() {
        logger.debug("PollAndRetryInterceptor [{}]: forceQuickPing invoking", DEFAULT_NAME);
        try {
            new Pinger().runPinger();
        } catch (Exception e) {
            logger.error("PollAndRetryInterceptor [{}]: Error running forceQuickPing()", DEFAULT_NAME, e);
        }
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        // Get the original request
        Request originalRequest = chain.request();
        // With fewer than two URLs there is nothing to balance
        if (size < 2) {
            return chain.proceed(originalRequest);
        }
        // Get the old URL
        HttpUrl oldUrl = originalRequest.url();
        // Get a builder based on the original request
        Request.Builder builder = originalRequest.newBuilder();
        // Build the new HttpUrl, replacing only the parts that must change
        HttpUrl newHttpUrl = getHttpUrl(oldUrl);
        // No address available: fall back to the original request
        if (newHttpUrl == null) {
            return chain.proceed(originalRequest);
        }
        // Build the new request
        Request newRequest = builder.url(newHttpUrl).build();
        // Send it
        return proceedRequest(chain, newRequest, 0);
    }

    public Response proceedRequest(Chain chain, Request request, int retryCount) throws IOException {
        try {
            return chain.proceed(request);
        } catch (IOException e) {
            // Bound the retries so one request cannot keep cycling through dead addresses
            if (retryCount++ < 10) {
                HttpUrl oldUrl = request.url();
                // Switch URL
                HttpUrl httpUrl = getHttpUrl(oldUrl);
                // No server address available
                if (httpUrl == null) {
                    throw new ConnectException("no useful address");
                }
                // Build a new request
                Request newRequest = chain.request().newBuilder().url(httpUrl).build();
                return proceedRequest(chain, newRequest, retryCount);
            } else {
                throw new ConnectException("no useful address");
            }
        }
    }

    public HttpUrl getHttpUrl(HttpUrl oldUrl) {
        // Pick a server
        Server server = chooseServer();
        if (server == null) {
            return null;
        }
        // Parse the new base URL
        HttpUrl baseURL = HttpUrl.parse(server.getUrl());
        // Build the new HttpUrl, replacing only the parts that must change
        assert baseURL != null;
        return oldUrl.newBuilder()
                .scheme(baseURL.scheme()) // protocol, e.g. http or https
                .host(baseURL.host())     // host
                .port(baseURL.port())     // port
                .build();
    }

    public Server chooseServer() {
        int count = 0;
        // Try at most 10 times so a request fails fast instead of spending too long choosing a server
        while (count++ < 10) {
            int nextServerIndex = incrementAndGetModulo();
            Server server = allServerList.get(nextServerIndex);
            if (server.isAlive()) {
                return server;
            }
        }
        logger.error("PollAndRetryInterceptor [{}] chooseServer: no service available", DEFAULT_NAME);
        return null;
    }

    public int incrementAndGetModulo() {
        for (; ; ) {
            int current = nextUrlCyclicCounter.get();
            int next = (current + 1) % size;
            if (nextUrlCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger().runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", DEFAULT_NAME, e);
            }
        }
    }

    class Pinger {
        public void runPinger() {
            if (!pingInProgress.compareAndSet(false, true)) {
                // A ping run is already in progress: return immediately
                return;
            }
            // Run the ping
            try {
                Server[] allServers = allServerList.toArray(new Server[0]);
                int numCandidates = allServers.length;
                boolean[] results = pingServers(allServers);
                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    svr.setAlive(isAlive);
                }
            } finally {
                pingInProgress.set(false);
            }
        }

        public boolean[] pingServers(Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
            // Servers are pinged serially
            for (int i = 0; i < numCandidates; i++) {
                Server server = servers[i];
                results[i] = pingURL(server.getUrl(), PING_TIME_OUT);
            }
            return results;
        }

        public boolean pingURL(String url, int timeout) {
            HttpUrl baseURL = HttpUrl.parse(url);
            assert baseURL != null;
            // try-with-resources closes the probe socket, which would otherwise leak on every ping
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(baseURL.host(), baseURL.port()), timeout);
                return true;
            } catch (IOException exception) {
                return false;
            }
        }
    }
}
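
The availability check at the end of the interceptor is a plain TCP connect with a timeout, so it can be tried in isolation. The sketch below is an illustration only: the TcpProbeSketch class and its isReachable method are hypothetical names, and the host and port are passed in directly instead of being parsed from an HttpUrl.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: reports whether a TCP connection to host:port can be opened in time. */
class TcpProbeSketch {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        // try-with-resources closes the socket whether or not the connect succeeds
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts
            return false;
        }
    }
}
```

A successful connect only shows that something is listening on the port. It does not show that the service behind it answers HTTP requests correctly, which is one reason the retry path is still needed.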
