怎么实现一个分布式kv系统-2-静态分区

摘要

本节要实现的有3点

  • 解析toml文件
  • 计算key的hash值
  • 将请求路由到对应的shard

编程实现

1. 定义&解析toml规则文件

定义sharding.toml文件

# Static shard table: each [[shards]] entry declares one shard
# by a human-readable name and an integer id.
[[shards]]
name = "Moscow"
id = 0

[[shards]]
name = "Minsk"
id = 1

[[shards]]
name = "Kiev"
id = 2

导入解析toml的包

添加configFile参数解析

在main中读取配置文件

import (
	"github.com/BurntSushi/toml"
)

// configFile is the -config-file command-line flag: the path of the
// sharding configuration file, defaulting to "sharding.toml".
var configFile = flag.String("config-file", "sharding.toml", "The configuration file")

// main parses the command-line flags and decodes the TOML file named by
// -config-file into a config.Config, aborting the process if decoding fails.
func main() {
	flag.Parse()

	c := config.Config{}
	_, err := toml.DecodeFile(*configFile, &c)
	if err != nil {
		log.Fatalf("failed to decode config file(%q):%v", *configFile, err)
	}
}

2. 添加config模块

创建config/config.go

定义Config & Shard结构

package config

// Shard describes one entry of the [[shards]] array in sharding.toml.
type Shard struct {
	// Name is the human-readable shard name, e.g. "Moscow".
	Name string
	// Id is the numeric shard id. sharding.toml stores it as an integer
	// (id = 0), and it is used as an int map key and shard index elsewhere,
	// so it must be an int rather than a string.
	Id int
	// Address is the host:port other shards use to reach this shard; it is
	// read by Config.GetAddress.
	Address string
}

// Config is the decoded form of the sharding configuration file.
type Config struct {
	// Shards holds one element per [[shards]] table in the file.
	Shards []Shard
}

3. 指定shard

先不自动shard,先手动指定shard

添加shard参数解析

检查shard是否存在

确认shard的id

// main.go
// shardName is the -shardName command-line flag: the name of the shard this
// process should act as, defaulting to "Moscow".
var shardName = flag.String("shardName", "Moscow", "The name of the shard")

// Resolve the shard named on the command line; abort if the configuration
// does not define it.
ok, shard := c.ExistsShard(*shardName)
if !ok {
	// shardName is a *string returned by flag.String; dereference it so the
	// message shows the shard name instead of a pointer address.
	log.Fatalf("shard %v not exists", *shardName)
}

// config.go
// ExistsShard looks for the first configured shard whose Name equals name.
// It returns true and that shard when one is found, or false and the zero
// Shard otherwise.
func (c *Config) ExistsShard(name string) (bool, Shard) {
	for i := range c.Shards {
		if candidate := c.Shards[i]; candidate.Name == name {
			return true, candidate
		}
	}
	return false, Shard{}
}

4. 写入数据到指定shard

写入规则:hash(key) % shardCounter 就是需要写入的分片

更新Server结构,添加shardCounter&shardIndex字段

// web.go
// Server is the HTTP front end of a single shard.
type Server struct {
	db *db.Database
	// shardCounter is the total number of shards; it is the divisor in
	// hash(key) % shardCounter.
	shardCounter int
	// shardIndex is the index of the shard this server was started as.
	shardIndex int
}

// NewServer returns a Server backed by db that knows the total number of
// shards (shardCounter) and its own shard index (shardIndex).
func NewServer(db *db.Database, shardCounter, shardIndex int) *Server {
	return &Server{
		db:           db,
		shardCounter: shardCounter,
		shardIndex:   shardIndex,
	}
}

导入计算hash的包: hash/fnv包,计算shard

更新GetHandler、SetHandler,调用getShard来获取应该写入数据的分片。

// web.go
// getShard maps key to a shard number: the 64-bit FNV-1 hash of the key
// modulo the configured shard count.
func (s *Server) getShard(key string) uint64 {
	hasher := fnv.New64()
	hasher.Write([]byte(key))

	sum := hasher.Sum64()
	shards := uint64(s.shardCounter)
	return sum % shards
}

// GetHandler reads the "key" form parameter, looks it up in the local
// database and writes the key, the value, the shard computed for the key and
// the lookup error to the response.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	form := r.Form
	key := form.Get("key")

	value, err := s.db.GetKey(key)
	// The shard is computed after the lookup, only for reporting.
	target := s.getShard(key)
	fmt.Fprintf(w, "%q:%q; shard: %d; %v Get Called\n", key, value, target, err)
}

// SetHandler reads the "key" and "value" form parameters, stores the pair in
// the local database and writes the key, the value, the shard computed for
// the key and the store error to the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	form := r.Form
	key, value := form.Get("key"), form.Get("value")

	err := s.db.SetKey(key, []byte(value))
	// The shard is computed after the write, only for reporting.
	target := s.getShard(key)
	fmt.Fprintf(w, "%q:%q; shard: %d; %v; Set called\n", key, value, target, err)
}

5. 获取所有节点的地址

以上虽然可以计算出分片,可是还没有办法把请求路由给其他的分片。

首先,需要知道其他分片的地址。

其次,将信息传入到Server对象中。

接着,定义redirect函数,请求转发。

最后,改造GetHandler、SetHandler转发请求。

