mmap 多线程同时写一个文件

1 使用read write写不可行分析

由于文件指针全局只有一个,所以多线程或多进程使用read write会导致指针位置跳转混乱,读写错误

在python爬虫多线程下载一个文件的时候,如果服务器支持分片下载,那么我们可以同时开多个线程来下载不同的分片,但是由于文件指针我发现多个文件指针在多线程的时候是共用同一个文件夹指针,导致用file.write的时候,第一个比如从0开始写,第二个从100开始写,第一个人写完后文件指针就指向1了,第二个指针你用file.tell()会发现文件指针已经变到了第一个线程的文件指针的位置了。

测试代码:

import os
import mmap
from threading import Thread
import time

def write_file(file_path, data, offset):
    file = open(file_path, "a+")
    file.seek(offset, 0)

    for i in range(len(data)):
        print("{0} {1} {2}".format(offset, file.tell(), data[i]))
        file.write(data[i])
        time.sleep(3)
    file.close()

if __name__ == '__main__':

    datas = ["123", "456"]

    threads = {}

    file_path = "C:/Users/xxx/Desktop/234.txt"

    for i in range(len(datas)):
        t = Thread(target=write_file, args=(file_path, datas[i], i*3))
        threads[i] = t
        t.start()

    for i in range(len(threads)):
        threads[i].join()

测试结果:

0 0 1
3 3 4
3 1 5
0 1 2
3 2 6
0 2 3

文件内容:

123

那有人就要说了,你多线程不行,你试试多进程啊。

嗯试了的,也是共用同一个文件指针

先是试试两个py分别运行: 1:

import os
import mmap
from threading import Thread
import time

def write_file(file_path, data, offset):
    file = open(file_path, "a+")
    file.seek(offset, 0)

    for i in range(len(data)):
        print("{0} {1} {2}".format(offset, file.tell(), data[i]))
        file.write(data[i])
        time.sleep(3)
    file.close()

if __name__ == '__main__':

    datas = ["123", "456"]

    file_path = "C:/Users/Amazing/Desktop/234.txt"

    t = Thread(target=write_file, args=(file_path, datas[0], 0))
    t.start()
    t.join()

结果:

0 0 1
0 1 2
0 3 3

2:

import os
import mmap
from threading import Thread
import time

def write_file(file_path, data, offset):
    file = open(file_path, "a+")
    file.seek(offset, 0)

    for i in range(len(data)):
        print("{0} {1} {2}".format(offset, file.tell(), data[i]))
        file.write(data[i])
        time.sleep(3)
    file.close()

if __name__ == '__main__':

    datas = ["123", "456"]

    file_path = "C:/Users/Amazing/Desktop/234.txt"

    t = Thread(target=write_file, args=(file_path, datas[1], 3))
    t.start()
    t.join()

结果:

3 3 4
3 2 5
3 4 6

文件内容:

142536

多进程代码:

from multiprocessing import Process
import os
import mmap
from threading import Thread
import time

def write_file(file_path, data, offset):
    file = open(file_path, "a+")
    file.seek(offset, 0)

    for i in range(len(data)):
        print("{0} {1} {2}".format(offset, file.tell(), data[i]))
        file.write(data[i])
        time.sleep(3)
    file.close()

if __name__ == '__main__':

    datas = ["123", "456"]

    file_path = "C:/Users/Amazing/Desktop/234.txt"

    for i in range(len(datas)):
        t = Process(target=write_file, args=(file_path, datas[i], i*3))
        t.start()

结果:

0 0 1
3 3 4
0 1 2
3 1 5
0 2 3
3 2 6

文件内容:

456

那又有人说了是不是python不行,试试c

main.cpp

#include <iostream>

#include <string>
#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include "ThreadPool.h"


using namespace std;

void write_one_file(string file_path, string data, int offset)
{
    FILE *fp = fopen(file_path.c_str(), "a+");
    fseek(fp, offset, 0);
    for(int i=0;i!=data.size();i++)
    {
        printf("%d %d %c\n", offset, ftell(fp), data[i]);
        fwrite((void *)(data.c_str()+i), 1, 1, fp);
        Sleep(3000);
    }
    fclose(fp);
}

int main()
{
    int n=10;
    string file_path = "C:/Users/xxx/Desktop/234.txt";
    vector<string> datas;
    datas.push_back("123");
    datas.push_back("456");

    ThreadPool threadpool(n);
    for(int i=0;i!=datas.size();i++)
    {
        threadpool.enqueue(write_one_file, file_path, datas[i], 3*i);
    }

    return 0;
}

ThreadPool.h

//
// Created by Amazing on 2022/9/16.
//
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <iostream>

using namespace std;

#ifndef AMAZING_THREADPOOL_H
#define AMAZING_THREADPOOL_H

class ThreadPool {

public:
    ThreadPool(size_t);                          //构造函数
    // 使用template关键字定义了两个新的类别,F和一个可变模版参数 Args 其声明方式就是 <class... T>,这里是<class... Args>
    // result_of 是推导F(Args...)这个函数的返回值
    // std::future对象来获取异步操作的结果
    // 整体这句话的意思就是定义了一个参数是Args的函数F,
    // 定义了一个函数enqueue,其参数是一个函数的地址和其参数,参数的类别和个数不定,里边的&&是
    // &&是右值引用(即将消亡的值就是右值,函数返回的临时变量也是右值)
    // &可以绑定左值(左值是指表达式结束后依然存在的持久对象)
    // 这里的&&是为了进行右值引用
    // 移动构造函数: 直接用参数对象里面的指针来初始化当前对象的指针,是一种浅层复制。
    // 做完这种指针对指针的复制,即把参数指针所指向的对象转给了当前正在被构造的这个指针,函数体内接着就把参数n里面的指针给置为空指针, 这个对象里面的指针置为空指针,将来析构函数去析构它的时候是delete一个空指针,就不会发生多次析构的事情,这个就是一个移动构造函数。
    // 该enqueue函数的返回值是一个异步调用的结果,其类型是F(Args...)的返回类型
    template<class F, class... Args>             //类模板
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;//任务入队
    ~ThreadPool();                              //析构函数

private:
    std::vector< std::thread > workers;            //线程队列,每个元素为一个Thread对象
    // 这里的function<void()>就是说一个返回值是void的函数的意思
    // 为什么不直接用void*的函数指针呢?是因为function还包含了函数的调用类型等信息,在模板编程时是安全的,emmmm,大致是这么个意思
    std::queue< std::function<void()> > tasks;     //任务队列,每个元素为一个函数对象

    std::mutex queue_mutex;                        //互斥量
    std::condition_variable condition;             //条件变量
    bool stop;                                     //停止
};

// 构造函数,把线程插入线程队列,插入时调用emplace_back(),用匿名函数lambda初始化Thread对象
inline ThreadPool::ThreadPool(size_t threads) : stop(false){
    // 这里用emplace_back感觉是想避免复制构造或移动构造,比较线程的创建比较耗时,经过我测试移动构造函数,我认为是在emplace_back中传
    // 类的构造函数所需要的参数,这样才可以避免复制构造或移动构造

    std::function<void()> f=[this]
    {
        for(;;)
        {
            // task是一个函数类型,从任务队列接收任务
            std::function<void()> task;
            {
                //给互斥量加锁,锁对象生命周期结束后自动解锁
                std::unique_lock<std::mutex> lock(this->queue_mutex);

                //(1)当匿名函数返回false时才阻塞线程,阻塞时自动释放锁。
                //(2)当匿名函数返回true且受到通知时解阻塞,然后加锁。
                // this->stop == 1 说明线程池要停止了,该线程应该结束
                // !this->tasks.empty() 说明有了新的任务,那就派出一个线程去运行这个函数
                this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });

                if(this->stop && this->tasks.empty())
                    return;

                //从任务队列取出一个任务
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }                            // 自动解锁
            task();                      // 执行这个任务
        }
    };
    for(size_t i = 0; i<threads; ++i)
        workers.emplace_back(f);
}

// 添加新的任务到任务队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{
    // 获取函数返回值类型
    using return_type = typename std::result_of<F(Args...)>::type;

    // 创建一个指向任务的智能指针,能够自动销毁不需要的内存对象
    // packaged_task是包装一个函数,应该传给它返回类型和要包装的函数的函数指针
    // bind 是用来将参数和函数绑定,形成一个新的函数指针
    // forward是要完美转发参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    // future绑定一个异步调用的结果
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);  //加锁
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });          //把任务加入队列
    }                                                   //自动解锁
    condition.notify_one();                             //通知条   件变量,唤醒一个线程
    return res;
}

// 析构函数,删除所有线程
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif //AMAZING_THREADPOOL_H

结果:

3 3 4
0 0 1
3 1 5
0 1 2
3 2 6
0 2 3

文件内容:

123

所以用文件指针写入是不可行的,并行写入的时候,不肯能每次先加锁,然后fseek,然后再写,那就不是并行了。

2 mmap

需要使用mmap这个东西,在python上是mmap,在linux上也是mmap,但是在windows上就是CreateFileMapping了。

这个东西说的是把文件的某一段映射到内存中,然后你像写数组一样,就可以用多线程写同一个数组的不同地方了,但是问题又来了,mmap能不能同时开多个不同的小段,因为多线程下载文件必定是文件太大了,那么直接粗暴的把整个文件都映射到内存,少则几百M,多则上G,对内存的使用是非常不友好的。

经测试mmap可以对一个文件开不同的段,但是开段时的偏移必须是某个东西的倍数,在linux上测试的这个偏移必须是4096的倍数,在windows上的python测试的是65536

参考博客:https://docs.python.org/zh-cn/3/library/mmap.html

我先在python上测试了单进程单线程开多个mmap段测试代码:

import mmap


if __name__ == '__main__':

    datas = ["123", "456"]

    threads = {}

    file_path = "C:/Users/Amazing/Desktop/234.txt"

    file = open(file_path, "a+")

    file_mem1 = mmap.mmap(file.fileno(), length=3, access=mmap.ACCESS_WRITE, offset=0)
    file_mem2 = mmap.mmap(file.fileno(), length=3, access=mmap.ACCESS_WRITE, offset=mmap.ALLOCATIONGRANULARITY)

    for i in range(3):
        file_mem1[i] = ord(datas[0][i])
        file_mem2[i] = ord(datas[1][i])

    file_mem1.close()
    file_mem2.close()
    file.close()

这种写法与c是不太一样的,你可以像下面这样写,就很像了。

import mmap
import os

if __name__ == '__main__':

    datas = ["123", "456"]

    threads = {}

    file_path = "C:/Users/Amazing/Desktop/234.txt"

    fd = os.open(file_path, os.O_RDWR | os.O_CREAT)

    file_mem1 = mmap.mmap(fd, length=3, access=mmap.ACCESS_WRITE, offset=0)
    file_mem2 = mmap.mmap(fd, length=3, access=mmap.ACCESS_WRITE, offset=mmap.ALLOCATIONGRANULARITY)

    for i in range(3):
        file_mem1[i] = ord(datas[0][i])
        file_mem2[i] = ord(datas[1][i])

    file_mem1.close()
    file_mem2.close()
    os.close(fd)

c的代码也要注意偏移要是页大小的倍数,一般一页是4096字节,可以由sysconf(_SC_PAGE_SIZE)这个函数返回得到,类型是long int。

这里需要注意需要先用ftruncate,把需要的空间申请出来。

c这边说的是mmap返回之后,就可以关闭fd而不取消mapping。

示例代码:

#include <stdio.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string>

using namespace std;

