feat: add ngramX attack and gzip auto-detection for external binaries

- Add _is_gzipped() magic-byte detector and _wordlist_path() context
  manager that transparently decompresses gzip files to a temp path
- Apply gzip handling to hcatCombinator3 and hcatCombinatorX via
  contextlib.ExitStack so compressed wordlists work without manual prep
- Add hcatNgramX() wrapper using ngramX.bin <corpus> <group_size> piped
  to hashcat, with gzip auto-detection on the corpus file
- Add ngram_attack() handler in attacks.py with tab-autocomplete corpus
  selection and configurable group size (default 3)
- Register attack as menu option 19 in both main.py and hate_crack.py
- Fix wordlist_optimizer.py: .app extension on macOS was wrong, use .bin
- Add tests/test_ngram_gzip.py covering ngram_attack handler, _is_gzipped,
  and _wordlist_path context manager (temp file cleanup, plain passthrough)
This commit is contained in:
Justin Bollinger
2026-03-19 14:35:29 -04:00
parent 20f9110fc1
commit f43bf2c982
5 changed files with 308 additions and 53 deletions

View File

@@ -88,6 +88,7 @@ def get_main_menu_options():
"16": _attacks.omen_attack,
"17": _attacks.adhoc_mask_crack,
"18": _attacks.markov_brute_force,
"19": _attacks.ngram_attack,
"90": download_hashmob_rules,
"91": weakpass_wordlist_menu,
"92": download_hashmob_wordlists,

View File

@@ -637,6 +637,32 @@ def markov_brute_force(ctx: Any) -> None:
ctx.hcatMarkovBruteForce(ctx.hcatHashType, ctx.hcatHashFile, hcatMinLen, hcatMaxLen)
def ngram_attack(ctx: Any) -> None:
    """Interactive handler for the n-gram attack (menu option 19).

    Prompts for a corpus file (tab autocomplete rooted at the wordlists
    directory) and an n-gram group size, then delegates the actual cracking
    run to ``ctx.hcatNgramX``. Gzip-compressed corpora are auto-detected and
    decompressed downstream by the hcatNgramX implementation.

    Args:
        ctx: Runtime context exposing hcatHashType, hcatHashFile,
            hcatWordlists, select_file_with_autocomplete() and hcatNgramX().
    """
    print("\n" + "=" * 60)
    print("NGRAM ATTACK")
    print("=" * 60)
    print("Generates n-gram candidates from a corpus file via ngramX.bin.")
    print("Gzip-compressed corpus files are auto-detected and decompressed.")
    print("=" * 60)

    corpus = ctx.select_file_with_autocomplete(
        "Select corpus file (tab to autocomplete)",
        base_dir=ctx.hcatWordlists,
    )
    if not corpus:
        print("No corpus selected. Aborting ngram attack.")
        return

    group_size_raw = input("\nEnter n-gram group size (default 3): ").strip()
    try:
        group_size = int(group_size_raw) if group_size_raw else 3
    except ValueError:
        print("[!] Invalid group size. Using default of 3.")
        group_size = 3
    # Fix: a zero or negative group size previously slipped through to
    # ngramX.bin; reject it and fall back to the documented default.
    if group_size < 1:
        print("[!] Group size must be >= 1. Using default of 3.")
        group_size = 3

    ctx.hcatNgramX(ctx.hcatHashType, ctx.hcatHashFile, corpus, group_size)
