fix: address QA review issues for NTLM preprocessing (#27, #28)

- Add type hints to _filter_computer_accounts and _dedup_netntlm_by_username
- Fix unclosed file handle when reading hash file for format detection
- Extract _count_computer_accounts helper to eliminate duplicate file reads
- Stream _filter_computer_accounts output instead of collecting in memory
- Only write .dedup file when duplicates actually exist
- Add tests for _count_computer_accounts, malformed lines, and no-file-on-zero-dupes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Justin Bollinger
2026-02-17 12:46:48 -05:00
parent e417e3d928
commit 53d42fbe96
2 changed files with 97 additions and 25 deletions

View File

@@ -916,7 +916,23 @@ def _write_field_sorted_unique(input_path, output_path, field_index, delimiter="
return False
def _filter_computer_accounts(input_path, output_path, delimiter=":"):
def _count_computer_accounts(input_path: str, delimiter: str = ":") -> int:
"""Count computer accounts (usernames ending with $) in a hash file."""
count = 0
try:
with open(input_path, "r", errors="replace") as src:
for line in src:
stripped = line.strip()
if stripped and stripped.split(delimiter, 1)[0].endswith("$"):
count += 1
except FileNotFoundError:
pass
return count
def _filter_computer_accounts(
input_path: str, output_path: str, delimiter: str = ":"
) -> int:
"""Filter out computer accounts (usernames ending with $) from a hash file.
Reads the input file, removes lines where the first field (username)
@@ -924,9 +940,10 @@ def _filter_computer_accounts(input_path, output_path, delimiter=":"):
Returns the number of computer accounts removed.
"""
removed = 0
kept_lines = []
try:
with open(input_path, "r", errors="replace") as src:
with open(input_path, "r", errors="replace") as src, open(
output_path, "w"
) as dst:
for line in src:
stripped = line.rstrip("\n")
if not stripped:
@@ -935,24 +952,24 @@ def _filter_computer_accounts(input_path, output_path, delimiter=":"):
if username.endswith("$"):
removed += 1
else:
kept_lines.append(stripped)
with open(output_path, "w") as dst:
for line in kept_lines:
dst.write(line + "\n")
dst.write(stripped + "\n")
except FileNotFoundError:
pass
return removed
def _dedup_netntlm_by_username(input_path, output_path, delimiter=":"):
def _dedup_netntlm_by_username(
input_path: str, output_path: str, delimiter: str = ":"
) -> tuple[int, int]:
"""Deduplicate NetNTLM hashes by username, keeping the first occurrence.
NetNTLM format: username::domain:challenge:response:blob
The username is the first field before the delimiter.
Only writes output_path when duplicates are found.
Returns a tuple of (total_lines, duplicates_removed).
"""
seen_usernames = set()
kept_lines = []
seen_usernames: set[str] = set()
kept_lines: list[str] = []
duplicates = 0
total = 0
try:
@@ -968,9 +985,10 @@ def _dedup_netntlm_by_username(input_path, output_path, delimiter=":"):
else:
seen_usernames.add(username)
kept_lines.append(stripped)
with open(output_path, "w") as dst:
for line in kept_lines:
dst.write(line + "\n")
if duplicates > 0:
with open(output_path, "w") as dst:
for line in kept_lines:
dst.write(line + "\n")
except FileNotFoundError:
pass
return total, duplicates
@@ -3650,16 +3668,13 @@ def main():
if hcatHashType == "1000":
lmHashesFound = False
pwdump_format = False
hcatHashFileLine = open(hcatHashFile, "r").readline().strip().lstrip("\ufeff")
with open(hcatHashFile, "r") as f:
hcatHashFileLine = f.readline().strip().lstrip("\ufeff")
if re.search(r"[a-f0-9A-F]{32}:[a-f0-9A-F]{32}:::", hcatHashFileLine):
pwdump_format = True
print("PWDUMP format detected...")
# Detect computer accounts (usernames ending with $)
computer_count = sum(
1
for line in open(hcatHashFile, "r", errors="replace")
if line.strip() and line.split(":", 1)[0].endswith("$")
)
computer_count = _count_computer_accounts(hcatHashFile)
if computer_count > 0:
print(
f"Detected {computer_count} computer account(s)"
@@ -3753,12 +3768,6 @@ def main():
os.remove(hcatHashFile + ".dedup")
except OSError:
pass
else:
# No duplicates found, clean up
try:
os.remove(hcatHashFile + ".dedup")
except OSError:
pass
# Check POT File for Already Cracked Hashes
if not os.path.isfile(hcatHashFile + ".out"):

View File

@@ -20,6 +20,36 @@ def main_module(monkeypatch):
return mod
class TestCountComputerAccounts:
"""Issue #27 - count computer accounts (helper for detection)."""
def test_counts_computer_accounts(self, tmp_path, main_module):
hash_file = tmp_path / "hashes.txt"
hash_file.write_text(
"user1:1001:aad3b435b51404ee:hash1:::\n"
"COMPUTER1$:1002:aad3b435b51404ee:hash2:::\n"
"user2:1003:aad3b435b51404ee:hash3:::\n"
"WORKSTATION$:1004:aad3b435b51404ee:hash4:::\n"
)
assert main_module._count_computer_accounts(str(hash_file)) == 2
def test_no_computer_accounts(self, tmp_path, main_module):
hash_file = tmp_path / "hashes.txt"
hash_file.write_text(
"user1:1001:aad3b435b51404ee:hash1:::\n"
"user2:1002:aad3b435b51404ee:hash2:::\n"
)
assert main_module._count_computer_accounts(str(hash_file)) == 0
def test_missing_file(self, tmp_path, main_module):
assert main_module._count_computer_accounts(str(tmp_path / "nope.txt")) == 0
def test_empty_file(self, tmp_path, main_module):
hash_file = tmp_path / "hashes.txt"
hash_file.write_text("")
assert main_module._count_computer_accounts(str(hash_file)) == 0
class TestFilterComputerAccounts:
"""Issue #27 - filter accounts ending with $."""
@@ -84,6 +114,21 @@ class TestFilterComputerAccounts:
)
assert removed == 0
def test_malformed_lines(self, tmp_path, main_module):
hash_file = tmp_path / "hashes.txt"
hash_file.write_text(
"user1:1001:aad3b435b51404ee:hash1:::\n"
"malformed_line_without_dollar\n"
"COMP$:1002:aad3b435b51404ee:hash2:::\n"
)
output_file = tmp_path / "filtered.txt"
removed = main_module._filter_computer_accounts(
str(hash_file), str(output_file)
)
assert removed == 1
lines = output_file.read_text().strip().split("\n")
assert len(lines) == 2
class TestDedupNetntlmByUsername:
"""Issue #28 - deduplicate NetNTLM hashes by username."""
@@ -138,6 +183,8 @@ class TestDedupNetntlmByUsername:
)
assert total == 2
assert duplicates == 0
# Output file should NOT be created when no duplicates exist
assert not output_file.exists()
def test_missing_file(self, tmp_path, main_module):
total, duplicates = main_module._dedup_netntlm_by_username(
@@ -156,3 +203,19 @@ class TestDedupNetntlmByUsername:
)
assert total == 0
assert duplicates == 0
assert not output_file.exists()
def test_malformed_lines(self, tmp_path, main_module):
hash_file = tmp_path / "netntlm.txt"
hash_file.write_text(
"user1::DOMAIN:challenge1:response1:blob1\n"
"malformed_line_without_colons\n"
"user2::DOMAIN:challenge2:response2:blob2\n"
)
output_file = tmp_path / "dedup.txt"
total, duplicates = main_module._dedup_netntlm_by_username(
str(hash_file), str(output_file)
)
assert total == 3
assert duplicates == 0
assert not output_file.exists()