【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

RT,使用消息队列,信号量和命名管道实现的多人群聊系统。

本学期Linux、unix网络编程的第三个作业。

先上实验要求:

实验三  多进程服务器

【实验目的】

1、熟练掌握进程的创建与终止方法;

2、熟练掌握进程间通信方法;

2、应用套接字函数完成多进程服务器,实现服务器与客户端的信息交互。

【实验学时】

4学时

【实验内容】

通过一个服务器实现最多5个客户之间的信息群发。

服务器显示客户的登录与退出;

客户连接后首先发送客户名称,之后发送群聊信息;

客户输入bye代表退出,在线客户能显示其他客户的登录于退出。

任务分析:

实现提示:

1、服务器端:

服务器进程称之为主进程,主进程创建一个转发子进程和最多5个通信子进程。

主进程与转发子进程之间:

信号量(初值5,主进程接受一个客户连接后执行P操作判断是否超过5,转发子进程有一个客户退出后执行V操作,并发消息队列标识符)

命名管道SERVER(转发子进程将可用的消息队列标识符写入管道,主进程从管道中读取消息队列标识符)

转发子进程与通信子进程之间:

命名管道CLIENT(通信子进程向命名管道写入客户端发来的消息,转发子进程从管道中读取消息并发送给对应的客户端)

消息队列(转发子进程将客户发来的信息通过消息队列发送给每个通信子进程)

(1)主进程:

从转发子进程获取一个可用的消息队列标识符;

接收客户连接请求,如果连接数超过最大连接数,向客户发送退出标志,否则发送OK标志;

每接受一个连接,创建一个通信子进程并将连接socket、消息队列标识符、客户地址传递给通信子进程。

(2)通信子进程:

创建一个子进程负责从消息队列中读取消息,发送给客户。

通信子进程负责接收客户发来信息,通过命名管道CLIENT发送给转发子进程;

若信息为用户名,附带消息队列、客户地址发送给转发子进程;

若信息为退出,终止子进程,程序结束

(3)转发子进程:

创建5个消息队列;

维护客户信息表:消息队列、客户名、客户IP、客户端口、状态。

从命名管道CLIENT中读取通信子进程发来的消息,消息类型为:用户名、退出及一般信息;

若为用户名,依据消息队列在更新客户信息表,状态为可用;

若为一般信息,将信息转换后写入可用客户的消息队列,等待其他通信子进程读取;

若为退出,在客户信息表中状态设为不可用,执行信号量V操作,并将可用客户的消息队列标识符写入到命名管道SERVER;

2、客户端:

根据用户从终端输入的服务器IP地址及端口号连接到相应的服务器;

连接成功后,先发送客户名称;

创建一个子进程负责接收服务器发来的信息,并显示;

主进程循环从终端输入信息,并将信息发送给服务器;

当发送给服务器为bye后,关闭子进程,程序退出。

架构看起来很复杂,我们可以绘制一下流程图方便理清思路。

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

在word里面截图不是很清晰啊。。。

开始写代码吧:首先clientmsg.h,它定义了一些消息的操作符(OP)和CLIENTMSG这个结构体(用于服务器和客户端之间传递消息)

 //CLIENTMSG between server and client
#ifndef _clientmsg
#define _clientmsg //USER MSG EXIT for OP of CLIENTMSG
#define EXIT -1
#define USER 1
#define MSG 2
#define OK 3 #ifndef CMSGLEN
#define CMSGLEN 100
#endif struct CLIENTMSG{
int OP;
char username[];
char buf[CMSGLEN];
}; #endif

然后实现一下servermsg.h,用于服务器内部的转发子进程和通信子进程之间的消息传递。

 //SERVERMSG for communicate to translate
//MESSAGE for translate to communicate
#ifndef _servermsg
#define _servermsg #include <netinet/in.h>
#include "clientmsg.h" #ifndef CMSGLEN
#define CMSGLEN 100
#endif struct SERVERMSG{
int OP;
char username[];
char buf[CMSGLEN];
struct sockaddr_in client;
int stat;
int qid;
}; struct MESSAGE{
long msgtype;
struct SERVERMSG msg;
}; #endif

由于需要操作信号量,所以将一些信号量的操作做成函数