def combinator_submenu(ctx: Any) -> None:
items = [
("1", "Combinator Attack (2-8 wordlists)"),

View File

@@ -23,7 +23,10 @@ import time
import argparse
import urllib.request
import urllib.error
import contextlib
import gzip
import lzma
import tempfile
from types import SimpleNamespace
#!/usr/bin/env python3
@@ -680,6 +683,7 @@ hcatFingerprintCount = 0
hcatCombinationCount = 0
hcatCombinator3Count = 0
hcatCombinatorXCount = 0
hcatNgramXCount = 0
hcatHybridCount = 0
hcatExtraCount = 0
hcatRecycleCount = 0
@@ -697,6 +701,37 @@ def _debug_cmd(cmd):
print(f"[DEBUG] hashcat cmd: {_format_cmd(cmd)}")
def _is_gzipped(path: str) -> bool:
try:
with open(path, "rb") as f:
return f.read(2) == b"\x1f\x8b"
except OSError:
return False
@contextlib.contextmanager
def _wordlist_path(path: str):
"""Yield an uncompressed path for path.
If the file is gzip-compressed, decompress to a temp file and clean up on
exit. Otherwise yield the original path unchanged.
"""
if _is_gzipped(path):
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
tmp_name = tmp.name
with gzip.open(path, "rb") as gz_in:
shutil.copyfileobj(gz_in, tmp)
try:
yield tmp_name
finally:
try:
os.unlink(tmp_name)
except OSError:
pass
else:
yield path
def _add_debug_mode_for_rules(cmd):
"""Add debug mode arguments to hashcat command if rules are being used.
@@ -1431,30 +1466,32 @@ def hcatCombinator3(hcatHashType, hcatHashFile, wordlists):
return
combinator3_bin = os.path.join(hate_path, "hashcat-utils/bin/combinator3.bin")
generator_cmd = [combinator3_bin] + list(wordlists[:3])
hashcat_cmd = [
hcatBin,
"-m",
hcatHashType,
hcatHashFile,
"--session",
generate_session_id(),
"-o",
f"{hcatHashFile}.out",
]
hashcat_cmd.extend(shlex.split(hcatTuning))
_append_potfile_arg(hashcat_cmd)
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
assert generator_proc.stdout is not None
hcatProcess = subprocess.Popen(hashcat_cmd, stdin=generator_proc.stdout)
generator_proc.stdout.close()
try:
hcatProcess.wait()
generator_proc.wait()
except KeyboardInterrupt:
print("Killing PID {0}...".format(str(hcatProcess.pid)))
hcatProcess.kill()
generator_proc.kill()
with contextlib.ExitStack() as stack:
resolved = [stack.enter_context(_wordlist_path(w)) for w in wordlists[:3]]
generator_cmd = [combinator3_bin] + resolved
hashcat_cmd = [
hcatBin,
"-m",
hcatHashType,
hcatHashFile,
"--session",
generate_session_id(),
"-o",
f"{hcatHashFile}.out",
]
hashcat_cmd.extend(shlex.split(hcatTuning))
_append_potfile_arg(hashcat_cmd)
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
assert generator_proc.stdout is not None
hcatProcess = subprocess.Popen(hashcat_cmd, stdin=generator_proc.stdout)
generator_proc.stdout.close()
try:
hcatProcess.wait()
generator_proc.wait()
except KeyboardInterrupt:
print("Killing PID {0}...".format(str(hcatProcess.pid)))
hcatProcess.kill()
generator_proc.kill()
hcatCombinator3Count = lineCount(hcatHashFile + ".out") - hcatHashCracked
@@ -1469,38 +1506,75 @@ def hcatCombinatorX(hcatHashType, hcatHashFile, wordlists, separator=None):
return
combinatorX_bin = os.path.join(hate_path, "hashcat-utils/bin/combinatorX.bin")
generator_cmd = [combinatorX_bin]
for i, f in enumerate(wordlists[:8], start=1):
generator_cmd += [f"--file{i}", f]
if separator:
generator_cmd += ["--sepFill", separator]
hashcat_cmd = [
hcatBin,
"-m",
hcatHashType,
hcatHashFile,
"--session",
generate_session_id(),
"-o",
f"{hcatHashFile}.out",
]
hashcat_cmd.extend(shlex.split(hcatTuning))
_append_potfile_arg(hashcat_cmd)
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
assert generator_proc.stdout is not None
hcatProcess = subprocess.Popen(hashcat_cmd, stdin=generator_proc.stdout)
generator_proc.stdout.close()
try:
hcatProcess.wait()
generator_proc.wait()
except KeyboardInterrupt:
print("Killing PID {0}...".format(str(hcatProcess.pid)))
hcatProcess.kill()
generator_proc.kill()
with contextlib.ExitStack() as stack:
resolved = [stack.enter_context(_wordlist_path(w)) for w in wordlists[:8]]
generator_cmd = [combinatorX_bin]
for i, f in enumerate(resolved, start=1):
generator_cmd += [f"--file{i}", f]
if separator:
generator_cmd += ["--sepFill", separator]
hashcat_cmd = [
hcatBin,
"-m",
hcatHashType,
hcatHashFile,
"--session",
generate_session_id(),
"-o",
f"{hcatHashFile}.out",
]
hashcat_cmd.extend(shlex.split(hcatTuning))
_append_potfile_arg(hashcat_cmd)
generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
assert generator_proc.stdout is not None
hcatProcess = subprocess.Popen(hashcat_cmd, stdin=generator_proc.stdout)
generator_proc.stdout.close()
try:
hcatProcess.wait()
generator_proc.wait()
except KeyboardInterrupt:
print("Killing PID {0}...".format(str(hcatProcess.pid)))
hcatProcess.kill()
generator_proc.kill()
hcatCombinatorXCount = lineCount(hcatHashFile + ".out") - hcatHashCracked
# NgramX Attack - n-gram candidates from corpus file piped to hashcat
def hcatNgramX(hcatHashType, hcatHashFile, corpus, group_size=3):
    """Run ngramX.bin over *corpus* and pipe the candidates into hashcat.

    Mirrors the other hcat* attack wrappers: the crack count and the live
    hashcat process handle are kept in module-level globals.
    """
    global hcatNgramXCount
    global hcatProcess
    ngramX_bin = os.path.join(hate_path, "hashcat-utils/bin/ngramX.bin")
    # _wordlist_path transparently decompresses a gzipped corpus to a temp
    # file and removes it when the context exits.
    with _wordlist_path(corpus) as resolved_corpus:
        # Invocation shape: ngramX.bin <corpus> <group_size>, candidates on stdout.
        generator_cmd = [ngramX_bin, resolved_corpus, str(group_size)]
        hashcat_cmd = [
            hcatBin,
            "-m",
            hcatHashType,
            hcatHashFile,
            "--session",
            generate_session_id(),
            "-o",
            f"{hcatHashFile}.out",
        ]
        hashcat_cmd.extend(shlex.split(hcatTuning))
        _append_potfile_arg(hashcat_cmd)
        # Pipe generator stdout directly into hashcat's stdin.
        generator_proc = subprocess.Popen(generator_cmd, stdout=subprocess.PIPE)
        assert generator_proc.stdout is not None
        hcatProcess = subprocess.Popen(hashcat_cmd, stdin=generator_proc.stdout)
        # Drop our reference to the pipe so the generator receives SIGPIPE
        # if hashcat exits early.
        generator_proc.stdout.close()
        try:
            hcatProcess.wait()
            generator_proc.wait()
        except KeyboardInterrupt:
            print("Killing PID {0}...".format(str(hcatProcess.pid)))
            hcatProcess.kill()
            generator_proc.kill()
    # New cracks attributable to this attack: outfile lines minus the number
    # already cracked before the run started.
    hcatNgramXCount = lineCount(hcatHashFile + ".out") - hcatHashCracked
# Hybrid Attack
def hcatHybrid(hcatHashType, hcatHashFile, wordlists=None):
global hcatHybridCount
@@ -3399,6 +3473,9 @@ def combinator_3plus_crack():
return _attacks.combinator_3plus_crack(_attack_ctx())
def ngram_attack():
    # Thin module-level wrapper: delegate to the shared attacks handler,
    # passing the current runtime context (mirrors the other menu wrappers).
    return _attacks.ngram_attack(_attack_ctx())
def combinator_submenu():
    # Thin module-level wrapper: delegate to the shared attacks submenu,
    # passing the current runtime context (mirrors the other menu wrappers).
    return _attacks.combinator_submenu(_attack_ctx())
@@ -3656,6 +3733,7 @@ def get_main_menu_items():
("16", "OMEN Attack"),
("17", "Ad-hoc Mask Attack"),
("18", "Markov Brute Force Attack"),
("19", "N-gram Attack"),
("90", "Download rules from Hashmob.net"),
("91", "Analyze Hashcat Rules"),
("92", "Download wordlists from Hashmob.net"),
@@ -3693,6 +3771,7 @@ def get_main_menu_options():
"16": omen_attack,
"17": adhoc_mask_crack,
"18": markov_brute_force,
"19": ngram_attack,
"90": lambda: download_hashmob_rules(rules_dir=rulesDirectory),
"91": analyze_rules,
"92": download_hashmob_wordlists,

149
tests/test_ngram_gzip.py Normal file
View File

@@ -0,0 +1,149 @@
import gzip
import os
from unittest.mock import MagicMock, patch
from hate_crack.attacks import ngram_attack
def _make_ctx(hash_type="1000", hash_file="/tmp/hashes.txt"):
ctx = MagicMock()
ctx.hcatHashType = hash_type
ctx.hcatHashFile = hash_file
return ctx
class TestNgramAttack:
    """Behavioral tests for the ngram_attack menu handler in attacks.py."""

    def test_calls_hcatNgramX_with_corpus_and_group_size(self, tmp_path):
        # Happy path: selected corpus and explicit group size reach hcatNgramX.
        ctx = _make_ctx()
        corpus = tmp_path / "corpus.txt"
        corpus.write_text("password\nletmein\n")
        ctx.select_file_with_autocomplete.return_value = str(corpus)
        with patch("builtins.input", return_value="3"):
            ngram_attack(ctx)
        ctx.hcatNgramX.assert_called_once_with(
            ctx.hcatHashType, ctx.hcatHashFile, str(corpus), 3
        )

    def test_default_group_size_is_3(self, tmp_path):
        # Empty input at the prompt falls back to the documented default of 3.
        ctx = _make_ctx()
        corpus = tmp_path / "corpus.txt"
        corpus.write_text("password\n")
        ctx.select_file_with_autocomplete.return_value = str(corpus)
        with patch("builtins.input", return_value=""):
            ngram_attack(ctx)
        ctx.hcatNgramX.assert_called_once()
        assert ctx.hcatNgramX.call_args[0][3] == 3

    def test_invalid_group_size_defaults_to_3(self, tmp_path):
        # Non-numeric input triggers the ValueError branch and the default.
        ctx = _make_ctx()
        corpus = tmp_path / "corpus.txt"
        corpus.write_text("password\n")
        ctx.select_file_with_autocomplete.return_value = str(corpus)
        with patch("builtins.input", return_value="abc"):
            ngram_attack(ctx)
        ctx.hcatNgramX.assert_called_once()
        assert ctx.hcatNgramX.call_args[0][3] == 3

    def test_aborts_when_no_corpus_selected(self):
        # A cancelled file selection must not launch the attack.
        ctx = _make_ctx()
        ctx.select_file_with_autocomplete.return_value = None
        ngram_attack(ctx)
        ctx.hcatNgramX.assert_not_called()

    def test_custom_group_size_passed_through(self, tmp_path):
        # A user-supplied size other than the default is forwarded verbatim.
        ctx = _make_ctx()
        corpus = tmp_path / "corpus.txt"
        corpus.write_text("password\n")
        ctx.select_file_with_autocomplete.return_value = str(corpus)
        with patch("builtins.input", return_value="5"):
            ngram_attack(ctx)
        assert ctx.hcatNgramX.call_args[0][3] == 5
class TestIsGzipped:
    """Tests for the gzip magic-byte detector in main.py."""

    def test_detects_gzip_file(self, tmp_path):
        from hate_crack.main import _is_gzipped
        gz_file = tmp_path / "test.txt.gz"
        with gzip.open(str(gz_file), "wb") as f:
            f.write(b"password\n")
        assert _is_gzipped(str(gz_file)) is True

    def test_plain_file_not_detected_as_gzip(self, tmp_path):
        from hate_crack.main import _is_gzipped
        plain = tmp_path / "test.txt"
        plain.write_bytes(b"password\n")
        assert _is_gzipped(str(plain)) is False

    def test_missing_file_returns_false(self, tmp_path):
        # OSError on open is swallowed and reported as "not gzipped".
        from hate_crack.main import _is_gzipped
        assert _is_gzipped(str(tmp_path / "nonexistent.txt")) is False

    def test_empty_file_returns_false(self, tmp_path):
        # A short read (fewer than 2 bytes) cannot match the magic number.
        from hate_crack.main import _is_gzipped
        empty = tmp_path / "empty.txt"
        empty.write_bytes(b"")
        assert _is_gzipped(str(empty)) is False
class TestWordlistPath:
    """Tests for the _wordlist_path context manager in main.py."""

    def test_plain_file_yields_original_path(self, tmp_path):
        # Uncompressed input passes through untouched.
        from hate_crack.main import _wordlist_path
        plain = tmp_path / "words.txt"
        plain.write_text("password\n")
        with _wordlist_path(str(plain)) as result:
            assert result == str(plain)

    def test_gzip_file_yields_temp_file_with_content(self, tmp_path):
        # Gzipped input yields a different (temp) path with the decompressed bytes.
        from hate_crack.main import _wordlist_path
        gz_file = tmp_path / "words.txt.gz"
        with gzip.open(str(gz_file), "wb") as f:
            f.write(b"password\nletmein\n")
        with _wordlist_path(str(gz_file)) as result:
            assert result != str(gz_file)
            assert os.path.isfile(result)
            with open(result, "rb") as f:
                assert f.read() == b"password\nletmein\n"

    def test_gzip_temp_file_removed_after_context(self, tmp_path):
        # The temp file must be unlinked once the context exits.
        from hate_crack.main import _wordlist_path
        gz_file = tmp_path / "words.txt.gz"
        with gzip.open(str(gz_file), "wb") as f:
            f.write(b"password\n")
        with _wordlist_path(str(gz_file)) as result:
            tmp_path_used = result
        assert not os.path.exists(tmp_path_used)

    def test_plain_file_not_deleted_after_context(self, tmp_path):
        # Passthrough paths are never cleaned up — only temp files are.
        from hate_crack.main import _wordlist_path
        plain = tmp_path / "words.txt"
        plain.write_text("password\n")
        with _wordlist_path(str(plain)) as result:
            assert result == str(plain)
        assert plain.exists()

View File

@@ -40,7 +40,7 @@ def main():
# Resolve binary paths relative to script location
script_dir = os.path.dirname(os.path.realpath(__file__))
ext = ".app" if sys.platform == "darwin" else ".bin"
ext = ".bin" if sys.platform == "darwin" else ".bin"
splitlen_bin = os.path.join(script_dir, "hashcat-utils", "bin", f"splitlen{ext}")
rli_bin = os.path.join(script_dir, "hashcat-utils", "bin", f"rli{ext}")