找回密码
 立即注册
首页 业界区 业界 hashicorp/raft模块实现的raft集群存在节点跨集群身份冲 ...

hashicorp/raft模块实现的raft集群存在节点跨集群身份冲突问题

奄幂牛 7 天前
问题场景描述

我通过模块github.com/hashicorp/raft使用golang实现了一个raft集群功能,发现如下场景中会遇到一个问题:
测试启动如下2个raft集群,集群名称,和集群node与IP地址如下,raft集群均通过BootstrapCluster方法初始化:
Cluster1 BootstrapCluster servers:
  1. - node1: {raft.ServerID: c1-node1, raft.ServerAddress: 192.168.100.1:7000}
  2. - node2: {raft.ServerID: c1-node2, raft.ServerAddress: 192.168.100.2:7000}
  3. - node3: {raft.ServerID: c1-node3, raft.ServerAddress: 192.168.100.3:7000}
复制代码
Cluster2 BootstrapCluster servers:
  1. - node3: {raft.ServerID: c2-node3, raft.ServerAddress: 192.168.100.3:7000}
  2. - node4: {raft.ServerID: c2-node4, raft.ServerAddress: 192.168.100.4:7000}
  3. - node5: {raft.ServerID: c2-node5, raft.ServerAddress: 192.168.100.5:7000}
复制代码
其中,"node3"的地址会存在2个集群中。

  • "node1","node2"按照"Cluster1"启动:
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2

  • "node3","node4","node5"先按照"Cluster2"启动:
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5
然后就会发现"node3"会在"Cluster1"和"Cluster2"之间来回切换,一会属于"Cluster1",一会属于"Cluster2".
  1. INFO[0170] current state:Follower, leader address:127.0.0.5:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:53.330867 +0800 CST m=+169.779019126
  2. INFO[0171] current state:Follower, leader address:127.0.0.1:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:54.308388 +0800 CST m=+170.756576126
复制代码
我的代码如下
  1. package main
  2. import (
  3.         "flag"
  4.         "fmt"
  5.         "io"
  6.         "net"
  7.         "os"
  8.         "strconv"
  9.         "strings"
  10.         "time"
  11.         "github.com/hashicorp/raft"
  12.         log "github.com/sirupsen/logrus"
  13. )
// raftCluster wraps a single hashicorp/raft node together with the static
// membership list it bootstraps with.
type raftCluster struct {
	localRaftID     raft.ServerID                        // this node's ID inside the cluster
	servers         map[raft.ServerID]raft.ServerAddress // raftID : raftAddressPort
	raft            *raft.Raft                           // live raft instance, set by Start
	electionTimeout time.Duration                        // cluster-size-scaled timeout computed in Start
}
  20. func (r *raftCluster) Start() error {
  21.         config := raft.DefaultConfig()
  22.         config.HeartbeatTimeout = 2000 * time.Millisecond
  23.         config.ElectionTimeout = 5000 * time.Millisecond
  24.         config.CommitTimeout = 2000 * time.Millisecond
  25.         config.LeaderLeaseTimeout = 2000 * time.Millisecond
  26.         config.LocalID = r.localRaftID
  27.         config.LogOutput = log.StandardLogger().Out
  28.         r.electionTimeout = config.ElectionTimeout * time.Duration(len(r.servers)*2)
  29.         localAddressPort := string(r.servers[r.localRaftID])
  30.         tcpAddr, err := net.ResolveTCPAddr("tcp", localAddressPort)
  31.         if err != nil {
  32.                 return fmt.Errorf("resolve tcp address %s, %v", localAddressPort, err)
  33.         }
  34.         transport, err := raft.NewTCPTransport(localAddressPort, tcpAddr, 2, 10*time.Second, log.StandardLogger().Out)
  35.         if err != nil {
  36.                 return fmt.Errorf("fail to create tcp transport, localAddressPort:%s, tcpAddr:%v, %v",
  37.                         localAddressPort, tcpAddr, err)
  38.         }
  39.         snapshots := raft.NewInmemSnapshotStore()
  40.         logStore := raft.NewInmemStore()
  41.         stableStore := raft.NewInmemStore()
  42.         fm := NewFsm()
  43.         r.raft, err = raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
  44.         if err != nil {
  45.                 return fmt.Errorf("create raft error, %v", err)
  46.         }
  47.         var configuration raft.Configuration
  48.         for sID, addr := range r.servers {
  49.                 server := raft.Server{
  50.                         ID:      sID,
  51.                         Address: addr,
  52.                 }
  53.                 configuration.Servers = append(configuration.Servers, server)
  54.         }
  55.         err = r.raft.BootstrapCluster(configuration).Error()
  56.         if err != nil {
  57.                 return fmt.Errorf("raft bootstrap faild, conf:%v, %v", configuration, err)
  58.         }
  59.         log.Infof("bootstrap cluster as config: %v", configuration)
  60.         return nil
  61. }
  62. func (r *raftCluster) checkLeaderState() {
  63.         ticker := time.NewTicker(time.Second)
  64.         for {
  65.                 select {
  66.                 case leader := <-r.raft.LeaderCh():
  67.                         log.Infof("im leader:%v, state:%s, leader address:%s", leader, r.raft.State(), r.raft.Leader())
  68.                 case <-ticker.C:
  69.                         verifyErr := r.raft.VerifyLeader().Error()
  70.                         servers := r.raft.GetConfiguration().Configuration().Servers
  71.                         switch verifyErr {
  72.                         case nil:
  73.                                 log.Infof("im leader, servers:%v", servers)
  74.                         case raft.ErrNotLeader:
  75.                                 // check cluster leader
  76.                                 log.Infof("current state:%v, servers:%+v, leader address:%v, last contact:%v",
  77.                                         r.raft.State(), servers, r.raft.Leader(), r.raft.LastContact())
  78.                         }
  79.                 }
  80.         }
  81. }
  82. func main() {
  83.         var (
  84.                 clusters = flag.String("cluster", "",
  85.                         "cluster node address, fmt: ID,IP,Port;ID,IP,Port")
  86.                 clusterId = flag.String("id", "", "cluster id")
  87.         )
  88.         flag.Parse()
  89.         if *clusterId == "" {
  90.                 log.Infof("cluster id messing")
  91.                 os.Exit(1)
  92.         }
  93.         servers := make(map[raft.ServerID]raft.ServerAddress)
  94.         for _, cluster := range strings.Split(*clusters, ";") {
  95.                 info := strings.Split(cluster, ",")
  96.                 var (
  97.                         nid   string
  98.                         nip   net.IP
  99.                         nport int
  100.                         err   error
  101.                 )
  102.                 switch {
  103.                 case len(info) == 3:
  104.                         nid = info[0]
  105.                         nip = net.ParseIP(info[1])
  106.                         if nip == nil {
  107.                                 log.Infof("cluster %s ip %s parse failed", cluster, info[1])
  108.                                 os.Exit(1)
  109.                         }
  110.                         nport, err = strconv.Atoi(info[2])
  111.                         if err != nil {
  112.                                 log.Infof("cluster %s port %s parse failed, %v", cluster, info[2], err)
  113.                         }
  114.                 default:
  115.                         log.Infof("cluster args value is bad format")
  116.                         os.Exit(1)
  117.                 }
  118.                 log.Infof("cluster node id:%s, ip:%v, port:%d", nid, nip, nport)
  119.                 addr := net.TCPAddr{IP: nip, Port: nport}
  120.                 servers[raft.ServerID(nid)] = raft.ServerAddress(addr.String())
  121.         }
  122.         r := raftCluster{
  123.                 localRaftID: raft.ServerID(*clusterId),
  124.                 servers:     servers,
  125.         }
  126.         err := r.Start()
  127.         if err != nil {
  128.                 log.Infof("rafter cluster start failed, %v", err)
  129.                 os.Exit(1)
  130.         }
  131.         r.checkLeaderState()
  132. }
