mirror of
https://github.com/trustedsec/hate_crack.git
synced 2026-03-12 21:23:05 -07:00
The test built its own mock context instead of using the shared ollama_globals helper, missing the rulesDirectory and hcatPotfilePath patches. This caused FileNotFoundError on CI where /path/to/hashcat/rules doesn't exist. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
685 lines
27 KiB
Python
685 lines
27 KiB
Python
"""Unit tests for _pull_ollama_model helper, hcatOllama steps A-C, and ollama_attack handler."""
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import urllib.error
|
|
import urllib.request
|
|
from contextlib import contextmanager
|
|
from types import SimpleNamespace
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
os.environ["HATE_CRACK_SKIP_INIT"] = "1"
|
|
from hate_crack import main as hc_main # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared test infrastructure
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Base URL patched into hc_main.ollamaUrl by ollama_globals().
OLLAMA_URL = "http://localhost:11434"
# Model name patched into hc_main.ollamaModel by ollama_globals().
MODEL = "llama3.2"
|
|
|
|
|
|
@pytest.fixture
def ollama_env(tmp_path):
    """Create the filesystem layout hcatOllama expects.

    Returns a SimpleNamespace with three attributes:

    * ``tmp_path``  - the per-test temporary directory itself
    * ``hash_file`` - str path to an empty ``hashes.txt`` inside it
    * ``wordlist``  - str path to ``sample.txt`` holding three sample passwords
    """
    hash_file = tmp_path / "hashes.txt"
    hash_file.touch()

    wordlist = tmp_path / "sample.txt"
    wordlist.write_text("password\n123456\nletmein\n")

    # Paths are handed out as str because the tests pass them straight to
    # hcatOllama and also build "<hash_file>.ollama_candidates" from them.
    return SimpleNamespace(
        tmp_path=tmp_path,
        hash_file=str(hash_file),
        wordlist=str(wordlist),
    )
|
|
|
|
|
|
@contextmanager
def ollama_globals(tmp_path, tuning="", potfile=""):
    """Patch the hc_main globals that hcatOllama reads.

    Args:
        tmp_path: Directory used as ``hate_path``; a ``rules`` sub-directory
            is created inside it and used as ``rulesDirectory``.
        tuning: Value for ``hc_main.hcatTuning``.
        potfile: Value for ``hc_main.hcatPotfilePath``.

    While the context is active ``generate_session_id`` is also stubbed to
    return ``"test_session"``. All patches are undone on exit.
    """
    rules_dir = str(tmp_path / "rules")
    os.makedirs(rules_dir, exist_ok=True)

    patched_globals = {
        "ollamaUrl": OLLAMA_URL,
        "ollamaModel": MODEL,
        "hcatBin": "/usr/bin/hashcat",
        "hcatTuning": tuning,
        "hcatPotfilePath": potfile,
        "hate_path": str(tmp_path),
        "rulesDirectory": rules_dir,
    }
    # patch.multiple applies every attribute patch on hc_main in one context
    # manager instead of a chain of individual patch.object() calls.
    with mock.patch.multiple(hc_main, **patched_globals), \
            mock.patch("hate_crack.main.generate_session_id", return_value="test_session"):
        yield
|
|
|
|
|
|
def _make_proc(wait_return=0):
    """Create a mock subprocess that works with wait(), poll() and communicate().

    Args:
        wait_return: Exit status reported by ``wait()``, ``poll()`` and
            ``returncode``.

    Returns:
        A MagicMock standing in for a ``subprocess.Popen`` instance whose
        ``communicate()`` yields empty stdout/stderr bytes.
    """
    proc = mock.MagicMock()
    proc.wait.return_value = wait_return
    # Keep poll() consistent with returncode; an unconfigured MagicMock would
    # hand back another MagicMock instead of an exit status.
    proc.poll.return_value = wait_return
    proc.communicate.return_value = (b"", b"")
    proc.returncode = wait_return
    return proc
