什么是Netty?
Netty是一个NIO客户机-服务器框架,它支持快速而容易地开发网络应用程序,如协议服务器和客户机。它大大简化和简化了网络编程,如TCP和UDP套接字服务器。
“快速简单”并不意味着生成的应用程序将遭受可维护性或性能问题的困扰。Netty经过了精心的设计,其经验来自于FTP、SMTP、HTTP以及各种基于二进制和文本的遗留协议的实现。因此,Netty成功地找到了一种不妥协地实现易开发性、性能、稳定性和灵活性的方法。
Netty 的应用场景
互联网行业
1) 互联网行业: 在分布式系统中, 各个节点之间需要远程服务调用, 高性能的 RPC 框架必不可少, Netty 作为异步高性能的通信框架, 往往作为基础通信组件被这些 RPC 框架使用。
2) 典型的应用有: 阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信, Dubbo 协议默认使用 Netty 作为基础通信组件, 用于实现各进程节点之间的内部通信。
游戏行业
1) 无论是手游服务端还是大型的网络游戏, Java 语言得到了越来越广泛的应用。
2) Netty 作为高性能的基础通信组件, 提供了 TCP/UDP 和 HTTP 协议栈, 方便定制和开发私有协议栈, 账号登录服务器。
3) 地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域
1) 经典的 Hadoop 高性能通信和序列化组件 Avro 的 RPC 框架, 默认采用 Netty 进行跨界点通信。
2) 它的 Netty Service 基于 Netty 框架二次封装实现。
代码示例
maven依赖
<!--netty依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
服务端
package com.example.netty.io;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty 服务端
*
* @author lanx
* @date 2022/3/20
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
//用户接收客户端连接的线程工作组
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于接收客户端连接读写操作的线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//绑定两个工作组
.channel(NioServerSocketChannel.class)//设置NIO模式
//option 针对于服务端配置; childOption 针对于客户端连接通道配置
.option(ChannelOption.SO_BACKLOG, 1024)//设置tcp缓冲区
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)//设置发送数据的缓存大小
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024)//设置读取数据的缓存大小
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持长连接
//初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
package com.example.netty.io;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 服务端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端通道激活---------");
}
/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// NIO 通信 (传输的数据是什么? ---------> Buffer 对象)
ByteBuf buf = (ByteBuf) msg;
//定义byte数组
byte[] req = new byte[buf.readableBytes()];
// 从缓冲区获取数据到 req
buf.readBytes(req);
//读取到的数据转换为字符串
String body = new String(req, "utf-8");
System.out.println("服务端读取到数据:" + body);
//响应给客户端的数据
ctx.writeAndFlush(Unpooled.copiedBuffer("netty server response data".getBytes()));
// 添加 addListener 可以触发关闭通道监听事件(客户端短连接场景使用)
// .addListener(ChannelFutureListener.CLOSE);
}catch (Exception e){
e.printStackTrace();
}finally {
//释放数据
ReferenceCountUtil.release(msg);
}
}
/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端数据读取完毕---------");
}
/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------服务端数据读取异常---------");
ctx.close();
}
}
客户端
package com.example.netty.io;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* netty 客户端
* @author lanx
* @date 2022/3/20
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
//线程工作组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
Bootstrap b = new Bootstrap();
b.group( workerGroup)//绑定两个工作组
.channel(NioSocketChannel.class)//设置NIO模式
//初始化绑定服务通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1",8765).syncUninterruptibly();
cf.channel().writeAndFlush(Unpooled.copiedBuffer("netty client request data".getBytes()));
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}
}
package com.example.netty.io;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 客户端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端通道激活---------");
}
/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// NIO 通信 (传输的数据是什么? ---------> Buffer 对象)
ByteBuf buf = (ByteBuf) msg;
//定义byte数组
byte[] req = new byte[buf.readableBytes()];
// 从缓冲区获取数据到 req
buf.readBytes(req);
//读取到的数据转换为字符串
String body = new String(req, "utf-8");
System.out.println("客户端读取到数据:" + body);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放数据 (如果你读取数据后又写出去数据就不需要调用此方法,因为底层代码帮忙实现额释放数据)
ReferenceCountUtil.release(msg);
}
}
/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端数据读取完毕---------");
}
/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------客户端数据读取异常---------");
ctx.close();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/10522.html