k8s CNI 之 AWS VPC-CNI 插件源码解析

k8s CNI 之 AWS VPC-CNI 插件源码解析诚然,K8S网络模型Flannel解决了容器间网络互通的问题,但对于如何解决集群内容器和集群外的虚拟机或者物理机直接互通的问题却无能为力。 其实,更确切说

前言

之前的文章分析过flannel网络模型,诚然,Flannel解决了k8s集群中容器间网络互通的问题,但对于如何解决集群内容器与集群外的虚拟机或者物理机直接互通的问题却无能为力。

其实,更确切说法是集群外服务无法直接ping通集群内容器ip。那么就意味着,在类似dubbo这种微服务发现和注册场景中,在网络层,k8s集群外的consumer是无法直接连通集群内的provider的

可能有人不禁要问,flannel为什么对于这种场景无能为力?

这是因为,k8s集群中容器的ip是由flanneld”另起炉灶”独立生成的,并不在vpc网段的范围内,导致集群外的服务器上的路由表缺失相应的路由条目将数据包转发到容器内。

聪明如你,马上想到”既然如此,那让容器分配的ip在vpc网段内,不就可以了吗?”

恭喜你,答对了!!!

vpc-cni方案整体沿用的正是这样的思路:从VPC网段中分配ip给容器。这样,集群内外就实现了无差别的网络直连互通;另外一个好处是,这种方案由于省却了Flanneld解封装vxlan数据包的步骤,网络性能毋庸置疑上会有显著提升。

在k8s的落地过程中,为了将业务系统平滑迁移到k8s中,尤其是建立在RPC+注册中心的微服务架构上,就必须保持集群内外的直连互通,这种场景下,vpc-cni方案无疑是首选

原理

主要实现逻辑:

Worker节点启动的时候挂载多个虚拟网卡ENI(Elastic Netowrk Interface)

  • 每个ENI都绑定了一个主IP(Primary ip)  和 多个 Secondry ip
  • ipamd(Local IP Address Manager)运行在每个worker 节点上,将所有ENI的所有secondary – ip 加入到本地ip地址池中
  • 当cni接受到创建pod事件请求时,就会通过grpc请求ipamd拿到ip并设置pod网络栈;反之,当接收到删除pod请求时就会通知ipamd释放ip并同时删除pod网络栈

image.png

CNI

遵守k8S CNI网络模型的接口规范,主要实现了cmdAdd cmdDel接口,分别处理pod网络的创建和销毁事件

  • cmdAdd 代码路径: cmd/routed-eni-cni-plugin/cni.go
func cmdAdd(args *skel.CmdArgs) error {
	return add(args, typeswrapper.New(), grpcwrapper.New(), rpcwrapper.New(), driver.New())
}

func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrapper.GRPC, rpcClient rpcwrapper.RPC, driverClient driver.NetworkAPIs) error {

	conf, log, err := LoadNetConf(args.StdinData)
    ...
	// 解析 k8s参数
    var k8sArgs K8sArgs
	if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
		log.Errorf("Failed to load k8s config from arg: %v", err)
		return errors.Wrap(err, "add cmd: failed to load k8s config from arg")
	}
    ...
	// 通过grpc发起请求到ipamd server
	conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
	...
	c := rpcClient.NewCNIBackendClient(conn)
    
        // 调用ipamd的AddNetwork接口获取ip地址
	r, err := c.AddNetwork(context.Background(),
		&pb.AddNetworkRequest{
			ClientVersion:              version,
			K8S_POD_NAME:               string(k8sArgs.K8S_POD_NAME),
			K8S_POD_NAMESPACE:          string(k8sArgs.K8S_POD_NAMESPACE),
			K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
			Netns:                      args.Netns,
			ContainerID:                args.ContainerID,
			NetworkName:                conf.Name,
			IfName:                     args.IfName,
		})
    ...
	addr := &net.IPNet{
		IP:   net.ParseIP(r.IPv4Addr),
		Mask: net.IPv4Mask(255, 255, 255, 255),
	}
    ...
                // 获取到ip后,调用driver模块配置pod的network namespace
		err = driverClient.SetupNS(hostVethName, args.IfName, args.Netns, addr, int(r.DeviceNumber), r.VPCcidrs, r.UseExternalSNAT, mtu, log)
	}
    ...
	ips := []*current.IPConfig{
		{
			Version: "4",
			Address: *addr,
		},
	}

	result := &current.Result{
		IPs: ips,
	}

	return cniTypes.PrintResult(result, conf.CNIVersion)
}

