python 上传文件到百度云盘
方法1 使用bypy
参考链接:https://blog.csdn.net/CVSvsvsvsvs/article/details/83625972
官方项目链接:https://github.com/houtianze/bypy
里边有一个地方说locales要设置成utf-8啥的,我感觉我实验室的ubuntu是卡在这个地方了,解决不了。。。
1 安装bypy库
pip install bypy
2 拿到百度云盘的python操作授权码
bypy info
复制命令行中给出的地址,浏览器打开,复制授权码,然后粘贴进命令行即可。
3 上传文件
from bypy import ByPy
if __name__ == '__main__':
    # Create a ByPy client object; it wraps the Baidu Netdisk file operations.
    bp = ByPy()
    # Create the remote folder on Baidu Netdisk.
    bp.mkdir(remotepath='model')
    # Upload one local file into that remote folder.
    # `ondup` selects what happens when the remote file already exists:
    # per the original note the default is 'overwrite', while 'newcopy'
    # does not overwrite the existing file.
    local_file_path = "C:/Users/xxxx/Desktop/11.txt"
    remote_file_path = "model/11.txt"
    # Use the variable defined above instead of a second, mismatching literal.
    bp.upload(localpath=local_file_path, remotepath=remote_file_path, ondup='newcopy')
4 传参启动
from bypy import ByPy
import sys
import os
def upload_file(local_file_path, remote_file_path):
    """Upload one local file to Baidu Netdisk through bypy.

    :param local_file_path: path of the local file to upload.
    :param remote_file_path: destination path passed to ``ByPy.upload``.
    :raises SystemExit: with status 1 when ``local_file_path`` does not exist.
    """
    if not os.path.exists(local_file_path):
        print("{0} not exists!".format(local_file_path))
        # A missing input is a failure, so report a non-zero status.
        # sys.exit is used because the bare `exit` builtin is only injected
        # by the `site` module and is not guaranteed to exist.
        sys.exit(1)
    # Create a ByPy client object; it wraps the Baidu Netdisk file operations.
    bp = ByPy()
    # `ondup='newcopy'` keeps an already existing remote file instead of
    # overwriting it (per the original note the default is 'overwrite').
    bp.upload(localpath=local_file_path, remotepath=remote_file_path, ondup='newcopy')
if __name__ == '__main__':
    # Expected invocation: python upload.py local_file_path remote_file_path
    if len(sys.argv) < 3:
        print("usage: python upload.py local_file_path remote_file_path")
        # Wrong usage is an error: exit with a non-zero status so shells and
        # calling scripts can detect it.
        sys.exit(1)
    local_file_path = sys.argv[1]
    remote_file_path = sys.argv[2]
    upload_file(local_file_path, remote_file_path)
