# Repository file-view metadata: 454 lines, 16 KiB, Python.
# Last change: 2026-05-31 18:46:06 +08:00
"""
CryptZ Crypto Engine v5
- AES-256-GCM (authenticated encryption)
- Argon2id KDF
- Streaming encryption for large files
- Correct attempt counter with separate MAC
- File-key support
"""
import os
import io
import json
import gzip
import struct
import tarfile
import secrets
import hashlib
import hmac as hmac_mod
from dataclasses import dataclass, field
from typing import Optional, Callable
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from argon2.low_level import hash_secret_raw, Type
# --- Constants ---
# Magic bytes written at the very start of every archive. The same value is
# passed as AES-GCM associated data when the metadata block is encrypted.
HEADER_SIG = b"CRZ5"
# Format version byte stored right after the signature.
FORMAT_VERSION = 5
# Length in bytes of the random salt fed to Argon2id and stored in the header.
SALT_SIZE = 32
NONCE_SIZE = 12  # AES-GCM standard nonce length, in bytes
KEY_SIZE = 32  # AES-256 key length, in bytes
# In the code visible here this is only used as the write granularity of
# secure_delete(); encryption itself is done in a single AES-GCM call.
CHUNK_SIZE = 64 * 1024  # 64KB for streaming
# Argon2id cost parameters, passed unchanged to hash_secret_raw().
ARGON2_TIME_COST = 3
ARGON2_MEMORY_COST = 65536  # 64MB (the value is expressed in KiB)
ARGON2_PARALLELISM = 4
# NOTE(review): not referenced anywhere in the visible code; presumably the
# suffix callers append to archive file names -- confirm against the UI layer.
EXTENSION = ".cryptz"
# Attempt counter MAC key derivation domain separator: hashed together with
# the encryption key so the MAC sub-key differs from the AES key itself.
ATTEMPT_DOMAIN = b"CryptZ-AttemptMAC-v5"
@dataclass
class EncryptOptions:
    """Input parameters consumed by encrypt()."""

    # Password fed to Argon2id to derive the AES-256 key.
    password: str
    # Paths (files or directories) added to the tar payload; each is stored
    # under its base name only.
    files: list[str]
    # Destination path of the archive that encrypt() writes.
    output_path: str
    # Sets bit 0x04 in the header flags byte. Decryption decides whether a
    # TOTP code is required from otp_secret, not from this flag.
    use_2fa: bool = False
    # TOTP secret stored inside the encrypted metadata; the literal "NONE"
    # means no second factor is required on decrypt.
    otp_secret: str = "NONE"
    # Written to the header as a single unsigned byte.
    max_attempts: int = 5
    # Optional key file; derive_key() XORs its SHA-256 digest into the key.
    hardware_key_path: Optional[str] = None
    # When true, regular files listed in `files` are overwritten and removed
    # after the archive has been written (directories are left in place).
    secure_delete: bool = False
    # Selects a gzip-compressed tar ("w:gz") instead of a plain tar ("w").
    compress: bool = True
    # Called as callback(progress, message) with progress running 0.0 to 1.0.
    progress_callback: Optional[Callable[[float, str], None]] = None
@dataclass
class DecryptResult:
    """Outcome returned by decrypt(); failures are reported here, not raised."""

    # True only when the payload was decrypted and extracted.
    success: bool
    # User-facing status text.
    message: str
    # Populated with the archive's TOTP secret when decrypt() stops because a
    # 2FA code was required but not supplied; "NONE" otherwise.
    otp_secret: str = "NONE"
    # True when the caller must call decrypt() again with an otp_code.
    needs_2fa: bool = False
    # Attempt counter value after this call, as tracked in the archive header.
    attempts_used: int = 0
    # Attempt limit read from the archive header.
    max_attempts: int = 0
    # True when decrypt() invoked archive destruction during this call.
    destroyed: bool = False
@dataclass
class ArchiveHeader:
    """Parsed fixed-layout header of an archive, as produced by _read_header()."""

    # Format version byte that follows the signature.
    version: int
    # Argon2id salt (SALT_SIZE bytes).
    salt: bytes
    # AES-GCM nonce used for the metadata block (NONCE_SIZE bytes).
    nonce_meta: bytes
    # AES-GCM nonce used for the data payload (NONCE_SIZE bytes).
    nonce_data: bytes
    # Attempt limit, stored as one unsigned byte.
    max_attempts: int
    # Current attempt counter, stored as one unsigned byte.
    current_attempts: int
    # 32-byte HMAC-SHA256 over (current_attempts, max_attempts).
    attempt_mac: bytes
    # AES-GCM ciphertext of the JSON metadata, read using the length prefix.
    meta_ciphertext: bytes
    # Bit 0x01 of the header flags byte.
    has_file_key: bool
def _report(callback: Optional[Callable], progress: float, msg: str):
if callback:
callback(progress, msg)
def derive_key(password: str, salt: bytes, hardware_key_path: Optional[str] = None) -> bytes:
    """Derive the AES key from *password* with Argon2id, optionally mixing in a key file.

    The password is UTF-8 encoded and hashed with the module's Argon2id cost
    parameters to KEY_SIZE bytes. When *hardware_key_path* is given and the
    path exists, the SHA-256 digest of that file's contents is XORed into the
    result byte by byte.

    Note: a path that is empty, None or does not exist is skipped silently,
    so the returned key then depends on the password alone.
    """
    derived = hash_secret_raw(
        secret=password.encode("utf-8"),
        salt=salt,
        time_cost=ARGON2_TIME_COST,
        memory_cost=ARGON2_MEMORY_COST,
        parallelism=ARGON2_PARALLELISM,
        hash_len=KEY_SIZE,
        type=Type.ID,
    )

    file_key_available = bool(hardware_key_path) and os.path.exists(hardware_key_path)
    if not file_key_available:
        return derived

    with open(hardware_key_path, "rb") as key_file:
        file_digest = hashlib.sha256(key_file.read()).digest()
    return bytes(k ^ d for k, d in zip(derived, file_digest))
def _compute_attempt_mac(key: bytes, current_attempts: int, max_attempts: int) -> bytes:
    """Return the HMAC-SHA256 that protects the attempt counter.

    The MAC key is SHA-256(ATTEMPT_DOMAIN || key), so it is distinct from the
    encryption key. The authenticated message is the two counter values packed
    as unsigned bytes (current first, then maximum).
    """
    mac_key = hashlib.sha256(ATTEMPT_DOMAIN + key).digest()
    counters = struct.pack(">BB", current_attempts, max_attempts)
    mac = hmac_mod.new(mac_key, counters, hashlib.sha256)
    return mac.digest()
def _verify_attempt_mac(key: bytes, current_attempts: int, max_attempts: int, mac: bytes) -> bool:
    """Check *mac* against the MAC recomputed for the given counter values.

    The comparison uses hmac.compare_digest.
    """
    return hmac_mod.compare_digest(
        _compute_attempt_mac(key, current_attempts, max_attempts),
        mac,
    )
def check_password_strength(password: str) -> tuple[int, str]:
    """Rate *password* and return ``(score, label)``.

    One point is awarded for each satisfied rule, giving a score of 0-4
    (0 = very weak, 4 = very strong):

    * at least 8 characters;
    * at least 12 characters;
    * contains both a digit and a letter;
    * contains at least one ASCII punctuation character.

    The label is the user-facing description matching the score.
    """
    import string

    # string.punctuation is the full ASCII punctuation set. The previous
    # hand-typed list left out the double quote and the backslash, so
    # passwords relying on those characters were under-scored.
    specials = set(string.punctuation)

    rules = (
        len(password) >= 8,
        len(password) >= 12,
        any(c.isdigit() for c in password) and any(c.isalpha() for c in password),
        any(c in specials for c in password),
    )
    score = sum(1 for satisfied in rules if satisfied)

    labels = ["Очень слабый", "Слабый", "Средний", "Сильный", "Очень сильный"]
    return score, labels[score]
def generate_password(length: int = 20) -> str:
    """Build a random password of *length* characters using the secrets module.

    Characters are drawn independently from ASCII letters, digits and the
    symbols ``!@#$%^&*()-_=+``.
    """
    import string

    symbols = "!@#$%^&*()-_=+"
    pool = string.ascii_letters + string.digits + symbols
    picked = [secrets.choice(pool) for _ in range(length)]
    return "".join(picked)
def secure_delete(path: str, passes: int = 3) -> bool:
    """Overwrite the file at *path* with random bytes *passes* times, then remove it.

    Each pass rewrites the file from the start in CHUNK_SIZE pieces and is
    flushed and fsync'ed before the next one begins.

    Returns True when the file was overwritten and removed, False when the
    path does not exist or any OSError occurs along the way.
    """
    if not os.path.exists(path):
        return False

    try:
        total = os.path.getsize(path)
        with open(path, "r+b") as handle:
            for _ in range(passes):
                handle.seek(0)
                left = total
                while left > 0:
                    step = min(CHUNK_SIZE, left)
                    handle.write(secrets.token_bytes(step))
                    left -= step
                # Push this pass to disk before starting the next one.
                handle.flush()
                os.fsync(handle.fileno())
        os.remove(path)
    except OSError:
        return False
    return True
def encrypt(options: EncryptOptions) -> str:
    """Encrypt files into a CryptZ v5 archive.

    The inputs are packed into an in-memory tar (gzip-compressed when
    ``options.compress`` is set), a key is derived from the password (and the
    optional key file), and metadata and payload are each sealed with
    AES-256-GCM under separate random nonces.

    File format:
        [HEADER_SIG 4B][VERSION 1B][FLAGS 1B]
        [SALT 32B][NONCE_META 12B][NONCE_DATA 12B]
        [MAX_ATTEMPTS 1B][CURRENT_ATTEMPTS 1B][ATTEMPT_MAC 32B]
        [META_LEN 4B][META_CIPHERTEXT variable]
        [DATA_CIPHERTEXT remainder]

    Flags: 0x01 key file used, 0x02 payload compressed, 0x04 2FA requested.

    Returns:
        ``options.output_path`` on success.

    Raises:
        ValueError: ``options.max_attempts`` is outside 1-255.
        FileNotFoundError: ``options.hardware_key_path`` is set but is not an
            existing file.
        OSError: reading an input or writing the archive failed.
    """
    # Validate before any expensive work. The counter is stored in a single
    # byte, and a limit of 0 would make decrypt() destroy the archive on the
    # very first attempt (0 >= 0), even with the correct password.
    if not 1 <= options.max_attempts <= 255:
        raise ValueError(
            f"Недопустимое значение max_attempts: {options.max_attempts} (допустимо 1–255)."
        )

    # derive_key() silently ignores a key file that does not exist, which
    # would yield a password-only archive that is still flagged as requiring
    # the key file. Refuse instead.
    has_file_key = bool(options.hardware_key_path)
    if has_file_key and not os.path.isfile(options.hardware_key_path):
        raise FileNotFoundError(f"Файл-ключ не найден: {options.hardware_key_path}")

    cb = options.progress_callback
    _report(cb, 0.0, "Подготовка...")

    # 1. Build the tar archive in memory.
    _report(cb, 0.05, "Архивирование файлов...")
    tar_buf = io.BytesIO()
    mode = "w:gz" if options.compress else "w"
    with tarfile.open(fileobj=tar_buf, mode=mode) as tar:
        total_files = len(options.files)
        for i, filepath in enumerate(options.files):
            # normpath drops a trailing separator; without it "dir/" has an
            # empty base name and its contents land in the archive root.
            arcname = os.path.basename(os.path.normpath(filepath))
            tar.add(filepath, arcname=arcname)
            _report(cb, 0.05 + 0.3 * ((i + 1) / total_files), f"Архивирование: {arcname}")
    raw_data = tar_buf.getvalue()
    tar_buf.close()

    # 2. Key derivation
    _report(cb, 0.4, "Генерация ключа (Argon2id)...")
    salt = secrets.token_bytes(SALT_SIZE)
    key = derive_key(options.password, salt, options.hardware_key_path)

    # 3. Metadata (stored encrypted; read back by decrypt())
    meta = json.dumps({
        "otp_secret": options.otp_secret,
        "max_attempts": options.max_attempts,
        "has_file_key": has_file_key,
        "compress": options.compress,
        "files_count": len(options.files),
    }).encode("utf-8")

    # 4. Encrypt metadata, binding it to the file signature
    _report(cb, 0.5, "Шифрование метаданных...")
    nonce_meta = secrets.token_bytes(NONCE_SIZE)
    aesgcm = AESGCM(key)
    meta_ct = aesgcm.encrypt(nonce_meta, meta, associated_data=HEADER_SIG)

    # 5. Encrypt data, binding it to the salt
    _report(cb, 0.55, "Шифрование данных...")
    nonce_data = secrets.token_bytes(NONCE_SIZE)
    data_ct = aesgcm.encrypt(nonce_data, raw_data, associated_data=salt)
    _report(cb, 0.85, "Запись файла...")

    # 6. Attempt counter MAC (counter starts at zero)
    attempt_mac = _compute_attempt_mac(key, 0, options.max_attempts)

    # 7. Flags
    flags = 0
    if has_file_key:
        flags |= 0x01
    if options.compress:
        flags |= 0x02
    if options.use_2fa:
        flags |= 0x04

    # 8. Write output
    with open(options.output_path, "wb") as f:
        f.write(HEADER_SIG)                              # 4
        f.write(struct.pack("B", FORMAT_VERSION))        # 1
        f.write(struct.pack("B", flags))                 # 1
        f.write(salt)                                    # 32
        f.write(nonce_meta)                              # 12
        f.write(nonce_data)                              # 12
        f.write(struct.pack("B", options.max_attempts))  # 1
        f.write(struct.pack("B", 0))                     # 1 current_attempts
        f.write(attempt_mac)                             # 32
        f.write(struct.pack(">I", len(meta_ct)))         # 4
        f.write(meta_ct)                                 # variable
        f.write(data_ct)                                 # remainder

    # 9. Secure delete originals if requested. Only regular files are wiped;
    # directories passed in `files` are left untouched.
    if options.secure_delete:
        _report(cb, 0.9, "Уничтожение оригиналов...")
        for filepath in options.files:
            if os.path.isfile(filepath):
                secure_delete(filepath)

    _report(cb, 1.0, "Готово!")
    return options.output_path
def verify_integrity(file_path: str, password: str, hardware_key_path: Optional[str] = None) -> tuple[bool, str]:
    """Check that the password/key opens the archive's metadata block.

    The header is parsed, the key is derived, and the AES-GCM tag of the
    metadata ciphertext is verified. The data payload is NOT read or
    authenticated, and the attempt counter is neither consulted nor updated.

    Returns:
        ``(True, message)`` when the metadata authenticates, otherwise
        ``(False, message)`` describing the failure. Never raises.
    """
    from cryptography.exceptions import InvalidTag

    try:
        header = _read_header(file_path)
        key = derive_key(password, header.salt, hardware_key_path)
        # GCM authentication fails if the key is wrong or the block was altered.
        AESGCM(key).decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
    except InvalidTag:
        # InvalidTag carries no message, so formatting it would leave the
        # user with an empty explanation.
        return False, "Ошибка верификации: неверный пароль/ключ или архив повреждён."
    except Exception as e:
        detail = str(e) or type(e).__name__
        return False, f"Ошибка верификации: {detail}"
    return True, "Архив целостен, пароль верный."
def _read_header(file_path: str) -> ArchiveHeader:
    """Parse the archive header and return it as an ArchiveHeader.

    Raises:
        ValueError: the file does not start with HEADER_SIG, or it ends before
            the header and metadata block are complete.
        OSError: the file cannot be opened or read.
    """
    def read_exact(stream, count: int) -> bytes:
        # A short read means the file was truncated. Fail with ValueError
        # rather than letting struct.unpack raise struct.error, or letting a
        # short salt/nonce/MAC flow into the crypto code.
        chunk = stream.read(count)
        if len(chunk) != count:
            raise ValueError("Файл повреждён: заголовок архива обрезан.")
        return chunk

    with open(file_path, "rb") as f:
        sig = f.read(4)
        if sig != HEADER_SIG:
            raise ValueError("Неверный формат файла или устаревшая версия.")
        version, flags = struct.unpack("BB", read_exact(f, 2))
        salt = read_exact(f, SALT_SIZE)
        nonce_meta = read_exact(f, NONCE_SIZE)
        nonce_data = read_exact(f, NONCE_SIZE)
        max_attempts, current_attempts = struct.unpack("BB", read_exact(f, 2))
        attempt_mac = read_exact(f, 32)
        meta_len = struct.unpack(">I", read_exact(f, 4))[0]
        # Reject a length prefix that points past the end of the file before
        # asking for a buffer of up to 4 GiB.
        remaining = os.fstat(f.fileno()).st_size - f.tell()
        if meta_len > remaining:
            raise ValueError("Файл повреждён: заголовок архива обрезан.")
        meta_ct = read_exact(f, meta_len)

    return ArchiveHeader(
        version=version,
        salt=salt,
        nonce_meta=nonce_meta,
        nonce_data=nonce_data,
        max_attempts=max_attempts,
        current_attempts=current_attempts,
        attempt_mac=attempt_mac,
        meta_ciphertext=meta_ct,
        has_file_key=bool(flags & 0x01),
    )
def _reject_unsafe_member(member, output_dir: str) -> None:
    """Raise ValueError if a tar member could write outside *output_dir*."""
    # Compare whole path components: a substring test for ".." would also
    # reject harmless names such as "notes..txt".
    normalized = member.name.replace("\\", "/")
    if (
        normalized.startswith("/")
        or os.path.isabs(member.name)
        or ".." in normalized.split("/")
    ):
        raise ValueError(f"Опасный путь в архиве: {member.name}")
    # Device nodes and FIFOs have no place in a file archive.
    if member.isdev():
        raise ValueError(f"Опасный путь в архиве: {member.name}")
    # A link is only acceptable when its target stays inside the output
    # directory; otherwise a later member could be written through it.
    if member.issym() or member.islnk():
        root = os.path.realpath(output_dir)
        # Symlink targets are relative to the link's own directory, hard-link
        # targets are relative to the archive root.
        base = os.path.dirname(os.path.join(root, member.name)) if member.issym() else root
        target = os.path.realpath(os.path.join(base, member.linkname))
        if os.path.commonpath([root, target]) != root:
            raise ValueError(f"Опасный путь в архиве: {member.name}")


def _extract_tar_payload(raw_data: bytes, compressed: bool, output_dir: str) -> None:
    """Unpack the decrypted tar bytes into *output_dir* after vetting every member."""
    mode = "r:gz" if compressed else "r"
    with tarfile.open(fileobj=io.BytesIO(raw_data), mode=mode) as tar:
        members = tar.getmembers()
        for member in members:
            _reject_unsafe_member(member, output_dir)
        if hasattr(tarfile, "data_filter"):
            # Interpreters with extraction filters: let tarfile enforce the
            # "data" policy as a second line of defence.
            tar.extractall(output_dir, members=members, filter="data")
        else:
            tar.extractall(output_dir, members=members)


