linux网络实现分析(1)——数据包的接收(从网卡到协议栈)

linux网络实现分析(1)——数据包的接收(从网卡到协议栈)

——lvyilong316

说明:源码参考2.6.32

从网卡到协议栈的skb接收有两种方式:NAPI和非NAPI。其中有公共逻辑,也有区别。首先看下用到的基本数据结构。

1. 基本数据结构

l   softnet_data

   每cpu数据

struct softnet_data

{

    struct Qdisc        *output_queue;  //发送帧队列

    struct sk_buff_head input_pkt_queue;  //接收帧队列(入口队列)

    struct list_head    poll_list; //这是一个双向链表

    struct sk_buff      *completion_queue;

 

    struct napi_struct  backlog;

};

说明:

1.     可以看到发送帧队列并不是skb的链表,而是Qdisc的链表,这是因为发送一般需要Qos流控,所以发送帧会存入相应dev关联的Qdisc中(Qdisc中有skb的队列),详见“后面链路层数据包发送”分析。

2.     poll_list是一个双向链表,每一个节点是一个napi_struct结构,而napi_struct又是net_device的成员,所以这个链表也可以理解为一个net_device链表,这些net_device都带有输入帧等着被处理。

3.     input_pkt_queue是设备驱动将数据从物理介质接收后封装成skb后存放的缓存队列,所有非NAPI设备共有这一个输入缓存队列,而NAPI设备有自己的私有队列用于存放输入包。

/*

 * Structure for NAPI scheduling similar to tasklet but with weighting

 */

struct napi_struct {

    struct list_head    poll_list;//通过这个成员和其他napi_struct相连

 

    unsigned long       state;

    int         weight;

    int         (*poll)(struct napi_struct *, int);

#ifdef CONFIG_NETPOLL

    spinlock_t      poll_lock;

    int         poll_owner;

#endif

 

    unsigned int        gro_count;

 

    struct net_device   *dev;

    struct list_head    dev_list;

    struct sk_buff      *gro_list;

    struct sk_buff      *skb;

};

说明:

    /* The poll_list must only be managed by the entity which

     * changes the state of the NAPI_STATE_SCHED bit.  This means

     * whoever atomically sets that bit can add this napi_struct

     * to the per-cpu poll_list, and whoever clears that bit

     * can remove from the list right before clearing the bit.

     */

1. 每个napi_struct可以通过其poll_list成员链接在softnet_data的poll_list,进而组成链表。

2. 通过state成员是否设置NAPI_STATE_SCHED位,来表示该napi_struct是否已经链在某个cpu的softnet_data上,即是否处于调度状态。所以在链入softnet_data前,需要设置NAPI_STATE_SCHED位,在从softnet_data移除时要清除NAPI_STATE_SCHED位。

2. 处理过程

    非NAPI设备驱动,接收数据包,分配skb,封装完skb后会调用netif_rx。而对于 NAPI 方式,它没有使用 input_pkt_queue 队列,而是使用私有的 队列,所以它没有这一个步骤。

  netif_rx

netif_rx通常在中断环境中被驱动程序调用,但是也有例外,特别是当此函数是被环回设备调用时。因此,netif_rx在其启动时会关闭本地CPU的中断事件,在完成工作后再开启。

该函数从设备驱动程序中接收skb,并送入每cpu变量softnet_data的队列中,以完成数据包从设备驱动接受进入协议栈。

返回值:NET_RX_SUCCESS(接受入队),NET_RX_DROP(由于队满等原因丢弃)。

int netif_rx(struct sk_buff *skb)

