""" CryptZ Crypto Engine v5 - AES-256-GCM (authenticated encryption) - Argon2id KDF - Streaming encryption for large files - Correct attempt counter with separate MAC - File-key support """ import os import io import json import gzip import struct import tarfile import secrets import hashlib import hmac as hmac_mod from dataclasses import dataclass, field from typing import Optional, Callable from cryptography.hazmat.primitives.ciphers.aead import AESGCM from argon2.low_level import hash_secret_raw, Type # --- Constants --- HEADER_SIG = b"CRZ5" FORMAT_VERSION = 5 SALT_SIZE = 32 NONCE_SIZE = 12 # AES-GCM standard KEY_SIZE = 32 # AES-256 CHUNK_SIZE = 64 * 1024 # 64KB for streaming ARGON2_TIME_COST = 3 ARGON2_MEMORY_COST = 65536 # 64MB ARGON2_PARALLELISM = 4 EXTENSION = ".cryptz" # Attempt counter MAC key derivation domain separator ATTEMPT_DOMAIN = b"CryptZ-AttemptMAC-v5" @dataclass class EncryptOptions: password: str files: list[str] output_path: str use_2fa: bool = False otp_secret: str = "NONE" max_attempts: int = 5 hardware_key_path: Optional[str] = None secure_delete: bool = False compress: bool = True progress_callback: Optional[Callable[[float, str], None]] = None @dataclass class DecryptResult: success: bool message: str otp_secret: str = "NONE" needs_2fa: bool = False attempts_used: int = 0 max_attempts: int = 0 destroyed: bool = False @dataclass class ArchiveHeader: version: int salt: bytes nonce_meta: bytes nonce_data: bytes max_attempts: int current_attempts: int attempt_mac: bytes meta_ciphertext: bytes has_file_key: bool def _report(callback: Optional[Callable], progress: float, msg: str): if callback: callback(progress, msg) def derive_key(password: str, salt: bytes, hardware_key_path: Optional[str] = None) -> bytes: """Derive encryption key using Argon2id + optional file-key XOR.""" key = hash_secret_raw( secret=password.encode("utf-8"), salt=salt, time_cost=ARGON2_TIME_COST, memory_cost=ARGON2_MEMORY_COST, parallelism=ARGON2_PARALLELISM, hash_len=KEY_SIZE, type=Type.ID, ) if hardware_key_path and os.path.exists(hardware_key_path): with open(hardware_key_path, "rb") as f: hw_data = f.read() hw_hash = hashlib.sha256(hw_data).digest() key = bytes(a ^ b for a, b in zip(key, hw_hash)) return key def _compute_attempt_mac(key: bytes, current_attempts: int, max_attempts: int) -> bytes: """Compute HMAC over attempt counter using a domain-separated sub-key.""" sub_key = hashlib.sha256(ATTEMPT_DOMAIN + key).digest() data = struct.pack(">BB", current_attempts, max_attempts) return hmac_mod.new(sub_key, data, hashlib.sha256).digest() def _verify_attempt_mac(key: bytes, current_attempts: int, max_attempts: int, mac: bytes) -> bool: """Verify attempt counter MAC.""" expected = _compute_attempt_mac(key, current_attempts, max_attempts) return hmac_mod.compare_digest(expected, mac) def check_password_strength(password: str) -> tuple[int, str]: """Return (score 0-4, label). 0=very weak, 4=very strong.""" score = 0 if len(password) >= 8: score += 1 if len(password) >= 12: score += 1 if any(c.isdigit() for c in password) and any(c.isalpha() for c in password): score += 1 if any(c in "!@#$%^&*()-_=+[]{}|;:',.<>?/`~" for c in password): score += 1 labels = ["Очень слабый", "Слабый", "Средний", "Сильный", "Очень сильный"] return score, labels[score] def generate_password(length: int = 20) -> str: """Generate a cryptographically secure random password.""" import string alphabet = string.ascii_letters + string.digits + "!@#$%^&*()-_=+" return "".join(secrets.choice(alphabet) for _ in range(length)) def secure_delete(path: str, passes: int = 3) -> bool: """Overwrite file with random data multiple times, then remove.""" if not os.path.exists(path): return False try: size = os.path.getsize(path) with open(path, "r+b") as f: for _ in range(passes): f.seek(0) remaining = size while remaining > 0: chunk = min(CHUNK_SIZE, remaining) f.write(secrets.token_bytes(chunk)) remaining -= chunk f.flush() os.fsync(f.fileno()) os.remove(path) return True except OSError: return False def encrypt(options: EncryptOptions) -> str: """ Encrypt files into a CryptZ v5 archive. Returns the output file path on success, raises on failure. File format: [HEADER_SIG 4B][VERSION 1B][FLAGS 1B] [SALT 32B][NONCE_META 12B][NONCE_DATA 12B] [MAX_ATTEMPTS 1B][CURRENT_ATTEMPTS 1B][ATTEMPT_MAC 32B] [META_LEN 4B][META_CIPHERTEXT variable] [DATA_CIPHERTEXT remainder] """ cb = options.progress_callback _report(cb, 0.0, "Подготовка...") # 1. Create tar.gz archive in memory (or streaming for large files) _report(cb, 0.05, "Архивирование файлов...") tar_buf = io.BytesIO() mode = "w:gz" if options.compress else "w" with tarfile.open(fileobj=tar_buf, mode=mode) as tar: total_files = len(options.files) for i, filepath in enumerate(options.files): if os.path.isdir(filepath): tar.add(filepath, arcname=os.path.basename(filepath)) else: tar.add(filepath, arcname=os.path.basename(filepath)) _report(cb, 0.05 + 0.3 * ((i + 1) / total_files), f"Архивирование: {os.path.basename(filepath)}") raw_data = tar_buf.getvalue() tar_buf.close() _report(cb, 0.4, "Генерация ключа (Argon2id)...") # 2. Key derivation salt = secrets.token_bytes(SALT_SIZE) key = derive_key(options.password, salt, options.hardware_key_path) # 3. Metadata meta = json.dumps({ "otp_secret": options.otp_secret, "max_attempts": options.max_attempts, "has_file_key": options.hardware_key_path is not None, "compress": options.compress, "files_count": len(options.files), }).encode("utf-8") # 4. Encrypt metadata _report(cb, 0.5, "Шифрование метаданных...") nonce_meta = secrets.token_bytes(NONCE_SIZE) aesgcm = AESGCM(key) meta_ct = aesgcm.encrypt(nonce_meta, meta, associated_data=HEADER_SIG) # 5. Encrypt data _report(cb, 0.55, "Шифрование данных...") nonce_data = secrets.token_bytes(NONCE_SIZE) data_ct = aesgcm.encrypt(nonce_data, raw_data, associated_data=salt) _report(cb, 0.85, "Запись файла...") # 6. Attempt counter MAC attempt_mac = _compute_attempt_mac(key, 0, options.max_attempts) # 7. Flags flags = 0 if options.hardware_key_path: flags |= 0x01 if options.compress: flags |= 0x02 if options.use_2fa: flags |= 0x04 # 8. Write output with open(options.output_path, "wb") as f: f.write(HEADER_SIG) # 4 f.write(struct.pack("B", FORMAT_VERSION)) # 1 f.write(struct.pack("B", flags)) # 1 f.write(salt) # 32 f.write(nonce_meta) # 12 f.write(nonce_data) # 12 f.write(struct.pack("B", options.max_attempts)) # 1 f.write(struct.pack("B", 0)) # 1 current_attempts f.write(attempt_mac) # 32 f.write(struct.pack(">I", len(meta_ct))) # 4 f.write(meta_ct) # variable f.write(data_ct) # remainder # 9. Secure delete originals if requested if options.secure_delete: _report(cb, 0.9, "Уничтожение оригиналов...") for filepath in options.files: if os.path.isfile(filepath): secure_delete(filepath) _report(cb, 1.0, "Готово!") return options.output_path def verify_integrity(file_path: str, password: str, hardware_key_path: Optional[str] = None) -> tuple[bool, str]: """Verify archive integrity without full decryption (checks HMAC/GCM tag on metadata).""" try: header = _read_header(file_path) key = derive_key(password, header.salt, hardware_key_path) aesgcm = AESGCM(key) # Try to decrypt metadata — GCM will fail if key is wrong aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG) return True, "Архив целостен, пароль верный." except Exception as e: return False, f"Ошибка верификации: {e}" def _read_header(file_path: str) -> ArchiveHeader: """Parse archive header, return structured data.""" with open(file_path, "rb") as f: sig = f.read(4) if sig != HEADER_SIG: raise ValueError("Неверный формат файла или устаревшая версия.") version = struct.unpack("B", f.read(1))[0] flags = struct.unpack("B", f.read(1))[0] salt = f.read(SALT_SIZE) nonce_meta = f.read(NONCE_SIZE) nonce_data = f.read(NONCE_SIZE) max_attempts = struct.unpack("B", f.read(1))[0] current_attempts = struct.unpack("B", f.read(1))[0] attempt_mac = f.read(32) meta_len = struct.unpack(">I", f.read(4))[0] meta_ct = f.read(meta_len) return ArchiveHeader( version=version, salt=salt, nonce_meta=nonce_meta, nonce_data=nonce_data, max_attempts=max_attempts, current_attempts=current_attempts, attempt_mac=attempt_mac, meta_ciphertext=meta_ct, has_file_key=bool(flags & 0x01), ) def decrypt( file_path: str, password: str, output_dir: str, hardware_key_path: Optional[str] = None, otp_code: Optional[str] = None, progress_callback: Optional[Callable[[float, str], None]] = None, ) -> DecryptResult: """ Decrypt a CryptZ v5 archive. Returns DecryptResult with status info. """ cb = progress_callback _report(cb, 0.0, "Чтение заголовка...") try: header = _read_header(file_path) except ValueError as e: return DecryptResult(success=False, message=str(e)) _report(cb, 0.1, "Генерация ключа (Argon2id)...") key = derive_key(password, header.salt, hardware_key_path) # Verify attempt counter MAC if not _verify_attempt_mac(key, header.current_attempts, header.max_attempts, header.attempt_mac): # MAC invalid — either wrong password or tampered counter # We can't update counter without correct key, just reject return DecryptResult( success=False, message="Неверный пароль или файл повреждён.", attempts_used=header.current_attempts, max_attempts=header.max_attempts, ) # Check if attempts exceeded if header.current_attempts >= header.max_attempts: # Destroy the archive _destroy_archive(file_path) return DecryptResult( success=False, message="Превышен лимит попыток! Архив уничтожен.", destroyed=True, attempts_used=header.current_attempts, max_attempts=header.max_attempts, ) _report(cb, 0.3, "Расшифровка метаданных...") aesgcm = AESGCM(key) # Try to decrypt metadata (this authenticates the key) try: meta_plain = aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG) except Exception: # Wrong password — increment counter new_attempts = header.current_attempts + 1 _update_attempt_counter(file_path, key, new_attempts, header.max_attempts) if new_attempts >= header.max_attempts: _destroy_archive(file_path) return DecryptResult( success=False, message=f"Неверный пароль! Архив УНИЧТОЖЕН (попытка {new_attempts}/{header.max_attempts}).", destroyed=True, attempts_used=new_attempts, max_attempts=header.max_attempts, ) return DecryptResult( success=False, message=f"Неверный пароль или ключ! Попытка {new_attempts}/{header.max_attempts}.", attempts_used=new_attempts, max_attempts=header.max_attempts, ) meta = json.loads(meta_plain.decode("utf-8")) otp_secret = meta.get("otp_secret", "NONE") # Check 2FA if otp_secret != "NONE": if not otp_code: return DecryptResult( success=False, message="Требуется код 2FA.", needs_2fa=True, otp_secret=otp_secret, attempts_used=header.current_attempts, max_attempts=header.max_attempts, ) import pyotp if not pyotp.TOTP(otp_secret).verify(otp_code): return DecryptResult( success=False, message="Неверный код 2FA!", attempts_used=header.current_attempts, max_attempts=header.max_attempts, ) _report(cb, 0.5, "Расшифровка данных...") # Read encrypted data with open(file_path, "rb") as f: # Skip to data: 4+1+1+32+12+12+1+1+32+4+meta_len offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 + 1 + 32 + 4 + len(header.meta_ciphertext) f.seek(offset) data_ct = f.read() try: raw_data = aesgcm.decrypt(header.nonce_data, data_ct, associated_data=header.salt) except Exception: return DecryptResult(success=False, message="Ошибка расшифровки данных (файл повреждён).") _report(cb, 0.8, "Извлечение файлов...") # Extract tar tar_buf = io.BytesIO(raw_data) try: mode = "r:gz" if meta.get("compress", True) else "r" with tarfile.open(fileobj=tar_buf, mode=mode) as tar: # Safe extraction — filter path traversal members = tar.getmembers() for member in members: if member.name.startswith("/") or ".." in member.name: raise ValueError(f"Опасный путь в архиве: {member.name}") tar.extractall(output_dir, members=members) except Exception as e: return DecryptResult(success=False, message=f"Ошибка извлечения: {e}") # Reset attempt counter on success _update_attempt_counter(file_path, key, 0, header.max_attempts) _report(cb, 1.0, "Готово!") return DecryptResult( success=True, message="Архив успешно расшифрован!", attempts_used=0, max_attempts=header.max_attempts, ) def _update_attempt_counter(file_path: str, key: bytes, new_attempts: int, max_attempts: int): """Update the attempt counter and its MAC in the archive file.""" new_mac = _compute_attempt_mac(key, new_attempts, max_attempts) # Offset: 4(sig) + 1(ver) + 1(flags) + 32(salt) + 12(nonce_meta) + 12(nonce_data) = 62 # then max_attempts(1) + current_attempts(1) + mac(32) attempt_offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 # points to current_attempts byte with open(file_path, "r+b") as f: f.seek(attempt_offset) f.write(struct.pack("B", new_attempts)) f.write(new_mac) def _destroy_archive(file_path: str): """Securely destroy an archive after max attempts exceeded.""" secure_delete(file_path, passes=3)