总结:cni通过grpc请求ipamd服务获取ip,拿到ip后调用driver模块设置pod的网络环境

  • cmdDel

      释放pod ip并清理pod的网络环境

func cmdDel(args *skel.CmdArgs) error {
	return del(args, typeswrapper.New(), grpcwrapper.New(), rpcwrapper.New(), driver.New())
}

func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrapper.GRPC, rpcClient rpcwrapper.RPC, driverClient driver.NetworkAPIs) error {

	conf, log, err := LoadNetConf(args.StdinData)
    ...
	var k8sArgs K8sArgs
	if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
		log.Errorf("Failed to load k8s config from args: %v", err)
		return errors.Wrap(err, "del cmd: failed to load k8s config from args")
	}
	// 发起grpc请求通知ipamd释放ip
	conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
	...
	c := rpcClient.NewCNIBackendClient(conn)

	r, err := c.DelNetwork(context.Background(), &pb.DelNetworkRequest{
		ClientVersion:              version,
		K8S_POD_NAME:               string(k8sArgs.K8S_POD_NAME),
		K8S_POD_NAMESPACE:          string(k8sArgs.K8S_POD_NAMESPACE),
		K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
		NetworkName:                conf.Name,
		ContainerID:                args.ContainerID,
		IfName:                     args.IfName,
		Reason:                     "PodDeleted",
	})
	...
	deletedPodIP := net.ParseIP(r.IPv4Addr)
	if deletedPodIP != nil {
		addr := &net.IPNet{
			IP:   deletedPodIP,
			Mask: net.IPv4Mask(255, 255, 255, 255),
		}
		...
            // 调用driver模块的TearDownNS接口删除清理pod网络栈
			err = driverClient.TeardownNS(addr, int(r.DeviceNumber), log)
        ...
	return nil
}

driver


该模块主要提供创建和销毁pod网络栈的工具,dirver模块的主函数是SetupNS和TeardownNS

代码路径: cmd/routed-eni-cni-plugin/driver.go

代码逻辑:

image.png

  • SetupNS

  该函数主要功能是配置pod网络栈,包括准备pod的网络环境和策略路由的配置

  在 aws-cni 网络模型中,节点上的每一个ENI都会生成相应的路由表来转发from-pod的流量;通过策略路由方式,让to-pod的流量优先走主路由表,而对于from-pod的流量则走ENI对应的路由表,所以在配置pod网络环境中有配置策略路由的过程

func (os *linuxNetwork) SetupNS(hostVethName string, contVethName string, netnsPath string, addr *net.IPNet, deviceNumber int, vpcCIDRs []string, useExternalSNAT bool, mtu int, log logger.Logger) error {
	log.Debugf("SetupNS: hostVethName=%s, contVethName=%s, netnsPath=%s, deviceNumber=%d, mtu=%d", hostVethName, contVethName, netnsPath, deviceNumber, mtu)
	return setupNS(hostVethName, contVethName, netnsPath, addr, deviceNumber, vpcCIDRs, useExternalSNAT, os.netLink, os.ns, mtu, log, os.procSys)
}


