共享内存 信号量 多进程 读写处理文件

共享内存 信号量 多进程 读写处理文件

#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <sys/time.h>

// Capacity in bytes of one Data buffer; also the maximum number of bytes
// requested from fread() per chunk.
const int string_len = 4096;
// Number of Data buffers in the shared mapping; buffer indices in X() and
// Y() cycle modulo this value.
const int share_m_number = 2;

/**
 * Thin wrapper around a System V semaphore set that contains one semaphore.
 *
 * The key is derived with ftok("/proc/version", task_id). An "owner" object
 * creates the set, initialises its value, and removes the set again in its
 * destructor; a non-owner only attaches to a set that already exists.
 *
 * All int-returning members return 1 on success and 0 on failure.
 */
class Semaphore
{
public:
    // Argument type for semctl(); the calling program has to define it.
    union semun
    {
        int val;
        struct semid_ds *buf;
        unsigned short *arry;
    };
    // Identifier returned by semget(); -1 after a failed create().
    int sem_id = 0;
    // Project id handed to ftok() to build the IPC key.
    int task_id = 0;
    // True when this object is responsible for removing the set.
    bool owner=0;

    /**
     * @param task_id  project id used to derive the IPC key
     * @param create   true: create the set (and own it); false: attach only
     * @param number   initial semaphore value, applied only when creating
     */
    Semaphore(int task_id=0, bool create=1, unsigned int number=1)
    {
        this->task_id = task_id;
        this->owner = create;
        // Initialise the value only if there is a valid set to initialise.
        if(this->create(this->task_id, this->owner) && this->owner)
        {
            this->set(number);
        }
    }

    /**
     * Obtain the semaphore set for task_id, creating it when `create` is true.
     * Stores the identifier in sem_id (-1 on failure).
     */
    int create(int task_id, bool create)
    {
        this->task_id = task_id;

        // NOTE: per ftok(3) only the low 8 bits of the project id are used
        // and they must be non-zero.
        key_t key = ftok("/proc/version", this->task_id);
        if(key == (key_t)-1)
        {
            this->sem_id = -1;
        }
        else
        {
            int flags = 0666;
            if(create)
            {
                // Create the semaphore set if it does not exist yet.
                flags |= IPC_CREAT;
            }
            this->sem_id = semget(key, 1, flags);
        }

        // semget() reports failure with -1 only; 0 is a valid identifier.
        if(this->sem_id == -1)
        {
            printf("Create semaphore falied!\n");
            return 0;
        }
        return 1;
    }

    /**
     * Set the semaphore value. Must be done once before the semaphore is used.
     */
    int set(unsigned int source_number)
    {
        semun sem_union;

        sem_union.val = source_number;
        if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
        {
            printf("Set semaphore failed!\n");
            return 0;
        }
        return 1;
    }

    /**
     * Remove the semaphore set from the system.
     */
    void del()
    {
        semun sem_union;
        sem_union.val = 0; // ignored by IPC_RMID; set to avoid passing garbage

        if (semctl(this->sem_id, 0, IPC_RMID, sem_union) == -1)
        {
            fprintf(stderr, "Failed to delete semaphore\n");
        }
    }

    /**
     * P operation: decrement the semaphore by one, blocking while it is zero.
     *
     * SEM_UNDO is deliberately not used. With it the kernel keeps a
     * per-process adjustment for every operation and applies it when the
     * process exits. When one process only waits and another only posts,
     * that reverts the counts at exit (stranding the peer) and the
     * adjustment can run out of range on long runs, making semop() fail.
     */
    int p()
    {
        struct sembuf sem_b;
        sem_b.sem_num = 0;
        sem_b.sem_op = -1;//P()
        sem_b.sem_flg = 0;
        if (semop(this->sem_id, &sem_b, 1) == -1)
        {
            fprintf(stderr, "semaphore_p failed\n");
            return 0;
        }

        return 1;
    }

    /**
     * V operation: increment the semaphore by one, releasing one waiter.
     * See p() for why SEM_UNDO is not used.
     */
    int v()
    {
        struct sembuf sem_b;
        sem_b.sem_num = 0;
        sem_b.sem_op = 1; // V()
        sem_b.sem_flg = 0;
        if (semop(this->sem_id, &sem_b, 1) == -1)
        {
            fprintf(stderr, "semaphore_v failed\n");
            return 0;
        }
        return 1;
    }

    // Only the owner removes the set; attached users leave it in place.
    ~Semaphore()
    {
        if(this->owner)
        {
            this->del();
        }
    }
};

// One transfer buffer: a chunk of raw bytes plus the count of valid bytes.
struct Data
{
    char data[string_len]; // payload bytes
    int size;              // number of valid bytes in data
};

// Array of share_m_number buffers; main() points this at the shared mapping.
Data *d;

// Input stream opened by X() and output stream opened by Y().
FILE *read_fp = NULL, *write_fp=NULL;

// s1 counts buffers that are free to be filled, so it starts at the number
// of buffers; s2 counts buffers that are filled and waiting, so it starts
// at zero.
Semaphore s1(1001,1,share_m_number);
Semaphore s2(1002,1,0);

// Current wall-clock time in milliseconds, taken from gettimeofday().
// The value is converted to int on return, so it is only meaningful for
// computing the difference between two nearby calls.
int timestamp()
{
    struct timeval now;
    gettimeofday(&now, NULL);

    const auto whole_seconds_ms = now.tv_sec * 1000;
    const auto fraction_ms = now.tv_usec / 1000;
    return (whole_seconds_ms + fraction_ms);
}

// Fill buffer `cnt` with up to string_len bytes from read_fp.
// Elements are one byte each, so the fread() result returned here is the
// number of bytes read (0 at end of file or on error).
int read(int cnt)
{
    char *dest = d[cnt].data;
    const size_t got = fread(dest, 1, string_len, read_fp);
    return got;
}

int write(int cnt, int size)
{
    size_t size_of_elements = 1;
    size_t number_of_elements = size;
    for(int i=0;i!=size;i++)
    {
        if(d[cnt].data[i]>='a'&&d[cnt].data[i]<='z')
        {
            d[cnt].data[i]-=32;
        }
    }
    size_t write_bytes = fwrite(d[cnt].data, size_of_elements, number_of_elements, write_fp);
//    printf("%s\n", d[cnt].data);
//    printf("%d\n", int(write_bytes));
    return write_bytes;
}

// Producer: reads the input file chunk by chunk into the shared buffers.
//
// For every chunk it waits on s1, fills buffer `cnt`, records the byte
// count in that buffer's size field, and posts s2. A chunk of size 0 is
// published last and acts as the end-of-data marker for Y().
void X()
{
    int cnt=0;
    int read_bytes=0;
    read_fp = fopen("/home/amazing/c++/signal/data3.txt", "r");
    if(read_fp==NULL)
    {
        perror("fopen /home/amazing/c++/signal/data3.txt");
        // Still publish the end-of-data marker so the consumer can finish
        // instead of waiting forever on s2.
        s1.p();
        d[cnt].size=0;
        s2.v();
        return;
    }
    while(1)
    {
        s1.p();
        read_bytes = read(cnt);
        d[cnt].size=read_bytes;
        cnt++;
        cnt%=share_m_number;
        s2.v();
        if(read_bytes==0)
        {
            break;
        }
    }
    fclose(read_fp);
}

// Consumer: takes filled buffers in order, upper-cases and writes them to
// the output file via write(), and hands each buffer back.
//
// For every chunk it waits on s2, processes buffer `cnt`, and posts s1.
// It stops after handling a chunk whose size is 0.
void Y()
{
    int cnt=0;
    int read_bytes=0;
    write_fp = fopen("/home/amazing/c++/signal/out.txt", "w");
    if(write_fp==NULL)
    {
        // Keep consuming below (without writing) so the semaphore
        // hand-shake with the producer still runs to completion.
        perror("fopen /home/amazing/c++/signal/out.txt");
    }
    while(1)
    {
        s2.p();
        read_bytes = d[cnt].size;
        if(write_fp!=NULL && write(cnt, read_bytes)!=read_bytes)
        {
            fprintf(stderr, "Y: short write to output file\n");
        }
        cnt++;
        cnt%=share_m_number;
        s1.v();
        if(read_bytes==0)
        {
            break;
        }
    }
    // fclose() flushes buffered output, so a failure here means lost data.
    if(write_fp!=NULL && fclose(write_fp)!=0)
    {
        perror("fclose /home/amazing/c++/signal/out.txt");
    }
}

// Maps the shared buffers, forks, runs X() in the child and Y() in the
// parent, and prints the elapsed time in each process.
// Returns 0 on success, 1 if the mapping or the fork cannot be created.
int main()
{
    int start_time = timestamp();
    const size_t map_size = sizeof(Data)*share_m_number;

    // Create the shared memory (anonymous mapping, inherited across fork).
    void *mem = mmap(0, map_size, PROT_READ | PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1,  0);
    if(mem==MAP_FAILED)
    {
        perror("mmap");
        return 1;
    }
    d = (Data *)mem;

    // Create the second process.
    pid_t pid = fork();
    if(pid<0)
    {
        // Without a producer the consumer would block forever; give up.
        perror("fork");
        munmap(d, map_size);
        return 1;
    }
    if(pid==0)
    {
        // The child must not remove the semaphores when it exits.
        s1.owner=0;
        s2.owner=0;
        printf("X pid %d\n", getpid());
        X();
        munmap(d, map_size);
    }
    else
    {
        printf("Y pid %d\n", getpid());
        Y();
        munmap(d, map_size);
    }
    int end_time = timestamp();
    double cost_time = (end_time-start_time)*1.0/1000;
    printf("cost time %lfs\n", cost_time);
    return 0;
}
文章目录