refactor(api): consolidate streaming-download logic into shared helpers

Add _stream_response_to_file and _streamed_download helpers that
implement atomic .part-file writes, unified progress bars, and
KeyboardInterrupt cleanup in one place.  Add _with_hashmob_backoff
for bounded 429 retry logic (max 6 attempts, step-doubling delay)
and a _Hashmob429 sentinel exception so callers can signal rate
limits without re-implementing the backoff loop.
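
A minimal sketch of the intended caller shape (fetch_one is a
hypothetical name used only for illustration; the helpers and
_hashmob_limiter are defined in the diff below):

    def fetch_one(url: str, dest: str) -> bool:
        def _attempt() -> bool:
            _hashmob_limiter.wait()
            with requests.get(url, stream=True, timeout=60) as r:
                if r.status_code == 429:
                    raise _Hashmob429()  # tell the wrapper to sleep and retry
                r.raise_for_status()
                # streams to dest + ".part", then os.replace() on success
                return _stream_response_to_file(r, dest, label=dest)
        return _with_hashmob_backoff(_attempt)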

Refactor all four callers onto the helpers:
- download_hashmob_wordlist: removes inline backoff loop and fixes
  the redirect recursion bug (redirect URL was passed as file_name,
  bypassing the API prefix logic)
- download_hashmob_rule: removes duplicate backoff loop; keeps the
  60-entry pinned URL dict and 404→alt_url fallback verbatim
- download_official_wordlist: delegates streaming to _streamed_download;
  keeps .7z extraction tail
- HashviewAPI.download_wordlist: uses session.get + _stream_response_to_file
  so cookie auth is preserved; removes the separate progress-bar
  implementation (see the sketch after this list)
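
For the session-auth case the shape is, roughly (lines taken from
the diff below):

    resp = self.session.get(url, headers=self._auth_headers(), stream=True)
    resp.raise_for_status()
    ok = _stream_response_to_file(resp, output_file, label=output_file)
    resp.close()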

All public signatures are unchanged; 40 existing tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
commit 9ec7fefe1d
parent 20ae8f659c
Author: Justin Bollinger
Date:   2026-04-24 23:30:25 -04:00


@@ -37,6 +37,133 @@ class _RateLimiter:
_hashmob_limiter = _RateLimiter(rate=1, period=2.0)


class _Hashmob429(Exception):
    """Raised inside a _with_hashmob_backoff callback to trigger a backoff retry."""


def _stream_response_to_file(
r,
dest_path: str,
*,
label: str | None = None,
    show_progress: bool = True,
    chunk_size: int = 8192,
) -> bool:
"""Write an already-opened streaming response to dest_path atomically via a .part file."""
temp_path = dest_path + ".part"
try:
total = 0
try:
total = int(r.headers.get("content-length") or 0)
except Exception:
pass
downloaded = 0
os.makedirs(os.path.dirname(os.path.abspath(dest_path)), exist_ok=True)
with open(temp_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if show_progress:
if total:
done = int(50 * downloaded / total)
percent = 100 * downloaded / total
bar = "=" * done + " " * (50 - done)
sys.stdout.write(
f"\r[{bar}] {percent:6.2f}% ({downloaded // 1024} KB/{total // 1024} KB)"
)
else:
sys.stdout.write(f"\rDownloaded {downloaded // 1024} KB")
sys.stdout.flush()
if show_progress:
sys.stdout.write("\n")
os.replace(temp_path, dest_path)
print(f"Downloaded {dest_path}")
return True
except KeyboardInterrupt:
print("\nKeyboard interrupt: Cleaning up partial download...")
if os.path.exists(temp_path):
try:
os.remove(temp_path)
print(f"Removed partial file: {temp_path}")
except Exception as e:
print(f"Failed to remove partial file: {e}")
raise
except Exception as e:
print(f"Error during download: {e}")
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except Exception:
pass
        return False


def _streamed_download(
url: str,
dest_path: str,
*,
headers: dict | None = None,
label: str | None = None,
timeout: int = 120,
chunk_size: int = 8192,
show_progress: bool = True,
skip_existing: bool = False,
allow_redirects: bool = True,
) -> bool:
"""Download url to dest_path atomically, with optional progress bar.
Returns True on success, False on handled failure.
Re-raises KeyboardInterrupt after cleaning up the .part file.
"""
if skip_existing and os.path.isfile(dest_path) and os.path.getsize(dest_path) > 0:
name = label or os.path.basename(dest_path)
print(f"[i] Skipping {name} (already present)")
return True
try:
with requests.get(
url,
headers=headers or {},
stream=True,
timeout=timeout,
allow_redirects=allow_redirects,
) as r:
r.raise_for_status()
            return _stream_response_to_file(
                r,
                dest_path,
                label=label,
                show_progress=show_progress,
                chunk_size=chunk_size,
            )
except KeyboardInterrupt:
raise
except Exception as e:
print(f"Error downloading {label or url}: {e}")
        return False


def _with_hashmob_backoff(
fn: Callable[[], bool],
*,
max_attempts: int = 6,
base_delay: int = 30,
step: int = 30,
max_delay: int = 300,
) -> bool:
"""Call fn() with bounded 429 backoff retry logic.
fn() should raise _Hashmob429 to signal a rate-limit response.
Non-429 exceptions are re-raised immediately.
Returns True on success, False after max_attempts consecutive 429s.
"""
    penalty = base_delay
    for attempt in range(max_attempts):
        try:
            return fn()
        except _Hashmob429:
            if attempt == max_attempts - 1:
                break  # out of attempts; skip a pointless final sleep
            print(f"[!] Rate limit hit (429). Backing off for {penalty} seconds...")
            time.sleep(penalty)
            # delay grows by a doubling step (30s, 60s, 120s, ...), capped at max_delay
            penalty = min(penalty + step, max_delay)
            step *= 2
    print(f"[!] Hashmob rate limit: gave up after {max_attempts} attempts.")
    return False


def _get_hate_path():
_package_path = os.path.dirname(os.path.realpath(__file__))
_repo_root = os.path.dirname(_package_path)
@@ -1019,7 +1146,6 @@ class HashviewAPI:
def download_wordlist(
self, wordlist_id, output_file=None, *, update_dynamic: bool = False
):
import sys
import re
if int(wordlist_id) == 1 and update_dynamic:
@@ -1038,6 +1164,7 @@ class HashviewAPI:
url = f"{self.base_url}/v1/wordlists/{wordlist_id}"
resp = self.session.get(url, headers=self._auth_headers(), stream=True)
resp.raise_for_status()
if output_file is None:
if int(wordlist_id) == 1:
output_file = "dynamic-all.txt.gz"
@@ -1046,37 +1173,20 @@ class HashviewAPI:
match = re.search(
r"filename=\"?([^\";]+)\"?", content_disp, re.IGNORECASE
)
if match:
output_file = os.path.basename(match.group(1))
else:
output_file = f"wordlist_{wordlist_id}.gz"
output_file = (
os.path.basename(match.group(1)) if match else f"wordlist_{wordlist_id}.gz"
)
if not os.path.isabs(output_file):
dest_dir = get_hcat_wordlists_dir()
output_file = os.path.join(dest_dir, output_file)
os.makedirs(os.path.dirname(output_file), exist_ok=True)
total = int(resp.headers.get("content-length", 0))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total > 0:
done = int(50 * downloaded / total)
bar = "[" + "=" * done + " " * (50 - done) + "]"
percent = 100 * downloaded / total
sys.stdout.write(
f"\rDownloading: {bar} {percent:5.1f}% ({downloaded}/{total} bytes)"
)
sys.stdout.flush()
if total > 0:
sys.stdout.write("\n")
if total == 0:
print(f"Downloaded {downloaded} bytes.")
return {"output_file": output_file, "size": downloaded}
ok = _stream_response_to_file(resp, output_file, label=output_file)
resp.close()
if ok:
return {"output_file": output_file, "size": os.path.getsize(output_file)}
        return {"output_file": output_file, "size": 0}

    def create_customer(self, name):
url = f"{self.base_url}/v1/customers/add"
@@ -1334,78 +1444,42 @@ def download_hashmob_wordlist_list():
def download_hashmob_wordlist(file_name, out_path):
"""Download a wordlist file from Hashmob by file name."""
import re
url = f"https://hashmob.net/api/v2/downloads/research/wordlists/{file_name}"
api_key = get_hashmob_api_key()
headers = {"api-key": api_key} if api_key else {}
base_backoff = 30
max_backoff = 300
penalty_add = 30
penalty = base_backoff
while True:
_hashmob_limiter.wait()
try:
with requests.get(
url, headers=headers, stream=True, timeout=60, allow_redirects=True
) as r:
if r.status_code == 429:
print(
f"[!] Rate limit hit (429). Backing off for {penalty} seconds..."
)
time.sleep(penalty)
penalty = min(penalty + penalty_add, max_backoff)
penalty_add *= 2
continue
if r.status_code in (301, 302, 303, 307, 308):
redirect_url = r.headers.get("Location")
if redirect_url:
print(f"Following redirect to: {redirect_url}")
return download_hashmob_wordlist(redirect_url, out_path)
print("Redirect with no Location header!")
return False
r.raise_for_status()
content_type = r.headers.get("Content-Type", "")
if "text/plain" in content_type:
html = r.content.decode(errors="replace")
import re
match = re.search(
r"<meta[^>]+http-equiv=['\"]refresh['\"][^>]+content=['\"]0;url=([^'\"]+)['\"]",
html,
re.IGNORECASE,
)
if match:
real_url = match.group(1)
print(f"Found meta refresh redirect to: {real_url}")
with requests.get(real_url, stream=True, timeout=120) as r2:
r2.raise_for_status()
with open(out_path, "wb") as f:
for chunk in r2.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {out_path}")
return True
print(
"Error: Received HTML instead of file. Possible permission or quota issue."
)
return False
with open(out_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {out_path}")
return True
except Exception as e:
if (
hasattr(e, "response")
and getattr(e.response, "status_code", None) == 429
):
print(f"[!] Rate limit hit (429). Backing off for {penalty} seconds...")
time.sleep(penalty)
penalty = min(penalty + penalty_add, max_backoff)
penalty_add *= 2
continue
print(f"Error downloading wordlist: {e}")
return False
def _attempt():
_hashmob_limiter.wait()
with requests.get(
url, headers=headers, stream=True, timeout=60, allow_redirects=True
) as r:
if r.status_code == 429:
raise _Hashmob429()
r.raise_for_status()
content_type = r.headers.get("Content-Type", "")
if "text/plain" in content_type:
html = r.content.decode(errors="replace")
match = re.search(
r"<meta[^>]+http-equiv=['\"]refresh['\"][^>]+content=['\"]0;url=([^'\"]+)['\"]",
html,
re.IGNORECASE,
)
if match:
real_url = match.group(1)
print(f"Found meta refresh redirect to: {real_url}")
return _streamed_download(real_url, out_path, label=file_name)
print("Error: Received HTML instead of file. Possible permission or quota issue.")
return False
# Normal binary download — re-request now that we've confirmed the URL is good
return _streamed_download(url, out_path, headers=headers, label=file_name)
try:
return _with_hashmob_backoff(_attempt)
except Exception as e:
print(f"Error downloading wordlist: {e}")
        return False


def download_hashmob_rule_list():
@@ -1502,79 +1576,42 @@ def download_hashmob_rule(file_name, out_path):
"HashMob._100.rule": "https://www.hashmob.net/api/v2/downloads/research/rules/HashMob._100.rule",
"HashMob._66.rule": "https://www.hashmob.net/api/v2/downloads/research/rules/HashMob._66.rule",
}
url = hashmob_rule_urls.get(file_name)
if not url:
primary_url = hashmob_rule_urls.get(file_name)
if not primary_url:
print(
f"[i] Hashmob rule not in pinned URL list, using public prefix: {file_name}"
)
url = f"https://www.hashmob.net/api/v2/downloads/research/rules/{file_name}"
primary_url = f"https://www.hashmob.net/api/v2/downloads/research/rules/{file_name}"
alt_url = f"https://hashmob.net/api/v2/downloads/research/official/hashmob_rules/{file_name}"
api_key = get_hashmob_api_key()
headers = {"api-key": api_key} if api_key else {}
base_backoff = 30
max_backoff = 300
penalty_add = 30
penalty = base_backoff
while True:
def _attempt():
_hashmob_limiter.wait()
try:
with requests.get(
url, headers=headers, stream=True, timeout=60, allow_redirects=True
) as r:
if r.status_code == 429:
print(
f"[!] Rate limit hit (429). Backing off for {penalty} seconds..."
)
time.sleep(penalty)
penalty = min(penalty + penalty_add, max_backoff)
penalty_add *= 2
continue
if r.status_code == 404 and alt_url:
print(
f"[i] Hashmob rule not found at primary URL, trying fallback: {alt_url}"
)
with requests.get(
alt_url,
headers=headers,
stream=True,
timeout=60,
allow_redirects=True,
) as r_alt:
if r_alt.status_code == 429:
print(
f"[!] Rate limit hit (429). Backing off for {penalty} seconds..."
)
time.sleep(penalty)
penalty = min(penalty + penalty_add, max_backoff)
penalty_add *= 2
continue
r_alt.raise_for_status()
with open(out_path, "wb") as f:
for chunk in r_alt.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {out_path}")
return True
r.raise_for_status()
with open(out_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {out_path}")
return True
except Exception as e:
# If it's a 429 error, handle backoff, else fail
if (
hasattr(e, "response")
and getattr(e.response, "status_code", None) == 429
):
print(f"[!] Rate limit hit (429). Backing off for {penalty} seconds...")
time.sleep(penalty)
penalty = min(penalty + penalty_add, max_backoff)
penalty_add *= 2
continue
print(f"Error downloading rule: {e}")
return False
with requests.get(
primary_url, headers=headers, stream=True, timeout=60, allow_redirects=True
) as r:
if r.status_code == 429:
raise _Hashmob429()
if r.status_code == 404 and alt_url:
print(
f"[i] Hashmob rule not found at primary URL, trying fallback: {alt_url}"
)
with requests.get(
alt_url, headers=headers, stream=True, timeout=60, allow_redirects=True
) as r_alt:
if r_alt.status_code == 429:
raise _Hashmob429()
r_alt.raise_for_status()
return _streamed_download(alt_url, out_path, headers=headers, label=file_name)
r.raise_for_status()
return _streamed_download(primary_url, out_path, headers=headers, label=file_name)
try:
return _with_hashmob_backoff(_attempt)
except Exception as e:
print(f"Error downloading rule: {e}")
        return False


def list_official_wordlists():
@@ -1802,63 +1839,19 @@ def list_and_download_hashmob_rules(rules_dir=None):
def download_official_wordlist(file_name, out_path):
"""Download a file from the official wordlists directory with a progress bar."""
import sys
url = f"https://hashmob.net/api/v2/downloads/research/official/{file_name}"
archive_path = None
try:
with requests.get(url, stream=True, timeout=120) as r:
r.raise_for_status()
try:
total = int(r.headers.get("content-length") or 0)
except Exception:
total = 0
downloaded = 0
chunk_size = 8192
out_path = sanitize_filename(file_name)
dest_dir = get_hcat_wordlists_dir()
archive_path = (
os.path.join(dest_dir, out_path)
if not os.path.isabs(out_path)
else out_path
)
temp_path = archive_path + ".part"
os.makedirs(os.path.dirname(archive_path), exist_ok=True)
with open(temp_path, "wb") as f:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total:
done = int(50 * downloaded / total)
percent = 100 * downloaded / total
bar = "=" * done + " " * (50 - done)
sys.stdout.write(
f"\r[{bar}] {percent:6.2f}% ({downloaded // 1024} KB/{total // 1024} KB)"
)
sys.stdout.flush()
else:
sys.stdout.write(f"\rDownloaded {downloaded // 1024} KB")
sys.stdout.flush()
sys.stdout.write("\n")
os.replace(temp_path, archive_path)
print(f"Downloaded {archive_path}")
if archive_path.endswith(".7z"):
extract_with_7z(archive_path)
return True
except KeyboardInterrupt:
print("\nKeyboard interrupt: Cleaning up partial download...")
temp_path = f"{archive_path}.part" if archive_path else None
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
print(f"Removed partial file: {temp_path}")
except Exception as e:
print(f"Failed to remove partial file: {e}")
return False
except Exception as e:
print(f"Error downloading official wordlist: {e}")
return False
out_path = sanitize_filename(file_name)
dest_dir = get_hcat_wordlists_dir()
archive_path = (
os.path.join(dest_dir, out_path)
if not os.path.isabs(out_path)
else out_path
)
os.makedirs(os.path.dirname(archive_path), exist_ok=True)
ok = _streamed_download(url, archive_path, label=file_name)
if ok and archive_path.endswith(".7z"):
extract_with_7z(archive_path)
    return ok


def extract_with_7z(archive_path, output_dir=None, remove_archive=True):