手写一个Jedis以及JedisPool

技术经验交流:点击入群


Redis、Jedis的用途就不介绍了,不了解的可以先去官网:


https://www.redis.net.cn/tutorial/3501.html


本文章着重讲解如何手动实现一个类似jedis的工具。


1. 源码探索


经过源码的研究,可以发现Jedis的实现是基于Socket,可以从Jedis的set(key,value)方法开始追溯:


public void set(String key, String value) {
    this.set(SafeEncoder.encode(key), SafeEncoder.encode(value));
}
public void set(byte[] key, byte[] value) {
    this.sendCommand(Command.SET, new byte[][]{key, value});
}    
public static void sendCommand(RedisOutputStream os, Protocol.Command command, byte[]... args) {
    sendCommand(os, command.raw, args);
}
private static void sendCommand(RedisOutputStream os, byte[] command, byte[]... args) {
    try {
        os.write((byte)42);
        os.writeIntCrLf(args.length + 1);
        os.write((byte)36);
        os.writeIntCrLf(command.length);
        os.write(command);
        os.writeCrLf();
        byte[][] var3 = args;
        int var4 = args.length;
        for(int var5 = 0; var5 < var4; ++var5) {
            byte[] arg = var3[var5];
            os.write((byte)36);
            os.writeIntCrLf(arg.length);
            os.write(arg);
            os.writeCrLf();
        }
    } catch (IOException var7) {
        throw new JedisConnectionException(var7);
    }
}


System.out.println((char)42);
System.out.println(((char)36));


控制台:

*

$


可以大体的看出他得实现过程。把sendCommand方法翻译一下就是:

