raft算法练习-UDP通讯进行分布式选举「建议收藏」

raft算法练习-UDP通讯进行分布式选举「建议收藏」上一篇文章“raft算法练习-模拟三节点的分布式选举”(https://mange.work/blog?id=53)是基于rpc通讯的,这篇文章将通讯协议改为udp进行实现;UDP的优点:快,比TCP稍安全UDP没有T

raft算法练习-UDP通讯进行分布式选举

前言

“raft算法练习-模拟三节点的分布式选举” 是基于rpc通讯的,这篇文章将通讯协议改为udp进行实现;

UDP

  • UDP的优点:
    快,比TCP稍安全
    UDP没有TCP的握手、确认、窗口、重传、拥塞控制等机制,UDP是一个无状态的传输协议,所以它在传递数据时非常快。没有TCP的这些机制,UDP较TCP被攻击者利用的漏洞就要少一些。但UDP也是无法避免攻击的,比如:UDP Flood攻击……

  • UDP的缺点:
    不可靠,不稳定
    因为UDP没有TCP那些可靠的机制,在数据传递时,如果网络质量不好,就会很容易丢包。

  • 关于UDP的百度百科介绍:
    https://baike.baidu.com/item/UDP

实现

package main

import (
	"log"
	"math/rand"
	"net"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

/* raft算法 - leader 选举 提升了可用性, 当其中某个服务挂掉也能运行 注意: 当有服务>= 2才有效 注意: 当前运行服务 >= n/2 */


var rafter *Raft
var cluster = []string{ 
   "192.168.0.9:12221", "192.168.0.9:12222", "192.168.0.9:12223"}
var electionTime = 5 // follower -> candidate 的最大时间, 单位秒
var heartBeatTime = 1 // leader 发送心跳周期

// udp 实现
func main(){ 
   

	rafter = &Raft{ 
   
		state: 0, // 初始化 follower状态
		myAddr: "192.168.0.9:12221",
		heartBeat: make(chan bool),
		beElection: make(chan bool),
		electCh: make(chan bool),
		votedFor: "",
		vote: 0,
		randEr: rand.New(rand.NewSource(time.Now().UnixNano())),
		leader: "",
	}

	listener, err := net.ListenUDP("udp", GetUdpAddr(rafter.myAddr))
	if err != nil { 
   
		panic(err)
	}

	rafter.Conn =  listener

	go read()

	go join()

	// 启动选举
	go election()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
	select { 
   
		case s := <-ch:
			// TODO 通知退出
			log.Println("通知退出....")
			if i, ok := s.(syscall.Signal); ok { 
   
				os.Exit(int(i))
			} else { 
   
				os.Exit(0)
			}
	}

}

func read() { 
   

	for { 
   
		data := make([]byte, 1024)
		n, remoteAddr, err := rafter.Conn.ReadFromUDP(data)
		if err != nil { 
   
			log.Printf("error during read: %s", err)
			continue
		}
		log.Printf("receive %s from <%s> leader = %s \n", data[:n], remoteAddr, rafter.leader)
		cmd := string(data[:n])

		if cmd == "拉票" { 
   
			log.Println("remoteAddr = ", remoteAddr, "要你给他投票", "我的状态 = ", rafter.state, rafter.votedFor )

			// 已经我是leader, 通知你结束竞选
			if rafter.state == 3 { 
   
				_= send([]byte("结束竞选"), remoteAddr)
			}

			// 一个节点某一任期内最多只能投一票
			// follower 才能投票
			if rafter.votedFor == "" && rafter.state == 0 { 
   
				log.Println("是否有投票 = ", rafter.votedFor)
				rafter.votedFor = remoteAddr.String()

				_= send([]byte("给你投票"), remoteAddr)
			}

		}

		if cmd == "结束竞选" { 
   
			rafter.beElection <- false
		}

		if cmd == "给你投票" { 
   
			rafter.electCh <- true
		}

		if strings.Index(cmd, "leader") != -1{ 
   
			log.Println("有新的leader = ", cmd)
			rafter.state = 0 // 有新的leader,当前节点就是follower状态
		}

		if cmd == "心跳" { 
   
			rafter.leader = remoteAddr.String() // 来自 leader 的心跳
			rafter.heartBeat <- true
		}
	}

}

func send(data []byte, to *net.UDPAddr) error { 
   
	_, err := rafter.Conn.WriteToUDP(data, to)
	return err
}

func sendHeartBeat() { 
   
	log.Println("广播心跳")
	for _, v := range ClusterTable { 
   
		send([]byte("心跳"), v)
	}
}

func join() { 
   
	ClusterTable = make(map[string]*net.UDPAddr, len(cluster))
	for _,v := range cluster{ 
   
		ClusterTableLock.Lock()
		ClusterTable[v] = GetUdpAddr(v)
		ClusterTableLock.Unlock()
	}
}

func GetUdpAddr(str string) *net.UDPAddr { 
   
	strList := strings.Split(str, ":")
	return &net.UDPAddr{ 
   IP: net.ParseIP(strList[0]), Port: Str2Int(strList[1])}
}

//声明raft节点类型
type Raft struct { 
   
	Conn *net.UDPConn

	myAddr string

	// 锁
	mu sync.Mutex
	// 节点编号
	me int
	// 当前任期
	leader string

	// 为哪个节点投票
	votedFor string
	// 当前节点状态
	// 0 follower 1 candidate 2 leader
	state int
	// 发送最后一条消息的时间
	lastMessageTime int64
	// 当前节点的领导
	currentLeader int

	// 当前选票
	vote int

	// 心跳信号
	heartBeat chan bool

	// 成功当选信号
	beElection chan bool

	// 投票信号
	electCh chan bool

	// 随机
	randEr *rand.Rand

	消息通道
	//message chan bool
	选举通道
	//electCh chan bool
	返回心跳信号
	//hearbeatRe chan bool
	超时时间
	//timeout int
}

func (rf *Raft) getState() int { 
   
	return rf.state
}

// 集群表
var (
	ClusterTable map[string]*net.UDPAddr
	ClusterTableLock sync.Mutex
)

// follower -> candidate
func election(){ 
   
	// 随机秒后将变为竞选者, 必须大于心跳时间, 如当前心跳为1, 则+2
	for { 
   
		t := rafter.randEr.Intn(electionTime)+heartBeatTime+1
		tTime := time.Duration(t)
		log.Println("随机值t = ", tTime)
		timer := time.NewTimer(tTime * time.Second)
		select { 
   
			case t := <-timer.C:
				// 2秒内收不到来自 leader 的心跳
				rafter.votedFor = ""
				rafter.state = 1
				rafter.leader = ""
				rafter.currentLeader = 0
				rafter.vote = 0

				log.Println("开始拉票 : ", t)
				canvass()

			case  <- rafter.heartBeat:
				// 重置
				log.Println("重置 = ", tTime)
				timer.Reset(tTime*time.Second)

			case <- rafter.electCh:
				log.Println("获得选票")
				rafter.vote++
				log.Println(rafter.vote+1, ",", len(ClusterTable)/2)
				// 这里做了一下改动: +1 的目的是投自己一票, >=一半服务投票就竞选成功
				if rafter.vote+1 >= len(ClusterTable)/2 { 
   
					// 结束竞选
					timer.Stop()
					electionEnd(true)
				}

			case be := <- rafter.beElection:
				// 结束竞选
				timer.Stop()
				electionEnd(be)

		}

		// 等待下次竞选
		rafter.state = 0
	}
}

func (rf *Raft) depiao(){ 
   
	rf.electCh <- true
}

func electionEnd(be bool){ 
   
	// 被告知别人已当选,那么自行切换到follower
	if !be { 
   
		log.Println("竞选失败, 别人已当选")
		rafter.state = 0
		rafter.vote = 0 // 重置票数
		return
	}

	rafter.state = 3
	// 给所有节点宣布
	for _, v := range ClusterTable { 
   
		send([]byte("leader:"+rafter.myAddr), v)
	}

	// 发送心跳
	sendHeartBeat()
	for { 
   
		// 心跳时间要小于变为竞选者的时间周期
		timer := time.NewTimer(time.Duration(heartBeatTime) * time.Second)
		select { 
   
		case <-timer.C:
			sendHeartBeat()
		}
	}

}

// 选举拉票,竞选 leader
// candidate -> leader
func canvass(){ 
   
	succeed := true
	// 给其他节点发送投票请求
	for k, v := range ClusterTable { 
   
		if rafter.myAddr == k { 
   
			continue
		}
		err := send([]byte("拉票"), v)
		log.Println("向",k,"进行拉票, err = ", err)
		if err != nil { 
   
			succeed = false
			break
		}
	}
	if !succeed { 
   
		log.Println("拉票失败,重新拉票")
		canvass()
	}
}



// Str2Int64 string -> int
func Str2Int(str string) int { 
   
	if i, err := strconv.Atoi(str); err == nil { 
   
		return i
	}
	return 0
}

结果

分别启动, 三个服务, 服务端口分别为 12221, 12222, 12223

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

本文由ManGe原创,转载请附上原创地址

今天的文章raft算法练习-UDP通讯进行分布式选举「建议收藏」分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/68365.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注