当前位置:Linux教程 - Linux - UNIX多线程数据共享与线程同步

UNIX多线程数据共享与线程同步



        


    在UNIX中,一个进程让另外实体进行某项事务而采取的操作为fork的一个子进程,子进程只是将父进程的数据区拷贝一份到自己的数据区。在符合POSIX标准的UNIX操作系统下,同一个进程的线程之间共享进程指令、大多数数据(线程私有数据除外)、信号处理方式、进程运行环境等。由于线程共享进程的全局变量,因此可以采用用户自己编写的消息队列来实现数据的共享。


      建立多任务模型,并用线程来实现

      符合POSIX标准的UNIX操作系统提供了线程的控制函数,如:线程的创建和终止、线程之间的互斥、线程之间的同步等。利用这些系统函数可以成功地模拟消息队列,来实现线程间数据共享和同步,以完成多任务的实时性。为成功地描述线程间数据共享和同步,以下列任务模型为例。

      首先建立消息队列属性数据结构

       #define MAXQUEUE 30

       typedef struct mq_attrib {

       char name[20];

       pthread_mutex_t mutex_buff;

       pthread_mutex_t mutex_cond;

       pthread cond_t cond;

       int maxElements;

       int elementLength;

       int curElementNum;

       caddr_t buff;

      }mq_attrib,mq_attribstruct,mq_attrib_t;

       mq_attrib_t msqueue[MAXQUEUE];

      数据结构定义了消息队列的名字name,最大消息个数maxElements,单个消息长度elementLength,当前消息个数curElementNum,存放消息的缓冲区buff,保护缓冲区锁mutex_buff,线程同步条件变量cond,保护线程同步条件变量锁mutex_cond。

      消息队列的创建

      依据此数据结构进行消息队列的创建,函数为msqueue_create(参数解释:name消息队列名,maxnum消息的最大个数,length单个消息的长度)。

      int msqueue_create( name, maxnum, length )

      charname;

      int maxnum,length;

      {

       int i;

       for ( i=0; i

       if ( msqueue[i]==NULL )break;

       //如果消息队列全部被分配,返回错

       if ( i==MAXQUEUE ) return MQERROR;

       msqueue[i]=malloc(sizeof(mq_attribstruct));

       sprintf( msqueue[i]->name, "%s", name);

       msqueue[i]->maxElements = maxnum;

       msqueue[i]->elementLength = length;

       msqueue[i]->curElementNum = 0;

       msqueue[i]->buff=malloc(maxnumlength);

       //对保护锁进行初始化

       pthread_mutex_init(&&msqueue[i]

      ->mutex_buff, NULL);

       pthread_mutex_init(&&msqueue[i]

      ->mutex_cond, NULL);

       //对线程同步条件变量初始化

       pthread_cond_init(&&msqueue[i]->cond, NULL);

       return i;

      }

      应用消息队列进行消息的发送和接收

      发送消息到消息队列:

      消息队列的发送和接收是在不同的线程中进行的。首先介绍发送消息到消息队列的函数:

      int msqueue_send ( id, buff, length )

      int id, length;

      caddr_t buff;

      {

       int pos;

       //消息队列id错,返回错

       if ( id<0 || id >= MAXQUEU ) return MQERROR;

       //消息长度与创建时的长度不符,返回错

       if ( length != msqueue[id]->elementLength ) return MQERROR;

       //消息队列满,不能发送

       if ( msqueue[id]->curElementNum >= msqueue[id]->maxElements )

       return MQERROR;

       //在对消息队列缓冲区操作前,锁住缓冲区,以免其他线程操作

       pthread_mutex_lock ( &&msqueue[id]->mutex_buff );

       pos = msqueue[id]->curElementNum * msqueue[id]->elementLength;

       bcopy ( buff, &&msqueue[id]->buff[pos], msqueue[id]->elementLength );

       msqueue[id]->curElementNum ++;

       pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );

       //如果插入消息前,消息队列是空的,插入消息后,消息队列为非空,则通知等待从消息队列取消息的线程,条件满足,可以取出消息进行处理

       if ( msqueue[id]->curElementNum == 1 ) {

       pthread_mutex_lock ( &&msqueue[id]->mutex_cond );

       pthread_cond_broadcast ( &&msqueue[id]->cond );

       pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );

       }

       return length;

      }

      从消息队列中接收消息:

      消息队列的接收函数msqueue_receive,其参数:id为消息队列数组的索引号,buff为消息内容,length为消息长度。

      int msqueue_receive ( id, buff, length )

      int id, length;

      caddr_t buff;

      {

       caddr_t temp;

       int pos;

       if(id<0||id>=MAXQUEUE)return MQERROR;

       if(length != msqueue[id]->elementLength)

      return MQERROR;

       //如果消息队列为空,则等待,直到消息队列为非空条件满足

       if ( msqueue[id]->curElementNum == 0){

       pthread_mutex_lock ( &&msqueue[id]->mutex_cond );

       pthread_cond_wait ( &&msqueue[id]->cond, &&msqueue[id]->mutex_cond );

       pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );

       }

       //取消息前,锁住消息队列缓冲区,以免其他线程存放或取消息

       pthread_mutex_lock ( &&msqueue[id]->mutex_buff );

       //为符合消息队列FIFO特性,取出消息后,进行消息队列的调整

      temp =

      malloc((msqueue[id]->curElementNum-1)

      msqueue[id]-elementLength );

       bcopy ( &&msqueue[id]->buff[0], buff, msqueue[id]->elementLength );

       msqueue[id]->curElementNum --;

       bcopy ( &&msqueue[id]->buff[msqueue[id]->elementLength], temp,

       msqueue[id]->elementLength

      msqueue[id]->curElementNum);

       bcopy ( temp, &&msqueue[id]->buff[0],

       msqueue[id]->elementLength

      msqueue[id]->curElementNum);

       free ( temp );

       //解除缓冲区锁

       pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );

       return length;

      }

      多任务模型的实现

      在讨论完消息队列的创建、删除、发送和接收后,下面讲述消息队列在线程中的应用以实现多任务线程间的数据共享。

      首先在main主函数中创建消息队列和线程:

      //定义全局变量

      Int msqueue_record, msqueue_process;

      Void main()

      {

       pthread_t pthreadID1;

       //创建消息队列,用于线程间通信

       msqueue_record = msqueue_create (“record”, 200, 200);

       msqueue_process = msqueue_create (“process”, 200, 200);

       //创建数据采集线程

       pthread_create ( &&pthreadID1, NULL, receiveData, NULL);

       //创建数据处理线程

       pthread_create ( &&pthreadID2, NULL, process, NULL);

       //创建数据记录线程

       pthread_create ( &&pthreadID1, NULL, record, NULL);

       //等待进程结束

       wait_thread_end( );

      }

      数据采集线程:

      voidreceiveData( )

      {

       int count;

       unsigned char buff[200];

       for(;;) {

      //从数据口采集数据,并将数据放置于buff中

      //wait_data_from_data_port( buff )

      //将数据写入消息队列msqueue_record中

      msqueue_send ( msqueue_record, buff, 200 );

      //将数据写入消息队列msqueue_process中

      msqueue_send ( msqueue_process, buff, 200 );

       }

      }

      记录线程函数:

      voidrecord ( )

      {

       int num, count;

      unsigned char buffer[200];

       for ( ;; ) {

      count = msqueue_receive ( msg_record, &&buffer, 200 );

      if ( count < 0) {

      perror ( "msgrcv in record");

      continue;

      }

       //将取到的消息进行记录处理

       //record_message_to_lib();

       }

      }

      数据处理线程函数:

      int process( )

      {

      int count;

       unsigned char buffer[200];

       for ( ;; ) {

       count = msqueue_receive ( msg_process, &&buffer, 200 );

      if ( count < 0) {

       perror ( "msgrcv in record");

       continue;

       }

       //将取到的消息进行处理

       //process_message_data()

       }

      }

      在实现多任务系统时,作者曾经做过以下三种实现方法的比较:进程间通信采用IPC机制,线程间通信采用进程通信方式IPC,线程间通信采用基于作者开发的消息队列。结果表明:利用用户下的数据区进行线程间通信的速度最快,效率最高,而IPC方式慢。

      

    (作者联系方式:[email protected])
    发布人:netbull 来自:新浪科技