accept队列_workerman 消息队列[通俗易懂]

accept队列_workerman 消息队列[通俗易懂]tcegress队列实现分析_egressqueue

egress队列主要作用于报文出方向,分为两类:无类队列和分类队列。下面分析下源码。

无类别队列

accept队列_workerman 消息队列[通俗易懂]

image.png

添加下面这条qdisc时,kernel端代码流程
tc qdisc add dev eth0 root tbf rate 1024kbit limit 1024kbit burst 1024

//linux/net/sched/sck_api.c
static int tc_modify_qdisc(struct sk_buff *skb, struct nlmsghdr *n)
    struct net_device *dev;
    struct Qdisc *q;
    struct tcmsg *tcm;
    struct nlattr *tca[TCA_MAX + 1];

    //解析出配置的参数,保存到tca中
    nlmsg_parse(n, sizeof(*tcm), tca, TCA_MAX, NULL);
    tcm = nlmsg_data(n);
    clid = tcm->tcm_parent;
    dev = __dev_get_by_index(net, tcm->tcm_ifindex);

    q = dev->qdisc;
    /* It may be default qdisc, ignore it */
    //默认的qdisc,handle为0
    if (q && q->handle == 0)
        q = NULL;
    struct netdev_queue *dev_queue;
    //a. 取出一个 tx queue来保存新创建的qdisc
    dev_queue = netdev_get_tx_queue(dev, 0);
    q = qdisc_create(dev, dev_queue, p,
                 tcm->tcm_parent, tcm->tcm_handle,
                 tca, &err);
    
    //b. 将新创建的qdisc赋给ingress queue
    qdisc_graft(dev, p, skb, n, clid, q, NULL);

a. 创建qdisc

