本文共 54639 字,大约阅读时间需要 182 分钟。
Java共支持3种网络编程模型/IO模式:BIO、NIO、AIO
Java BIO
:同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善。Java NIO
:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。Java AIO(NIO.2)
:异步非阻塞,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
实例说明:
1)使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,就启动一个线程与之通讯。 2)要求使用线程池机制改善,可以连接多个客户端。 3)服务器端可以接收客户端发送的数据(telnet方式即可)。 4)代码演示package com.wolfx.bio;import java.io.IOException;import java.io.InputStream;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @description: * @author: sukang * @date: 2020-10-19 15:19 */public class BIOServer { public static void main(String[] args) throws IOException { //线程池机制 //思路 //1、创建一个线程池 //2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法) ExecutorService executorService = Executors.newCachedThreadPool(); //创建ServerSocket ServerSocket serverSocket = new ServerSocket(6666); System.out.println("服务器启动了"); while(true){ System.out.println("线程信息 id =" + Thread.currentThread().getId() + " 名字 =" + Thread.currentThread().getName()); //监听,等待客户端连接 System.out.println("等待连接..."); final Socket accept = serverSocket.accept(); System.out.println("连接到一个客户端"); //就创建一个线程,与之通讯(单独写一个方法) executorService.execute(new Runnable() { public void run() { //可以和客户端通讯 handler(accept); } }); } } //编写一个handler方法,和客户端通讯 public static void handler(Socket socket){ try { System.out.println("线程信息 id =" + Thread.currentThread().getId() + " 名字 =" + Thread.currentThread().getName()); byte[] bytes = new byte[1024]; //通过socket获取输入流 InputStream inputStream = socket.getInputStream(); //循环的读取客户端发送的数据 while(true){ System.out.println("线程信息id="+Thread.currentThread().getId()+"名字="+Thread.currentThread().getName()); System.out.println("read...."); int read = inputStream.read(bytes); if(read != -1){ System.out.println(new String(bytes,0,read));//输出客户端发送的数据 }else { break; } } } catch ( IOException e ) { e.printStackTrace(); } finally { System.out.println("关闭和client的连接"); try { socket.close(); } catch ( IOException e ) { e.printStackTrace(); } } }}
启动BIOServer
telnet方式测试1)每个请求都需要创建独立的线程,与对应的客户端进行数据Read,业务处理,数据Write。
2)当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。 3)连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在Read操作上,造成线程资源浪费1)BIO以流的方式处理数据,而NIO以块的方式处理数据,块I/O的效率比流I/O高很多。
2)BIO是阻塞的,NIO则是非阻塞的。 3)BIO基于字节流和字符流进行操作,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。Selector、Channel和Buffer的关系图(简单版)
关系图的说明: 1)每个channel都会对应一个Buffer 2)Selector对应一个线程,Selector对应多个channel(连接) 3)该图反应了有三个channel注册到该selector 4)程序切换到哪个channel是有事件决定的,Event就是一个重要的概念 5)Selector会根据不同的事件,在各个通道上切换 6)Buffer就是一个内存块,底层是有一个数组 7)数据的读取写入是通过Buffer,这个和BIO,BIO中要么是输入流,或者是输出流,不能双向,但是NIO的Buffer是可以读也可以写,需要flip方法切换channel是双向的,可以返回底层操作系统的情况,比如Linux,底层的操作系统通道就是双向的1)使用前面学习后的ByteBuffer(缓冲)和FileChannel(通道),将"hello,尚硅谷"写入到file01.txt中
2)文件不存在就创建 3)代码演示package com.wolfx.nio;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;/** * @description: * 本地文件写数据 * @author: sukang * @date: 2020-10-19 16:55 */public class NIOFileChannel01 { public static void main(String[] args) throws IOException { String str = "你好!世界"; //创建一个文件输入流 FileOutputStream fileOutputStream = new FileOutputStream("d:\\file01.txt"); //通过文件输出流获取文件FileChannel FileChannel channel = fileOutputStream.getChannel(); //创建一个字节缓冲区 ByteBuffer allocate = ByteBuffer.allocate(1024); //将字符放在缓冲区内 allocate.put(str.getBytes()); //执行缓冲区的flip()方法 allocate.flip(); //将数据从缓冲区写到通道中 channel.write(allocate); //关闭文件流 fileOutputStream.close(); }}
测试
1)使用前面学习后的ByteBuffer(缓冲)和FileChannel(通道),将file01.txt中的数据读入到程序,并显示在控制台屏幕
2)假定文件已经存在 3)代码演示package com.wolfx.nio;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;/** * @description: * 本地文件读数据 * @author: sukang * @date: 2020-10-19 16:55 */public class NIOFileChannel02 { public static void main(String[] args) throws IOException { //创建一个文件 File file = new File("d:\\file01.txt"); //创建一个文件输入流 FileInputStream fileInputStream = new FileInputStream(file); //通过文件输入流获取文件FileChannel FileChannel channel = fileInputStream.getChannel(); //创建一个字节缓冲区 ByteBuffer allocate = ByteBuffer.allocate((int) file.length()); //从通道中读数据到缓冲区 channel.read(allocate); System.out.println(new String(allocate.array())); //关闭文件流 fileInputStream.close(); }}
测试
1)使用FileChannel(通道)和方法read,write,完成文件的拷贝
2)拷贝一个文本文件1.txt,放在项目下即可 3)代码演示package com.wolfx.nio;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;/** * @description: * 使用一个Buffer完成文件读取、写入 * @author: sukang * @date: 2020-10-19 16:55 */public class NIOFileChannel03 { public static void main(String[] args) throws IOException { //创建一个文件 File file = new File("1.txt"); //创建一个文件输入流 FileInputStream fileInputStream = new FileInputStream(file); //创建一个文件输出流 FileOutputStream fileOutputStream = new FileOutputStream("2.txt"); //通过文件输入流获取文件fileInputChannel FileChannel fileInputChannel = fileInputStream.getChannel(); //通过文件输出流获取文件fileOutputChannel FileChannel fileOutputChannel = fileOutputStream.getChannel(); //创建一个字节缓冲区 ByteBuffer allocate = ByteBuffer.allocate((int) file.length()); //从通道中读数据到缓冲区 fileInputChannel.read(allocate); allocate.flip(); //从缓冲区把数据写到文件通道中去 fileOutputChannel.write(allocate); //关闭输入输出文件流 fileInputStream.close(); fileOutputStream.close(); }}
NIO非阻塞网络编程原理分析图
NIO非阻塞网络编程相关的(Selector、SelectionKey、ServerScoketChannel和SocketChannel)关系梳理图 对上图的说明: 1)当客户端连接时,会通过ServerSocketChannel得到SocketChannel 2)Selector进行监听select方法,返回有事件发生的通道的个数 3)将socketChannel注册到Selector上,register(Selectorsel,intops),一个selector上可以注册多个SocketChannel 4)注册后返回一个SelectionKey,会和该Selector关联(集合) 5)进一步得到各个SelectionKey(有事件发生) 6)在通过SelectionKey反向获取SocketChannel,方法channel( 7)可以通过得到的channel,完成业务处理1)SelectionKey,表示Selector和网络通道的注册关系,共四种:
int OP_ACCEPT:有新的网络连接可以accept,值为16 int OP_CONNECT:代表连接已经建立,值为8 int OP_READ:代表读操作,值为1 int OP_WRITE:代表写操作,值为42)SelectionKey相关方法
1)ServerSocketChannel在服务器端监听新的客户端Socket连接
2)相关方法如下1)SocketChannel,网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
2)相关方法如下实例要求:
1)编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞) 2)实现多人群聊 3)服务器端:可以监测用户上线,离线,并实现消息转发功能 4)客户端:通过channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到) 5)目的:进一步理解NIO非阻塞网络编程机制 6)示意图分析和代码 服务端package com.wolfx.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;/** * @description: * 服务端 * @author: sukang * @date: 2020-10-20 10:09 */public class GroupChatServer { //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //构造器 //初始化工作 public GroupChatServer(){ try { //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将该listenChannel注册到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("基于NIO的聊天室在["+PORT+"]端口启动成功"); } catch ( IOException e ) { e.printStackTrace(); } } //监听 public void listen(){ try { //循环处理 while (true){ int count = selector.select(); if(count > 0){//有事件处理 //遍历得到selectionKey集合 Iteratoriterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ //取出selectionkey SelectionKey key = iterator.next(); //监听到accept if(key.isAcceptable()){ SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //将该sc注册到selector sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + "上线"); } if(key.isReadable()){//通道发送read事件,即通道是可读的状态 //处理读 readData(key); } //当前的key删除,防止重复处理 iterator.remove(); } }else{ System.out.println("等待..."); } } } catch ( Exception e ) { e.printStackTrace(); } finally { } } //读取客户端消息 private void readData(SelectionKey key){ //取到关联的channel SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据count的值做处理 if(count > 0){ //把缓存区数据转成字符串 String msg = new String(buffer.array()); //输出消息 System.out.println("from 客户端:" + msg); //向其他的客户端转发消息(去掉自己),专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch ( IOException e ) { try { System.out.println(channel.getRemoteAddress() + " 离线了..."); //取消注册 key.channel(); //关闭通道 channel.close(); } catch ( IOException ex ) { ex.printStackTrace(); } } } //转发消息给其他客户(通道) private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException { System.out.println("服务器转发消息中..."); //遍历所有注册到selector上的SocketChannel,并排除self for (SelectionKey key:selector.keys()) { //通过key取出对应的SocketChannel Channel targetChannel = key.channel(); //排除自己 if(targetChannel instanceof SocketChannel && targetChannel != self){ //转型 SocketChannel dest = (SocketChannel) targetChannel; //将msg存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将buffer的数据写入通道 dest.write(buffer); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); }}
客户端
package com.wolfx.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Scanner;/** * @description: * 客户端 * @author: sukang * @date: 2020-10-20 11:10 */public class GroupChatClient { //定义相关的属性 private final String HOST = "127.0.0.1";//服务器的ip private final int PORT = 6667;//服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器,完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getRemoteAddress().toString(); System.out.println(username + " is ok..."); } //向服务器发送消息 public void sendInfo(String info){ info = username + " 说: " + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch ( IOException e ) { e.printStackTrace(); } } //读取从服务端回复的消息 public void readInfo(){ try { int readChannels = selector.select(); if(readChannels > 0){//有可以用的通道 Iteratoriterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isReadable()){ //得到相关的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一个Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); }else{ } } catch ( IOException e ) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { //启动我们客户端 final GroupChatClient chatClient = new GroupChatClient(); //启动一个线程,每隔3秒,从服务器读取数据 new Thread(){ @Override public void run(){ while (true){ chatClient.readInfo(); try { sleep(3000); } catch ( InterruptedException e ) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); chatClient.sendInfo(s); } }}
测试结果
传统IO
传统IO的数据拷贝流程如下图: 1)数据需要从磁盘
拷贝到内核空间
,再从内核空间拷到用户空间
(JVM)。 2)程序可能进行数据修改等操作。 3)再将数据拷贝到内核空间
,内核空间再拷贝到网卡内存
,通过网络发送出去(或拷贝到磁盘
)。 即数据的读写(这里用户空间发到网络也算作写),都至少需要两次拷贝。 当然磁盘到内核空间属于DMA拷贝(DMA即直接内存存取,原理是外部设备不通过CPU而直接与系统内存交换数据)。而内核空间到用户空间则需要CPU的参与进行拷贝,既然需要CPU参与,也就涉及到了内核态和用户态的相互切换
,如下图: NIO的零拷贝 零拷贝的数据拷贝如下图: 内核态与用户态切换如下图: 改进的地方: 4次减少到了2次
;从4次减少到了3次
(其中只有1次涉及了CPU,另外2次是DMA直接存取)。但这还没有达到我们零拷贝的目标。如果底层NIC(网络接口卡)支持gather操作,我们能进一步减少内核中的数据拷贝。在Linux 2.4以及更高版本的内核中,socket缓冲区描述符已被修改用来适应这个需求。这种方式不但减少多次的上下文切换,同时消除了需要CPU参与的重复的数据拷贝。用户这边的使用方式不变,而内部已经有了质的改变:
NIO的零拷贝由transferTo()
方法实现。transferTo()方法将数据从FileChannel
对象传送到可写的字节通道(如Socket Channel等)。在内部实现中,由native方法transferTo0()
来实现,它依赖底层操作系统的支持。在UNIX和Linux系统中,调用这个方法将会引起sendfile()系统调用。 使用场景一般是: 文件较大,读写较慢,追求速度JVM内存不足,不能加载太大数据内存带宽不够,即存在其他程序或线程存在大量的IO操作,导致带宽本来就小
以上都建立在不需要进行数据文件操作的情况下,如果既需要这样的速度,也需要进行数据操作怎么办?
那么使用NIO的直接内存!NIO的直接内存
首先,它的作用位置处于传统IO(BIO)与零拷贝之间,为何这么说?各种操作
,最后再写到磁盘或是发送到网络,效率较慢但支持数据文件操作。而直接内存则介于两者之间,效率一般且可操作文件数据
。直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操作地址
(address),它解决了文件数据需要拷贝到JVM才能进行操作的窘境。而是直接在内核空间直接进行操作,省去了内核空间拷贝到用户空间
这一步操作。
MappedByteBuffer
实现的。核心即是map()
方法,该方法把文件映射到内存中,获得内存地址addr,然后通过这个addr构造MappedByteBuffer类,以暴露各种文件操作API。 由于MappedByteBuffer申请的是堆外内存,因此不受Minor GC控制
,只能在发生Full GC时才能被回收。而DirectByteBuffer
改善了这一情况,它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC
来回收内存,也可以调用clean()
方法来进行回收。 另外,直接内存
的大小可通过jvm参数来设置:-XX:MaxDirectMemorySize
。 NIO的MappedByteBuffer还有一个兄弟叫做HeapByteBuffer
。顾名思义,它用来在堆中申请内存,本质是一个数组。由于它位于堆中,因此可受GC管控,易于回收。 mmap和sendFile的区别
mmap:通过内存映射,将
sendFile:Linux2.1版本提供了sendFile函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换文件映射到内核缓冲区
,同时,用户空间可以共享内核空间的数据
。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。
1)JDK7引入了AsynchronousI/O,即AIO。在进行I/O编程中,常用到两种模式:Reactor和Proactor。Java的NIO就是Reactor,当有事件触发时,服务器端得到通知,进行相应的处理。
2)AIO即NIO2.0,叫做异步不阻塞的IO。AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。 3)目前AIO还没有广泛应用,Netty也是基于NIO,而不是AIO。1)NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
2)需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。 3)开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。 4)JDKNIO的Bug:例如臭名昭著的EpollBug,它会导致Selector空轮询,最终导致CPU100%。直到JDK1.7版本该问题仍旧存在,没有被根本解决。Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
1)设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池 2)使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK5(Netty3.x)或6(Netty4.x)就足够了。 3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。 4)安全:完整的SSL/TLS和StartTLS支持。 5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更多的新功能会被加入1)netty版本分为netty3.x和netty4.x、netty5.x
2)因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本1)目前存在的线程模型有:
传统阻塞I/O服务模型 Reactor模式2)Netty线程模式(Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor)
模型特点
1)采用阻塞IO模式获取输入的数据 2)每个连接都需要独立的线程完成数据的输入,业务处理,数据返回问题分析
1)当并发数很大,就会创建大量的线程,占用很大系统资源 2)连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费针对传统阻塞I/O服务模型的2个缺点,解决方案:
1)基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理Reactor对应的叫法:1.反应器模式2.分发者模式(Dispatcher)3.通知者模式(notifier) 2)基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。I/O复用结合线程池,就是Reactor模式基本设计思想,如图
1)Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动) 2)服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式 3)Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键1)Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
2)Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现
1)单Reactor单线程 2)单Reactor多线程 3)主从Reactor多线程原理图
方案说明 1)Select是前面I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求 2)Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发 3)如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理 4)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应 5)Handler会完成Read→业务处理→Send的完整业务流程结合实例:服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。
方案优缺点分析
1)优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成 2)缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈 3)缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障 4)使用场景:客户端的数量有限,业务处理非常快速,比如Redis在业务处理的时间复杂度O(1)的情况原理图
对上图的小结 1)Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发 2)如果建立连接请求,则右Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件 3)如果不是连接请求,则由reactor分发调用连接对应的handler来处理 4)handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务 5)worker线程池会分配独立线程完成真正的业务,并将结果返回给handler 6)handler收到响应后,通过send将结果返回给client方案优缺点分析
1)优点:可以充分的利用多核cpu的处理能力 2)缺点:多线程数据共享和访问比较复杂,reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈原理图
上图的方案说明 1)Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件 2)当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor 3)subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理 4)当有新事件发生时,subreactor就会调用对应的handler处理 5)handler通过read读取数据,分发给后面的worker线程处理 6)worker线程池分配独立的worker线程进行业务处理,并返回结果 7)handler收到响应的结果后,再通过send将结果返回给client 8)Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个SubReactor方案优缺点说明
1)优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。 2)优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。 3)缺点:编程复杂度较高3种模式用生活案例来理解
1)单Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服 2)单Reactor多线程,1个前台接待员,多个服务员,接待员只负责接待 3)主从Reactor多线程,多个前台接待员,多个服务生Reactor模式具有如下的优点
1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的 2)可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销 3)扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源 4)复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性工作原理示意图1-简单版
对上图说明 1)BossGroup线程维护Selector,只关注Accecpt 2)当接收到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护 3)当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意handler已经加入到通道工作原理示意图2-进阶版
工作原理示意图-详细版 对上图的说明小结 1)Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写 2)BossGroup和WorkerGroup类型都是NioEventLoopGroup 3)NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop 4)NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯 5)NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop 6)每个BossNioEventLoop循环执行的步骤有3步7)每个WorkerNIOEventLoop循环执行的步骤
8)每个WorkerNIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器
实例要求:
1)Netty服务器在6668端口监听,客户端能发送消息给服务器"hello,服务器~" 2)服务器可以回复消息给客户端"hello,客户端~" 3)目的:对Netty线程模型有一个初步认识,便于理解Netty模型理论代码如下
NettyServerpackage com.wolfx.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * @description: * 服务端 * @author: sukang * @date: 2020-10-21 14:54 */public class NettyServer { public static void main(String[] args) { //创建BossGroup和WorkerGroup //说明 //1、创建两个线程组bossGroup和workerGroup //2、bossGroup只是处理连接请求, 真正的和客户端业务处理,会交给workerGroup完成 //3、两个都是无限循环 //4、bossGroup和workerGroup含有的子线程(NioEventLoop)的个数 //默认实际cpu核数*2 NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class)//使用NioSocketChannel作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态 .childHandler(new ChannelInitializer() {//创建一个通道测试对象(匿名对象) //给pipeline设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } });//给我们的workerGroup的EventLoop对应的管道设置处理器 System.out.println("...服务器 is ready..."); //绑定一个端口并且同步,生成了一个ChannelFuture对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); } catch ( Exception e ) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
NettyServerHandler
package com.wolfx.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/** * @description: * 自定义Handler * 说明 * 1、我们自定义一个Handler需要继续netty规定好的某个HandlerAdpter(规范) * 2、这时我们自定义一个Handler,才能称为handler * @author: sukang * @date: 2020-10-21 15:30 */public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据实际(这里我们可以读取客户端发送的消息) //1、ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址 //2、Object msg: 就是客户端发送的数据 默认Object @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName()); System.out.println("server ctx =" + ctx); System.out.println("看看 channel和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链接,出站入站 //将msg转换一个ByteBuf //ByteBuf 是Netty提供,不是NIO的ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:"+channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲, 我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~(>^ω^<)喵",CharsetUtil.UTF_8)); } //处理异常, 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
NettyClient
package com.wolfx.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * @description: * 客户端 * @author: sukang * @date: 2020-10-21 16:06 */public class NettyClient { public static void main(String[] args) { //客户端需要一个事件循环组 NioEventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是ServerBootstrap而是Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group)//设置线程组 .channel(NioSocketChannel.class)//设置客户端通道的实现类(反射) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器 } }); System.out.println("客户端 ok..."); //启动客户端去连接服务器端 //关于ChannelFuture要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6668).sync(); //给关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } catch ( Exception e ) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }}
NettyClientHandler
package com.wolfx.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @description: * @author: sukang * @date: 2020-10-21 16:16 */public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client"+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server:(>^ω^<)喵", CharsetUtil.UTF_8)); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:"+ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
基本介绍
1)异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。 2)Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。 3)调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。 4)Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)Future说明
1)表示异步的执行结果
,可以通过它提供的方法来检测执行是否完成,比如检索计算等等。 2)ChannelFuture是一个接口:publicinterfaceChannelFutureextendsFuture我们可以添加监听器,当监听的事件发生时,就会通知到监听器
。 工作原理示意图
说明: 1)在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用future即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。 2)Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。Future-Listener机制
1)当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。 2)常见有如下操作举例说明
演示:绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑1)Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类
2)常见的方法有 public ServerBootstrap group(EventLoopGroup parentGroup,EventLoopGroup childGroup),该方法用于服务器端,用来设置两个EventLoop public B group(EventLoopGroup group),该方法用于客户端,用来设置一个EventLoop public B channel(Class<?extendsC> channelClass),该方法用来设置一个服务器端的通道实现 public B option(ChannelOption option,T value),用来给ServerChannel添加配置 public ServerBootstrap childOption(ChannelOption childOption,T value),用来给接收到的通道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类(自定义的handler) public ChannelFuture bind(int inetPort),该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost,int inetPort),该方法用于客户端,用来连接服务器端Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
常见的方法有 Channel channel(),返回当前正在进行IO操作的通道 ChannelFuture sync(),等待异步操作执行完毕1)Netty网络通信的组件,能够用于执行网络I/O操作。
2)通过Channel可获得当前网络连接的通道的状态 3)通过Channel可获得网络连接的配置参数(例如接收缓冲区大小) 4)Channel提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成 5)调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方 6)支持关联I/O操作与对应的处理程序 7)不同协议、不同的阻塞类型的连接都有不同的Channel类型与之对应,常用的Channel类型: NioSocketChannel,异步的客户端TCPSocket连接。 NioServerSocketChannel,异步的服务器端TCPSocket连接。 NioDatagramChannel,异步的UDP连接。 NioSctpChannel,异步的客户端Sctp连接。 NioSctpServerChannel,异步的Sctp服务器端连接,这些通道涵盖了UDP和TCP网络IO以及文件IO。1)Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的Channel事件。
2)当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select)这些注册的Channel是否有已就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel1)ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。
2)ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类 3)ChannelHandler及其实现类一览图 4)我们经常需要自定义一个Handler类去继承ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法ChannelPipeline是一个重点:
1)ChannelPipeline是一个Handler的集合,它负责处理和拦截inbound或者outbound的事件和操作,相当于一个贯穿Netty的链。(也可以这样理解:ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作) 2)ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互 3)在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下 4)常用方法 ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置 ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置1)保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象
2)即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用 3)常用方法1)Netty在创建Channel实例后,一般都需要设置ChannelOption参数。
2)ChannelOption参数如下:1)EventLoopGroup是一组EventLoop的抽象,Netty为了更好的利用多核CPU资源,一般会有多个EventLoop同时工作,每个EventLoop维护着一个Selector实例。
2)EventLoopGroup提供next接口,可以从组里面按照一定规则获取其中一个EventLoop来处理任务。在Netty服务器端编程中,我们一般都需要提供两个EventLoopGroup,例如:BossEventLoopGroup和WorkerEventLoopGroup。 3)通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理,如下图所示 4)常用方法 public NioEventLoopGroup(),构造方法 public Future<?> shutdownGracefully(),断开连接,关闭线程1)Netty提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
2)常用方法如下所示 3)举例说明Unpooled获取Netty的数据容器ByteBuf的基本使用【案例演示】 案例一package com.wolfx.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;/** * @description: * @author: sukang * @date: 2020-10-22 18:32 */public class NettyByteBuf01 { public static void main(String[] args) { //创建一个ByteBuf //说明 //1、创建对象,该对象包含一个数组arr,是一个byte[10] //2、在netty的buffer中,不需要使用flip进行反转 //底层维护了readerindex和writerIndex //3、通过readerindex和writerIndex和capacity,将buffer分成三个区域 //0---readerindex已经读取的区域 //readerindex---writerIndex,可读的区域 //writerIndex--capacity,可写的区域 ByteBuf buffer = Unpooled.buffer(10); for (int i = 0; i < 10; i++) { buffer.writeByte(i); } System.out.println("capacity =" + buffer.capacity());//10 //输出 for (int i = 0; i < buffer.capacity(); i++) { System.out.println(buffer.readByte()); } System.out.println("执行完毕"); }}
测试结果
案例二package com.wolfx.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import java.nio.charset.Charset;/** * @description: * @author: sukang * @date: 2020-10-22 18:47 */public class NettyByteBuf02 { public static void main(String[] args) { //创建ByteBuf ByteBuf byteBuf = Unpooled.copiedBuffer("hello, world!", Charset.forName("utf-8")); //使用相关的方法 if(byteBuf.hasArray()){//true byte[] content = byteBuf.array(); //将content转成字符串 System.out.println(new String(content,Charset.forName("utf-8"))); System.out.println("byteBuf =" + byteBuf); System.out.println(byteBuf.arrayOffset());//0 System.out.println(byteBuf.readerIndex());//0 System.out.println(byteBuf.writerIndex());//12 System.out.println(byteBuf.capacity());//36 System.out.println(byteBuf.getByte(0));//104 int len = byteBuf.readableBytes();//可读的字节数12 System.out.println("len="+len); //使用for取出各个字节 for (int i = 0; i < len; i++) { System.out.println((char)byteBuf.getByte(i)); } //按照某个范围读取 System.out.println(byteBuf.getCharSequence(0,4,Charset.forName("utf-8"))); System.out.println(byteBuf.getCharSequence(4,6,Charset.forName("utf-8"))); } }}
测试结果
实例要求:
1)编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞) 2)实现多人群聊 3)服务器端:可以监测用户上线,离线,并实现消息转发功能 4)客户端:通过channel可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到) 5)目的:进一步理解Netty非阻塞网络编程机制 代码如下: GroupChatServerpackage com.wolfx.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;/** * @description: * @author: sukang * @date: 2020-10-22 19:21 */public class GroupChatServer { private int port; //监听端口 public GroupChatServer(int port){ this.port = port; } //编写run方法,处理客户端的请求 public void run() { //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();//8个 try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer(){ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //获取到pipeline ChannelPipeline pipeline = socketChannel.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服务器启动"); ChannelFuture channelFuture = bootstrap.bind(port).sync(); //监听关闭 channelFuture.channel().closeFuture().sync(); } catch ( Exception e ) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { new GroupChatServer(7000).run(); }}
GroupChatServerHandler
package com.wolfx.netty;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat;/** * @description: * @author: sukang * @date: 2020-10-22 19:53 */public class GroupChatServerHandler extends SimpleChannelInboundHandler{ //public static List channels = new ArrayList (); //使用一个hashmap管理 //public static Map channels = new HashMap (); //定义一个channle组,管理所有的channel //GlobalEventExecutor.INSTANCE)是全局的事件执行器,是一个单例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss"); //handlerAdded表示连接建立,一旦连接,第一个被执行 //将当前channel加入到channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将该客户加入聊天的信息推送给其它在线的客户端 /*该方法会将channelGroup中所有的channel遍历,并发送消息,我们不需要自己遍历*/ channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天"+sdf.format(new java.util.Date())+"\n"); channelGroup.add(channel); } //断开连接,将xx客户端离开信息推送给当前在线的客户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress()+"离开了\n"); System.out.println("channelGroup size" + channelGroup.size()); } //表示channel处于活动状态,提示xx上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+"上线了~"); } //表示channel处于不活动状态,提示xx离线了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+"离线了~"); } //读取数据 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获取到当前channel final Channel channel = ctx.channel(); //这是我们遍历channelGroup, 根据不同的情况,会送不同的消息 channelGroup.forEach(ch -> { if(channel != ch){//不是当前channel, 转发消息 ch.writeAndFlush("[客户]"+channel.remoteAddress()+"发送了消息"+msg+"\n"); }else{//回显自己发送的消息给自己 ch.writeAndFlush("[自己]发送了消息"+msg+"\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道 ctx.close(); }}
GroupChatClient
package com.wolfx.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/** * @description: * @author: sukang * @date: 2020-10-22 20:26 */public class GroupChatClient { //属性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相关hander pipeline.addLast("decoder",new StringDecoder()); pipeline.addLast("encoder",new StringEncoder()); //加入自定义的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //得到channel Channel channel = channelFuture.channel(); System.out.println("-------"+channel.localAddress()+"--------"); //客户端需要输入信息,创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String msg = scanner.nextLine(); //通过channel发送到服务器端 channel.writeAndFlush(msg+"\r\n"); } } catch ( Exception e ) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { new GroupChatClient("127.0.0.1", 7000).run(); }}
GroupChatClientHandler
package com.wolfx.netty;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * @description: * @author: sukang * @date: 2020-10-22 20:41 */public class GroupChatClientHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); }}
在进行Java NIO学习时,发现,如果客户端连续不断的向服务端发送数据包时,服务端接收的数据会出现两个数据包粘在一起的情况,这就是TCP协议中经常会遇到的粘包以及拆包的问题。
我们都知道TCP属于传输层的协议,传输层除了有TCP协议外还有UDP协议。那么UDP是否会发生粘包或拆包的现象呢?答案是不会。UDP是基于报文发送的,从UDP的帧结构可以看出,在UDP首部采用了16bit来指示UDP数据报文的长度,因此在应用层能很好的将不同的数据报文区分开,从而避免粘包和拆包的问题。而TCP是基于字节流的,虽然应用层和TCP传输层之间的数据交互是大小不等的数据块,但是TCP把这些数据块仅仅看成一连串无结构的字节流,没有边界;另外从TCP的帧结构也可以看出,在TCP的首部没有表示数据长度的字段,基于上面两点,在使用TCP传输数据时,才有粘包或者拆包现象发生的可能。
现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:
第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。 第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。 第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。发生TCP粘包或拆包有很多原因,现列出常见的几点:
1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。 2、待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。 3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。 4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。通过以上分析,我们清楚了粘包或拆包发生的原因,那么如何解决这个问题呢?解决问题的关键在于如何给每个数据包添加边界信息,常用的方法有如下几个:
1、发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。 2、发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。 3、可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。1)RPC(RemoteProcedureCall)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
2)两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图) 3)常见的RPC框架有:比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift,Spring旗下的SpringCloud。 ![在这里插入图片描述](https://img-blog.csdnimg.cn/20201024141910710.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3N1Y2hhaGFlcmthbmc=,size_16,color_FFFFFF,t_70#pic_center1)服务消费方(client)以本地调用方式调用服务
2)client stub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体 3)client stub将消息进行编码并发送到服务端 4)server stub收到消息后进行解码 5)server stub根据解码结果调用本地的服务 6)本地服务执行并将结果返回给server stub 7)server stub将返回导入结果进行编码并发送至消费方 8)client stub接收到消息并进行解码 9)服务消费方(client)得到结果小结:RPC的目标就是将2-8这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用
需求说明
1)dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框架 2)模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty4.1.20设计说明
1)创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。 2)创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。 3)创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据 4)开发的分析图 代码实现package com.wolfx.dubbo.publicinterface;/** * @description: * 这个是接口, 是提供方和服务消费方都需要 * @author: sukang * @date: 2020-10-23 15:32 */public interface HelloService { String hello(String msg);}
package com.wolfx.dubbo.provider;import com.wolfx.dubbo.publicinterface.HelloService;/** * @description: * @author: sukang * @date: 2020-10-23 15:37 */public class HelloServiceImpl implements HelloService { private static int count = 0; //当有消费方调用改方法时,就返回一个结果 @Override public String hello(String msg) { System.out.println("收到客户端消息=" + msg); //根据msg返回不同的结果 if(msg != null){ return "你好客户端,我已经收到你的消息 [" + msg + "]第" + (++count) + " 次"; }else{ return "你好客户端,我已经收到你的消息"; } }}
package com.wolfx.dubbo.provider;import com.wolfx.dubbo.netty.NettyServer;/** * @description: * ServerBootstrap会启动一个服务提供者,就是NettyServer * @author: sukang * @date: 2020-10-23 15:52 */public class ServerBootstrap { public static void main(String[] args) { NettyServer.startServer("127.0.0.1",7000); }}
package com.wolfx.dubbo.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * @description: * @author: sukang * @date: 2020-10-23 15:53 */public class NettyServer { public static void startServer(String hostName,int port){ startServer0(hostName, port); } //编写一个方法, 完成对NettyServer的初始化和启动 private static void startServer0(String hostname, int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler());//业务处理器 } }); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync(); System.out.println("服务提供方开始提供服务~~"); channelFuture.channel().closeFuture().sync(); } catch ( Exception e ) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
package com.wolfx.dubbo.netty;import com.wolfx.dubbo.customer.ClientBootstrap;import com.wolfx.dubbo.provider.HelloServiceImpl;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @description: * @author: sukang * @date: 2020-10-23 16:07 */public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送的消息, 并调用服务 System.out.println("msg =" + msg); //客户端在调用服务器api时,我们需要定义一个协议 //比如我们要求 每次发送消息是都必须以某个字符串开头 "HelloService#hello#你好" if (msg.toString().startsWith(ClientBootstrap.providerName)){ String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
package com.wolfx.dubbo.netty;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Callable;/** * @description: * @author: sukang * @date: 2020-10-24 13:29 */public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context;//上下文 private String result; //返回的结果 private String para; //客户端调用方法时,传入的参数 //与服务器的连接创建后,就会被调用,这个方法是第一个被调用(1) @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive被调用"); context = ctx; //因为我们在其他方法会使用ctx } //收到服务器的数据后,调用方法(4) @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(" channelRead 被调用"); result = msg.toString(); notify();//唤醒等待的线程 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } //被代理对象调用,发送数据给服务器,->wait->等待被唤醒(channelRead)->返回结果(3) -> 5 @Override public Object call() throws Exception { System.out.println(" call 被调用 "); context.writeAndFlush(para); //进行wait wait();//等待channelRead方法获取到服务器的结果后,唤醒 System.out.println("call2 被调用"); return result; } //(2) void setPara(String para){ System.out.println("setPara"); this.para = para; }}
package com.wolfx.dubbo.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.lang.reflect.Proxy;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @description: * @author: sukang * @date: 2020-10-24 13:46 */public class NettyClient { //创建线程池 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; //编写方法使用代理模式,获取一个代理对象 public Object getBean(final Class serviceClass, final String providerName){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class []{serviceClass},(proxy,method,args) ->{ System.out.println("(proxy,method,args)进入...."+(++count)+"次"); //{}部分的代码,客户端每调用一次hello,就会进入到该代码 if(client == null){ initClient(); } //设置要发给服务器端的信息 //providerName 协议头args[0]就是客户端调用apihello(???),参数 client.setPara(providerName + args[0]); return executor.submit(client).get(); }); } //初始化客户端 private static void initClient(){ client = new NettyClientHandler(); //创建EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } }); try { bootstrap.connect("127.0.0.1",7000).sync(); } catch ( Exception e ) { e.printStackTrace(); } }}
package com.wolfx.dubbo.customer;import com.wolfx.dubbo.netty.NettyClient;import com.wolfx.dubbo.publicinterface.HelloService;/** * @description: * @author: sukang * @date: 2020-10-24 14:07 */public class ClientBootstrap { //这里定义协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws InterruptedException { //创建一个消费者 NettyClient customer = new NettyClient(); //创建代理对象 HelloService service = (HelloService)customer.getBean(HelloService.class, providerName); for (;;) { Thread.sleep(2*1000); //通过代理对象调用服务提供者的方法(服务) String res = service.hello("你好 dubbo~"); System.out.println("调用的结果 res=" + res); } }}
转载地址:http://monti.baihongyu.com/