{

    struct softnet_data *queue;

    unsigned long flags;

 

    /* if netpoll wants it, pretend we never saw it */

    if (netpoll_rx(skb)) //NETPOLL是否需要消费该帧

        return NET_RX_DROP;

 

    if (!skb->tstamp.tv64) // skb->tstamp.tv64是否设置

        net_timestamp(skb);// 初始化skb->tstamp.tv64

 

    local_irq_save(flags); //关闭本地cpuIRQ

    queue = &__get_cpu_var(softnet_data); //获取本cpusoftnet_data

 

    __get_cpu_var(netdev_rx_stat).total++; //更新cpu统计计数

    if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {//如果队列不满

        if (queue->input_pkt_queue.qlen) {//如果输入队列不空

enqueue:

            __skb_queue_tail(&queue->input_pkt_queue, skb); //数据包入队

            local_irq_restore(flags);

            return NET_RX_SUCCESS;

        }

        //如果输入队列为空

        napi_schedule(&queue->backlog);

        goto enqueue;

    }

 

    __get_cpu_var(netdev_rx_stat).dropped++;

    local_irq_restore(flags);

 

    kfree_skb(skb);

    return NET_RX_DROP;

}

可以看到该函数的主要功能时将skb放入queue->input_pkt_queue(非NAPI设备共享queue->input_pkt_queue这个队列,而NAPI设备有自己的私有队列),为什么只有队列为空时才调用napi_schedule而,不空时入队后直接返回呢?有没有想过就这样返回后由谁来处理数据包呢?首先回答后者,处理包的任务由软中断负责,而软中断由napi_schedule来触发(见后面分析)。如果队列不为空,说明之前触发过软中断(最开始肯定是空的),并且软中断还没有把包处理完,这时只需将新包加入队列即可,之前唤醒的软中断稍后会一起处理掉这个新入队的skb。如果队列为空,说明软中断还未被调用过,或者说上次软中断已经处理完成,所以这次需要再次调用napi_schedule


   napi_schedule

static inline void napi_schedule(struct napi_struct *n)

{

    if (napi_schedule_prep(n))  //检测napi是否能被scheduled

        __napi_schedule(n);

}

l   napi_schedule_prep – 检测napi是否能被调度

测试napi例程是否已经运行,如果没有则将其标记为运行状态(通过设置NAPI_STATE_SCHED位),NAPI_STATE_SCHED标记位用来保证当前只有一个NAPI poll函数在运行。同时要检查当前napi的状态不能是NAPI_STATE_DISABLE的。

static inline int napi_schedule_prep(struct napi_struct *n)

{

    return !napi_disable_pending(n) &&

        !test_and_set_bit(NAPI_STATE_SCHED, &n->state);

}

 

l   __napi_schedule

void __napi_schedule(struct napi_struct *n)

{

    unsigned long flags;

 

local_irq_save(flags);

//napi_struct添加到softnet_datapoll_list

list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);

//调用NET_RX_SOFTIRQ软中断

    __raise_softirq_irqoff(NET_RX_SOFTIRQ);

    local_irq_restore(flags);

}

至此,中断的上半部已经完成,以下的工作则交由中断的下半部来实现。总之,NON-NAPI 的中断上半部接收过程可以简单的描述为,它首先为新到来的数据帧分 配合适长度的 SKB,再将接收到的数据从 NIC 中拷贝过来,然后将这个 SKB 链入当前 CPU 的 softnet_data 中的链表中,最后进一步触发中断下半部发继续处理。

而NAPI的中断处理函数(驱动)直接调用napi_schedule,和非NAPI相比跳过了netif_rx函数,所以NAPI和非NAPI的最主要区别也就出来了:

(1)  NAPI设备接收数据包不需要存入softnet_data的input_pkt_queue,而是由其自身的驱动函数放入设备自身的私有队列中。非NAPI则要由其驱动函数放入softnet_data的input_pkt_queue