os.write("*".getBytes());
os.write(//数组长度\r\n);
os.write("$".getBytes());
os.write(//命令长度\r\n)
os.write(命令\r\n);
然后进入循环写入: 
    $+参数长度\r\n
    参数\r\n


整理一下:就是Redis的通信协议,官方文档:https://redis.io/topics/protocol

总结一下就是以下几点:


*3   // 数据一共有三个数组
//数组1
$6   //下行为6个长度的字符串
APPEND   
//数组2
$5    // 下行为5个长度的字符串
fantj
//数组3
$3    // 下行为3个长度的字符串
666


手写Jedis


通过源码可以看到,它在Connection类中进行sendCommend,那我们也一样:


Connection.java


负责与Redis的Server端建立连接并获取反馈信息。

package com.fantj.jedis.connect;
import com.fantj.jedis.protocol.Protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
 * 连接类
 * 在这里进行创建连接并处理IO请求,用inputStream进行数据回显,
 * 提供OutputStream给协议层,以便让其给服务端发送命令
 */
public class Connection {
    private String host = "localhost";
    private int port = 6379;
    private Socket socket;
    private OutputStream outputStream;
    private InputStream inputStream;
    public Connection() {
    }
    public Connection(String host) {
        this.host = host;
    }
    public Connection(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public Connection sendCommand(Protocol.Command cmd, byte[]... args) {
        connect();
        Protocol.sendCommand(outputStream, cmd, args);
        return this;
    }
    private void connect() {
        try {
            if (socket == null) {  //IO复用
                socket = new Socket(host, port);
                inputStream = socket.getInputStream();
                outputStream = socket.getOutputStream();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 操作状态的返回
     * 比如:SET 操作成功返回 +OK
     */
    public String getStatus() {
        byte[] bytes = new byte[1024];
        try {
            socket.getInputStream().read(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new String(bytes);
    }
}


而真正的sendCommend实现是在Protocol类中实现(Jedis源码也是这样):


Protocol.java


负责提供RESP协议支持和拼接。

package com.fantj.jedis.protocol;
import java.io.IOException;
import java.io.OutputStream;
/**
 * RESP协议
 * 详见; https://redis.io/topics/protocol
 */
public class Protocol {
    // jedis后来将这些常量优化为byte,在os进行写出的时候对其进行char转型
    private static final String DOLLAR_BYTE = "$";
    private static final String ASTERISK_BYTE = "*";
    public static final byte PLUS_BYTE = 43;
    public static final byte MINUS_BYTE = 45;
    public static final byte COLON_BYTE = 58;
    private static final String BLANK_BYTE = "\r\n";
    /**
     * 拼接RESP 并 发送write
     */
    public static void sendCommand(OutputStream os,Protocol.Command cmd, byte[] ... args){
        // 1. 生成协议 *3 $3 SET $3 key $5 value
        StringBuffer stringBuffer = new StringBuffer();
        // 1.1 数组长度 *3
        stringBuffer.append(ASTERISK_BYTE).append(args.length+1).append(BLANK_BYTE);
        // 1.2 命令长度 $3
        stringBuffer.append(DOLLAR_BYTE).append(cmd.name().length()).append(BLANK_BYTE);
        // 1.3 命令 SET / GET
        stringBuffer.append(cmd).append(BLANK_BYTE);
        for (byte[] arg: args){
            // 1.4 key/value 长度
            stringBuffer.append(DOLLAR_BYTE).append(arg.length).append(BLANK_BYTE);
            // 1.5 key/value
            stringBuffer.append(new String(arg)).append(BLANK_BYTE);
        }
        // 写出到服务端
        try {
            os.write(stringBuffer.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 定义一个枚举类 存放命令
     */
    public static  enum Command{
        SET , GET , KEYS, APPEND
    }}


Client.java


对外提供API。

package com.fantj.jedis.client;
import com.fantj.jedis.connect.Connection;
import com.fantj.jedis.protocol.Protocol;
/**
 * 客户端
 * 给开发人员使用提供API
 */
public class Client {
    private Connection connection;
    public Client(String host, int port){
        connection = new Connection(host,port);
    }
    public String set(String key, String value) {
        connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes());
        return connection.getStatus();
    }
    public String get(String key) {
        connection.sendCommand(Protocol.Command.GET, key.getBytes());
        return connection.getStatus();
    }
    public void set(String key, String value, String nx, String ex, int i) {
        connection.sendCommand(Protocol.Command.SET, key.getBytes(), value.getBytes(), nx.getBytes(), ex.getBytes(), String.valueOf(i).getBytes());
    }
    public void append(String key, String value){
        connection.sendCommand(Protocol.Command.APPEND, key.getBytes(),value.getBytes());
    }
}



Jedis.java


对Client类的进一层封装,留给开发人员使用。

package com.fantj.jedis.client;
public class Jedis extends Client{
    public Jedis(String host, int port) {
        super(host, port);
    }
    @Override
    public String set(String key, String value) {
        return super.set(key, value);
    }
    @Override
    public String get(String key) {
        return super.get(key);
    }
    @Override
    public void set(String key, String value, String nx, String ex, int i) {
        super.set(key, value, nx, ex, i);
    }
    @Override
    public void append(String key, String value) {
        super.append(key, value);
    }
}


测试

/**
 * 测试我们自己写的客户端
 */
public class Main {
    private Jedis client = new Jedis("www.xxx.top",6380);
    @Test
    public void set(){
        client.set("fantj","fantj");
        String result = client.get("fantj");
        System.out.println(result);
    }
    @Test
    public void setNx(){
        client.set("fantj","fantj","NX","EX",10000);
        String result = client.get("fantj");
        System.out.println(result);
    }
    @Test
    public void append(){
//        client.append("fantj","-2019");
        String fantj = client.get("fantj");
        System.out.println(fantj);
    }
    @Test
    public void testChar(){
        System.out.println((char)42);
        System.out.println(((char)36));
    }
}


测试都可以通过。


总结一下原理


假设我执行append("fantj","666")这个命令,那客户端的操作过程:


1、RESP协议对命令的分析

*3   // 数据一共有三个数组
//数组1
$6   //下行为6个长度的字符串
APPEND   
//数组2
$5    // 下行为5个长度的字符串
fantj
//数组3
$3    // 下行为3个长度的字符串
666


2、然后我们进行拼接

StringBuffer sb = new StringBuffer();
// 注意每个类型表示完后都进行换行
sb.append("*").append("3").append("\r\n");
sb.append("$").append("6").append("\r\n");
sb.append("APPEND").append("\r\n");
sb.append("$").append("5").append("\r\n");
sb.append("fantj").append("\r\n");
sb.append("$").append("3").append("\r\n");
sb.append("666").append("\r\n");


3、然后用os进行写出

os.write(sb.toString().getBytes());


连接池的实现



Pool<T>.java

/**
 * 连接池契约
 */
public interface Pool<T> {
    /**
     * 初始化连接池
     * @param maxTotal 最大连接数
     * @param maxWaitMillis 最大等待时间
     */
    public void init(int maxTotal, long maxWaitMillis);
    /**
     * 获取连接
     * @return 返回jedis对象
     */
    public Jedis getResource() throws Exception;
    /**
     * 释放连接
     */
    public void release(T t);
}


JedisPool.java


实现方式与ExcutorThreadPool工作流程类似,注释写的挺全的就不做详细解读了。

package com.fantj.jedis.pool;
import com.fantj.jedis.client.Jedis;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class JedisPool implements Pool<Jedis>{
    public JedisPool(String url, int port) {
        this.url = url;
        this.port = port;
        init(maxTotal,maxWaitMillis);
    }
    public JedisPool(String url, int port, int maxTotal, long maxWaitMillis) {
        this.url = url;
        this.port = port;
        this.maxTotal = maxTotal;
        this.maxWaitMillis = maxWaitMillis;
    }
    private String url;
    private int port;
    private int maxTotal = 20;
    private long maxWaitMillis = 1000;
    // 空闲的连接queue
    private LinkedBlockingQueue<Jedis> idleWorkQueue = null;
    private Queue<Jedis> activeWorkQueue = null;
    // 当前连接数量
    private AtomicInteger count = new AtomicInteger(0);
    @Override
    public void init(int maxTotal, long maxWaitMillis) {
        maxTotal = maxTotal;
        maxWaitMillis = maxWaitMillis;
        idleWorkQueue = new LinkedBlockingQueue<>(maxTotal);
        activeWorkQueue = new LinkedBlockingQueue<>(maxTotal);
    }
    @Override
    public Jedis getResource() throws Exception {
        Jedis jedis = null;
        // 1. 记录开始时间,检测超时
        long startTime = System.currentTimeMillis();
        while (true){
            // 2. 从空闲队列中获取连接,如果拿到,一式两份存放到活动队列
            jedis = idleWorkQueue.poll();
            if (jedis != null){
                activeWorkQueue.offer(jedis);
                return jedis;
            }
            // 3. 如果失败,判断池是否满,没满则创建
            if (count.get() < maxTotal){
                if (count.incrementAndGet() <= maxTotal){
                    jedis = new Jedis(url,port);
                    activeWorkQueue.offer(jedis);
                    System.out.printf("创建了一个新的连接: %s \r\n", jedis.toString());
                    return jedis;
                }else {
                    count.decrementAndGet();
                }
            }
            // 4. 如果连接池满了,则在超时时间内进行等待
            try {
                jedis = idleWorkQueue.poll(maxWaitMillis-(System.currentTimeMillis()-startTime), TimeUnit.MILLISECONDS);
                if (jedis != null){
                    activeWorkQueue.offer(jedis);
                    return jedis;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 5. poll可能被中断,所以在这里再进行超时判断
            if (maxWaitMillis < (System.currentTimeMillis()-startTime)){
                throw new RuntimeException("JedisPool: jedis connect timeout");
            }
        }
    }
    @Override
    public void release(Jedis jedis) {
        if (activeWorkQueue.remove(jedis)){
            idleWorkQueue.offer(jedis);
        }
    }
}

GitHub地址: https://github.com/fantj2016/easy-jedis

(点击文字可跳转)

1. 深究Spring中Bean的生命周期

2. 深入SpringBoot核心注解原理

3.线上环境部署概览

4.Springboot Vue shiro 实现前后端分离、权限控制

上一篇:Redis4.0新特性(三)-PSYNC2


下一篇:Photoshop简单制作粉笔字教程