From f9926c0b415a4e0cc02b4a8d34e09cdedc110435 Mon Sep 17 00:00:00 2001 From: Justin Bollinger Date: Wed, 22 Apr 2026 14:41:38 -0400 Subject: [PATCH] feat(notify): add notification package with Pushover backend Introduce hate_crack.notify package with a small functional public API and a CrackTailer thread for polling hashcat output files. Package layout keeps the HTTP call (_send_pushover) isolated so future backends (Slack, generic webhooks) can be added as a sibling function rather than a framework rewrite. Core pieces: - settings.py: NotifySettings dataclass plus atomic config persistence (save_enabled, add_to_allowlist) via read-modify-write + os.replace. - pushover.py: single _send_pushover() that never raises; network errors, missing requests, and missing creds all funnel to False. - _suppress.py: thread-local suppression context manager so orchestrator attacks can chain primitives without flooding notifications. - tailer.py: CrackTailer(threading.Thread) that seeks to EOF on start, polls at a user-configurable interval, and collapses per-tick bursts into a single aggregate notification when they exceed the cap. - __init__.py: public API (init, prompt_notify_for_attack, notify_job_done, notify_crack, start_tailer, stop_tailer, toggle_enabled, suppressed_notifications). Privacy guarantee: notification payloads contain only attack name, counts, and hash path, never plaintexts. 72 new tests cover dataclass defaults, atomic config writes, idempotent allowlist updates, HTTP payload privacy, suppression nesting and thread-locality, tailer EOF seek, burst cap, truncation recovery, and the per-attack prompt's [y/n/always] flow. Co-Authored-By: Claude Sonnet 4.6 --- hate_crack/notify/__init__.py | 291 +++++++++++++++++++++++++++++++ hate_crack/notify/_suppress.py | 38 ++++ hate_crack/notify/pushover.py | 69 ++++++++ hate_crack/notify/settings.py | 166 ++++++++++++++++++ hate_crack/notify/tailer.py | 208 ++++++++++++++++++++++ tests/test_notify_integration.py | 242 +++++++++++++++++++++++++ tests/test_notify_pushover.py | 88 ++++++++++ tests/test_notify_settings.py | 159 +++++++++++++++++ tests/test_notify_suppression.py | 74 ++++++++ tests/test_notify_tailer.py | 219 +++++++++++++++++++++++ 10 files changed, 1554 insertions(+) create mode 100644 hate_crack/notify/__init__.py create mode 100644 hate_crack/notify/_suppress.py create mode 100644 hate_crack/notify/pushover.py create mode 100644 hate_crack/notify/settings.py create mode 100644 hate_crack/notify/tailer.py create mode 100644 tests/test_notify_integration.py create mode 100644 tests/test_notify_pushover.py create mode 100644 tests/test_notify_settings.py create mode 100644 tests/test_notify_suppression.py create mode 100644 tests/test_notify_tailer.py diff --git a/hate_crack/notify/__init__.py b/hate_crack/notify/__init__.py new file mode 100644 index 0000000..9f835e4 --- /dev/null +++ b/hate_crack/notify/__init__.py @@ -0,0 +1,291 @@ +"""Public notification API for hate_crack. + +Overview +======== + +This package wires per-attack and per-crack notifications into the core +hashcat runner. The design is intentionally small and functional: a single +module-level ``_settings`` object, a handful of helper functions, and one +``CrackTailer`` thread class where polling state genuinely wants OO. + +Wiring +====== + +At startup ``main.py`` calls :func:`init` with the resolved config path and +parsed config dict. After that, the rest of the codebase interacts with +this package via: + +- :func:`prompt_notify_for_attack` -- called by attacks.py before an attack + starts; asks the user ``[y/n/always]`` and stashes per-run consent. +- :func:`start_tailer` / :func:`stop_tailer` -- called by the hashcat + command wrapper to spin a background watcher on ``{hashfile}.out``. +- :func:`notify_job_done` -- called by the hashcat command wrapper after + the subprocess exits, fires one summary notification. +- :func:`suppressed_notifications` -- context manager for orchestrator + attacks that chain many primitives; collapses all nested notifications + so the orchestrator can fire a single aggregate at the end. + +Adding a new backend +==================== + +The single concrete HTTP call lives in :mod:`hate_crack.notify.pushover`. +To add Slack/webhooks, write a sibling ``_send_slack()`` there (or in a +new module) and dispatch from :func:`notify_job_done` / +:func:`notify_crack`. No framework, no ABC — one function per transport. +""" + +from __future__ import annotations + +import logging +from typing import Callable + +from hate_crack.notify._suppress import ( + is_suppressed, + suppressed_notifications, +) +from hate_crack.notify.pushover import _send_pushover +from hate_crack.notify.settings import ( + NotifySettings, + add_to_allowlist, + load_settings, + save_enabled, +) +from hate_crack.notify.tailer import ( + CrackTailer, + extract_username_from_out_line, +) + +logger = logging.getLogger(__name__) + + +# Module-level runtime state. Treated as a singleton because the CLI is a +# single-process tool with a single user; no need to pass a context object +# through every attack signature. +_settings: NotifySettings | None = None +_config_path: str | None = None + +# Per-run consent cache: attack_name -> bool. Populated by +# ``prompt_notify_for_attack`` and consulted by ``notify_job_done`` so we +# don't need to re-prompt mid-chain. +_run_consent: dict[str, bool] = {} + +# Input function indirection so tests can inject answers without pulling +# in a terminal. Swap via ``set_input_func``. +_input_func: Callable[[str], str] = input + + +__all__ = [ + "CrackTailer", + "NotifySettings", + "add_to_allowlist", + "clear_state_for_tests", + "extract_username_from_out_line", + "get_settings", + "init", + "is_suppressed", + "notify_crack", + "notify_job_done", + "prompt_notify_for_attack", + "set_input_func", + "start_tailer", + "stop_tailer", + "suppressed_notifications", + "toggle_enabled", + "_send_pushover", +] + + +def init(config_path: str | None, config_parser: dict | None) -> None: + """Bootstrap the notify subsystem from the resolved config. + + Called once from ``main.py`` after its config-loading block. Safe to + call multiple times — the second call replaces settings but does not + reset per-run consent (the user may already have answered prompts). + """ + global _settings, _config_path + _config_path = config_path + _settings = load_settings(config_parser) + + +def get_settings() -> NotifySettings: + """Return the active settings, or fresh defaults if ``init`` never ran.""" + return _settings if _settings is not None else NotifySettings() + + +def set_input_func(func: Callable[[str], str]) -> None: + """Test hook: swap the ``input()`` used by :func:`prompt_notify_for_attack`.""" + global _input_func + _input_func = func + + +def clear_state_for_tests() -> None: + """Reset module state. Only used by the test suite.""" + global _settings, _config_path, _input_func + _settings = None + _config_path = None + _run_consent.clear() + _input_func = input + + +def toggle_enabled() -> bool: + """Flip ``notify_enabled``, persist to ``config.json``, return new state. + + If ``init`` was never called we still toggle an in-memory default — the + UI update must not crash even if the config file is unreachable. + """ + global _settings + if _settings is None: + _settings = NotifySettings() + _settings.enabled = not _settings.enabled + if _config_path: + try: + save_enabled(_config_path, _settings.enabled) + except OSError as exc: + logger.warning("Could not persist notify_enabled: %s", exc) + return _settings.enabled + + +def _in_allowlist(attack_name: str) -> bool: + return attack_name in get_settings().attack_allowlist + + +def prompt_notify_for_attack(attack_name: str) -> bool: + """Ask the user whether this attack should fire a notification. + + Returns ``True`` if notifications should fire for this run. + + Flow: + + 1. Notifications disabled globally -> return False silently (no prompt). + 2. Attack already in allowlist -> return True (no prompt, auto-on). + 3. Otherwise -> prompt ``[y/n/always]``: + * ``y`` -> consent for this run only. + * ``n`` / "" -> no consent. + * ``always`` -> persist to allowlist and consent for this run. + + Per-run consent is stashed in ``_run_consent[attack_name]`` so the + hashcat wrapper can query it at job-done time without re-prompting. + """ + settings = get_settings() + if not settings.enabled: + _run_consent[attack_name] = False + return False + if _in_allowlist(attack_name): + _run_consent[attack_name] = True + return True + + try: + raw = _input_func( + f"\n[notify] Send Pushover notifications for '{attack_name}'? [y/N/always]: " + ) + except EOFError: + raw = "" + answer = (raw or "").strip().lower() + if answer == "always": + _run_consent[attack_name] = True + if _config_path: + try: + add_to_allowlist(_config_path, attack_name) + # Also update the in-memory settings so a later call in the + # same session sees the allowlist without re-reading config. + if attack_name not in settings.attack_allowlist: + settings.attack_allowlist.append(attack_name) + except OSError as exc: + logger.warning("Could not persist allowlist entry: %s", exc) + return True + if answer in ("y", "yes"): + _run_consent[attack_name] = True + return True + _run_consent[attack_name] = False + return False + + +def _should_fire(attack_name: str) -> bool: + if is_suppressed(): + return False + settings = get_settings() + if not settings.enabled: + return False + if _in_allowlist(attack_name): + return True + return _run_consent.get(attack_name, False) + + +def notify_job_done( + attack_name: str, + cracked_count: int, + hash_file: str | None = None, +) -> None: + """Fire a single "attack complete" notification. + + No-op when suppressed, disabled, or the user declined at the prompt. + """ + if not _should_fire(attack_name): + return + settings = get_settings() + title = f"hate_crack: {attack_name} complete" + if hash_file: + message = ( + f"Attack '{attack_name}' finished.\n" + f"Cracked so far: {cracked_count}\n" + f"Hash file: {hash_file}" + ) + else: + message = f"Attack '{attack_name}' finished.\nCracked so far: {cracked_count}" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def notify_crack(label: str, attack_name: str) -> None: + """Fire a per-crack notification (called from :class:`CrackTailer`).""" + if not _should_fire(attack_name): + return + settings = get_settings() + title = "hate_crack: new crack" + message = f"{label} cracked ({attack_name})" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def _notify_aggregate(count: int, attack_name: str) -> None: + """Aggregated "N accounts cracked" notification for burst-capped ticks.""" + if not _should_fire(attack_name): + return + settings = get_settings() + title = "hate_crack: crack burst" + message = f"{count} new accounts cracked ({attack_name})" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def start_tailer(out_path: str, attack_name: str) -> CrackTailer | None: + """Start a :class:`CrackTailer` if per-crack notifications are enabled. + + Returns the running tailer (so the caller can stop it later), or + ``None`` when suppression/disabled/disallowed mean we shouldn't tail. + """ + if is_suppressed(): + return None + settings = get_settings() + if not settings.enabled: + return None + if not settings.per_crack_enabled: + return None + if not _should_fire(attack_name): + return None + tailer = CrackTailer( + out_path=out_path, + attack_name=attack_name, + settings=settings, + notify_callback=notify_crack, + aggregate_callback=_notify_aggregate, + ) + tailer.start() + return tailer + + +def stop_tailer(tailer: CrackTailer | None) -> None: + """Stop a tailer started by :func:`start_tailer`. ``None`` is a no-op.""" + if tailer is None: + return + try: + tailer.stop() + except Exception as exc: + logger.warning("CrackTailer.stop() failed: %s", exc) diff --git a/hate_crack/notify/_suppress.py b/hate_crack/notify/_suppress.py new file mode 100644 index 0000000..d70a40a --- /dev/null +++ b/hate_crack/notify/_suppress.py @@ -0,0 +1,38 @@ +"""Thread-local notification suppression. + +Multi-phase orchestrators (e.g. ``extensive_crack``) chain many hashcat +invocations. Without suppression every primitive would fire its own +"job complete" notification — loud, useless, and easy to rate-limit into +the ground. Callers wrap the chain in :func:`suppressed_notifications` +and then fire exactly one aggregate notification at the end. + +Suppression is thread-local so a tailer thread in a different context +can't accidentally silence itself by observing another thread's flag. +""" + +from __future__ import annotations + +import contextlib +import threading + +_suppressed = threading.local() + + +def is_suppressed() -> bool: + """Return True if the calling thread is inside a suppression context.""" + return getattr(_suppressed, "active", False) + + +@contextlib.contextmanager +def suppressed_notifications(): + """Context manager that suppresses notify_* calls on the current thread. + + Nests correctly: the previous state is restored on exit so an outer + ``with`` block still reflects "suppressed" after an inner one exits. + """ + prev = getattr(_suppressed, "active", False) + _suppressed.active = True + try: + yield + finally: + _suppressed.active = prev diff --git a/hate_crack/notify/pushover.py b/hate_crack/notify/pushover.py new file mode 100644 index 0000000..e0b3e74 --- /dev/null +++ b/hate_crack/notify/pushover.py @@ -0,0 +1,69 @@ +"""Pushover HTTP backend. + +Single concrete entry point: :func:`_send_pushover`. Exposed as a module-level +function (not a class) so that wiring in a Slack or generic webhook backend +later is a matter of writing a sibling ``_send_slack()`` alongside this one. + +Design invariants: + +- The call is a best-effort side effect. A crashed attack is strictly worse + than a missed notification, so every exception below the ``requests`` layer + is caught and swallowed (returning ``False``). +- Missing ``requests`` is treated the same as any other transport error — + we log a warning once and return ``False`` without re-raising. +- Payload contains *only* title + message + credentials. No plaintexts, no + hash data. The tailer guarantees this at call sites; this function is the + final safety barrier if a caller passes bad data (we still forward whatever + string it gives us, but we never introspect outside the message). +""" + +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + +try: + import requests # type: ignore[import-untyped] +except ImportError: # pragma: no cover - requests is a hard dep of the package + requests = None # type: ignore[assignment] + + +PUSHOVER_URL = "https://api.pushover.net/1/messages.json" + + +def _send_pushover(token: str, user: str, title: str, message: str) -> bool: + """POST a notification to Pushover. + + Returns ``True`` on HTTP 200, ``False`` on any other outcome. Never + raises — network errors, missing credentials, and missing ``requests`` + are all funneled into a ``False`` return so the caller can treat the + notification as a fire-and-forget side effect. + """ + if not token or not user: + logger.debug("Pushover not configured: missing token or user") + return False + + if requests is None: + logger.warning("Pushover requested but 'requests' is not importable") + return False + + payload = { + "token": token, + "user": user, + "title": title, + "message": message, + } + try: + response = requests.post(PUSHOVER_URL, data=payload, timeout=10) + except Exception as exc: + # requests.RequestException covers most, but a plugin/mock may raise + # a bare Exception — we must never let this escape. + logger.warning("Pushover send failed: %s", exc) + return False + + status = getattr(response, "status_code", None) + if status == 200: + return True + logger.warning("Pushover send returned HTTP %s", status) + return False diff --git a/hate_crack/notify/settings.py b/hate_crack/notify/settings.py new file mode 100644 index 0000000..0e388bc --- /dev/null +++ b/hate_crack/notify/settings.py @@ -0,0 +1,166 @@ +"""Notification settings: dataclass + atomic config persistence. + +Settings live in the same ``config.json`` that drives the rest of hate_crack. +This module isolates (a) the typed shape of notification config and +(b) the read-modify-write persistence primitives used by the runtime toggle +and the ``[yes/no/always]`` per-attack prompt. + +Persistence follows the same pattern as ``main.py`` (``json.load`` -> +mutate -> ``json.dump(..., indent=2)`` via a temp file and ``os.replace``) +so a crash mid-write cannot corrupt the config. +""" + +from __future__ import annotations + +import json +import os +import tempfile +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class NotifySettings: + """Typed view of the ``notify_*`` keys from ``config.json``. + + Defaults mirror ``config.json.example`` so freshly-loaded configs and + in-memory fallbacks agree. + """ + + enabled: bool = False + pushover_token: str = "" + pushover_user: str = "" + per_crack_enabled: bool = False + attack_allowlist: list[str] = field(default_factory=list) + suppress_in_orchestrators: bool = True + max_cracks_per_burst: int = 5 + poll_interval_seconds: float = 5.0 + + +def _coerce_bool(value: Any, default: bool) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "on") + return default + + +def _coerce_int(value: Any, default: int) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _coerce_float(value: Any, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _coerce_str(value: Any, default: str) -> str: + if value is None: + return default + return str(value) + + +def _coerce_list(value: Any) -> list[str]: + if isinstance(value, list): + return [str(v) for v in value] + return [] + + +def load_settings(config_parser: dict | None) -> NotifySettings: + """Build a ``NotifySettings`` from a parsed config dict. + + Unknown / missing / badly-typed keys fall back to dataclass defaults + so the runtime always has a valid settings object, even when the + config was written by an older hate_crack install. + """ + cfg = config_parser or {} + defaults = NotifySettings() + return NotifySettings( + enabled=_coerce_bool(cfg.get("notify_enabled"), defaults.enabled), + pushover_token=_coerce_str(cfg.get("notify_pushover_token"), defaults.pushover_token), + pushover_user=_coerce_str(cfg.get("notify_pushover_user"), defaults.pushover_user), + per_crack_enabled=_coerce_bool( + cfg.get("notify_per_crack_enabled"), defaults.per_crack_enabled + ), + attack_allowlist=_coerce_list(cfg.get("notify_attack_allowlist")), + suppress_in_orchestrators=_coerce_bool( + cfg.get("notify_suppress_in_orchestrators"), + defaults.suppress_in_orchestrators, + ), + max_cracks_per_burst=_coerce_int( + cfg.get("notify_max_cracks_per_burst"), defaults.max_cracks_per_burst + ), + poll_interval_seconds=_coerce_float( + cfg.get("notify_poll_interval_seconds"), defaults.poll_interval_seconds + ), + ) + + +def _atomic_rewrite(config_path: str, mutator) -> None: + """Read config_path, apply ``mutator(dict)`` in place, write atomically. + + - Missing file is treated as empty dict (mutator runs on ``{}``). + - Invalid JSON is silently replaced with the mutator's output; we do not + want to block a notification toggle on a pre-existing bad config. + - Write goes to a temp file in the same directory and is swapped in + via ``os.replace`` so readers never see a half-written file. + """ + data: dict = {} + if os.path.isfile(config_path): + try: + with open(config_path) as f: + loaded = json.load(f) + if isinstance(loaded, dict): + data = loaded + except (OSError, json.JSONDecodeError): + data = {} + mutator(data) + directory = os.path.dirname(os.path.abspath(config_path)) or "." + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(prefix=".config-", suffix=".json", dir=directory) + try: + with os.fdopen(fd, "w") as tmp: + json.dump(data, tmp, indent=2) + os.replace(tmp_path, config_path) + except Exception: + # Best-effort cleanup of stale temp file on failure. + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +def save_enabled(config_path: str, enabled: bool) -> None: + """Persist ``notify_enabled`` without disturbing other config keys.""" + + def _apply(data: dict) -> None: + data["notify_enabled"] = bool(enabled) + + _atomic_rewrite(config_path, _apply) + + +def add_to_allowlist(config_path: str, attack_name: str) -> None: + """Append ``attack_name`` to ``notify_attack_allowlist`` if absent. + + Idempotent: already-present entries are a no-op. + """ + if not attack_name: + return + + def _apply(data: dict) -> None: + current = data.get("notify_attack_allowlist") + if not isinstance(current, list): + current = [] + if attack_name not in current: + current.append(attack_name) + data["notify_attack_allowlist"] = current + + _atomic_rewrite(config_path, _apply) diff --git a/hate_crack/notify/tailer.py b/hate_crack/notify/tailer.py new file mode 100644 index 0000000..0808eb1 --- /dev/null +++ b/hate_crack/notify/tailer.py @@ -0,0 +1,208 @@ +"""Background tailer that watches ``{hashfile}.out`` for new cracks. + +Design rationale: + +- Polling (not inotify/fsevents) keeps the implementation portable across + macOS/Linux without adding a dependency. +- We seek to EOF on start: the hashfile may already contain cracks from a + previous run, and re-notifying those would be both wrong and spammy. +- Burst cap: if a single poll tick yields more cracks than the user's + configured threshold, we collapse them into one aggregated notification + instead of N pings. Cracks tend to arrive in rule-file bursts, so this + knob matters. +- The thread is a daemon so a hung tailer can't block process exit. +- ``stop()`` joins with a timeout to guarantee forward progress even if the + poll loop is stuck on a slow filesystem. +""" + +from __future__ import annotations + +import logging +import os +import threading +from typing import Callable + +from hate_crack.notify.settings import NotifySettings + +logger = logging.getLogger(__name__) + + +def extract_username_from_out_line(line: str) -> str | None: + """Extract a username from a single hashcat ``.out`` line, if present. + + We support the four output layouts hate_crack can produce upstream: + + ======================= =========================================== ============ + Input shape Example Returns + ======================= =========================================== ============ + bare hash ``5f4dcc3b5aa:plaintext`` ``None`` + user:hash:plain ``alice:5f4dcc:plaintext`` ``"alice"`` + pwdump ``alice:1001:aad3b:31d6:::plaintext`` ``"alice"`` + NetNTLMv2 ``alice::DOMAIN:challenge:response:plain`` ``"alice"`` + ======================= =========================================== ============ + + The function deliberately NEVER returns the plaintext — that would + defeat the privacy guarantee documented on the notify public API. + """ + if not line: + return None + stripped = line.rstrip("\r\n") + if not stripped: + return None + + parts = stripped.split(":") + if len(parts) < 2: + return None + + # Bare hash (hash:plain): exactly two colons-separated fields and the + # first field is a plausible hash — any non-empty first field that is + # not a pure hex string may still be a username, so we fall through. + if len(parts) == 2: + return None + + first = parts[0] + if not first: + # Lines that start with ':' aren't in any format we care about. + return None + + # pwdump: user:RID:LM:NT:::plain -> field[1] is numeric RID + if len(parts) >= 7 and parts[1].isdigit(): + return first + + # NetNTLMv2: user::DOMAIN:... -> field[1] is empty + if len(parts) >= 5 and parts[1] == "": + return first + + # user:hash:plain style — assume first field is username. + # We only claim this when there's at least 3 fields so we don't + # mis-label a bare ``hash:plain`` as ``username:plain``. + if len(parts) >= 3: + return first + + return None + + +class CrackTailer(threading.Thread): + """Polls ``out_path`` for new cracked lines and dispatches notifications. + + The tailer is created and started by :func:`hate_crack.notify.start_tailer` + and stopped by :func:`hate_crack.notify.stop_tailer`. External callers + normally don't touch this class directly; it's public for tests. + """ + + daemon = True + + def __init__( + self, + out_path: str, + attack_name: str, + settings: NotifySettings, + notify_callback: Callable[[str, str], None], + aggregate_callback: Callable[[int, str], None], + *, + username_extractor: Callable[[str], str | None] | None = None, + ) -> None: + super().__init__(name=f"CrackTailer[{attack_name}]", daemon=True) + self.out_path = out_path + self.attack_name = attack_name + self.settings = settings + self._notify_crack = notify_callback + self._notify_aggregate = aggregate_callback + self._extract_username = username_extractor or extract_username_from_out_line + self._stop = threading.Event() + # Leftover bytes from the previous read that didn't end in \n. + self._buffer = b"" + # File position to read from; set on first successful open. + self._file_pos: int | None = None + + def stop(self) -> None: + """Signal the thread to exit and join with a bounded timeout. + + Safe to call on a tailer that was never started; in that case the + signal is still set (so a later ``start()`` would exit immediately) + and we skip the join that would otherwise raise. + """ + self._stop.set() + if self._started.is_set(): + self.join(timeout=10.0) + + def run(self) -> None: # pragma: no cover - thread entry, exercised via tests + try: + self._seek_to_eof() + # First tick of ``wait()`` blocks for the full interval; this is + # deliberate so we don't hammer the filesystem immediately after + # starting if hashcat hasn't written anything yet. + while not self._stop.wait(self.settings.poll_interval_seconds): + try: + self._poll_once() + except Exception as exc: + logger.warning("CrackTailer poll failed: %s", exc) + except Exception as exc: # pragma: no cover - defensive + logger.warning("CrackTailer crashed: %s", exc) + + def _seek_to_eof(self) -> None: + """Remember the current EOF so existing cracks aren't re-notified.""" + if os.path.isfile(self.out_path): + try: + self._file_pos = os.path.getsize(self.out_path) + except OSError: + self._file_pos = 0 + else: + self._file_pos = 0 + + def _poll_once(self) -> None: + if not os.path.isfile(self.out_path): + return + try: + size = os.path.getsize(self.out_path) + except OSError: + return + + if self._file_pos is None: + self._file_pos = size + return + + if size < self._file_pos: + # File was truncated / rewritten — reset rather than read garbage. + self._file_pos = 0 + self._buffer = b"" + + if size == self._file_pos: + return + + new_lines = self._read_new_lines(size) + if not new_lines: + return + + count = len(new_lines) + if count > self.settings.max_cracks_per_burst: + self._notify_aggregate(count, self.attack_name) + return + + for line_bytes in new_lines: + try: + line = line_bytes.decode("utf-8", errors="replace") + except Exception: + continue + label = self._extract_username(line) or self.attack_name + self._notify_crack(label, self.attack_name) + + def _read_new_lines(self, size: int) -> list[bytes]: + """Read from ``self._file_pos`` up to ``size`` and return full lines. + + Incomplete trailing bytes are buffered for the next tick. + """ + try: + with open(self.out_path, "rb") as f: + f.seek(self._file_pos) + data = f.read(size - self._file_pos) + except OSError: + return [] + self._file_pos = size + + combined = self._buffer + data + lines = combined.split(b"\n") + # The last element is either empty (buffer ended on \n) or a partial + # line — stash it for next poll either way. + self._buffer = lines.pop() if lines else b"" + return [line for line in lines if line] diff --git a/tests/test_notify_integration.py b/tests/test_notify_integration.py new file mode 100644 index 0000000..b757851 --- /dev/null +++ b/tests/test_notify_integration.py @@ -0,0 +1,242 @@ +"""Integration-style tests for the notify public API.""" +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from hate_crack import notify + + +@pytest.fixture(autouse=True) +def _reset_notify_state(): + notify.clear_state_for_tests() + yield + notify.clear_state_for_tests() + + +def _init_with(tmp_path: Path, **kwargs) -> Path: + """Create a config file and initialize notify against it.""" + cfg = { + "notify_enabled": True, + "notify_pushover_token": "tok", + "notify_pushover_user": "usr", + } + cfg.update(kwargs) + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps(cfg)) + notify.init(str(config_path), cfg) + return config_path + + +class TestNotifyJobDone: + def test_no_op_when_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 3) + send.assert_not_called() + + def test_fires_when_enabled_and_in_allowlist(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 7, hash_file="/tmp/h") + assert send.called + args = send.call_args.args + # (token, user, title, message) + assert args[0] == "tok" + assert args[1] == "usr" + assert "Brute Force" in args[2] + assert "7" in args[3] + + def test_no_op_when_enabled_but_attack_not_allowed(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + # No allowlist, no consent -> should not fire. + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 1) + send.assert_not_called() + + def test_fires_after_per_run_consent(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _prompt: "y") + assert notify.prompt_notify_for_attack("Brute Force") is True + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 1) + assert send.called + + def test_suppression_silences_fire(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + with notify.suppressed_notifications(): + notify.notify_job_done("Brute Force", 1) + send.assert_not_called() + + def test_does_not_leak_plaintext_in_message(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + # Message we pass to notify_job_done is count + attack_name + hash + # path only. Confirm nothing in the payload contains a plausible + # plaintext password token. + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 42, hash_file="/tmp/h.txt") + title, message = send.call_args.args[2], send.call_args.args[3] + for banned in ("plaintext", "password=", "secret"): + assert banned not in title + assert banned not in message + + +class TestNotifyCrack: + def test_fires_when_allowed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_crack("alice", "Brute Force") + assert send.called + + def test_no_op_when_suppressed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + with notify.suppressed_notifications(): + notify.notify_crack("alice", "Brute Force") + send.assert_not_called() + + +class TestPromptNotifyForAttack: + def test_no_prompt_when_globally_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False) + calls: list[str] = [] + notify.set_input_func(lambda p: calls.append(p) or "y") + assert notify.prompt_notify_for_attack("Brute Force") is False + assert calls == [] + + def test_no_prompt_when_already_in_allowlist(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + calls: list[str] = [] + notify.set_input_func(lambda p: calls.append(p) or "n") + assert notify.prompt_notify_for_attack("Brute Force") is True + assert calls == [] + + def test_answer_yes(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "y") + assert notify.prompt_notify_for_attack("Brute Force") is True + + def test_answer_no(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "n") + assert notify.prompt_notify_for_attack("Brute Force") is False + + def test_answer_empty_defaults_to_no(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "") + assert notify.prompt_notify_for_attack("Brute Force") is False + + def test_answer_always_persists_to_allowlist(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "always") + assert notify.prompt_notify_for_attack("Brute Force") is True + data = json.loads(config_path.read_text()) + assert "Brute Force" in data.get("notify_attack_allowlist", []) + # Settings in memory also updated so we don't re-prompt next call. + settings = notify.get_settings() + assert "Brute Force" in settings.attack_allowlist + + def test_always_is_idempotent(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "always") + notify.prompt_notify_for_attack("Brute Force") + notify.prompt_notify_for_attack("Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"].count("Brute Force") == 1 + + +class TestToggleEnabled: + def test_toggle_flips_and_persists(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=False) + assert notify.get_settings().enabled is False + new_state = notify.toggle_enabled() + assert new_state is True + assert notify.get_settings().enabled is True + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is True + + # Flip back. + assert notify.toggle_enabled() is False + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is False + + def test_toggle_without_init_still_works(self) -> None: + notify.clear_state_for_tests() + # Call toggle before init ever ran — should just flip in-memory. + assert notify.toggle_enabled() is True + assert notify.get_settings().enabled is True + + +class TestStartStopTailer: + def test_start_tailer_noop_when_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False, notify_per_crack_enabled=True) + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_noop_when_per_crack_disabled(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=False, + notify_attack_allowlist=["Brute Force"], + ) + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_noop_when_suppressed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with notify.suppressed_notifications(): + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_when_enabled_and_consented(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=True, + notify_attack_allowlist=["Brute Force"], + notify_poll_interval_seconds=0.05, + ) + out = tmp_path / "h.out" + out.write_text("") + t = notify.start_tailer(str(out), "Brute Force") + try: + assert t is not None + assert t.is_alive() + finally: + notify.stop_tailer(t) + + def test_stop_tailer_none_is_noop(self) -> None: + # Must not raise. + notify.stop_tailer(None) diff --git a/tests/test_notify_pushover.py b/tests/test_notify_pushover.py new file mode 100644 index 0000000..8b9f759 --- /dev/null +++ b/tests/test_notify_pushover.py @@ -0,0 +1,88 @@ +"""Unit tests for the Pushover HTTP backend.""" +from unittest.mock import MagicMock, patch + +from hate_crack.notify import pushover + + +def _mock_response(status_code: int = 200) -> MagicMock: + r = MagicMock() + r.status_code = status_code + return r + + +class TestSendPushoverSuccess: + def test_returns_true_on_http_200(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + ok = pushover._send_pushover("tok", "usr", "title", "msg") + assert ok is True + + def test_payload_has_no_plaintext(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + pushover._send_pushover("tok", "usr", "Crack complete", "alice cracked") + (url,), kwargs = mock_requests.post.call_args + assert url == pushover.PUSHOVER_URL + data = kwargs["data"] + assert set(data.keys()) == {"token", "user", "title", "message"} + assert data["token"] == "tok" + assert data["user"] == "usr" + # The whole payload is just title + message + creds. No 'password', + # 'hash', 'plaintext' or similar keys ever appear. + assert "password" not in data + assert "plaintext" not in data + assert "hash" not in data + + def test_timeout_is_passed(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + pushover._send_pushover("tok", "usr", "t", "m") + _, kwargs = mock_requests.post.call_args + assert kwargs["timeout"] == 10 + + +class TestSendPushoverFailureModes: + def test_missing_token_returns_false_without_calling_requests(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + ok = pushover._send_pushover("", "usr", "t", "m") + assert ok is False + mock_requests.post.assert_not_called() + + def test_missing_user_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + ok = pushover._send_pushover("tok", "", "t", "m") + assert ok is False + mock_requests.post.assert_not_called() + + def test_network_error_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.side_effect = ConnectionError("refused") + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_generic_exception_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.side_effect = RuntimeError("boom") + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_non_200_response_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(500) + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_missing_requests_returns_false(self) -> None: + with patch.object(pushover, "requests", None): + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_never_raises(self) -> None: + # Even if requests.post returns something weird (no status_code), + # the function must not raise. + bad = MagicMock() + # Accessing .status_code returns a non-int Mock — it will not equal 200. + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = bad + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False diff --git a/tests/test_notify_settings.py b/tests/test_notify_settings.py new file mode 100644 index 0000000..8f5dc7b --- /dev/null +++ b/tests/test_notify_settings.py @@ -0,0 +1,159 @@ +"""Unit tests for hate_crack.notify.settings.""" +import json +from pathlib import Path + +from hate_crack.notify.settings import ( + NotifySettings, + add_to_allowlist, + load_settings, + save_enabled, +) + + +class TestNotifySettingsDataclass: + def test_defaults(self) -> None: + s = NotifySettings() + assert s.enabled is False + assert s.pushover_token == "" + assert s.pushover_user == "" + assert s.per_crack_enabled is False + assert s.attack_allowlist == [] + assert s.suppress_in_orchestrators is True + assert s.max_cracks_per_burst == 5 + assert s.poll_interval_seconds == 5.0 + + def test_allowlist_default_is_fresh_per_instance(self) -> None: + # field(default_factory=list) must not share state. + a = NotifySettings() + b = NotifySettings() + a.attack_allowlist.append("Brute Force") + assert b.attack_allowlist == [] + + +class TestLoadSettings: + def test_load_from_empty_dict_returns_defaults(self) -> None: + s = load_settings({}) + assert s == NotifySettings() + + def test_load_from_none_returns_defaults(self) -> None: + assert load_settings(None) == NotifySettings() + + def test_load_full_dict(self) -> None: + s = load_settings({ + "notify_enabled": True, + "notify_pushover_token": "tok", + "notify_pushover_user": "usr", + "notify_per_crack_enabled": True, + "notify_attack_allowlist": ["Brute Force", "Dictionary"], + "notify_suppress_in_orchestrators": False, + "notify_max_cracks_per_burst": 20, + "notify_poll_interval_seconds": 2.5, + }) + assert s.enabled is True + assert s.pushover_token == "tok" + assert s.pushover_user == "usr" + assert s.per_crack_enabled is True + assert s.attack_allowlist == ["Brute Force", "Dictionary"] + assert s.suppress_in_orchestrators is False + assert s.max_cracks_per_burst == 20 + assert s.poll_interval_seconds == 2.5 + + def test_load_tolerates_bad_types(self) -> None: + s = load_settings({ + "notify_enabled": "true", + "notify_max_cracks_per_burst": "not-a-number", + "notify_poll_interval_seconds": "also-bad", + "notify_attack_allowlist": "not-a-list", + }) + # string "true" -> True + assert s.enabled is True + # bad ints fall back to defaults (5, 5.0) + assert s.max_cracks_per_burst == 5 + assert s.poll_interval_seconds == 5.0 + # non-list allowlist becomes empty list + assert s.attack_allowlist == [] + + +class TestSaveEnabled: + def test_writes_new_config(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is True + + def test_preserves_existing_keys(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + initial = { + "hcatBin": "hashcat", + "hashview_api_key": "secret", + "notify_enabled": False, + } + config_path.write_text(json.dumps(initial)) + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data["hcatBin"] == "hashcat" + assert data["hashview_api_key"] == "secret" + assert data["notify_enabled"] is True + + def test_toggles_back_and_forth(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + save_enabled(str(config_path), True) + save_enabled(str(config_path), False) + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is False + + def test_invalid_existing_config_replaced(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text("this is not json") + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data == {"notify_enabled": True} + + def test_atomic_no_half_write(self, tmp_path: Path) -> None: + # A partial write should never leave the main file invalid. We + # check that after save_enabled, parsing always succeeds. + config_path = tmp_path / "config.json" + for flag in (True, False, True, False, True): + save_enabled(str(config_path), flag) + json.loads(config_path.read_text()) # must not raise + + +class TestAddToAllowlist: + def test_adds_to_empty_list(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] + + def test_idempotent(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + add_to_allowlist(str(config_path), "Brute Force") + add_to_allowlist(str(config_path), "Brute Force") + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] + + def test_preserves_other_entries(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({ + "hcatBin": "hashcat", + "notify_attack_allowlist": ["Existing"], + })) + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["hcatBin"] == "hashcat" + assert data["notify_attack_allowlist"] == ["Existing", "Brute Force"] + + def test_empty_attack_name_is_noop(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({"notify_attack_allowlist": ["A"]})) + add_to_allowlist(str(config_path), "") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["A"] + + def test_repairs_non_list_allowlist(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({"notify_attack_allowlist": "bogus"})) + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] diff --git a/tests/test_notify_suppression.py b/tests/test_notify_suppression.py new file mode 100644 index 0000000..c57557f --- /dev/null +++ b/tests/test_notify_suppression.py @@ -0,0 +1,74 @@ +"""Unit tests for hate_crack.notify._suppress.""" +import threading +import time + +from hate_crack.notify._suppress import is_suppressed, suppressed_notifications + + +class TestSuppressionContextManager: + def test_default_is_not_suppressed(self) -> None: + assert is_suppressed() is False + + def test_inside_context_is_suppressed(self) -> None: + assert is_suppressed() is False + with suppressed_notifications(): + assert is_suppressed() is True + assert is_suppressed() is False + + def test_nested_restores_outer_state(self) -> None: + with suppressed_notifications(): + assert is_suppressed() is True + with suppressed_notifications(): + assert is_suppressed() is True + # Leaving inner context must still leave us suppressed. + assert is_suppressed() is True + assert is_suppressed() is False + + def test_exception_restores_state(self) -> None: + try: + with suppressed_notifications(): + raise RuntimeError("boom") + except RuntimeError: + pass + assert is_suppressed() is False + + +class TestSuppressionThreadLocal: + def test_other_thread_not_suppressed_by_us(self) -> None: + seen: list[bool] = [] + ready = threading.Event() + done = threading.Event() + + def worker() -> None: + # Wait until main thread has entered its suppression context, + # then sample our own state. + ready.wait(timeout=2.0) + seen.append(is_suppressed()) + done.set() + + t = threading.Thread(target=worker, daemon=True) + t.start() + with suppressed_notifications(): + assert is_suppressed() is True + ready.set() + done.wait(timeout=2.0) + t.join(timeout=2.0) + assert seen == [False], f"worker thread saw suppression state: {seen}" + + def test_worker_thread_suppression_does_not_leak(self) -> None: + worker_done = threading.Event() + + def worker() -> None: + with suppressed_notifications(): + assert is_suppressed() is True + # Hold long enough that the main thread can sample. + time.sleep(0.05) + worker_done.set() + + t = threading.Thread(target=worker, daemon=True) + t.start() + # Main thread should never observe the worker's suppression. + assert is_suppressed() is False + t.join(timeout=2.0) + assert worker_done.is_set() + assert is_suppressed() is False diff --git a/tests/test_notify_tailer.py b/tests/test_notify_tailer.py new file mode 100644 index 0000000..7404ed2 --- /dev/null +++ b/tests/test_notify_tailer.py @@ -0,0 +1,219 @@ +"""Unit tests for the CrackTailer polling thread and username extractor.""" +import time +from pathlib import Path +from unittest.mock import MagicMock + +from hate_crack.notify.settings import NotifySettings +from hate_crack.notify.tailer import CrackTailer, extract_username_from_out_line + + +def _settings(**overrides) -> NotifySettings: + defaults = { + "enabled": True, + "per_crack_enabled": True, + "max_cracks_per_burst": 5, + # Short poll so tests don't hang. + "poll_interval_seconds": 0.05, + } + defaults.update(overrides) + return NotifySettings(**defaults) + + +def _make_tailer(out_path: Path, **overrides): + notify = MagicMock(name="notify_callback") + aggregate = MagicMock(name="aggregate_callback") + settings = _settings(**overrides) + tailer = CrackTailer( + out_path=str(out_path), + attack_name="Brute Force", + settings=settings, + notify_callback=notify, + aggregate_callback=aggregate, + ) + return tailer, notify, aggregate + + +def _wait_until(predicate, timeout=2.0): + """Spin until predicate() or timeout; returns whether it passed.""" + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return predicate() + + +class TestExtractUsername: + def test_bare_hash_returns_none(self) -> None: + assert extract_username_from_out_line("5f4dcc3b5aa10b0be:secret") is None + + def test_user_hash_plain(self) -> None: + assert extract_username_from_out_line("alice:5f4dcc:secret") == "alice" + + def test_pwdump(self) -> None: + assert ( + extract_username_from_out_line("alice:1001:aad3b435:31d6cfe:::secret") + == "alice" + ) + + def test_netntlmv2(self) -> None: + line = "alice::DOMAIN:1122334455667788:resp:resp2:secret" + assert extract_username_from_out_line(line) == "alice" + + def test_empty_line(self) -> None: + assert extract_username_from_out_line("") is None + + def test_whitespace_only(self) -> None: + assert extract_username_from_out_line("\n") is None + + def test_line_with_trailing_newline(self) -> None: + assert extract_username_from_out_line("alice:hash:plain\n") == "alice" + + def test_does_not_leak_plaintext(self) -> None: + # No matter the format, the return value must never equal the + # plaintext password. Paranoid test; relies on us knowing where + # the plaintext sits in each format. + for line, plain in [ + ("bob:1000:aa:bb:::s3cret", "s3cret"), + ("bob:hash:s3cret", "s3cret"), + ("bob::DOM:chal:resp:resp2:s3cret", "s3cret"), + ]: + assert extract_username_from_out_line(line) != plain + + +class TestCrackTailerStart: + def test_daemon_is_true(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + assert tailer.daemon is True + + def test_seeks_to_eof_on_start(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("alice:hash:plain\nbob:hash:plain\n") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + # Allow at least one poll interval; no new lines were added, so + # no notifications should fire. + time.sleep(0.2) + assert notify.call_count == 0 + assert aggregate.call_count == 0 + finally: + tailer.stop() + + def test_new_lines_fire_notify(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + with open(out, "a") as f: + f.write("alice:hash:plain\n") + f.write("bob:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 2) + assert aggregate.call_count == 0 + labels = [call.args[0] for call in notify.call_args_list] + assert "alice" in labels + assert "bob" in labels + finally: + tailer.stop() + + def test_no_username_falls_back_to_attack_name(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + with open(out, "a") as f: + # Bare-hash format -> extractor returns None -> fallback. + f.write("5f4dcc3b5:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + assert notify.call_args.args[0] == "Brute Force" + finally: + tailer.stop() + + +class TestCrackTailerBurstCap: + def test_burst_cap_fires_aggregate(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out, max_cracks_per_burst=3) + tailer.start() + try: + # Write 10 lines in one shot; a single poll tick must see them + # all and collapse into one aggregate call. + with open(out, "a") as f: + for i in range(10): + f.write(f"user{i}:hash:plain\n") + assert _wait_until(lambda: aggregate.call_count >= 1) + # Per-crack path must NOT have fired for this burst. + assert notify.call_count == 0 + args = aggregate.call_args.args + assert args[0] == 10 + assert args[1] == "Brute Force" + finally: + tailer.stop() + + def test_under_cap_fires_per_crack(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out, max_cracks_per_burst=10) + tailer.start() + try: + with open(out, "a") as f: + for i in range(3): + f.write(f"user{i}:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 3) + assert aggregate.call_count == 0 + finally: + tailer.stop() + + +class TestCrackTailerStop: + def test_stop_joins_within_timeout(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + tailer.start() + start = time.time() + tailer.stop() + elapsed = time.time() - start + assert not tailer.is_alive() + assert elapsed < 5.0 + + def test_stop_is_safe_without_start(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + # Calling stop on a never-started thread should not raise. + tailer.stop() + + +class TestCrackTailerFileHandling: + def test_missing_file_then_appearing(self, tmp_path: Path) -> None: + out = tmp_path / "missing.out" + tailer, notify, _ = _make_tailer(out) + tailer.start() + try: + time.sleep(0.15) + out.write_text("alice:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + finally: + tailer.stop() + + def test_truncation_resets_position(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("alice:hash:plain\nbob:hash:plain\n") + tailer, notify, _ = _make_tailer(out) + tailer.start() + try: + time.sleep(0.15) + # Truncate the file and write a fresh line; tailer should reset + # its file position and see the new line. + out.write_text("charlie:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + labels = [call.args[0] for call in notify.call_args_list] + assert "charlie" in labels + finally: + tailer.stop()