linux Stream Parser实现

概念

新版内核提供了strparser这种机制,可以在内核中做基于tcp之上的数据流协议解析, 比如TLS。
可以参考AF_KTLS项目, 以及内核的KCM模块。

通常数据流协议,在协议头中会指定playload有几个字节,然后通过底层tcp读取完协议头header和完整的payload后,就是一条消息记录。

使用strparser后, 就可以在读取完一条完整的消息后产生回调, 协议的消息处理函数就能对整条的消息进行处理。
从而上层协议就无需关心中间层的缓存实现, 只需定义消息的处理函数,以及对收到错误格式的消息的处理函数等。

初始化

上层协议通过调用strp_init来设置回调函数,同时设置一个定时器,一条消息超时没有接收完整后将被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int strp_init(struct strparser *strp, struct sock *csk,
struct strp_callbacks *cb)
{
struct socket *sock = csk->sk_socket;
if (!cb || !cb->rcv_msg || !cb->parse_msg)
return -EINVAL;
if (!sock->ops->read_sock || !sock->ops->peek_len)
return -EAFNOSUPPORT;
memset(strp, 0, sizeof(*strp));
strp->sk = csk;
setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout, //消息粒度的定时器
(unsigned long)strp);
INIT_WORK(&strp->rx_work, strp_rx_work);
strp->cb.rcv_msg = cb->rcv_msg;
strp->cb.parse_msg = cb->parse_msg;
strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp;
return 0;
}

回调函数

1
2
3
4
5
6
struct strp_callbacks {
int (*parse_msg)(struct strparser *strp, struct sk_buff *skb); //消息头解析,返回消息需要的字节数; =0表示需要更多数据来决定长度; <0表示出错
void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb); //收到完整的消息后,回调通知应用程序来处理消息
int (*read_sock_done)(struct strparser *strp, int err); //从tcp读取数据后,strp解析的时候可能出错,比如内存不足等,read_sock_done可以处理错误等
void (*abort_parser)(struct strparser *strp, int err); //读取超时的回调, 或者parse_msg收到错误协议格式等
};

流程

  • tcp收到数据后会调用sk_data_ready回调函数作为事件通知,在该函数中调用strp_data_ready,把事件传递给strp层。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    void strp_data_ready(struct strparser *strp)
    {
    if (unlikely(strp->rx_stopped))
    return;
    /* This check is needed to synchronize with do_strp_rx_work.
    * do_strp_rx_work acquires a process lock (lock_sock) whereas
    * the lock held here is bh_lock_sock. The two locks can be
    * held by different threads at the same time, but bh_lock_sock
    * allows a thread in BH context to safely check if the process
    * lock is held. In this case, if the lock is held, queue work.
    */
    if (sock_owned_by_user(strp->sk)) {
    queue_work(strp_wq, &strp->rx_work);
    return;
    }
    if (strp->rx_paused)
    return;
    if (strp->rx_need_bytes) { //已经解析好协议,知道剩余需要的长度
    if (strp_peek_len(strp) >= strp->rx_need_bytes) //tcp中有足够的数据
    strp->rx_need_bytes = 0;
    else
    return; //tcp数据不够直接返回
    }
    if (strp_read_sock(strp) == -ENOMEM) //从tcp中读取
    queue_work(strp_wq, &strp->rx_work);
    }
  • 在strp_data_ready()中调用strp_read_sock()从tcp中读取和解析消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    static int strp_read_sock(struct strparser *strp)
    {
    struct socket *sock = strp->sk->sk_socket;
    read_descriptor_t desc;
    desc.arg.data = strp;
    desc.error = 0;
    desc.count = 1; /* give more than one skb per call */
    /* sk should be locked here, so okay to do read_sock */
    sock->ops->read_sock(strp->sk, &desc, strp_recv); //tcp_read_sock, 从tcp中读取skb传给strp_recv函数
    desc.error = strp->cb.read_sock_done(strp, desc.error); //错误处理
    return desc.error;
    }
  • strp_read_sock()会调用tcp_read_sock()从tcp的sk_receive_queue中读取skb,然后调用strp_recv()来处理skb

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
    unsigned int orig_offset, size_t orig_len)
    {
    struct strparser *strp = (struct strparser *)desc->arg.data;
    //strp->rx_skb_head 包含了整个消息的数据量等信息,每个消息使用该skb及其frag_list,作为完整的消息传递给rcv_msg()回调
    //strp->rx_need_bytes 剩余需要的tcp数据
    //收到新的消息,调用parse_msg()解析协议头部,得到剩余需要读取的数据大小
    //协议解析出错,调用abort_parser()
    //当前消息全部读取完毕, 调用rcv_msg()通知上层协议
    }
  • 定时器
    在读取到新的消息的时候,会设置一个定时器,如果超时表示这条记录没有读取完,也会调用abort_parser()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    static void strp_rx_msg_timeout(unsigned long arg)
    {
    struct strparser *strp = (struct strparser *)arg;
    /* Message assembly timed out */
    STRP_STATS_INCR(strp->stats.rx_msg_timeouts);
    lock_sock(strp->sk);
    strp->cb.abort_parser(strp, ETIMEDOUT);
    release_sock(strp->sk);
    }
  • 消息缓存大小
    具体的消息缓存大小由上层协议自己设计,可以通过调用strp_pause(), 停止读取tcp中的数据, 这样把数据保存在tcp缓存中,
    由tcp的流量控制来平滑双方的网络速率。