raft算法练习-UDP通讯进行分布式选举
前言
“raft算法练习-模拟三节点的分布式选举” 是基于rpc通讯的,这篇文章将通讯协议改为udp进行实现;
UDP
-
UDP的优点:
快,比TCP稍安全
UDP没有TCP的握手、确认、窗口、重传、拥塞控制等机制,UDP是一个无状态的传输协议,所以它在传递数据时非常快。没有TCP的这些机制,UDP较TCP被攻击者利用的漏洞就要少一些。但UDP也是无法避免攻击的,比如:UDP Flood攻击…… -
UDP的缺点:
不可靠,不稳定
因为UDP没有TCP那些可靠的机制,在数据传递时,如果网络质量不好,就会很容易丢包。 -
关于UDP的百度百科介绍:
https://baike.baidu.com/item/UDP
实现
package main
import (
"log"
"math/rand"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
/* raft算法 - leader 选举 提升了可用性, 当其中某个服务挂掉也能运行 注意: 当有服务>= 2才有效 注意: 当前运行服务 >= n/2 */
var rafter *Raft
var cluster = []string{
"192.168.0.9:12221", "192.168.0.9:12222", "192.168.0.9:12223"}
var electionTime = 5 // follower -> candidate 的最大时间, 单位秒
var heartBeatTime = 1 // leader 发送心跳周期
// udp 实现
func main(){
rafter = &Raft{
state: 0, // 初始化 follower状态
myAddr: "192.168.0.9:12221",
heartBeat: make(chan bool),
beElection: make(chan bool),
electCh: make(chan bool),
votedFor: "",
vote: 0,
randEr: rand.New(rand.NewSource(time.Now().UnixNano())),
leader: "",
}
listener, err := net.ListenUDP("udp", GetUdpAddr(rafter.myAddr))
if err != nil {
panic(err)
}
rafter.Conn = listener
go read()
go join()
// 启动选举
go election()
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
select {
case s := <-ch:
// TODO 通知退出
log.Println("通知退出....")
if i, ok := s.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}
}
func read() {
for {
data := make([]byte, 1024)
n, remoteAddr, err := rafter.Conn.ReadFromUDP(data)
if err != nil {
log.Printf("error during read: %s", err)
continue
}
log.Printf("receive %s from <%s> leader = %s \n", data[:n], remoteAddr, rafter.leader)
cmd := string(data[:n])
if cmd == "拉票" {
log.Println("remoteAddr = ", remoteAddr, "要你给他投票", "我的状态 = ", rafter.state, rafter.votedFor )
// 已经我是leader, 通知你结束竞选
if rafter.state == 3 {
_= send([]byte("结束竞选"), remoteAddr)
}
// 一个节点某一任期内最多只能投一票
// follower 才能投票
if rafter.votedFor == "" && rafter.state == 0 {
log.Println("是否有投票 = ", rafter.votedFor)
rafter.votedFor = remoteAddr.String()
_= send([]byte("给你投票"), remoteAddr)
}
}
if cmd == "结束竞选" {
rafter.beElection <- false
}
if cmd == "给你投票" {
rafter.electCh <- true
}
if strings.Index(cmd, "leader") != -1{
log.Println("有新的leader = ", cmd)
rafter.state = 0 // 有新的leader,当前节点就是follower状态
}
if cmd == "心跳" {
rafter.leader = remoteAddr.String() // 来自 leader 的心跳
rafter.heartBeat <- true
}
}
}
func send(data []byte, to *net.UDPAddr) error {
_, err := rafter.Conn.WriteToUDP(data, to)
return err
}
func sendHeartBeat() {
log.Println("广播心跳")
for _, v := range ClusterTable {
send([]byte("心跳"), v)
}
}
func join() {
ClusterTable = make(map[string]*net.UDPAddr, len(cluster))
for _,v := range cluster{
ClusterTableLock.Lock()
ClusterTable[v] = GetUdpAddr(v)
ClusterTableLock.Unlock()
}
}
func GetUdpAddr(str string) *net.UDPAddr {
strList := strings.Split(str, ":")
return &net.UDPAddr{
IP: net.ParseIP(strList[0]), Port: Str2Int(strList[1])}
}
//声明raft节点类型
type Raft struct {
Conn *net.UDPConn
myAddr string
// 锁
mu sync.Mutex
// 节点编号
me int
// 当前任期
leader string
// 为哪个节点投票
votedFor string
// 当前节点状态
// 0 follower 1 candidate 2 leader
state int
// 发送最后一条消息的时间
lastMessageTime int64
// 当前节点的领导
currentLeader int
// 当前选票
vote int
// 心跳信号
heartBeat chan bool
// 成功当选信号
beElection chan bool
// 投票信号
electCh chan bool
// 随机
randEr *rand.Rand
消息通道
//message chan bool
选举通道
//electCh chan bool
返回心跳信号
//hearbeatRe chan bool
超时时间
//timeout int
}
func (rf *Raft) getState() int {
return rf.state
}
// 集群表
var (
ClusterTable map[string]*net.UDPAddr
ClusterTableLock sync.Mutex
)
// follower -> candidate
func election(){
// 随机秒后将变为竞选者, 必须大于心跳时间, 如当前心跳为1, 则+2
for {
t := rafter.randEr.Intn(electionTime)+heartBeatTime+1
tTime := time.Duration(t)
log.Println("随机值t = ", tTime)
timer := time.NewTimer(tTime * time.Second)
select {
case t := <-timer.C:
// 2秒内收不到来自 leader 的心跳
rafter.votedFor = ""
rafter.state = 1
rafter.leader = ""
rafter.currentLeader = 0
rafter.vote = 0
log.Println("开始拉票 : ", t)
canvass()
case <- rafter.heartBeat:
// 重置
log.Println("重置 = ", tTime)
timer.Reset(tTime*time.Second)
case <- rafter.electCh:
log.Println("获得选票")
rafter.vote++
log.Println(rafter.vote+1, ",", len(ClusterTable)/2)
// 这里做了一下改动: +1 的目的是投自己一票, >=一半服务投票就竞选成功
if rafter.vote+1 >= len(ClusterTable)/2 {
// 结束竞选
timer.Stop()
electionEnd(true)
}
case be := <- rafter.beElection:
// 结束竞选
timer.Stop()
electionEnd(be)
}
// 等待下次竞选
rafter.state = 0
}
}
func (rf *Raft) depiao(){
rf.electCh <- true
}
func electionEnd(be bool){
// 被告知别人已当选,那么自行切换到follower
if !be {
log.Println("竞选失败, 别人已当选")
rafter.state = 0
rafter.vote = 0 // 重置票数
return
}
rafter.state = 3
// 给所有节点宣布
for _, v := range ClusterTable {
send([]byte("leader:"+rafter.myAddr), v)
}
// 发送心跳
sendHeartBeat()
for {
// 心跳时间要小于变为竞选者的时间周期
timer := time.NewTimer(time.Duration(heartBeatTime) * time.Second)
select {
case <-timer.C:
sendHeartBeat()
}
}
}
// 选举拉票,竞选 leader
// candidate -> leader
func canvass(){
succeed := true
// 给其他节点发送投票请求
for k, v := range ClusterTable {
if rafter.myAddr == k {
continue
}
err := send([]byte("拉票"), v)
log.Println("向",k,"进行拉票, err = ", err)
if err != nil {
succeed = false
break
}
}
if !succeed {
log.Println("拉票失败,重新拉票")
canvass()
}
}
// Str2Int64 string -> int
func Str2Int(str string) int {
if i, err := strconv.Atoi(str); err == nil {
return i
}
return 0
}
结果
分别启动, 三个服务, 服务端口分别为 12221, 12222, 12223
本文由ManGe原创,转载请附上原创地址
今天的文章raft算法练习-UDP通讯进行分布式选举「建议收藏」分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/68365.html