semaphore.h

 #ifndef _semaphore
#define _semaphore union semun
{
int val;
struct semid_ds *buf;
unsigned short *array;
}; int CreateSem(key_t key,int value);
int Sem_P(int semid);
int Sem_V(int semid);
int GetvalueSem(int semid);
void DestroySem(int semid); #endif

对函数的实现:semaphore.c

 #include <stdlib.h>
#include <fcntl.h>
#include <sys/sem.h>
#include "semaphore.h" int CreateSem(key_t key,int value)
{
union semun sem;
int semid;
sem.val=value;
semid=semget(key,,IPC_CREAT);
if (semid==-){
perror("semget error"); exit();
}
semctl(semid,,SETVAL,sem);
return semid;
} int Sem_P(int semid)
{
struct sembuf sops={,-,IPC_NOWAIT};
return (semop(semid,&sops,));
} int Sem_V(int semid)
{
struct sembuf sops={,+,IPC_NOWAIT};
return (semop(semid,&sops,));
} int GetvalueSem(int semid)
{
union semun sem;
return semctl(semid,,GETVAL,sem);
}
void DestroySem(int semid)
{
union semun sem;
sem.val=; semctl(semid,,IPC_RMID,sem);
}

接下来是非常重要的服务器端实现(里面有很多调试信息,比较懒没有删掉,直接在里面注释掉了。)

server.c

 #include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include "semaphore.h"