func setupNS(hostVethName string, contVethName string, netnsPath string, addr *net.IPNet, deviceNumber int, vpcCIDRs []string, useExternalSNAT bool, netLink netlinkwrapper.NetLink, ns nswrapper.NS, mtu int, log logger.Logger, procSys procsyswrapper.ProcSys) error {

        // 调用setupVeth函数设置pod网络环境
	hostVeth, err := setupVeth(hostVethName, contVethName, netnsPath, addr, netLink, ns, mtu, procSys, log)
    ...
	addrHostAddr := &net.IPNet{
		IP:   addr.IP,
		Mask: net.CIDRMask(32, 32)}

        // 在节点上的主路由表添加到pod的路由 ip route add $ip dev veth-1 
	route := netlink.Route{
		LinkIndex: hostVeth.Attrs().Index,
		Scope:     netlink.SCOPE_LINK,
		Dst:       addrHostAddr}
   
        // netlink接口封装了linux的 "ip link"、"ip route"、 "ip rule"等命令
	if err := netLink.RouteReplace(&route); err != nil {
		return errors.Wrapf(err, "setupNS: unable to add or replace route entry for %s", route.Dst.IP.String())
	}
    
        // 使用"ip rule"命令添加to-pod策略路由 512: from all to 10.0.97.30 lookup main 
	err = addContainerRule(netLink, true, addr, mainRouteTable)
       ...
    
       // 通过ENI deviceNumber 判断是否primary ENI, 0表示Primary ENI
       // 如果ENI不是 primary ENI,则添加流量从pod出来的策略路由 
       // 1536: from 10.0.97.30 lookup eni-1 
	if deviceNumber > 0 {
		tableNumber := deviceNumber + 1
		err = addContainerRule(netLink, false, addr, tableNumber)
        ...
	}
	return nil
}

最终实现的效果:

# ip rule list
0:	from all lookup local 
512:	from all to 10.0.97.30 lookup main <---------- to Pod's traffic 1025: not from all to 10.0.0.0/16 lookup main 1536: from 10.0.97.30 lookup eni-1 <-------------- from Pod's traffic
  • createVethPairContext

 createVethPairContext 结构体包含了创建vethpair所需参数,run 方法其实是setupVeth函数的具体实现,包含了创建vethpair,启用vethpir、配置pod网关、路由等步骤

func newCreateVethPairContext(contVethName string, hostVethName string, addr *net.IPNet, mtu int) *createVethPairContext {
	return &createVethPairContext{
		contVethName: contVethName,
		hostVethName: hostVethName,
		addr:         addr,
		netLink:      netlinkwrapper.NewNetLink(),
		ip:           ipwrapper.NewIP(),
		mtu:          mtu,
	}
}

func (createVethContext *createVethPairContext) run(hostNS ns.NetNS) error {
	veth := &netlink.Veth{
		LinkAttrs: netlink.LinkAttrs{
			Name:  createVethContext.contVethName,
			Flags: net.FlagUp,
			MTU:   createVethContext.mtu,
		},
		PeerName: createVethContext.hostVethName,
	}
    
        // 执行 ip link add 为pod创建vethpair
	if err := createVethContext.netLink.LinkAdd(veth); err != nil {
		return err
	}

	hostVeth, err := createVethContext.netLink.LinkByName(createVethContext.hostVethName)
	...
        // 执行 ip link set $link up 启用vethpair的主机端
	if err = createVethContext.netLink.LinkSetUp(hostVeth); err != nil {
		return errors.Wrapf(err, "setup NS network: failed to set link %q up", createVethContext.hostVethName)
	}

	contVeth, err := createVethContext.netLink.LinkByName(createVethContext.contVethName)
	if err != nil {
		return errors.Wrapf(err, "setup NS network: failed to find link %q", createVethContext.contVethName)
	}

	// 启用pod端的vethpair
	if err = createVethContext.netLink.LinkSetUp(contVeth); err != nil {
		return errors.Wrapf(err, "setup NS network: failed to set link %q up", createVethContext.contVethName)
	}

        // 添加默认网关169.254.1.1 route add default gw addr
	if err = createVethContext.netLink.RouteReplace(&netlink.Route{
		LinkIndex: contVeth.Attrs().Index,
		Scope:     netlink.SCOPE_LINK,
		Dst:       gwNet}); err != nil {
		return errors.Wrap(err, "setup NS network: failed to add default gateway")
	}

        // 添加默认路由 效果 default via 169.254.1.1 dev eth0
	if err = createVethContext.ip.AddDefaultRoute(gwNet.IP, contVeth); err != nil {
		return errors.Wrap(err, "setup NS network: failed to add default route")
	}
    
        // 给网卡eth0添加ip地址 "ip addr add $ip dev eth0"
	if err = createVethContext.netLink.AddrAdd(contVeth, &netlink.Addr{IPNet: createVethContext.addr}); err != nil {
		return errors.Wrapf(err, "setup NS network: failed to add IP addr to %q", createVethContext.contVethName)
	}

	// 为默认网关添加arp静态条目
	neigh := &netlink.Neigh{
		LinkIndex:    contVeth.Attrs().Index,
		State:        netlink.NUD_PERMANENT,
		IP:           gwNet.IP,
		HardwareAddr: hostVeth.Attrs().HardwareAddr,
	}

	if err = createVethContext.netLink.NeighAdd(neigh); err != nil {
		return errors.Wrap(err, "setup NS network: failed to add static ARP")
	}
    
        // 将vethpair 的一端移动到主机侧 network namespace 
	if err = createVethContext.netLink.LinkSetNsFd(hostVeth, int(hostNS.Fd())); err != nil {
		return errors.Wrap(err, "setup NS network: failed to move veth to host netns")
	}
	return nil
}
  • TeardownNS

     清理pod网络环境

