Netty 入门学习

360影视 2025-01-13 16:42 2

摘要:最开始: Akka Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题 Spark 1.6:支持配置使用 Akka 或者 NettySpark 2:完全废弃Akka,全部使用Netty

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

最开始: Akka Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题 Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。 Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

// 创建ServerBootstrap实例,服务器启动对象ServerBootstrap bootstrap = new ServerBootstrap;ChannelFuture channelFuture = Bootstrap.bind(8888).sync;// 等待服务器关闭channelFuture.channel.closeFuture.sync;

主要是启动 ServerBootstrap、绑定端口、等待关闭。

Client 主要代码:

// 创建Bootstrap实例,客户端启动对象Bootstrap bootstrap = new Bootstrap;// 连接服务端ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync;

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws exception { socketChannel.pipeline.addLast(new Serverhandler); }});bootstrap.handler(new ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline.addLast(new ClientHandler); }});

这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。 如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.Socket.nio.NioServerSocketChannel;public class NettyServer { public static void main(String args) { try { bind; } catch (InterruptedException e) { throw new RuntimeException(e); } } public static void bind throws InterruptedException { // 创建boss线程组,用于接收连接 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍 EventLoopGroup workerGroup = new NioEventLoopGroup; try { // 创建ServerBootstrap实例,服务器启动对象 ServerBootstrap bootstrap = new ServerBootstrap; // 使用链式编程配置参数 // 将boss线程组和worker线程组暂存到ServerBootstrap bootstrap.group(bossGroup, workerGroup); // 设置服务端Channel类型为NioServerSocketChannel作为通道实现 bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器 socketChannel.pipeline.addLast(new ServerHandler); } }); // 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求 // 当有多个客户端同时来请求时,未处理的请求先放入队列中 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象 ChannelFuture channelFuture = bootstrap.bind(8888).sync; // 等待服务器关闭 channelFuture.channel.closeFuture.sync; } finally { // 优雅地关闭boss线程组 bossGroup.shutdownGracefully; // 优雅地关闭worker线程组 workerGroup.shutdownGracefully; } }}

ServerHandler

package com.dkl.java.demo;import io.netty.buffer.byteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import io.netty.util.ReferenceCountUtil;public class ServerHandler extends ChannelInboundHandlerAdapter { /** * 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用 * * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelRegistered"); } /** * 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调 * 用 * * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelUnregistered"); } /** * 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelActive"); } /** * 当 Channel 离开活动状态并且不再连接它的远程节点时被调用 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelInactive"); } /** * 当从 Channel 读取数据时被调用 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("执行 channelRead"); // 处理接收到的数据 ByteBuf byteBuf = (ByteBuf) msg; try { // 将接收到的字节数据转换为字符串 String message = byteBuf.toString(CharsetUtil.UTF_8); // 打印接收到的消息 System.out.println("Server端收到客户消息: " + message); // 发送响应消息给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8)); } finally { // 释放ByteBuf资源 ReferenceCountUtil.release(byteBuf); } } /** * 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelReadComplete"); } /** * 当 ChannelnboundHandler.fireUserEventTriggered方法被调用时被 * 调用 * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("执行 userEventTriggered"); } /** * 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable方法 * * 来检测 Channel 的可写性。与可写性相关的阈值可以通过 * * Channel.config.setWriteHighWaterMark和 Channel.config.setWriteLowWaterMark方法来 * * 设置 * * @param ctx * @throws Exception */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("执行 channelWritabilityChanged"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("执行 exceptionCaught"); // 异常处理 cause.printStackTrace; ctx.close; }}

NettyClient

package com.dkl.java.demo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient { public static void main(String args) { start; } public static void start { // 创建EventLoopGroup,用于处理客户端的I/O操作 EventLoopGroup groupThread = new NioEventLoopGroup; try { // 创建Bootstrap实例,客户端启动对象 Bootstrap bootstrap = new Bootstrap; bootstrap.group(groupThread); // 设置服务端Channel类型为NioSocketChannel作为通道实现 bootstrap.channel(NioSocketChannel.class); // 设置客户端处理 bootstrap.handler(new ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline.addLast(new ClientHandler); } }); // 绑定端口 ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync; channelFuture.channel.closeFuture.sync; } catch (InterruptedException e) { throw new RuntimeException(e); } finally { // 优雅地关闭线程 groupThread.shutdownGracefully; } }}

ClientHandler

package com.dkl.java.demo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { // 连接建立时的处理,发送请求消息给服务器 ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 处理接收到的数据 ByteBuf byteBuf = (ByteBuf) msg; try { // 将接收到的字节数据转换为字符串 String message = byteBuf.toString(CharsetUtil.UTF_8); // 打印接收到的消息 System.out.println("受到服务端响应的消息: " + message); // TODO: 对数据进行业务处理 } finally { // 释放ByteBuf资源 ReferenceCountUtil.release(byteBuf); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 异常处理 cause.printStackTrace; ctx.close; }}

Server 端

连接时:执行 channelRegistered执行 channelActive执行 channelRead执行 channelReadComplete断开连接时:执行 channelReadComplete(强制中断 Client 连接执行 exceptionCaught执行 userEventTriggered (exceptionCaught 中 ctx.close) 触发)执行 channelInactive执行 channelUnregisteredchannelReadComplete 中 ctx.close; 触发:执行 channelInactive执行 channelUnregistered

Client 端

执行 channelRegistered执行 channelActive执行 channelRead执行 channelReadComplete

•Spark版本:3.2.3•Server: org.apache.spark.network.server.TransportServer.init•Client: org.apache.spark.network.client.TransportClientFactory.createClient

来源:散文随风想

相关推荐