欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

kitex中consistenthashing的实现

时间:2023-06-16

一致性哈希算法(consistent hashing)

kitex 中一致性的很多细节都和我预先理解的不一样。

这种负载均衡算法是在client侧实现的,那么client是怎么知道所有的ip的? 感觉这种算法应该是做一个中间件比较好,client请求实现一致性hash的中间件,中间件依据一致性hash算法来选取节点返回ip port,client侧应当不关注路由算法才对。算法中环是用数组实现的,这不奇怪。但节点的路由是二分查找找到的,这就有点奇怪。consistent hashing 在consist.go中,很难理解这个文件为什么不叫consistent.gobuildWeightedVirtualNodes和buildVirtualNodes的代码长相惊人的类似,一看就是ctrl c v后懒得改

客户端测试代码:

package mainimport ("context""fmt""log""time""github.com/cloudwego/kitex-examples/kitex_gen/api""github.com/cloudwego/kitex-examples/kitex_gen/api/echo""github.com/cloudwego/kitex/client""github.com/cloudwego/kitex/pkg/loadbalance")type ctxKey intconst (ctxConsistentKey ctxKey = iota)func main() {opt := loadbalance.NewConsistentHashOption(func(ctx context.Context, request interface{}) string {key, _ := ctx.Value(ctxConsistentKey).(string)return key})opt.Weighted = truelb := loadbalance.NewConsistBalancer(opt)client, err := echo.NewClient("echo", client.WithHostPorts("0.0.0.0:8801", "0.0.0.0:8802"), client.WithLoadBalancer(lb))if err != nil {log.Fatal(err)}var i int = 0for {// call a serverctx := context.Background()ctx = context.WithValue(ctx, ctxConsistentKey, "my key"+fmt.Sprintf("%d", i))req := &api.Request{Message: "my request" + fmt.Sprintf("%d", i)}resp, err := client.Echo(ctx, req)if err != nil {log.Fatal(err)}log.Println("call id :"+fmt.Sprintf("%d", i), resp)time.Sleep(time.Millisecond)// time.Sleep(time.Second)i++}}

// 这一行进行配置,这里没有进行到后端的路由client, err := echo.NewClient("echo", client.WithHostPorts("0.0.0.0:8801", "0.0.0.0:8802"), client.WithLoadBalancer(lb))

// 调用时才进行路由resp, err := client.Echo(ctx, req)

调用堆栈:

TODO:客户端有缓存,如果所路由到的对端挂掉,这个缓存会清空重建吗? 这块没看

这里依据是否根据权重来判断,buildWeightedVirtualNodes 和buildVirtualNodes 函数没处理好,冗余代码比较多

func (cb *consistBalancer) buildNodes(ins []discovery.Instance) ([]realNode, []virtualNode) {ret := make([]realNode, len(ins))for i := range ins {ret[i].Ins = ins[i]}if cb.opt.Weighted {return ret, cb.buildWeightedVirtualNodes(ret)}return ret, cb.buildVirtualNodes(ret)}

建立虚拟节点:(代码有改动)

// build virtual nodesfunc (cb *consistBalancer) buildWeightedVirtualNodes(rNodes []realNode) []virtualNode {if len(rNodes) == 0 {return []virtualNode{}}vlen := 0for i := range rNodes {// 10 100vlen += rNodes[i].Ins.Weight() * int(cb.opt.VirtualFactor)}// 2000ret := make([]virtualNode, vlen)if vlen == 0 {return ret}maxLen := 0for i := range rNodes {// TODO 优化 代码难看,使用 maxif len(rNodes[i].Ins.Address().String()) > maxLen {maxLen = len(rNodes[i].Ins.Address().String())}}// l-> lengthl := maxLen + 1 + cb.opt.virtualFactorLen // "$address + # + itoa(i)"// pre-allocate []byte here, and reuse it to prevent memory allocationb := make([]byte, l)// record the start indexcur := 0for i := range rNodes {ins := rNodes[i].InsbAddr := utils.StringToSliceByte(ins.Address().String())// assign the first few bits of b to stringcopy(b, bAddr)// initialize the last few bits, skipping '#'for j := len(bAddr) + 1; j < len(b); j++ {b[j] = 0}b[len(bAddr)] = '#'// len of curlen := int(cb.opt.VirtualFactor) * ins.Weight()for j := 0; j < len; j++ {k := jcnt := 0// assign values to b one by one, starting with the last onefor k > 0 {b[l-1-cnt] = byte(k % 10)k /= 10cnt++}// at this point, the index inside ret should be cur + jindex := cur + jlog.Println("b: ", b,"cur :", cur,"j :", j,"index :", index)ret[index].hash = xxhash.Sum64(b)ret[index].RealNode = &rNodes[i]}cur += len}sort.Sort(&vNodeType{s: ret})return ret}

ret[index].hash = xxhash.Sum64(b)

这里计算出hash

一张图片说明一切:
拼接出:hash的key为: 【ip】【port】【#】【序列号】如: 【0.0.0.0:8802#123】

感觉这里的#号好像没什么用,可能是为了方便debug

最后得到的是一个数组,数组大小依据虚拟节点个数还有实例个数确定,数组依据hash 的大小来确定,排序是为了后面的二分能找到具体的节点。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。