|
|
|
|
|
|
def _generate_response(passwords):
    """Build a fake urlopen context-manager that returns an Ollama /api/generate JSON body.

    Args:
        passwords: Iterable of candidate strings; they are joined with
            newlines and placed under the ``"response"`` key.

    Returns:
        A MagicMock usable as ``with urlopen(...) as resp``. Every entry into
        the context yields a fresh BytesIO positioned at the start of the
        body, so the same object can safely back several urlopen calls.
    """
    body = json.dumps({"response": "\n".join(passwords)}).encode()
    resp = mock.MagicMock()
    # side_effect (not return_value): a single shared BytesIO would be
    # exhausted after the first read and later readers would see b"".
    resp.__enter__ = mock.Mock(side_effect=lambda *args, **kwargs: io.BytesIO(body))
    # Returning False lets exceptions raised inside the with-body propagate.
    resp.__exit__ = mock.Mock(return_value=False)
    return resp
|
|
|
|
|
|
def _urlopen_with_response(passwords):
    """Return a urlopen mock that always succeeds with the given passwords.

    The returned object is an un-started ``mock.patch`` patcher for
    ``hate_crack.main.urllib.request.urlopen``; use it in a ``with`` statement.
    """
    return mock.patch(
        "hate_crack.main.urllib.request.urlopen",
        return_value=_generate_response(passwords),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _pull_ollama_model tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPullOllamaModel:
    """Tests for _pull_ollama_model()."""

    def _make_stream_response(self, statuses):
        """Build a fake streaming response (newline-delimited JSON)."""
        lines = [json.dumps({"status": s}).encode() + b"\n" for s in statuses]
        return io.BytesIO(b"".join(lines))

    @staticmethod
    def _serve_stream(mock_urlopen, stream):
        """Make ``with urlopen(...) as resp`` yield *stream*."""
        mock_urlopen.return_value.__enter__ = mock.Mock(return_value=stream)
        # False => exceptions raised inside the with-body are not swallowed.
        mock_urlopen.return_value.__exit__ = mock.Mock(return_value=False)

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_successful_pull(self, mock_urlopen, capsys):
        """A stream ending in "success" returns True and echoes progress."""
        self._serve_stream(
            mock_urlopen,
            self._make_stream_response(
                ["pulling manifest", "downloading sha256:abc123", "success"]
            ),
        )

        result = hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        assert result is True
        captured = capsys.readouterr()
        assert "not found locally" in captured.out
        assert "Successfully pulled" in captured.out
        assert "pulling manifest" in captured.out

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_pull_http_error(self, mock_urlopen, capsys):
        """An HTTP error from /api/pull returns False and reports the status code."""
        mock_urlopen.side_effect = urllib.error.HTTPError(
            url=f"{OLLAMA_URL}/api/pull",
            code=500,
            msg="Internal Server Error",
            hdrs=None,
            fp=None,
        )

        result = hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        assert result is False
        captured = capsys.readouterr()
        assert "HTTP 500" in captured.out

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_pull_url_error(self, mock_urlopen, capsys):
        """A connection failure returns False with a connection hint."""
        mock_urlopen.side_effect = urllib.error.URLError("Connection refused")

        result = hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        assert result is False
        captured = capsys.readouterr()
        assert "Could not connect" in captured.out

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_pull_generic_exception(self, mock_urlopen, capsys):
        """Any other exception returns False and its message is printed."""
        mock_urlopen.side_effect = RuntimeError("unexpected")

        result = hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        assert result is False
        captured = capsys.readouterr()
        assert "unexpected" in captured.out

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_pull_handles_empty_status_lines(self, mock_urlopen):
        """Blank lines and missing status keys should not crash."""
        lines = b'{"status": "downloading"}\n\n{"other": "data"}\n'
        self._serve_stream(mock_urlopen, io.BytesIO(lines))

        result = hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        assert result is True

    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_pull_request_payload(self, mock_urlopen):
        """Verify the pull request sends the correct model name and stream=True."""
        self._serve_stream(mock_urlopen, self._make_stream_response(["success"]))

        hc_main._pull_ollama_model(OLLAMA_URL, MODEL)

        # First positional argument of the urlopen call is the Request object.
        req = mock_urlopen.call_args[0][0]
        body = json.loads(req.data.decode("utf-8"))
        assert body["name"] == MODEL
        assert body["stream"] is True
        assert req.full_url == f"{OLLAMA_URL}/api/pull"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# hcatOllama 404-retry integration tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllama404Retry:
    """Test that hcatOllama auto-pulls on 404 and retries the generate call.

    All tests use the shared ``ollama_env`` fixture and ``ollama_globals``
    helper so that every hc_main global hcatOllama reads (including
    ``rulesDirectory`` and ``hcatPotfilePath``) is patched consistently.
    """

    @staticmethod
    def _http_error(code, msg):
        """Build the HTTPError that urlopen raises for a failed /api/generate call."""
        return urllib.error.HTTPError(
            url=f"{OLLAMA_URL}/api/generate",
            code=code,
            msg=msg,
            hdrs=None,
            fp=None,
        )

    @mock.patch("hate_crack.main._pull_ollama_model")
    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_404_triggers_pull_then_retries(self, mock_urlopen, mock_pull, ollama_env):
        """A 404 on generate should trigger a pull, then retry successfully."""
        # First call: 404, second call: success
        mock_urlopen.side_effect = [
            self._http_error(404, "Not Found"),
            _generate_response(["Password1", "Summer2024"]),
        ]
        mock_pull.return_value = True

        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("subprocess.Popen", return_value=_make_proc()):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        mock_pull.assert_called_once_with(OLLAMA_URL, MODEL)
        # urlopen called twice: first 404, then retry
        assert mock_urlopen.call_count == 2

    @mock.patch("hate_crack.main._pull_ollama_model")
    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_404_pull_fails_aborts(self, mock_urlopen, mock_pull, ollama_env, capsys):
        """If pull fails after 404, hcatOllama should abort gracefully."""
        mock_urlopen.side_effect = self._http_error(404, "Not Found")
        mock_pull.return_value = False

        with ollama_globals(ollama_env.tmp_path):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        captured = capsys.readouterr()
        assert "Could not pull model" in captured.out
        mock_pull.assert_called_once()

    @mock.patch("hate_crack.main._pull_ollama_model")
    @mock.patch("hate_crack.main.urllib.request.urlopen")
    def test_non_404_http_error_propagates(self, mock_urlopen, mock_pull, ollama_env, capsys):
        """Non-404 HTTP errors should not trigger a pull attempt."""
        mock_urlopen.side_effect = self._http_error(500, "Internal Server Error")

        with ollama_globals(ollama_env.tmp_path):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        captured = capsys.readouterr()
        # HTTPError is a subclass of URLError, so it hits the URLError handler
        assert (
            "Could not connect to Ollama" in captured.out
            or "Error calling Ollama API" in captured.out
        )
        assert "500" in captured.out
        # The behaviour named in the docstring: no pull on a non-404 failure.
        mock_pull.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step A: Mode routing, prompt construction, early-return error paths
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllamaModeRouting:
    """Test mode selection, prompt building, and early-return errors."""

    def _prompt_sent(self, ollama_env, mode, mode_arg, passwords):
        """Run hcatOllama against a stubbed API and return the prompt it sent.

        urlopen is replaced by a function that records the JSON request body
        and answers with *passwords*; subprocess.Popen is stubbed so no real
        hashcat process is started.
        """
        seen = {}

        def fake_urlopen(req, *args, **kwargs):
            # Accept extra positional/keyword arguments (e.g. a timeout).
            seen["data"] = json.loads(req.data.decode())
            return _generate_response(passwords)

        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("hate_crack.main.urllib.request.urlopen", side_effect=fake_urlopen), \
                mock.patch("subprocess.Popen", return_value=_make_proc()):
            hc_main.hcatOllama("0", ollama_env.hash_file, mode, mode_arg)

        return seen["data"]["prompt"]

    def test_unknown_mode_prints_error(self, ollama_env, capsys):
        """Bad mode string → error message, no API call."""
        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("hate_crack.main.urllib.request.urlopen") as mock_url:
            hc_main.hcatOllama("0", ollama_env.hash_file, "bogus", "")

        captured = capsys.readouterr()
        assert "Unknown LLM generation mode" in captured.out
        mock_url.assert_not_called()

    def test_missing_wordlist_prints_error(self, ollama_env, capsys):
        """Non-existent wordlist path → error, no API call."""
        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("hate_crack.main.urllib.request.urlopen") as mock_url:
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist",
                "/no/such/wordlist.txt",
            )

        captured = capsys.readouterr()
        assert "Wordlist not found" in captured.out
        mock_url.assert_not_called()

    def test_wordlist_mode_reads_all_lines(self, ollama_env):
        """All non-blank lines from the wordlist should appear in the prompt."""
        # Write 600 lines to the wordlist
        big_wordlist = ollama_env.tmp_path / "big.txt"
        big_wordlist.write_text("\n".join(f"pass{i}" for i in range(600)) + "\n")

        prompt_text = self._prompt_sent(
            ollama_env, "wordlist", str(big_wordlist), ["Password1"],
        )

        # All lines should be included in the prompt
        assert "pass0" in prompt_text
        assert "pass499" in prompt_text
        assert "pass599" in prompt_text

    def test_target_mode_includes_context_in_prompt(self, ollama_env):
        """Company, industry, and location should appear in the prompt."""
        target_info = {
            "company": "AcmeCorp",
            "industry": "Finance",
            "location": "New York",
        }

        prompt_text = self._prompt_sent(
            ollama_env, "target", target_info, ["AcmeCorp2024"],
        )

        assert "AcmeCorp" in prompt_text
        assert "Finance" in prompt_text
        assert "New York" in prompt_text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step B: Candidate filtering / post-processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllamaCandidateFiltering:
    """Test regex stripping, blank-line removal, 128-char limit, empty-result handling."""

    def _run_with_payload(self, ollama_env, payload):
        """Run hcatOllama with *payload* as the API's JSON body.

        Returns the text of ``<hash_file>.ollama_candidates`` as it was when
        Popen was last invoked, or None if Popen was never reached or the
        file did not exist at that point.
        """
        body = json.dumps(payload).encode()
        resp = mock.MagicMock()
        resp.__enter__ = mock.Mock(return_value=io.BytesIO(body))
        resp.__exit__ = mock.Mock(return_value=False)

        candidates_path = f"{ollama_env.hash_file}.ollama_candidates"
        captured = {}

        def capture_popen(cmd, **kwargs):
            # Snapshot the candidates file at the moment hashcat would start.
            if os.path.isfile(candidates_path):
                with open(candidates_path) as f:
                    captured["content"] = f.read()
            return _make_proc()

        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("hate_crack.main.urllib.request.urlopen", return_value=resp), \
                mock.patch("subprocess.Popen", side_effect=capture_popen):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        return captured.get("content")

    def _run_with_response(self, ollama_env, response_text):
        """Run hcatOllama with a canned Ollama response_text, return candidates file content."""
        return self._run_with_payload(ollama_env, {"response": response_text})

    @staticmethod
    def _candidate_lines(content):
        """Split candidates-file text into its non-empty lines."""
        return [line for line in content.strip().split("\n") if line]

    def test_strips_numeric_dot_prefix(self, ollama_env):
        """"1. foo" style list numbering is removed from candidates."""
        content = self._run_with_response(ollama_env, "1. Password1\n2. Summer2024")
        assert content is not None
        lines = self._candidate_lines(content)
        assert "Password1" in lines
        assert "Summer2024" in lines
        # No line should start with a digit+dot
        for line in lines:
            assert not line.startswith(("1.", "2."))

    def test_strips_dash_and_asterisk_prefix(self, ollama_env):
        """"- foo" and "* bar" bullet markers are removed from candidates."""
        content = self._run_with_response(ollama_env, "- foo\n* bar")
        assert content is not None
        lines = self._candidate_lines(content)
        assert "foo" in lines
        assert "bar" in lines

    def test_skips_blank_lines(self, ollama_env):
        """Empty lines in the response produce no candidates."""
        content = self._run_with_response(ollama_env, "alpha\n\n\nbeta\n\n")
        assert content is not None
        assert self._candidate_lines(content) == ["alpha", "beta"]

    def test_rejects_over_128_chars(self, ollama_env):
        """A 129-character candidate is dropped; its neighbours are kept."""
        long_pw = "A" * 129
        content = self._run_with_response(ollama_env, f"short\n{long_pw}\nkeep")
        assert content is not None
        lines = self._candidate_lines(content)
        assert "short" in lines
        assert "keep" in lines
        assert long_pw not in lines

    def test_accepts_exactly_128_chars(self, ollama_env):
        """A candidate of exactly 128 characters is kept."""
        exact_pw = "B" * 128
        content = self._run_with_response(ollama_env, f"{exact_pw}\nother")
        assert content is not None
        assert exact_pw in self._candidate_lines(content)

    def test_empty_response_prints_error(self, ollama_env, capsys):
        """An empty response text yields the "no usable" message."""
        self._run_with_response(ollama_env, "")
        captured = capsys.readouterr()
        assert "no usable" in captured.out.lower()

    def test_missing_response_key(self, ollama_env, capsys):
        """If the JSON has no 'response' key, treat as empty."""
        self._run_with_payload(ollama_env, {"other": "stuff"})
        captured = capsys.readouterr()
        assert "no usable" in captured.out.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step B: API error paths (non-404)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllamaApiErrors:
    """Test connection errors and generic exceptions during the generate call."""

    @staticmethod
    def _run_with_failing_urlopen(ollama_env, error):
        """Run hcatOllama in wordlist mode with urlopen raising *error*."""
        with ollama_globals(ollama_env.tmp_path), \
                mock.patch(
                    "hate_crack.main.urllib.request.urlopen",
                    side_effect=error,
                ):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

    def test_url_error_prints_connection_error(self, ollama_env, capsys):
        """URLError → connection message plus the "is Ollama running" hint."""
        self._run_with_failing_urlopen(
            ollama_env, urllib.error.URLError("Connection refused"),
        )

        captured = capsys.readouterr()
        assert "Could not connect" in captured.out
        assert "Ensure Ollama is running" in captured.out

    def test_generic_exception_prints_error(self, ollama_env, capsys):
        """Any other exception → generic API error message."""
        self._run_with_failing_urlopen(ollama_env, RuntimeError("boom"))

        captured = capsys.readouterr()
        assert "Error calling Ollama API" in captured.out

    def test_generate_request_payload(self, ollama_env):
        """Verify the /api/generate request has correct URL, model, stream=false."""
        captured_req = {}

        def fake_urlopen(req, *args, **kwargs):
            # Record what hcatOllama sent, then answer with one candidate.
            captured_req["url"] = req.full_url
            captured_req["body"] = json.loads(req.data.decode())
            return _generate_response(["Password1"])

        with ollama_globals(ollama_env.tmp_path), \
                mock.patch("hate_crack.main.urllib.request.urlopen", side_effect=fake_urlopen), \
                mock.patch("subprocess.Popen", return_value=_make_proc()):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        assert captured_req["url"] == f"{OLLAMA_URL}/api/generate"
        assert captured_req["body"]["model"] == MODEL
        assert captured_req["body"]["stream"] is False
        assert len(captured_req["body"]["prompt"]) > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step C: Hashcat command construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllamaHashcatCommand:
    """Test hashcat command flags and process handling."""

    def _run_full(self, ollama_env, tuning="", potfile=""):
        """Run hcatOllama through all steps, returning the list of Popen calls.

        Each entry is a ``(cmd_list, kwargs_dict)`` tuple in call order.
        """
        popen_calls = []

        def track_popen(cmd, **kwargs):
            popen_calls.append((list(cmd), dict(kwargs)))
            return _make_proc()

        # ollama_globals() already stubs generate_session_id, so it is not
        # patched a second time here.
        with ollama_globals(ollama_env.tmp_path, tuning=tuning, potfile=potfile), \
                _urlopen_with_response(["Password1"]), \
                mock.patch("subprocess.Popen", side_effect=track_popen):
            hc_main.hcatOllama(
                "1000", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        return popen_calls

    def test_hashcat_command_structure(self, ollama_env):
        """First Popen call should be a straight wordlist attack with candidates file,
        followed by rule-based attacks."""
        calls = self._run_full(ollama_env)
        assert len(calls) >= 1, "Expected at least one Popen call (hashcat)"

        # First call: base wordlist attack (no rules)
        hashcat_cmd = calls[0][0]
        assert hashcat_cmd[0] == "/usr/bin/hashcat"
        assert "-m" in hashcat_cmd
        m_idx = hashcat_cmd.index("-m")
        assert hashcat_cmd[m_idx + 1] == "1000"
        # Should use the candidates wordlist file
        candidates_path = f"{ollama_env.hash_file}.ollama_candidates"
        assert candidates_path in hashcat_cmd
        # First call should NOT have mask attack flags
        assert "-a" not in hashcat_cmd
        assert "--markov-hcstat2" not in " ".join(hashcat_cmd)
        assert "--increment" not in hashcat_cmd
        # First call should NOT have -r (rule) flag
        assert "-r" not in hashcat_cmd

        # Subsequent calls (if any): rule-based attacks
        for rule_cmd, _ in calls[1:]:
            assert "-r" in rule_cmd
            assert candidates_path in rule_cmd

    def test_hashcat_includes_tuning(self, ollama_env):
        """-w 3 tuning flag should be appended to hashcat cmd."""
        calls = self._run_full(ollama_env, tuning="-w 3")
        hashcat_cmd = calls[0][0]
        assert "-w" in hashcat_cmd
        w_idx = hashcat_cmd.index("-w")
        assert hashcat_cmd[w_idx + 1] == "3"

    def test_hashcat_includes_potfile(self, ollama_env):
        """--potfile-path should be present when hcatPotfilePath is set."""
        calls = self._run_full(ollama_env, potfile="/tmp/test.potfile")
        hashcat_cmd = calls[0][0]
        potfile_flags = [f for f in hashcat_cmd if "--potfile-path=" in f]
        assert len(potfile_flags) == 1
        assert potfile_flags[0] == "--potfile-path=/tmp/test.potfile"

    def test_hashcat_keyboard_interrupt(self, ollama_env, capsys):
        """KeyboardInterrupt during hashcat should kill the process."""
        proc = mock.MagicMock()
        proc.pid = 99999
        proc.wait.side_effect = KeyboardInterrupt()

        with ollama_globals(ollama_env.tmp_path), \
                _urlopen_with_response(["Password1"]), \
                mock.patch("subprocess.Popen", return_value=proc):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        captured = capsys.readouterr()
        assert "Killing PID" in captured.out
        proc.kill.assert_called_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step F: Generated wordlist persistence (candidates file is kept, not cleaned up)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHcatOllamaWordlistPersistence:
    """Test that the generated wordlist file persists after the run."""

    def test_candidates_file_persists(self, ollama_env):
        """ollama_candidates wordlist should remain after the run."""
        # ollama_globals() already stubs generate_session_id, so it is not
        # patched a second time here.
        with ollama_globals(ollama_env.tmp_path), \
                _urlopen_with_response(["Password1"]), \
                mock.patch("subprocess.Popen", return_value=_make_proc()):
            hc_main.hcatOllama(
                "0", ollama_env.hash_file, "wordlist", ollama_env.wordlist,
            )

        candidates_path = f"{ollama_env.hash_file}.ollama_candidates"
        assert os.path.isfile(candidates_path), "candidates wordlist should persist"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# attacks.py: ollama_attack() UI handler
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestOllamaAttackHandler:
    """Test the ollama_attack(ctx) menu handler in attacks.py."""

    def _make_ctx(self, **overrides):
        """Build a mock ctx with the attributes ollama_attack() reads.

        Keyword arguments are set as attributes on the mock after the
        defaults, so they override them.
        """
        ctx = mock.MagicMock()
        ctx.hcatHashType = "0"
        ctx.hcatHashFile = "/tmp/hashes.txt"
        ctx.hcatWordlists = "/tmp/wordlists"
        for attr, value in overrides.items():
            setattr(ctx, attr, value)
        return ctx

    def test_target_mode(self):
        """ollama_attack prompts for company/industry/location and calls hcatOllama."""
        from hate_crack.attacks import ollama_attack

        ctx = self._make_ctx()
        # The three input() answers are consumed in order:
        # company, industry, location.
        with mock.patch("builtins.input", side_effect=["AcmeCorp", "Finance", "NYC"]):
            ollama_attack(ctx)

        ctx.hcatOllama.assert_called_once()
        positional = ctx.hcatOllama.call_args[0]
        assert positional[2] == "target"
        target_info = positional[3]
        assert target_info["company"] == "AcmeCorp"
        assert target_info["industry"] == "Finance"
        assert target_info["location"] == "NYC"
|
|
|