#include "servermsg.h" void trans_process(int semid);
void communicate_process(int connetfd,int qid,struct sockaddr_in client); int main(){ struct sockaddr_in server;
struct sockaddr_in client;
int listenfd,connetfd;
char ip[];
int port;
int addrlen;
struct CLIENTMSG clientMsg;
int ret,status;
/*---------------------socket-------------------*/
if((listenfd = socket(AF_INET,SOCK_STREAM,))== -){
perror("socket() error\n");
exit();
} /*----------------------IO-----------------------*/
printf("Please input the ip:\n");
scanf("%s",ip);
printf("Please input the port:\n");
scanf("%d",&port); /*---------------------bind----------------------*/
bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = inet_addr(ip);
if(bind(listenfd,(struct sockaddr *)&server,sizeof(server))== -){
perror("bind() error\n");
exit();
} /*----------------------listen-------------------*/
if (listen(listenfd,)== -){
perror("listen() error\n");
exit();
} //创建命名管道
unlink("SERVER");
mkfifo("SERVER",O_CREAT);
int rd = open("SERVER",O_RDONLY|O_NONBLOCK);
int semid;
key_t k = ftok(".",'b');
semid = CreateSem(k,);
pid_t pid_1,pid_2;
pid_1 = fork();
if(pid_1 == ){
trans_process(semid);
exit();
}
else if(pid_1 > ){
while(){
addrlen = sizeof(client);
if((connetfd = accept(listenfd,(struct sockaddr *)&client,&addrlen))== -){
perror("accept() error\n");
exit();
}
ret = Sem_P(semid);
if(ret == ){
int qid;
read(rd,&qid,sizeof(qid));
//printf("qid1:%d\n",qid );
pid_2 = fork();
if (pid_2 > ){
close(connetfd);
waitpid(pid_2,&status,WNOHANG);
continue;
}
else if(pid_2 == ){
communicate_process(connetfd,qid,client);
exit();
}
else {
perror("the second fork error\n");
}
}
else {
clientMsg.OP = EXIT;
send(connetfd,&clientMsg,sizeof(clientMsg),);
close(connetfd);
}
waitpid(pid_1,&status,WNOHANG); }
}
else {
perror("first time fork error\n");
}
/*----------------------close-------------------*/
close(connetfd);
close(listenfd); return ;
} /*----------------------------函数实现区----------------------------*/
void trans_process(int semid){
struct SERVERMSG ent[];
struct MESSAGE sendMsg;
struct SERVERMSG msg;
int i;
for(i=;i<;i++){
ent[i].stat = ;
}
int wfd = open("SERVER",O_WRONLY|O_NONBLOCK);
for(i=;i<;i++){
key_t key = ftok(".",(char)i+);
ent[i].qid = msgget(key,IPC_CREAT);
write(wfd,&ent[i].qid,sizeof(ent[i].qid));
}
unlink("CLIENT");
mkfifo("CLIENT",O_CREAT);
int rfd = open("CLIENT",O_RDONLY|O_NONBLOCK);
int len;
while(){
bzero(&msg,sizeof(msg));
len = read(rfd,&msg,sizeof(msg));
//printf(" %d,%s ,%s\n",msg.OP,msg.username,msg.buf );
//sleep(3);
if(len > ){
if(msg.OP == USER){
for(i=;i<;i++){
if(ent[i].qid == msg.qid){
bcopy(msg.username,ent[i].username,strlen(msg.username));
ent[i].client = msg.client;
ent[i].stat = ;
break;
}
}
}
else if(msg.OP == EXIT){
for(i=;i<;i++){
if(ent[i].qid == msg.qid){
ent[i].stat = ;
write(wfd,&ent[i].qid,sizeof(ent[i].qid));
Sem_V(semid);
break;
}
}
}
//bzero(&sendMsg,sizeof(sendMsg));
sendMsg.msg = msg;
for(i=;i<;i++){
if(ent[i].stat == ){
printf("stat 1...\n");
int m_len = sizeof(msg);
int sta=msgsnd(ent[i].qid,&sendMsg,len,);
//printf("flag:%d\n",sta );
}
}
}
else {
continue;
}
} } void communicate_process(int connetfd,int qid,struct sockaddr_in client){
struct CLIENTMSG sendMsg;
struct CLIENTMSG recvMsg;
struct MESSAGE server_Msg;
struct SERVERMSG client_sndMsg;
struct SERVERMSG msg;
int status;
int wfd = open("CLIENT",O_WRONLY|O_NONBLOCK);
pid_t pid;
pid = fork();
if(pid < ){
perror("communicate_process fork error\n");
}
else if (pid == ){
bzero(&sendMsg,sizeof(sendMsg));
sendMsg.OP = OK;
send(connetfd,&sendMsg,sizeof(sendMsg),);
while(){
int m_len = sizeof(msg);
bzero(&server_Msg,sizeof(server_Msg));
int sta=msgrcv(qid,&server_Msg,m_len,,);
//printf("flag:%d\n",sta );
//printf("send..%d,%s,%s\n",server_Msg.msg.OP,server_Msg.msg.username,server_Msg.msg.buf );
bzero(&sendMsg,sizeof(sendMsg));
bcopy(server_Msg.msg.username,sendMsg.username,strlen(server_Msg.msg.username));
sendMsg.OP = server_Msg.msg.OP;
bcopy(server_Msg.msg.buf,sendMsg.buf,strlen(server_Msg.msg.buf));
//printf("send..%d,%s,%s\n",sendMsg.OP,sendMsg.username,sendMsg.buf );
send(connetfd,&sendMsg,sizeof(sendMsg),);
}
}
else{
while(){
bzero(&recvMsg,sizeof(recvMsg));
int len =recv(connetfd,&recvMsg,sizeof(recvMsg),);
if(len > ){
if(recvMsg.OP == USER){
printf("user %s login from ip:%s,port:%d\n",recvMsg.username,inet_ntoa(client.sin_addr),ntohs(client.sin_port) );
client_sndMsg.OP = USER;
}
else if(recvMsg.OP == EXIT){
printf("user %s is logout\n",recvMsg.username );
client_sndMsg.OP = EXIT;
write(wfd,&client_sndMsg,sizeof(client_sndMsg));
break;
}
else if(recvMsg.OP == MSG){
client_sndMsg.OP = MSG;
}
bzero(&client_sndMsg,sizeof(client_sndMsg));
bcopy(recvMsg.username,client_sndMsg.username,strlen(recvMsg.username));
bcopy(recvMsg.buf,client_sndMsg.buf,strlen(recvMsg.buf));
client_sndMsg.client = client;
//printf("qid2:%d\n",qid );
client_sndMsg.qid = qid;
client_sndMsg.OP = recvMsg.OP;
write(wfd,&client_sndMsg,sizeof(client_sndMsg)); }
else{
continue;
}
}
kill(pid,SIGKILL);
waitpid(pid,&status,WNOHANG);
close(wfd);
close(connetfd);
}
}