int main(int argc, char *argv[])
{
    int fd = 0;
    string file_path = "/root/c/test_mmap/1.txt";

    fd = open(file_path.c_str(), O_RDWR | O_CREAT, 0664);

    int file_size = 4096*2;
    //方案1: write(fd,"1",1);
    ftruncate(fd, file_size);   // 方案2

    if (fd == -1) {
        printf("ERROR: open failed!\n");
        return -1;
    }

    string datas[2] = {"123", "456"};
    int data_size=3;
    printf("sysconf(_SC_PAGE_SIZE): %ld\n", sysconf(_SC_PAGE_SIZE));
    int ALLOCATIONGRANULARITY = sysconf(_SC_PAGE_SIZE);

    char *p1 = (char*)mmap(NULL, data_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    char *p2 = (char*)mmap(NULL, data_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, ALLOCATIONGRANULARITY);
    close(fd);
    if (p1 == MAP_FAILED) {
        printf("ERROR: mmap failed!\n");
        munmap(p1, data_size);
        munmap(p2, data_size);
        return -1;
    }
    if (p2 == MAP_FAILED) {
        printf("ERROR: mmap failed!\n");
        munmap(p1, data_size);
        munmap(p2, data_size);
        return -1;
    }

    for(int i=0;i!=data_size;i++)
    {
        p1[i] = datas[0][i];
        p2[i] = datas[1][i];
    }
    munmap(p1, data_size);
    munmap(p2, data_size);
    return 0;
}

python 多线程写同一个文件

优点是可以并行写文件了,坏处是要掌握好偏移量,小于65526字节64kB的文件还是得但用同一个mmap。

示例代码:

import os
import mmap
from threading import Thread
import time

def write_file(file_mem, data, offset):
    for i in range(len(data)):
        print("offset:{0} data:{1}".format(offset, data[i]))
        file_mem[i] = ord(data[i])
        time.sleep(3)

if __name__ == '__main__':

    datas = ["123", "456"]

    threads = []
    file_mems = []

    file_path = "C:/Users/xxxx/Desktop/234.txt"

    fd = os.open(file_path, os.O_RDWR | os.O_CREAT)

    for i in range(len(datas)):
        t_file_mem = mmap.mmap(fd, length=len(datas[i]), access=mmap.ACCESS_WRITE, offset=i*mmap.ALLOCATIONGRANULARITY)
        t_thread = Thread(target=write_file, args=(t_file_mem, datas[i], i*3))

        file_mems.append(t_file_mem)
        threads.append(t_thread)
        t_thread.start()

    os.close(fd)

    for i in range(len(threads)):
        threads[i].join()
        file_mems[i].close()

运行结果:

offset:0 data:1
offset:3 data:4
offset:0 data:2offset:3 data:5

offset:0 data:3offset:3 data:6

文件内容:

123很多空然后456然后很多空

但是这里要注意不能使用from multiprocessing import Pool

这个线程池不允许传mmap,你开全局变量也不行

可以使用from concurrent.futures import ThreadPoolExecutor

样例代码:

import os
import mmap
from concurrent.futures import ThreadPoolExecutor
import time

def write_file(file_path, data, offset):
    fd = os.open(file_path, os.O_RDWR | os.O_CREAT)
    file_mem = mmap.mmap(fd, length=len(data), access=mmap.ACCESS_WRITE, offset=offset)
    os.close(fd)
    for i in range(len(data)):
        print("offset:{0} data:{1}".format(offset, data[i]))
        file_mem[i] = ord(data[i])
        time.sleep(3)

if __name__ == '__main__':

    datas = ["123", "456", "789", "012"]

    threads = []
    file_mems = []

    file_path = "C:/Users/xxxxx/Desktop/234.txt"

    thread_number_limit = 3
    pool = ThreadPoolExecutor(max_workers=thread_number_limit)

    for i in range(len(datas)):
        pool.submit(write_file, file_path, datas[i], i*mmap.ALLOCATIONGRANULARITY)

    pool.shutdown()

运行结果:

offset:0 data:1
offset:65536 data:4
offset:131072 data:7
offset:0 data:2
offset:65536 data:5
offset:131072 data:8
offset:65536 data:6offset:0 data:3

offset:131072 data:9
offset:196608 data:0
offset:196608 data:1
offset:196608 data:2

文件内容:

是正确的,因为一段64kb,太大了这里就不展示了

c++ 多线程mmap写同一个文件

这个是测试可行的

ThreadPool.h

//
// Created by Amazing on 2022/9/16.
//
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <iostream>

using namespace std;

#ifndef AMAZING_THREADPOOL_H
#define AMAZING_THREADPOOL_H

class ThreadPool {

public:
    ThreadPool(size_t);                          //构造函数
    // 使用template关键字定义了两个新的类别,F和一个可变模版参数 Args 其声明方式就是 <class... T>,这里是<class... Args>
    // result_of 是推导F(Args...)这个函数的返回值
    // std::future对象来获取异步操作的结果
    // 整体这句话的意思就是定义了一个参数是Args的函数F,
    // 定义了一个函数enqueue,其参数是一个函数的地址和其参数,参数的类别和个数不定,里边的&&是
    // &&是右值引用(即将消亡的值就是右值,函数返回的临时变量也是右值)
    // &可以绑定左值(左值是指表达式结束后依然存在的持久对象)
    // 这里的&&是为了进行右值引用
    // 移动构造函数: 直接用参数对象里面的指针来初始化当前对象的指针,是一种浅层复制。
    // 做完这种指针对指针的复制,即把参数指针所指向的对象转给了当前正在被构造的这个指针,函数体内接着就把参数n里面的指针给置为空指针, 这个对象里面的指针置为空指针,将来析构函数去析构它的时候是delete一个空指针,就不会发生多次析构的事情,这个就是一个移动构造函数。
    // 该enqueue函数的返回值是一个异步调用的结果,其类型是F(Args...)的返回类型
    template<class F, class... Args>             //类模板
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;//任务入队
    ~ThreadPool();                              //析构函数

private:
    std::vector< std::thread > workers;            //线程队列,每个元素为一个Thread对象
    // 这里的function<void()>就是说一个返回值是void的函数的意思
    // 为什么不直接用void*的函数指针呢?是因为function还包含了函数的调用类型等信息,在模板编程时是安全的,emmmm,大致是这么个意思
    std::queue< std::function<void()> > tasks;     //任务队列,每个元素为一个函数对象

    std::mutex queue_mutex;                        //互斥量
    std::condition_variable condition;             //条件变量
    bool stop;                                     //停止
};

// 构造函数,把线程插入线程队列,插入时调用emplace_back(),用匿名函数lambda初始化Thread对象
inline ThreadPool::ThreadPool(size_t threads) : stop(false){
    // 这里用emplace_back感觉是想避免复制构造或移动构造,比较线程的创建比较耗时,经过我测试移动构造函数,我认为是在emplace_back中传
    // 类的构造函数所需要的参数,这样才可以避免复制构造或移动构造

    std::function<void()> f=[this]
    {
        for(;;)
        {
            // task是一个函数类型,从任务队列接收任务
            std::function<void()> task;
            {
                //给互斥量加锁,锁对象生命周期结束后自动解锁
                std::unique_lock<std::mutex> lock(this->queue_mutex);

                //(1)当匿名函数返回false时才阻塞线程,阻塞时自动释放锁。
                //(2)当匿名函数返回true且受到通知时解阻塞,然后加锁。
                // this->stop == 1 说明线程池要停止了,该线程应该结束
                // !this->tasks.empty() 说明有了新的任务,那就派出一个线程去运行这个函数
                this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });

                if(this->stop && this->tasks.empty())
                    return;

                //从任务队列取出一个任务
                task = std::move(this->tasks.front());
                this->tasks.pop();
            }                            // 自动解锁
            task();                      // 执行这个任务
        }
    };
    for(size_t i = 0; i<threads; ++i)
        workers.emplace_back(f);
}

