
信号量同步共享内存读写实例
前言
在前几篇文章讲述了共享内存和信号量,在共享内存的文章中两个进程不是并发运行的,而是线性运行等一个进程写完了退出了再启动另一个进程进行读操作,因为他们的操作不是原子的是不安全的。而前面又讲到了信号量可以进行进程间的同步,所以把信号量结合到共享内存中,就能在多进程进行共享内存的同步操作
目标
目前使用一个生产者和一个消费者的简单模型,同时读写一个100字节大小的共享内存数据区(不考虑环形缓冲区,读写完即止),消费者作为父进程,创建生产者子进程,生产者完成所有写操作之后退出。两个进程同时进行对共享内存的读写
代码实现
共享内存数据结构定义
笔者使用的是无名信号量,所以是需要共享内存支持才能在进程之间作用,所以把sem_t也封装到共享内存结构体里面
typedef struct shareData
{
sem_t sem; //64位下32字节,内部对齐是8的倍数
uint8_t data[DATA_SIZE];
uint32_t write_ptr = 0;
uint32_t read_ptr = 0;
uint32_t sem_init_flag = 0;
uint8_t reserved[112]; //作为字节对齐保留
}shareData_t; //总计256字节
根据sem_t的定义
#if __WORDSIZE == 64
# define __SIZEOF_SEM_T 32
#else
# define __SIZEOF_SEM_T 16
#endif
/* Value returned if `sem_open' failed. */
#define SEM_FAILED ((sem_t *) 0)
typedef union
{
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;
笔者在64位机做实验,这里对应__SIZEOF_SEM_T值为32,然后由于sem_t联合体里面有long int的数据类型,那么它的内存对齐就是8个字节倍数,所以综合考虑最终共享内存结构体大小定义为256字节
sem_init_flag用于判断是否已经初始化信号量,因为无名信号量的初始化只需要一次
共享内存类实现
为了减少主函数的代码量和可移植性,笔者将对共享内存的所有操作封装成一个类,两个进程只需要调用类的接口即可,这样看起来更直观清晰
class sharedMem
{
private:
shareData_t *shm_ptr;
char *shm_name;
public:
sharedMem(/* args */);
~sharedMem();
int init(char *name);
void deinit();
void destroy();
int write(uint8_t *data,size_t size);
int read(uint8_t *data,size_t size);
};
类的私有成员一个是指向共享内存地址的指针,还有一个是共享内存的名称,这里使用的是POSIX的共享内存API
整个类的接口有初始化,去初始化,销毁共享内存,写共享内存,读共享内存几种
初始化
共享内存的初始化主要是分为两步,一步是创建共享内存(如果没有)并映射其地址,
fd = shm_open(shm_name,O_CREAT | O_RDWR,0640);
ftruncate(fd,sizeof(shareData_t));
shm_ptr = (shareData_t *)mmap(NULL,sizeof(shareData_t),PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);
第二部是判断信号量是否已经初始化,如果没有初始化就需要进行初始化,初始化后其他进程避免初始化
在对信号量的初始化判断的时候使用到了文件锁,进行加锁操作,避免其他进程同时进行判断从而可能存在竞争。
if(flock(fd,LOCK_EX) == 0) //加文件锁,排他锁,查看是否已经初始化sem
{
if(shm_ptr->sem_init_flag != SEM_INIT_FLAG)
{
ret = sem_init(&shm_ptr->sem,1,1);
if(ret != 0)
{
perror("sem_init failed:");
return -1;
}
shm_ptr->sem_init_flag = SEM_INIT_FLAG;
SHM_PRINT("sem_init succ");
}
else
SHM_PRINT("sem inited");
flock(fd,LOCK_UN);//释放锁
}
去初始化
去初始化的作用就是接触共享内存的映射
munmap(shm_ptr,sizeof(shareData_t));
销毁共享内存
销毁共享内存只需要调用一次,一般由管理这个共享内存的主进程调用销毁
shm_unlink(shm_name);
写共享内存
读写共享内存时需要加信号量操作,就如同多线程读写加互斥锁操作一样
这里笔者的写完成判断是写指针达到data数组的大小,在这之后生产者进程直接退出
sem_wait(&shm_ptr->sem);
if(sizeof(shm_ptr->data) - shm_ptr->write_ptr >= size)
{
memcpy(shm_ptr->data + shm_ptr->write_ptr,data,size);
shm_ptr->write_ptr += size;
sem_post(&shm_ptr->sem);
return size;
}
else
{
sem_post(&shm_ptr->sem);
SHM_PRINT("shared Mem full");
return -1;
}
读共享内存
读共享内存也和写操作一样,有时候可能读操作会比写操作快,那么就需要重新进行等待
sem_wait(&shm_ptr->sem);
if(shm_ptr->read_ptr == sizeof(shm_ptr->data))
{
sem_post(&shm_ptr->sem);
return -1;
}
if(shm_ptr->write_ptr - shm_ptr->read_ptr >= size)
{
memcpy(data,shm_ptr->data + shm_ptr->read_ptr,size);
shm_ptr->read_ptr += size;
sem_post(&shm_ptr->sem);
return size;
}
else
{
sem_post(&shm_ptr->sem);
SHM_PRINT("read faster than write");
return 0;
}
主进程(消费者)
这里笔者将消费者进程作为主进程,在进行信号量的初始化之后会去fork子进程,子进程调用execve函数执行生产者内容
int main(int argc,char **argv)
{
uint8_t buff[4];
int ret = 0;
pid_t child_pid = 0;
sharedMem g_shareMem;
int status;
ret = g_shareMem.init(SHM_NAME);
if((child_pid = fork()) == 0)
{
char *argv[] = { "./sharedMemWrite", NULL };
char *envp[] = { NULL };
execve("./sharedMemWrite",argv,envp);
}
while(1)
{
sleep(1);
ret = g_shareMem.read(buff,sizeof(buff));
if(ret < 0)
{
break;
}
else if(ret == 0)
{
continue;
}
SHM_PRINT("read buff: %u %u %u %u",buff[0],buff[1],buff[2],buff[3]);
}
g_shareMem.deinit();
waitpid(child_pid,&status,0);
SHM_PRINT("child proc exit");
g_shareMem.destroy();
return 0;
}
子进程(生产者)
子进程代码就比较简单了,就直接映射共享内存然后写就行了
int main(int argc,char **agrv)
{
uint8_t buff[4];
int ret = 0;
uint8_t i = 0;
uint8_t j = 0;
sharedMem g_shareMem;
ret = g_shareMem.init(SHM_NAME);
while(1)
{
sleep(1);
for(i = 0;i < sizeof(buff);i++)
{
buff[i] = i + j;
}
ret = g_shareMem.write(buff,sizeof(buff));
if(ret < 0)
break;
j = j + 4;
}
g_shareMem.deinit();
return 0;
}
结果现象
执行sharedMemRead程序,结果如下:
jhliu@test:~/OS_code/sharedMemSync$ ls
shareData.h sharedMem.cpp sharedMem.h sharedMemRead sharedMemRead.cpp sharedMemWrite sharedMemWrite.cpp
jhliu@test:~/OS_code/sharedMemSync$ ./sharedMemRead
PID615102: shm_open succ
PID615102: shm st_size:256
PID615102: mmap succ
PID615102: sem_init succ
PID615104: shm_open succ
PID615104: shm st_size:256
PID615104: mmap succ
PID615104: sem inited
PID615102: read faster than write
PID615102: read buff: 0 1 2 3
PID615102: read buff: 4 5 6 7
PID615102: read buff: 8 9 10 11
PID615102: read buff: 12 13 14 15
PID615102: read buff: 16 17 18 19
PID615102: read buff: 20 21 22 23
PID615102: read buff: 24 25 26 27
PID615102: read buff: 28 29 30 31
PID615102: read buff: 32 33 34 35
PID615102: read buff: 36 37 38 39
PID615102: read buff: 40 41 42 43
PID615102: read buff: 44 45 46 47
PID615102: read buff: 48 49 50 51
PID615102: read buff: 52 53 54 55
PID615102: read buff: 56 57 58 59
PID615102: read buff: 60 61 62 63
PID615102: read buff: 64 65 66 67
PID615102: read buff: 68 69 70 71
PID615102: read buff: 72 73 74 75
PID615102: read buff: 76 77 78 79
PID615102: read buff: 80 81 82 83
PID615102: read buff: 84 85 86 87
PID615102: read buff: 88 89 90 91
PID615102: read buff: 92 93 94 95
PID615102: read buff: 96 97 98 99
PID615104: shared Mem full
PID615104: munmap succ
PID615102: munmap succ
PID615102: child proc exit
PID615102: shm_unlink succ
可以看到主进程615102先创建共享内存,然后映射到自己的虚拟内存中,并且初始化了信号量,然后启动子进程615104,子进程只需要映射共享内存,并判断信号量已经初始化。接下来生产者和消费者两个进程就轮流读写。最后子进程615104退出然后主进程615102检测到子进程退出也销毁共享内存
上面生产者和消费者的读写速度都是1s一次,下面笔者改了生产者的速度为2s一次:
jhliu@test:~/OS_code/sharedMemSync$ ./sharedMemRead
PID615614: shm_open succ
PID615614: shm st_size:256
PID615614: mmap succ
PID615614: sem_init succ
PID615615: shm_open succ
PID615615: shm st_size:256
PID615615: mmap succ
PID615615: sem inited
PID615614: read faster than write
PID615614: read faster than write
PID615614: read buff: 0 1 2 3
PID615614: read faster than write
PID615614: read buff: 4 5 6 7
PID615614: read faster than write
PID615614: read buff: 8 9 10 11
PID615614: read faster than write
PID615614: read buff: 12 13 14 15
PID615614: read faster than write
PID615614: read buff: 16 17 18 19
PID615614: read faster than write
PID615614: read buff: 20 21 22 23
PID615614: read faster than write
PID615614: read buff: 24 25 26 27
PID615614: read faster than write
PID615614: read buff: 28 29 30 31
PID615614: read faster than write
PID615614: read buff: 32 33 34 35
PID615614: read buff: 36 37 38 39
PID615614: read faster than write
PID615614: read buff: 40 41 42 43
PID615614: read faster than write
PID615614: read buff: 44 45 46 47
PID615614: read faster than write
PID615614: read buff: 48 49 50 51
PID615614: read faster than write
PID615614: read buff: 52 53 54 55
PID615614: read faster than write
PID615614: read buff: 56 57 58 59
PID615614: read faster than write
PID615614: read buff: 60 61 62 63
PID615614: read faster than write
PID615614: read buff: 64 65 66 67
PID615614: read faster than write
PID615614: read buff: 68 69 70 71
PID615614: read faster than write
PID615614: read buff: 72 73 74 75
PID615614: read faster than write
PID615614: read buff: 76 77 78 79
PID615614: read faster than write
PID615614: read buff: 80 81 82 83
PID615614: read faster than write
PID615614: read buff: 84 85 86 87
PID615614: read faster than write
PID615614: read buff: 88 89 90 91
PID615614: read faster than write
PID615614: read buff: 92 93 94 95
PID615614: read faster than write
PID615614: read buff: 96 97 98 99
PID615614: munmap succ
PID615615: shared Mem full
PID615615: munmap succ
PID615614: child proc exit
PID615614: shm_unlink succ
完整源码
shareData.h:
#ifndef __SHAREDATA__H
#define __SHAREDATA__H
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#define SHM_NAME "/wjbtest"
#define SHM_PRINT(fmt, ...) printf("PID%d: " fmt "\r\n", getpid(), ##__VA_ARGS__)
#define DATA_SIZE 100
#define SEM_INIT_FLAG 0x12345678
typedef struct shareData
{
sem_t sem; //64位下32字节,内部对齐是8的倍数
uint8_t data[DATA_SIZE];
uint32_t write_ptr = 0;
uint32_t read_ptr = 0;
uint32_t sem_init_flag = 0;
uint8_t reserved[112]; //作为字节对齐保留
}shareData_t; //总计256字节
#endif
sharedMem.h:
#ifndef __SHAREDMEM_H
#define __SHAREDMEM_H
#include "shareData.h"
class sharedMem
{
private:
shareData_t *shm_ptr;
char *shm_name;
public:
sharedMem(/* args */);
~sharedMem();
int init(char *name);
void deinit();
void destroy();
int write(uint8_t *data,size_t size);
int read(uint8_t *data,size_t size);
};
#endif
sharedMem.cpp:
#include <cstddef>
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/file.h>
#include "sharedMem.h"
sharedMem::sharedMem(/* args */)
{
}
sharedMem::~sharedMem()
{
}
int sharedMem::init(char *name)
{
int fd = 0;
int ret = 0;
shm_name = name;
fd = shm_open(shm_name,O_CREAT | O_RDWR,0640);
if(fd == -1)
{
perror("shm_open failed:");
return -1;
}
SHM_PRINT("shm_open succ");
if(ftruncate(fd,sizeof(shareData_t)) != 0)
{
perror("ftruncate failed:");
return -1;
}
struct stat filestate;
fstat(fd,&filestate);
SHM_PRINT("shm st_size:%ld",filestate.st_size);
shm_ptr = (shareData_t *)mmap(NULL,sizeof(shareData_t),PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);
if(shm_ptr == NULL)
{
perror("mmap failed:");
return -1;
}
SHM_PRINT("mmap succ");
if(flock(fd,LOCK_EX) == 0) //加文件锁,排他锁,查看是否已经初始化sem
{
if(shm_ptr->sem_init_flag != SEM_INIT_FLAG)
{
ret = sem_init(&shm_ptr->sem,1,1);
if(ret != 0)
{
perror("sem_init failed:");
return -1;
}
shm_ptr->sem_init_flag = SEM_INIT_FLAG;
SHM_PRINT("sem_init succ");
}
else
SHM_PRINT("sem inited");
flock(fd,LOCK_UN);//释放锁
}
close(fd);
return 0;
}
void sharedMem::deinit()
{
munmap(shm_ptr,sizeof(shareData_t));
SHM_PRINT("munmap succ");
}
void sharedMem::destroy()
{
shm_unlink(shm_name);
SHM_PRINT("shm_unlink succ");
}
int sharedMem::write(uint8_t *data,size_t size)
{
sem_wait(&shm_ptr->sem);
if(sizeof(shm_ptr->data) - shm_ptr->write_ptr >= size)
{
memcpy(shm_ptr->data + shm_ptr->write_ptr,data,size);
shm_ptr->write_ptr += size;
sem_post(&shm_ptr->sem);
return size;
}
else
{
sem_post(&shm_ptr->sem);
SHM_PRINT("shared Mem full");
return -1;
}
}
int sharedMem::read(uint8_t *data,size_t size)
{
sem_wait(&shm_ptr->sem);
if(shm_ptr->read_ptr == sizeof(shm_ptr->data))
{
sem_post(&shm_ptr->sem);
return -1;
}
if(shm_ptr->write_ptr - shm_ptr->read_ptr >= size)
{
memcpy(data,shm_ptr->data + shm_ptr->read_ptr,size);
shm_ptr->read_ptr += size;
sem_post(&shm_ptr->sem);
return size;
}
else
{
sem_post(&shm_ptr->sem);
SHM_PRINT("read faster than write");
return 0;
}
}
sharedMemRead.cpp:
#include <cstddef>
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/wait.h>
#include "sharedMem.h"
int main(int argc,char **argv)
{
uint8_t buff[4];
int ret = 0;
pid_t child_pid = 0;
sharedMem g_shareMem;
int status;
ret = g_shareMem.init(SHM_NAME);
if((child_pid = fork()) == 0)
{
char *argv[] = { "./sharedMemWrite", NULL };
char *envp[] = { NULL };
execve("./sharedMemWrite",argv,envp);
}
while(1)
{
sleep(1);
ret = g_shareMem.read(buff,sizeof(buff));
if(ret < 0)
{
break;
}
else if(ret == 0)
{
continue;
}
SHM_PRINT("read buff: %u %u %u %u",buff[0],buff[1],buff[2],buff[3]);
}
g_shareMem.deinit();
waitpid(child_pid,&status,0);
SHM_PRINT("child proc exit");
g_shareMem.destroy();
return 0;
}
sharedMemWrite.cpp:
#include <cstddef>
#include <sys/mman.h>
#include <sys/stat.h> /* For mode constants */
#include <fcntl.h> /* For O_* constants */
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "sharedMem.h"
int main(int argc,char **agrv)
{
uint8_t buff[4];
int ret = 0;
uint8_t i = 0;
uint8_t j = 0;
sharedMem g_shareMem;
ret = g_shareMem.init(SHM_NAME);
while(1)
{
sleep(2);
for(i = 0;i < sizeof(buff);i++)
{
buff[i] = i + j;
}
ret = g_shareMem.write(buff,sizeof(buff));
if(ret < 0)
break;
j = j + 4;
}
g_shareMem.deinit();
return 0;
}