func (os *linuxNetwork) TeardownNS(addr *net.IPNet, deviceNumber int, log logger.Logger) error {
	log.Debugf("TeardownNS: addr %s, deviceNumber %d", addr.String(), deviceNumber)
	return tearDownNS(addr, deviceNumber, os.netLink, log)
}

func tearDownNS(addr *net.IPNet, deviceNumber int, netLink netlinkwrapper.NetLink, log logger.Logger) error {
   ...
	// 删除to-pod方向的策略路由 执行 "ip rule del"
	toContainerRule := netLink.NewRule()
	toContainerRule.Dst = addr
	toContainerRule.Priority = toContainerRulePriority
	err := netLink.RuleDel(toContainerRule)
     ...
     // 判断ENI是否为Primary ENI,如果是非Primary,则同时删除from-pod的策略路由
	if deviceNumber > 0 {
		err := deleteRuleListBySrc(*addr)
      ...
	}
	addrHostAddr := &net.IPNet{
		IP:   addr.IP,
		Mask: net.CIDRMask(32, 32)}
         ...
	return nil
}

IPAMD

IPAMD是本地ip地址池管理进程,以daemonset的方式运行在每个worker节点上,维护着节点上所有可用ip地址;那么,问题来了,ip地址池中的数据是从哪里来的呢?

其实,aws ec2中有一个 ec2metadata 的概念,保存着关于该实例的元数据信息,包括绑定到ec2的所有ENI,以及ENI上的所有ip,并提供接口获取:

curl  http://169.254.169.254/latest/meta-data/network/interfaces/macs/

curl  http://169.254.169.254/latest/meta-data/network/interfaces/macs/0a:da:9d:51:47:28/local-ipv4s

ipamd正是在初始化的过程中通过该接口将ENI/IP信息保存在dataStore中,以上过程是在nodeInit方法中实现的

nodeInit

