共享内存 信号量 多进程 读写处理文件
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <sys/time.h>
const int string_len = 4096;
const int share_m_number = 2;
class Semaphore
{
public:
// semun 类声明
union semun
{
int val;
struct semid_ds *buf;
unsigned short *arry;
};
// 信号id变量声明
int sem_id = 0;
int task_id = 0;
bool owner=0;
Semaphore(int task_id=0, bool create=1, unsigned int number=1)
{
this->task_id = task_id;
this->owner = create;
this->create(this->task_id, this->owner);
if(this->owner)
{
this->set(number);
}
}
int create(int task_id, bool create)
{
if(create)
{
// 创建信号量
this->sem_id = semget(ftok("/proc/version", this->task_id), 1, 0666 | IPC_CREAT);
}
else
{
this->sem_id = semget(ftok("/proc/version", this->task_id), 1, 0666);
}
if(this->sem_id<=0)
{
printf("Create semaphore falied!\n");
return 0;
}
return 1;
}
int set(unsigned int source_number)
{
// 用于初始化信号量,在使用信号量前必须这样做
semun sem_union;
sem_union.val = source_number;
if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
{
printf("Set semaphore failed!\n");
return 0;
}
return 1;
}
void del()
{
// 删除信号量
semun sem_union;
if (semctl(this->sem_id, 0, IPC_RMID, sem_union) == -1)
{
fprintf(stderr, "Failed to delete semaphore\n");
}
}
int p()
{
// 对信号量做减1操作,即等待P(sv)
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = -1;//P()
sem_b.sem_flg = SEM_UNDO;
if (semop(this->sem_id, &sem_b, 1) == -1)
{
fprintf(stderr, "semaphore_p failed\n");
return 0;
}
return 1;
}
int v()
{
// 这是一个释放操作,它使信号量变为可用,即发送信号V(sv)
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = 1; // V()
sem_b.sem_flg = SEM_UNDO;
if (semop(this->sem_id, &sem_b, 1) == -1)
{
fprintf(stderr, "semaphore_v failed\n");
return 0;
}
return 1;
}
~Semaphore()
{
if(this->owner)
{
this->del();
}
}
};
class Data
{
public:
char data[string_len];
int size;
};
Data *d;
FILE *read_fp = NULL, *write_fp=NULL;
Semaphore s1(1001,1,2);
Semaphore s2(1002,1,0);
int timestamp()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (tv.tv_sec*1000 + tv.tv_usec/1000);
}
int read(int cnt)
{
size_t size_of_elements = 1;
size_t number_of_elements = string_len;
size_t read_bytes = fread(d[cnt].data, size_of_elements, number_of_elements, read_fp);
// printf("%s\n", d[cnt].data);
// printf("%d\n", int(read_bytes));
return read_bytes;
}
int write(int cnt, int size)
{
size_t size_of_elements = 1;
size_t number_of_elements = size;
for(int i=0;i!=size;i++)
{
if(d[cnt].data[i]>='a'&&d[cnt].data[i]<='z')
{
d[cnt].data[i]-=32;
}
}
size_t write_bytes = fwrite(d[cnt].data, size_of_elements, number_of_elements, write_fp);
// printf("%s\n", d[cnt].data);
// printf("%d\n", int(write_bytes));
return write_bytes;
}
void X()
{
int cnt=0;
int read_bytes=0;
read_fp = fopen("/home/amazing/c++/signal/data3.txt", "r");
while(1)
{
s1.p();
read_bytes = read(cnt);
d[cnt].size=read_bytes;
// printf("x%d\n", read_bytes);
// fflush(stdout);
cnt++;
cnt%=share_m_number;
s2.v();
if(read_bytes==0)
{
break;
}
}
fclose(read_fp);
}
void Y()
{
int cnt=0;
int read_bytes=0;
write_fp = fopen("/home/amazing/c++/signal/out.txt", "w");
while(1)
{
s2.p();
read_bytes = d[cnt].size;
// printf("y%d\n", read_bytes);
// fflush(stdout);
write(cnt, read_bytes);
cnt++;
cnt%=share_m_number;
s1.v();
if(read_bytes==0)
{
break;
}
}
fclose(write_fp);
}
int main()
{
int start_time = timestamp();
// 创建共享内存
d = (Data *)mmap(0, sizeof(Data)*share_m_number, PROT_READ | PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
// 创建进程
pid_t pid = fork();
if(pid==0)
{
s1.owner=0;
s2.owner=0;
printf("X pid %d\n", getpid());
X();
munmap(d, sizeof(Data)*share_m_number);
// printf("X end!\n");
}
else
{
printf("Y pid %d\n", getpid());
Y();
munmap(d, sizeof(Data)*share_m_number);
// printf("Y end!\n");
}
int end_time = timestamp();
double cost_time = (end_time-start_time)*1.0/1000;
printf("cost time %lfs\n", cost_time);
return 0;
}