问题场景描述
我通过模块github.com/hashicorp/raft使用golang实现了一个raft集群功能,发现如下场景中会遇到一个问题:
测试启动如下2个raft集群,集群名称,和集群node与IP地址如下,raft集群均通过BootstrapCluster方法初始化:
Cluster1 BootstrapCluster servers:- - node1: {raft.ServerID: c1-node1, raft.ServerAddress: 192.168.100.1:7000}
- - node2: {raft.ServerID: c1-node2, raft.ServerAddress: 192.168.100.2:7000}
- - node3: {raft.ServerID: c1-node3, raft.ServerAddress: 192.168.100.3:7000}
Cluster2 BootstrapCluster servers:- - node3: {raft.ServerID: c2-node3, raft.ServerAddress: 192.168.100.3:7000}
- - node4: {raft.ServerID: c2-node4, raft.ServerAddress: 192.168.100.4:7000}
- - node5: {raft.ServerID: c2-node5, raft.ServerAddress: 192.168.100.5:7000}
其中,"node3"的地址会同时存在于2个集群中。
- "node1","node2"按照"Cluster1"启动:
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2
- "node3","node4","node5"先按照"Cluster2"启动:
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5
然后就会发现"node3"会在"Cluster1"和"Cluster2"之间来回切换,一会属于"Cluster1",一会属于"Cluster2".- INFO[0170] current state:Follower, leader address:127.0.0.5:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:53.330867 +0800 CST m=+169.779019126
- INFO[0171] current state:Follower, leader address:127.0.0.1:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:54.308388 +0800 CST m=+170.756576126
我的代码如下:
- package main
- import (
- "flag"
- "fmt"
- "io"
- "net"
- "os"
- "strconv"
- "strings"
- "time"
- "github.com/hashicorp/raft"
- log "github.com/sirupsen/logrus"
- )
// raftCluster bundles one hashicorp/raft node with the static membership
// it is bootstrapped with and the running raft instance.
type raftCluster struct {
	localRaftID     raft.ServerID                        // this node's ID within the cluster
	servers         map[raft.ServerID]raft.ServerAddress // raftID : raftAddressPort
	raft            *raft.Raft                           // running raft instance; set by Start
	electionTimeout time.Duration                        // worst-case whole-cluster election window; set by Start
}
- func (r *raftCluster) Start() error {
- config := raft.DefaultConfig()
- config.HeartbeatTimeout = 2000 * time.Millisecond
- config.ElectionTimeout = 5000 * time.Millisecond
- config.CommitTimeout = 2000 * time.Millisecond
- config.LeaderLeaseTimeout = 2000 * time.Millisecond
- config.LocalID = r.localRaftID
- config.LogOutput = log.StandardLogger().Out
- r.electionTimeout = config.ElectionTimeout * time.Duration(len(r.servers)*2)
- localAddressPort := string(r.servers[r.localRaftID])
- tcpAddr, err := net.ResolveTCPAddr("tcp", localAddressPort)
- if err != nil {
- return fmt.Errorf("resolve tcp address %s, %v", localAddressPort, err)
- }
- transport, err := raft.NewTCPTransport(localAddressPort, tcpAddr, 2, 10*time.Second, log.StandardLogger().Out)
- if err != nil {
- return fmt.Errorf("fail to create tcp transport, localAddressPort:%s, tcpAddr:%v, %v",
- localAddressPort, tcpAddr, err)
- }
- snapshots := raft.NewInmemSnapshotStore()
- logStore := raft.NewInmemStore()
- stableStore := raft.NewInmemStore()
- fm := NewFsm()
- r.raft, err = raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
- if err != nil {
- return fmt.Errorf("create raft error, %v", err)
- }
- var configuration raft.Configuration
- for sID, addr := range r.servers {
- server := raft.Server{
- ID: sID,
- Address: addr,
- }
- configuration.Servers = append(configuration.Servers, server)
- }
- err = r.raft.BootstrapCluster(configuration).Error()
- if err != nil {
- return fmt.Errorf("raft bootstrap faild, conf:%v, %v", configuration, err)
- }
- log.Infof("bootstrap cluster as config: %v", configuration)
- return nil
- }
- func (r *raftCluster) checkLeaderState() {
- ticker := time.NewTicker(time.Second)
- for {
- select {
- case leader := <-r.raft.LeaderCh():
- log.Infof("im leader:%v, state:%s, leader address:%s", leader, r.raft.State(), r.raft.Leader())
- case <-ticker.C:
- verifyErr := r.raft.VerifyLeader().Error()
- servers := r.raft.GetConfiguration().Configuration().Servers
- switch verifyErr {
- case nil:
- log.Infof("im leader, servers:%v", servers)
- case raft.ErrNotLeader:
- // check cluster leader
- log.Infof("current state:%v, servers:%+v, leader address:%v, last contact:%v",
- r.raft.State(), servers, r.raft.Leader(), r.raft.LastContact())
- }
- }
- }
- }
- func main() {
- var (
- clusters = flag.String("cluster", "",
- "cluster node address, fmt: ID,IP,Port;ID,IP,Port")
- clusterId = flag.String("id", "", "cluster id")
- )
- flag.Parse()
- if *clusterId == "" {
- log.Infof("cluster id messing")
- os.Exit(1)
- }
- servers := make(map[raft.ServerID]raft.ServerAddress)
- for _, cluster := range strings.Split(*clusters, ";") {
- info := strings.Split(cluster, ",")
- var (
- nid string
- nip net.IP
- nport int
- err error
- )
- switch {
- case len(info) == 3:
- nid = info[0]
- nip = net.ParseIP(info[1])
- if nip == nil {
- log.Infof("cluster %s ip %s parse failed", cluster, info[1])
- os.Exit(1)
- }
- nport, err = strconv.Atoi(info[2])
- if err != nil {
- log.Infof("cluster %s port %s parse failed, %v", cluster, info[2], err)
- }
- default:
- log.Infof("cluster args value is bad format")
- os.Exit(1)
- }
- log.Infof("cluster node id:%s, ip:%v, port:%d", nid, nip, nport)
- addr := net.TCPAddr{IP: nip, Port: nport}
- servers[raft.ServerID(nid)] = raft.ServerAddress(addr.String())
- }
- r := raftCluster{
- localRaftID: raft.ServerID(*clusterId),
- servers: servers,
- }
- err := r.Start()
- if err != nil {
- log.Infof("rafter cluster start failed, %v", err)
- os.Exit(1)
- }
- r.checkLeaderState()
- }
- // SimpleFsm: 实现一个简单的Fsm
- type SimpleFsm struct {
- db database
- }
- func NewFsm() *SimpleFsm {
- fsm := &SimpleFsm{
- db: NewDatabase(),
- }
- return fsm
- }
- func (f *SimpleFsm) Apply(l *raft.Log) interface{} {
- return nil
- }
- func (f *SimpleFsm) Snapshot() (raft.FSMSnapshot, error) {
- return &f.db, nil
- }
- func (f *SimpleFsm) Restore(io.ReadCloser) error {
- return nil
- }
- type database struct{}
- func NewDatabase() database {
- return database{}
- }
- func (d *database) Get(key string) string {
- return "not implemented"
- }
- func (d *database) Set(key, value string) {}
- func (d *database) Persist(sink raft.SnapshotSink) error {
- _, _ = sink.Write([]byte{})
- _ = sink.Close()
- return nil
- }
- func (d *database) Release() {}
问题排查
重新编译运行后,我们看到node3始终保持在Cluster2中。(注:本地测试前需先通过如下命令为回环接口配置别名地址:)- sudo ifconfig lo0 alias 127.0.0.2 up
- sudo ifconfig lo0 alias 127.0.0.3 up
- sudo ifconfig lo0 alias 127.0.0.4 up
- sudo ifconfig lo0 alias 127.0.0.5 up
此时在node3的日志中,可以看到它拒绝了来自Cluster1 leader(c1-node1)的appendEntries请求——因为该leader并不在node3当前的集群配置中:- [WARN] raft: rejecting appendEntries request since node is not in configuration: from=c1-node1
提醒
注意,这个修改方法还没有得到官方的认可,可能会有其他潜在的影响,使用之前应该自我评估。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |