from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os
class RSAEncryptor:
def __init__(self):
self.key_storage_path = os.path.join(os.path.dirname(__file__), 'keys')
self.public_key_path = os.path.join(self.key_storage_path, 'public_key.pem')
self.private_key_path = os.path.join(self.key_storage_path, 'private_key.pem')
self.private_key = None
self.public_key = None
self.init_rsa_functions()
def init_rsa_functions(self):
if not os.path.exists(self.key_storage_path):
os.makedirs(self.key_storage_path)
if not (os.path.exists(self.private_key_path) or os.path.exists(self.public_key_path)):
self.generate_keys()
else:
self.load_keys()
def generate_keys(self):
# 生成RSA私钥
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# 提取公钥
self.public_key = self.private_key.public_key()
# 将私钥保存到文件
with open(self.private_key_path, "wb") as f:
f.write(self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption() # 如果需要可以添加密码保护
))
# 将公钥保存到文件
with open(self.public_key_path, "wb") as f:
f.write(self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
def load_keys(self):
self.public_key = None
self.private_key = None
if os.path.exists(self.private_key_path):
# 从文件加载私钥
with open(self.private_key_path, "rb") as key_file:
self.private_key = serialization.load_pem_private_key(
key_file.read(),
password=None, # 如果之前设置了密码,则需要在这里提供
backend=default_backend()
)
if os.path.exists(self.public_key_path):
# 从文件加载公钥
with open(self.public_key_path, "rb") as key_file:
self.public_key = serialization.load_pem_public_key(
key_file.read(),
backend=default_backend()
)
def encrypt(self, bytes_data: bytes):
# 加密消息
encrypted_bytes = self.public_key.encrypt(
bytes_data,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return encrypted_bytes
def decrypt(self, encrypted_bytes: bytes):
# 解密消息
decrypted_bytes = self.private_key.decrypt(
encrypted_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return decrypted_bytes
def sign(self, bytes_data: bytes):
# 签名消息
signature = self.private_key.sign(
bytes_data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return signature
def verify(self, bytes_data: bytes, signature: bytes):
# 验证签名
try:
self.public_key.verify(
signature,
bytes_data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except Exception as e:
print("Signature verification failed:", e)
return False
if __name__ == "__main__":
# 测试
message = "Hello, RSA!"
message_bytes = message.encode('utf-8')
rsa_encryptor = RSAEncryptor()
message_encrypt_bytes = rsa_encryptor.encrypt(message_bytes)
print("Encrypted message:", message_encrypt_bytes)
message_decrypt_bytes = rsa_encryptor.decrypt(message_encrypt_bytes)
print("Decrypted message:", message_decrypt_bytes.decode('utf-8'))
signature = rsa_encryptor.sign(message_bytes)
print("Signature:", signature)
is_valid = rsa_encryptor.verify(message_bytes, signature)
print("Signature valid:", is_valid)