static struct Qdisc * qdisc_create(struct net_device *dev, struct 
         netdev_queue *dev_queue, struct Qdisc *p, u32 parent, u32 
         handle, struct nlattr **tca, int *errp)
    //kind为tbf
    struct nlattr *kind = tca[TCA_KIND];
    struct Qdisc *sch;
    struct Qdisc_ops *ops;
    //根据kind到链表qdisc_base查找是否已经加载,如果没有,则动态加载
    //对于tbf来说,ops为tbf_qdisc_ops
    ops = qdisc_lookup_ops(kind);
    //申请qdisc内存
    sch = qdisc_alloc(dev_queue, ops);
    //命令行指定了parent为root,所以parent为TC_H_ROOT(0xFFFFFFFFU)
    sch->parent = parent;
    //如果没有指定handle,则自动分配一个(范围为[8000-FFFF]:0000)
    if (handle == 0) {
        handle = qdisc_alloc_handle(dev);
        err = -ENOMEM;
        if (handle == 0)
            goto err_out3;
    sch->handle = handle;
    //如果ops提供了init函数,则调用,对于tbf来说,init为tbf_init
    ops->init(sch, tca[TCA_OPTIONS])
   //参考下面对于qdisc_list_add的注释,因为parent为root,所以不会执行
    qdisc_list_add(sch);
    return sch;

tbf qdisc初始化函数,解析命令行参数,填充私有数据tbf_sched_data

static int tbf_init(struct Qdisc *sch, struct nlattr *opt)
{
    struct tbf_sched_data *q = qdisc_priv(sch);

    if (opt == NULL)
        return -EINVAL;

    q->t_c = ktime_get_ns();
    qdisc_watchdog_init(&q->watchdog, sch);
    q->qdisc = &noop_qdisc;
    return tbf_change(sch, opt);
}

b. 将新创建的qdisc赋给tx queue

static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
               struct sk_buff *skb, struct nlmsghdr *n, u32 classid,
               struct Qdisc *new, struct Qdisc *old)
    //如果parent为NULL,说明是第一次替换默认qdisc
    if (parent == NULL) {
        num_q = dev->num_tx_queues;
        for (i = 0; i < num_q; i++) {
            struct netdev_queue *dev_queue = netdev_get_tx_queue(dev, i);
            //先将new qdisc赋给 dev_queue->qdisc_sleeping
            old = dev_graft_qdisc(dev_queue, new);
        }
        dev->qdisc = new ? : &noop_qdisc;
        //如果网卡是up的,才会将dev_queue->qdisc_sleeping赋给dev_queue->qdisc,处理报文时,用的也是dev_queue->qdisc。
        if (dev->flags & IFF_UP)
            dev_activate(dev);
    }
    else {
        //替换之前添加的qdisc
        const struct Qdisc_class_ops *cops = parent->ops->cl_ops;
        err = -EOPNOTSUPP;
        //调用old qdisc的cops->graft嫁接新的qdisc
        if (cops && cops->graft) {
            unsigned long cl = cops->get(parent, classid);
            if (cl) {
                err = cops->graft(parent, cl, new, &old);
                cops->put(parent, cl);
            } else
                err = -ENOENT;
        }

有类别队列

accept队列_workerman 消息队列[通俗易懂]

image.png

a. 添加根队列
tc qdisc add dev eth0 root handle 1: cbq bandwidth 10Mbit avpkt 1000 cell 8 mpu 64

除了ops->init的具体实现,创建qdisc部分和无类别队列基本上一致。对于cbq来说,ops->init函数为cbq_init

tc_modify_qdisc --> qdisc_create -->ops->init(sch, tca[TCA_OPTIONS])

static int cbq_init(struct Qdisc *sch, struct nlattr *opt)
    //cbq_sched_data 是cbq队列的私有数据
    struct cbq_sched_data *q = qdisc_priv(sch);

    //clhash是用于存放class的hash链表
    qdisc_class_hash_init(&q->clhash);

    //link是struct cbq_class类型的class,为默认值类。
    q->link.refcnt = 1;
    q->link.sibling = &q->link;
    q->link.common.classid = sch->handle; //classid为qdisc的handle值
    q->link.qdisc = sch;

    //在默认class上也要创建默认qdisc pfifo_qdisc_ops
    q->link.q = qdisc_create_dflt(sch->dev_queue, &pfifo_qdisc_ops, sch->handle);
    ...
    cbq_link_class(&q->link);
    //将默认的class link添加到hash链表
    qdisc_class_hash_insert(&q->clhash, &this->common);

b. 在根队列上添加class
当通过tc命令行添加class时,内核调用tc_ctl_tclass
tc class add dev eth0 parent 1:0 classid 1:1 cbq bandidth 10Mbit

static int tc_ctl_tclass(struct sk_buff *skb, struct nlmsghdr *n)
    portid = tcm->tcm_parent; //值为1:0
    clid = tcm->tcm_handle; //值为1:1
    //根据qid查找qdisc,qid就是handle id,即通过主序列号查找qdisc。
    //不管添加几个类,parent是父队列还是父类,只会用冒号前面的主序列号来查找qdisc
    #define TC_H_MAJ_MASK (0xFFFF0000U)
    #define TC_H_MAJ(h) ((h)&TC_H_MAJ_MASK)
    qid = TC_H_MAJ(clid);//获取主序列号
    q = qdisc_lookup(dev, qid);
    if (!q)
        return -ENOENT;
    //获取 cl_ops cbq_class_ops
    cops = q->ops->cl_ops;
    if (cops == NULL)
        return -EINVAL;
    //根据class id查找是不是已经存在,对应cbq来说,就是调用cbq_get函数(调用cbq_class_lookup(q, classid)进行查找)
    cl = cops->get(q, clid);
    if (cl == 0) {
        err = -ENOENT;
    //如果不存在,但是命令不是添加新class或者没有create flag,则返回错误
    if (n->nlmsg_type != RTM_NEWTCLASS ||
         !(n->nlmsg_flags & NLM_F_CREATE))
        goto out;
    } else {
        switch (n->nlmsg_type) {
        case RTM_NEWTCLASS:
            //如果已经存在,但是flag为NLM_F_EXCL,意思是已存在则返回
            err = -EEXIST;
            if (n->nlmsg_flags & NLM_F_EXCL)
                goto out;
            break;
        case RTM_DELTCLASS:
            err = -EOPNOTSUPP;
            //删除此class
            if (cops->delete)
                err = cops->delete(q, cl);
            if (err == 0)
                tclass_notify(net, skb, n, q, cl, RTM_DELTCLASS);
            goto out;

    new_cl = cl;
    err = -EOPNOTSUPP;
    //如果没有提供change函数,则返回EOPNOTSUPP错误,说明不支持class操作
    if (cops->change)
        //调用change函数,对cbq来说就是 cbq_change_class
        err = cops->change(q, clid, portid, tca, &new_cl);

static int cbq_change_class(struct Qdisc *sch, u32 classid, u32 parentid, struct nlattr **tca, unsigned long *arg)
    //获取cbq的私有数据
    struct cbq_sched_data *q = qdisc_priv(sch);
    //获取默认的class
    struct cbq_class *parent;
    parent = &q->link;
    //如果指定了parentid,则需要进行查找。
    //查找失败直接返回
    if (parentid) {
        parent = cbq_class_lookup(q, parentid);
        err = -EINVAL;
        if (parent == NULL)
            goto failure;
    //分配新class结构体
    cl = kzalloc(sizeof(*cl), GFP_KERNEL);
    //为class分配默认队列
    cl->q = qdisc_create_dflt(sch->dev_queue, &pfifo_qdisc_ops, classid);
    //保存classid
    cl->common.classid = classid;
    //指向父类
    cl->tparent = parent;
    //保存根队列
    cl->qdisc = sch;
    //将class添加到cbq队列的clhash链表中
    cbq_link_class(cl);
        struct cbq_sched_data *q = qdisc_priv(this->qdisc);
        qdisc_class_hash_insert(&q->clhash, &this->common);

按照上面的分析,可通过如下命令创建一个根队列1:,再在根队列的命令空间1:下创建四个类1:1, 1:2, 1:3, 1:4

#创建根队列cbq,替换默认的根队列pfifo_qdisc_ops
#分配默认类,classid为handle id,即1:, 并且在类上分配默认队列pfifo_qdisc_ops
tc qdisc add dev eth0 root handle 1: cbq bandwidth 10Mbit avpkt 1000 cell 8 mpu 64

#添加新类,classid为1:1, 父类id为1:0
#并在新类上添加默认队列pfifo_qdisc_ops
tc class add dev eth0 parent 1:0 classid 1:1 cbq bandidth 10Mbit rate 10Mbit maxburst 20 allot 1514 prio 8 avpkt 1000 cell 8 weight 1Mbit

#添加新类,classid为1:2, 父类id为1:1
#并在新类上添加默认队列pfifo_qdisc_ops
tc class add dev eth0 parent 1:1 classid 1:2 cbq bandwidth 10Mbit rate 8Mbit maxburst 20 allot 1514 prio 2 avpkt 1000 cell 8 weight 800Kbit split 1:0 bounded

#添加新类,classid为1:3, 父类id为1:1
#并在新类上添加默认队列pfifo_qdisc_ops
tc class add dev eth0 parent 1:1 classid 1:3 cbq bandwidth 10Mbit rate 1Mbit maxburst 20 allot 1514 prio 1 avpkt 1000 cell 8 weight 100Kbit split 1:0

#添加新类,classid为1:4, 父类id为1:1
#并在新类上添加默认队列pfifo_qdisc_ops
tc class add dev eth0 parent 1:1 classid 1:4 cbq bandwidth 10 Mbit rate 1Mbit maxburst 20 allot 1514 prio 6 avpkt 1000 cell 8 weight 100Kbit split 1:0

至此,根队列上有五个类,即默认类和四个新添加类, 这五个类都会添加到 cbq_sched_data->clhash 链表中,并且每个类都会有默认队列pfifo_qdisc_ops。
默认类1:0是1:1的父类,1:1是1:2,1:3和1:4的父类,并且每个类都有默认队列pfifo_qdisc_ops

root 0xffffffff
|
root qdisc handle = 1:0
|
default class 1:0 -> default qdisc pfifo_qdisc_ops
|
class 1:1 -> default qdisc pfifo_qdisc_ops
|
class 1:2    class 1:3   class 1:4 -> default qdisc pfifo_qdisc_ops

c. 添加filter

应用路由分类器到cbq队列的根,父分类编号为1:0;过滤协议为ip,优先级别为100,过滤器为基于路由表。
tc filter add dev eth0 parent 1:0 protocol ip prio 100 route

建立路由映射分类 1:2 , 1:3 , 1:4
tc filter add dev eth0 parent 1:0 protocol ip prio 100 route to 2 flowid 1:2
tc filter add dev eth0 parent 1:0 protocol ip prio 100 route to 3 flowid 1:3
tc filter add dev eth0 parent 1:0 protocol ip prio 100 route to 4 flowid 1:4

#通过命令行tc 添加filter规则时,kernel调用tc_ctl_tfilter
static int tc_ctl_tfilter(struct sk_buff *skb, struct nlmsghdr *n)
//如果没有指定parent,则使用根队列的handle
    /* Find qdisc */
    if (!parent) {
        q = dev->qdisc;
        parent = q->handle;
    } else {
        //根据主序列号查找qdisc
        q = qdisc_lookup(dev, TC_H_MAJ(t->tcm_parent));
        if (q == NULL)
            return -EINVAL;
    }
    //获取cl_ops
    /* Is it classful? */
    cops = q->ops->cl_ops;
    if (!cops)
        return -EINVAL;

    //如果cl_ops没有提供tcf_chain函数,则说明不支持filter,返回错误EOPNOTSUPP,
    //对于cbq来说,tcf_chain  就是 cbq_find_tcf
    if (cops->tcf_chain == NULL)
        return -EOPNOTSUPP;

    unsigned long cl = 0;
    //parent的从序列号不为0,说明指定了class id,否则使用默认类
    if (TC_H_MIN(parent)) {
        //对于cbq来说,cbq_get
        //根据指定的class id,即parent的值查找class
        cl = cops->get(q, parent);
            struct cbq_sched_data *q = qdisc_priv(sch);
            struct cbq_class *cl = cbq_class_lookup(q, classid);
            //到clhash中查找
            clc = qdisc_class_find(&q->clhash, classid);
            if (cl) {
                cl->refcnt++;
                return (unsigned long)cl;
            }
            return 0;
            //如果查找失败,说明指定的class不存在,返回错误
            if (cl == 0)
                return -ENOENT;

    //根据 cl 获取 filter链表,对于cbq来说,就是cbq_find_tcf
    /* And the last stroke */
    chain = cops->tcf_chain(q, cl);
        struct cbq_sched_data *q = qdisc_priv(sch);
        struct cbq_class *cl = (struct cbq_class *)arg;
        //如果cl为空,则使用默认类q->link
        if (cl == NULL)
            cl = &q->link;
        //struct tcf_proto __rcu    *filter_list;
        return &cl->filter_list;

    //根据protocol和prio查找要添加的filter是否已经在 filter_list 中
    /* Check the chain for existence of proto-tcf with this priority */
    for (back = chain; (tp = rtnl_dereference(*back)) != NULL; back = &tp->next) {
        if (tp->prio >= prio) {
            if (tp->prio == prio) {
                if (!nprio || (tp->protocol != protocol && protocol))
                    goto errout;
            } else
                tp = NULL;
            break;
        }
    }
    //如果第一次添加
    if (tp == NULL) {
        tp = kzalloc(sizeof(*tp), GFP_KERNEL);
        //根据命令行参数 route 查找ops,ops通过     register_tcf_proto_ops 注册
        tp_ops = tcf_proto_lookup_ops(tca[TCA_KIND]);
        tp->ops = tp_ops;
           tp->protocol = protocol;
        tp->classify = tp_ops->classify;
           tp->classid = parent;
        //调用tp_ops的init函数,对于route来说就是route4_init,此函数为空,什么都不做
        tp_ops->init(tp);
     else if (tca[TCA_KIND] && nla_strcmp(tca[TCA_KIND], tp->ops->kind))
    //如果查找到了tp,但是命令行指定的kind和tp中保存的kind不一样,则报错返回。
    //即上次添加filter时,kind为route,则会将bpf保存到tp->ops->kind,如果这次添加filter
    //时,kind为u32,则报错
        goto errout;

    //对于route来说,ops->get为 route4_get
    //t->tcm_handle 为命令行指定的 flowid
        fh = tp->ops->get(tp, t->tcm_handle);
        struct route4_head *head = rtnl_dereference(tp->root);
        //如果head为空,说明是第一次添加filter
        if (!head)
            return 0;

        h1 = to_hash(handle);
        if (h1 > 256)
            return 0;

        h2 = from_hash(handle >> 16);
        if (h2 > 32)
            return 0;

        //h1指定hash桶,每个桶中是hash链表
        //h2指定hash链表的key,指向链表头
        b = rtnl_dereference(head->table[h1]);
        if (b) {
            for (f = rtnl_dereference(b->ht[h2]);
             f;
                 f = rtnl_dereference(f->next))
                if (f->handle == handle)
                    return (unsigned long)f;
        }

    if (fh == 0) {
    //没有找到,并且命令行想要删除,则返回报错
    if (n->nlmsg_type == RTM_DELTFILTER && t->tcm_handle == 0) {
        goto errout;
    //没有找到,但是命令行没有指定创建,则返回报错
    if (n->nlmsg_type != RTM_NEWTFILTER ||
            !(n->nlmsg_flags & NLM_F_CREATE))
            goto errout;
    } else {
    switch (n->nlmsg_type) {
        case RTM_NEWTFILTER:
        //命令行指定删除
        case RTM_DELTFILTER:
        err = tp->ops->delete(tp, fh);
    }

    //添加或者替换filter rule,route4_change
    tp->ops->change(net, skb, tp, cl, t->tcm_handle, tca, &fh,
                  n->nlmsg_flags & NLM_F_CREATE ? TCA_ACT_NOREPLACE : TCA_ACT_REPLACE);
        struct route4_head *head = rtnl_dereference(tp->root);
        if (head == NULL) {
            head = kzalloc(sizeof(struct route4_head), GFP_KERNEL);
             rcu_assign_pointer(tp->root, head);

    //分配 route filter结构体
    f = kzalloc(sizeof(struct route4_filter), GFP_KERNEL);
    route4_set_parms
        if (tb[TCA_ROUTE4_TO])
            f->id = to;
        if (tb[TCA_ROUTE4_CLASSID]) {
            f->res.classid = nla_get_u32(tb[TCA_ROUTE4_CLASSID]);
    //最后将f添加到 head指定的表中。
    //将tp插入链表,安装优先级,优先级值越大越靠后
    RCU_INIT_POINTER(tp->next, rtnl_dereference(*back));
    rcu_assign_pointer(*back, tp);
  1. 添加route
    该路由是与前面所建立的路由映射一一对应。
1)发往主机192.168.1.24的数据包通过分类2转发(分类2的速率8Mbit)
         ip route add 192.168.1.24 dev eth0 via 192.168.1.66 realm 2 
2)发往主机192.168.1.30的数据包通过分类3转发(分类3的速率1Mbit)
         ip route add 192.168.1.30 dev eth0 via 192.168.1.66 realm 3
3)发往子网192.168.1.0/24 的数据包通过分类4转发(分类4的速率1Mbit)
         ip route add 192.168.1.0/24 dev eth0 via 192.168.1.66 realm 4

添加realms代码流程

#通过ip route add 添加realms
rtnl_register(PF_INET, RTM_NEWROUTE, inet_rtm_newroute, NULL, NULL);
static int inet_rtm_newroute(struct sk_buff *skb, struct nlmsghdr *nlh)
    rtm_to_fib_config(net, skb, nlh, &cfg);
        case RTA_FLOW:
            cfg->fc_flow = nla_get_u32(attr);
            break;
    fib_table_insert(tb, &cfg);
        fib_create_info(cfg);
            nh->nh_tclassid = cfg->fc_flow;

#查找路由时,将 nh_tclassid 赋给 rt->dst.tclassid
__mkroute_input
__mkroute_output
    rt_set_nexthop(rth, fl4->daddr, res, fnhe, fi, type, 0);
        rt->dst.tclassid = nh->nh_tclassid;

出方向过滤流程

前面流程已经添加了规则,数据包发送时,如何匹配?
下面已有类队列cbq为例,分析enqueue和dequeue的操作。

int dev_queue_xmit(struct sk_buff *skb)
    __dev_queue_xmit(skb, NULL);
        q = rcu_dereference_bh(txq->qdisc);
        //提供了enqueue函数的qdisc,比如cbq
        if (q->enqueue) {
            //报文入队或者直接发送
            __dev_xmit_skb(skb, q, dev, txq);
            goto out;
        }
         //lo,macvlan等虚拟接口默认qdisc为noqueue,没提供enqueue函数,会在此处直接发送到网卡
        if (dev->flags & IFF_UP) {
            //对数据包做校验,比如添加vlan等
            skb = validate_xmit_skb(skb, dev);
            //调用网卡驱动的函数发送报文
            skb = dev_hard_start_xmit(skb, dev, txq, &rc);

//qdisc提供enqueue函数的流程
//走到这个函数也不一定走enqueue流程,也有可能直接发送报文
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                 struct net_device *dev,
                 struct netdev_queue *txq)
{
    //qdisc的状态为__QDISC_STATE_DEACTIVATED时,可能是因为网卡down了,这个时候直接drop报文
    if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
        kfree_skb(skb);
        rc = NET_XMIT_DROP;
    } else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
           qdisc_run_begin(q)) {
        //满足上面三个条件会走到这个流程
        //a. qdisc支持TCQ_F_CAN_BYPASS,即qdisc允许bypass qdisc的处理,大部分的qdisc都不支持,必须走dqisc流程,
        //对报文进行处理后才能发出去。
        //b. qdisc缓存报文的队列长度为0
        //c. qdisc之前没有running,本次将qdisc设置为running
        /*
         * This is a work-conserving queue; there are no old skbs
         * waiting to be sent out; and the qdisc is not running -
         * xmit the skb directly.
         */

        qdisc_bstats_update(q, skb);
        //sch_direct_xmit 返回值大于0,说明还有报文要发送,调用__qdisc_run继续发送
        if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
            if (unlikely(contended)) {
                spin_unlock(&q->busylock);
                contended = false;
            }
            __qdisc_run(q);
        } else
            //报文发送完毕,清除标志位 __QDISC___STATE_RUNNING
            //qdisc->__state &= ~__QDISC___STATE_RUNNING;
            qdisc_run_end(q);

        rc = NET_XMIT_SUCCESS;
    } else {
        //对于不支持bypass tx处理的qdisc来说,会在此处将报文入队
        //对于cbq来说,q->enqueue指向 cbq_enqueue
        rc = q->enqueue(skb, q) & NET_XMIT_MASK;
        
        if (qdisc_run_begin(q)) {
            if (unlikely(contended)) {
                spin_unlock(&q->busylock);
                contended = false;
            }
            __qdisc_run(q);
        }
    }
}

//调用dequeue循环从队列取包发送。
//但是不能一直发送,满足下面两个条件之一就会停止发送,并启动tx软中断,在软中断中会继续发送报文
//a. 发送数据包个数超过了quota,即weight_p(默认64)
//b. 有其他进程需要占用cpu,即被其他进程抢占了cpu
void __qdisc_run(struct Qdisc *q)
{
    int quota = weight_p;
    int packets;

    while (qdisc_restart(q, &packets)) {
        /*
         * Ordered by possible occurrence: Postpone processing if
         * 1. we've exceeded packet quota
         * 2. another process needs the CPU;
         */
        quota -= packets;
        if (quota <= 0 || need_resched()) {
            __netif_schedule(q);
                //使能发送软中断,会在软中断处理函数 net_tx_action 中继续发送报文
                if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state))
                    __netif_reschedule(q);
                        sd = this_cpu_ptr(&softnet_data);
                        q->next_sched = NULL;
                        *sd->output_queue_tailp = q;
                        sd->output_queue_tailp = &q->next_sched;
                        raise_softirq_irqoff(NET_TX_SOFTIRQ);
            break;
        }
    }

    qdisc_run_end(q);
}
static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
    struct netdev_queue *txq;
    struct net_device *dev;
    spinlock_t *root_lock;
    struct sk_buff *skb;
    bool validate;

    /* Dequeue packet */
    skb = dequeue_skb(q, &validate, packets);
        //调用qdisc的dequeue从队列中取出报文,对于cbq来说,就是 cbq_dequeue
        skb = q->dequeue(q);
    //skb为空,说明队列已经没有报文,返回0
    if (unlikely(!skb))
        return 0;

    root_lock = qdisc_lock(q);
    dev = qdisc_dev(q);
    txq = skb_get_tx_queue(dev, skb);
    //发送报文
    return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
}

//报文入队
cbq_enqueue(struct sk_buff *skb, struct Qdisc *sch)
    //找到合适的 class
    struct cbq_class *cl = cbq_classify(skb, sch, &ret);
        struct cbq_sched_data *q = qdisc_priv(sch);
        //取出存放class的链表
        struct cbq_class *head = &q->link;
        u32 prio = skb->priority;
        /*
         *  Step 1. If skb->priority points to one of our classes, use it.
         */
        //首先根据skb的priority作为classid查找类
        //用户程序可以通过套接字选项SO_PRIORITY在skb->priority设置一个类的classid
        //TC_H_MAJ(prio ^ sch->handle) == 0 这个是判断classid是否是这个qdisc下的,因为classid的高16位代表这个类
        //所在的qdisc,应该和handle的高16为相同。
        if (TC_H_MAJ(prio ^ sch->handle) == 0 &&
            (cl = cbq_class_lookup(q, prio)) != NULL)
            return cl;
        //遍历所有类的所有filter,找到合适的类并返回
        for (;;) {
            //取出类head的过滤链表
            fl = rcu_dereference_bh(head->filter_list);
            //遍历过滤链表进行匹配
            result = tc_classify_compat(skb, fl, &res);
            //取出classid
            cl = (void *)res.class;
            if (cl->level == 0)
                return cl;
        //如果没有找到合适的类,则使用默认类
        if (TC_H_MAJ(prio) == 0 &&
            !(cl = head->defaults[prio & TC_PRIO_MAX]) &&
            !(cl = head->defaults[TC_PRIO_BESTEFFORT]))
            return head;

    //将skb放入cl类的队列中,cl->q 默认为 pfifo_qdisc_ops
    ret = qdisc_enqueue(skb, cl->q);
        //sch->enqueue指向 pfifo_enqueue
        return sch->enqueue(skb, sch);
    if (ret == NET_XMIT_SUCCESS) {
        sch->q.qlen++;
        cbq_mark_toplevel(q, cl);
        if (!cl->next_alive)
            //将cl保存到q->active,并设置q->activemask,这样在dequeue时,才知道哪个类上有数据
            cbq_activate_class(cl);
        return ret;
    }
    
//报文出队
static struct sk_buff *cbq_dequeue(struct Qdisc *sch)
    for (;;) {
        q->wd_expires = 0;

        skb = cbq_dequeue_1(sch);
            struct cbq_sched_data *q = qdisc_priv(sch);
            struct sk_buff *skb;
            unsigned int activemask;
            //如果activemask不为0,说明类上有报文需要发送
            activemask = q->activemask & 0xFF;
            while (activemask) {
                int prio = ffz(~activemask);
                activemask &= ~(1<<prio);
                //取出类,调用类的cl->q->dequeue将报文出队,并返回
                skb = cbq_dequeue_prio(sch, prio);
                    skb = cl->q->dequeue(cl->q);
                if (skb)
                    return skb;
            }
        if (skb) {
            qdisc_bstats_update(sch, skb);
            sch->q.qlen--;
            qdisc_unthrottled(sch);
            return skb;
        }
    }

参考

traffic control 之 egress 队列

也可参考:traffic control 之 egress 队列 – 简书 (jianshu.com)

今天的文章accept队列_workerman 消息队列[通俗易懂]分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/76382.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注