方法2 自己开一个应用
参考博客:https://blog.csdn.net/a2824256/article/details/119887954
官方申请地址:https://pan.baidu.com/union/doc/
1 申请百度网盘开发者
在上面那个官方申请地址中找,应该是在右上角
2 创建一个应用
找一找吧,我也忘了,
申请完了他会给你
AppID:xxxxx
Appkey:xxxxxxxxxxxx
Secretkey:xxxxxxxxxxxx
Signkey:xxxxxxxxxxxxx
把这些信息保存下来。
3 获取access_token
操作百度网盘的关键是需要用到一个access_token
有三种获取access_token的方式
1 授权码模式授权
2 简化模式授权
3 设备码模式授权
1 授权码模式授权
这个模式适合平常代码上传下载使用,因为不用频繁获取access_token,并且access_token可以刷新。
流程:
首先浏览器打开一个网页
http://openapi.baidu.com/oauth/2.0/authorize?
response_type=code&
client_id=您应用的AppKey&
redirect_uri=您应用的授权回调地址&
scope=basic,netdisk&
device_id=您应用的AppID
client_id 填你创建的应用的AppKey
redirect_uri 填 oob,这个oob指百度官方的回调地址,不然你需要一个有域名的服务器单独开放一个接口来接收百度post过来的access_token
device_id 填写你应用的AppID,这个东西可以在控制台中找到,并且好像填0也可以。我也不知道为啥,但一般还是按照要求填写吧
打开这个url,你可以获取一个10分钟有效的code。
接下来你用这个code就可以去用代码获取一个为期30天并且可以刷新的access_token。这样30天内都可以用,并且到期了还可以刷新。
2 简化模式授权
流程是直接去请求一个不可刷新的access_token。不过这个access_token需要显示在一个可以信任的回调地址上,这个地址可以是百度官方的oob,也可以是你自己的服务器(开放一个响应接口,接收百度发起的post)。如果只是自己一般用途,再去专门搞个服务器开放一个专用接口就太麻烦了,所以可以简单地使用百度的oob。
把下面链接中的YOUR_APP_KEY换成上面申请应用时给你的AppKey,然后在浏览器中打开该链接,即可进入授权页面:
https://openapi.baidu.com/oauth/2.0/authorize?response_type=token&client_id=YOUR_APP_KEY&redirect_uri=oob&scope=basic,netdisk&display=tv&qrcode=0&force_login=1
这样就直接得到了一个不可以刷新的access_token
3 设备码模式授权
这模式需要每次扫描二维码
具体没细看,如果用的话看官网教程吧
4 使用access_token
普通用户的上传下载需要对文件进行分片,每片大小不能超过4MB,所以我建议把官方提供的python sdk和我下面写的上传下载助手配合使用。
官方python sdk下载地址:https://pan.baidu.com/union/doc/Kl4gsu388
解压后把其中的openapi_client单独拿到你的一个新的文件夹中,例如test。
test/openapi_client
然后创建一个easy_helper文件夹
在easy_helper文件夹下创建一个EasyHelper.py
写入以下代码:
import sys
import requests
import openapi_client
from openapi_client.api import auth_api, fileupload_api, multimediafile_api
from pprint import pprint
import os
import json
import time
import hashlib
import math
import sys
from concurrent.futures import ThreadPoolExecutor
import mmap
class EasyHelper:
    """Upload/download helper built on the Baidu Netdisk ``openapi_client`` SDK.

    The helper obtains an OAuth access token with the authorization-code
    flow, caches the token response as JSON in
    ``<data_folder_path>/access_token.json``, refreshes it once its recorded
    expiry time has passed, and uses it for single-part uploads and
    multi-threaded ranged downloads.
    """

    def __init__(self, app_id, app_name, app_key, secret_key, data_folder_path):
        """Store the application credentials and the token cache location.

        :param app_id: application id, sent as ``device_id`` when asking for a code.
        :param app_name: application name (stored only; not used by this class).
        :param app_key: application key, used as the OAuth ``client_id``.
        :param secret_key: application secret, used as the OAuth ``client_secret``.
        :param data_folder_path: folder that holds ``access_token.json``.
        """
        self.app_id = app_id
        self.app_name = app_name
        self.app_key = app_key
        self.secret_key = secret_key
        self.data_folder_path = data_folder_path
        self.json_data_path = self.data_folder_path+"/access_token.json"

    def mkdir(self, path):
        """Create ``path`` (including missing parents) unless it already exists."""
        if not os.path.exists(path):
            # makedirs also creates missing parent folders; exist_ok covers the
            # case where the folder appears between the check and the call.
            os.makedirs(path, exist_ok=True)

    def get_code(self):
        """Print the authorization URL and read the code typed by the user.

        :return: the authorization code read from stdin, surrounding
            whitespace removed.
        """
        code_url = (
            "http://openapi.baidu.com/oauth/2.0/authorize?response_type=code"
            "&client_id={0}&redirect_uri=oob&scope=basic,netdisk&device_id={1}"
        ).format(self.app_key, self.app_id)
        print("please open the following link to get a code and write here:")
        print(code_url)
        return input().strip()

    def read_json_data(self):
        """Return the cached token data, or ``None`` when no cache file exists."""
        if not os.path.exists(self.json_data_path):
            return None
        with open(self.json_data_path, "r") as file:
            return json.load(file)

    def _save_json_data(self, json_data):
        """Write ``json_data`` to the token cache file, creating its folder."""
        self.mkdir(self.data_folder_path)
        with open(self.json_data_path, "w") as file:
            json.dump(json_data, file)

    @staticmethod
    def _response_to_dict(api_response):
        """Convert an SDK response into a plain ``dict``.

        The text form of the response is assumed to be a Python-literal dict
        (the original quote-replacement approach relied on the same thing;
        confirm against the SDK). Parsing it as a Python literal copes with
        apostrophes inside values and with ``True``/``False``/``None``, which
        the quote-replacement approach cannot handle; that approach is kept
        only as a fallback.
        """
        import ast

        if isinstance(api_response, dict):
            return dict(api_response)
        text = str(api_response)
        try:
            return ast.literal_eval(text)
        except (ValueError, SyntaxError):
            return json.loads(text.replace("\'", "\""))

    def get_access_token(self):
        """Return a usable access token, or ``None`` if authorization failed.

        A cached token is returned directly, after a refresh when its
        ``expires_time`` is in the past. Without a cache the user is asked for
        an authorization code, which is exchanged for a token that is then
        cached.
        """
        # First try the cache file to see whether a token is already stored.
        json_data = self.read_json_data()
        if json_data is not None:
            if json_data["expires_time"] < int(time.time()):
                json_data = self.refresh_access_token(json_data)
            return json_data["access_token"]

        # No cache yet: exchange an authorization code for a token response.
        json_data = None
        with openapi_client.ApiClient() as api_client:
            api_instance = auth_api.AuthApi(api_client)
            code = self.get_code()
            try:
                api_response = api_instance.oauth_token_code2token(
                    code, self.app_key, self.secret_key, "oob")
                json_data = self._response_to_dict(api_response)
                # Store an absolute expiry time next to the relative one.
                json_data["expires_time"] = int(time.time()) + json_data["expires_in"]
                self._save_json_data(json_data)
            except openapi_client.ApiException as e:
                print("Exception when calling AuthApi->oauth_token_code2token: %s\n" % e)
        if json_data is None:
            return None
        return json_data["access_token"]

    def refresh_access_token(self, json_data):
        """Refresh the token described by ``json_data`` and update the cache.

        :param json_data: cached token data; must contain ``refresh_token``.
        :return: ``json_data`` updated in place with the new ``access_token``,
            ``refresh_token`` and ``expires_time``; returned unchanged when
            the API call raises ``ApiException``.
        """
        with openapi_client.ApiClient() as api_client:
            api_instance = auth_api.AuthApi(api_client)
            try:
                api_response = api_instance.oauth_token_refresh_token(
                    json_data["refresh_token"], self.app_key, self.secret_key)
                # The response is deliberately not printed: it holds the new tokens.
                refreshed = self._response_to_dict(api_response)
                json_data["expires_time"] = int(time.time()) + refreshed["expires_in"]
                json_data["access_token"] = refreshed["access_token"]
                json_data["refresh_token"] = refreshed["refresh_token"]
                self._save_json_data(json_data)
            except openapi_client.ApiException as e:
                print("Exception when calling AuthApi->oauth_token_refresh_token: %s\n" % e)
        return json_data

    @staticmethod
    def _file_digest(fpath, algorithm):
        """Return the hex digest of file ``fpath`` using hashlib ``algorithm``."""
        digest = hashlib.new(algorithm)
        # Read in 1 MiB (1024*1024 = 1,048,576 bytes) pieces to bound memory use.
        chunk_size = 1048576
        with open(fpath, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def _precreate(self, access_token, local_file_path, remote_file_path):
        """Announce the upload (precreate step).

        :return: ``(uploadid, remote_file_path, size, block_list)``;
            ``uploadid`` is ``None`` when the API call raised ``ApiException``.
        """
        uploadid = None
        with openapi_client.ApiClient() as api_client:
            api_instance = fileupload_api.FileuploadApi(api_client)
            # Per the SDK note: for ordinary third-party apps the remote path
            # starts with "/apps/your-app-name/".
            isdir = 0
            size = os.path.getsize(local_file_path)
            autoinit = 1
            # JSON list of part MD5s; the whole file is sent as a single part,
            # so the list holds exactly one digest.
            block_list = json.dumps([self._file_digest(local_file_path, "md5")])
            rtype = 3
            try:
                api_response = api_instance.xpanfileprecreate(
                    access_token, remote_file_path, isdir, size, autoinit, block_list, rtype=rtype)
                json_data = self._response_to_dict(api_response)
                remote_file_path = json_data["path"]
                uploadid = json_data["uploadid"]
            except openapi_client.ApiException as e:
                print("Exception when calling FileuploadApi->xpanfileprecreate: %s\n" % e)
        return uploadid, remote_file_path, size, block_list

    def _upload_part(self, access_token, uploadid, local_file_path, remote_file_path):
        """Send the whole local file as part ``"0"`` of upload ``uploadid``."""
        try:
            file = open(local_file_path, 'rb')
        except OSError as e:
            print("Exception when open file: %s\n" % e)
            sys.exit(-1)
        # `with file` guarantees the handle is closed even if the call fails.
        with file, openapi_client.ApiClient() as api_client:
            api_instance = fileupload_api.FileuploadApi(api_client)
            partseq = "0"
            upload_type = "tmpfile"
            try:
                api_instance.pcssuperfile2(
                    access_token, partseq, remote_file_path, uploadid, upload_type, file=file)
            except openapi_client.ApiException as e:
                print("Exception when calling FileuploadApi->pcssuperfile2: %s\n" % e)

    def _create(self, access_token, uploadid, size, remote_file_path, block_list):
        """Finish the upload (create step).

        ``remote_file_path``, ``size`` and ``block_list`` must be the values
        used for / returned by the precreate step.
        """
        with openapi_client.ApiClient() as api_client:
            api_instance = fileupload_api.FileuploadApi(api_client)
            isdir = 0
            rtype = 3
            try:
                api_instance.xpanfilecreate(
                    access_token, remote_file_path, isdir, size, uploadid, block_list, rtype=rtype)
            except openapi_client.ApiException as e:
                print("Exception when calling FileuploadApi->xpanfilecreate: %s\n" % e)

    def upload_file(self, local_file_path, remote_file_path):
        '''
        Upload one local file to ``remote_file_path`` on Baidu Netdisk.

        Limits noted by the original author:
        ordinary user: part size fixed at 4MB, single file at most 4GB;
        ordinary member: part size at most 16MB, single file at most 10GB;
        super member: part size at most 32MB, single file at most 20GB.

        The file is NOT split here: it is sent as one single part, so it has
        to fit into one part for the authorized account.

        :param local_file_path: path of the local file; if it does not exist a
            message is printed and nothing is uploaded.
        :param remote_file_path: destination path on the netdisk.
        :return: None
        '''
        if not os.path.exists(local_file_path):
            print("file {0} not exists!".format(local_file_path))
            return
        access_token = self.get_access_token()
        # 1. precreate
        uploadid, remote_file_path, size, block_list = self._precreate(
            access_token, local_file_path, remote_file_path)
        if uploadid is None:
            # The precreate error was already printed; the later steps cannot
            # work without an upload id.
            print("precreate failed, upload of {0} aborted!".format(local_file_path))
            return
        # 2. upload the (single) part
        self._upload_part(access_token, uploadid, local_file_path, remote_file_path)
        # 3. create the remote file from the uploaded part
        self._create(access_token, uploadid, size, remote_file_path, block_list)

    def _find_fs_id(self, access_token, remote_file_path):
        """Look up the ``fs_id`` of ``remote_file_path``.

        The parent folder is listed recursively and the first entry whose
        ``server_filename`` equals the file name is used.

        :return: the ``fs_id``, or ``-1`` when no entry matched or the API
            call raised ``ApiException``.
        """
        remote_file_folder, _, remote_file_name = remote_file_path.rpartition("/")
        remote_file_fs_id = -1
        with openapi_client.ApiClient() as api_client:
            api_instance = multimediafile_api.MultimediafileApi(api_client)
            recursion = 1
            web = "1"
            # start: index of the first entry to return; limit: maximum
            # number of entries returned.
            start = 0
            limit = 1000
            order = "time"
            desc = 1
            try:
                api_response = api_instance.xpanfilelistall(
                    access_token, remote_file_folder, recursion, web=web, start=start, limit=limit,
                    order=order, desc=desc)
                json_data = self._response_to_dict(api_response)
                for entry in json_data["list"]:
                    if entry["server_filename"] == remote_file_name:
                        remote_file_fs_id = entry["fs_id"]
                        break
            except openapi_client.ApiException as e:
                print("Exception when calling MultimediafileApi->xpanfilelistall: %s\n" % e)
        return remote_file_fs_id

    def _fetch_dlink_and_size(self, access_token, fs_id):
        """Return ``(dlink, size)`` of file ``fs_id``; ``("", 0)`` on failure."""
        file_dlink = ""
        size = 0
        with openapi_client.ApiClient() as api_client:
            api_instance = multimediafile_api.MultimediafileApi(api_client)
            fsids = "[{0}]".format(fs_id)
            try:
                api_response = api_instance.xpanmultimediafilemetas(
                    access_token, fsids, thumb="1", extra="1", dlink="1", needmedia=1)
                json_data = self._response_to_dict(api_response)
                for entry in json_data["list"]:
                    if entry["fs_id"] == fs_id:
                        file_dlink = entry["dlink"]
                        size = entry["size"]
                        break
            except openapi_client.ApiException as e:
                print("Exception when calling MultimediafileApi->xpanmultimediafilemetas: %s\n" % e)
        return file_dlink, size

    @staticmethod
    def _download_sequential(access_token, file_dlink, size, save_folder, save_file_name):
        """Single-threaded alternative: fetch 1 MiB ranges one after another."""
        file_download_url = file_dlink+"&access_token={0}".format(access_token)
        headers = {
            "User-Agent": "pan.baidu.com",
        }
        chunk_size = 1024*1024
        chunk_number = math.ceil(size/chunk_size)
        save_file_path = os.path.join(save_folder, save_file_name)
        with open(save_file_path, "wb") as file:
            for i in range(chunk_number):
                start_bytes = i*chunk_size
                end_bytes = min(start_bytes+chunk_size-1, size-1)
                headers["Range"] = "bytes={0}-{1}".format(start_bytes, end_bytes)
                response = requests.get(file_download_url, headers=headers)
                # Do not write an HTTP error body into the target file.
                response.raise_for_status()
                file.write(response.content)

    @staticmethod
    def _download_chunk(file_download_url, file_path, chunk_size, chunk_mmap_offset,
                        chunk_start_bytes, chunk_end_bytes):
        """Download one byte range and write it into its slot of ``file_path``.

        :raises OSError: when fewer bytes than ``chunk_size`` were received.
        """
        headers = {
            "User-Agent": "pan.baidu.com",
            "Range": "bytes={0}-{1}".format(chunk_start_bytes, chunk_end_bytes),
        }
        fd = os.open(file_path, os.O_RDWR | os.O_CREAT)
        try:
            t_mmap = mmap.mmap(fd, length=chunk_size, access=mmap.ACCESS_WRITE,
                               offset=chunk_mmap_offset)
        finally:
            # The descriptor is closed right after mapping, as in the original.
            os.close(fd)
        print("start end: {0}-{1}".format(chunk_start_bytes, chunk_end_bytes))
        response_iter_size = 1024*1024*1
        try:
            with requests.get(file_download_url, headers=headers, stream=True) as response:
                response.raise_for_status()
                index = 0
                for chunk in response.iter_content(chunk_size=response_iter_size):
                    t_mmap[index:index+len(chunk)] = chunk
                    index += len(chunk)
            if index != chunk_size:
                raise OSError("incomplete chunk {0}-{1}: got {2} of {3} bytes".format(
                    chunk_start_bytes, chunk_end_bytes, index, chunk_size))
        finally:
            t_mmap.close()

    def _download_parallel(self, access_token, file_dlink, size, save_folder, save_file_name):
        """Download the file with up to 8 worker threads, one range per task.

        The target file is first extended to its final size; each worker then
        maps and fills its own region. Any worker exception is re-raised here.
        """
        file_download_url = file_dlink + "&access_token={0}".format(access_token)
        save_file_path = os.path.join(save_folder, save_file_name)
        fd = os.open(save_file_path, os.O_RDWR | os.O_CREAT)
        try:
            # Reserve the full file size up front.
            os.ftruncate(fd, size)
        finally:
            os.close(fd)
        thread_number_limit = 8
        # mmap offsets must be multiples of ALLOCATIONGRANULARITY, so the
        # ~10 MiB chunk size is rounded up to a multiple of it.
        one_mmap_size = mmap.ALLOCATIONGRANULARITY
        one_chunk_size = math.ceil(1024*1024*10/one_mmap_size)*one_mmap_size
        chunk_number = math.ceil(size / one_chunk_size)
        with ThreadPoolExecutor(max_workers=thread_number_limit) as pool:
            futures = []
            for i in range(chunk_number):
                chunk_start_bytes = i * one_chunk_size
                chunk_end_bytes = min(chunk_start_bytes + one_chunk_size - 1, size - 1)
                t_chunk_size = chunk_end_bytes-chunk_start_bytes+1
                futures.append(pool.submit(
                    self._download_chunk, file_download_url, save_file_path, t_chunk_size,
                    chunk_start_bytes, chunk_start_bytes, chunk_end_bytes))
            # result() re-raises worker exceptions, which would otherwise be
            # lost and leave a silently corrupted file behind.
            for future in futures:
                future.result()

    def download_file(self, remote_file_path, save_folder):
        """Download ``remote_file_path`` from the netdisk into ``save_folder``.

        :param remote_file_path: path of the remote file.
        :param save_folder: local target folder (created when missing); the
            file keeps its remote name.
        :raises SystemExit: with status -1 when the file id or its download
            link cannot be determined.
        """
        self.mkdir(save_folder)
        access_token = self.get_access_token()
        remote_file_fs_id = self._find_fs_id(access_token, remote_file_path)
        if remote_file_fs_id == -1:
            print("get_file_fs_id failed!")
            sys.exit(-1)
        file_dlink, size = self._fetch_dlink_and_size(access_token, remote_file_fs_id)
        if not file_dlink:
            # Without a download link there is nothing to fetch; stop instead
            # of leaving an empty file behind.
            print("get_file_dlink_size failed!")
            sys.exit(-1)
        save_file_name = remote_file_path.rpartition("/")[2]
        self._download_parallel(access_token, file_dlink, size, save_folder, save_file_name)
if __name__ == '__main__':
    # Replace the placeholders below with the values of your own application.
    app_id = xxxxxx
    app_name = 'upload'
    app_key = 'xxxxxxxxxx'
    secret_key = 'xxxxxxxxxxxxxxxxxx'
    data_folder_path = "D:/python/python_data/upload_baiduwangpan/easy_helper"
    easy_helper = EasyHelper(app_id, app_name, app_key, secret_key, data_folder_path)

    # Command-line variant (disabled): argv[1] selects "upload" or "download".
    #   upload:   easy_helper.upload_file(argv[2], <argv[3] without "remote:">)
    #   download: easy_helper.download_file(<argv[2] without "remote:">, argv[3])

    start_time = time.time()

    # Upload example (disabled). Author's note: uploading a larger .mp4 fails
    # for a reason not yet understood; renaming the extension to .txt works.
    #   local_file_path = "C:/Users/xxxx/Desktop/2.txt"
    #   remote_file_path = "remote:/apps/upload/2.txt"  (strip the "remote:" prefix)
    #   easy_helper.upload_file(local_file_path, remote_file_path)

    # Download example: everything after the first ":" is the netdisk path.
    save_folder = "D:/"
    remote_file_path = "remote:/apps/upload/2.txt".split(":", 1)[-1]
    easy_helper.download_file(remote_file_path, save_folder)

    cost_time = time.time() - start_time
    print("cost_time: {0}s".format(cost_time))