// 添加新的任务到任务队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{
    // 获取函数返回值类型
    using return_type = typename std::result_of<F(Args...)>::type;

    // 创建一个指向任务的智能指针,能够自动销毁不需要的内存对象
    // packaged_task是包装一个函数,应该传给它返回类型和要包装的函数的函数指针
    // bind 是用来将参数和函数绑定,形成一个新的函数指针
    // forward是要完美转发参数
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    // future绑定一个异步调用的结果
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);  //加锁
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });          //把任务加入队列
    }                                                   //自动解锁
    condition.notify_one();                             //通知条   件变量,唤醒一个线程
    return res;
}

// 析构函数,删除所有线程
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif //AMAZING_THREADPOOL_H

main.cpp

#include <iostream>
#include "ThreadPool.h"

#include <stdio.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string>

using namespace std;

void write_file(string file_path, string data, int offset)
{
    int fd = 0;
    fd = open(file_path.c_str(), O_RDWR | O_CREAT, 0664);
    char *p = (char*)mmap(NULL, data.size(), PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset);
    // memcpy(p, data.c_str(), data.size());

    for(int i=0;i!=data.size();i++)
    {
        sleep(2);
        printf("%d %c\n", offset, data[i]);
        p[i] = data[i];
    }

    munmap(p, data.size());
    close(fd);
}

int main(int argc, char *argv[])
{

    string file_path = "/root/c/test_mmap/1.txt";


    int fd = 0;
    fd = open(file_path.c_str(), O_RDWR | O_CREAT, 0664);
    int file_size = 4096*2;
    if (fd == -1) {
        printf("ERROR: open failed!\n");
        return -1;
    }
    //方案1: write(fd,"1",1);
    ftruncate(fd, file_size);   // 方案2
    close(fd);



    string datas[2] = {"123", "456"};
    int data_size=3;
    int PAGE_SIZE = sysconf(_SC_PAGE_SIZE);

    ThreadPool pool(4);


    for(int i=0;i!=2;i++)
    {
        pool.enqueue(write_file, file_path, datas[i], PAGE_SIZE*i);
    }
    return 0;
}

运行结果:

0 1
4096 4
0 2
4096 5
0 3
4096 6

文件内容:

123很多空然后456然后很多空
文章目录