mirror of
https://github.com/trustedsec/hate_crack.git
synced 2026-04-28 12:03:11 -07:00
Introduce hate_crack.notify package with a small functional public API and a CrackTailer thread for polling hashcat output files. Package layout keeps the HTTP call (_send_pushover) isolated so future backends (Slack, generic webhooks) can be added as a sibling function rather than a framework rewrite.

Core pieces:
- settings.py: NotifySettings dataclass plus atomic config persistence (save_enabled, add_to_allowlist) via read-modify-write + os.replace.
- pushover.py: single _send_pushover() that never raises; network errors, missing requests, and missing creds all funnel to False.
- _suppress.py: thread-local suppression context manager so orchestrator attacks can chain primitives without flooding notifications.
- tailer.py: CrackTailer(threading.Thread) that seeks to EOF on start, polls at a user-configurable interval, and collapses per-tick bursts into a single aggregate notification when they exceed the cap.
- __init__.py: public API (init, prompt_notify_for_attack, notify_job_done, notify_crack, start_tailer, stop_tailer, toggle_enabled, suppressed_notifications).

Privacy guarantee: notification payloads contain only attack name, counts, and hash path, never plaintexts.

72 new tests cover dataclass defaults, atomic config writes, idempotent allowlist updates, HTTP payload privacy, suppression nesting and thread-locality, tailer EOF seek, burst cap, truncation recovery, and the per-attack prompt's [y/n/always] flow.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
"""Unit tests for hate_crack.notify._suppress."""
|
|
import threading
|
|
import time
|
|
|
|
from hate_crack.notify._suppress import is_suppressed, suppressed_notifications
|
|
|
|
|
|
class TestSuppressionContextManager:
    """Single-threaded checks of ``suppressed_notifications`` and ``is_suppressed``."""

    def test_default_is_not_suppressed(self) -> None:
        """With no context active, ``is_suppressed()`` is exactly ``False``."""
        current = is_suppressed()
        assert current is False

    def test_inside_context_is_suppressed(self) -> None:
        """The flag is ``True`` inside the context and ``False`` on both sides of it."""
        before = is_suppressed()
        assert before is False
        with suppressed_notifications():
            during = is_suppressed()
            assert during is True
        after = is_suppressed()
        assert after is False

    def test_nested_restores_outer_state(self) -> None:
        """Exiting an inner context keeps the outer one in effect."""
        with suppressed_notifications():
            outer_on_entry = is_suppressed()
            assert outer_on_entry is True
            with suppressed_notifications():
                inner = is_suppressed()
                assert inner is True
            # Only the inner context has exited here; the outer one is
            # still active, so the flag must not have been cleared.
            outer_after_inner = is_suppressed()
            assert outer_after_inner is True
        fully_exited = is_suppressed()
        assert fully_exited is False

    def test_exception_restores_state(self) -> None:
        """An exception escaping the context still leaves the flag ``False``."""

        def explode() -> None:
            # Raise from inside the context so its exit path runs while an
            # exception is propagating.
            with suppressed_notifications():
                raise RuntimeError("boom")

        try:
            explode()
        except RuntimeError:
            pass
        restored = is_suppressed()
        assert restored is False
|
|
|
|
|
|
class TestSuppressionThreadLocal:
    """Cross-thread checks: a context entered in one thread must not be
    visible through ``is_suppressed()`` in another.

    Both tests synchronise with ``threading.Event`` handshakes rather than
    sleeps, so each sample is taken while the other thread is known to be
    inside its ``suppressed_notifications()`` block.
    """

    def test_other_thread_not_suppressed_by_us(self) -> None:
        """A worker sampling while the main thread is inside the context sees ``False``."""
        seen: list[bool] = []
        ready = threading.Event()
        done = threading.Event()

        def worker() -> None:
            # Sample only after the main thread signals that it is inside
            # its suppression context. If the signal never arrives, record
            # nothing so the final assertion fails instead of passing on a
            # sample taken at the wrong moment.
            if not ready.wait(timeout=2.0):
                return
            seen.append(is_suppressed())
            done.set()

        t = threading.Thread(target=worker, daemon=True)
        t.start()
        with suppressed_notifications():
            assert is_suppressed() is True
            ready.set()
            # Stay inside the context until the worker has sampled.
            assert done.wait(timeout=2.0), "worker never sampled its state"
        t.join(timeout=2.0)
        assert seen == [False], f"worker thread saw suppression state: {seen}"

    def test_worker_thread_suppression_does_not_leak(self) -> None:
        """The main thread sees ``False`` while a worker is inside the context."""
        worker_seen: list[bool] = []
        entered = threading.Event()
        release = threading.Event()
        worker_done = threading.Event()

        def worker() -> None:
            with suppressed_notifications():
                # Record rather than assert: an AssertionError raised in
                # this thread would not propagate to the test.
                worker_seen.append(is_suppressed())
                # Announce that the context is active, then hold it open
                # until the main thread has taken its sample.
                entered.set()
                release.wait(timeout=2.0)
            worker_done.set()

        t = threading.Thread(target=worker, daemon=True)
        t.start()
        try:
            assert entered.wait(timeout=2.0), "worker never entered its context"
            # The worker is inside its context right now; the main thread
            # must not observe that state.
            assert is_suppressed() is False
        finally:
            # Always let the worker finish, even if an assertion failed.
            release.set()
        t.join(timeout=2.0)
        assert worker_done.is_set()
        assert worker_seen == [True], f"worker saw suppression state: {worker_seen}"
        assert is_suppressed() is False
|