写出了服务端,就可以非常容易的写出客户端了。

client.c

 #include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include "clientmsg.h" int main(){
int sockfd;
char ip[];
int port;
int status;
pid_t pid;
struct sockaddr_in server;
struct CLIENTMSG clientMsg; /*---------------------socket---------------------*/
if((sockfd = socket(AF_INET,SOCK_STREAM,))== -){
perror("socket error\n");
exit();
} /*---------------------connect--------------------*/
printf("Please input the ip:\n");
scanf("%s",ip);
printf("Please input the port:\n");
scanf("%d",&port);
bzero(&server,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_aton(ip,&server.sin_addr);
if(connect(sockfd,(struct sockaddr *)&server,sizeof(server))== -){
perror("connect() error\n");
exit();
}
recv(sockfd,&clientMsg,sizeof(clientMsg),);
if(clientMsg.OP == OK){
int len;
pid = fork();
if(pid == ){
while(){
bzero(&clientMsg,sizeof(clientMsg));
len =recv(sockfd,&clientMsg,sizeof(clientMsg),);
if(len > ){
if(clientMsg.OP ==USER){
printf("the user %s is login.\n",clientMsg.username );
}
else if(clientMsg.OP == EXIT){
printf("the user %s is logout.\n",clientMsg.username);
}
else if(clientMsg.OP == MSG){
printf("%s: %s\n",clientMsg.username,clientMsg.buf );
}
}
}
exit(EXIT_SUCCESS);
}
else if(pid > ){
printf("Please input the username:\n");
scanf("%s",clientMsg.username);
clientMsg.OP = USER;
send(sockfd,&clientMsg,sizeof(clientMsg),);
while(){
clientMsg.OP = MSG;
scanf("%s",clientMsg.buf);
if(strcmp("bye",clientMsg.buf) == ){
clientMsg.OP = EXIT;
send(sockfd,&clientMsg,sizeof(clientMsg),);
break;
}
send(sockfd,&clientMsg,sizeof(clientMsg),); }
kill(pid,SIGKILL);
waitpid(pid,&status,WNOHANG);
}
else{
perror("fork error!\n");
}
}
else{
printf("以达到最大连接数!\n");
}
/*------------------------close--------------------------*/
close(sockfd); return ;
}

最后是makefile:

main:server.o client.o semaphore.o
gcc server.o semaphore.o -oserver
gcc client.o -oclient
server.o:server.c semaphore.h clientmsg.h servermsg.h
gcc -c server.c
client.o:client.c clientmsg.h
gcc -c client.c
semaphore.o:semaphore.h semaphore.c
gcc -c semaphore.c
clean:
rm -rf *.o

下面上一下演示过程:(测试环境,Red Hat Enterprise Linux 6 + centos系Linux,ubuntu下可能会有些问题。)

首先先把服务端启动开来,为了方便测试,这里直接使用的是127.0.0.1的localhost。

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

然后启动两个客户端用来测试,在用户登录的时候客户端会有消息提醒。服务端会有日志打印输出客户端的名字和登录ip、端口。

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

客户可以发送消息了,如图发送与接收均正常。可以同时启动<=5个客户端进行群聊,这里为了简单演示只是启动了2个。(修改信号量代码可以实现n多个客户的同时登陆):

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

输入bye以后即可退出聊天并下线。当有客户下线的时候,在线的客户端会收到下线提醒,客户端会有日志打印输出。

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

【LINUX/UNIX网络编程】之使用消息队列,信号量和命名管道实现的多进程服务器(多人群聊系统)

这个实验内容前前后后花了我2天才写完,刚开始没有弄清楚这一整套的工作机制与流程,写起来很是吃力,程序就是各种调不通。本来都想放弃了,但是后来还是咬咬牙坚持了一下来,饭要一口一口吃,程序要一点一点的写,万事不能操之过急,写代码一定要心平气和,头脑清晰。由于gdb调试工具用的不是很熟练,只能在程序里面一段一段的print变量来DEBUG,很是辛苦啊。

上一篇:转 可能是最漂亮的Spring事务管理


下一篇:UDP套接字编程