def decrypt(
    file_path: str,
    password: str,
    output_dir: str,
    hardware_key_path: Optional[str] = None,
    otp_code: Optional[str] = None,
    progress_callback: Optional[Callable[[float, str], None]] = None,
) -> DecryptResult:
    """Decrypt a CryptZ v5 archive and extract its files into *output_dir*.

    Steps: parse the header, derive the key, verify the attempt-counter MAC,
    enforce the attempt limit, decrypt the metadata, check the TOTP code when
    the archive carries an OTP secret, decrypt the payload and unpack the tar.

    Returns:
        A DecryptResult. Expected failures (bad format, wrong password,
        missing/invalid 2FA code, corrupted data, unsafe archive member) are
        reported through it rather than raised. ``needs_2fa`` is set when the
        call must be repeated with an ``otp_code``.

    Raises:
        OSError: the archive cannot be opened or read.

    NOTE(review): the attempt MAC is keyed from the password-derived key, so
    a wrong password fails the MAC check and returns before the counter is
    touched. The increment branch is therefore only reachable when the MAC
    verifies but the metadata block does not. Making the counter effective
    against password guessing needs a format change.
    """
    cb = progress_callback
    _report(cb, 0.0, "Чтение заголовка...")
    try:
        header = _read_header(file_path)
    except ValueError as e:
        return DecryptResult(success=False, message=str(e))
    except struct.error:
        # A header cut short makes struct.unpack fail; report it like any
        # other malformed archive instead of letting the exception escape.
        return DecryptResult(success=False, message="Файл повреждён: заголовок архива обрезан.")

    _report(cb, 0.1, "Генерация ключа (Argon2id)...")
    key = derive_key(password, header.salt, hardware_key_path)

    # Verify attempt counter MAC
    if not _verify_attempt_mac(key, header.current_attempts, header.max_attempts, header.attempt_mac):
        # MAC invalid: either wrong password or tampered counter.
        # The counter cannot be re-signed without the correct key, so reject.
        return DecryptResult(
            success=False,
            message="Неверный пароль или файл повреждён.",
            attempts_used=header.current_attempts,
            max_attempts=header.max_attempts,
        )

    # Check if attempts exceeded
    if header.current_attempts >= header.max_attempts:
        _destroy_archive(file_path)
        return DecryptResult(
            success=False,
            message="Превышен лимит попыток! Архив уничтожен.",
            destroyed=True,
            attempts_used=header.current_attempts,
            max_attempts=header.max_attempts,
        )

    _report(cb, 0.3, "Расшифровка метаданных...")
    aesgcm = AESGCM(key)
    # Decrypting the metadata authenticates the key.
    try:
        meta_plain = aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
    except Exception:
        # Authentication failed: count the attempt.
        new_attempts = header.current_attempts + 1
        _update_attempt_counter(file_path, key, new_attempts, header.max_attempts)
        if new_attempts >= header.max_attempts:
            _destroy_archive(file_path)
            return DecryptResult(
                success=False,
                message=f"Неверный пароль! Архив УНИЧТОЖЕН (попытка {new_attempts}/{header.max_attempts}).",
                destroyed=True,
                attempts_used=new_attempts,
                max_attempts=header.max_attempts,
            )
        return DecryptResult(
            success=False,
            message=f"Неверный пароль или ключ! Попытка {new_attempts}/{header.max_attempts}.",
            attempts_used=new_attempts,
            max_attempts=header.max_attempts,
        )

    meta = json.loads(meta_plain.decode("utf-8"))
    otp_secret = meta.get("otp_secret", "NONE")

    # Check 2FA
    if otp_secret != "NONE":
        if not otp_code:
            return DecryptResult(
                success=False,
                message="Требуется код 2FA.",
                needs_2fa=True,
                otp_secret=otp_secret,
                attempts_used=header.current_attempts,
                max_attempts=header.max_attempts,
            )
        import pyotp
        if not pyotp.TOTP(otp_secret).verify(otp_code):
            return DecryptResult(
                success=False,
                message="Неверный код 2FA!",
                attempts_used=header.current_attempts,
                max_attempts=header.max_attempts,
            )

    _report(cb, 0.5, "Расшифровка данных...")
    # Read encrypted data
    with open(file_path, "rb") as f:
        # Skip to data: 4+1+1+32+12+12+1+1+32+4+meta_len
        offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 + 1 + 32 + 4 + len(header.meta_ciphertext)
        f.seek(offset)
        data_ct = f.read()
    try:
        raw_data = aesgcm.decrypt(header.nonce_data, data_ct, associated_data=header.salt)
    except Exception:
        return DecryptResult(success=False, message="Ошибка расшифровки данных (файл повреждён).")

    _report(cb, 0.8, "Извлечение файлов...")
    # Extract tar. The payload is authenticated, but whoever created the
    # archive chose its contents, so member paths are still untrusted.
    try:
        _extract_tar_payload(raw_data, meta.get("compress", True), output_dir)
    except Exception as e:
        return DecryptResult(success=False, message=f"Ошибка извлечения: {e}")

    # Reset attempt counter on success. When it is already zero the stored
    # bytes would be rewritten unchanged, so skip the write; this also keeps
    # decryption working for archives on read-only media.
    if header.current_attempts != 0:
        _update_attempt_counter(file_path, key, 0, header.max_attempts)

    _report(cb, 1.0, "Готово!")
    return DecryptResult(
        success=True,
        message="Архив успешно расшифрован!",
        attempts_used=0,
        max_attempts=header.max_attempts,
    )
def _update_attempt_counter(file_path: str, key: bytes, new_attempts: int, max_attempts: int):
    """Rewrite the attempt counter byte and its MAC in place inside the archive.

    The MAC for the new counter value is computed first; then the file is
    opened for update and the counter byte plus the 32-byte MAC that follows
    it are overwritten.
    """
    fresh_mac = _compute_attempt_mac(key, new_attempts, max_attempts)

    # Bytes preceding current_attempts:
    # sig(4) + version(1) + flags(1) + salt + nonce_meta + nonce_data + max_attempts(1)
    preceding_fields = (4, 1, 1, SALT_SIZE, NONCE_SIZE, NONCE_SIZE, 1)
    counter_pos = sum(preceding_fields)

    patch = struct.pack("B", new_attempts) + fresh_mac
    with open(file_path, "r+b") as archive:
        archive.seek(counter_pos)
        archive.write(patch)
def _destroy_archive(file_path: str):
    """Wipe the archive at *file_path* with a three-pass secure_delete.

    The result of secure_delete is not inspected, so callers are not told
    whether the file was actually removed.
    """
    overwrite_passes = 3
    secure_delete(file_path, passes=overwrite_passes)