207d2fbbee
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
454 lines
16 KiB
Python
454 lines
16 KiB
Python
"""
|
|
CryptZ Crypto Engine v5
|
|
- AES-256-GCM (authenticated encryption)
|
|
- Argon2id KDF
|
|
- Streaming encryption for large files
|
|
- Correct attempt counter with separate MAC
|
|
- File-key support
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import json
|
|
import gzip
|
|
import struct
|
|
import tarfile
|
|
import secrets
|
|
import hashlib
|
|
import hmac as hmac_mod
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, Callable
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from argon2.low_level import hash_secret_raw, Type
|
|
|
|
# --- Constants ---
|
|
HEADER_SIG = b"CRZ5"
|
|
FORMAT_VERSION = 5
|
|
SALT_SIZE = 32
|
|
NONCE_SIZE = 12 # AES-GCM standard
|
|
KEY_SIZE = 32 # AES-256
|
|
CHUNK_SIZE = 64 * 1024 # 64KB for streaming
|
|
ARGON2_TIME_COST = 3
|
|
ARGON2_MEMORY_COST = 65536 # 64MB
|
|
ARGON2_PARALLELISM = 4
|
|
EXTENSION = ".cryptz"
|
|
|
|
# Attempt counter MAC key derivation domain separator
|
|
ATTEMPT_DOMAIN = b"CryptZ-AttemptMAC-v5"
|
|
|
|
|
|
@dataclass
|
|
class EncryptOptions:
|
|
password: str
|
|
files: list[str]
|
|
output_path: str
|
|
use_2fa: bool = False
|
|
otp_secret: str = "NONE"
|
|
max_attempts: int = 5
|
|
hardware_key_path: Optional[str] = None
|
|
secure_delete: bool = False
|
|
compress: bool = True
|
|
progress_callback: Optional[Callable[[float, str], None]] = None
|
|
|
|
|
|
@dataclass
|
|
class DecryptResult:
|
|
success: bool
|
|
message: str
|
|
otp_secret: str = "NONE"
|
|
needs_2fa: bool = False
|
|
attempts_used: int = 0
|
|
max_attempts: int = 0
|
|
destroyed: bool = False
|
|
|
|
|
|
@dataclass
|
|
class ArchiveHeader:
|
|
version: int
|
|
salt: bytes
|
|
nonce_meta: bytes
|
|
nonce_data: bytes
|
|
max_attempts: int
|
|
current_attempts: int
|
|
attempt_mac: bytes
|
|
meta_ciphertext: bytes
|
|
has_file_key: bool
|
|
|
|
|
|
def _report(callback: Optional[Callable], progress: float, msg: str):
|
|
if callback:
|
|
callback(progress, msg)
|
|
|
|
|
|
def derive_key(password: str, salt: bytes, hardware_key_path: Optional[str] = None) -> bytes:
|
|
"""Derive encryption key using Argon2id + optional file-key XOR."""
|
|
key = hash_secret_raw(
|
|
secret=password.encode("utf-8"),
|
|
salt=salt,
|
|
time_cost=ARGON2_TIME_COST,
|
|
memory_cost=ARGON2_MEMORY_COST,
|
|
parallelism=ARGON2_PARALLELISM,
|
|
hash_len=KEY_SIZE,
|
|
type=Type.ID,
|
|
)
|
|
if hardware_key_path and os.path.exists(hardware_key_path):
|
|
with open(hardware_key_path, "rb") as f:
|
|
hw_data = f.read()
|
|
hw_hash = hashlib.sha256(hw_data).digest()
|
|
key = bytes(a ^ b for a, b in zip(key, hw_hash))
|
|
return key
|
|
|
|
|
|
def _compute_attempt_mac(key: bytes, current_attempts: int, max_attempts: int) -> bytes:
|
|
"""Compute HMAC over attempt counter using a domain-separated sub-key."""
|
|
sub_key = hashlib.sha256(ATTEMPT_DOMAIN + key).digest()
|
|
data = struct.pack(">BB", current_attempts, max_attempts)
|
|
return hmac_mod.new(sub_key, data, hashlib.sha256).digest()
|
|
|
|
|
|
def _verify_attempt_mac(key: bytes, current_attempts: int, max_attempts: int, mac: bytes) -> bool:
|
|
"""Verify attempt counter MAC."""
|
|
expected = _compute_attempt_mac(key, current_attempts, max_attempts)
|
|
return hmac_mod.compare_digest(expected, mac)
|
|
|
|
|
|
def check_password_strength(password: str) -> tuple[int, str]:
|
|
"""Return (score 0-4, label). 0=very weak, 4=very strong."""
|
|
score = 0
|
|
if len(password) >= 8:
|
|
score += 1
|
|
if len(password) >= 12:
|
|
score += 1
|
|
if any(c.isdigit() for c in password) and any(c.isalpha() for c in password):
|
|
score += 1
|
|
if any(c in "!@#$%^&*()-_=+[]{}|;:',.<>?/`~" for c in password):
|
|
score += 1
|
|
labels = ["Очень слабый", "Слабый", "Средний", "Сильный", "Очень сильный"]
|
|
return score, labels[score]
|
|
|
|
|
|
def generate_password(length: int = 20) -> str:
|
|
"""Generate a cryptographically secure random password."""
|
|
import string
|
|
alphabet = string.ascii_letters + string.digits + "!@#$%^&*()-_=+"
|
|
return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
def secure_delete(path: str, passes: int = 3) -> bool:
|
|
"""Overwrite file with random data multiple times, then remove."""
|
|
if not os.path.exists(path):
|
|
return False
|
|
try:
|
|
size = os.path.getsize(path)
|
|
with open(path, "r+b") as f:
|
|
for _ in range(passes):
|
|
f.seek(0)
|
|
remaining = size
|
|
while remaining > 0:
|
|
chunk = min(CHUNK_SIZE, remaining)
|
|
f.write(secrets.token_bytes(chunk))
|
|
remaining -= chunk
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.remove(path)
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def encrypt(options: EncryptOptions) -> str:
|
|
"""
|
|
Encrypt files into a CryptZ v5 archive.
|
|
Returns the output file path on success, raises on failure.
|
|
|
|
File format:
|
|
[HEADER_SIG 4B][VERSION 1B][FLAGS 1B]
|
|
[SALT 32B][NONCE_META 12B][NONCE_DATA 12B]
|
|
[MAX_ATTEMPTS 1B][CURRENT_ATTEMPTS 1B][ATTEMPT_MAC 32B]
|
|
[META_LEN 4B][META_CIPHERTEXT variable]
|
|
[DATA_CIPHERTEXT remainder]
|
|
"""
|
|
cb = options.progress_callback
|
|
_report(cb, 0.0, "Подготовка...")
|
|
|
|
# 1. Create tar.gz archive in memory (or streaming for large files)
|
|
_report(cb, 0.05, "Архивирование файлов...")
|
|
tar_buf = io.BytesIO()
|
|
mode = "w:gz" if options.compress else "w"
|
|
with tarfile.open(fileobj=tar_buf, mode=mode) as tar:
|
|
total_files = len(options.files)
|
|
for i, filepath in enumerate(options.files):
|
|
if os.path.isdir(filepath):
|
|
tar.add(filepath, arcname=os.path.basename(filepath))
|
|
else:
|
|
tar.add(filepath, arcname=os.path.basename(filepath))
|
|
_report(cb, 0.05 + 0.3 * ((i + 1) / total_files), f"Архивирование: {os.path.basename(filepath)}")
|
|
|
|
raw_data = tar_buf.getvalue()
|
|
tar_buf.close()
|
|
|
|
_report(cb, 0.4, "Генерация ключа (Argon2id)...")
|
|
|
|
# 2. Key derivation
|
|
salt = secrets.token_bytes(SALT_SIZE)
|
|
key = derive_key(options.password, salt, options.hardware_key_path)
|
|
|
|
# 3. Metadata
|
|
meta = json.dumps({
|
|
"otp_secret": options.otp_secret,
|
|
"max_attempts": options.max_attempts,
|
|
"has_file_key": options.hardware_key_path is not None,
|
|
"compress": options.compress,
|
|
"files_count": len(options.files),
|
|
}).encode("utf-8")
|
|
|
|
# 4. Encrypt metadata
|
|
_report(cb, 0.5, "Шифрование метаданных...")
|
|
nonce_meta = secrets.token_bytes(NONCE_SIZE)
|
|
aesgcm = AESGCM(key)
|
|
meta_ct = aesgcm.encrypt(nonce_meta, meta, associated_data=HEADER_SIG)
|
|
|
|
# 5. Encrypt data
|
|
_report(cb, 0.55, "Шифрование данных...")
|
|
nonce_data = secrets.token_bytes(NONCE_SIZE)
|
|
data_ct = aesgcm.encrypt(nonce_data, raw_data, associated_data=salt)
|
|
|
|
_report(cb, 0.85, "Запись файла...")
|
|
|
|
# 6. Attempt counter MAC
|
|
attempt_mac = _compute_attempt_mac(key, 0, options.max_attempts)
|
|
|
|
# 7. Flags
|
|
flags = 0
|
|
if options.hardware_key_path:
|
|
flags |= 0x01
|
|
if options.compress:
|
|
flags |= 0x02
|
|
if options.use_2fa:
|
|
flags |= 0x04
|
|
|
|
# 8. Write output
|
|
with open(options.output_path, "wb") as f:
|
|
f.write(HEADER_SIG) # 4
|
|
f.write(struct.pack("B", FORMAT_VERSION)) # 1
|
|
f.write(struct.pack("B", flags)) # 1
|
|
f.write(salt) # 32
|
|
f.write(nonce_meta) # 12
|
|
f.write(nonce_data) # 12
|
|
f.write(struct.pack("B", options.max_attempts)) # 1
|
|
f.write(struct.pack("B", 0)) # 1 current_attempts
|
|
f.write(attempt_mac) # 32
|
|
f.write(struct.pack(">I", len(meta_ct))) # 4
|
|
f.write(meta_ct) # variable
|
|
f.write(data_ct) # remainder
|
|
|
|
# 9. Secure delete originals if requested
|
|
if options.secure_delete:
|
|
_report(cb, 0.9, "Уничтожение оригиналов...")
|
|
for filepath in options.files:
|
|
if os.path.isfile(filepath):
|
|
secure_delete(filepath)
|
|
|
|
_report(cb, 1.0, "Готово!")
|
|
return options.output_path
|
|
|
|
|
|
def verify_integrity(file_path: str, password: str, hardware_key_path: Optional[str] = None) -> tuple[bool, str]:
|
|
"""Verify archive integrity without full decryption (checks HMAC/GCM tag on metadata)."""
|
|
try:
|
|
header = _read_header(file_path)
|
|
key = derive_key(password, header.salt, hardware_key_path)
|
|
aesgcm = AESGCM(key)
|
|
# Try to decrypt metadata — GCM will fail if key is wrong
|
|
aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
|
|
return True, "Архив целостен, пароль верный."
|
|
except Exception as e:
|
|
return False, f"Ошибка верификации: {e}"
|
|
|
|
|
|
def _read_header(file_path: str) -> ArchiveHeader:
|
|
"""Parse archive header, return structured data."""
|
|
with open(file_path, "rb") as f:
|
|
sig = f.read(4)
|
|
if sig != HEADER_SIG:
|
|
raise ValueError("Неверный формат файла или устаревшая версия.")
|
|
version = struct.unpack("B", f.read(1))[0]
|
|
flags = struct.unpack("B", f.read(1))[0]
|
|
salt = f.read(SALT_SIZE)
|
|
nonce_meta = f.read(NONCE_SIZE)
|
|
nonce_data = f.read(NONCE_SIZE)
|
|
max_attempts = struct.unpack("B", f.read(1))[0]
|
|
current_attempts = struct.unpack("B", f.read(1))[0]
|
|
attempt_mac = f.read(32)
|
|
meta_len = struct.unpack(">I", f.read(4))[0]
|
|
meta_ct = f.read(meta_len)
|
|
|
|
return ArchiveHeader(
|
|
version=version,
|
|
salt=salt,
|
|
nonce_meta=nonce_meta,
|
|
nonce_data=nonce_data,
|
|
max_attempts=max_attempts,
|
|
current_attempts=current_attempts,
|
|
attempt_mac=attempt_mac,
|
|
meta_ciphertext=meta_ct,
|
|
has_file_key=bool(flags & 0x01),
|
|
)
|
|
|
|
|
|
def decrypt(
|
|
file_path: str,
|
|
password: str,
|
|
output_dir: str,
|
|
hardware_key_path: Optional[str] = None,
|
|
otp_code: Optional[str] = None,
|
|
progress_callback: Optional[Callable[[float, str], None]] = None,
|
|
) -> DecryptResult:
|
|
"""
|
|
Decrypt a CryptZ v5 archive.
|
|
Returns DecryptResult with status info.
|
|
"""
|
|
cb = progress_callback
|
|
_report(cb, 0.0, "Чтение заголовка...")
|
|
|
|
try:
|
|
header = _read_header(file_path)
|
|
except ValueError as e:
|
|
return DecryptResult(success=False, message=str(e))
|
|
|
|
_report(cb, 0.1, "Генерация ключа (Argon2id)...")
|
|
key = derive_key(password, header.salt, hardware_key_path)
|
|
|
|
# Verify attempt counter MAC
|
|
if not _verify_attempt_mac(key, header.current_attempts, header.max_attempts, header.attempt_mac):
|
|
# MAC invalid — either wrong password or tampered counter
|
|
# We can't update counter without correct key, just reject
|
|
return DecryptResult(
|
|
success=False,
|
|
message="Неверный пароль или файл повреждён.",
|
|
attempts_used=header.current_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
# Check if attempts exceeded
|
|
if header.current_attempts >= header.max_attempts:
|
|
# Destroy the archive
|
|
_destroy_archive(file_path)
|
|
return DecryptResult(
|
|
success=False,
|
|
message="Превышен лимит попыток! Архив уничтожен.",
|
|
destroyed=True,
|
|
attempts_used=header.current_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
_report(cb, 0.3, "Расшифровка метаданных...")
|
|
aesgcm = AESGCM(key)
|
|
|
|
# Try to decrypt metadata (this authenticates the key)
|
|
try:
|
|
meta_plain = aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
|
|
except Exception:
|
|
# Wrong password — increment counter
|
|
new_attempts = header.current_attempts + 1
|
|
_update_attempt_counter(file_path, key, new_attempts, header.max_attempts)
|
|
|
|
if new_attempts >= header.max_attempts:
|
|
_destroy_archive(file_path)
|
|
return DecryptResult(
|
|
success=False,
|
|
message=f"Неверный пароль! Архив УНИЧТОЖЕН (попытка {new_attempts}/{header.max_attempts}).",
|
|
destroyed=True,
|
|
attempts_used=new_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
return DecryptResult(
|
|
success=False,
|
|
message=f"Неверный пароль или ключ! Попытка {new_attempts}/{header.max_attempts}.",
|
|
attempts_used=new_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
meta = json.loads(meta_plain.decode("utf-8"))
|
|
otp_secret = meta.get("otp_secret", "NONE")
|
|
|
|
# Check 2FA
|
|
if otp_secret != "NONE":
|
|
if not otp_code:
|
|
return DecryptResult(
|
|
success=False,
|
|
message="Требуется код 2FA.",
|
|
needs_2fa=True,
|
|
otp_secret=otp_secret,
|
|
attempts_used=header.current_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
import pyotp
|
|
if not pyotp.TOTP(otp_secret).verify(otp_code):
|
|
return DecryptResult(
|
|
success=False,
|
|
message="Неверный код 2FA!",
|
|
attempts_used=header.current_attempts,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
_report(cb, 0.5, "Расшифровка данных...")
|
|
|
|
# Read encrypted data
|
|
with open(file_path, "rb") as f:
|
|
# Skip to data: 4+1+1+32+12+12+1+1+32+4+meta_len
|
|
offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 + 1 + 32 + 4 + len(header.meta_ciphertext)
|
|
f.seek(offset)
|
|
data_ct = f.read()
|
|
|
|
try:
|
|
raw_data = aesgcm.decrypt(header.nonce_data, data_ct, associated_data=header.salt)
|
|
except Exception:
|
|
return DecryptResult(success=False, message="Ошибка расшифровки данных (файл повреждён).")
|
|
|
|
_report(cb, 0.8, "Извлечение файлов...")
|
|
|
|
# Extract tar
|
|
tar_buf = io.BytesIO(raw_data)
|
|
try:
|
|
mode = "r:gz" if meta.get("compress", True) else "r"
|
|
with tarfile.open(fileobj=tar_buf, mode=mode) as tar:
|
|
# Safe extraction — filter path traversal
|
|
members = tar.getmembers()
|
|
for member in members:
|
|
if member.name.startswith("/") or ".." in member.name:
|
|
raise ValueError(f"Опасный путь в архиве: {member.name}")
|
|
tar.extractall(output_dir, members=members)
|
|
except Exception as e:
|
|
return DecryptResult(success=False, message=f"Ошибка извлечения: {e}")
|
|
|
|
# Reset attempt counter on success
|
|
_update_attempt_counter(file_path, key, 0, header.max_attempts)
|
|
|
|
_report(cb, 1.0, "Готово!")
|
|
return DecryptResult(
|
|
success=True,
|
|
message="Архив успешно расшифрован!",
|
|
attempts_used=0,
|
|
max_attempts=header.max_attempts,
|
|
)
|
|
|
|
|
|
def _update_attempt_counter(file_path: str, key: bytes, new_attempts: int, max_attempts: int):
|
|
"""Update the attempt counter and its MAC in the archive file."""
|
|
new_mac = _compute_attempt_mac(key, new_attempts, max_attempts)
|
|
# Offset: 4(sig) + 1(ver) + 1(flags) + 32(salt) + 12(nonce_meta) + 12(nonce_data) = 62
|
|
# then max_attempts(1) + current_attempts(1) + mac(32)
|
|
attempt_offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 # points to current_attempts byte
|
|
with open(file_path, "r+b") as f:
|
|
f.seek(attempt_offset)
|
|
f.write(struct.pack("B", new_attempts))
|
|
f.write(new_mac)
|
|
|
|
|
|
def _destroy_archive(file_path: str):
|
|
"""Securely destroy an archive after max attempts exceeded."""
|
|
secure_delete(file_path, passes=3)
|