diff --git a/hate_crack/notify/__init__.py b/hate_crack/notify/__init__.py new file mode 100644 index 0000000..9f835e4 --- /dev/null +++ b/hate_crack/notify/__init__.py @@ -0,0 +1,291 @@ +"""Public notification API for hate_crack. + +Overview +======== + +This package wires per-attack and per-crack notifications into the core +hashcat runner. The design is intentionally small and functional: a single +module-level ``_settings`` object, a handful of helper functions, and one +``CrackTailer`` thread class where polling state genuinely wants OO. + +Wiring +====== + +At startup ``main.py`` calls :func:`init` with the resolved config path and +parsed config dict. After that, the rest of the codebase interacts with +this package via: + +- :func:`prompt_notify_for_attack` -- called by attacks.py before an attack + starts; asks the user ``[y/n/always]`` and stashes per-run consent. +- :func:`start_tailer` / :func:`stop_tailer` -- called by the hashcat + command wrapper to spin a background watcher on ``{hashfile}.out``. +- :func:`notify_job_done` -- called by the hashcat command wrapper after + the subprocess exits, fires one summary notification. +- :func:`suppressed_notifications` -- context manager for orchestrator + attacks that chain many primitives; collapses all nested notifications + so the orchestrator can fire a single aggregate at the end. + +Adding a new backend +==================== + +The single concrete HTTP call lives in :mod:`hate_crack.notify.pushover`. +To add Slack/webhooks, write a sibling ``_send_slack()`` there (or in a +new module) and dispatch from :func:`notify_job_done` / +:func:`notify_crack`. No framework, no ABC — one function per transport. +""" + +from __future__ import annotations + +import logging +from typing import Callable + +from hate_crack.notify._suppress import ( + is_suppressed, + suppressed_notifications, +) +from hate_crack.notify.pushover import _send_pushover +from hate_crack.notify.settings import ( + NotifySettings, + add_to_allowlist, + load_settings, + save_enabled, +) +from hate_crack.notify.tailer import ( + CrackTailer, + extract_username_from_out_line, +) + +logger = logging.getLogger(__name__) + + +# Module-level runtime state. Treated as a singleton because the CLI is a +# single-process tool with a single user; no need to pass a context object +# through every attack signature. +_settings: NotifySettings | None = None +_config_path: str | None = None + +# Per-run consent cache: attack_name -> bool. Populated by +# ``prompt_notify_for_attack`` and consulted by ``notify_job_done`` so we +# don't need to re-prompt mid-chain. +_run_consent: dict[str, bool] = {} + +# Input function indirection so tests can inject answers without pulling +# in a terminal. Swap via ``set_input_func``. +_input_func: Callable[[str], str] = input + + +__all__ = [ + "CrackTailer", + "NotifySettings", + "add_to_allowlist", + "clear_state_for_tests", + "extract_username_from_out_line", + "get_settings", + "init", + "is_suppressed", + "notify_crack", + "notify_job_done", + "prompt_notify_for_attack", + "set_input_func", + "start_tailer", + "stop_tailer", + "suppressed_notifications", + "toggle_enabled", + "_send_pushover", +] + + +def init(config_path: str | None, config_parser: dict | None) -> None: + """Bootstrap the notify subsystem from the resolved config. + + Called once from ``main.py`` after its config-loading block. Safe to + call multiple times — the second call replaces settings but does not + reset per-run consent (the user may already have answered prompts). + """ + global _settings, _config_path + _config_path = config_path + _settings = load_settings(config_parser) + + +def get_settings() -> NotifySettings: + """Return the active settings, or fresh defaults if ``init`` never ran.""" + return _settings if _settings is not None else NotifySettings() + + +def set_input_func(func: Callable[[str], str]) -> None: + """Test hook: swap the ``input()`` used by :func:`prompt_notify_for_attack`.""" + global _input_func + _input_func = func + + +def clear_state_for_tests() -> None: + """Reset module state. Only used by the test suite.""" + global _settings, _config_path, _input_func + _settings = None + _config_path = None + _run_consent.clear() + _input_func = input + + +def toggle_enabled() -> bool: + """Flip ``notify_enabled``, persist to ``config.json``, return new state. + + If ``init`` was never called we still toggle an in-memory default — the + UI update must not crash even if the config file is unreachable. + """ + global _settings + if _settings is None: + _settings = NotifySettings() + _settings.enabled = not _settings.enabled + if _config_path: + try: + save_enabled(_config_path, _settings.enabled) + except OSError as exc: + logger.warning("Could not persist notify_enabled: %s", exc) + return _settings.enabled + + +def _in_allowlist(attack_name: str) -> bool: + return attack_name in get_settings().attack_allowlist + + +def prompt_notify_for_attack(attack_name: str) -> bool: + """Ask the user whether this attack should fire a notification. + + Returns ``True`` if notifications should fire for this run. + + Flow: + + 1. Notifications disabled globally -> return False silently (no prompt). + 2. Attack already in allowlist -> return True (no prompt, auto-on). + 3. Otherwise -> prompt ``[y/n/always]``: + * ``y`` -> consent for this run only. + * ``n`` / "" -> no consent. + * ``always`` -> persist to allowlist and consent for this run. + + Per-run consent is stashed in ``_run_consent[attack_name]`` so the + hashcat wrapper can query it at job-done time without re-prompting. + """ + settings = get_settings() + if not settings.enabled: + _run_consent[attack_name] = False + return False + if _in_allowlist(attack_name): + _run_consent[attack_name] = True + return True + + try: + raw = _input_func( + f"\n[notify] Send Pushover notifications for '{attack_name}'? [y/N/always]: " + ) + except EOFError: + raw = "" + answer = (raw or "").strip().lower() + if answer == "always": + _run_consent[attack_name] = True + if _config_path: + try: + add_to_allowlist(_config_path, attack_name) + # Also update the in-memory settings so a later call in the + # same session sees the allowlist without re-reading config. + if attack_name not in settings.attack_allowlist: + settings.attack_allowlist.append(attack_name) + except OSError as exc: + logger.warning("Could not persist allowlist entry: %s", exc) + return True + if answer in ("y", "yes"): + _run_consent[attack_name] = True + return True + _run_consent[attack_name] = False + return False + + +def _should_fire(attack_name: str) -> bool: + if is_suppressed(): + return False + settings = get_settings() + if not settings.enabled: + return False + if _in_allowlist(attack_name): + return True + return _run_consent.get(attack_name, False) + + +def notify_job_done( + attack_name: str, + cracked_count: int, + hash_file: str | None = None, +) -> None: + """Fire a single "attack complete" notification. + + No-op when suppressed, disabled, or the user declined at the prompt. + """ + if not _should_fire(attack_name): + return + settings = get_settings() + title = f"hate_crack: {attack_name} complete" + if hash_file: + message = ( + f"Attack '{attack_name}' finished.\n" + f"Cracked so far: {cracked_count}\n" + f"Hash file: {hash_file}" + ) + else: + message = f"Attack '{attack_name}' finished.\nCracked so far: {cracked_count}" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def notify_crack(label: str, attack_name: str) -> None: + """Fire a per-crack notification (called from :class:`CrackTailer`).""" + if not _should_fire(attack_name): + return + settings = get_settings() + title = "hate_crack: new crack" + message = f"{label} cracked ({attack_name})" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def _notify_aggregate(count: int, attack_name: str) -> None: + """Aggregated "N accounts cracked" notification for burst-capped ticks.""" + if not _should_fire(attack_name): + return + settings = get_settings() + title = "hate_crack: crack burst" + message = f"{count} new accounts cracked ({attack_name})" + _send_pushover(settings.pushover_token, settings.pushover_user, title, message) + + +def start_tailer(out_path: str, attack_name: str) -> CrackTailer | None: + """Start a :class:`CrackTailer` if per-crack notifications are enabled. + + Returns the running tailer (so the caller can stop it later), or + ``None`` when suppression/disabled/disallowed mean we shouldn't tail. + """ + if is_suppressed(): + return None + settings = get_settings() + if not settings.enabled: + return None + if not settings.per_crack_enabled: + return None + if not _should_fire(attack_name): + return None + tailer = CrackTailer( + out_path=out_path, + attack_name=attack_name, + settings=settings, + notify_callback=notify_crack, + aggregate_callback=_notify_aggregate, + ) + tailer.start() + return tailer + + +def stop_tailer(tailer: CrackTailer | None) -> None: + """Stop a tailer started by :func:`start_tailer`. ``None`` is a no-op.""" + if tailer is None: + return + try: + tailer.stop() + except Exception as exc: + logger.warning("CrackTailer.stop() failed: %s", exc) diff --git a/hate_crack/notify/_suppress.py b/hate_crack/notify/_suppress.py new file mode 100644 index 0000000..d70a40a --- /dev/null +++ b/hate_crack/notify/_suppress.py @@ -0,0 +1,38 @@ +"""Thread-local notification suppression. + +Multi-phase orchestrators (e.g. ``extensive_crack``) chain many hashcat +invocations. Without suppression every primitive would fire its own +"job complete" notification — loud, useless, and easy to rate-limit into +the ground. Callers wrap the chain in :func:`suppressed_notifications` +and then fire exactly one aggregate notification at the end. + +Suppression is thread-local so a tailer thread in a different context +can't accidentally silence itself by observing another thread's flag. +""" + +from __future__ import annotations + +import contextlib +import threading + +_suppressed = threading.local() + + +def is_suppressed() -> bool: + """Return True if the calling thread is inside a suppression context.""" + return getattr(_suppressed, "active", False) + + +@contextlib.contextmanager +def suppressed_notifications(): + """Context manager that suppresses notify_* calls on the current thread. + + Nests correctly: the previous state is restored on exit so an outer + ``with`` block still reflects "suppressed" after an inner one exits. + """ + prev = getattr(_suppressed, "active", False) + _suppressed.active = True + try: + yield + finally: + _suppressed.active = prev diff --git a/hate_crack/notify/pushover.py b/hate_crack/notify/pushover.py new file mode 100644 index 0000000..e0b3e74 --- /dev/null +++ b/hate_crack/notify/pushover.py @@ -0,0 +1,69 @@ +"""Pushover HTTP backend. + +Single concrete entry point: :func:`_send_pushover`. Exposed as a module-level +function (not a class) so that wiring in a Slack or generic webhook backend +later is a matter of writing a sibling ``_send_slack()`` alongside this one. + +Design invariants: + +- The call is a best-effort side effect. A crashed attack is strictly worse + than a missed notification, so every exception below the ``requests`` layer + is caught and swallowed (returning ``False``). +- Missing ``requests`` is treated the same as any other transport error — + we log a warning once and return ``False`` without re-raising. +- Payload contains *only* title + message + credentials. No plaintexts, no + hash data. The tailer guarantees this at call sites; this function is the + final safety barrier if a caller passes bad data (we still forward whatever + string it gives us, but we never introspect outside the message). +""" + +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + +try: + import requests # type: ignore[import-untyped] +except ImportError: # pragma: no cover - requests is a hard dep of the package + requests = None # type: ignore[assignment] + + +PUSHOVER_URL = "https://api.pushover.net/1/messages.json" + + +def _send_pushover(token: str, user: str, title: str, message: str) -> bool: + """POST a notification to Pushover. + + Returns ``True`` on HTTP 200, ``False`` on any other outcome. Never + raises — network errors, missing credentials, and missing ``requests`` + are all funneled into a ``False`` return so the caller can treat the + notification as a fire-and-forget side effect. + """ + if not token or not user: + logger.debug("Pushover not configured: missing token or user") + return False + + if requests is None: + logger.warning("Pushover requested but 'requests' is not importable") + return False + + payload = { + "token": token, + "user": user, + "title": title, + "message": message, + } + try: + response = requests.post(PUSHOVER_URL, data=payload, timeout=10) + except Exception as exc: + # requests.RequestException covers most, but a plugin/mock may raise + # a bare Exception — we must never let this escape. + logger.warning("Pushover send failed: %s", exc) + return False + + status = getattr(response, "status_code", None) + if status == 200: + return True + logger.warning("Pushover send returned HTTP %s", status) + return False diff --git a/hate_crack/notify/settings.py b/hate_crack/notify/settings.py new file mode 100644 index 0000000..0e388bc --- /dev/null +++ b/hate_crack/notify/settings.py @@ -0,0 +1,166 @@ +"""Notification settings: dataclass + atomic config persistence. + +Settings live in the same ``config.json`` that drives the rest of hate_crack. +This module isolates (a) the typed shape of notification config and +(b) the read-modify-write persistence primitives used by the runtime toggle +and the ``[yes/no/always]`` per-attack prompt. + +Persistence follows the same pattern as ``main.py`` (``json.load`` -> +mutate -> ``json.dump(..., indent=2)`` via a temp file and ``os.replace``) +so a crash mid-write cannot corrupt the config. +""" + +from __future__ import annotations + +import json +import os +import tempfile +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class NotifySettings: + """Typed view of the ``notify_*`` keys from ``config.json``. + + Defaults mirror ``config.json.example`` so freshly-loaded configs and + in-memory fallbacks agree. + """ + + enabled: bool = False + pushover_token: str = "" + pushover_user: str = "" + per_crack_enabled: bool = False + attack_allowlist: list[str] = field(default_factory=list) + suppress_in_orchestrators: bool = True + max_cracks_per_burst: int = 5 + poll_interval_seconds: float = 5.0 + + +def _coerce_bool(value: Any, default: bool) -> bool: + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "on") + return default + + +def _coerce_int(value: Any, default: int) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _coerce_float(value: Any, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _coerce_str(value: Any, default: str) -> str: + if value is None: + return default + return str(value) + + +def _coerce_list(value: Any) -> list[str]: + if isinstance(value, list): + return [str(v) for v in value] + return [] + + +def load_settings(config_parser: dict | None) -> NotifySettings: + """Build a ``NotifySettings`` from a parsed config dict. + + Unknown / missing / badly-typed keys fall back to dataclass defaults + so the runtime always has a valid settings object, even when the + config was written by an older hate_crack install. + """ + cfg = config_parser or {} + defaults = NotifySettings() + return NotifySettings( + enabled=_coerce_bool(cfg.get("notify_enabled"), defaults.enabled), + pushover_token=_coerce_str(cfg.get("notify_pushover_token"), defaults.pushover_token), + pushover_user=_coerce_str(cfg.get("notify_pushover_user"), defaults.pushover_user), + per_crack_enabled=_coerce_bool( + cfg.get("notify_per_crack_enabled"), defaults.per_crack_enabled + ), + attack_allowlist=_coerce_list(cfg.get("notify_attack_allowlist")), + suppress_in_orchestrators=_coerce_bool( + cfg.get("notify_suppress_in_orchestrators"), + defaults.suppress_in_orchestrators, + ), + max_cracks_per_burst=_coerce_int( + cfg.get("notify_max_cracks_per_burst"), defaults.max_cracks_per_burst + ), + poll_interval_seconds=_coerce_float( + cfg.get("notify_poll_interval_seconds"), defaults.poll_interval_seconds + ), + ) + + +def _atomic_rewrite(config_path: str, mutator) -> None: + """Read config_path, apply ``mutator(dict)`` in place, write atomically. + + - Missing file is treated as empty dict (mutator runs on ``{}``). + - Invalid JSON is silently replaced with the mutator's output; we do not + want to block a notification toggle on a pre-existing bad config. + - Write goes to a temp file in the same directory and is swapped in + via ``os.replace`` so readers never see a half-written file. + """ + data: dict = {} + if os.path.isfile(config_path): + try: + with open(config_path) as f: + loaded = json.load(f) + if isinstance(loaded, dict): + data = loaded + except (OSError, json.JSONDecodeError): + data = {} + mutator(data) + directory = os.path.dirname(os.path.abspath(config_path)) or "." + os.makedirs(directory, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(prefix=".config-", suffix=".json", dir=directory) + try: + with os.fdopen(fd, "w") as tmp: + json.dump(data, tmp, indent=2) + os.replace(tmp_path, config_path) + except Exception: + # Best-effort cleanup of stale temp file on failure. + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +def save_enabled(config_path: str, enabled: bool) -> None: + """Persist ``notify_enabled`` without disturbing other config keys.""" + + def _apply(data: dict) -> None: + data["notify_enabled"] = bool(enabled) + + _atomic_rewrite(config_path, _apply) + + +def add_to_allowlist(config_path: str, attack_name: str) -> None: + """Append ``attack_name`` to ``notify_attack_allowlist`` if absent. + + Idempotent: already-present entries are a no-op. + """ + if not attack_name: + return + + def _apply(data: dict) -> None: + current = data.get("notify_attack_allowlist") + if not isinstance(current, list): + current = [] + if attack_name not in current: + current.append(attack_name) + data["notify_attack_allowlist"] = current + + _atomic_rewrite(config_path, _apply) diff --git a/hate_crack/notify/tailer.py b/hate_crack/notify/tailer.py new file mode 100644 index 0000000..0808eb1 --- /dev/null +++ b/hate_crack/notify/tailer.py @@ -0,0 +1,208 @@ +"""Background tailer that watches ``{hashfile}.out`` for new cracks. + +Design rationale: + +- Polling (not inotify/fsevents) keeps the implementation portable across + macOS/Linux without adding a dependency. +- We seek to EOF on start: the hashfile may already contain cracks from a + previous run, and re-notifying those would be both wrong and spammy. +- Burst cap: if a single poll tick yields more cracks than the user's + configured threshold, we collapse them into one aggregated notification + instead of N pings. Cracks tend to arrive in rule-file bursts, so this + knob matters. +- The thread is a daemon so a hung tailer can't block process exit. +- ``stop()`` joins with a timeout to guarantee forward progress even if the + poll loop is stuck on a slow filesystem. +""" + +from __future__ import annotations + +import logging +import os +import threading +from typing import Callable + +from hate_crack.notify.settings import NotifySettings + +logger = logging.getLogger(__name__) + + +def extract_username_from_out_line(line: str) -> str | None: + """Extract a username from a single hashcat ``.out`` line, if present. + + We support the four output layouts hate_crack can produce upstream: + + ======================= =========================================== ============ + Input shape Example Returns + ======================= =========================================== ============ + bare hash ``5f4dcc3b5aa:plaintext`` ``None`` + user:hash:plain ``alice:5f4dcc:plaintext`` ``"alice"`` + pwdump ``alice:1001:aad3b:31d6:::plaintext`` ``"alice"`` + NetNTLMv2 ``alice::DOMAIN:challenge:response:plain`` ``"alice"`` + ======================= =========================================== ============ + + The function deliberately NEVER returns the plaintext — that would + defeat the privacy guarantee documented on the notify public API. + """ + if not line: + return None + stripped = line.rstrip("\r\n") + if not stripped: + return None + + parts = stripped.split(":") + if len(parts) < 2: + return None + + # Bare hash (hash:plain): exactly two colons-separated fields and the + # first field is a plausible hash — any non-empty first field that is + # not a pure hex string may still be a username, so we fall through. + if len(parts) == 2: + return None + + first = parts[0] + if not first: + # Lines that start with ':' aren't in any format we care about. + return None + + # pwdump: user:RID:LM:NT:::plain -> field[1] is numeric RID + if len(parts) >= 7 and parts[1].isdigit(): + return first + + # NetNTLMv2: user::DOMAIN:... -> field[1] is empty + if len(parts) >= 5 and parts[1] == "": + return first + + # user:hash:plain style — assume first field is username. + # We only claim this when there's at least 3 fields so we don't + # mis-label a bare ``hash:plain`` as ``username:plain``. + if len(parts) >= 3: + return first + + return None + + +class CrackTailer(threading.Thread): + """Polls ``out_path`` for new cracked lines and dispatches notifications. + + The tailer is created and started by :func:`hate_crack.notify.start_tailer` + and stopped by :func:`hate_crack.notify.stop_tailer`. External callers + normally don't touch this class directly; it's public for tests. + """ + + daemon = True + + def __init__( + self, + out_path: str, + attack_name: str, + settings: NotifySettings, + notify_callback: Callable[[str, str], None], + aggregate_callback: Callable[[int, str], None], + *, + username_extractor: Callable[[str], str | None] | None = None, + ) -> None: + super().__init__(name=f"CrackTailer[{attack_name}]", daemon=True) + self.out_path = out_path + self.attack_name = attack_name + self.settings = settings + self._notify_crack = notify_callback + self._notify_aggregate = aggregate_callback + self._extract_username = username_extractor or extract_username_from_out_line + self._stop = threading.Event() + # Leftover bytes from the previous read that didn't end in \n. + self._buffer = b"" + # File position to read from; set on first successful open. + self._file_pos: int | None = None + + def stop(self) -> None: + """Signal the thread to exit and join with a bounded timeout. + + Safe to call on a tailer that was never started; in that case the + signal is still set (so a later ``start()`` would exit immediately) + and we skip the join that would otherwise raise. + """ + self._stop.set() + if self._started.is_set(): + self.join(timeout=10.0) + + def run(self) -> None: # pragma: no cover - thread entry, exercised via tests + try: + self._seek_to_eof() + # First tick of ``wait()`` blocks for the full interval; this is + # deliberate so we don't hammer the filesystem immediately after + # starting if hashcat hasn't written anything yet. + while not self._stop.wait(self.settings.poll_interval_seconds): + try: + self._poll_once() + except Exception as exc: + logger.warning("CrackTailer poll failed: %s", exc) + except Exception as exc: # pragma: no cover - defensive + logger.warning("CrackTailer crashed: %s", exc) + + def _seek_to_eof(self) -> None: + """Remember the current EOF so existing cracks aren't re-notified.""" + if os.path.isfile(self.out_path): + try: + self._file_pos = os.path.getsize(self.out_path) + except OSError: + self._file_pos = 0 + else: + self._file_pos = 0 + + def _poll_once(self) -> None: + if not os.path.isfile(self.out_path): + return + try: + size = os.path.getsize(self.out_path) + except OSError: + return + + if self._file_pos is None: + self._file_pos = size + return + + if size < self._file_pos: + # File was truncated / rewritten — reset rather than read garbage. + self._file_pos = 0 + self._buffer = b"" + + if size == self._file_pos: + return + + new_lines = self._read_new_lines(size) + if not new_lines: + return + + count = len(new_lines) + if count > self.settings.max_cracks_per_burst: + self._notify_aggregate(count, self.attack_name) + return + + for line_bytes in new_lines: + try: + line = line_bytes.decode("utf-8", errors="replace") + except Exception: + continue + label = self._extract_username(line) or self.attack_name + self._notify_crack(label, self.attack_name) + + def _read_new_lines(self, size: int) -> list[bytes]: + """Read from ``self._file_pos`` up to ``size`` and return full lines. + + Incomplete trailing bytes are buffered for the next tick. + """ + try: + with open(self.out_path, "rb") as f: + f.seek(self._file_pos) + data = f.read(size - self._file_pos) + except OSError: + return [] + self._file_pos = size + + combined = self._buffer + data + lines = combined.split(b"\n") + # The last element is either empty (buffer ended on \n) or a partial + # line — stash it for next poll either way. + self._buffer = lines.pop() if lines else b"" + return [line for line in lines if line] diff --git a/tests/test_notify_integration.py b/tests/test_notify_integration.py new file mode 100644 index 0000000..b757851 --- /dev/null +++ b/tests/test_notify_integration.py @@ -0,0 +1,242 @@ +"""Integration-style tests for the notify public API.""" +import json +from pathlib import Path +from unittest.mock import patch + +import pytest + +from hate_crack import notify + + +@pytest.fixture(autouse=True) +def _reset_notify_state(): + notify.clear_state_for_tests() + yield + notify.clear_state_for_tests() + + +def _init_with(tmp_path: Path, **kwargs) -> Path: + """Create a config file and initialize notify against it.""" + cfg = { + "notify_enabled": True, + "notify_pushover_token": "tok", + "notify_pushover_user": "usr", + } + cfg.update(kwargs) + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps(cfg)) + notify.init(str(config_path), cfg) + return config_path + + +class TestNotifyJobDone: + def test_no_op_when_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 3) + send.assert_not_called() + + def test_fires_when_enabled_and_in_allowlist(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 7, hash_file="/tmp/h") + assert send.called + args = send.call_args.args + # (token, user, title, message) + assert args[0] == "tok" + assert args[1] == "usr" + assert "Brute Force" in args[2] + assert "7" in args[3] + + def test_no_op_when_enabled_but_attack_not_allowed(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + # No allowlist, no consent -> should not fire. + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 1) + send.assert_not_called() + + def test_fires_after_per_run_consent(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _prompt: "y") + assert notify.prompt_notify_for_attack("Brute Force") is True + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 1) + assert send.called + + def test_suppression_silences_fire(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + with notify.suppressed_notifications(): + notify.notify_job_done("Brute Force", 1) + send.assert_not_called() + + def test_does_not_leak_plaintext_in_message(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + # Message we pass to notify_job_done is count + attack_name + hash + # path only. Confirm nothing in the payload contains a plausible + # plaintext password token. + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_job_done("Brute Force", 42, hash_file="/tmp/h.txt") + title, message = send.call_args.args[2], send.call_args.args[3] + for banned in ("plaintext", "password=", "secret"): + assert banned not in title + assert banned not in message + + +class TestNotifyCrack: + def test_fires_when_allowed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + notify.notify_crack("alice", "Brute Force") + assert send.called + + def test_no_op_when_suppressed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with patch("hate_crack.notify._send_pushover") as send: + with notify.suppressed_notifications(): + notify.notify_crack("alice", "Brute Force") + send.assert_not_called() + + +class TestPromptNotifyForAttack: + def test_no_prompt_when_globally_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False) + calls: list[str] = [] + notify.set_input_func(lambda p: calls.append(p) or "y") + assert notify.prompt_notify_for_attack("Brute Force") is False + assert calls == [] + + def test_no_prompt_when_already_in_allowlist(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + calls: list[str] = [] + notify.set_input_func(lambda p: calls.append(p) or "n") + assert notify.prompt_notify_for_attack("Brute Force") is True + assert calls == [] + + def test_answer_yes(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "y") + assert notify.prompt_notify_for_attack("Brute Force") is True + + def test_answer_no(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "n") + assert notify.prompt_notify_for_attack("Brute Force") is False + + def test_answer_empty_defaults_to_no(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "") + assert notify.prompt_notify_for_attack("Brute Force") is False + + def test_answer_always_persists_to_allowlist(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "always") + assert notify.prompt_notify_for_attack("Brute Force") is True + data = json.loads(config_path.read_text()) + assert "Brute Force" in data.get("notify_attack_allowlist", []) + # Settings in memory also updated so we don't re-prompt next call. + settings = notify.get_settings() + assert "Brute Force" in settings.attack_allowlist + + def test_always_is_idempotent(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=True) + notify.set_input_func(lambda _: "always") + notify.prompt_notify_for_attack("Brute Force") + notify.prompt_notify_for_attack("Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"].count("Brute Force") == 1 + + +class TestToggleEnabled: + def test_toggle_flips_and_persists(self, tmp_path: Path) -> None: + config_path = _init_with(tmp_path, notify_enabled=False) + assert notify.get_settings().enabled is False + new_state = notify.toggle_enabled() + assert new_state is True + assert notify.get_settings().enabled is True + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is True + + # Flip back. + assert notify.toggle_enabled() is False + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is False + + def test_toggle_without_init_still_works(self) -> None: + notify.clear_state_for_tests() + # Call toggle before init ever ran — should just flip in-memory. + assert notify.toggle_enabled() is True + assert notify.get_settings().enabled is True + + +class TestStartStopTailer: + def test_start_tailer_noop_when_disabled(self, tmp_path: Path) -> None: + _init_with(tmp_path, notify_enabled=False, notify_per_crack_enabled=True) + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_noop_when_per_crack_disabled(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=False, + notify_attack_allowlist=["Brute Force"], + ) + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_noop_when_suppressed(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=True, + notify_attack_allowlist=["Brute Force"], + ) + with notify.suppressed_notifications(): + t = notify.start_tailer(str(tmp_path / "h.out"), "Brute Force") + assert t is None + + def test_start_tailer_when_enabled_and_consented(self, tmp_path: Path) -> None: + _init_with( + tmp_path, + notify_enabled=True, + notify_per_crack_enabled=True, + notify_attack_allowlist=["Brute Force"], + notify_poll_interval_seconds=0.05, + ) + out = tmp_path / "h.out" + out.write_text("") + t = notify.start_tailer(str(out), "Brute Force") + try: + assert t is not None + assert t.is_alive() + finally: + notify.stop_tailer(t) + + def test_stop_tailer_none_is_noop(self) -> None: + # Must not raise. + notify.stop_tailer(None) diff --git a/tests/test_notify_pushover.py b/tests/test_notify_pushover.py new file mode 100644 index 0000000..8b9f759 --- /dev/null +++ b/tests/test_notify_pushover.py @@ -0,0 +1,88 @@ +"""Unit tests for the Pushover HTTP backend.""" +from unittest.mock import MagicMock, patch + +from hate_crack.notify import pushover + + +def _mock_response(status_code: int = 200) -> MagicMock: + r = MagicMock() + r.status_code = status_code + return r + + +class TestSendPushoverSuccess: + def test_returns_true_on_http_200(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + ok = pushover._send_pushover("tok", "usr", "title", "msg") + assert ok is True + + def test_payload_has_no_plaintext(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + pushover._send_pushover("tok", "usr", "Crack complete", "alice cracked") + (url,), kwargs = mock_requests.post.call_args + assert url == pushover.PUSHOVER_URL + data = kwargs["data"] + assert set(data.keys()) == {"token", "user", "title", "message"} + assert data["token"] == "tok" + assert data["user"] == "usr" + # The whole payload is just title + message + creds. No 'password', + # 'hash', 'plaintext' or similar keys ever appear. + assert "password" not in data + assert "plaintext" not in data + assert "hash" not in data + + def test_timeout_is_passed(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(200) + pushover._send_pushover("tok", "usr", "t", "m") + _, kwargs = mock_requests.post.call_args + assert kwargs["timeout"] == 10 + + +class TestSendPushoverFailureModes: + def test_missing_token_returns_false_without_calling_requests(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + ok = pushover._send_pushover("", "usr", "t", "m") + assert ok is False + mock_requests.post.assert_not_called() + + def test_missing_user_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + ok = pushover._send_pushover("tok", "", "t", "m") + assert ok is False + mock_requests.post.assert_not_called() + + def test_network_error_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.side_effect = ConnectionError("refused") + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_generic_exception_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.side_effect = RuntimeError("boom") + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_non_200_response_returns_false(self) -> None: + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = _mock_response(500) + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_missing_requests_returns_false(self) -> None: + with patch.object(pushover, "requests", None): + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False + + def test_never_raises(self) -> None: + # Even if requests.post returns something weird (no status_code), + # the function must not raise. + bad = MagicMock() + # Accessing .status_code returns a non-int Mock — it will not equal 200. + with patch.object(pushover, "requests") as mock_requests: + mock_requests.post.return_value = bad + ok = pushover._send_pushover("tok", "usr", "t", "m") + assert ok is False diff --git a/tests/test_notify_settings.py b/tests/test_notify_settings.py new file mode 100644 index 0000000..8f5dc7b --- /dev/null +++ b/tests/test_notify_settings.py @@ -0,0 +1,159 @@ +"""Unit tests for hate_crack.notify.settings.""" +import json +from pathlib import Path + +from hate_crack.notify.settings import ( + NotifySettings, + add_to_allowlist, + load_settings, + save_enabled, +) + + +class TestNotifySettingsDataclass: + def test_defaults(self) -> None: + s = NotifySettings() + assert s.enabled is False + assert s.pushover_token == "" + assert s.pushover_user == "" + assert s.per_crack_enabled is False + assert s.attack_allowlist == [] + assert s.suppress_in_orchestrators is True + assert s.max_cracks_per_burst == 5 + assert s.poll_interval_seconds == 5.0 + + def test_allowlist_default_is_fresh_per_instance(self) -> None: + # field(default_factory=list) must not share state. + a = NotifySettings() + b = NotifySettings() + a.attack_allowlist.append("Brute Force") + assert b.attack_allowlist == [] + + +class TestLoadSettings: + def test_load_from_empty_dict_returns_defaults(self) -> None: + s = load_settings({}) + assert s == NotifySettings() + + def test_load_from_none_returns_defaults(self) -> None: + assert load_settings(None) == NotifySettings() + + def test_load_full_dict(self) -> None: + s = load_settings({ + "notify_enabled": True, + "notify_pushover_token": "tok", + "notify_pushover_user": "usr", + "notify_per_crack_enabled": True, + "notify_attack_allowlist": ["Brute Force", "Dictionary"], + "notify_suppress_in_orchestrators": False, + "notify_max_cracks_per_burst": 20, + "notify_poll_interval_seconds": 2.5, + }) + assert s.enabled is True + assert s.pushover_token == "tok" + assert s.pushover_user == "usr" + assert s.per_crack_enabled is True + assert s.attack_allowlist == ["Brute Force", "Dictionary"] + assert s.suppress_in_orchestrators is False + assert s.max_cracks_per_burst == 20 + assert s.poll_interval_seconds == 2.5 + + def test_load_tolerates_bad_types(self) -> None: + s = load_settings({ + "notify_enabled": "true", + "notify_max_cracks_per_burst": "not-a-number", + "notify_poll_interval_seconds": "also-bad", + "notify_attack_allowlist": "not-a-list", + }) + # string "true" -> True + assert s.enabled is True + # bad ints fall back to defaults (5, 5.0) + assert s.max_cracks_per_burst == 5 + assert s.poll_interval_seconds == 5.0 + # non-list allowlist becomes empty list + assert s.attack_allowlist == [] + + +class TestSaveEnabled: + def test_writes_new_config(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is True + + def test_preserves_existing_keys(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + initial = { + "hcatBin": "hashcat", + "hashview_api_key": "secret", + "notify_enabled": False, + } + config_path.write_text(json.dumps(initial)) + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data["hcatBin"] == "hashcat" + assert data["hashview_api_key"] == "secret" + assert data["notify_enabled"] is True + + def test_toggles_back_and_forth(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + save_enabled(str(config_path), True) + save_enabled(str(config_path), False) + data = json.loads(config_path.read_text()) + assert data["notify_enabled"] is False + + def test_invalid_existing_config_replaced(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text("this is not json") + save_enabled(str(config_path), True) + data = json.loads(config_path.read_text()) + assert data == {"notify_enabled": True} + + def test_atomic_no_half_write(self, tmp_path: Path) -> None: + # A partial write should never leave the main file invalid. We + # check that after save_enabled, parsing always succeeds. + config_path = tmp_path / "config.json" + for flag in (True, False, True, False, True): + save_enabled(str(config_path), flag) + json.loads(config_path.read_text()) # must not raise + + +class TestAddToAllowlist: + def test_adds_to_empty_list(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] + + def test_idempotent(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + add_to_allowlist(str(config_path), "Brute Force") + add_to_allowlist(str(config_path), "Brute Force") + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] + + def test_preserves_other_entries(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({ + "hcatBin": "hashcat", + "notify_attack_allowlist": ["Existing"], + })) + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["hcatBin"] == "hashcat" + assert data["notify_attack_allowlist"] == ["Existing", "Brute Force"] + + def test_empty_attack_name_is_noop(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({"notify_attack_allowlist": ["A"]})) + add_to_allowlist(str(config_path), "") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["A"] + + def test_repairs_non_list_allowlist(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps({"notify_attack_allowlist": "bogus"})) + add_to_allowlist(str(config_path), "Brute Force") + data = json.loads(config_path.read_text()) + assert data["notify_attack_allowlist"] == ["Brute Force"] diff --git a/tests/test_notify_suppression.py b/tests/test_notify_suppression.py new file mode 100644 index 0000000..c57557f --- /dev/null +++ b/tests/test_notify_suppression.py @@ -0,0 +1,74 @@ +"""Unit tests for hate_crack.notify._suppress.""" +import threading +import time + +from hate_crack.notify._suppress import is_suppressed, suppressed_notifications + + +class TestSuppressionContextManager: + def test_default_is_not_suppressed(self) -> None: + assert is_suppressed() is False + + def test_inside_context_is_suppressed(self) -> None: + assert is_suppressed() is False + with suppressed_notifications(): + assert is_suppressed() is True + assert is_suppressed() is False + + def test_nested_restores_outer_state(self) -> None: + with suppressed_notifications(): + assert is_suppressed() is True + with suppressed_notifications(): + assert is_suppressed() is True + # Leaving inner context must still leave us suppressed. + assert is_suppressed() is True + assert is_suppressed() is False + + def test_exception_restores_state(self) -> None: + try: + with suppressed_notifications(): + raise RuntimeError("boom") + except RuntimeError: + pass + assert is_suppressed() is False + + +class TestSuppressionThreadLocal: + def test_other_thread_not_suppressed_by_us(self) -> None: + seen: list[bool] = [] + ready = threading.Event() + done = threading.Event() + + def worker() -> None: + # Wait until main thread has entered its suppression context, + # then sample our own state. + ready.wait(timeout=2.0) + seen.append(is_suppressed()) + done.set() + + t = threading.Thread(target=worker, daemon=True) + t.start() + with suppressed_notifications(): + assert is_suppressed() is True + ready.set() + done.wait(timeout=2.0) + t.join(timeout=2.0) + assert seen == [False], f"worker thread saw suppression state: {seen}" + + def test_worker_thread_suppression_does_not_leak(self) -> None: + worker_done = threading.Event() + + def worker() -> None: + with suppressed_notifications(): + assert is_suppressed() is True + # Hold long enough that the main thread can sample. + time.sleep(0.05) + worker_done.set() + + t = threading.Thread(target=worker, daemon=True) + t.start() + # Main thread should never observe the worker's suppression. + assert is_suppressed() is False + t.join(timeout=2.0) + assert worker_done.is_set() + assert is_suppressed() is False diff --git a/tests/test_notify_tailer.py b/tests/test_notify_tailer.py new file mode 100644 index 0000000..7404ed2 --- /dev/null +++ b/tests/test_notify_tailer.py @@ -0,0 +1,219 @@ +"""Unit tests for the CrackTailer polling thread and username extractor.""" +import time +from pathlib import Path +from unittest.mock import MagicMock + +from hate_crack.notify.settings import NotifySettings +from hate_crack.notify.tailer import CrackTailer, extract_username_from_out_line + + +def _settings(**overrides) -> NotifySettings: + defaults = { + "enabled": True, + "per_crack_enabled": True, + "max_cracks_per_burst": 5, + # Short poll so tests don't hang. + "poll_interval_seconds": 0.05, + } + defaults.update(overrides) + return NotifySettings(**defaults) + + +def _make_tailer(out_path: Path, **overrides): + notify = MagicMock(name="notify_callback") + aggregate = MagicMock(name="aggregate_callback") + settings = _settings(**overrides) + tailer = CrackTailer( + out_path=str(out_path), + attack_name="Brute Force", + settings=settings, + notify_callback=notify, + aggregate_callback=aggregate, + ) + return tailer, notify, aggregate + + +def _wait_until(predicate, timeout=2.0): + """Spin until predicate() or timeout; returns whether it passed.""" + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return predicate() + + +class TestExtractUsername: + def test_bare_hash_returns_none(self) -> None: + assert extract_username_from_out_line("5f4dcc3b5aa10b0be:secret") is None + + def test_user_hash_plain(self) -> None: + assert extract_username_from_out_line("alice:5f4dcc:secret") == "alice" + + def test_pwdump(self) -> None: + assert ( + extract_username_from_out_line("alice:1001:aad3b435:31d6cfe:::secret") + == "alice" + ) + + def test_netntlmv2(self) -> None: + line = "alice::DOMAIN:1122334455667788:resp:resp2:secret" + assert extract_username_from_out_line(line) == "alice" + + def test_empty_line(self) -> None: + assert extract_username_from_out_line("") is None + + def test_whitespace_only(self) -> None: + assert extract_username_from_out_line("\n") is None + + def test_line_with_trailing_newline(self) -> None: + assert extract_username_from_out_line("alice:hash:plain\n") == "alice" + + def test_does_not_leak_plaintext(self) -> None: + # No matter the format, the return value must never equal the + # plaintext password. Paranoid test; relies on us knowing where + # the plaintext sits in each format. + for line, plain in [ + ("bob:1000:aa:bb:::s3cret", "s3cret"), + ("bob:hash:s3cret", "s3cret"), + ("bob::DOM:chal:resp:resp2:s3cret", "s3cret"), + ]: + assert extract_username_from_out_line(line) != plain + + +class TestCrackTailerStart: + def test_daemon_is_true(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + assert tailer.daemon is True + + def test_seeks_to_eof_on_start(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("alice:hash:plain\nbob:hash:plain\n") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + # Allow at least one poll interval; no new lines were added, so + # no notifications should fire. + time.sleep(0.2) + assert notify.call_count == 0 + assert aggregate.call_count == 0 + finally: + tailer.stop() + + def test_new_lines_fire_notify(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + with open(out, "a") as f: + f.write("alice:hash:plain\n") + f.write("bob:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 2) + assert aggregate.call_count == 0 + labels = [call.args[0] for call in notify.call_args_list] + assert "alice" in labels + assert "bob" in labels + finally: + tailer.stop() + + def test_no_username_falls_back_to_attack_name(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out) + tailer.start() + try: + with open(out, "a") as f: + # Bare-hash format -> extractor returns None -> fallback. + f.write("5f4dcc3b5:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + assert notify.call_args.args[0] == "Brute Force" + finally: + tailer.stop() + + +class TestCrackTailerBurstCap: + def test_burst_cap_fires_aggregate(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out, max_cracks_per_burst=3) + tailer.start() + try: + # Write 10 lines in one shot; a single poll tick must see them + # all and collapse into one aggregate call. + with open(out, "a") as f: + for i in range(10): + f.write(f"user{i}:hash:plain\n") + assert _wait_until(lambda: aggregate.call_count >= 1) + # Per-crack path must NOT have fired for this burst. + assert notify.call_count == 0 + args = aggregate.call_args.args + assert args[0] == 10 + assert args[1] == "Brute Force" + finally: + tailer.stop() + + def test_under_cap_fires_per_crack(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, notify, aggregate = _make_tailer(out, max_cracks_per_burst=10) + tailer.start() + try: + with open(out, "a") as f: + for i in range(3): + f.write(f"user{i}:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 3) + assert aggregate.call_count == 0 + finally: + tailer.stop() + + +class TestCrackTailerStop: + def test_stop_joins_within_timeout(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + tailer.start() + start = time.time() + tailer.stop() + elapsed = time.time() - start + assert not tailer.is_alive() + assert elapsed < 5.0 + + def test_stop_is_safe_without_start(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("") + tailer, _, _ = _make_tailer(out) + # Calling stop on a never-started thread should not raise. + tailer.stop() + + +class TestCrackTailerFileHandling: + def test_missing_file_then_appearing(self, tmp_path: Path) -> None: + out = tmp_path / "missing.out" + tailer, notify, _ = _make_tailer(out) + tailer.start() + try: + time.sleep(0.15) + out.write_text("alice:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + finally: + tailer.stop() + + def test_truncation_resets_position(self, tmp_path: Path) -> None: + out = tmp_path / "hashes.out" + out.write_text("alice:hash:plain\nbob:hash:plain\n") + tailer, notify, _ = _make_tailer(out) + tailer.start() + try: + time.sleep(0.15) + # Truncate the file and write a fresh line; tailer should reset + # its file position and see the new line. + out.write_text("charlie:hash:plain\n") + assert _wait_until(lambda: notify.call_count >= 1) + labels = [call.args[0] for call in notify.call_args_list] + assert "charlie" in labels + finally: + tailer.stop()