(2)  NAPI调用napi_schedule传入的参数是设备自身对应的napi_struct,将来软中断就会调用设备自身初始化给napi_struct的poll方法,而NAPI则通过调用napi_schedule传入的参数是softnet_data结构中的backlog成员,其poll方法被初始化为内核的process_backlog函数。NAPI通过调用自身的poll方法就能直接将后续到达的数据包从自身私用队列送入协议栈,直到私有队列空,而不用再产生中断(非NAPI设备产生中断的作用是将skb收入softnet_data的input_pkt_queue,再触发软中断,而NAPI设备产生中断只为触发软中断)。

    处理NET_RX_SOFTIRQ软中断的函数是net_rx_action。

l   net_rx_action

static void net_rx_action(struct softirq_action *h)

 //获取struct napi_struct链表

    struct list_head *list = &__get_cpu_var(softnet_data).poll_list;

    unsigned long time_limit = jiffies + 2;//每次net_rx_action的执行时间

    int budget = netdev_budget;

    void *have;

 

    local_irq_disable();

 

    while (!list_empty(list)) {

        struct napi_struct *n;

        int work, weight;

        // budget为全部变量,为每次net_rx_action调用,最多处理的skb数量

        if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))

            goto softnet_break;

 

        local_irq_enable();

 

        /* Even though interrupts have been re-enabled, this

         * access is safe because interrupts can only add new

         * entries to the tail of this list, and only ->poll()

         * calls can remove this head entry from the list.

         */

        n = list_entry(list->next, struct napi_struct, poll_list);

        have = netpoll_poll_lock(n);

 

        weight = n->weight; //weigthpoll方法每次调用每次最多处理skb的数量

        work = 0;

        if (test_bit(NAPI_STATE_SCHED, &n->state)) {

            //调用struct napi_structpoll方法,每次最多处理weightskb

            work = n->poll(n, weight);  //实际处理了workskb

            trace_napi_poll(n);

        }

        WARN_ON_ONCE(work > weight);

        budget -= work;

        local_irq_disable();

        if (unlikely(work == weight)) {//说明队列中的skb已经处理完

            if (unlikely(napi_disable_pending(n))) {

                local_irq_enable();

                napi_complete(n);

                local_irq_disable();

            } else

                list_move_tail(&n->poll_list, list);

        }

// work == weight说明队列中的skb还没处理完,需要将本napi_struct放入链尾,下次继续执行其poll方法处理skb

        netpoll_poll_unlock(have);

    }

out:

    local_irq_enable();

return;

//net_rx_action*返回(执行时间到或者处理skb数量超过了budget),而入口队列(softnet_data .input_pkt_queue)中仍有skb时则会执行最后一段代码。

 softnet_break:

    __get_cpu_var(netdev_rx_stat).time_squeeze++;

    __raise_softirq_irqoff(NET_RX_SOFTIRQ);// NET_RX_SOFTIRQ再次被调度

    goto out;

}

l   虚拟poll方法

   不同设备驱动的poll方法不一样,对于非NAPI设备其poll方法被初始化为process_backlog。

static int process_backlog(struct napi_struct *napi, int quota)

{

    int work = 0;

    struct softnet_data *queue = &__get_cpu_var(softnet_data);

    unsigned long start_time = jiffies;

    napi->weight = weight_p;

    do {

        struct sk_buff *skb;

        local_irq_disable();

        skb = __skb_dequeue(&queue->input_pkt_queue);  //取出skb

        if (!skb) {//队列已空

            __napi_complete(napi);

            local_irq_enable();

            break;

        }

        local_irq_enable();

        netif_receive_skb(skb);

    } while (++work < quota && jiffies == start_time);

    return work;

}

对于NAPI设备,使用自己的poll函数,从自己的私有队列取出skb(NAPI设备不使用公用入口队列softnet_data .input_pkt_queue),然后调用netif_receive_skb(skb)方法。而进入netif_receive_skb方法也就正式进入协议栈的处理逻辑了。

整个处理流程如下图所示。

linux网络实现分析(1)——数据包的接收(从网卡到协议栈)


上一篇:EDA:最简单的自然语言处理数据增广方法


下一篇:应用统计学与R语言实现学习笔记(九)——线性回归