Files
hate_crack/tests/test_notify_suppression.py
Justin Bollinger f9926c0b41 feat(notify): add notification package with Pushover backend
Introduce hate_crack.notify package with a small functional public API
and a CrackTailer thread for polling hashcat output files. Package
layout keeps the HTTP call (_send_pushover) isolated so future backends
(Slack, generic webhooks) can be added as a sibling function rather
than a framework rewrite.

Core pieces:
- settings.py: NotifySettings dataclass plus atomic config persistence
  (save_enabled, add_to_allowlist) via read-modify-write + os.replace.
- pushover.py: single _send_pushover() that never raises; network
  errors, a missing `requests` library, and missing credentials all
  result in a False return value.
- _suppress.py: thread-local suppression context manager so
  orchestrator attacks can chain primitives without flooding
  notifications.
- tailer.py: CrackTailer(threading.Thread) that seeks to EOF on start,
  polls at a user-configurable interval, and collapses per-tick bursts
  into a single aggregate notification when they exceed the cap.
- __init__.py: public API (init, prompt_notify_for_attack,
  notify_job_done, notify_crack, start_tailer, stop_tailer,
  toggle_enabled, suppressed_notifications). Privacy guarantee:
  notification payloads contain only attack name, counts, and hash
  path, never plaintexts.

72 new tests cover dataclass defaults, atomic config writes, idempotent
allowlist updates, HTTP payload privacy, suppression nesting and
thread-locality, tailer EOF seek, burst cap, truncation recovery, and
the per-attack prompt's [y/n/always] flow.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 14:41:38 -04:00

75 lines
2.5 KiB
Python

"""Unit tests for hate_crack.notify._suppress."""
import threading
import time
from hate_crack.notify._suppress import is_suppressed, suppressed_notifications
class TestSuppressionContextManager:
    """Single-threaded checks of the ``suppressed_notifications`` context manager."""

    def test_default_is_not_suppressed(self) -> None:
        """Outside of any context, ``is_suppressed()`` reports ``False``."""
        current_state = is_suppressed()
        assert current_state is False

    def test_inside_context_is_suppressed(self) -> None:
        """The flag is ``True`` only while the context is active."""
        # Before entering: not suppressed.
        assert is_suppressed() is False

        with suppressed_notifications():
            # Inside the block: suppressed.
            assert is_suppressed() is True

        # After leaving: back to not suppressed.
        assert is_suppressed() is False

    def test_nested_restores_outer_state(self) -> None:
        """Exiting an inner context keeps the outer context's suppression."""
        with suppressed_notifications():
            assert is_suppressed() is True

            with suppressed_notifications():
                assert is_suppressed() is True

            # Leaving inner context must still leave us suppressed.
            assert is_suppressed() is True

        # Only once the outermost context exits does suppression end.
        assert is_suppressed() is False

    def test_exception_restores_state(self) -> None:
        """An exception raised inside the context does not leave it suppressed."""
        try:
            with suppressed_notifications():
                raise RuntimeError("boom")
        except RuntimeError:
            # The error itself is irrelevant; only the state afterwards matters.
            pass

        state_after_error = is_suppressed()
        assert state_after_error is False
class TestSuppressionThreadLocal:
    """Checks that suppression entered in one thread is not visible in another.

    Both tests coordinate the two threads with ``threading.Event`` objects
    instead of sleeps, so each sample is guaranteed to be taken while the
    other thread is inside its ``suppressed_notifications()`` context.
    """

    def test_other_thread_not_suppressed_by_us(self) -> None:
        """A worker must see ``False`` while the main thread is suppressed."""
        seen: list[bool] = []
        ready = threading.Event()
        done = threading.Event()

        def worker() -> None:
            # Wait until main thread has entered its suppression context,
            # then sample our own state.
            ready.wait(timeout=2.0)
            seen.append(is_suppressed())
            done.set()

        t = threading.Thread(target=worker, daemon=True)
        t.start()

        with suppressed_notifications():
            assert is_suppressed() is True
            ready.set()
            # Keep the context open until the worker has sampled; fail loudly
            # if the worker never reports back instead of silently moving on.
            assert done.wait(timeout=2.0), "worker thread did not finish in time"

        t.join(timeout=2.0)
        assert seen == [False], f"worker thread saw suppression state: {seen}"

    def test_worker_thread_suppression_does_not_leak(self) -> None:
        """The main thread must see ``False`` while a worker is suppressed."""
        worker_states: list[bool] = []
        entered = threading.Event()
        release = threading.Event()
        worker_done = threading.Event()

        def worker() -> None:
            with suppressed_notifications():
                # Record rather than assert: an AssertionError raised in a
                # worker thread would not fail the test by itself.
                worker_states.append(is_suppressed())
                entered.set()
                # Stay inside the context until the main thread has sampled.
                release.wait(timeout=2.0)
            worker_done.set()

        t = threading.Thread(target=worker, daemon=True)
        t.start()

        try:
            # Without this handshake the main thread could sample before the
            # worker entered its context, making the check below vacuous.
            assert entered.wait(timeout=2.0), "worker never entered suppression"
            # Main thread should never observe the worker's suppression.
            assert is_suppressed() is False
        finally:
            # Always let the worker leave its context, even on failure.
            release.set()

        t.join(timeout=2.0)
        assert worker_done.is_set()
        assert worker_states == [True], (
            f"worker thread saw suppression state: {worker_states}"
        )
        assert is_suppressed() is False