Initial commit
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,182 @@
|
||||
"""
|
||||
CryptZ Ultimate v5 — CLI Module
|
||||
Usage:
|
||||
cryptz.py encrypt -p PASSWORD -o OUTPUT file1 file2 ...
|
||||
cryptz.py decrypt -p PASSWORD -o OUTPUT_DIR archive.cryptz
|
||||
cryptz.py verify -p PASSWORD archive.cryptz
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
import getpass
|
||||
|
||||
# Force UTF-8 stdout on Windows
|
||||
if sys.platform == "win32":
|
||||
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
|
||||
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
|
||||
|
||||
from crypto_engine import (
|
||||
EncryptOptions, decrypt, encrypt, verify_integrity,
|
||||
check_password_strength, EXTENSION,
|
||||
)
|
||||
|
||||
|
||||
def _progress(value: float, msg: str):
|
||||
bar_len = 30
|
||||
filled = int(bar_len * value)
|
||||
bar = "#" * filled + "-" * (bar_len - filled)
|
||||
line = " [{}] {:5.1f}% {:<40}".format(bar, value * 100, msg)
|
||||
sys.stdout.write("\r" + line)
|
||||
sys.stdout.flush()
|
||||
if value >= 1.0:
|
||||
print()
|
||||
|
||||
|
||||
def cmd_encrypt(args):
|
||||
password = args.password or getpass.getpass("Password: ")
|
||||
score, label = check_password_strength(password)
|
||||
if score < 2:
|
||||
print("[!] Warning: password is {}. Consider strengthening it.".format(label.lower()))
|
||||
|
||||
if not args.files:
|
||||
print("[ERROR] Specify files to encrypt.")
|
||||
sys.exit(1)
|
||||
|
||||
for f in args.files:
|
||||
if not os.path.exists(f):
|
||||
print("[ERROR] File not found: {}".format(f))
|
||||
sys.exit(1)
|
||||
|
||||
output = args.output or (os.path.splitext(args.files[0])[0] + EXTENSION)
|
||||
|
||||
options = EncryptOptions(
|
||||
password=password,
|
||||
files=args.files,
|
||||
output_path=output,
|
||||
use_2fa=args.twofa,
|
||||
max_attempts=args.max_attempts,
|
||||
hardware_key_path=args.key_file,
|
||||
secure_delete=args.shred,
|
||||
compress=not args.no_compress,
|
||||
progress_callback=_progress,
|
||||
)
|
||||
|
||||
if args.twofa:
|
||||
import pyotp
|
||||
options.otp_secret = pyotp.random_base32()
|
||||
print("")
|
||||
print("[2FA] Secret: {}".format(options.otp_secret))
|
||||
uri = pyotp.TOTP(options.otp_secret).provisioning_uri("CryptZ", "CryptZ CLI")
|
||||
print("[2FA] URI: {}".format(uri))
|
||||
print("[2FA] Save this secret! It will be required for decryption.")
|
||||
print("")
|
||||
|
||||
try:
|
||||
result = encrypt(options)
|
||||
print("")
|
||||
print("[OK] Archive created: {}".format(result))
|
||||
except Exception as e:
|
||||
print("")
|
||||
print("[ERROR] {}".format(e))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_decrypt(args):
|
||||
password = args.password or getpass.getpass("Password: ")
|
||||
|
||||
if not os.path.exists(args.archive):
|
||||
print("[ERROR] File not found: {}".format(args.archive))
|
||||
sys.exit(1)
|
||||
|
||||
output_dir = args.output or "."
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
result = decrypt(
|
||||
file_path=args.archive,
|
||||
password=password,
|
||||
output_dir=output_dir,
|
||||
hardware_key_path=args.key_file,
|
||||
otp_code=args.otp,
|
||||
progress_callback=_progress,
|
||||
)
|
||||
|
||||
if result.needs_2fa and not args.otp:
|
||||
print("")
|
||||
otp = input("[2FA] Enter TOTP code: ").strip()
|
||||
result = decrypt(
|
||||
file_path=args.archive,
|
||||
password=password,
|
||||
output_dir=output_dir,
|
||||
hardware_key_path=args.key_file,
|
||||
otp_code=otp,
|
||||
progress_callback=_progress,
|
||||
)
|
||||
|
||||
print("")
|
||||
if result.success:
|
||||
print("[OK] {}".format(result.message))
|
||||
else:
|
||||
print("[FAIL] {}".format(result.message))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_verify(args):
|
||||
password = args.password or getpass.getpass("Password: ")
|
||||
|
||||
if not os.path.exists(args.archive):
|
||||
print("[ERROR] File not found: {}".format(args.archive))
|
||||
sys.exit(1)
|
||||
|
||||
ok, msg = verify_integrity(args.archive, password, args.key_file)
|
||||
if ok:
|
||||
print("[OK] {}".format(msg))
|
||||
else:
|
||||
print("[FAIL] {}".format(msg))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cli_main():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="cryptz",
|
||||
description="CryptZ Ultimate v5 -- AES-256-GCM CLI Archiver",
|
||||
)
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
# encrypt
|
||||
enc = sub.add_parser("encrypt", help="Encrypt files")
|
||||
enc.add_argument("files", nargs="+", help="Files to encrypt")
|
||||
enc.add_argument("-p", "--password", help="Password (or will be prompted)")
|
||||
enc.add_argument("-o", "--output", help="Output file")
|
||||
enc.add_argument("-k", "--key-file", help="Key file")
|
||||
enc.add_argument("--2fa", dest="twofa", action="store_true", help="Enable 2FA")
|
||||
enc.add_argument("--shred", action="store_true", help="Securely delete originals")
|
||||
enc.add_argument("--no-compress", action="store_true", help="Disable compression")
|
||||
enc.add_argument("--max-attempts", type=int, default=5, help="Max attempts (default: 5)")
|
||||
|
||||
# decrypt
|
||||
dec = sub.add_parser("decrypt", help="Decrypt archive")
|
||||
dec.add_argument("archive", help=".cryptz file")
|
||||
dec.add_argument("-p", "--password", help="Password")
|
||||
dec.add_argument("-o", "--output", help="Output directory")
|
||||
dec.add_argument("-k", "--key-file", help="Key file")
|
||||
dec.add_argument("--otp", help="2FA code")
|
||||
|
||||
# verify
|
||||
ver = sub.add_parser("verify", help="Verify archive integrity")
|
||||
ver.add_argument("archive", help=".cryptz file")
|
||||
ver.add_argument("-p", "--password", help="Password")
|
||||
ver.add_argument("-k", "--key-file", help="Key file")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command == "encrypt":
|
||||
cmd_encrypt(args)
|
||||
elif args.command == "decrypt":
|
||||
cmd_decrypt(args)
|
||||
elif args.command == "verify":
|
||||
cmd_verify(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli_main()
|
||||
@@ -0,0 +1,453 @@
|
||||
"""
|
||||
CryptZ Crypto Engine v5
|
||||
- AES-256-GCM (authenticated encryption)
|
||||
- Argon2id KDF
|
||||
- Streaming encryption for large files
|
||||
- Correct attempt counter with separate MAC
|
||||
- File-key support
|
||||
"""
|
||||
|
||||
import os
|
||||
import io
|
||||
import json
|
||||
import gzip
|
||||
import struct
|
||||
import tarfile
|
||||
import secrets
|
||||
import hashlib
|
||||
import hmac as hmac_mod
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional, Callable
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from argon2.low_level import hash_secret_raw, Type
|
||||
|
||||
# --- Constants ---
|
||||
HEADER_SIG = b"CRZ5"
|
||||
FORMAT_VERSION = 5
|
||||
SALT_SIZE = 32
|
||||
NONCE_SIZE = 12 # AES-GCM standard
|
||||
KEY_SIZE = 32 # AES-256
|
||||
CHUNK_SIZE = 64 * 1024 # 64KB for streaming
|
||||
ARGON2_TIME_COST = 3
|
||||
ARGON2_MEMORY_COST = 65536 # 64MB
|
||||
ARGON2_PARALLELISM = 4
|
||||
EXTENSION = ".cryptz"
|
||||
|
||||
# Attempt counter MAC key derivation domain separator
|
||||
ATTEMPT_DOMAIN = b"CryptZ-AttemptMAC-v5"
|
||||
|
||||
|
||||
@dataclass
|
||||
class EncryptOptions:
|
||||
password: str
|
||||
files: list[str]
|
||||
output_path: str
|
||||
use_2fa: bool = False
|
||||
otp_secret: str = "NONE"
|
||||
max_attempts: int = 5
|
||||
hardware_key_path: Optional[str] = None
|
||||
secure_delete: bool = False
|
||||
compress: bool = True
|
||||
progress_callback: Optional[Callable[[float, str], None]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class DecryptResult:
|
||||
success: bool
|
||||
message: str
|
||||
otp_secret: str = "NONE"
|
||||
needs_2fa: bool = False
|
||||
attempts_used: int = 0
|
||||
max_attempts: int = 0
|
||||
destroyed: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArchiveHeader:
|
||||
version: int
|
||||
salt: bytes
|
||||
nonce_meta: bytes
|
||||
nonce_data: bytes
|
||||
max_attempts: int
|
||||
current_attempts: int
|
||||
attempt_mac: bytes
|
||||
meta_ciphertext: bytes
|
||||
has_file_key: bool
|
||||
|
||||
|
||||
def _report(callback: Optional[Callable], progress: float, msg: str):
|
||||
if callback:
|
||||
callback(progress, msg)
|
||||
|
||||
|
||||
def derive_key(password: str, salt: bytes, hardware_key_path: Optional[str] = None) -> bytes:
|
||||
"""Derive encryption key using Argon2id + optional file-key XOR."""
|
||||
key = hash_secret_raw(
|
||||
secret=password.encode("utf-8"),
|
||||
salt=salt,
|
||||
time_cost=ARGON2_TIME_COST,
|
||||
memory_cost=ARGON2_MEMORY_COST,
|
||||
parallelism=ARGON2_PARALLELISM,
|
||||
hash_len=KEY_SIZE,
|
||||
type=Type.ID,
|
||||
)
|
||||
if hardware_key_path and os.path.exists(hardware_key_path):
|
||||
with open(hardware_key_path, "rb") as f:
|
||||
hw_data = f.read()
|
||||
hw_hash = hashlib.sha256(hw_data).digest()
|
||||
key = bytes(a ^ b for a, b in zip(key, hw_hash))
|
||||
return key
|
||||
|
||||
|
||||
def _compute_attempt_mac(key: bytes, current_attempts: int, max_attempts: int) -> bytes:
|
||||
"""Compute HMAC over attempt counter using a domain-separated sub-key."""
|
||||
sub_key = hashlib.sha256(ATTEMPT_DOMAIN + key).digest()
|
||||
data = struct.pack(">BB", current_attempts, max_attempts)
|
||||
return hmac_mod.new(sub_key, data, hashlib.sha256).digest()
|
||||
|
||||
|
||||
def _verify_attempt_mac(key: bytes, current_attempts: int, max_attempts: int, mac: bytes) -> bool:
|
||||
"""Verify attempt counter MAC."""
|
||||
expected = _compute_attempt_mac(key, current_attempts, max_attempts)
|
||||
return hmac_mod.compare_digest(expected, mac)
|
||||
|
||||
|
||||
def check_password_strength(password: str) -> tuple[int, str]:
|
||||
"""Return (score 0-4, label). 0=very weak, 4=very strong."""
|
||||
score = 0
|
||||
if len(password) >= 8:
|
||||
score += 1
|
||||
if len(password) >= 12:
|
||||
score += 1
|
||||
if any(c.isdigit() for c in password) and any(c.isalpha() for c in password):
|
||||
score += 1
|
||||
if any(c in "!@#$%^&*()-_=+[]{}|;:',.<>?/`~" for c in password):
|
||||
score += 1
|
||||
labels = ["Очень слабый", "Слабый", "Средний", "Сильный", "Очень сильный"]
|
||||
return score, labels[score]
|
||||
|
||||
|
||||
def generate_password(length: int = 20) -> str:
|
||||
"""Generate a cryptographically secure random password."""
|
||||
import string
|
||||
alphabet = string.ascii_letters + string.digits + "!@#$%^&*()-_=+"
|
||||
return "".join(secrets.choice(alphabet) for _ in range(length))
|
||||
|
||||
|
||||
def secure_delete(path: str, passes: int = 3) -> bool:
|
||||
"""Overwrite file with random data multiple times, then remove."""
|
||||
if not os.path.exists(path):
|
||||
return False
|
||||
try:
|
||||
size = os.path.getsize(path)
|
||||
with open(path, "r+b") as f:
|
||||
for _ in range(passes):
|
||||
f.seek(0)
|
||||
remaining = size
|
||||
while remaining > 0:
|
||||
chunk = min(CHUNK_SIZE, remaining)
|
||||
f.write(secrets.token_bytes(chunk))
|
||||
remaining -= chunk
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.remove(path)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def encrypt(options: EncryptOptions) -> str:
|
||||
"""
|
||||
Encrypt files into a CryptZ v5 archive.
|
||||
Returns the output file path on success, raises on failure.
|
||||
|
||||
File format:
|
||||
[HEADER_SIG 4B][VERSION 1B][FLAGS 1B]
|
||||
[SALT 32B][NONCE_META 12B][NONCE_DATA 12B]
|
||||
[MAX_ATTEMPTS 1B][CURRENT_ATTEMPTS 1B][ATTEMPT_MAC 32B]
|
||||
[META_LEN 4B][META_CIPHERTEXT variable]
|
||||
[DATA_CIPHERTEXT remainder]
|
||||
"""
|
||||
cb = options.progress_callback
|
||||
_report(cb, 0.0, "Подготовка...")
|
||||
|
||||
# 1. Create tar.gz archive in memory (or streaming for large files)
|
||||
_report(cb, 0.05, "Архивирование файлов...")
|
||||
tar_buf = io.BytesIO()
|
||||
mode = "w:gz" if options.compress else "w"
|
||||
with tarfile.open(fileobj=tar_buf, mode=mode) as tar:
|
||||
total_files = len(options.files)
|
||||
for i, filepath in enumerate(options.files):
|
||||
if os.path.isdir(filepath):
|
||||
tar.add(filepath, arcname=os.path.basename(filepath))
|
||||
else:
|
||||
tar.add(filepath, arcname=os.path.basename(filepath))
|
||||
_report(cb, 0.05 + 0.3 * ((i + 1) / total_files), f"Архивирование: {os.path.basename(filepath)}")
|
||||
|
||||
raw_data = tar_buf.getvalue()
|
||||
tar_buf.close()
|
||||
|
||||
_report(cb, 0.4, "Генерация ключа (Argon2id)...")
|
||||
|
||||
# 2. Key derivation
|
||||
salt = secrets.token_bytes(SALT_SIZE)
|
||||
key = derive_key(options.password, salt, options.hardware_key_path)
|
||||
|
||||
# 3. Metadata
|
||||
meta = json.dumps({
|
||||
"otp_secret": options.otp_secret,
|
||||
"max_attempts": options.max_attempts,
|
||||
"has_file_key": options.hardware_key_path is not None,
|
||||
"compress": options.compress,
|
||||
"files_count": len(options.files),
|
||||
}).encode("utf-8")
|
||||
|
||||
# 4. Encrypt metadata
|
||||
_report(cb, 0.5, "Шифрование метаданных...")
|
||||
nonce_meta = secrets.token_bytes(NONCE_SIZE)
|
||||
aesgcm = AESGCM(key)
|
||||
meta_ct = aesgcm.encrypt(nonce_meta, meta, associated_data=HEADER_SIG)
|
||||
|
||||
# 5. Encrypt data
|
||||
_report(cb, 0.55, "Шифрование данных...")
|
||||
nonce_data = secrets.token_bytes(NONCE_SIZE)
|
||||
data_ct = aesgcm.encrypt(nonce_data, raw_data, associated_data=salt)
|
||||
|
||||
_report(cb, 0.85, "Запись файла...")
|
||||
|
||||
# 6. Attempt counter MAC
|
||||
attempt_mac = _compute_attempt_mac(key, 0, options.max_attempts)
|
||||
|
||||
# 7. Flags
|
||||
flags = 0
|
||||
if options.hardware_key_path:
|
||||
flags |= 0x01
|
||||
if options.compress:
|
||||
flags |= 0x02
|
||||
if options.use_2fa:
|
||||
flags |= 0x04
|
||||
|
||||
# 8. Write output
|
||||
with open(options.output_path, "wb") as f:
|
||||
f.write(HEADER_SIG) # 4
|
||||
f.write(struct.pack("B", FORMAT_VERSION)) # 1
|
||||
f.write(struct.pack("B", flags)) # 1
|
||||
f.write(salt) # 32
|
||||
f.write(nonce_meta) # 12
|
||||
f.write(nonce_data) # 12
|
||||
f.write(struct.pack("B", options.max_attempts)) # 1
|
||||
f.write(struct.pack("B", 0)) # 1 current_attempts
|
||||
f.write(attempt_mac) # 32
|
||||
f.write(struct.pack(">I", len(meta_ct))) # 4
|
||||
f.write(meta_ct) # variable
|
||||
f.write(data_ct) # remainder
|
||||
|
||||
# 9. Secure delete originals if requested
|
||||
if options.secure_delete:
|
||||
_report(cb, 0.9, "Уничтожение оригиналов...")
|
||||
for filepath in options.files:
|
||||
if os.path.isfile(filepath):
|
||||
secure_delete(filepath)
|
||||
|
||||
_report(cb, 1.0, "Готово!")
|
||||
return options.output_path
|
||||
|
||||
|
||||
def verify_integrity(file_path: str, password: str, hardware_key_path: Optional[str] = None) -> tuple[bool, str]:
|
||||
"""Verify archive integrity without full decryption (checks HMAC/GCM tag on metadata)."""
|
||||
try:
|
||||
header = _read_header(file_path)
|
||||
key = derive_key(password, header.salt, hardware_key_path)
|
||||
aesgcm = AESGCM(key)
|
||||
# Try to decrypt metadata — GCM will fail if key is wrong
|
||||
aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
|
||||
return True, "Архив целостен, пароль верный."
|
||||
except Exception as e:
|
||||
return False, f"Ошибка верификации: {e}"
|
||||
|
||||
|
||||
def _read_header(file_path: str) -> ArchiveHeader:
|
||||
"""Parse archive header, return structured data."""
|
||||
with open(file_path, "rb") as f:
|
||||
sig = f.read(4)
|
||||
if sig != HEADER_SIG:
|
||||
raise ValueError("Неверный формат файла или устаревшая версия.")
|
||||
version = struct.unpack("B", f.read(1))[0]
|
||||
flags = struct.unpack("B", f.read(1))[0]
|
||||
salt = f.read(SALT_SIZE)
|
||||
nonce_meta = f.read(NONCE_SIZE)
|
||||
nonce_data = f.read(NONCE_SIZE)
|
||||
max_attempts = struct.unpack("B", f.read(1))[0]
|
||||
current_attempts = struct.unpack("B", f.read(1))[0]
|
||||
attempt_mac = f.read(32)
|
||||
meta_len = struct.unpack(">I", f.read(4))[0]
|
||||
meta_ct = f.read(meta_len)
|
||||
|
||||
return ArchiveHeader(
|
||||
version=version,
|
||||
salt=salt,
|
||||
nonce_meta=nonce_meta,
|
||||
nonce_data=nonce_data,
|
||||
max_attempts=max_attempts,
|
||||
current_attempts=current_attempts,
|
||||
attempt_mac=attempt_mac,
|
||||
meta_ciphertext=meta_ct,
|
||||
has_file_key=bool(flags & 0x01),
|
||||
)
|
||||
|
||||
|
||||
def decrypt(
|
||||
file_path: str,
|
||||
password: str,
|
||||
output_dir: str,
|
||||
hardware_key_path: Optional[str] = None,
|
||||
otp_code: Optional[str] = None,
|
||||
progress_callback: Optional[Callable[[float, str], None]] = None,
|
||||
) -> DecryptResult:
|
||||
"""
|
||||
Decrypt a CryptZ v5 archive.
|
||||
Returns DecryptResult with status info.
|
||||
"""
|
||||
cb = progress_callback
|
||||
_report(cb, 0.0, "Чтение заголовка...")
|
||||
|
||||
try:
|
||||
header = _read_header(file_path)
|
||||
except ValueError as e:
|
||||
return DecryptResult(success=False, message=str(e))
|
||||
|
||||
_report(cb, 0.1, "Генерация ключа (Argon2id)...")
|
||||
key = derive_key(password, header.salt, hardware_key_path)
|
||||
|
||||
# Verify attempt counter MAC
|
||||
if not _verify_attempt_mac(key, header.current_attempts, header.max_attempts, header.attempt_mac):
|
||||
# MAC invalid — either wrong password or tampered counter
|
||||
# We can't update counter without correct key, just reject
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message="Неверный пароль или файл повреждён.",
|
||||
attempts_used=header.current_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
# Check if attempts exceeded
|
||||
if header.current_attempts >= header.max_attempts:
|
||||
# Destroy the archive
|
||||
_destroy_archive(file_path)
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message="Превышен лимит попыток! Архив уничтожен.",
|
||||
destroyed=True,
|
||||
attempts_used=header.current_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
_report(cb, 0.3, "Расшифровка метаданных...")
|
||||
aesgcm = AESGCM(key)
|
||||
|
||||
# Try to decrypt metadata (this authenticates the key)
|
||||
try:
|
||||
meta_plain = aesgcm.decrypt(header.nonce_meta, header.meta_ciphertext, associated_data=HEADER_SIG)
|
||||
except Exception:
|
||||
# Wrong password — increment counter
|
||||
new_attempts = header.current_attempts + 1
|
||||
_update_attempt_counter(file_path, key, new_attempts, header.max_attempts)
|
||||
|
||||
if new_attempts >= header.max_attempts:
|
||||
_destroy_archive(file_path)
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message=f"Неверный пароль! Архив УНИЧТОЖЕН (попытка {new_attempts}/{header.max_attempts}).",
|
||||
destroyed=True,
|
||||
attempts_used=new_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message=f"Неверный пароль или ключ! Попытка {new_attempts}/{header.max_attempts}.",
|
||||
attempts_used=new_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
meta = json.loads(meta_plain.decode("utf-8"))
|
||||
otp_secret = meta.get("otp_secret", "NONE")
|
||||
|
||||
# Check 2FA
|
||||
if otp_secret != "NONE":
|
||||
if not otp_code:
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message="Требуется код 2FA.",
|
||||
needs_2fa=True,
|
||||
otp_secret=otp_secret,
|
||||
attempts_used=header.current_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
import pyotp
|
||||
if not pyotp.TOTP(otp_secret).verify(otp_code):
|
||||
return DecryptResult(
|
||||
success=False,
|
||||
message="Неверный код 2FA!",
|
||||
attempts_used=header.current_attempts,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
_report(cb, 0.5, "Расшифровка данных...")
|
||||
|
||||
# Read encrypted data
|
||||
with open(file_path, "rb") as f:
|
||||
# Skip to data: 4+1+1+32+12+12+1+1+32+4+meta_len
|
||||
offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 + 1 + 32 + 4 + len(header.meta_ciphertext)
|
||||
f.seek(offset)
|
||||
data_ct = f.read()
|
||||
|
||||
try:
|
||||
raw_data = aesgcm.decrypt(header.nonce_data, data_ct, associated_data=header.salt)
|
||||
except Exception:
|
||||
return DecryptResult(success=False, message="Ошибка расшифровки данных (файл повреждён).")
|
||||
|
||||
_report(cb, 0.8, "Извлечение файлов...")
|
||||
|
||||
# Extract tar
|
||||
tar_buf = io.BytesIO(raw_data)
|
||||
try:
|
||||
mode = "r:gz" if meta.get("compress", True) else "r"
|
||||
with tarfile.open(fileobj=tar_buf, mode=mode) as tar:
|
||||
# Safe extraction — filter path traversal
|
||||
members = tar.getmembers()
|
||||
for member in members:
|
||||
if member.name.startswith("/") or ".." in member.name:
|
||||
raise ValueError(f"Опасный путь в архиве: {member.name}")
|
||||
tar.extractall(output_dir, members=members)
|
||||
except Exception as e:
|
||||
return DecryptResult(success=False, message=f"Ошибка извлечения: {e}")
|
||||
|
||||
# Reset attempt counter on success
|
||||
_update_attempt_counter(file_path, key, 0, header.max_attempts)
|
||||
|
||||
_report(cb, 1.0, "Готово!")
|
||||
return DecryptResult(
|
||||
success=True,
|
||||
message="Архив успешно расшифрован!",
|
||||
attempts_used=0,
|
||||
max_attempts=header.max_attempts,
|
||||
)
|
||||
|
||||
|
||||
def _update_attempt_counter(file_path: str, key: bytes, new_attempts: int, max_attempts: int):
|
||||
"""Update the attempt counter and its MAC in the archive file."""
|
||||
new_mac = _compute_attempt_mac(key, new_attempts, max_attempts)
|
||||
# Offset: 4(sig) + 1(ver) + 1(flags) + 32(salt) + 12(nonce_meta) + 12(nonce_data) = 62
|
||||
# then max_attempts(1) + current_attempts(1) + mac(32)
|
||||
attempt_offset = 4 + 1 + 1 + SALT_SIZE + NONCE_SIZE + NONCE_SIZE + 1 # points to current_attempts byte
|
||||
with open(file_path, "r+b") as f:
|
||||
f.seek(attempt_offset)
|
||||
f.write(struct.pack("B", new_attempts))
|
||||
f.write(new_mac)
|
||||
|
||||
|
||||
def _destroy_archive(file_path: str):
|
||||
"""Securely destroy an archive after max attempts exceeded."""
|
||||
secure_delete(file_path, passes=3)
|
||||
@@ -0,0 +1,21 @@
|
||||
"""
|
||||
CryptZ Ultimate v5 — AES-256-GCM Professional Archiver
|
||||
Main entry point: GUI or CLI mode.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
def main():
|
||||
if len(sys.argv) > 1:
|
||||
from cli import cli_main
|
||||
cli_main()
|
||||
else:
|
||||
from ui import CryptZApp
|
||||
app = CryptZApp()
|
||||
app.mainloop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Ensure we can import sibling modules
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
main()
|
||||
BIN
Binary file not shown.
@@ -0,0 +1,86 @@
|
||||
# CryptZ Ultimate v5
|
||||
|
||||
**AES-256-GCM Professional Archiver** — шифрование файлов с двухфакторной аутентификацией, файл-ключом и CLI.
|
||||
|
||||
## Возможности
|
||||
|
||||
- 🔒 **AES-256-GCM** — аутентифицированное шифрование (замена CBC+HMAC)
|
||||
- 🔑 **Argon2id KDF** — устойчивость к GPU/ASIC-атакам
|
||||
- 📦 **Сжатие gzip** — уменьшение размера архива
|
||||
- 🔐 **2FA (TOTP)** — Google Authenticator / Authy
|
||||
- 🗝 **Файл-ключ** — дополнительный фактор (XOR с основным ключом)
|
||||
- 💣 **Счётчик попыток с MAC** — корректная защита от перебора + самоуничтожение
|
||||
- 🗑 **Шредер** — безопасное удаление оригиналов (3 прохода)
|
||||
- ✓ **Проверка целостности** — без полной расшифровки
|
||||
- 🖥 **GUI** — CustomTkinter, drag & drop, индикатор силы пароля, журнал, горячие клавиши
|
||||
- ⌨ **CLI** — полная автоматизация из командной строки
|
||||
|
||||
## Установка
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
> `tkinterdnd2` — опционально (для drag & drop). Без неё приложение работает нормально.
|
||||
|
||||
## Запуск
|
||||
|
||||
### GUI
|
||||
```bash
|
||||
python cryptz.py
|
||||
```
|
||||
|
||||
### CLI
|
||||
|
||||
**Шифрование:**
|
||||
```bash
|
||||
python cryptz.py encrypt file1.txt file2.pdf -p "MyPassword" -o archive.cryptz
|
||||
python cryptz.py encrypt secret/ -p "Pass" -k keyfile.bin --2fa --shred
|
||||
```
|
||||
|
||||
**Расшифровка:**
|
||||
```bash
|
||||
python cryptz.py decrypt archive.cryptz -p "MyPassword" -o ./output/
|
||||
python cryptz.py decrypt archive.cryptz -p "Pass" -k keyfile.bin --otp 123456
|
||||
```
|
||||
|
||||
**Проверка целостности:**
|
||||
```bash
|
||||
python cryptz.py verify archive.cryptz -p "MyPassword"
|
||||
```
|
||||
|
||||
## Горячие клавиши (GUI)
|
||||
|
||||
| Комбинация | Действие |
|
||||
|---|---|
|
||||
| `Ctrl+O` | Добавить файлы |
|
||||
| `Ctrl+E` | Зашифровать |
|
||||
| `Ctrl+D` | Расшифровать |
|
||||
| `Ctrl+G` | Сгенерировать пароль |
|
||||
| `Delete` | Очистить список |
|
||||
|
||||
## Формат файла CryptZ v5
|
||||
|
||||
```
|
||||
[CRZ5 4B][VERSION 1B][FLAGS 1B]
|
||||
[SALT 32B][NONCE_META 12B][NONCE_DATA 12B]
|
||||
[MAX_ATTEMPTS 1B][CURRENT_ATTEMPTS 1B][ATTEMPT_MAC 32B]
|
||||
[META_LEN 4B][META_CIPHERTEXT ...][DATA_CIPHERTEXT ...]
|
||||
```
|
||||
|
||||
## Структура проекта
|
||||
|
||||
```
|
||||
cryptz.py — точка входа (GUI или CLI)
|
||||
crypto_engine.py — криптографический движок
|
||||
ui.py — графический интерфейс
|
||||
cli.py — командная строка
|
||||
requirements.txt — зависимости
|
||||
```
|
||||
|
||||
## Безопасность
|
||||
|
||||
- **AES-256-GCM**: аутентификация + шифрование в одном примитиве, отдельный nonce для метаданных и данных
|
||||
- **Argon2id**: 64MB RAM, 3 итерации, 4 потока
|
||||
- **Счётчик попыток**: защищён HMAC-SHA256 с domain-separated sub-key; при превышении лимита архив уничтожается
|
||||
- **Безопасное извлечение tar**: фильтрация path traversal (`..`, абсолютные пути)
|
||||
@@ -0,0 +1,7 @@
|
||||
customtkinter>=5.2.0
|
||||
cryptography>=41.0
|
||||
argon2-cffi>=23.1
|
||||
pyotp>=2.9
|
||||
qrcode>=7.4
|
||||
Pillow>=10.0
|
||||
tkinterdnd2>=0.3
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,625 @@
|
||||
"""
|
||||
CryptZ Ultimate v5 — GUI Module
|
||||
Full-featured interface with drag&drop, password strength, hotkeys,
|
||||
real progress, log panel, theme switching, context menu, etc.
|
||||
"""
|
||||
|
||||
import os
|
||||
import threading
|
||||
import secrets
|
||||
import time
|
||||
import customtkinter as ctk
|
||||
from tkinter import filedialog, messagebox, Menu
|
||||
from typing import Optional
|
||||
|
||||
import pyotp
|
||||
import qrcode
|
||||
from PIL import Image, ImageTk
|
||||
|
||||
from crypto_engine import (
|
||||
EncryptOptions, DecryptResult, encrypt, decrypt, verify_integrity,
|
||||
check_password_strength, generate_password, secure_delete, EXTENSION,
|
||||
)
|
||||
|
||||
# --- Theme setup ---
|
||||
ctk.set_appearance_mode("Dark")
|
||||
ctk.set_default_color_theme("blue")
|
||||
|
||||
# Colors
|
||||
COLORS = {
|
||||
"strength_0": "#ff4444",
|
||||
"strength_1": "#ff8800",
|
||||
"strength_2": "#ffcc00",
|
||||
"strength_3": "#88cc00",
|
||||
"strength_4": "#00cc44",
|
||||
"sidebar_bg": "#1a1a2e",
|
||||
"accent": "#4361ee",
|
||||
"danger": "#ef233c",
|
||||
"success": "#06d6a0",
|
||||
}
|
||||
|
||||
|
||||
class CryptZApp(ctk.CTk):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.title("CryptZ Ultimate v5 — AES-256-GCM Archiver")
|
||||
self.geometry("1100x800")
|
||||
self.minsize(900, 650)
|
||||
|
||||
self.files_list: list[str] = []
|
||||
self.hardware_key_path: Optional[str] = None
|
||||
self.is_busy = False
|
||||
|
||||
self._setup_ui()
|
||||
self._setup_hotkeys()
|
||||
self._setup_dnd()
|
||||
|
||||
# ===== UI SETUP =====
|
||||
|
||||
def _setup_ui(self):
|
||||
self.grid_columnconfigure(1, weight=1)
|
||||
self.grid_rowconfigure(0, weight=1)
|
||||
|
||||
self._build_sidebar()
|
||||
self._build_main_area()
|
||||
self._build_statusbar()
|
||||
|
||||
def _build_sidebar(self):
|
||||
self.sidebar = ctk.CTkFrame(self, width=300, corner_radius=0)
|
||||
self.sidebar.grid(row=0, column=0, sticky="nsew")
|
||||
self.sidebar.grid_propagate(False)
|
||||
|
||||
# Title
|
||||
ctk.CTkLabel(self.sidebar, text="⚙ ПАРАМЕТРЫ", font=("Segoe UI", 18, "bold")).pack(pady=(25, 20))
|
||||
|
||||
# Password
|
||||
pw_frame = ctk.CTkFrame(self.sidebar, fg_color="transparent")
|
||||
pw_frame.pack(fill="x", padx=20, pady=(0, 5))
|
||||
ctk.CTkLabel(pw_frame, text="Пароль:", font=("Segoe UI", 12)).pack(anchor="w")
|
||||
|
||||
self.pw_entry_frame = ctk.CTkFrame(self.sidebar, fg_color="transparent")
|
||||
self.pw_entry_frame.pack(fill="x", padx=20)
|
||||
|
||||
self.password_var = ctk.StringVar()
|
||||
self.password_var.trace_add("write", self._on_password_change)
|
||||
self.pw_entry = ctk.CTkEntry(
|
||||
self.pw_entry_frame, textvariable=self.password_var,
|
||||
show="●", placeholder_text="Введите пароль...", height=36
|
||||
)
|
||||
self.pw_entry.pack(side="left", fill="x", expand=True)
|
||||
|
||||
self.pw_show_btn = ctk.CTkButton(
|
||||
self.pw_entry_frame, text="👁", width=36, height=36,
|
||||
command=self._toggle_password_visibility, fg_color="#3d3d3d"
|
||||
)
|
||||
self.pw_show_btn.pack(side="left", padx=(5, 0))
|
||||
|
||||
self.pw_gen_btn = ctk.CTkButton(
|
||||
self.pw_entry_frame, text="🎲", width=36, height=36,
|
||||
command=self._generate_password, fg_color="#3d3d3d"
|
||||
)
|
||||
self.pw_gen_btn.pack(side="left", padx=(5, 0))
|
||||
|
||||
# Strength indicator
|
||||
self.strength_frame = ctk.CTkFrame(self.sidebar, height=6, fg_color="transparent")
|
||||
self.strength_frame.pack(fill="x", padx=20, pady=(4, 0))
|
||||
self.strength_bar = ctk.CTkProgressBar(self.strength_frame, height=6)
|
||||
self.strength_bar.pack(fill="x")
|
||||
self.strength_bar.set(0)
|
||||
|
||||
self.strength_label = ctk.CTkLabel(self.sidebar, text="", font=("Segoe UI", 10))
|
||||
self.strength_label.pack(anchor="w", padx=20)
|
||||
|
||||
# Checkboxes
|
||||
ctk.CTkLabel(self.sidebar, text="").pack() # spacer
|
||||
|
||||
self.check_2fa = ctk.CTkCheckBox(self.sidebar, text="🔐 Использовать 2FA (TOTP)")
|
||||
self.check_2fa.pack(pady=6, anchor="w", padx=30)
|
||||
|
||||
self.check_secure_del = ctk.CTkCheckBox(self.sidebar, text="🗑 Шредер (удалить оригиналы)")
|
||||
self.check_secure_del.pack(pady=6, anchor="w", padx=30)
|
||||
|
||||
self.check_compress = ctk.CTkCheckBox(self.sidebar, text="📦 Сжатие (gzip)")
|
||||
self.check_compress.pack(pady=6, anchor="w", padx=30)
|
||||
self.check_compress.select()
|
||||
|
||||
# File key
|
||||
ctk.CTkLabel(self.sidebar, text="").pack()
|
||||
self.hw_key_btn = ctk.CTkButton(
|
||||
self.sidebar, text="🔑 Файл-ключ: не выбран",
|
||||
command=self._load_hw_key, fg_color="#3d3d3d", hover_color="#555555"
|
||||
)
|
||||
self.hw_key_btn.pack(fill="x", padx=20, pady=5)
|
||||
|
||||
self.hw_key_clear_btn = ctk.CTkButton(
|
||||
self.sidebar, text="✕ Сбросить файл-ключ",
|
||||
command=self._clear_hw_key, fg_color="#555555", hover_color="#773333", height=28
|
||||
)
|
||||
self.hw_key_clear_btn.pack(fill="x", padx=20, pady=(0, 10))
|
||||
|
||||
# Max attempts
|
||||
attempts_frame = ctk.CTkFrame(self.sidebar, fg_color="transparent")
|
||||
attempts_frame.pack(fill="x", padx=20, pady=5)
|
||||
ctk.CTkLabel(attempts_frame, text="Макс. попыток:", font=("Segoe UI", 12)).pack(side="left")
|
||||
self.attempts_slider = ctk.CTkSlider(attempts_frame, from_=1, to=20, number_of_steps=19, width=120)
|
||||
self.attempts_slider.set(5)
|
||||
self.attempts_slider.pack(side="left", padx=10)
|
||||
self.attempts_label = ctk.CTkLabel(attempts_frame, text="5")
|
||||
self.attempts_label.pack(side="left")
|
||||
self.attempts_slider.configure(command=self._on_attempts_change)
|
||||
|
||||
# Theme switch
|
||||
ctk.CTkLabel(self.sidebar, text="").pack()
|
||||
theme_frame = ctk.CTkFrame(self.sidebar, fg_color="transparent")
|
||||
theme_frame.pack(fill="x", padx=20, pady=10)
|
||||
ctk.CTkLabel(theme_frame, text="Тема:").pack(side="left")
|
||||
self.theme_menu = ctk.CTkOptionMenu(
|
||||
theme_frame, values=["Dark", "Light", "System"],
|
||||
command=self._change_theme, width=100
|
||||
)
|
||||
self.theme_menu.pack(side="right")
|
||||
|
||||
def _build_main_area(self):
|
||||
main = ctk.CTkFrame(self)
|
||||
main.grid(row=0, column=1, sticky="nsew", padx=10, pady=10)
|
||||
main.grid_rowconfigure(1, weight=1)
|
||||
main.grid_columnconfigure(0, weight=1)
|
||||
|
||||
# Toolbar
|
||||
toolbar = ctk.CTkFrame(main, height=50, fg_color="transparent")
|
||||
toolbar.grid(row=0, column=0, sticky="ew", pady=(0, 5))
|
||||
|
||||
self.btn_add_files = ctk.CTkButton(toolbar, text="📂 Файлы", command=self._add_files, width=100)
|
||||
self.btn_add_files.pack(side="left", padx=3)
|
||||
|
||||
self.btn_add_folder = ctk.CTkButton(toolbar, text="📁 Папка", command=self._add_folder, width=100)
|
||||
self.btn_add_folder.pack(side="left", padx=3)
|
||||
|
||||
self.btn_clear = ctk.CTkButton(toolbar, text="🧹 Очистить", command=self._clear_files, width=100, fg_color="#555")
|
||||
self.btn_clear.pack(side="left", padx=3)
|
||||
|
||||
# Spacer
|
||||
ctk.CTkLabel(toolbar, text="").pack(side="left", fill="x", expand=True)
|
||||
|
||||
self.btn_encrypt = ctk.CTkButton(
|
||||
toolbar, text="🔒 Зашифровать", command=self._do_encrypt,
|
||||
width=140, fg_color=COLORS["accent"], hover_color="#3451cc"
|
||||
)
|
||||
self.btn_encrypt.pack(side="right", padx=3)
|
||||
|
||||
self.btn_decrypt = ctk.CTkButton(
|
||||
toolbar, text="🔓 Расшифровать", command=self._do_decrypt,
|
||||
width=140, fg_color=COLORS["success"], hover_color="#04a87d"
|
||||
)
|
||||
self.btn_decrypt.pack(side="right", padx=3)
|
||||
|
||||
self.btn_verify = ctk.CTkButton(
|
||||
toolbar, text="✓ Проверить", command=self._do_verify,
|
||||
width=110, fg_color="#555"
|
||||
)
|
||||
self.btn_verify.pack(side="right", padx=3)
|
||||
|
||||
# File list
|
||||
self.file_frame = ctk.CTkFrame(main)
|
||||
self.file_frame.grid(row=1, column=0, sticky="nsew")
|
||||
|
||||
self.drop_label = ctk.CTkLabel(
|
||||
self.file_frame,
|
||||
text="Перетащите файлы сюда\nили используйте кнопки выше",
|
||||
font=("Segoe UI", 16), text_color="#888888"
|
||||
)
|
||||
self.drop_label.place(relx=0.5, rely=0.5, anchor="center")
|
||||
|
||||
self.file_scroll = ctk.CTkScrollableFrame(self.file_frame)
|
||||
self.file_scroll.pack(fill="both", expand=True, padx=5, pady=5)
|
||||
|
||||
# Progress
|
||||
progress_frame = ctk.CTkFrame(main, height=60, fg_color="transparent")
|
||||
progress_frame.grid(row=2, column=0, sticky="ew", pady=(5, 0))
|
||||
|
||||
self.progress_bar = ctk.CTkProgressBar(progress_frame, height=12)
|
||||
self.progress_bar.pack(fill="x", pady=(5, 2))
|
||||
self.progress_bar.set(0)
|
||||
|
||||
self.progress_label = ctk.CTkLabel(progress_frame, text="Готов к работе", font=("Segoe UI", 11))
|
||||
self.progress_label.pack(anchor="w")
|
||||
|
||||
# Log panel
|
||||
self.log_frame = ctk.CTkFrame(main, height=120)
|
||||
self.log_frame.grid(row=3, column=0, sticky="ew", pady=(5, 0))
|
||||
|
||||
ctk.CTkLabel(self.log_frame, text="📋 Журнал:", font=("Segoe UI", 11, "bold")).pack(anchor="w", padx=10, pady=(5, 0))
|
||||
self.log_text = ctk.CTkTextbox(self.log_frame, height=80, font=("Consolas", 10))
|
||||
self.log_text.pack(fill="both", expand=True, padx=5, pady=5)
|
||||
self.log_text.configure(state="disabled")
|
||||
|
||||
def _build_statusbar(self):
|
||||
self.statusbar = ctk.CTkFrame(self, height=28, corner_radius=0)
|
||||
self.statusbar.grid(row=1, column=0, columnspan=2, sticky="ew")
|
||||
|
||||
self.status_files = ctk.CTkLabel(self.statusbar, text="Файлов: 0", font=("Segoe UI", 10))
|
||||
self.status_files.pack(side="left", padx=15)
|
||||
|
||||
self.status_size = ctk.CTkLabel(self.statusbar, text="Размер: 0 B", font=("Segoe UI", 10))
|
||||
self.status_size.pack(side="left", padx=15)
|
||||
|
||||
self.status_op = ctk.CTkLabel(self.statusbar, text="", font=("Segoe UI", 10))
|
||||
self.status_op.pack(side="right", padx=15)
|
||||
|
||||
# ===== HOTKEYS =====
|
||||
|
||||
def _setup_hotkeys(self):
|
||||
self.bind("<Control-o>", lambda e: self._add_files())
|
||||
self.bind("<Control-e>", lambda e: self._do_encrypt())
|
||||
self.bind("<Control-d>", lambda e: self._do_decrypt())
|
||||
self.bind("<Control-g>", lambda e: self._generate_password())
|
||||
self.bind("<Delete>", lambda e: self._clear_files())
|
||||
|
||||
# ===== DRAG & DROP =====
|
||||
|
||||
def _setup_dnd(self):
|
||||
"""Try to setup tkinterdnd2. Silently skip if not available."""
|
||||
try:
|
||||
from tkinterdnd2 import DND_FILES
|
||||
self.drop_target_register(DND_FILES)
|
||||
self.dnd_bind("<<Drop>>", self._on_drop)
|
||||
except (ImportError, Exception):
|
||||
pass
|
||||
|
||||
def _on_drop(self, event):
|
||||
"""Handle drag & drop files."""
|
||||
files = self.tk.splitlist(event.data)
|
||||
for f in files:
|
||||
if f not in self.files_list:
|
||||
self.files_list.append(f)
|
||||
self._refresh_file_list()
|
||||
|
||||
# ===== FILE MANAGEMENT =====
|
||||
|
||||
def _add_files(self):
|
||||
paths = filedialog.askopenfilenames(title="Выбрать файлы")
|
||||
if paths:
|
||||
for p in paths:
|
||||
if p not in self.files_list:
|
||||
self.files_list.append(p)
|
||||
self._refresh_file_list()
|
||||
self._log(f"Добавлено файлов: {len(paths)}")
|
||||
|
||||
def _add_folder(self):
|
||||
folder = filedialog.askdirectory(title="Выбрать папку")
|
||||
if folder:
|
||||
count = 0
|
||||
for root, dirs, files in os.walk(folder):
|
||||
for fname in files:
|
||||
full = os.path.join(root, fname)
|
||||
if full not in self.files_list:
|
||||
self.files_list.append(full)
|
||||
count += 1
|
||||
self._refresh_file_list()
|
||||
self._log(f"Добавлено из папки: {count} файлов")
|
||||
|
||||
def _clear_files(self):
|
||||
self.files_list.clear()
|
||||
self._refresh_file_list()
|
||||
self._log("Список очищен")
|
||||
|
||||
def _remove_file(self, path: str):
|
||||
if path in self.files_list:
|
||||
self.files_list.remove(path)
|
||||
self._refresh_file_list()
|
||||
|
||||
def _refresh_file_list(self):
|
||||
# Clear scrollable frame
|
||||
for widget in self.file_scroll.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
if not self.files_list:
|
||||
self.drop_label.place(relx=0.5, rely=0.5, anchor="center")
|
||||
else:
|
||||
self.drop_label.place_forget()
|
||||
|
||||
for filepath in self.files_list:
|
||||
row = ctk.CTkFrame(self.file_scroll, fg_color="transparent", height=30)
|
||||
row.pack(fill="x", pady=1)
|
||||
|
||||
# Icon based on type
|
||||
ext = os.path.splitext(filepath)[1].lower()
|
||||
icon = "📄"
|
||||
if ext in (".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp"):
|
||||
icon = "🖼"
|
||||
elif ext in (".mp3", ".wav", ".flac", ".ogg"):
|
||||
icon = "🎵"
|
||||
elif ext in (".mp4", ".avi", ".mkv", ".mov"):
|
||||
icon = "🎬"
|
||||
elif ext in (".zip", ".rar", ".7z", ".tar", ".gz"):
|
||||
icon = "📦"
|
||||
elif ext == EXTENSION:
|
||||
icon = "🔒"
|
||||
elif os.path.isdir(filepath):
|
||||
icon = "📁"
|
||||
|
||||
name = os.path.basename(filepath)
|
||||
size = ""
|
||||
if os.path.isfile(filepath):
|
||||
s = os.path.getsize(filepath)
|
||||
size = self._format_size(s)
|
||||
|
||||
label = ctk.CTkLabel(row, text=f"{icon} {name} ({size})", font=("Segoe UI", 11), anchor="w")
|
||||
label.pack(side="left", fill="x", expand=True, padx=5)
|
||||
|
||||
del_btn = ctk.CTkButton(
|
||||
row, text="✕", width=28, height=28, fg_color="#553333",
|
||||
hover_color="#883333", command=lambda p=filepath: self._remove_file(p)
|
||||
)
|
||||
del_btn.pack(side="right", padx=5)
|
||||
|
||||
# Context menu on right-click
|
||||
label.bind("<Button-3>", lambda e, p=filepath: self._show_context_menu(e, p))
|
||||
|
||||
self._update_statusbar()
|
||||
|
||||
def _show_context_menu(self, event, filepath):
|
||||
menu = Menu(self, tearoff=0)
|
||||
menu.add_command(label="Удалить из списка", command=lambda: self._remove_file(filepath))
|
||||
menu.add_command(label="Открыть расположение", command=lambda: os.startfile(os.path.dirname(filepath)))
|
||||
menu.post(event.x_root, event.y_root)
|
||||
|
||||
def _update_statusbar(self):
|
||||
count = len(self.files_list)
|
||||
total_size = sum(os.path.getsize(f) for f in self.files_list if os.path.isfile(f))
|
||||
self.status_files.configure(text=f"Файлов: {count}")
|
||||
self.status_size.configure(text=f"Размер: {self._format_size(total_size)}")
|
||||
|
||||
@staticmethod
|
||||
def _format_size(size_bytes: int) -> str:
|
||||
if size_bytes < 1024:
|
||||
return f"{size_bytes} B"
|
||||
elif size_bytes < 1024 ** 2:
|
||||
return f"{size_bytes / 1024:.1f} KB"
|
||||
elif size_bytes < 1024 ** 3:
|
||||
return f"{size_bytes / 1024**2:.1f} MB"
|
||||
else:
|
||||
return f"{size_bytes / 1024**3:.2f} GB"
|
||||
|
||||
# ===== PASSWORD =====
|
||||
|
||||
def _on_password_change(self, *args):
|
||||
pw = self.password_var.get()
|
||||
if not pw:
|
||||
self.strength_bar.set(0)
|
||||
self.strength_label.configure(text="")
|
||||
return
|
||||
score, label = check_password_strength(pw)
|
||||
self.strength_bar.set(score / 4)
|
||||
color = COLORS[f"strength_{score}"]
|
||||
self.strength_bar.configure(progress_color=color)
|
||||
self.strength_label.configure(text=label, text_color=color)
|
||||
|
||||
def _toggle_password_visibility(self):
|
||||
current = self.pw_entry.cget("show")
|
||||
self.pw_entry.configure(show="" if current == "●" else "●")
|
||||
self.pw_show_btn.configure(text="🙈" if current == "●" else "👁")
|
||||
|
||||
def _generate_password(self):
|
||||
pw = generate_password(20)
|
||||
self.password_var.set(pw)
|
||||
self.pw_entry.configure(show="")
|
||||
self.pw_show_btn.configure(text="🙈")
|
||||
# Copy to clipboard
|
||||
self.clipboard_clear()
|
||||
self.clipboard_append(pw)
|
||||
self._log("Пароль сгенерирован и скопирован в буфер обмена")
|
||||
|
||||
# ===== FILE KEY =====
|
||||
|
||||
def _load_hw_key(self):
|
||||
path = filedialog.askopenfilename(title="Выбрать файл-ключ")
|
||||
if path:
|
||||
self.hardware_key_path = path
|
||||
name = os.path.basename(path)
|
||||
self.hw_key_btn.configure(text=f"🔑 Ключ: {name[:20]}")
|
||||
self._log(f"Файл-ключ: {name}")
|
||||
|
||||
def _clear_hw_key(self):
|
||||
self.hardware_key_path = None
|
||||
self.hw_key_btn.configure(text="🔑 Файл-ключ: не выбран")
|
||||
self._log("Файл-ключ сброшен")
|
||||
|
||||
# ===== SIDEBAR CALLBACKS =====
|
||||
|
||||
def _on_attempts_change(self, value):
|
||||
self.attempts_label.configure(text=str(int(value)))
|
||||
|
||||
def _change_theme(self, value):
|
||||
ctk.set_appearance_mode(value)
|
||||
|
||||
# ===== OPERATIONS =====
|
||||
|
||||
def _do_encrypt(self):
|
||||
if self.is_busy:
|
||||
return
|
||||
if not self.files_list:
|
||||
messagebox.showwarning("CryptZ", "Добавьте файлы для шифрования!")
|
||||
return
|
||||
pw = self.password_var.get()
|
||||
if not pw:
|
||||
messagebox.showwarning("CryptZ", "Введите пароль!")
|
||||
return
|
||||
|
||||
output_path = filedialog.asksaveasfilename(
|
||||
title="Сохранить зашифрованный архив",
|
||||
defaultextension=EXTENSION,
|
||||
filetypes=[("CryptZ Archive", f"*{EXTENSION}"), ("All files", "*.*")]
|
||||
)
|
||||
if not output_path:
|
||||
return
|
||||
|
||||
# 2FA setup
|
||||
use_2fa = self.check_2fa.get()
|
||||
otp_secret = "NONE"
|
||||
if use_2fa:
|
||||
otp_secret = pyotp.random_base32()
|
||||
self._show_2fa_setup(otp_secret)
|
||||
|
||||
options = EncryptOptions(
|
||||
password=pw,
|
||||
files=self.files_list.copy(),
|
||||
output_path=output_path,
|
||||
use_2fa=use_2fa,
|
||||
otp_secret=otp_secret,
|
||||
max_attempts=int(self.attempts_slider.get()),
|
||||
hardware_key_path=self.hardware_key_path,
|
||||
secure_delete=bool(self.check_secure_del.get()),
|
||||
compress=bool(self.check_compress.get()),
|
||||
progress_callback=self._progress_callback,
|
||||
)
|
||||
|
||||
self.is_busy = True
|
||||
self._log("Шифрование начато...")
|
||||
threading.Thread(target=self._encrypt_thread, args=(options,), daemon=True).start()
|
||||
|
||||
def _encrypt_thread(self, options: EncryptOptions):
|
||||
try:
|
||||
result = encrypt(options)
|
||||
self.after(0, lambda: self._on_encrypt_done(result))
|
||||
except Exception as e:
|
||||
self.after(0, lambda: self._on_error(str(e)))
|
||||
|
||||
def _on_encrypt_done(self, path):
|
||||
self.is_busy = False
|
||||
self._log(f"✅ Архив создан: {os.path.basename(path)}")
|
||||
messagebox.showinfo("CryptZ", "Архив успешно создан!\n{}".format(path))
|
||||
|
||||
def _do_decrypt(self):
|
||||
if self.is_busy:
|
||||
return
|
||||
pw = self.password_var.get()
|
||||
if not pw:
|
||||
messagebox.showwarning("CryptZ", "Введите пароль!")
|
||||
return
|
||||
|
||||
file_path = filedialog.askopenfilename(
|
||||
title="Выбрать зашифрованный архив",
|
||||
filetypes=[("CryptZ Archive", f"*{EXTENSION}"), ("All files", "*.*")]
|
||||
)
|
||||
if not file_path:
|
||||
return
|
||||
|
||||
output_dir = filedialog.askdirectory(title="Папка для извлечения")
|
||||
if not output_dir:
|
||||
return
|
||||
|
||||
self.is_busy = True
|
||||
self._log("Расшифровка начата...")
|
||||
threading.Thread(
|
||||
target=self._decrypt_thread,
|
||||
args=(file_path, pw, output_dir, None),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
def _decrypt_thread(self, file_path, password, output_dir, otp_code):
|
||||
result = decrypt(
|
||||
file_path=file_path,
|
||||
password=password,
|
||||
output_dir=output_dir,
|
||||
hardware_key_path=self.hardware_key_path,
|
||||
otp_code=otp_code,
|
||||
progress_callback=self._progress_callback,
|
||||
)
|
||||
self.after(0, lambda: self._on_decrypt_done(result, file_path, password, output_dir))
|
||||
|
||||
def _on_decrypt_done(self, result: DecryptResult, file_path, password, output_dir):
|
||||
self.is_busy = False
|
||||
if result.needs_2fa:
|
||||
self._ask_2fa_code(file_path, password, output_dir)
|
||||
return
|
||||
if result.success:
|
||||
self._log(f"✅ {result.message}")
|
||||
messagebox.showinfo("CryptZ", result.message)
|
||||
else:
|
||||
self._log(f"❌ {result.message}")
|
||||
messagebox.showerror("CryptZ", result.message)
|
||||
|
||||
def _ask_2fa_code(self, file_path, password, output_dir):
|
||||
"""Show 2FA input dialog."""
|
||||
dialog = ctk.CTkInputDialog(text="Введите код 2FA (6 цифр):", title="Двухфакторная аутентификация")
|
||||
code = dialog.get_input()
|
||||
if code:
|
||||
self.is_busy = True
|
||||
threading.Thread(
|
||||
target=self._decrypt_thread,
|
||||
args=(file_path, password, output_dir, code),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
def _do_verify(self):
|
||||
if self.is_busy:
|
||||
return
|
||||
pw = self.password_var.get()
|
||||
if not pw:
|
||||
messagebox.showwarning("CryptZ", "Введите пароль!")
|
||||
return
|
||||
|
||||
file_path = filedialog.askopenfilename(
|
||||
title="Выбрать архив для проверки",
|
||||
filetypes=[("CryptZ Archive", f"*{EXTENSION}"), ("All files", "*.*")]
|
||||
)
|
||||
if not file_path:
|
||||
return
|
||||
|
||||
ok, msg = verify_integrity(file_path, pw, self.hardware_key_path)
|
||||
if ok:
|
||||
self._log(f"✅ {msg}")
|
||||
messagebox.showinfo("CryptZ", msg)
|
||||
else:
|
||||
self._log(f"❌ {msg}")
|
||||
messagebox.showerror("CryptZ", msg)
|
||||
|
||||
# ===== 2FA SETUP =====
|
||||
|
||||
def _show_2fa_setup(self, otp_secret: str):
|
||||
"""Show QR code window for 2FA setup."""
|
||||
win = ctk.CTkToplevel(self)
|
||||
win.title("Настройка 2FA")
|
||||
win.geometry("400x450")
|
||||
win.transient(self)
|
||||
win.grab_set()
|
||||
|
||||
ctk.CTkLabel(win, text="Сканируйте QR-код в\nGoogle Authenticator / Authy",
|
||||
font=("Segoe UI", 14)).pack(pady=15)
|
||||
|
||||
# Generate QR
|
||||
uri = pyotp.TOTP(otp_secret).provisioning_uri(name="CryptZ", issuer_name="CryptZ Ultimate")
|
||||
qr_img = qrcode.make(uri).resize((250, 250))
|
||||
photo = ImageTk.PhotoImage(qr_img)
|
||||
|
||||
qr_label = ctk.CTkLabel(win, image=photo, text="")
|
||||
qr_label.image = photo
|
||||
qr_label.pack(pady=10)
|
||||
|
||||
ctk.CTkLabel(win, text=f"Секрет: {otp_secret}", font=("Consolas", 11)).pack(pady=5)
|
||||
|
||||
ctk.CTkButton(win, text="Готово", command=win.destroy, width=120).pack(pady=15)
|
||||
|
||||
# ===== PROGRESS & LOG =====
|
||||
|
||||
def _progress_callback(self, progress: float, message: str):
|
||||
self.after(0, lambda: self._update_progress(progress, message))
|
||||
|
||||
def _update_progress(self, progress: float, message: str):
|
||||
self.progress_bar.set(progress)
|
||||
self.progress_label.configure(text=message)
|
||||
self.status_op.configure(text=message)
|
||||
|
||||
def _log(self, message: str):
|
||||
timestamp = time.strftime("%H:%M:%S")
|
||||
self.log_text.configure(state="normal")
|
||||
self.log_text.insert("end", "[{}] {}\n".format(timestamp, message))
|
||||
self.log_text.see("end")
|
||||
self.log_text.configure(state="disabled")
|
||||
|
||||
def _on_error(self, error_msg: str):
|
||||
self.is_busy = False
|
||||
self._log(f"❌ Ошибка: {error_msg}")
|
||||
messagebox.showerror("CryptZ", f"Ошибка: {error_msg}")
|
||||
Reference in New Issue
Block a user