// SimpleFsm is a minimal no-op raft.FSM implementation backed by a stub
// database; it exists only so raft.NewRaft has an FSM to drive.
type SimpleFsm struct {
	db database // stub state-machine storage, also used as the FSMSnapshot
}
  137. func NewFsm() *SimpleFsm {
  138.         fsm := &SimpleFsm{
  139.                 db: NewDatabase(),
  140.         }
  141.         return fsm
  142. }
// Apply is invoked by raft once a log entry is committed; this stub ignores
// the entry and returns nil.
func (f *SimpleFsm) Apply(l *raft.Log) interface{} {
	return nil
}
// Snapshot hands out the embedded database as the raft.FSMSnapshot; database
// implements Persist/Release below.
func (f *SimpleFsm) Snapshot() (raft.FSMSnapshot, error) {
	return &f.db, nil
}
// Restore discards the snapshot stream; the stub FSM holds no state to
// rebuild.
func (f *SimpleFsm) Restore(io.ReadCloser) error {
	return nil
}
// database is an empty stub standing in for real FSM state; it doubles as
// the raft.FSMSnapshot returned by SimpleFsm.Snapshot.
type database struct{}
  153. func NewDatabase() database {
  154.         return database{}
  155. }
  156. func (d *database) Get(key string) string {
  157.         return "not implemented"
  158. }
// Set is a no-op write stub.
func (d *database) Set(key, value string) {}
  160. func (d *database) Persist(sink raft.SnapshotSink) error {
  161.         _, _ = sink.Write([]byte{})
  162.         _ = sink.Close()
  163.         return nil
  164. }
// Release is called when raft is done with the snapshot; nothing to free.
func (d *database) Release() {}
复制代码
问题排除

重新编译运行后,我们看到node3始终保持在Cluster2中。(注:本机回环测试前需先为lo0接口添加如下别名地址)
  1. sudo ifconfig lo0 alias 127.0.0.2 up
  2. sudo ifconfig lo0 alias 127.0.0.3 up
  3. sudo ifconfig lo0 alias 127.0.0.4 up
  4. sudo ifconfig lo0 alias 127.0.0.5 up
复制代码
在Cluster1的leader日志中,我们可以看到该leader向node3发送心跳失败的日志:
  1. [WARN]  raft: rejecting appendEntries request since node is not in configuration: from=c1-node1
复制代码
提醒

注意,这个修改方法还没有得到官方的认可,可能会有其他潜在的影响,使用之前应该自我评估。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册