09 服务端代码范例(nio)

文章目录


使用nio包下的类简单实现一个服务端代码熟悉前面介绍的API

  • 采用阻塞式
  • 采用非阻塞
  • 阻塞和非阻塞混用

1 阻塞式服务端

1.1 服务端

采用阻塞模式,用线程池中的工作线程处理每个客户连接。

  • 当ServerSocketChannel与SocketChannel采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多个线程。在EchoServer类中,利用java.util.concurrent包中提供的线程池ExecutorService来处理与客户的连接。
  • EchoServer类的构造方法负责创建线程池,启动服务器,把它绑定到一个本地端口。EchoServer类的service()方法负责接收客户的连接。每接收到一个客户连接,就把它交给线程池来处理,线程池取出一个空闲的线程,来执行Handler对象的run()方法。Handler类的handle()方法负责与客户通信。该方法先获得与SocketChannel关联的Socket对象,然后从Socket对象中得到输入流与输出流,再接收和发送数据。
package study.wyy.net.nio.server;

import lombok.extern.slf4j.Slf4j;
import study.wyy.net.nio.thread.RequestHandler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author wyaoyao
 * @date 2021/3/17 17:12
 * 阻塞式服务端示例
 */
@Slf4j
public class BlockEchoServer {

    /**
     * 服务端口号
     */
    private final int port;
    private final ServerSocketChannel serverSocketChannel;
    private final ServerSocket serverSocket;
    private final ExecutorService executorService;
    /**
     * 线程池中工作的线程数目
     */
    private final int POOL_MULTIPLE = 4;


    public BlockEchoServer(int port) throws IOException {
        this.port = port;
        executorService = Executors.newFixedThreadPool(
                // ava.lang.Runtime.availableProcessors() 方法: 返回可用处理器的Java虚拟机的数量。
                Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE
        );
        // 打开通道
        this.serverSocketChannel = ServerSocketChannel.open();
        // 返回与ServerSocketChannel关联的ServerSocket对象,每个ServerSocketChannel对象都与一个ServerSocket对象关联
        serverSocket = serverSocketChannel.socket();
        // 使得在同一个主机上关闭了服务器,紧接着再启动服务器程序时,可以顺利绑定相同的端口
        serverSocket.setReuseAddress(true);
        // 与本地的端口绑定
        serverSocket.bind(new InetSocketAddress(port));
        log.info("the server has bind address is {}:{}", this.serverSocket.getInetAddress().getHostAddress(), this.port);
    }

