python 上传文件到百度云盘

方法1 使用bypy

参考链接:https://blog.csdn.net/CVSvsvsvsvs/article/details/83625972

官方项目链接:https://github.com/houtianze/bypy

里边有一个地方说locales要设置成utf-8啥的,我感觉我实验室的ubuntu是卡在这个地方了,解决不了。。。

1 安装bypy库

pip install bypy

2 拿到百度云盘的python操作授权码

bypy info

复制命令行中给出的地址,浏览器打开,复制授权码,然后粘贴进命令行即可。

3 上传文件

from bypy import ByPy


if __name__ == '__main__':

    # 获取一个bypy对象,封装了所有百度云文件操作的方法
    bp = ByPy()

    # 百度网盘创建远程文件夹
    bp.mkdir(remotepath='model')

    # 上传某一文件到百度云网盘对应的远程文件夹
    # ondup中参数代表复制文件,默认值为'overwrite',指定'newcopy'不会覆盖重复文件

    local_file_path = "C:/Users/xxxx/Desktop/11.txt"

    remote_file_path = "model/11.txt"

    bp.upload(localpath=local_file_path, remotepath='model/1.txt', ondup='newcopy')

4 传参启动

from bypy import ByPy
import sys
import os

def upload_file(local_file_path, remote_file_path):

    if os.path.exists(local_file_path) == False:
        print("{0} not exists!".format(local_file_path))
        exit(0)

    # 获取一个bypy对象,封装了所有百度云文件操作的方法
    bp = ByPy()

    # 上传某一文件到百度云网盘对应的远程文件夹
    # ondup中参数代表复制文件,默认值为'overwrite',指定'newcopy'不会覆盖重复文件

    bp.upload(localpath=local_file_path, remotepath=remote_file_path, ondup='newcopy')



if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("usage: python upload.py local_file_path remote_file_path")
        exit(0)
    local_file_path = sys.argv[1]
    remote_file_path = sys.argv[2]
    upload_file(local_file_path, remote_file_path)



方法2 自己开一个应用

参考博客:https://blog.csdn.net/a2824256/article/details/119887954

官方申请地址:https://pan.baidu.com/union/doc/

1 申请百度网盘开发者

在上面那个官方申请地址中找,应该是在右上角

2 创建一个应用

找一找吧,我也忘了,

申请完了他会给你

AppID:xxxxx
Appkey:xxxxxxxxxxxx
Secretkey:xxxxxxxxxxxx
Signkey:xxxxxxxxxxxxx

把这些信息保存下来。

3 获取access_token

操作百度网盘的关键是需要用到一个access_token

有三种获取access_token的方式

1 授权码模式授权

2 简化模式授权

3 设备码模式授权

1 授权码模式授权

这个模式适合平常代码上传下载使用,因为不用频繁获取access_token,并且access_token可以刷新。

流程:

首先浏览器打开一个网页

http://openapi.baidu.com/oauth/2.0/authorize?
response_type=code&
client_id=您应用的AppKey&
redirect_uri=您应用的授权回调地址&
scope=basic,netdisk&
device_id=您应用的AppID

appkey 填 你创建的应用的appkey

redirect_url 填 oob, 这个oob指百度官方的回调地址,不然你需要一个有域名的服务器单独开放一个接口来接受百度post过来的access_token

device_id 填写你应用的app_id,这个东西可以在控制台中找到,并且好像填0也可以。我也不知道为啥,单一般还是按照要求填写吧

打开这个url,你可以获取一个10分钟有效的code。

接下来你用这个code就可以去用代码获取一个为期30天并且可以刷新的access_token。这样30天内都可以用,并且到期了还可以刷新。

2 简化模式授权

流程是直接去请求一个不可刷新的access_token,但是这个access_token的显示位置是需要一个可以信任的显示区域,这个地方可以是百度官方的oob,也可以是你自己的服务器(开放一个响应接口,获取百度发起的post)。自己一般用途的话再去专门搞个服务器开放一个专用接口,就太麻烦了,所以可以简单的使用百度的接口。

将下面链接的YOUR_APP_KEY改成你的AppKey,打开该链接即可打开授权页面

把下面链接中的YOUR_APP_KEY换成上面给你的Appkey

https://openapi.baidu.com/oauth/2.0/authorize?response_type=code&client_id=YOUR_APP_KEY&redirect_uri=oob&scope=basic,netdisk&display=tv&qrcode=0&force_login=1

这样就直接得到了一个不可以刷新的access_token

3 设备码模式授权

这模式需要每次扫描二维码

具体没细看,如果用的话看官网教程吧

4 使用access_token

普通用户的上传下载需要对文件进行分片,每片大小不能超过4M,所以我建议使用官方给的python的sdk配合我下面的上传下载助手配合使用。

官方python sdk下载地址:https://pan.baidu.com/union/doc/Kl4gsu388

解压后把其中的openapi_client单独拿到你的一个新的文件夹中,例如test。

test/openapi_client

然后创建一个easy_helper文件夹

在easy_helper文件夹下创建一个EasyHelper.py

写入以下代码:

import sys
import requests
import openapi_client
from openapi_client.api import auth_api, fileupload_api, multimediafile_api
from pprint import pprint
import os
import json
import time
import hashlib
import math
import sys
from concurrent.futures import ThreadPoolExecutor
import mmap


class EasyHelper():

    def __init__(self, app_id, app_name, app_key, secret_key, data_folder_path):
        self.app_id = app_id
        self.app_name = app_name
        self.app_key = app_key
        self.secret_key = secret_key
        self.data_folder_path = data_folder_path
        self.json_data_path = self.data_folder_path+"/access_token.json"

    def mkdir(self, path):
        if os.path.exists(path) == False:
            os.mkdir(path)

    def get_code(self):
        code_url = " http://openapi.baidu.com/oauth/2.0/authorize?response_type=code&client_id={0}&redirect_uri=oob&scope=basic,netdisk&device_id={1}".format(self.app_key, self.app_id)
        print("please open the following link to get a code and write here:")
        print(code_url)
        code = input()
        return code

    def read_json_data(self):
        if os.path.exists(self.json_data_path) == False:
            return None
        file = open(self.json_data_path, "r")
        json_data = json.load(file)
        file.close()
        return json_data

    def get_access_token(self):
        # 首先尝试读取json文件,看是否有access_token
        json_data = self.read_json_data()
        if json_data != None:
            if json_data["expires_time"] < int(time.time()):
                json_data = self.refresh_access_token(json_data)
            return json_data["access_token"]

        # 没有json_data的适合尝试通过code去获取一个授权码模式授权的json返回值
        json_data = None
        with openapi_client.ApiClient() as api_client:
            # Create an instance of the API class
            api_instance = auth_api.AuthApi(api_client)
            code = self.get_code()
            client_id = self.app_key
            client_secret = self.secret_key
            redirect_uri = "oob" # str |

            # example passing only required values which don't have defaults set
            try:
                api_response = api_instance.oauth_token_code2token(code, client_id, client_secret, redirect_uri)
                api_response = str(api_response).replace("\'", "\"")
                json_data = json.loads(api_response)
                expires_in = json_data["expires_in"]
                expires_time = int(time.time()) + expires_in
                json_data["expires_time"] = expires_time
                self.mkdir(self.data_folder_path)
                file = open(self.json_data_path, "w")
                json.dump(json_data, file)
                file.close()
            except openapi_client.ApiException as e:
                print("Exception when calling AuthApi->oauth_token_code2token: %s\n" % e)

        access_token = None
        if json_data != None:
            access_token = json_data["access_token"]

        return access_token

    def refresh_access_token(self, json_data):
        """
        refresh access token
        """
        # Enter a context with an instance of the API client
        with openapi_client.ApiClient() as api_client:
            # Create an instance of the API class
            api_instance = auth_api.AuthApi(api_client)
            refresh_token = json_data["refresh_token"]
            client_id = self.app_key
            client_secret = self.secret_key

            # example passing only required values which don't have defaults set
            try:
                api_response = api_instance.oauth_token_refresh_token(refresh_token, client_id, client_secret)
                pprint(api_response)

                api_response = str(api_response).replace("\'", "\"")
                refresh_json_data = json.loads(api_response)
                expires_in = refresh_json_data["expires_in"]
                access_token = refresh_json_data["access_token"]
                refresh_token = refresh_json_data["refresh_token"]

                expires_time = int(time.time()) + expires_in
                json_data["expires_time"] = expires_time
                json_data["access_token"] = access_token
                json_data["refresh_token"] = refresh_token
                self.mkdir(self.data_folder_path)
                file = open(self.json_data_path, "w")
                json.dump(json_data, file)
                file.close()
            except openapi_client.ApiException as e:
                print("Exception when calling AuthApi->oauth_token_refresh_token: %s\n" % e)
        return json_data

    def upload_file(self, local_file_path, remote_file_path):
        '''
        授权用户为普通用户时,单个分片大小固定为4MB,单文件总大小上限为4GB
        授权用户为普通会员时,单个分片大小上限为16MB,单文件总大小上限为10GB
        授权用户为超级会员时,用户单个分片大小上限为32MB,单文件总大小上限为20GB
        :param local_file_path:
        :param remote_file_path:
        :return:
        '''
        def MessageDigestAlgorithm(fpath: str, algorithm: str) -> str:
            f = open(fpath, 'rb')
            hash = hashlib.new(algorithm)
            # 1024*1024=1,048,576
            chunk_size = 1048576
            for chunk in iter(lambda: f.read(chunk_size), b''):
                hash.update(chunk)
            f.close()
            return hash.hexdigest()

        def precreate(access_token, local_file_path, remote_file_path):
            """
            precreate
            """
            uploadid, remote_file_path, size, block_list = None, remote_file_path, None, None
            #    Enter a context with an instance of the API client
            with openapi_client.ApiClient() as api_client:
                # Create an instance of the API class
                api_instance = fileupload_api.FileuploadApi(api_client)
                access_token = access_token  # str |
                remote_file_path = remote_file_path  # str | 对于一般的第三方软件应用,路径以 "/apps/your-app-name/" 开头。对于小度等硬件应用,路径一般 "/来自:小度设备/" 开头。对于定制化配置的硬件应用,根据配置情况进行填写。
                isdir = 0  # int | isdir
                size = os.path.getsize(local_file_path)  # int | size
                autoinit = 1  # int | autoinit

                # calc file md5
                block_list = [] # str | 由MD5字符串组成的list
                block_list.append(MessageDigestAlgorithm(local_file_path, "md5"))
                block_list = json.dumps(block_list)

                rtype = 3  # int | rtype (optional)

                # example passing only required values which don't have defaults set
                # and optional values
                try:
                    api_response = api_instance.xpanfileprecreate(
                        access_token, remote_file_path, isdir, size, autoinit, block_list, rtype=rtype)
                    # pprint(api_response)
                    api_response = str(api_response).replace("\'", "\"")
                    json_data = json.loads(api_response)
                    remote_file_path = json_data["path"]
                    uploadid = json_data["uploadid"]
                    # return_type = json_data["return_type"]
                except openapi_client.ApiException as e:
                    print("Exception when calling FileuploadApi->xpanfileprecreate: %s\n" % e)
            return uploadid, remote_file_path, size, block_list

        def upload(access_token, uploadid, local_file_path, remote_file_path):
            """
            upload
            """
            # Enter a context with an instance of the API client
            with openapi_client.ApiClient() as api_client:
                # Create an instance of the API class
                api_instance = fileupload_api.FileuploadApi(api_client)
                access_token = access_token  # str |
                partseq = "0"  # str |
                remote_file_path =  remote_file_path  # str |
                uploadid = uploadid  # str |
                type = "tmpfile"  # str |
                try:
                    file = open(local_file_path, 'rb')  # file_type | 要进行传送的本地文件分片
                except Exception as e:
                    print("Exception when open file: %s\n" % e)
                    exit(-1)

                # example passing only required values which don't have defaults set
                # and optional values
                try:
                    api_response = api_instance.pcssuperfile2(
                        access_token, partseq, remote_file_path, uploadid, type, file=file)
                    # pprint(api_response)
                except openapi_client.ApiException as e:
                    print("Exception when calling FileuploadApi->pcssuperfile2: %s\n" % e)

        def create(access_token, uploadid, size, remote_file_path, block_list):
            """
            create
            """
            # Enter a context with an instance of the API client
            with openapi_client.ApiClient() as api_client:
                # Create an instance of the API class
                api_instance = fileupload_api.FileuploadApi(api_client)
                access_token = access_token  # str |
                remote_file_path = remote_file_path  # str | 与precreate的path值保持一致
                isdir = 0  # int | isdir
                size = size  # int | 与precreate的size值保持一致
                uploadid = uploadid  # str | precreate返回的uploadid
                block_list = block_list  # str | 与precreate的block_list值保持一致
                rtype = 3  # int | rtype (optional)

                # example passing only required values which don't have defaults set
                # and optional values
                try:
                    api_response = api_instance.xpanfilecreate(
                        access_token, remote_file_path, isdir, size, uploadid, block_list, rtype=rtype)
                    # pprint(api_response)
                except openapi_client.ApiException as e:
                    print("Exception when calling FileuploadApi->xpanfilecreate: %s\n" % e)

        if os.path.exists(local_file_path) == False:
            print("file {0} not exists!".format(local_file_path))
            return

        access_token = self.get_access_token()
        # 1. 预上传
        uploadid, remote_file_path, size, block_list = precreate(access_token, local_file_path, remote_file_path)
        # 2. 分片上传(文件切片这里没有做,超级会员单文件最大20G)
        upload(access_token, uploadid, local_file_path, remote_file_path)
        # 3. 创建文件
        create(access_token, uploadid, size, remote_file_path, block_list)


    def download_file(self, remote_file_path, save_folder):
        def get_file_fs_id(access_token, remote_file_path):
            """
            listall
            """
            remote_file_folder_end = remote_file_path.rfind("/")
            remote_file_folder = remote_file_path[:remote_file_folder_end]
            remote_file_name = remote_file_path[remote_file_folder_end+1:]

            remote_file_fs_id = -1
            # Enter a context with an instance of the API client
            with openapi_client.ApiClient() as api_client:
                # Create an instance of the API class
                api_instance = multimediafile_api.MultimediafileApi(api_client)
                access_token = access_token  # str |
                remote_file_folder = remote_file_folder  # str |
                recursion = 1  # int |
                web = "1"  # str |  (optional)
                # start limit 这两个一个是说从第几个文件算起,limit是最多返回多少个结果
                start = 0  # int |  (optional)
                limit = 1000  # int |  (optional)
                order = "time"  # str |  (optional)
                desc = 1  # int |  (optional)

                # example passing only required values which don't have defaults set
                # and optional values
                try:
                    api_response = api_instance.xpanfilelistall(
                        access_token, remote_file_folder, recursion, web=web, start=start, limit=limit, order=order, desc=desc)
                    # pprint(api_response)
                    api_response = str(api_response).replace("\'", "\"")
                    json_data = json.loads(api_response)
                    # print(json_data)
                    file_list = json_data["list"]
                    for i in range(len(file_list)):
                        if file_list[i]["server_filename"] == remote_file_name:
                            remote_file_fs_id = file_list[i]["fs_id"]
                            break
                except openapi_client.ApiException as e:
                    print("Exception when calling MultimediafileApi->xpanfilelistall: %s\n" % e)
            return remote_file_fs_id

        def get_file_dlink_size(access_token, fs_id):
            """
            filemetas
            """
            file_dlink = ""
            size = 0
            # Enter a context with an instance of the API client
            with openapi_client.ApiClient() as api_client:
                # Create an instance of the API class
                api_instance = multimediafile_api.MultimediafileApi(api_client)
                access_token = access_token  # str |
                fsids = "[{0}]".format(fs_id)  # str |
                thumb = "1"  # str |  (optional)
                extra = "1"  # str |  (optional)
                dlink = "1"  # str |  (optional)
                needmedia = 1  # int |  (optional)

                # example passing only required values which don't have defaults set
                # and optional values
                try:
                    api_response = api_instance.xpanmultimediafilemetas(
                        access_token, fsids, thumb=thumb, extra=extra, dlink=dlink, needmedia=needmedia)
                    # pprint(api_response)
                    api_response = str(api_response).replace("\'", "\"")
                    json_data = json.loads(api_response)
                    file_list = json_data["list"]
                    for i in range(len(file_list)):
                        if file_list[i]["fs_id"] == fs_id:
                            file_dlink = file_list[i]["dlink"]
                            size = file_list[i]["size"]
                            break
                except openapi_client.ApiException as e:
                    print("Exception when calling MultimediafileApi->xpanmultimediafilemetas: %s\n" % e)
            return file_dlink, size

        def download(access_token, file_dlink, size, save_folder, save_file_name):
            file_download_url = file_dlink+"&access_token={0}".format(access_token)
            headers = {
                "User-Agent": "pan.baidu.com",
            }
            chunk_size = 1024*1024
            chunk_number = math.ceil(size/chunk_size)
            # print(size)
            # print(chunk_number)
            save_file_path = save_folder + "/" + save_file_name
            file = open(save_file_path, "wb")
            for i in range(chunk_number):
                start_bytes = i*chunk_size
                end_bytes = min(start_bytes+chunk_size-1, size-1)
                # print("start end: {0}-{1}".format(start_bytes, end_bytes))
                headers["Range"] = "bytes={0}-{1}".format(start_bytes, end_bytes)
                # print(file_download_url)
                response = requests.get(file_download_url, headers=headers)
                # print(response.status_code)
                # print(response.text)
                file.write(response.content)
            file.close()

        def download_multi_thread(access_token, file_dlink, size, save_folder, save_file_name):
            def download_one_chunk(file_download_url, file_path, chunk_size, chunk_mmap_offset, chunk_start_bytes,
                                   chunk_end_bytes):
                headers = {
                    # "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
                    "User-Agent": "pan.baidu.com",
                }
                # print("chunk size: {0}, offset: {1}".format(chunk_size, chunk_mmap_offset))
                fd = os.open(file_path, os.O_RDWR | os.O_CREAT)
                t_mmap = mmap.mmap(fd, length=chunk_size, access=mmap.ACCESS_WRITE, offset=chunk_mmap_offset)
                os.close(fd)
                print("start end: {0}-{1}".format(chunk_start_bytes, chunk_end_bytes))
                headers["Range"] = "bytes={0}-{1}".format(chunk_start_bytes, chunk_end_bytes)
                response_iter_size = 1024*1024*1
                with requests.get(file_download_url, headers=headers, stream=True) as response:
                    index = 0
                    for chunk in response.iter_content(chunk_size=response_iter_size):
                        response_one_iter_size = len(chunk)
                        t_mmap[index:index+response_one_iter_size] = chunk
                        index += response_one_iter_size

                t_mmap.close()
            file_download_url = file_dlink + "&access_token={0}".format(access_token)
            # print(file_download_url)

            save_file_path = save_folder + "/" + save_file_name
            fd = os.open(save_file_path, os.O_RDWR | os.O_CREAT)
            # 先扩展好位置
            os.ftruncate(fd, size)
            os.close(fd)

            thread_number_limit = 8
            pool = ThreadPoolExecutor(max_workers=thread_number_limit)

            one_mmap_size = mmap.ALLOCATIONGRANULARITY
            one_chunk_size = 1024*1024*10
            one_chunk_size = math.ceil(one_chunk_size/one_mmap_size)*one_mmap_size
            chunk_number = math.ceil(size / one_chunk_size)
            # 由于mmap只能按照one_mmap_size分配,所以一个文件先按照one_mmap_size切分,再按照chunk_size分配,再交给每个线程
            for i in range(chunk_number):
                chunk_start_bytes = i * one_chunk_size
                chunk_end_bytes = min(chunk_start_bytes + one_chunk_size - 1, size - 1)
                t_chunk_size = chunk_end_bytes-chunk_start_bytes+1
                chunk_mmap_offset = chunk_start_bytes
                pool.submit(download_one_chunk, file_download_url, save_file_path, t_chunk_size, chunk_mmap_offset,
                                                          chunk_start_bytes, chunk_end_bytes)
            pool.shutdown()

        self.mkdir(save_folder)
        access_token = self.get_access_token()
        remote_file_fs_id = get_file_fs_id(access_token, remote_file_path)
        # print(remote_file_fs_id)
        if remote_file_fs_id == -1:
            print("get_file_fs_id failed!")
            exit(-1)
        file_dlink, size = get_file_dlink_size(access_token, remote_file_fs_id)
        save_file_name = remote_file_path[remote_file_path.rfind("/")+1:]
        # download(access_token, file_dlink, size, save_folder, save_file_name)
        download_multi_thread(access_token, file_dlink, size, save_folder, save_file_name)

