`
897371388
  • 浏览: 526897 次
文章分类
社区版块
存档分类
最新评论

实现基于VxWorks的消息队列通信机制的C/S通信

 
阅读更多

写在前面


要实现如图所示的C/S通信需要掌握的知识有:

1.VxWorks的消息队列的API(具体内容可以在VxWorks内核源码的目录下的src/wv/msgQlib.c这个文件中找到)

主要用到的方法有:msgQCreate,msgQSend,msgQReceive

MSG_Q_ID msgQCreate
    (
    int         maxMsgs,        /* max messages that can be queued */
    int         maxMsgLength,   /* max bytes in a message */
    int         options         /* message queue options */
    )

STATUS msgQSend
    (
    FAST MSG_Q_ID       msgQId,         /* message queue on which to send */
    char *              buffer,         /* message to send */
    FAST UINT           nBytes,         /* length of message */
    int                 timeout,        /* ticks to wait */
    int                 priority        /* MSG_PRI_NORMAL or MSG_PRI_URGENT */
    )
int msgQReceive
    (
    FAST MSG_Q_ID       msgQId,         /* message queue from which to receive */
    char *              buffer,         /* buffer to receive message */
    UINT                maxNBytes,      /* length of buffer */
    int                 timeout         /* ticks to wait */
    )

2.信号量。VxWorks主要有三种类型的信号量:二进制信号量、互斥信号量和计数信号量,还提供POSIX信号量。信号量可以用来实现任务间通信、同步和互斥。

3.Vxworks任务的调度策略。主要是采用基于优先级的抢占调度策略,同时还可使用轮转(RR)调度算法。因为基于优先级的调度可以很好的体现嵌入式操作系统的实时性需求,然后有一个缺陷是,如果多个同优先级任务需要共享一个处理器,某任务永远占有CPU,不阻塞,其他任务将没有得到调度的机会,因此需要与RR调度算法结合使用。


实现原理

具体分析如下:

1.定义了三个消息队列,包括一个请求队列、两个应答队列。(如果需要实现双工通信的话,必须有两个队列,一个队列是无法双工通信的)

2.定义了消息的结构体,包括任务的tid和发送的消息值。

3.定义了三个任务,包括一个服务端任务、两个客户端任务。

4.定义了两个信号量,实现同步,控制程序的执行顺序。首先根据优先级不同,客户端任务将被先执行,采用信号量控制使客户端一先向请求队列发送三个消息,接着客户端二向请求队列发送三个消息。接下来服务端读取请求队列的消息,根据消息结构体中的tid值给相对应的客户端的应答队列反馈信息,信息内容为客户端发送信息内容值的平方。根据信号量的操作,客户端一将先从应答队列一读取消息,客户端一完成这一过程后,客户端二才能从应答队列二读取消息。

5.删除消息队列和删除信号量。


实现代码

具体实现C/S通信的代码如下:

/**
	@author zouliping
**/

#include "vxWorks.h"
#include "taskLib.h"
#include "msgQLib.h"
#include "stdio.h"
#include "string.h"
#include "sysLib.h"
#include "semLib.h"

#define CLIENT_TASK_PRI 100
#define SERVER_TAST_PRI 101
#define TASK_STACK_SIZE 50000

typedef struct message
{
	int tid;
	int value;
}msg;

LOCAL MSG_Q_ID requestQ;
LOCAL MSG_Q_ID responseQ1;
LOCAL MSG_Q_ID responseQ2;

LOCAL BOOL notDone;
LOCAL int msgNum = 3;

LOCAL SEM_ID clientSem1;
LOCAL SEM_ID clientSem2;

LOCAL STATUS mClientTask1();
LOCAL STATUS mClientTask2();
LOCAL STATUS mServerTask();

STATUS CS(void)
{
	notDone = TRUE;
	/*创建应答和响应消息队列*/
	if((requestQ = msgQCreate(2 * msgNum,sizeof(msg),MSG_Q_FIFO)) == NULL)
	{
        perror("Error in creating requestQ");
    }
    if((responseQ1 = msgQCreate(msgNum,sizeof(msg),MSG_Q_FIFO)) == NULL)
	{
        perror("Error in creating responseQ1");
    }
    if((responseQ2 = msgQCreate(msgNum,sizeof(msg),MSG_Q_FIFO)) == NULL)
	{
        perror("Error in creating responseQ2");
    }

    /*创建两个信号量*/
	if ((clientSem1 = semBCreate (SEM_Q_PRIORITY, SEM_FULL)) == NULL)
	{ 
		perror ("Error in creating cSemId1 semaphore"); 
		return (ERROR);
	}
	if ((clientSem2 = semBCreate (SEM_Q_PRIORITY, SEM_FULL)) == NULL)
	{ 
		perror ("Error in creating cSemId2 semaphore"); 
		return (ERROR);
	}

    /*创建两个client和一个server任务*/
    if(taskSpawn("tClientTask1",CLIENT_TASK_PRI,0,TASK_STACK_SIZE,(FUNCPTR)mClientTask1,0,0,0,0,0,0,0,0,0,0) == ERROR){
        perror("serverTask:Error in spawning mClientTask1");
        return (ERROR);
    }
    if(taskSpawn("tClientTask2",CLIENT_TASK_PRI,0,TASK_STACK_SIZE,(FUNCPTR)mClientTask2,0,0,0,0,0,0,0,0,0,0) == ERROR){
        perror("serverTask:Error in spawning mClientTask2");
        return (ERROR);
    }
    if(taskSpawn("tServerTask",SERVER_TAST_PRI,0,TASK_STACK_SIZE,(FUNCPTR)mServerTask,0,0,0,0,0,0,0,0,0,0) == ERROR){
        perror("serverTask:Error in spawning tServerTask");
        return (ERROR);
    }

    while (notDone)
    {
    	taskDelay (sysClkRateGet());
    }
	
	/*删除消息队列*/
	if(msgQDelete(requestQ) == ERROR || msgQDelete(responseQ1) == ERROR || msgQDelete(responseQ2) == ERROR)
	{
            perror("Error in deleting msgQ");
            return (ERROR);
    }

    /*删除信号量*/
    if (semDelete(clientSem1) == ERROR || semDelete(clientSem2) == ERROR)
    {
		perror ("Error in deleting semaphore"); 
		return (ERROR);
	}

	return (OK);
} 

/*服务端任务*/
STATUS mServerTask(void)
{
	msg sendItem;
	msg receiveItem;
	int i;

	printf ("mServerTask started: task id = %#x.\n", taskIdSelf());


	for(i = 0;i < 2 * msgNum;i++)
	{
		/*从客户端接受消息,取出请求队列中的消息*/
        if((msgQReceive(requestQ,(char *)&receiveItem,sizeof(receiveItem),WAIT_FOREVER)) == ERROR)
        {
            perror("Error in receiving the message");
            return (ERROR);
        }
        else
		{
            printf("mServerTask:get message of value %d from mClientTask%d.\n",receiveItem.value,receiveItem.tid);
		}
	

		/*给两个客户端发送反馈消息*/
		sendItem.tid = taskIdSelf();
		sendItem.value = (receiveItem.value) * (receiveItem.value);
		if(receiveItem.tid == 1)
		{
			if((msgQSend(responseQ1,(char *)&sendItem,sizeof(sendItem),WAIT_FOREVER,MSG_PRI_NORMAL)) == ERROR){
	            perror("Error in sending the message to responsQ1");
	            return(ERROR);
	        }
	        else
	            printf("mServerTask sending to mClientTask1:server tid = %d, mClientTask1 getting message = %d \n",taskIdSelf(),sendItem.value);

		}
		else if(receiveItem.tid == 2)
		{
			if((msgQSend(responseQ2,(char *)&sendItem,sizeof(sendItem),WAIT_FOREVER,MSG_PRI_NORMAL)) == ERROR){
	            perror("Error in sending the message to responsQ1");
	            return(ERROR);
	        }
	        else
	            printf("mServerTask sending to mClientTask2:server tid = %d, mClientTask2 getting message = %d \n",taskIdSelf(),sendItem.value);
		}
	}

	semGive(clientSem1);

	return (OK);
}

/*客户端一任务*/
STATUS mClientTask1(void)
{
	msg sendItem;
	msg receiveItem;
	int i;

	printf ("mClientTask1 started: task id = %#x.\n", taskIdSelf());

	semTake (clientSem1, WAIT_FOREVER);

	/*client1发送三个消息到服务端*/
	for(i = 0;i < msgNum;i++)
	{
		sendItem.tid = 1;
		sendItem.value = i;

		if((msgQSend(requestQ,(char *)&sendItem,sizeof(sendItem),WAIT_FOREVER,MSG_PRI_NORMAL)) == ERROR)
		{
            perror("Error in sending the message");
            return(ERROR);
        }
        else
            printf("mclientTask1 sending message to server: tid = %d,sending message = %d.\n",taskIdSelf(),i);
	}

	semGive(clientSem2);

	semTake (clientSem1, WAIT_FOREVER);

	/*从服务端接受消息*/
	for(i = 0;i < msgNum;i++)
	{
		if((msgQReceive(responseQ1,(char *)&receiveItem,sizeof(receiveItem),WAIT_FOREVER)) == ERROR)
		{
            perror("Error in sending the message");
            return(ERROR);
        }
        else
            printf("mclientTask1 receiving message from server: tid = %d,receiving message =  %d.\n",receiveItem.tid,receiveItem.value);
	}

	semGive(clientSem2);

	return (OK);
}

/*客户端二任务*/
STATUS mClientTask2(void)
{
	msg sendItem;
	msg receiveItem;
	int i;

	printf ("mClientTask2 started: task id = %#x.\n", taskIdSelf());
	semTake (clientSem2, WAIT_FOREVER);

	/*client2发送三个消息到服务端*/
	for(i = 0;i < msgNum;i++)
	{
		sendItem.tid = 2;
		sendItem.value = i;

		if((msgQSend(requestQ,(char *)&sendItem,sizeof(sendItem),WAIT_FOREVER,MSG_PRI_NORMAL)) == ERROR)
		{
            perror("Error in sending the message");
            return(ERROR);
        }
        else
            printf("mclientTask2 sending message to server: tid = %d,sending message = %d.\n",taskIdSelf(),i);
	}

	semTake (clientSem2, WAIT_FOREVER);

	/*从服务端接受消息*/
	for(i = 0;i < msgNum;i++)
	{
		if((msgQReceive(responseQ2,(char *)&receiveItem,sizeof(receiveItem),WAIT_FOREVER)) == ERROR)
		{
            perror("Error in sending the message");
            return(ERROR);
        }
        else
            printf("mclientTask2 receiving message from server: tid = %d,receiving message = %d.\n",receiveItem.tid,receiveItem.value);
	}

	notDone = FALSE;

	return (OK);
}

运行结果

代码运行结果如下:

在Tornado中的vxsim打印结果信息

windview演示结果



分享到:
评论

相关推荐

    MSG.rar_vxworks queue_消息队列 _消息队列 C

    vxworkws消息队列通信,实现了vxworks消息队列通信的机制

    基于VxWorks的微机保护装置网络通信设计

    针对微机保护系统对采样数据传输的实时性需要,给出了一种基于VxWorks嵌入式实时操作系统的...该方案利用VxWorks的灵活通信机制,并通过建立基于双缓冲消息队列的客户端/服务器通信模型来提高系统的实时性和可靠性。

    VxWorks嵌入式实验报告.pdf

    它们之间使用消息队列实现任务间的通信,tClientTask1 和 tClientTask1 发送消息(函数名,如 add 和参数),tServerTask 接收消息,在服务器任务上执行函数(如实现加法运算),将执行的 结果通过消息队列返回给 ...

    北京中科信软实时嵌入式系统 VxWorks培训1

    (RTOS)任务间的通信机制。 异常、中断和定时器程序设计。 实时应用系统设计概要; VxWorks系统概述 VxWorks操作系统的配置和启动 VxWorks组件概述 VxWorks文件系统 VxWorks网络系统、 VxWorks图形...

    Vxworks培训资料.pptx

    Vxworks提供了一套丰富的任务间通信机制,包括: 内存共享(Shared memory):简单的数据共享方法 信号量(Semaphore):用于基本的互斥及同步 消息队列(Message queues)和管道(pipe): 用于同一CPU上任务间...

    VxWorks操作系统指南

    1.4.4. 消息队列 1.4.5. 管道 1.4.6. 系统实现 1.5.网络通信 1.5.1. 套接口(Sockets) 1.6.中断服务程序 1.7.时间管理器 2.VxWorks应用指导 2.1.系统启动 2.1.1. 启动盘的制作 2.1.1. 主机Tornado环境...

    VxWorks下的多重定时器设计

    本文针对VxWorks下UDP网络通信中的可靠传输问题,提出了一个支持重传和定时等待确认的协议,并利用VxWorks系统提供的信号量同步、消息队列和看门狗定时器等多种机制,综合设计了一种可扩展的三重定时器。

    Vxworks资料

    消息队列  1.4.5. 管道  1.4.6. 系统实现 1.5.网络通信 11  1.5.1. 套接口(Sockets) 1.6.中断服务程序 14 1.7.时间管理器 14 2.VxWorks应用指导 16 2.1.系统启动 17  2.1.1. 启动盘的制作  2.1.1. 主机...

    基于VxWorks下的多重定时器设计

    VxWorks是一种嵌入式实时操作系统(RTOS),具有内核小、可...为了实现基于UDP网络的可靠通信,本文利用VxWorks的多种任务间通信机制和看门狗定时器机制,设计了一种多重定时器模型,该模型可以确保数据包的可靠传递。

    VxWorks实验指导书

    从Tornado2.2的使用讲到交叉编译环境的构建,从BSP的定制讲到VxWorks系统API的使用,手把手教你学会VxWorks编程。 目录如下: 第一章 Wind River ........................................................... 4 ...

    嵌入式实时系统任务内调度的策略和实现

    摘要介绍了任务内调度的策略和实现,其利用嵌入式实时系统的任务调度机制和消息队列构造以进程为单位的二级调度策略,支持用状态、信号描述的SDL进程的C程序语言编程结构。它的精简和高效,对提高处理能力和系统整体...

    华为编程开发规范与案例

    P=D/S D=DA+0.5DB+0.25DC 其中: P -问题发生率 D -1个季度内错误总数 DA -1个季度内A类错误总数 DB -1个季度内B类错误总数 DC -1个季度内C类错误总数 S -1个季度内收到问题报告单总数 1...

Global site tag (gtag.js) - Google Analytics