    public void service() {
        while (true) {
            SocketChannel socketChannel;
            try {
                // 等待接收客户端连接,一旦有客户端连接,就会返会与当前客户端连接的SocketChannel的对象
                socketChannel = serverSocketChannel.accept();
                // 开启一个线程去处理当前客户端连接
                executorService.submit(new RequestHandler(socketChannel));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

package study.wyy.net.nio.thread;


import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.Socket;
import java.nio.channels.SocketChannel;

/**
 * @author wyaoyao
 * @date 2021/3/17 17:35
 */
@Slf4j
public class RequestHandler implements Runnable {
    private final SocketChannel socketChannel;
    private Socket socket;

    public RequestHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            // 获得与socketChannel关联的Socket对象
            socket = socketChannel.socket();
            log.info("new client connection from {}:{} accept", socket.getInetAddress(), socket.getPort());

            // 获取输入流
            BufferedReader reader = getReader(socket);
            // 获取输出流
            PrintWriter writer = getWriter(socket);
            String msg = null;
            while ((msg = reader.readLine()) != null) {
                log.info("accept message from client is {}", msg);

                // 返回响应给客户端
                writer.println("echo: " + msg);
                if (msg.contains("bye")) {
                    // 如果客户端发来的是bye,则退出当前会话
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (socketChannel != null){
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public BufferedReader getReader(Socket socket) throws IOException {
        InputStream inputStream = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(inputStream));
    }

    public PrintWriter getWriter(Socket socket) throws IOException {
        return new PrintWriter(socket.getOutputStream(), true);
    }
}

1.2 测试

  1. 为了测试先写一个客户端: 采用阻塞式
package study.wyy.net.nio.client;

import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;

/**
 * @author wyaoyao
 * @date 2021/3/17 17:59
 */
@Slf4j
public class BlockEchoClient {

    private final SocketChannel socketChannel;
    private final String serverHost;
    private final int serverPort;

    public BlockEchoClient(String serverHost, int serverPort) throws IOException {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
        this.socketChannel = SocketChannel.open();
        // 连接服务器
        SocketAddress remote = new InetSocketAddress(serverHost, serverPort);
        socketChannel.connect(remote);
        log.info("connect echo server success");
    }

    public void send(String message) {
        try {
            BufferedReader reader = getReader(socketChannel.socket());
            PrintWriter writer = getWriter(socketChannel.socket());
            // 发送数据
            writer.println(message);
            log.info("send request success; content is {}", message);
            // 读取服务端的响应
            String s1 = reader.readLine();
            log.info("get response success; response is {}", s1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() throws IOException {
        if(socketChannel != null){
            socketChannel.close();
        }
    }

    public BufferedReader getReader(Socket socket) throws IOException {
        InputStream inputStream = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(inputStream));
    }

    public PrintWriter getWriter(Socket socket) throws IOException {
        return new PrintWriter(socket.getOutputStream(), true);
    }
}
  1. 启动服务端
public class BlockEchoServerTest {
    public static void main(String[] args) throws IOException {
        BlockEchoServer server = new BlockEchoServer(10010);
        // 等待客户端连接
        server.service();
    }
}
  1. 启动客户端,模拟3个客户端同时访问
public static void main(String[] args) throws IOException {
        Arrays.asList(1,2,3).stream().forEach(i->{
            new Thread(()->{
                BlockEchoClient client = null;
                try {
                    client = new BlockEchoClient("localhost", 10010);
                    client.send("hello! from " + Thread.currentThread().getName());

                    client.send("你好! from " + Thread.currentThread().getName());
                    client.send("bye! from " +  Thread.currentThread().getName());
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if(client != null){
                        try {
                            client.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            },"client" +i).start();

        });
    }

测试客户端输出结果:

07:56:29.945 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.945 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.945 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.949 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client1
07:56:29.949 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client2
07:56:29.949 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client3
07:56:29.981 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client1
07:56:29.981 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client1
07:56:30.993 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client1
07:56:30.993 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client1
07:56:31.981 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client3
07:56:31.981 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client3
07:56:32.002 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client3
07:56:32.003 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client3
07:56:32.016 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client1
07:56:32.017 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client3
07:56:32.982 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client2
07:56:32.983 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client2
07:56:33.004 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client2
07:56:33.005 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client2
07:56:37.027 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client2

由于采用多线程的方式,使得可以同时处理多个客户的连接。

如果服务端修改为不采用多线程:

public void service() {
     while (true) {
         SocketChannel socketChannel;
         try {
             // 等待接收客户端连接,一旦有客户端连接,就会返会与当前客户端连接的SocketChannel的对象
             socketChannel = serverSocketChannel.accept();
             // 开启一个线程去处理当前客户端连接
             // executorService.submit(new RequestHandler(socketChannel));
             // 这里不开启线程
             new RequestHandler(socketChannel).run();
         } catch (IOException e) {
             e.printStackTrace();
         }
     }
 }

在测试:

08:12:59.029 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.029 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.029 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.036 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client3
08:12:59.036 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client2
08:12:59.036 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client1
08:12:59.049 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client2
08:12:59.049 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client2
08:12:59.051 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client2
08:12:59.051 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client2
08:12:59.053 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client2
08:12:59.065 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client1
08:12:59.065 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client1
08:12:59.096 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client1
08:12:59.096 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client1
08:12:59.107 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client1
08:12:59.119 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client3
08:12:59.119 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client3
08:12:59.140 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client3
08:12:59.140 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client3
08:12:59.151 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client3

三个客户端同时发送了是第一个请求,但是服务端只能依次一个客户端处理,比如这里先处理client2,直到把client2的三个请求(hello,你好,bye)才会去处理下一个客户端。

2 非阻塞式服务端

在非阻塞模式下,EchoServer只需要启动一个主线程,就能同时处理三件事

  • 接收客户的连接。
  • 接收客户发送的数据。
  • 向客户发回响应数据。

EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件和写就绪事件,如果有特定事件发生,就处理该事件。
EchoServer类的构造方法负责启动服务器,把它绑定到一个本地端口,代码如下:

  1. 构造方法
@Slf4j
public class NoBlockEchoServer {

    /**
     * 委托给Selector来负责接收连接就绪事件,读就绪事件,写就绪事件
     */
    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    private final ServerSocket serverSocket;

    private final int port;

    public NoBlockEchoServer(int port) throws IOException {
        // 创建一个Selector对象
        this.selector = Selector.open();
        // 创建一个ServerSocketChannel对象
        serverSocketChannel = ServerSocketChannel.open();
        // 返回与ServerSocketChannel关联的ServerSocket对象,每个ServerSocketChannel对象都与一个ServerSocket对象关联
        serverSocket = serverSocketChannel.socket();
        // 使得在同一个主机上关闭了服务器,紧接着再启动服务器程序时,可以顺利绑定相同的端口
        serverSocket.setReuseAddress(true);
        // 设置serverSocketChannel为非阻塞工作模式
        serverSocketChannel.configureBlocking(false);
        // 绑定本地端口
        this.port = port;
        serverSocketChannel.bind(new InetSocketAddress(port));
        log.info("the server has bind address is {}:{}", this.serverSocket.getInetAddress().getHostAddress(), this.port);
    }
}
  1. service()方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException {
        // 注册一个连接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // select()返回已经发生的SelectionKey对象的数量,该方法是阻塞的,如果一个也没有就进入阻塞,
        while (selector.select() > 0) {
            // 进入循环就说明事件发生
            // 获取相关事件已经被Selector捕获的SelectionKey的集合
            Set<SelectionKey> readyKes = selector.selectedKeys();
            // 遍历处理这些已经捕获到的事件
            Iterator<SelectionKey> iterator = readyKes.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = null;
                try {
                    // 取出一个SelectionKey,进行处理
                    key = iterator.next();
                    // 既然取出来,就可以从集合中删除了
                    iterator.remove();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        // 处理连接就绪事件
                    }
                    if (key.isReadable()) {
                        // 处理读就绪事件
                    }
                    if (key.isWritable()) {
                        // 处理写就绪事件
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (null != null) {
                        // 失效掉这个key, selector不再感兴趣这个SelectionKey感兴趣的事件
                        key.cancel();
                        // 关闭与这个key关联的socketChannel
                        key.channel().close();
                    }
                }
            }
        }
    }

service方法中,首先由serverSocketChannel向Selector 注册连接就绪事件。如果Selector监控到该事件发生,就会把相应的SelectionKey对象加入到selected-keys集合中(相关事件已经被Selector捕获的SelectionKey的集合)。接下来第一层while循环,会不断的询问Selector已经发生的事件,然后依次处理这些事件。

其中获取已经发生的事件的SelectionKey个数,如果当前没有任何事件发生,这个方法就会阻塞下去,直到至少一件事情发生。selector.selectedKeys()这个方法返回已经被Selector捕获的SelectionKey的集合(selected-keys集合),selected-keys集合存放了相关事件已经发生的SelectionKey对象。

接下来就是遍历selected-keys集合,处理这些已经捕获到的事件。如果出现异常,就会失效这个SelectionKey,并且关闭与之关联的channel

2.1 处理连接事件

刚刚service方法已经通过if留出了每个事件类型处理的地方,现在就先处理连接事件,代码如下:

private void handleAcceptable(SelectionKey selectionKey) throws IOException {
        // 获取与SelectionKey关联的serverSocketChannel,就是通过serverSocketChannel来传输数据的
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        // 获取与客户端连接的SocketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();
        log.info("accept client connection from {}:{} accept", socketChannel.socket().getInetAddress(), socketChannel.socket().getPort());
        // 设置socketChannel为非阻塞
        socketChannel.configureBlocking(false);
        // 创建一个缓冲区,用于存放客户端发来的数据
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // SocketChannel向selector注册读就绪事件和写就绪事件
        // 并把byteBuffer作为附件注册进去,在读写事件发生的时候获取byteBuffer,进行数据读写
        socketChannel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE,byteBuffer);

    }

如果isAcceptable方法返回true,就表示这个SelectionKey所有感兴趣的接收连接就绪事件已经发生了

首先通过SelectionKey的channel()方法获的与之关联的ServerSocketChannel,然后调用ServerSocketChannel的accpet方法获取与客户端连接的SocketChannel对象。这个SocketChannel对象默认是阻塞模式的,所以首先调用configureBlocking(fasle)方法将其设置为非阻塞模式。

SocketChannel调用register方法向selector注册读就绪事件和写就绪事件,并把byteBuffer作为附件与新建的这个SelectionKey关联。

2.2 处理读就绪事件

如果isReadable方法返回true,就表示这个SelectionKey所有感兴趣的读就绪事件已经发生了

private void handleReadable(SelectionKey selectionKey) throws IOException {
    // 获取关联的的附件
    ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
    // 获取与当前SelectionKey关联的SocketChannel
    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    // 创建ByteBuffer字节缓冲区,用于存放读取到的数据
    ByteBuffer readBuffer = ByteBuffer.allocate(32);
    socketChannel.read(readBuffer);
    // flip():把极限设为位置,再把位置设为0
    readBuffer.flip();

    // 把buffer的极限设置为容量
    buffer.limit(buffer.capacity());
    // 把readBuffer中的数据拷贝到buffer中
    // 假定buffer的容量足够大,不会出现缓冲区溢出的情况
    buffer.put(readBuffer);
}
上一篇:MySQLDay02


下一篇:父子组件传值