if __name__ == '__main__':
    # method = sys.argv[1]
    # print(method)

    app_id = xxxxxx
    app_name = 'upload'
    app_key = 'xxxxxxxxxx'
    secret_key = 'xxxxxxxxxxxxxxxxxx'
    data_folder_path = "D:/python/python_data/upload_baiduwangpan/easy_helper"

    easy_helper = EasyHelper(app_id, app_name, app_key, secret_key, data_folder_path)

    # if method == "upload":
    #     local_file_path = sys.argv[2]
    #     remote_file_path = sys.argv[3]
    #     remote_file_path_start = remote_file_path.find(":")+1
    #     remote_file_path = remote_file_path[remote_file_path_start:]
    #     easy_helper.upload_file(local_file_path, remote_file_path)
    # elif method == "download":
    #     remote_file_path = sys.argv[2]
    #     save_folder = sys.argv[3]
    #     remote_file_path_start = remote_file_path.find(":") + 1
    #     remote_file_path = remote_file_path[remote_file_path_start:]
    #     easy_helper.download_file(remote_file_path, save_folder)

    start_time = time.time()
    # 上传大一点的mp4会出错,还不知道为什么,但是把后缀改成txt即可

    # local_file_path = "C:/Users/xxxx/Desktop/2.txt"
    # remote_file_path = "remote:/apps/upload/2.txt"
    # remote_file_path_start = remote_file_path.find(":")+1
    # remote_file_path = remote_file_path[remote_file_path_start:]
    # easy_helper.upload_file(local_file_path, remote_file_path)


    remote_file_path = "remote:/apps/upload/2.txt"
    save_folder = "D:/"
    remote_file_path_start = remote_file_path.find(":") + 1
    remote_file_path = remote_file_path[remote_file_path_start:]
    easy_helper.download_file(remote_file_path, save_folder)

    end_time = time.time()

    cost_time = end_time-start_time
    print("cost_time: {0}s".format(cost_time))
文章目录