欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SpringBoot+Netty实现远程过程调用

时间:2023-06-20
1 基本架构 2 代码实现 2.1 Netty服务端:

# 目录结构sp_netty_server ... - handler - NettyServerHandler.java - rpc - NettyServer.java - ServerChannelInitializer.java SpNettyServerApplication.java

依赖和配置文件:

org.springframework.boot spring-boot-starter-web org.projectlombok lombok 1.18.2 true io.netty netty-all 4.1.17.Final

server.port=8801

具体代码:

NettyServerHandler.java

@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Channel active ..."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("服务器收到消息: {}", msg.toString()); ctx.write(msg.toString()+",Hello too !"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}

NettyServer.java

@Slf4jpublic class NettyServer { public void start() { InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082); //new 一个主线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //new 一个工作线程组 EventLoopGroup workGroup = new NioEventLoopGroup(200); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()) .localAddress(socketAddress) //设置队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口,开始接收进来的连接 try { ChannelFuture future = bootstrap.bind(socketAddress).sync(); log.info("服务器启动开始监听端口: {}", socketAddress.getPort()); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("服务器开启失败", e); } finally { //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } }}

ServerChannelInitializer.java

public class ServerChannelInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //添加编解码 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new NettyServerHandler()); }}

SpNettyServerApplication.java

@Slf4j@SpringBootApplicationpublic class SpNettyServerApplication extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(SpNettyServerApplication.class); } public static void main(String[] args) { SpringApplication.run(SpNettyServerApplication.class, args); //开启Netty服务 NettyServer nettyServer = new NettyServer(); nettyServer.start(); log.info("server is running ..."); }}

2.2 Netty客户端:

# 目录结构sp_netty_client ... - controller - HelloController.java - data - ResponseResult.java - handler - NettyClientHandler.java - rpc - NettyClientUtil.java SpNettyClientApplication.java

具体代码:

SpNettyClientApplication.java

@SpringBootApplicationpublic class SpNettyClientApplication { public static void main(String[] args) { SpringApplication.run(SpNettyClientApplication.class, args); }}

NettyClientUtil.java

@Slf4jpublic class NettyClientUtil { public static ResponseResult helloNetty(String msg) { NettyClientHandler nettyClientHandler = new NettyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap() .group(group) //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输 .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder", new StringDecoder()); socketChannel.pipeline().addLast("encoder", new StringEncoder()); socketChannel.pipeline().addLast(nettyClientHandler); } }); try { ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync(); log.info("客户端发送成功...."); //发送消息 future.channel().writeAndFlush(msg); // 等待连接被关闭 future.channel().closeFuture().sync(); return nettyClientHandler.getResponseResult(); } catch (Exception e) { log.error("客户端Netty失败", e); } finally { //以一种优雅的方式进行线程退出 group.shutdownGracefully(); } return new ResponseResult(500, null, "server error"); }}

NettyClientHandler.java

@Slf4j@Setter@Getterpublic class NettyClientHandler extends ChannelInboundHandlerAdapter { private ResponseResult responseResult; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("client is running ..."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客户端收到消息: {}", msg.toString()); this.responseResult = new ResponseResult(200, null, msg.toString()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}

ResponseResult.java

@Setter@Getter@ToString@NoArgsConstructor@AllArgsConstructorpublic class ResponseResult implements Serializable { private int code; private Object data; private String msg;}

HelloController.java

@RestControllerpublic class HelloController { @RequestMapping("/hello/{name}") public ResponseResult helloNetty(@PathVariable("name") String name) { return NettyClientUtil.helloNetty(name); }}

2.3 测试


参考:

https://xiyuan.blog.csdn.net/article/details/109248198

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。