[多线程学习笔记] 流水线

 

 

[多线程学习笔记] 流水线
/*************************************************************************
    > File Name: pipe.c
    > Author: likeyi
    > Mail: likeyiyy@sina.com 
    > Created Time: Thu 17 Apr 2014 03:53:25 PM CST
 ************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
typedef struct buffer
{
    unsigned char * buffer;
    unsigned long length;
}buffer_t;
typedef struct stage_tag
{
    pthread_mutex_t mutex;
    pthread_cond_t avail;
    pthread_cond_t ready;
    int busy;
    int data_ready;
    buffer_t * data;
    pthread_t thread;
    long total;     /* 这个用来计数,看一个线程被调用多少次,当没有空闲的线程时,就用这个来比较。*/
}stage_t;
static stage_t * get_tcp_thread();
static stage_t * get_ip_thread();
static stage_t * get_eth_thread();
static stage_t * tcp_thread_t,* ip_thread_t,*eth_thread_t;

static unsigned char * tcp_hdr = "tcp header";
static unsigned char * ip_hdr = "ip header";
static unsigned char * eth_hdr = "eth header";

void * pipe_add_tcp_header(void * arg)
{
    /* 每次读取文件的一行,作为数据,发出去。*/
    pthread_detach(pthread_self());
    FILE * fp;
    unsigned char data[4096];
    stage_t * ip_thread = get_ip_thread();
    if((fp = fopen("data","r")) == NULL)
    {
        printf("Error When open data file\n");
        pthread_exit(NULL); /*NOTICE: pthread_exit 不应该是局部变量。*/
    }
    int i = 0;
    int size = strlen(tcp_hdr);
    while(1)
    {
        pthread_mutex_lock(&ip_thread->mutex);
        /*
        * 什么IP 正忙?哈哈,那么我只有等了。
        * 是的data_ready的状态说明正忙。
        * */
        while(ip_thread->data_ready)
        {
            pthread_cond_wait(&ip_thread->ready,&ip_thread->mutex); 
        }

        /*
        * 开始准备数据。
        * */
        if(fgets(data,sizeof(data),fp) == NULL)
        {
            break;
        }
        int size2 = strlen(data);
        unsigned char * next_data = malloc(size + size2 + 1);
        strcpy(next_data,tcp_hdr);
        strcpy(next_data + size,data);
        /* 数据封装完毕 */
        ip_thread->data->buffer = next_data;
        ip_thread->data->length = size + size2;

        
        ip_thread->data_ready = 1; 

        pthread_cond_signal(&ip_thread->avail);
        /*
        * 呵呵,某种意义上,我这个时候是不是也应该告诉我的上级,我也做好准备了?
        * 但是,我没有上级,哈哈哈。
        * */
        pthread_mutex_unlock(&ip_thread->mutex);
        
    }
    printf("tcp_thread stoped\n");
    
}
void * pipe_add_ip_header(void * arg)
{
    stage_t * ip_stage = get_ip_thread();
    pthread_mutex_lock(&ip_stage->mutex);
    
    int size = strlen(ip_hdr);
    while(1)
    {
        /* 数据没准备好 
        *  看起来用data_ready来表示是否busy很科学的样子,我这样做反而不好了。
        * */
        while(ip_stage->data_ready != 1)
        {
            pthread_cond_wait(&ip_stage->avail,&ip_stage->mutex);
        }
        /*
        * 
        * */

        int size2 = ip_stage->data->length;
        unsigned char * next_data = malloc(size + size2 + 2);
        strcpy(next_data,ip_hdr);
        *(next_data + size) =  ;
        strcpy(next_data + size + 1,ip_stage->data->buffer);
        /* 数据封装完毕 */
/*********************************************************************/
        stage_t * eth_thread = get_eth_thread();
        pthread_mutex_lock(&eth_thread->mutex);
        while(eth_thread->data_ready)
        {
            pthread_cond_wait(&eth_thread->ready,&eth_thread->mutex);
        }
        eth_thread->data->buffer = next_data;
        eth_thread->data->length = size + size2;

        eth_thread->data_ready = 1;
        pthread_cond_signal(&eth_thread->avail);
        pthread_mutex_unlock(&eth_thread->mutex);

/*********************************************************************/

        /*
        * 我自己呢?data_ready = 0,并且释放信号。
        * */
        ip_stage->data_ready = 0;
        pthread_cond_signal(&ip_stage->ready);
    }

}
void * pipe_add_ethdr(void * arg)
{
    pthread_detach(pthread_self());
    stage_t * eth_stage = get_eth_thread();
    pthread_mutex_lock(&eth_stage->mutex);
    while(1)
    {
        while(eth_stage->data_ready != 1)
        {
            pthread_cond_wait(&eth_stage->avail,&eth_stage->mutex);
        }
        printf("Data is :%s\n",eth_stage->data->buffer);
        eth_stage->data_ready = 0;
        pthread_cond_signal(&eth_stage->ready);
    }
}
static stage_t * get_tcp_thread()
{
    return tcp_thread_t; 
}
static stage_t * get_ip_thread()
{
    return ip_thread_t;
}
static stage_t * get_eth_thread()
{
    return eth_thread_t;
}
int main(int argc, char ** argv)
{
    eth_thread_t = malloc(sizeof(stage_t));
    ip_thread_t = malloc(sizeof(stage_t));
    tcp_thread_t = malloc(sizeof(stage_t));

    eth_thread_t->data = malloc(sizeof(buffer_t));
    ip_thread_t->data = malloc(sizeof(buffer_t));
    tcp_thread_t->data = malloc(sizeof(buffer_t));

    pthread_mutex_init(&eth_thread_t->mutex,NULL);
    pthread_mutex_init(&ip_thread_t->mutex,NULL);
    pthread_mutex_init(&tcp_thread_t->mutex,NULL);

    pthread_cond_init(&eth_thread_t->avail,NULL);
    pthread_cond_init(&ip_thread_t->avail,NULL);
    pthread_cond_init(&tcp_thread_t->avail,NULL);

    pthread_cond_init(&eth_thread_t->ready,NULL);
    pthread_cond_init(&ip_thread_t->ready,NULL);
    pthread_cond_init(&tcp_thread_t->ready,NULL);

    pthread_create(&eth_thread_t->thread,NULL,pipe_add_ethdr,NULL);
    pthread_create(&ip_thread_t->thread,NULL,pipe_add_ip_header,NULL);
    pthread_create(&tcp_thread_t->thread,NULL,pipe_add_tcp_header,NULL);

    pthread_exit(NULL);
    
    
}
[多线程学习笔记] 流水线

 