func (c *IPAMContext) nodeInit() error {
        ...
        // 请求ec2元数据接口,获取所有的ENI数据
	metadataResult, err := c.awsClient.DescribeAllENIs()
	...
	enis := c.filterUnmanagedENIs(metadataResult.ENIMetadata)
         ....
		// 添加ENI信息
		retry := 0
		for {
			retry++
			if err = c.setupENI(eni.ENIID, eni, isTrunkENI, isEFAENI); err == nil {
				log.Infof("ENI %s set up.", eni.ENIID)
				break
			}
                 ...
	return nil
}
  • setupENI

   setupENI的主要任务是完成dataStore数据初始化,包括:

  • 将ENI 添加到 datastore中
  • 启用与eni相关的 vethpair
  • 将ENI 的所有secondary IP 添加datastore中
func (c *IPAMContext) setupENI(eni string, eniMetadata awsutils.ENIMetadata, isTrunkENI, isEFAENI bool) error {
	primaryENI := c.awsClient.GetPrimaryENI()
    
	err := c.dataStore.AddENI(eni, eniMetadata.DeviceNumber, eni == primaryENI, isTrunkENI, isEFAENI)
	...
	c.primaryIP[eni] = eniMetadata.PrimaryIPv4Address()

	if eni != primaryENI {
		err = c.networkClient.SetupENINetwork(c.primaryIP[eni], eniMetadata.MAC, eniMetadata.DeviceNumber, eniMetadata.SubnetIPv4CIDR)
        ...
	}
    ...
	c.addENIsecondaryIPsToDataStore(eniMetadata.IPv4Addresses, eni)
	c.addENIprefixesToDataStore(eniMetadata.IPv4Prefixes, eni)

	return nil
}

dataStore

dataStore 是一个通过结构体构造的本地DB,维护着本地节点ENI信息,以及ENI上绑定的所有ip,每条ip信息都以ipamkey作为主键;当ip被分配,则会以(network name, CNI_CONTAINERID, CNI_IFNAME)作为主键值;反之,ip没有被分配,ipamkey会被设置为空值

代码路径 /pkg/ipamd/datastore/data_store.go

type DataStore struct {
	total                    int 
	assigned                 int  
	allocatedPrefix          int
	eniPool                  ENIPool 
	lock                     sync.Mutex
	log                      logger.Logger
	CheckpointMigrationPhase int 
	backingStore             Checkpointer
	cri                      cri.APIs
	isPDEnabled              bool
}

type ENI struct {
	ID         string
	createTime time.Time
	IsPrimary bool
	IsTrunk bool
	IsEFA bool
	DeviceNumber int
	AvailableIPv4Cidrs map[string]*CidrInfo
}

type AddressInfo struct {
	IPAMKey        IPAMKey
	Address        string
	UnassignedTime time.Time
}

type CidrInfo struct {
	Cidr net.IPNet    // 192.168.1.1/24
	IPv4Addresses map[string]*AddressInfo
	IsPrefix bool
}

type ENIPool map[string]*ENI   //['eniid]eni

datastore包含两个主要的方法 AssignPodIPv4Address和UnAssignPodIPv4Address cni本质上是直接调用这两个方法来分别获取ip和释放ip

  • AssignPodIPv4Address
// 将ip分配给pod
func (ds *DataStore) AssignPodIPv4Address(ipamKey IPAMKey) (ipv4address string, deviceNumber int, err error) {
   // 对 dataStore操作加互斥锁
	ds.lock.Lock()
	defer ds.lock.Unlock()
      ...
      // 遍历dataStore的eniPool拿到ip
      for _, eni := range ds.eniPool {
		for _, availableCidr := range eni.AvailableIPv4Cidrs {
			var addr *AddressInfo
			var strPrivateIPv4 string
			var err error

			if (ds.isPDEnabled && availableCidr.IsPrefix) || (!ds.isPDEnabled && !availableCidr.IsPrefix) {
				strPrivateIPv4, err = ds.getFreeIPv4AddrfromCidr(availableCidr)
				if err != nil {
					ds.log.Debugf("Unable to get IP address from CIDR: %v", err)
					//Check in next CIDR
					continue
				}
				...

			addr = availableCidr.IPv4Addresses[strPrivateIPv4]
		        ...
			availableCidr.IPv4Addresses[strPrivateIPv4] = addr
                        // 对于已分配的ip,设置其ipamkey
			ds.assignPodIPv4AddressUnsafe(ipamKey, eni, addr)
                         ...
			return addr.Address, eni.DeviceNumber, nil
		}
	}
    ...
}
  • UnAssignPodIPv4Address
// 释放ip地址
func (ds *DataStore) UnassignPodIPv4Address(ipamKey IPAMKey) (e *ENI, ip string, deviceNumber int, err error) {

    ...
        // 通过主键ipamKey 在enipool中找对对应的pod ip地址
	eni, availableCidr, addr := ds.eniPool.FindAddressForSandbox(ipamKey)
    ...
        // 调用unassignPodIPv4AddressUnsafe 设置ip为未分配状态,即将IP地址对应的主键ipamkey设置为空
	ds.unassignPodIPv4AddressUnsafe(addr)
	...
        // 设置ip释放时间为当前时间
	addr.UnassignedTime = time.Now()
    ...
	return eni, addr.Address, eni.DeviceNumber, nil
}

文章均为原创,关注公众号
云猿生 \color{green} {云猿生}
获取更多知识

image.png

今天的文章k8s CNI 之 AWS VPC-CNI 插件源码解析分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/14359.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注