mirror of
https://github.com/trustedsec/hate_crack.git
synced 2026-04-28 12:03:11 -07:00
Merge pull request #103 from trustedsec/feat/release-v2.4.0
feat: release v2.4.0 - wordlist filtering, parallel rule downloads, dynamic optimized kernels
This commit is contained in:
@@ -108,6 +108,17 @@ DEFAULT_OPTIMIZED_ATTACKS = frozenset(
|
||||
"hcatPathwellBruteForce",
|
||||
"hcatAdHocMask",
|
||||
"hcatMarkovBruteForce",
|
||||
"hcatFingerprint",
|
||||
"hcatCombination",
|
||||
"hcatCombinator3",
|
||||
"hcatCombinatorX",
|
||||
"hcatHybrid",
|
||||
"hcatYoloCombination",
|
||||
"hcatMiddleCombinator",
|
||||
"hcatThoroughCombinator",
|
||||
"hcatCombipow",
|
||||
"hcatPrince",
|
||||
"hcatPermute",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1430,24 +1441,25 @@ def hcatFingerprint(
|
||||
print("Killing PID {0}...".format(str(sort_proc.pid)))
|
||||
sort_proc.kill()
|
||||
expander_proc.kill()
|
||||
hcatProcess = subprocess.Popen(
|
||||
[
|
||||
hcatBin,
|
||||
"-m",
|
||||
hcatHashType,
|
||||
hcatHashFile,
|
||||
"--session",
|
||||
generate_session_id(),
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
"-a",
|
||||
"1",
|
||||
f"{hcatHashFile}.expanded",
|
||||
f"{hcatHashFile}.expanded",
|
||||
*shlex.split(hcatTuning),
|
||||
*([f"--potfile-path={hcatPotfilePath}"] if hcatPotfilePath else []),
|
||||
]
|
||||
)
|
||||
fingerprint_cmd = [
|
||||
hcatBin,
|
||||
"-m",
|
||||
hcatHashType,
|
||||
hcatHashFile,
|
||||
"--session",
|
||||
generate_session_id(),
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
"-a",
|
||||
"1",
|
||||
f"{hcatHashFile}.expanded",
|
||||
f"{hcatHashFile}.expanded",
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatFingerprint"):
|
||||
_insert_optimized_flag(fingerprint_cmd)
|
||||
fingerprint_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(fingerprint_cmd)
|
||||
hcatProcess = subprocess.Popen(fingerprint_cmd)
|
||||
try:
|
||||
hcatProcess.wait()
|
||||
except KeyboardInterrupt:
|
||||
@@ -1507,6 +1519,8 @@ def hcatCombination(hcatHashType, hcatHashFile, wordlists=None):
|
||||
resolved_wordlists[0],
|
||||
resolved_wordlists[1],
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatCombination"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -1542,6 +1556,8 @@ def hcatCombinator3(hcatHashType, hcatHashFile, wordlists):
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatCombinator3"):
|
||||
_insert_optimized_flag(hashcat_cmd)
|
||||
hashcat_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(hashcat_cmd)
|
||||
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
|
||||
@@ -1586,6 +1602,8 @@ def hcatCombinatorX(hcatHashType, hcatHashFile, wordlists, separator=None):
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatCombinatorX"):
|
||||
_insert_optimized_flag(hashcat_cmd)
|
||||
hashcat_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(hashcat_cmd)
|
||||
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
|
||||
@@ -1683,6 +1701,8 @@ def hcatHybrid(hcatHashType, hcatHashFile, wordlists=None):
|
||||
f"{hcatHashFile}.out",
|
||||
*args,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatHybrid"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -1719,6 +1739,8 @@ def hcatYoloCombination(hcatHashType, hcatHashFile):
|
||||
left_path,
|
||||
right_path,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatYoloCombination"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -2102,8 +2124,11 @@ def hcatMiddleCombinator(hcatHashType, hcatHashFile):
|
||||
masks[x],
|
||||
hcatMiddleBaseList,
|
||||
hcatMiddleBaseList,
|
||||
*([f"--potfile-path={hcatPotfilePath}"] if hcatPotfilePath else []),
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatMiddleCombinator"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
try:
|
||||
hcatProcess.wait()
|
||||
@@ -2145,6 +2170,8 @@ def hcatThoroughCombinator(hcatHashType, hcatHashFile):
|
||||
hcatThoroughBaseList,
|
||||
hcatThoroughBaseList,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatThoroughCombinator"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -2171,8 +2198,11 @@ def hcatThoroughCombinator(hcatHashType, hcatHashFile):
|
||||
masks[x],
|
||||
hcatThoroughBaseList,
|
||||
hcatThoroughBaseList,
|
||||
*([f"--potfile-path={hcatPotfilePath}"] if hcatPotfilePath else []),
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatThoroughCombinator"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
try:
|
||||
hcatProcess.wait()
|
||||
@@ -2200,6 +2230,8 @@ def hcatThoroughCombinator(hcatHashType, hcatHashFile):
|
||||
hcatThoroughBaseList,
|
||||
hcatThoroughBaseList,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatThoroughCombinator"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -2231,6 +2263,8 @@ def hcatThoroughCombinator(hcatHashType, hcatHashFile):
|
||||
hcatThoroughBaseList,
|
||||
hcatThoroughBaseList,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatThoroughCombinator"):
|
||||
_insert_optimized_flag(cmd)
|
||||
cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(cmd)
|
||||
hcatProcess = subprocess.Popen(cmd)
|
||||
@@ -2433,6 +2467,8 @@ def hcatCombipow(hcatHashType, hcatHashFile, wordlist, use_space_sep=True):
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatCombipow"):
|
||||
_insert_optimized_flag(hashcat_cmd)
|
||||
hashcat_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(hashcat_cmd)
|
||||
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
|
||||
@@ -2483,6 +2519,8 @@ def hcatPrince(hcatHashType, hcatHashFile):
|
||||
"-r",
|
||||
prince_rule,
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatPrince"):
|
||||
_insert_optimized_flag(hashcat_cmd)
|
||||
hashcat_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(hashcat_cmd)
|
||||
hashcat_cmd = _add_debug_mode_for_rules(hashcat_cmd)
|
||||
@@ -2518,6 +2556,8 @@ def hcatPermute(hcatHashType, hcatHashFile, wordlist):
|
||||
"-o",
|
||||
f"{hcatHashFile}.out",
|
||||
]
|
||||
if _should_use_optimized_kernel("hcatPermute"):
|
||||
_insert_optimized_flag(hashcat_cmd)
|
||||
hashcat_cmd.extend(shlex.split(hcatTuning))
|
||||
_append_potfile_arg(hashcat_cmd)
|
||||
with _open_wordlist(wordlist) as wl_file:
|
||||
|
||||
@@ -76,3 +76,8 @@ invalid-argument-type = "warn"
|
||||
testpaths = [
|
||||
"tests",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pytest-timeout>=2.4.0",
|
||||
]
|
||||
|
||||
@@ -515,6 +515,17 @@ class TestOptimizedKernel:
|
||||
[
|
||||
"hcatDictionary",
|
||||
"hcatQuickDictionary",
|
||||
"hcatFingerprint",
|
||||
"hcatCombination",
|
||||
"hcatCombinator3",
|
||||
"hcatCombinatorX",
|
||||
"hcatHybrid",
|
||||
"hcatYoloCombination",
|
||||
"hcatMiddleCombinator",
|
||||
"hcatThoroughCombinator",
|
||||
"hcatCombipow",
|
||||
"hcatPrince",
|
||||
"hcatPermute",
|
||||
"hcatBandrel",
|
||||
"hcatGoodMeasure",
|
||||
"hcatRecycle",
|
||||
@@ -529,14 +540,7 @@ class TestOptimizedKernel:
|
||||
@pytest.mark.parametrize(
|
||||
"attack_name",
|
||||
[
|
||||
"hcatCombination",
|
||||
"hcatYoloCombination",
|
||||
"hcatMiddleCombinator",
|
||||
"hcatThoroughCombinator",
|
||||
"hcatHybrid",
|
||||
"hcatPrince",
|
||||
"hcatOmen",
|
||||
"hcatFingerprint",
|
||||
"hcatLMtoNT",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""End-to-end tests for markov brute force attack flow."""
|
||||
|
||||
import os
|
||||
import gzip
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
@@ -18,6 +17,9 @@ class TestMarkovE2E:
|
||||
# Setup paths
|
||||
main.hate_path = Path(__file__).resolve().parents[1]
|
||||
main.hcatHcstat2genBin = "hcstat2gen.bin"
|
||||
bin_path = main.hate_path / "hashcat-utils" / "bin" / "hcstat2gen.bin"
|
||||
if not bin_path.is_file():
|
||||
pytest.skip(f"hcstat2gen.bin not compiled: {bin_path}")
|
||||
|
||||
# Create test wordlist
|
||||
wordlist = tmp_path / "wordlist.txt"
|
||||
@@ -44,6 +46,9 @@ class TestMarkovE2E:
|
||||
# Setup paths
|
||||
main.hate_path = Path(__file__).resolve().parents[1]
|
||||
main.hcatHcstat2genBin = "hcstat2gen.bin"
|
||||
bin_path = main.hate_path / "hashcat-utils" / "bin" / "hcstat2gen.bin"
|
||||
if not bin_path.is_file():
|
||||
pytest.skip(f"hcstat2gen.bin not compiled: {bin_path}")
|
||||
|
||||
# Create test wordlist (gzipped)
|
||||
wordlist_plain = tmp_path / "wordlist.txt"
|
||||
|
||||
93
tests/test_optimized_kernel.py
Normal file
93
tests/test_optimized_kernel.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Tests for optimized kernel system - covers gaps not in test_main_utils.py::TestOptimizedKernel."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def main_module(hc_module):
|
||||
return hc_module._main
|
||||
|
||||
|
||||
class TestOptimizedKernelMissingNames:
|
||||
"""Covers the two DEFAULT_OPTIMIZED_ATTACKS names missing from test_main_utils.py."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"attack_name",
|
||||
[
|
||||
"hcatAdHocMask",
|
||||
"hcatMarkovBruteForce",
|
||||
],
|
||||
)
|
||||
def test_optimized_attacks_return_true(self, main_module, attack_name):
|
||||
assert main_module._should_use_optimized_kernel(attack_name) is True
|
||||
|
||||
|
||||
class TestOptimizedKernelNonMembers:
|
||||
"""Covers False cases not already parametrized in test_main_utils.py."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"attack_name",
|
||||
[
|
||||
"hcatOllama",
|
||||
"hcatNgramX",
|
||||
"hcatGenerateRules",
|
||||
"hcatMarkovTrain",
|
||||
"hcatOmenTrain",
|
||||
"unknown_attack",
|
||||
],
|
||||
)
|
||||
def test_non_optimized_attacks_return_false(self, main_module, attack_name):
|
||||
assert main_module._should_use_optimized_kernel(attack_name) is False
|
||||
|
||||
|
||||
class TestHcatFingerprintOptimizedFlag:
|
||||
"""End-to-end mock test verifying hcatFingerprint passes -O to hashcat."""
|
||||
|
||||
def test_fingerprint_includes_optimized_flag(self, main_module, tmp_path):
|
||||
hash_file = tmp_path / "hashes.txt"
|
||||
hash_file.write_text("")
|
||||
(tmp_path / "hashes.txt.out").write_text("")
|
||||
# hcatFingerprint opens .working and .expanded inside the loop; create them.
|
||||
(tmp_path / "hashes.txt.working").write_text("")
|
||||
(tmp_path / "hashes.txt.expanded").write_text("")
|
||||
|
||||
captured_cmds = []
|
||||
|
||||
def fake_popen(cmd, **kwargs):
|
||||
captured_cmds.append(list(cmd))
|
||||
proc = MagicMock()
|
||||
proc.stdout = MagicMock()
|
||||
proc.pid = 1234
|
||||
proc.wait.return_value = 0
|
||||
return proc
|
||||
|
||||
# lineCount call sequence:
|
||||
# call 1 (initial, before loop): 1 -> crackedBefore=1, crackedAfter=0 -> enter loop
|
||||
# call 2 (top of loop body): 1 -> crackedBefore=1
|
||||
# call 3 (bottom of loop body): 1 -> crackedAfter=1 -> exit loop (1==1)
|
||||
# call 4 (final hcatFingerprintCount assignment): 1
|
||||
with (
|
||||
patch("hate_crack.main.subprocess.Popen", side_effect=fake_popen),
|
||||
patch.object(main_module, "hcatBin", "hashcat"),
|
||||
patch.object(main_module, "hcatTuning", ""),
|
||||
patch.object(main_module, "hcatPotfilePath", ""),
|
||||
patch.object(main_module, "hate_path", str(tmp_path)),
|
||||
patch.object(main_module, "hcatExpanderBin", "expander.bin"),
|
||||
patch.object(main_module, "hcatHashCracked", 0),
|
||||
patch("hate_crack.main.lineCount", side_effect=[1, 1, 1, 1]),
|
||||
patch("hate_crack.main._write_delimited_field"),
|
||||
patch("hate_crack.main.ensure_binary"),
|
||||
patch("hate_crack.main.generate_session_id", return_value="test_session"),
|
||||
):
|
||||
main_module.hcatFingerprint(
|
||||
hcatHashType="1000",
|
||||
hcatHashFile=str(hash_file),
|
||||
)
|
||||
|
||||
hashcat_cmds = [cmd for cmd in captured_cmds if cmd and cmd[0] == "hashcat"]
|
||||
assert hashcat_cmds, "No hashcat Popen calls captured"
|
||||
assert any("-O" in cmd for cmd in hashcat_cmds), (
|
||||
f"Expected -O in hashcat cmd, got: {hashcat_cmds}"
|
||||
)
|
||||
@@ -1,19 +1,18 @@
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
|
||||
def test_pipal_executable_and_runs(tmp_path):
|
||||
# Read pipalPath from config.json
|
||||
config_path = os.path.join(os.path.dirname(__file__), "..", "config.json")
|
||||
if not os.path.isfile(config_path):
|
||||
pytest.skip("config.json not present (worktree or fresh checkout)")
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
pipal_path = config.get("pipalPath")
|
||||
if not pipal_path or not os.path.isfile(pipal_path):
|
||||
import pytest
|
||||
|
||||
pytest.skip("pipalPath not configured or file missing")
|
||||
|
||||
if not os.access(pipal_path, os.X_OK):
|
||||
|
||||
@@ -11,11 +11,15 @@ PROJECT_ROOT = Path(__file__).resolve().parents[1]
|
||||
|
||||
def load_cli_module():
|
||||
os.environ["HATE_CRACK_SKIP_INIT"] = "1"
|
||||
_preserve = {"hate_crack.attacks", "hate_crack.api"}
|
||||
for key in list(sys.modules.keys()):
|
||||
# Preserve hate_crack.attacks - reloading it creates a new module object
|
||||
# that breaks __globals__ references held by functions imported at
|
||||
# module level in other test files (test isolation violation).
|
||||
if "hate_crack" in key and key != "hate_crack.attacks":
|
||||
# Preserve hate_crack.attacks and hate_crack.api - reloading them creates
|
||||
# new module objects that break __globals__ references held by functions
|
||||
# imported at module level in other test files (test isolation violation).
|
||||
# In particular, hate_crack.api must be preserved so that mocks applied via
|
||||
# patch("hate_crack.api.*") in later tests (e.g. test_rule_download_parallel)
|
||||
# target the same module object that the already-imported functions reference.
|
||||
if "hate_crack" in key and key not in _preserve:
|
||||
del sys.modules[key]
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"hate_crack_cli", PROJECT_ROOT / "hate_crack.py"
|
||||
|
||||
178
tests/test_rule_download_parallel.py
Normal file
178
tests/test_rule_download_parallel.py
Normal file
@@ -0,0 +1,178 @@
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from hate_crack.api import list_and_download_hashmob_rules
|
||||
|
||||
|
||||
def _make_rules(names):
|
||||
return [{"file_name": n} for n in names]
|
||||
|
||||
|
||||
def _patch_stdin_tty():
|
||||
mock_stdin = MagicMock()
|
||||
mock_stdin.isatty.return_value = True
|
||||
return patch("hate_crack.api.sys.stdin", mock_stdin)
|
||||
|
||||
|
||||
class TestListAndDownloadHashmobRulesAllFiles:
|
||||
def test_downloads_all_rules_when_selection_is_a(self, tmp_path):
|
||||
rules = _make_rules(["a.rule", "b.rule", "c.rule", "d.rule", "e.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule") as mock_dl, \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
assert mock_dl.call_count == 5
|
||||
downloaded = {call.args[0] for call in mock_dl.call_args_list}
|
||||
assert downloaded == {"a.rule", "b.rule", "c.rule", "d.rule", "e.rule"}
|
||||
|
||||
def test_output_path_is_inside_rules_dir(self, tmp_path):
|
||||
rules = _make_rules(["sample.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
|
||||
captured_paths = []
|
||||
|
||||
def capture(file_name, out_path):
|
||||
captured_paths.append(out_path)
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule", side_effect=capture), \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
assert len(captured_paths) == 1
|
||||
assert captured_paths[0].startswith(rules_dir)
|
||||
|
||||
def test_success_count_reported(self, tmp_path, capsys):
|
||||
rules = _make_rules(["x.rule", "y.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule"), \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "2 succeeded" in out
|
||||
assert "0 failed" in out
|
||||
|
||||
|
||||
class TestListAndDownloadHashmobRulesSkipping:
|
||||
def test_skips_already_downloaded_files(self, tmp_path):
|
||||
rules = _make_rules(["existing.rule", "new1.rule", "new2.rule", "also_existing.rule", "new3.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
(tmp_path / "rules" / "existing.rule").touch()
|
||||
(tmp_path / "rules" / "also_existing.rule").touch()
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule") as mock_dl, \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
assert mock_dl.call_count == 3
|
||||
downloaded = {call.args[0] for call in mock_dl.call_args_list}
|
||||
assert downloaded == {"new1.rule", "new2.rule", "new3.rule"}
|
||||
|
||||
def test_skip_prints_message(self, tmp_path, capsys):
|
||||
rules = _make_rules(["existing.rule", "new.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
(tmp_path / "rules" / "existing.rule").touch()
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule"), \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "Skipping" in out
|
||||
assert "existing.rule" in out
|
||||
|
||||
def test_all_already_downloaded_does_nothing(self, tmp_path):
|
||||
rules = _make_rules(["r1.rule", "r2.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
(tmp_path / "rules" / "r1.rule").touch()
|
||||
(tmp_path / "rules" / "r2.rule").touch()
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule") as mock_dl, \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
mock_dl.assert_not_called()
|
||||
|
||||
|
||||
class TestListAndDownloadHashmobRulesFailures:
|
||||
def test_failed_download_reported_in_count(self, tmp_path, capsys):
|
||||
rules = _make_rules(["good.rule", "bad.rule", "also_good.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
|
||||
def side_effect(file_name, out_path):
|
||||
if file_name == "bad.rule":
|
||||
raise RuntimeError("network error")
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule", side_effect=side_effect), \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "2 succeeded" in out
|
||||
assert "1 failed" in out
|
||||
|
||||
def test_failure_does_not_block_other_downloads(self, tmp_path):
|
||||
rules = _make_rules(["good1.rule", "bad.rule", "good2.rule", "good3.rule", "good4.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
completed = []
|
||||
|
||||
def side_effect(file_name, out_path):
|
||||
if file_name == "bad.rule":
|
||||
raise RuntimeError("fail")
|
||||
completed.append(file_name)
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule", side_effect=side_effect), \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="a"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
assert len(completed) == 4
|
||||
assert "bad.rule" not in completed
|
||||
|
||||
|
||||
class TestListAndDownloadHashmobRulesEmptyAndQuit:
|
||||
def test_returns_early_when_rules_list_empty(self, tmp_path):
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=[]), \
|
||||
patch("hate_crack.api.download_hashmob_rule") as mock_dl:
|
||||
list_and_download_hashmob_rules(rules_dir=str(tmp_path))
|
||||
|
||||
mock_dl.assert_not_called()
|
||||
|
||||
def test_quit_selection_downloads_nothing(self, tmp_path):
|
||||
rules = _make_rules(["r.rule"])
|
||||
rules_dir = str(tmp_path / "rules")
|
||||
os.makedirs(rules_dir)
|
||||
|
||||
with patch("hate_crack.api.download_hashmob_rule_list", return_value=rules), \
|
||||
patch("hate_crack.api.download_hashmob_rule") as mock_dl, \
|
||||
_patch_stdin_tty(), \
|
||||
patch("builtins.input", return_value="q"):
|
||||
list_and_download_hashmob_rules(rules_dir=rules_dir)
|
||||
|
||||
mock_dl.assert_not_called()
|
||||
@@ -7,6 +7,8 @@ from hate_crack.api import HashviewAPI
|
||||
|
||||
def get_hashview_config():
|
||||
config_path = os.path.join(os.path.dirname(__file__), "..", "config.json")
|
||||
if not os.path.isfile(config_path):
|
||||
pytest.skip("config.json not present (worktree or fresh checkout)")
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
hashview_url = config.get("hashview_url")
|
||||
|
||||
86
tests/test_wordlist_filtering.py
Normal file
86
tests/test_wordlist_filtering.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from hate_crack.main import EXCLUDED_WORDLIST_EXTENSIONS, list_wordlist_files
|
||||
|
||||
|
||||
class TestExcludedWordlistExtensions:
|
||||
def test_contains_7z(self):
|
||||
assert ".7z" in EXCLUDED_WORDLIST_EXTENSIONS
|
||||
|
||||
def test_contains_torrent(self):
|
||||
assert ".torrent" in EXCLUDED_WORDLIST_EXTENSIONS
|
||||
|
||||
def test_contains_out(self):
|
||||
assert ".out" in EXCLUDED_WORDLIST_EXTENSIONS
|
||||
|
||||
def test_is_frozenset(self):
|
||||
assert isinstance(EXCLUDED_WORDLIST_EXTENSIONS, frozenset)
|
||||
|
||||
|
||||
class TestListWordlistFiles:
|
||||
def _populate(self, directory, names):
|
||||
for name in names:
|
||||
(directory / name).touch()
|
||||
|
||||
def test_excludes_7z_files(self, tmp_path):
|
||||
self._populate(tmp_path, ["rockyou.txt", "archive.7z"])
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert "archive.7z" not in result
|
||||
assert "rockyou.txt" in result
|
||||
|
||||
def test_excludes_torrent_files(self, tmp_path):
|
||||
self._populate(tmp_path, ["words.txt", "data.torrent"])
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert "data.torrent" not in result
|
||||
assert "words.txt" in result
|
||||
|
||||
def test_excludes_out_files(self, tmp_path):
|
||||
self._populate(tmp_path, ["hashesorg.lst", "results.out"])
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert "results.out" not in result
|
||||
assert "hashesorg.lst" in result
|
||||
|
||||
def test_excludes_ds_store(self, tmp_path):
|
||||
self._populate(tmp_path, ["words.dict", ".DS_Store"])
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert ".DS_Store" not in result
|
||||
assert "words.dict" in result
|
||||
|
||||
def test_includes_txt_lst_dict(self, tmp_path):
|
||||
names = ["rockyou.txt", "hashesorg.lst", "words.dict"]
|
||||
self._populate(tmp_path, names)
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert result == sorted(names)
|
||||
|
||||
def test_result_is_sorted(self, tmp_path):
|
||||
names = ["zebra.txt", "apple.txt", "mango.lst"]
|
||||
self._populate(tmp_path, names)
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert result == sorted(names)
|
||||
|
||||
def test_mixed_files_filters_correctly(self, tmp_path):
|
||||
all_files = [
|
||||
"rockyou.txt",
|
||||
"hashesorg.lst",
|
||||
"archive.7z",
|
||||
"data.torrent",
|
||||
"results.out",
|
||||
".DS_Store",
|
||||
"words.dict",
|
||||
]
|
||||
self._populate(tmp_path, all_files)
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert "archive.7z" not in result
|
||||
assert "data.torrent" not in result
|
||||
assert "results.out" not in result
|
||||
assert ".DS_Store" not in result
|
||||
assert "rockyou.txt" in result
|
||||
assert "hashesorg.lst" in result
|
||||
assert "words.dict" in result
|
||||
|
||||
def test_empty_directory(self, tmp_path):
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert result == []
|
||||
|
||||
def test_only_excluded_files_returns_empty(self, tmp_path):
|
||||
self._populate(tmp_path, ["a.7z", "b.torrent", "c.out", ".DS_Store"])
|
||||
result = list_wordlist_files(str(tmp_path))
|
||||
assert result == []
|
||||
Reference in New Issue
Block a user