线程工作有三个基本模式:流水线,工作组,客户/服务器模型。

三个基本模式可以相互组合,比如流水线和工作组就可以组合到一块。

书上的例子还是可以看的,我也第一次有实体这个概念,

通过结构体把线程,互斥量,条件变量,谓词分装到一块真是个不错的注意。

我简化了书上的列子(或者更复杂),模拟了TCP到ether层传输的过程,每经过一层,它们都打上自己的标志,然后传给下一层。

假设流水线上有A-BC-D四个等级,由于A是流水线的开头,D是流水线的结尾,所以它们两个有所不同,但是流水线中间的工作方式都是类似的。

A: 它检测下一级流水线是否做好准备,假如做好准备了,就把数据发给下一层,并且告诉下一层数据准备好了。

注意这里的概念,上级要知道下级做好准备了才会发送数据,但是发送了数据之后又必须告诉下级数据准备好了。

看起来,上级看下级准备好了通知下级,下级一直处于接受状态,并不需啊哟上级告诉下级数据准备好了就可以,因为隐私的说,下级准备好了就可以接受数据了。

其实不可以,因为你下级准备好,我上级不一定准备好,你不能自己取数据,你可能取得错误的数据,必须,我知道你准备好了,我这边开始准备,然后再通知你。

 

那么,上级先准备好,然后再等下级可不可以呢?看似可以,但是上级的数据从哪里来的呢?上级也是流水线中的一级。

A的执行流是:

wait for next not busy.

process

tell next is ok.

当然,由于是流水线的头,它可能更灵活一些。

 

B. B首先检测数据是否可用,可用,则处理数据,然后看看C是否准备好了吗,假如C没有准备好就一直等,注意B等C的时候,B也不接受新的数据了。

   当C可用后,B把数据发给C,告诉C准备好了,然后告诉A你又可以给我发数据了。

    wait for prev send me data.

    .... (I am in busy mode in fact)

    wait for next ready.

    tell next is ok

    tell pev is not busy now.

 

D:tail的处理模式是,

     wait for prev send me data

     ....(I am in busy mode)

     tell prev is not busy now.

 

总结:

流水线的上一级还是修改了下一级的数据结构,这个很不好,但是假如不修改的话,应该是上一级告诉下一级数据好了的时候,是不是应该告诉下一级去哪里取数据?

[多线程学习笔记] 流水线,布布扣,bubuko.com

[多线程学习笔记] 流水线

上一篇:javascript总结--2014-04-17


下一篇:服务调用方案(Spring Http Invoker) - 我们到底能走多远系列(40)