netty channelfuture_thrift netty

netty channelfuture_thrift netty文章目录ChannelChannel的常用方法ChannelFutureCloseFutureChannelChannel的常用方法close()关闭channelcloseFuture()处理关

Channel

Channel的常用方法

  • close() 关闭channel
  • closeFuture() 处理关闭channel后要执行的操作
    • sync()方法是同步等待channel的关闭
    • addListener()方法是异步等待channel的关闭
  • pipeline()添加处理器
  • write() 将数据写入,netty里面其实还有一个缓冲区,当我们执行flush()方法后或者是缓冲区中数据达到一个数量了就会将缓冲区的数据发送出去
  • writeAndFlush() 将数据写入并刷出

ChannelFuture

解决客户端连接服务器 connect()方法 异步非阻塞 问题

一个普通netty客户端的代码如下

public class HelloClient2 { 
   
    public static void main(String[] args) throws InterruptedException { 
   

        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() { 
   
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception { 
   
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接服务器 该方法返回的就是ChannelFuture
                .connect("localhost", 8080);

        // 这个sync()方法必须调用,如果不调用直接运行的话,下面发送给服务器的数据没有发送成功
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("aaa");
    }
}

为什么调用该方法channelFuture.sync();就能发送数据成功嘞?

这是因为上面一行连接服务器的代码.connect("localhost", 8080); 该方法是异步非阻塞的,主线程发起了连接服务器的操作,但是连接服务器是NioEventLoopGroup中的一个线程执行,如果不调用sync()方法就会出现还没有连接成功服务器,就执行了获取channel的语句并通过获取的channel发送数据,

以后我们看到Future 或者是 Promise 的类型都是和异步方法配套使用的,他俩的用途就是正确处理结果

上面出现了问题,解决方法也有几种:

  • 第一种就是调用ChannelFuture类中的sync()方法,主线程就会阻塞,当客户端与服务器连接成功后才会往下面执行,

  • 第二种解决 调用addListener() 异步处理结果

public static void main(String[] args) throws InterruptedException { 
   

        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() { 
   
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception { 
   
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接服务器 该方法返回的就是ChannelFuture 该方法是异步非阻塞的
                .connect("localhost", 8080);

        // 第一种解决方法,方法同步处理结果,这里阻塞住,等客户端与服务器连接成功后在往下执行
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// channel.writeAndFlush("aaa");

// 第二种解决 调用addListener() 异步处理结果,上面的同步处理结果是主线程等另一个线程连接成功后再自己拿到连接成功的结果channel
// 然后自己发送数据。而这里的异步是主线程完全就是甩手掌柜,通过channel发送数据也让其他线程来做,主线程只是提供一个回调对象
// 当客户端与服务器连接成功后,EventLoop会自动调用ChannelFutureListener对象中的operationComplete()方法。
        channelFuture.addListener(new ChannelFutureListener() { 
   
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception { 
   
                // 我们可以通过ChannelFuture对象获取channel对象 然后再发送数据
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("客户端通过addListener()方法发送的数据");
            }
        });
    }

CloseFuture

处理客户端与服务器断开连接后,channel关闭后 还想执行的代码

package com.hs.nettyPrimary;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.util.Scanner;

/** * 处理客户端与服务器断开连接后,channel关闭后 还想执行的代码 * 业务需求是: * 客户端获取用户控制台的输入,输入的数据都发送给服务器端,当输入Q的时候就表示停止数据的发送 * 然后关闭channel,在channel关闭后再执行一段代码。 * @author hs * @date 2021/07/18 */
@Slf4j
public class HelloClient3 { 
   
    public static void main(String[] args) throws InterruptedException { 
   

        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() { 
   
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception { 
   
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接服务器
                .connect("localhost", 8080);

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug("{}",channel);

        // 新开一个线程来处理客户端发送数据给服务器
        new Thread(()->{ 
   
            Scanner scanner = new Scanner(System.in);
            while (true){ 
   
                String line = scanner.nextLine();
                if ("Q".equals(line)){ 
   
                    channel.close();
                    // 这里也不行,因为close()方法是异步非阻塞的,关闭需要时间,这会出现关闭后的操作在关闭前执行
                    // log.debug("这里channel关闭后的操作!!");
                    break;
                }else{ 
   
                    channel.writeAndFlush(line);
                }
            }
        }).start();

        // 写在这里肯定不行,因为这里主线程会执行的 ,那就写在上面一个线程里面嘞?
        // log.debug("这里channel关闭后的操作!!");

    }
}

上面只是完成了获取控制台的输入并发送给服务器,如果输入Q就channel关闭,但是channel关闭后的的代码执行 还没有实现。

先多添加一个handler,是日志相关的,主要是方便看清控制台更多的输出

// 多加一个handler 是日志信息相关的
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

正在的解决方法其实就是通过channel获取一个CloseChannel对象,然后选择同步或者是异步的方式来处理

@Slf4j
public class HelloClient3 { 
   
    public static void main(String[] args) throws InterruptedException { 
   

        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() { 
   
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception { 
   
                        // 多一些日志信息
                        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接服务器
                .connect("localhost", 8080);

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug("{}",channel);

        // 新开一个线程来处理客户端发送数据给服务器
        new Thread(()->{ 
   
            Scanner scanner = new Scanner(System.in);
            while (true){ 
   
                String line = scanner.nextLine();
                if ("Q".equals(line)){ 
   
                    channel.close();
                    break;
                }else{ 
   
                    channel.writeAndFlush(line);
                }
            }
        },"input").start();

        // 正确实现channel关闭后要执行的业务逻辑
        // 1. 要获取一个 CloseFuture对象,同样它也有同步处理关闭和异步处理关闭两种方式
        ChannelFuture closeFuture = channel.closeFuture();
        // 2. 同步处理关闭 ,这里会将主线程阻塞,等channel关闭后才会继续往下执行
// log.debug("wait Channel close....");
// closeFuture.sync();
// log.debug("channel关闭后操作");
        
        // 2. 异步处理,异步处理,上面真正执行关闭channel的EventLoop线程 成功关闭channel后就会调用这里面的方法
        closeFuture.addListener(new ChannelFutureListener() { 
   
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception { 
   
                log.debug("channel关闭后操作");
            }
        });
    }
}

上面还存在一个问题,那就是当channel已经关闭后,但是客户端这边的程序还在执行,这是因为虽然我们创建的线程已经关闭了,但是NioEventloopGroup里面还有一些线程还在执行,下面进行优雅的关闭。

  1. 首先是把创建NioEventloopGroup对象的地方往前移,而不是像之前一样使用匿名对象。

  2. 然后在channel关闭后的语句中调用nioEventLoopGroup.shutdownGracefully()方法,优雅的关闭 Gracefully就是优雅的意思

package com.hs.nettyPrimary;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.Scanner;

/** * 处理客户端与服务器断开连接后,channel关闭后 还想执行的代码 * 业务需求是: * 客户端获取用户控制台的输入,输入的数据都发送给服务器端,当输入Q的时候就表示停止数据的发送 * @author hs * @date 2021/07/17 */
@Slf4j
public class HelloClient3 { 
   
    public static void main(String[] args) throws InterruptedException { 
   

        // 把创建NioEventLoopGroup对象的语句往前移
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(nioEventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() { 
   
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception { 
   
                        // 多添加一个handler 是日志信息相关的
                        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接服务器
                .connect("localhost", 8080);

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug("{}",channel);

        // 新开一个线程来处理客户端发送数据给服务器
        new Thread(()->{ 
   
            Scanner scanner = new Scanner(System.in);
            while (true){ 
   
                String line = scanner.nextLine();
                if ("Q".equals(line)){ 
   
                    channel.close();
                    break;
                }else{ 
   
                    channel.writeAndFlush(line);
                }
            }
        },"input").start();

        // 正确实现channel关闭后要执行的业务逻辑
        // 1. 要获取一个 CloseFuture对象,同样它也有同步处理关闭和异步处理关闭两种方式
        ChannelFuture closeFuture = channel.closeFuture();
        // 2. 异步处理,上面真正执行关闭channel的EventLoop线程 成功关闭channel后就会调用这里面的方法
        closeFuture.addListener(new ChannelFutureListener() { 
   
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception { 
   
                log.debug("channel关闭后操作");
                nioEventLoopGroup.shutdownGracefully();
            }
        });
    }
}

小疑问,为什么netty要用异步?就比如让客户端的主线程去创建连接,创建连接成功后再继续往下执行,这里单线程不也是可以吗?那么为什么要使用异步嘞?

这里netty采用异步的方式提升的是吞吐量,并不是响应时间,比如下面一个例子

在这里插入图片描述

然后将看病细分,每个步骤花费五分钟
在这里插入图片描述

接下来,每个医生就仅仅处理一个步骤

在这里插入图片描述

以前一个医生一个小时只能接待4个病人,但现在一个小时可以接待12个病人了

要点:

  • 异步没有缩短响应时间,反而有所增加
  • 异步提升的是吞吐量,增加单位时间内处理请求的个数
  • 需要合理进行任务的拆分
  • 异步必须配合多线程,多核cpu。

今天的文章netty channelfuture_thrift netty分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/79885.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注