download wordlists testing

This commit is contained in:
Justin Bollinger
2026-02-05 17:36:46 -05:00
parent b16c3a1d26
commit b52d856d7b
4 changed files with 72 additions and 24 deletions

1
.gitignore vendored
View File

@@ -10,3 +10,4 @@ hate_crack.egg-info/
hate_crack/hashcat-utils/
hate_crack/princeprocessor/
*.txt.expanded
*.hcmask

View File

@@ -562,7 +562,7 @@ class HashviewAPI:
Return all hashfiles of a given hash_type using the /v1/hashfiles/hash_type/<hash_type> endpoint.
"""
url = f"{self.base_url}/v1/hashfiles/hash_type/{hash_type}"
resp = self.session.get(url)
resp = self.session.get(url, headers=self._auth_headers())
resp.raise_for_status()
try:
data = resp.json()
@@ -581,7 +581,7 @@ class HashviewAPI:
def get_hashfile_details(self, hashfile_id):
"""Get hashfile details and hashtype for a given hashfile_id."""
url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/hash_type"
resp = self.session.get(url)
resp = self.session.get(url, headers=self._auth_headers())
resp.raise_for_status()
try:
data = resp.json()
@@ -624,7 +624,7 @@ class HashviewAPI:
def list_hashfiles(self):
url = f"{self.base_url}/v1/hashfiles"
resp = self.session.get(url)
resp = self.session.get(url, headers=self._auth_headers())
resp.raise_for_status()
data = resp.json()
if "hashfiles" in data:
@@ -795,7 +795,7 @@ class HashviewAPI:
import subprocess
url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/left"
resp = self.session.get(url, stream=True)
resp = self.session.get(url, headers=self._auth_headers(), stream=True)
resp.raise_for_status()
if output_file is None:
output_file = f"left_{customer_id}_{hashfile_id}.txt"
@@ -832,7 +832,7 @@ class HashviewAPI:
try:
# Try to download the found file
found_url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/found"
found_resp = self.session.get(found_url, stream=True, timeout=30)
found_resp = self.session.get(found_url, headers=self._auth_headers(), stream=True, timeout=30)
# Only proceed if we successfully downloaded the found file (ignore 404s)
if found_resp.status_code == 404:
@@ -967,7 +967,7 @@ class HashviewAPI:
import sys
url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/found"
resp = self.session.get(url, stream=True)
resp = self.session.get(url, headers=self._auth_headers(), stream=True)
resp.raise_for_status()
if output_file is None:
output_file = f"found_{customer_id}_{hashfile_id}.txt"
@@ -1107,12 +1107,32 @@ class HashviewAPI:
def download_wordlist(self, wordlist_id, output_file=None):
import sys
import re
import gzip
import shutil
if int(wordlist_id) == 1:
update_url = f"{self.base_url}/v1/updateWordlist/{wordlist_id}"
try:
update_resp = self.session.get(update_url, headers=self._auth_headers(), timeout=30)
update_resp.raise_for_status()
except Exception as exc:
if self.debug:
print(f"Warning: failed to update dynamic wordlist {wordlist_id}: {exc}")
url = f"{self.base_url}/v1/wordlists/{wordlist_id}"
resp = self.session.get(url, stream=True)
resp = self.session.get(url, headers=self._auth_headers(), stream=True)
resp.raise_for_status()
if output_file is None:
output_file = f"wordlist_{wordlist_id}.gz"
if int(wordlist_id) == 1:
output_file = "dynamic-all.txt.gz"
else:
content_disp = resp.headers.get("content-disposition", "")
match = re.search(r"filename=\"?([^\";]+)\"?", content_disp, re.IGNORECASE)
if match:
output_file = os.path.basename(match.group(1))
else:
output_file = f"wordlist_{wordlist_id}.gz"
total = int(resp.headers.get("content-length", 0))
downloaded = 0
chunk_size = 8192
@@ -1133,6 +1153,7 @@ class HashviewAPI:
sys.stdout.write("\n")
if total == 0:
print(f"Downloaded {downloaded} bytes.")
return {"output_file": output_file, "size": downloaded}
def create_customer(self, name):

View File

@@ -1969,6 +1969,7 @@ def hashview_api():
# Download wordlist
try:
wordlists = api_harness.list_wordlists()
wordlist_map = {}
if wordlists:
print("\n" + "=" * 100)
print("Available Wordlists:")
@@ -1983,6 +1984,11 @@ def hashview_api():
if len(name) > 60:
name = name[:57] + "..."
print(f"{wl_id:<10} {name:<60} {wl_size:>12}")
if wl_id != "N/A":
try:
wordlist_map[int(wl_id)] = str(wl_name)
except ValueError:
pass
print("=" * 100)
else:
print("\nNo wordlists found.")
@@ -1996,12 +2002,18 @@ def hashview_api():
print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.")
continue
api_name = wordlist_map.get(wordlist_id) if "wordlist_map" in locals() else None
api_filename = "dynamic-all.txt.gz" if wordlist_id == 1 else api_name
prompt_suffix = f" (API filename: {api_filename})" if api_filename else " (API filename)"
output_file = (
input(
"Enter output file name (default: wordlist_<id>.gz): "
).strip()
f"Enter output file name{prompt_suffix} or press Enter to use API filename: "
)
.strip()
or None
)
if output_file is None and wordlist_id == 1:
output_file = "dynamic-all.txt.gz"
try:
download_result = api_harness.download_wordlist(
wordlist_id, output_file
@@ -2956,20 +2968,7 @@ def main():
print("\nError: Hashview API key not configured.")
print("Please set 'hashview_api_key' in config.json")
sys.exit(1)
try:
hcatHashFile, hcatHashType = download_hashes_from_hashview(
hashview_url,
hashview_api_key,
debug_mode,
input_fn=input,
print_fn=print,
)
except ValueError:
print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.")
sys.exit(1)
except Exception as e:
print(f"\n✗ Error downloading hashes: {str(e)}")
sys.exit(1)
hashview_api()
sys.exit(0)
if args.weakpass:

View File

@@ -246,6 +246,7 @@ class TestHashviewAPI:
mock_response.content = b"hash1\nhash2\n"
mock_response.raise_for_status = Mock()
mock_response.headers = {"content-length": "0"}
mock_response.status_code = 404 # For the found file lookup
def iter_content(chunk_size=8192):
yield mock_response.content
@@ -260,6 +261,14 @@ class TestHashviewAPI:
content = f.read()
assert content == b"hash1\nhash2\n"
assert result["size"] == len(content)
# Verify auth headers were passed in the left hashes download call
call_args_list = api.session.get.call_args_list
left_call = [c for c in call_args_list if "left" in str(c)][0]
assert left_call.kwargs.get("headers") is not None
auth_headers = left_call.kwargs.get("headers")
assert "Cookie" in auth_headers or "uuid" in str(auth_headers)
assert HASHVIEW_API_KEY in str(auth_headers)
def test_download_found_hashes(self, api, tmp_path):
"""Test downloading found hashes: real API if possible, else mock."""
@@ -298,6 +307,14 @@ class TestHashviewAPI:
content = f.read()
assert content == b"hash1:pass1\nhash2:pass2\n"
assert result["size"] == len(content)
# Verify auth headers were passed in the found hashes download call
call_args_list = api.session.get.call_args_list
found_call = [c for c in call_args_list if "found" in str(c)][0]
assert found_call.kwargs.get("headers") is not None
auth_headers = found_call.kwargs.get("headers")
assert "Cookie" in auth_headers or "uuid" in str(auth_headers)
assert HASHVIEW_API_KEY in str(auth_headers)
def test_download_wordlist(self, api, tmp_path):
"""Test downloading a wordlist: real API if possible, else mock."""
@@ -333,6 +350,16 @@ class TestHashviewAPI:
content = f.read()
assert content == b"gzipdata"
assert result["size"] == len(content)
# Verify auth headers were passed in the download call
# session.get should be called with headers containing the auth cookie
call_args_list = api.session.get.call_args_list
# Last call should be the download (not the update call for id 1)
download_call = [c for c in call_args_list if "wordlists/1" in str(c)][0]
assert download_call.kwargs.get("headers") is not None
auth_headers = download_call.kwargs.get("headers")
assert "Cookie" in auth_headers or "uuid" in str(auth_headers)
assert HASHVIEW_API_KEY in str(auth_headers)
@pytest.mark.skipif(
os.environ.get("HASHVIEW_TEST_REAL", "").lower() not in ("1", "true", "yes"),