// config.go
// GetAddress returns a map from each configured shard's Id to its Address.
func (c *Config) GetAddress() map[int]string {
	byID := make(map[int]string)
	for i := range c.Shards {
		byID[c.Shards[i].Id] = c.Shards[i].Address
	}
	return byID
}

// web.go
// Server is the HTTP front end of a single shard. Besides its own database
// it knows the address of every shard so it can forward requests for keys
// it does not own.
type Server struct {
	db *db.Database
	// shardCounter is the total number of shards; it is the divisor in
	// hash(key) % shardCounter.
	shardCounter int
	// shardIndex is the index of the shard this server was started as.
	shardIndex int
	// addrs maps a shard index to the address of the server for that shard.
	addrs map[int]string
}

// NewServer returns a Server backed by db. shardCounter is the total number
// of shards, shardIndex is this server's own shard, and addrs maps every
// shard index to its address. main.go passes addrs as the fourth argument,
// so the constructor must accept and store it.
func NewServer(db *db.Database, shardCounter, shardIndex int, addrs map[int]string) *Server {
	return &Server{
		db:           db,
		shardCounter: shardCounter,
		shardIndex:   shardIndex,
		addrs:        addrs,
	}
}

// main.go
// Build the shard-id -> address table from the configuration and pass it to
// the server together with the shard count and the id of the selected shard.
addrs := c.GetAddress()
svr := web.NewServer(db, len(c.Shards), shard.Id, addrs)

// web.go
// redirect forwards the current request to the server responsible for shard
// and relays that server's status code and body to w.
//
// The request is re-issued as an HTTP GET against the target's address with
// the original request URI. If the shard has no configured address or the
// upstream request fails, a 500 response describing the problem is written
// to w and the error is returned. An error while copying the upstream body
// is returned as well; the response header has already been sent by then.
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, shard int) error {
	addr, ok := s.addrs[shard]
	if !ok {
		// Without this check the URL would be built from an empty host.
		err := fmt.Errorf("no address configured for shard %d", shard)
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}

	resp, err := http.Get("http://" + addr + r.RequestURI)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error redirect request:%v", err)
		return err
	}
	defer resp.Body.Close()

	// Propagate the upstream status so a failure on the owning shard is not
	// reported to the client as 200 OK.
	w.WriteHeader(resp.StatusCode)
	if _, err := io.Copy(w, resp.Body); err != nil {
		return fmt.Errorf("relaying response from shard %d: %w", shard, err)
	}
	return nil
}



// GetHandler serves a read of the "key" form parameter. If the key hashes to
// another shard the request is forwarded there; otherwise the value is read
// from the local database and written to the response together with the
// target shard, the current shard and the lookup error.
func (s *Server) GetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	shard := s.getShard(key)

	// getShard returns a uint64, so the local index must be converted to
	// uint64 (not uint32) for the comparison to compile.
	if shard != uint64(s.shardIndex) {
		// redirect reports failures to the client itself, so there is
		// nothing further to do with its error here.
		_ = s.redirect(w, r, int(shard))
		return
	}

	value, err := s.db.GetKey(key)
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Get Called\n", key, value, shard, s.shardIndex, err)
}

// SetHandler serves a write of the "key"/"value" form parameters. If the key
// hashes to another shard the request is forwarded there; otherwise the pair
// is stored in the local database and the key, value, target shard, current
// shard and store error are written to the response.
func (s *Server) SetHandler(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	key := r.Form.Get("key")
	value := r.Form.Get("value")
	shard := s.getShard(key)

	// getShard returns a uint64, so the local index must be converted to
	// uint64 (not uint32) for the comparison to compile.
	if shard != uint64(s.shardIndex) {
		// redirect reports failures to the client itself, so there is
		// nothing further to do with its error here.
		_ = s.redirect(w, r, int(shard))
		return
	}

	err := s.db.SetKey(key, []byte(value))
	fmt.Fprintf(w, "%q:%q; target shard:%d; current shard:%d; %v Set Called\n", key, value, shard, s.shardIndex, err)
}

6. 测试

# get: for each key (a, b, c) print a banner, then request /get for that key
# from every local node on ports 8000, 8001 and 8002.
get() {
    for key in a b c
    do
        printf '\n========= get %s=========\n' "$key"
        for port in 8000 8001 8002
        do
            curl "http://127.0.0.1:${port}/get?key=${key}"
        done
    done
}

# set: for each key (a, b, c) print a banner, then request /set on every
# local node on ports 8000, 8001 and 8002, using the key as its own value.
# NOTE: this function shares its name with the shell's `set` builtin.
set() {
    for key in a b c
    do
        printf '\n========= set %s=========\n' "$key"
        for port in 8000 8001 8002
        do
            curl "http://127.0.0.1:${port}/set?key=${key}&value=${key}"
        done
    done
}

# Write every key through every node first, then read each key back.
set
get

参考资料

本节完整代码:https://github.com/YuriyNasretdinov/distribkv/tree/part2

youtube视频:https://www.youtube.com/watch?v=5VK5tAyZDxQ&list=PLWwSgbaBp9XrMkjEhmTIC37WX2JfwZp7I&index=3

B站视频:https://www.bilibili.com/video/BV1nR4y177YM?p=2

上一篇:汇编 字符串统计 大写 小写 数字 其他


下一篇:A股全自动化交易——从零到实盘20(完结)