major refactor and version change to 2.0

This commit is contained in:
Justin Bollinger
2026-01-26 13:32:37 -05:00
parent d36c5457b5
commit 01f4e55e85
23 changed files with 2409 additions and 1246 deletions

View File

@@ -1,13 +1,13 @@
# Test Mocking Summary
# Testing Guide
## Overview
All Hashview API tests have been updated to use mocked responses instead of real API calls. This allows tests to run in CI/CD environments (like GitHub Actions) without requiring connectivity to a Hashview server or actual API credentials.
The test suite uses mocked API responses and local fixtures so it can run without external services (Hashview, Hashmob, Weakpass). Most tests are fast and run entirely offline.
## Changes Made
### 1. Updated Test Files
### 1. Test Files (Current)
**test_hashview.py** (consolidated test suite)
**tests/test_hashview.py** (mocked Hashview API tests)
- Added `unittest.mock` imports (Mock, patch, MagicMock)
- Removed dependency on config.json file
- Replaced all real API calls with mocked responses
@@ -18,6 +18,23 @@ All Hashview API tests have been updated to use mocked responses instead of real
- Hashfile upload
- Complete job creation workflow
**tests/test_hate_crack_utils.py**
- Unit tests for utility helpers (session id generation, line counts, path resolution, hex conversion)
- Uses `HATE_CRACK_SKIP_INIT=1` to avoid heavy dependency checks
**tests/test_menu_snapshots.py**
- Snapshot-based tests for menu output text
- Uses fixtures in `tests/fixtures/menu_outputs/`
**tests/test_dependencies.py**
- Checks local tool availability (7z, transmission-cli)
**tests/test_module_imports.py**
- Ensures core modules import cleanly (`hashview`, `hashmob_wordlist`, `weakpass`, `cli`, `api`, `attacks`)
**tests/test_hashmob_connectivity.py**
- Mocked Hashmob API connectivity test
### 2. Key Mock Patterns
```python
@@ -32,33 +49,25 @@ mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
```
### 3. GitHub Actions Workflow
### 3. Documentation
Created `.github/workflows/tests.yml` to automatically run tests on:
- Push to main/master/develop branches
- Pull requests to main/master/develop branches
- Tests run against Python 3.9, 3.10, 3.11, and 3.12
### 4. Documentation
Updated readme.md with:
Updated `readme.md` with:
- Testing section explaining how to run tests locally
- Description of test structure
- Information about CI/CD integration
## Test Results
6 tests passing
⚡ Tests run in ~0.1 seconds (vs ~20 seconds with real API calls)
25 tests passing
⚡ Tests run in <1 second on a typical dev machine
### Test Coverage
1. **test_list_customers_success** - Validates customer listing with multiple customers
2. **test_list_customers_returns_valid_data** - Validates customer data structure
3. **test_connection_and_auth** - Tests successful authentication
4. **test_invalid_api_key_fails** - Tests authentication failure handling
5. **test_upload_hashfile** - Tests hashfile upload functionality
6. **test_create_job_workflow** - Tests complete end-to-end job creation workflow
Highlights:
1. Hashview API workflows (list customers, upload hashfile, create jobs, download left hashes)
2. Utility helpers (sanitize session ids, line count, path resolution, hex conversion)
3. Menu output snapshots
4. Hashmob connectivity (mocked)
5. Module import sanity checks
## Benefits
@@ -78,10 +87,10 @@ pip install pytest pytest-mock requests
pytest -v
# Run specific test
pytest test_hashview.py -v
pytest tests/test_hashview.py -v
# Run a specific test method
pytest test_hashview.py::TestHashviewAPI::test_create_job_workflow -v
pytest tests/test_hashview.py::TestHashviewAPI::test_create_job_workflow -v
```
## Note on Real API Testing

File diff suppressed because it is too large Load Diff

1
hate_crack/__init__.py Normal file
View File

@@ -0,0 +1 @@
# hate_crack package

89
hate_crack/api.py Normal file
View File

@@ -0,0 +1,89 @@
import json
import os
from typing import Callable, Tuple
from hate_crack.hashview import HashviewAPI
from hate_crack.hashmob_wordlist import list_and_download_official_wordlists
def download_hashes_from_hashview(
    hashview_url: str,
    hashview_api_key: str,
    debug_mode: bool,
    input_fn: Callable[[str], str] = input,
    print_fn: Callable[..., None] = print,
    hash_type: str = "1000",
) -> Tuple[str, str]:
    """Interactive Hashview download flow used by CLI.

    Prompts for a customer ID and a hashfile ID, downloads the remaining
    ("left") hashes into a local file, and returns the pair
    ``(hash_file_path, hashcat_hash_type)`` for the cracking session.

    Args:
        hashview_url: Base URL of the Hashview server.
        hashview_api_key: API key used to authenticate the session.
        debug_mode: Passed through to HashviewAPI's debug flag.
        input_fn: Injectable prompt function (tests supply a stub).
        print_fn: Injectable output function (tests supply a stub).
        hash_type: Hashcat hash-type id returned to the caller. Defaults to
            "1000" (previously hard-coded); callers may now override it.

    Raises:
        ValueError: if the user enters a non-integer customer/hashfile ID.
        requests.HTTPError: propagated from the underlying API calls.
    """
    api_harness = HashviewAPI(hashview_url, hashview_api_key, debug=debug_mode)
    result = api_harness.list_customers()
    if 'customers' in result and result['customers']:
        api_harness.display_customers_multicolumn(result['customers'])
    customer_id = int(input_fn("\nEnter customer ID: "))
    # Listing hashfiles is best-effort: a failure here should not abort the
    # flow, since the user can look the ID up in the web UI instead.
    try:
        customer_hashfiles = api_harness.get_customer_hashfiles(customer_id)
        if customer_hashfiles:
            print_fn("\n" + "=" * 100)
            print_fn(f"Hashfiles for Customer ID {customer_id}:")
            print_fn("=" * 100)
            print_fn(f"{'ID':<10} {'Name':<88}")
            print_fn("-" * 100)
            for hf in customer_hashfiles:
                hf_id = hf.get('id', 'N/A')
                hf_name = hf.get('name', 'N/A')
                # Truncate long names so the fixed-width table stays aligned.
                if len(str(hf_name)) > 88:
                    hf_name = str(hf_name)[:85] + "..."
                print_fn(f"{hf_id:<10} {hf_name:<88}")
            print_fn("=" * 100)
            print_fn(f"Total: {len(customer_hashfiles)} hashfile(s)")
        else:
            print_fn(f"\nNo hashfiles found for customer ID {customer_id}")
    except Exception as exc:
        print_fn(f"\nWarning: Could not list hashfiles: {exc}")
        print_fn("You may need to manually find the hashfile ID in the web interface.")
    hashfile_id = int(input_fn("\nEnter hashfile ID: "))
    hcat_hash_type = hash_type
    output_file = f"left_{customer_id}_{hashfile_id}.txt"
    download_result = api_harness.download_left_hashes(customer_id, hashfile_id, output_file)
    print_fn(f"\n✓ Success: Downloaded {download_result['size']} bytes")
    print_fn(f" File: {download_result['output_file']}")
    hcat_hash_file = download_result['output_file']
    print_fn("\nNow starting hate_crack with:")
    print_fn(f" Hash file: {hcat_hash_file}")
    print_fn(f" Hash type: {hcat_hash_type}")
    return hcat_hash_file, hcat_hash_type
def download_hashmob_wordlists(print_fn=print) -> None:
    """Download official Hashmob wordlists.

    Thin wrapper around list_and_download_official_wordlists(); *print_fn*
    is injectable so callers/tests can capture the completion message.
    """
    list_and_download_official_wordlists()
    print_fn("Hashmob wordlist download complete.")
def download_weakpass_torrent(download_torrent, filename: str, print_fn=print) -> None:
    """Download a single Weakpass torrent file by name or URL.

    Args:
        download_torrent: Callable performing the actual download.
        filename: Torrent file name or URL, passed through to the callable.
        print_fn: Injectable output function.
    """
    # Bug fix: the progress line previously printed a literal placeholder
    # instead of the requested file name (cf. the bulk-download variant).
    print_fn(f"[i] Downloading: {filename}")
    download_torrent(filename)
def download_all_weakpass_torrents(
    fetch_all_wordlists,
    download_torrent,
    print_fn=print,
    cache_path: str = "weakpass_wordlists.json",
) -> None:
    """Download every Weakpass torrent listed in the cached wordlist JSON.

    When the cache file is missing, *fetch_all_wordlists* is invoked first to
    (re)build it. Entries without a 'torrent_url' are skipped. Raises when
    the cache cannot be read even after fetching.
    """
    if not os.path.exists(cache_path):
        print_fn("[i] weakpass_wordlists.json not found, fetching wordlist cache...")
        fetch_all_wordlists()
    try:
        with open(cache_path, "r", encoding="utf-8") as cache_file:
            cached_entries = json.load(cache_file)
    except Exception as exc:
        print_fn(f"Failed to load local wordlist cache: {exc}")
        raise
    torrent_urls = [entry['torrent_url'] for entry in cached_entries if entry.get('torrent_url')]
    print_fn(f"[i] Downloading {len(torrent_urls)} torrents...")
    for torrent_url in torrent_urls:
        print_fn(f"[i] Downloading: {torrent_url}")
        download_torrent(torrent_url)
    print_fn("[i] All torrents processed.")

235
hate_crack/attacks.py Normal file
View File

@@ -0,0 +1,235 @@
import glob
import os
import readline
from typing import Any
def _configure_readline(completer):
readline.set_completer_delims(' \t\n;')
try:
readline.parse_and_bind("set completion-query-items -1")
except Exception:
pass
try:
readline.parse_and_bind("tab: complete")
except Exception:
pass
try:
readline.parse_and_bind("bind ^I rl_complete")
except Exception:
pass
readline.set_completer(completer)
def quick_crack(ctx: Any) -> None:
    """Interactive quick crack: choose a wordlist (or directory) plus rule
    chain(s), then run one quick dictionary attack per selected chain.
    """
    wordlist_choice = None
    rule_choice = None
    selected_hcatRules = []
    # Present the configured wordlists as a numbered menu.
    wordlist_files = sorted(os.listdir(ctx.hcatWordlists))
    print("\nWordlists:")
    for i, file in enumerate(wordlist_files, start=1):
        print(f"{i}. {file}")

    def path_completer(text, state):
        # readline completer over filesystem paths for the prompt below.
        if not text:
            text = './'
        text = os.path.expanduser(text)
        if text.startswith('/') or text.startswith('./') or text.startswith('../') or text.startswith('~'):
            matches = glob.glob(text + '*')
        else:
            matches = glob.glob('./' + text + '*')
            # Strip the './' we prepended so the completion echoes the user's input.
            matches = [m[2:] if m.startswith('./') else m for m in matches]
        # Append '/' to directories so completion can continue into them.
        matches = [m + '/' if os.path.isdir(m) else m for m in matches]
        try:
            return matches[state]
        except IndexError:
            return None

    _configure_readline(path_completer)
    # Loop until the user supplies a valid path or menu number.
    while wordlist_choice is None:
        try:
            raw_choice = input(
                "\nEnter path of wordlist or wordlist directory (tab to autocomplete).\n"
                f"Press Enter for default optimized wordlists [{ctx.hcatOptimizedWordlists}]: "
            )
            if raw_choice == '':
                wordlist_choice = ctx.hcatOptimizedWordlists
            elif os.path.exists(raw_choice):
                wordlist_choice = raw_choice
            elif 1 <= int(raw_choice) <= len(wordlist_files):
                if os.path.exists(ctx.hcatWordlists + '/' + wordlist_files[int(raw_choice) - 1]):
                    wordlist_choice = ctx.hcatWordlists + '/' + wordlist_files[int(raw_choice) - 1]
                    print(wordlist_choice)
            else:
                wordlist_choice = None
                print('Please enter a valid wordlist or wordlist directory.')
        except ValueError:
            # Input was neither an existing path nor a number.
            print("Please enter a valid number.")
    rule_files = sorted(os.listdir(ctx.hcatPath + '/rules'))
    print("\nWhich rule(s) would you like to run?")
    print('0. To run without any rules')
    for i, file in enumerate(rule_files, start=1):
        print(f"{i}. {file}")
    print('99. YOLO...run all of the rules')
    while rule_choice is None:
        raw_choice = input(
            'Enter Comma separated list of rules you would like to run. To run rules chained use the + symbol.\n'
            f'For example 1+1 will run {rule_files[0]} chained twice and 1,2 would run {rule_files[0]} and then {rule_files[1]} sequentially.\n'
            'Choose wisely: '
        )
        if raw_choice != '':
            rule_choice = raw_choice.split(',')
    if '99' in rule_choice:
        # YOLO: queue each rule file as its own run.
        for rule in rule_files:
            selected_hcatRules.append(f"-r {ctx.hcatPath}/rules/{rule}")
    elif '0' in rule_choice:
        # Single run with no -r argument at all.
        selected_hcatRules = ['']
    else:
        for choice in rule_choice:
            if '+' in choice:
                # Chained rules: accumulate one combined "-r a -r b" argument.
                combined_choice = ''
                choices = choice.split('+')
                for rule in choices:
                    try:
                        combined_choice = f"{combined_choice} -r {ctx.hcatPath}/rules/{rule_files[int(rule) - 1]}"
                    except Exception:
                        # Bad index or non-numeric part: skip that link of the chain.
                        continue
                selected_hcatRules.append(combined_choice)
            else:
                try:
                    selected_hcatRules.append(f"-r {ctx.hcatPath}/rules/{rule_files[int(choice) - 1]}")
                except IndexError:
                    continue
    # One attack run per selected rule chain.
    for chain in selected_hcatRules:
        ctx.hcatQuickDictionary(ctx.hcatHashType, ctx.hcatHashFile, chain, wordlist_choice)
def extensive_crack(ctx: Any) -> None:
    """Run the full attack chain, recycling cracked results after each phase.

    Each phase runs an attack, then feeds the phase's crack count into
    hcatRecycle. The count attribute is read *after* the attack runs, so
    context updates made by the attack are observed.
    """
    target_seconds = 4 * 60 * 60  # top-mask attacks aim at a 4-hour budget
    phases = [
        (lambda: ctx.hcatBruteForce(ctx.hcatHashType, ctx.hcatHashFile, "1", "7"), "hcatBruteCount"),
        (lambda: ctx.hcatDictionary(ctx.hcatHashType, ctx.hcatHashFile), "hcatDictionaryCount"),
        (lambda: ctx.hcatTopMask(ctx.hcatHashType, ctx.hcatHashFile, target_seconds), "hcatMaskCount"),
        (lambda: ctx.hcatFingerprint(ctx.hcatHashType, ctx.hcatHashFile), "hcatFingerprintCount"),
        (lambda: ctx.hcatCombination(ctx.hcatHashType, ctx.hcatHashFile), "hcatCombinationCount"),
        (lambda: ctx.hcatHybrid(ctx.hcatHashType, ctx.hcatHashFile), "hcatHybridCount"),
        (lambda: ctx.hcatGoodMeasure(ctx.hcatHashType, ctx.hcatHashFile), "hcatExtraCount"),
    ]
    for run_attack, count_attr in phases:
        run_attack()
        ctx.hcatRecycle(ctx.hcatHashType, ctx.hcatHashFile, getattr(ctx, count_attr))
def brute_force_crack(ctx: Any) -> None:
    """Prompt for a password-length range and launch a brute-force attack."""
    raw_min = input("\nEnter the minimum password length to brute force (1): ")
    raw_max = input("\nEnter the maximum password length to brute force (7): ")
    ctx.hcatBruteForce(ctx.hcatHashType, ctx.hcatHashFile, int(raw_min or 1), int(raw_max or 7))
def top_mask_crack(ctx: Any) -> None:
    """Prompt for a target runtime in hours (default 4) and run the top-mask attack."""
    hours = int(input("\nEnter a target time for completion in hours (4): ") or 4)
    ctx.hcatTopMask(ctx.hcatHashType, ctx.hcatHashFile, hours * 60 * 60)
def fingerprint_crack(ctx: Any) -> None:
    """Run the fingerprint attack against the session's hash file."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatFingerprint(hash_type, hash_file)
def combinator_crack(ctx: Any) -> None:
    """Run the combinator attack against the session's hash file."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatCombination(hash_type, hash_file)
def hybrid_crack(ctx: Any) -> None:
    """Interactive hybrid attack (hashcat modes 6/7): wordlist(s) combined with masks."""
    print("\n" + "=" * 60)
    print("HYBRID ATTACK")
    print("=" * 60)
    print("This attack combines wordlists with masks to generate candidates.")
    print("Examples:")
    print(" - Mode 6: wordlist + mask (e.g., 'password' + '123')")
    print(" - Mode 7: mask + wordlist (e.g., '123' + 'password')")
    print("=" * 60)
    use_default = input("\nUse default hybrid wordlist from config? (Y/n): ").strip().lower()
    if use_default != 'n':
        # Default path: take wordlist(s) straight from the configured value,
        # which may be a single path or a list of paths.
        print("\nUsing default wordlist(s) from config:")
        if isinstance(ctx.hcatHybridlist, list):
            for wl in ctx.hcatHybridlist:
                print(f" - {wl}")
            wordlists = ctx.hcatHybridlist
        else:
            print(f" - {ctx.hcatHybridlist}")
            wordlists = [ctx.hcatHybridlist]
    else:
        # Manual path: prompt with tab-completion; multiple files allowed.
        print("\nSelect wordlist(s) for hybrid attack.")
        print("You can enter:")
        print(" - A single file path")
        print(" - Multiple paths separated by commas")
        print(" - Press TAB to autocomplete file paths")
        selection = ctx.select_file_with_autocomplete(
            "Enter wordlist file(s) (comma-separated for multiple)",
            allow_multiple=True
        )
        if not selection:
            print("No wordlist selected. Aborting hybrid attack.")
            return
        if isinstance(selection, str):
            wordlists = [selection]
        else:
            wordlists = selection
    # Keep only wordlists that actually exist on disk; report each check.
    valid_wordlists = []
    for wl in wordlists:
        if os.path.isfile(wl):
            valid_wordlists.append(wl)
            print(f"✓ Found: {wl}")
        else:
            print(f"✗ Not found: {wl}")
    if not valid_wordlists:
        print("\nNo valid wordlists found. Aborting hybrid attack.")
        return
    wordlists = valid_wordlists
    print(f"\nStarting hybrid attack with {len(wordlists)} wordlist(s)...")
    print(f"Hash type: {ctx.hcatHashType}")
    print(f"Hash file: {ctx.hcatHashFile}")
    ctx.hcatHybrid(ctx.hcatHashType, ctx.hcatHashFile, wordlists)
def pathwell_crack(ctx: Any) -> None:
    """Run the PathWell mask brute-force attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatPathwellBruteForce(hash_type, hash_file)
def prince_attack(ctx: Any) -> None:
    """Run the PRINCE candidate-generation attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatPrince(hash_type, hash_file)
def yolo_combination(ctx: Any) -> None:
    """Run the YOLO combination attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatYoloCombination(hash_type, hash_file)
def thorough_combinator(ctx: Any) -> None:
    """Run the thorough combinator attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatThoroughCombinator(hash_type, hash_file)
def middle_combinator(ctx: Any) -> None:
    """Run the middle combinator attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatMiddleCombinator(hash_type, hash_file)
def bandrel_method(ctx: Any) -> None:
    """Run the Bandrel-method attack for this session."""
    hash_type, hash_file = ctx.hcatHashType, ctx.hcatHashFile
    ctx.hcatBandrel(hash_type, hash_file)

55
hate_crack/cli.py Normal file
View File

@@ -0,0 +1,55 @@
import logging
import os
from typing import Optional
def resolve_path(value: Optional[str]) -> Optional[str]:
    """Expand `~` and return an absolute path; None/empty input yields None."""
    if not value:
        return None
    return os.path.abspath(os.path.expanduser(value))
def apply_config_overrides(args, config):
    """Apply CLI flag values onto *config*.

    Path/string flags only override when truthy; hashview_api_key and
    maxruntime override whenever they are not None (so an empty key or a
    zero runtime is a deliberate override).
    """
    # (flag attribute on args, attribute on config, resolve as path?)
    truthy_overrides = [
        ("hashview_url", "hashview_url", False),
        ("hcat_path", "hcatPath", True),
        ("hcat_bin", "hcatBin", False),
        ("wordlists_dir", "hcatWordlists", True),
        ("optimized_wordlists_dir", "hcatOptimizedWordlists", True),
        ("pipal_path", "pipalPath", True),
        ("bandrel_basewords", "bandrelbasewords", False),
    ]
    for arg_name, cfg_name, is_path in truthy_overrides:
        value = getattr(args, arg_name)
        if value:
            setattr(config, cfg_name, resolve_path(value) if is_path else value)
    if args.hashview_api_key is not None:
        config.hashview_api_key = args.hashview_api_key
    if args.maxruntime is not None:
        config.maxruntime = args.maxruntime
def add_common_args(parser) -> None:
    """Register the shared configuration-override flags on *parser*.

    Order matches the original registration order so --help output is stable.
    """
    # (flag, dest, value type or None for plain string, help text)
    option_specs = [
        ('--hashview-url', 'hashview_url', None, 'Override Hashview URL'),
        ('--hashview-api-key', 'hashview_api_key', None, 'Override Hashview API key'),
        ('--hcat-path', 'hcat_path', None, 'Override hashcat path'),
        ('--hcat-bin', 'hcat_bin', None, 'Override hashcat binary name'),
        ('--wordlists-dir', 'wordlists_dir', None, 'Override wordlists directory'),
        ('--optimized-wordlists-dir', 'optimized_wordlists_dir', None, 'Override optimized wordlists directory'),
        ('--pipal-path', 'pipal_path', None, 'Override pipal path'),
        ('--maxruntime', 'maxruntime', int, 'Override max runtime setting'),
        ('--bandrel-basewords', 'bandrel_basewords', None, 'Override bandrel basewords setting'),
    ]
    for flag, dest, value_type, help_text in option_specs:
        kwargs = {'dest': dest, 'help': help_text}
        if value_type is not None:
            kwargs['type'] = value_type
        parser.add_argument(flag, **kwargs)
def setup_logging(logger: logging.Logger, hate_path: str, debug_mode: bool) -> None:
    """Enable DEBUG file logging under *hate_path* when debug mode is on.

    No-op when debug_mode is False. Idempotent: a FileHandler is only added
    if the logger does not already carry one, so repeated calls do not stack
    duplicate handlers.
    """
    if not debug_mode:
        return
    logger.setLevel(logging.DEBUG)
    already_has_file_handler = any(isinstance(h, logging.FileHandler) for h in logger.handlers)
    if already_has_file_handler:
        return
    handler = logging.FileHandler(os.path.join(hate_path, "hate_crack.log"))
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)

View File

@@ -0,0 +1,283 @@
import json
import os
import requests
def sanitize_filename(filename):
    """Make *filename* filesystem-safe.

    Spaces become underscores, then every character outside [A-Za-z0-9._-]
    is dropped (including path separators).
    """
    import re
    underscored = filename.replace(' ', '_')
    return re.sub(r'[^A-Za-z0-9._-]', '', underscored)
def get_api_key():
    """Return `hashmob_api_key` from config.json, or None when unavailable.

    The package directory is searched first, then the project root; files
    that are missing, unreadable, or malformed are skipped silently, as is
    a config whose key value is falsy.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(here, os.pardir))
    for directory in (here, project_root):
        candidate = os.path.join(directory, 'config.json')
        if not os.path.isfile(candidate):
            continue
        try:
            with open(candidate) as handle:
                key = json.load(handle).get('hashmob_api_key')
        except Exception:
            continue
        if key:
            return key
    return None
def get_hcat_wordlists_dir():
    """Return the configured `hcatWordlists` directory from config.json.

    config.json is searched for in the package directory, then the project
    root; a relative configured path is resolved against the project root.
    Falls back to <project_root>/wordlists when not configured. The returned
    directory is created if it does not exist.
    """
    pkg_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(pkg_dir, os.pardir))
    for config_path in (os.path.join(pkg_dir, 'config.json'),
                        os.path.join(project_root, 'config.json')):
        try:
            if not os.path.isfile(config_path):
                continue
            with open(config_path) as handle:
                configured = json.load(handle).get('hcatWordlists')
            if configured:
                configured = os.path.expanduser(configured)
                if not os.path.isabs(configured):
                    configured = os.path.join(project_root, configured)
                os.makedirs(configured, exist_ok=True)
                return configured
        except Exception:
            # Unreadable/malformed config: try the next candidate.
            continue
    fallback = os.path.join(project_root, 'wordlists')
    os.makedirs(fallback, exist_ok=True)
    return fallback
def download_hashmob_wordlist_list():
    """Fetch available wordlists from Hashmob API v2 and print them.

    Returns the list of wordlist entries (dicts) on success, or [] on any
    error. Sends the configured API key header only when a key exists.
    """
    url = "https://hashmob.net/api/v2/resource"
    api_key = get_api_key()
    headers = {"api-key": api_key} if api_key else {}
    try:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # The resource endpoint mixes resource types; keep only wordlists.
        wordlists = [r for r in data if r.get('type') == 'wordlist']
        print("Available Hashmob Wordlists:")
        for idx, wl in enumerate(wordlists):
            print(f"{idx+1}. {wl.get('name', wl.get('file_name', ''))} - {wl.get('information', '')}")
        return wordlists
    except Exception as e:
        # Network or decode failure: report and degrade to an empty listing.
        print(f"Error fetching Hashmob wordlists: {e}")
        return []
def download_hashmob_wordlist(file_name, out_path):
    """Download a wordlist file from Hashmob by file name.

    Returns True on success, False otherwise.

    Bug fix: on an explicit redirect the previous code recursed with the
    redirect *URL* as `file_name`, which was then re-embedded into the API
    URL template, producing a malformed request. Redirect targets are now
    followed by an internal helper that takes an absolute URL.
    """
    url = f"https://hashmob.net/api/v2/downloads/research/wordlists/{file_name}"
    api_key = get_api_key()
    headers = {"api-key": api_key} if api_key else {}
    return _download_url_to_file(url, out_path, headers)
def _download_url_to_file(url, out_path, headers, max_redirects=5):
    """Stream *url* into *out_path*, following explicit and meta-refresh redirects.

    Returns True on success, False on any failure (errors are printed, not
    raised). `max_redirects` bounds the recursion so a redirect loop cannot
    recurse forever.
    """
    if max_redirects < 0:
        print("Too many redirects!")
        return False
    try:
        with requests.get(url, headers=headers, stream=True, timeout=60, allow_redirects=True) as r:
            # Normally unreachable with allow_redirects=True, but kept as a
            # safety net for servers that disable automatic following.
            if r.status_code in (301, 302, 303, 307, 308):
                redirect_url = r.headers.get('Location')
                if redirect_url:
                    print(f"Following redirect to: {redirect_url}")
                    return _download_url_to_file(redirect_url, out_path, headers, max_redirects - 1)
                print("Redirect with no Location header!")
                return False
            r.raise_for_status()
            content_type = r.headers.get('Content-Type', '')
            if 'text/plain' in content_type:
                # The server sometimes answers with an HTML meta-refresh page
                # (served as text/plain) pointing at the real file.
                html = r.content.decode(errors='replace')
                import re
                match = re.search(
                    r"<meta[^>]+http-equiv=['\"]refresh['\"][^>]+content=['\"]0;url=([^'\"]+)['\"]",
                    html,
                    re.IGNORECASE
                )
                if match:
                    real_url = match.group(1)
                    print(f"Found meta refresh redirect to: {real_url}")
                    with requests.get(real_url, stream=True, timeout=120) as r2:
                        r2.raise_for_status()
                        with open(out_path, 'wb') as f:
                            for chunk in r2.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)
                    print(f"Downloaded {out_path}")
                    return True
                print("Error: Received HTML instead of file. Possible permission or quota issue.")
                return False
            with open(out_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            print(f"Downloaded {out_path}")
            return True
    except Exception as e:
        print(f"Error downloading wordlist: {e}")
        return False
def list_official_wordlists():
    """List files in the official wordlists directory via the Hashmob API.

    Returns the parsed JSON when the endpoint returns JSON, the raw response
    text when it does not, or [] when the request itself fails.
    """
    url = "https://hashmob.net/api/v2/downloads/research/official/"
    api_key = get_api_key()
    headers = {"api-key": api_key} if api_key else {}
    try:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        try:
            data = resp.json()
            print("Official Hashmob Wordlists (JSON):")
            for idx, entry in enumerate(data):
                print(f"{idx+1}. {entry}")
            return data
        except Exception:
            # The endpoint may serve a plain directory listing instead of JSON.
            print("Official Hashmob Wordlists (raw text):")
            print(resp.text)
            return resp.text
    except Exception as e:
        print(f"Error listing official wordlists: {e}")
        return []
def list_and_download_official_wordlists():
    """List official Hashmob wordlists, prompt for a selection, and download.

    Selection handling is delegated to `_download_official_entry`.

    Bug fix: the previous version re-extracted `.7z` archives after
    `download_official_wordlist()` returned, computing the archive path from
    the *unsanitized* name — `download_official_wordlist()` sanitizes the
    name, already extracts archives itself, so the duplicate pass was both
    redundant and potentially aimed at a non-existent path. It is removed.
    """
    url = "https://hashmob.net/api/v2/downloads/research/official/"
    api_key = get_api_key()
    headers = {"api-key": api_key} if api_key else {}
    try:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, list):
            print("Unexpected response format. Raw output:")
            print(data)
            return
        print("Official Hashmob Wordlists:")
        for idx, entry in enumerate(data):
            name = entry.get('name', entry.get('file_name', str(entry)))
            file_name = entry.get('file_name', name)
            info = entry.get('information', '')
            print(f"{idx+1}. {name} ({file_name}) - {info}")
        print("a. Download ALL files")
        sel = input("Enter the number of the wordlist to download, or 'a' for all, or 'q' to quit: ")
        if sel.lower() == 'q':
            return
        if sel.lower() == 'a':
            for entry in data:
                _download_official_entry(entry, skip_message="No file_name found for an entry, skipping.")
            return
        try:
            idx = int(sel) - 1
            if idx < 0 or idx >= len(data):
                print("Invalid selection.")
                return
            _download_official_entry(data[idx], skip_message="No file_name found for selection.")
        except Exception as e:
            print(f"Error: {e}")
    except Exception as e:
        print(f"Error listing official wordlists: {e}")
def _download_official_entry(entry, skip_message):
    """Download one listing entry; entries without a file_name are reported and skipped.

    download_official_wordlist() saves under the configured wordlists
    directory and extracts `.7z` archives itself.
    """
    file_name = entry.get('file_name')
    if not file_name:
        print(skip_message)
        return
    out_path = entry.get('name', file_name)
    download_official_wordlist(file_name, out_path)
def download_official_wordlist(file_name, out_path):
    """Download a file from the official wordlists directory with a progress bar.

    The file is saved under the configured wordlists directory (the name is
    sanitized first, unless *out_path* is absolute), and `.7z` archives are
    extracted after download. Returns True on success, False on any error.
    """
    import sys
    url = f"https://hashmob.net/api/v2/downloads/research/official/{file_name}"
    api_key = get_api_key()
    headers = {"api-key": api_key} if api_key else {}
    try:
        with requests.get(url, headers=headers, stream=True, timeout=120) as r:
            r.raise_for_status()
            try:
                total = int(r.headers.get('content-length') or 0)
            except Exception:
                total = 0  # unknown size: fall back to a raw byte counter
            downloaded = 0
            chunk_size = 8192
            out_path = sanitize_filename(out_path)
            dest_dir = get_hcat_wordlists_dir()
            archive_path = os.path.join(dest_dir, out_path) if not os.path.isabs(out_path) else out_path
            os.makedirs(os.path.dirname(archive_path), exist_ok=True)
            with open(archive_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=chunk_size):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total:
                            # 50-character progress bar plus percentage/KB counters.
                            done = int(50 * downloaded / total)
                            percent = 100 * downloaded / total
                            bar = '=' * done + ' ' * (50 - done)
                            sys.stdout.write(
                                f"\r[{bar}] {percent:6.2f}% ({downloaded // 1024} KB/{total // 1024} KB)"
                            )
                            sys.stdout.flush()
                        else:
                            sys.stdout.write(f"\rDownloaded {downloaded // 1024} KB")
                            sys.stdout.flush()
            sys.stdout.write("\n")
            print(f"Downloaded {archive_path}")
            if archive_path.endswith('.7z'):
                extract_with_7z(archive_path)
            return True
    except Exception as e:
        print(f"Error downloading official wordlist: {e}")
        return False
def extract_with_7z(archive_path, output_dir=None):
    """Extract a .7z archive using the 7z or 7za command.

    Returns True on success, False when no 7-Zip binary is on PATH or the
    extraction fails. *output_dir* defaults to the archive path minus its
    extension and is created up front.
    """
    import shutil
    import subprocess
    if output_dir is None:
        output_dir = os.path.splitext(archive_path)[0]
    os.makedirs(output_dir, exist_ok=True)
    binary = shutil.which('7z') or shutil.which('7za')
    if binary is None:
        print("[!] 7z or 7za not found in PATH. Please install p7zip-full or 7-zip to extract archives.")
        return False
    try:
        print(f"Extracting {archive_path} to {output_dir} ...")
        proc = subprocess.run(
            [binary, 'x', '-y', archive_path, f'-o{output_dir}'],
            capture_output=True,
            text=True
        )
        print(proc.stdout)
        if proc.returncode != 0:
            print(f"[!] Extraction failed for {archive_path}: {proc.stderr}")
            return False
        print(f"[+] Extraction complete: {archive_path}")
        return True
    except Exception as e:
        print(f"[!] Error extracting {archive_path}: {e}")
        return False

168
hate_crack/hashview.py Normal file
View File

@@ -0,0 +1,168 @@
"""
hashview_api.py
Modularized Hashview API integration (class only).
"""
import os
import json
import requests
# Hashview Integration - Real API implementation matching hate_crack.py
class HashviewAPI:
    """Hashview API integration for uploading/downloading hashfiles, wordlists, jobs, and customers."""

    # Hashview's numeric identifiers for hashfile upload formats.
    FILE_FORMATS = {
        'pwdump': 0,
        'netntlm': 1,
        'kerberos': 2,
        'shadow': 3,
        'user:hash': 4,
        'hash_only': 5,
    }

    def __init__(self, base_url, api_key, debug=False):
        """Open a session against *base_url*, authenticating via a `uuid` cookie.

        TLS verification is disabled (Hashview installs commonly use
        self-signed certificates), and the matching urllib3 warning is
        silenced so interactive output stays clean.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.debug = debug
        self.session = requests.Session()
        self.session.cookies.set('uuid', api_key)
        self.session.verify = False
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def list_customers(self):
        """Return {'customers': [...]} from /v1/customers.

        The endpoint returns the customer list JSON-encoded under a 'users'
        key; that shape is decoded here, any other payload is passed through.
        """
        url = f"{self.base_url}/v1/customers"
        resp = self.session.get(url)
        resp.raise_for_status()
        data = resp.json()
        if 'users' in data:
            customers = json.loads(data['users'])
            return {'customers': customers}
        return data

    def list_hashfiles(self):
        """Return all hashfiles as a list of dicts ([] when none).

        Handles both string-encoded and already-decoded 'hashfiles' payloads.
        """
        url = f"{self.base_url}/v1/hashfiles"
        resp = self.session.get(url)
        resp.raise_for_status()
        data = resp.json()
        if 'hashfiles' in data:
            if isinstance(data['hashfiles'], str):
                hashfiles = json.loads(data['hashfiles'])
            else:
                hashfiles = data['hashfiles']
            return hashfiles
        return []

    def get_customer_hashfiles(self, customer_id):
        """Return hashfiles whose customer_id equals *customer_id*.

        The stored id may be a string, so it is coerced to int (missing ids
        compare as 0).
        """
        all_hashfiles = self.list_hashfiles()
        return [hf for hf in all_hashfiles if int(hf.get('customer_id', 0)) == customer_id]

    def display_customers_multicolumn(self, customers):
        """Pretty-print *customers* in as many columns as the terminal allows."""
        if not customers:
            print("\nNo customers found.")
            return
        try:
            terminal_width = os.get_terminal_size().columns
        except OSError:
            # Fix: was a bare `except:` (which also swallowed KeyboardInterrupt);
            # get_terminal_size() raises OSError when stdout is not a tty.
            terminal_width = 120
        max_id_len = max(len(str(c.get('id', ''))) for c in customers)
        col_width = max_id_len + 2 + 30 + 2  # id + ": " + 30-char name budget + gutter
        num_cols = max(1, terminal_width // col_width)
        print("\n" + "="*terminal_width)
        print("Available Customers:")
        print("="*terminal_width)
        num_customers = len(customers)
        rows = (num_customers + num_cols - 1) // num_cols
        for row in range(rows):
            line_parts = []
            for col in range(num_cols):
                idx = row + col * rows  # column-major layout
                if idx < num_customers:
                    customer = customers[idx]
                    cust_id = customer.get('id', 'N/A')
                    cust_name = customer.get('name', 'N/A')
                    name_width = col_width - max_id_len - 2 - 2
                    if len(str(cust_name)) > name_width:
                        cust_name = str(cust_name)[:name_width-3] + "..."
                    entry = f"{cust_id}: {cust_name}"
                    line_parts.append(entry.ljust(col_width))
            print("".join(line_parts).rstrip())
        print("="*terminal_width)
        print(f"Total: {len(customers)} customer(s)")

    def upload_hashfile(self, file_path, customer_id, hash_type, file_format=5, hashfile_name=None):
        """Upload *file_path* as a new hashfile; returns the decoded JSON response.

        *file_format* uses the FILE_FORMATS codes (default 5 = hash_only);
        *hashfile_name* defaults to the file's basename.
        """
        if hashfile_name is None:
            hashfile_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            file_content = f.read()
        url = (
            f"{self.base_url}/v1/hashfiles/upload/"
            f"{customer_id}/{file_format}/{hash_type}/{hashfile_name}"
        )
        headers = {'Content-Type': 'text/plain'}
        resp = self.session.post(url, data=file_content, headers=headers)
        resp.raise_for_status()
        return resp.json()

    def create_job(self, name, hashfile_id, customer_id, limit_recovered=False, notify_email=True):
        """Create a cracking job; returns the decoded JSON response.

        NOTE(review): limit_recovered and notify_email are accepted for
        interface compatibility but are not sent in the request body —
        confirm whether /v1/jobs/add supports them.
        """
        url = f"{self.base_url}/v1/jobs/add"
        headers = {'Content-Type': 'application/json'}
        data = {
            "name": name,
            "hashfile_id": hashfile_id,
            "customer_id": customer_id,
        }
        resp = self.session.post(url, json=data, headers=headers)
        resp.raise_for_status()
        return resp.json()

    def download_left_hashes(self, customer_id, hashfile_id, output_file=None):
        """Download a hashfile's remaining (uncracked) hashes to *output_file*.

        Returns {'output_file': path, 'size': byte_count}.
        """
        url = f"{self.base_url}/v1/hashfiles/{hashfile_id}"
        resp = self.session.get(url)
        resp.raise_for_status()
        if output_file is None:
            output_file = f"left_{customer_id}_{hashfile_id}.txt"
        with open(output_file, 'wb') as f:
            f.write(resp.content)
        return {'output_file': output_file, 'size': len(resp.content)}

    def upload_cracked_hashes(self, file_path, hash_type='1000'):
        """Import hash:plaintext pairs from *file_path* into Hashview.

        Lines without a colon and lines containing the NTLM empty-password
        hash (31d6cfe0d16ae931b73c59d7e0c089c0) are skipped. Raises on
        API-reported errors or non-JSON responses.
        """
        valid_lines = []
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                line = line.strip()
                if '31d6cfe0d16ae931b73c59d7e0c089c0' in line:
                    continue
                if not line or ':' not in line:
                    continue
                parts = line.split(':', 1)
                if len(parts) != 2:
                    # Fix: was `break`, which silently dropped the rest of the
                    # file on a malformed line. (Unreachable in practice:
                    # split(':', 1) on a line containing ':' always yields 2.)
                    continue
                hash_value = parts[0].strip()
                plaintext = parts[1].strip()
                valid_lines.append(f"{hash_value}:{plaintext}")
        converted_content = '\n'.join(valid_lines)
        url = f"{self.base_url}/v1/hashes/import/{hash_type}"
        headers = {'Content-Type': 'text/plain'}
        resp = self.session.post(url, data=converted_content, headers=headers)
        resp.raise_for_status()
        try:
            json_response = resp.json()
            if 'type' in json_response and json_response['type'] == 'Error':
                raise Exception(f"Hashview API Error: {json_response.get('msg', 'Unknown error')}")
            return json_response
        except (json.JSONDecodeError, ValueError):
            raise Exception(f"Invalid API response: {resp.text[:200]}")

    def create_customer(self, name):
        """Create a customer named *name*; returns the decoded JSON response."""
        url = f"{self.base_url}/v1/customers/add"
        headers = {'Content-Type': 'application/json'}
        data = {"name": name}
        resp = self.session.post(url, json=data, headers=headers)
        resp.raise_for_status()
        return resp.json()

37
hate_crack/hate_crack.py Normal file
View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
import os
import sys
def _resolve_root():
return os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "hate_crack.py"))
def _load_root_module():
    """Load the root-level hate_crack.py script as a module object.

    Raises FileNotFoundError when the script is missing, so the shim fails
    loudly at import time rather than with an opaque attribute error later.
    """
    import importlib.util
    root_path = _resolve_root()
    if not os.path.isfile(root_path):
        raise FileNotFoundError(f"Root hate_crack.py not found at {root_path}")
    spec = importlib.util.spec_from_file_location("hate_crack_root", root_path)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
# Load the canonical implementation once at import time: this module is a
# thin compatibility shim around the root-level script.
_ROOT = _load_root_module()
# Mirror the root module's public names into this namespace. Dunder names are
# skipped except for a small allow-list, and setdefault ensures names already
# defined by this module are never clobbered.
for _name, _value in _ROOT.__dict__.items():
    if _name.startswith("__") and _name not in {"__all__", "__doc__", "__name__", "__package__", "__loader__", "__spec__"}:
        continue
    globals().setdefault(_name, _value)
def cli_main():
    """Dispatch to the root script's entry point (prefer cli_main, else main)."""
    if hasattr(_ROOT, "cli_main"):
        return _ROOT.cli_main()
    if hasattr(_ROOT, "main"):
        return _ROOT.main()
    raise AttributeError("Root hate_crack.py has no cli_main or main")
if __name__ == "__main__":
    # Propagate the entry point's return value as the process exit code.
    sys.exit(cli_main())

338
hate_crack/weakpass.py Normal file
View File

@@ -0,0 +1,338 @@
def check_7z():
    """Verify a 7-Zip binary (7z or 7za) is on PATH.

    Prints install instructions and returns False when neither binary is
    found; returns True otherwise.
    """
    import shutil
    for binary in ('7z', '7za'):
        if shutil.which(binary):
            return True
    print("\n[!] 7z (or 7za) is missing.")
    print("To install on macOS: brew install p7zip")
    print("To install on Ubuntu/Debian: sudo apt-get install p7zip-full")
    print("Please install 7z and try again.")
    return False
def check_transmission_cli():
    """Return True when ``transmission-cli`` is available on PATH.

    When the binary is missing, prints platform-specific installation
    instructions and returns False.
    """
    import shutil

    if shutil.which('transmission-cli'):
        return True
    for line in (
        "\n[!] transmission-cli is missing.",
        "To install on macOS: brew install transmission-cli",
        "To install on Ubuntu/Debian: sudo apt-get install transmission-cli",
        "Please install transmission-cli and try again.",
    ):
        print(line)
    return False
"""
weakpass.py
Modularized Weakpass integration functions.
"""
import os
import threading
from queue import Queue
import requests
from bs4 import BeautifulSoup
import json
import shutil
def get_hcat_wordlists_dir():
    """Return the configured ``hcatWordlists`` directory from config.json.

    Searches for config.json first in the package directory, then in the
    project root (the package's parent). A configured path is user-expanded
    and, when relative, resolved against the project root; the directory is
    created if missing. Falls back to ``<project_root>/wordlists`` when no
    usable configuration is found.
    """
    pkg_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(pkg_dir, os.pardir))
    fallback = os.path.join(project_root, 'wordlists')
    for config_path in (os.path.join(pkg_dir, 'config.json'),
                        os.path.join(project_root, 'config.json')):
        try:
            if not os.path.isfile(config_path):
                continue
            with open(config_path) as handle:
                configured = json.load(handle).get('hcatWordlists')
            if not configured:
                continue
            configured = os.path.expanduser(configured)
            if not os.path.isabs(configured):
                configured = os.path.join(project_root, configured)
            os.makedirs(configured, exist_ok=True)
            return configured
        except Exception:
            # Unreadable or malformed config: fall through to the next candidate.
            continue
    os.makedirs(fallback, exist_ok=True)
    return fallback
def fetch_all_weakpass_wordlists_multithreaded(total_pages=67, threads=10, output_file="weakpass_wordlists.json"):
    """Fetch all Weakpass wordlist pages in parallel using threads and save to a local JSON file.

    Args:
        total_pages: Number of listing pages to scrape (1-based, inclusive).
        threads: Number of worker threads to spawn.
        output_file: Path of the JSON file the de-duplicated results are written to.
    """
    wordlists = []
    lock = threading.Lock()  # guards `wordlists` across worker threads
    q = Queue()
    headers = {"User-Agent": "Mozilla/5.0"}
    def worker():
        # Consume page numbers from the queue until a None sentinel arrives.
        while True:
            page = q.get()
            if page is None:
                break
            try:
                url = f"https://weakpass.com/wordlists?page={page}"
                r = requests.get(url, headers=headers, timeout=30)
                soup = BeautifulSoup(r.text, "html.parser")
                # Weakpass embeds the page payload as JSON in the "data-page"
                # attribute of the #app div.
                app_div = soup.find("div", id="app")
                if not app_div or not app_div.has_attr("data-page"):
                    q.task_done()
                    continue
                data_page_val = app_div["data-page"]
                if not isinstance(data_page_val, str):
                    data_page_val = str(data_page_val)
                data = json.loads(data_page_val)
                wordlists_data = data.get("props", {}).get("wordlists", {})
                # Paginated responses nest the actual list under a 'data' key.
                if isinstance(wordlists_data, dict) and 'data' in wordlists_data:
                    wordlists_data = wordlists_data['data']
                with lock:
                    for wl in wordlists_data:
                        wordlists.append({
                            "name": wl.get("name", ""),
                            "size": wl.get("size", ""),
                            "rank": wl.get("rank", ""),
                            "downloads": wl.get("downloaded", ""),
                            "torrent_url": wl.get("torrent_link", "")
                        })
            except Exception as e:
                print(f"Error fetching page {page}: {e}")
            # Exactly one task_done per queued page (the early-continue branch
            # above calls it before skipping to the next page).
            q.task_done()
    for page in range(1, total_pages + 1):
        q.put(page)
    threads_list = []
    for _ in range(threads):
        t = threading.Thread(target=worker)
        t.start()
        threads_list.append(t)
    # Wait for every page task to be processed, then shut the workers down
    # with one None sentinel per thread.
    q.join()
    for _ in range(threads):
        q.put(None)
    for t in threads_list:
        t.join()
    # De-duplicate by wordlist name, keeping the first occurrence.
    seen = set()
    unique_wordlists = []
    for wl in wordlists:
        if wl['name'] not in seen:
            unique_wordlists.append(wl)
            seen.add(wl['name'])
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(unique_wordlists, f, indent=2)
    print(f"Saved {len(unique_wordlists)} wordlists to {output_file}")
def download_torrent_file(torrent_url, save_dir=None):
    """Download a Weakpass .torrent file and start fetching its payload in the background.

    Resolves the wordlist's page on weakpass.com, extracts the torrent link
    from the embedded page JSON, saves the .torrent file into ``save_dir``,
    and (when transmission-cli is available) launches a background thread that
    downloads the torrent and extracts any resulting .7z archives.

    Args:
        torrent_url: Either a full torrent URL or a bare torrent filename.
        save_dir: Target directory; relative paths are resolved against this
            package's directory. Defaults to get_hcat_wordlists_dir().

    Returns:
        Path to the saved .torrent file, or None on any failure.
    """
    # Use configured hcat wordlists directory by default
    if not save_dir:
        save_dir = get_hcat_wordlists_dir()
    else:
        save_dir = os.path.expanduser(save_dir)
        if not os.path.isabs(save_dir):
            save_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), save_dir)
    os.makedirs(save_dir, exist_ok=True)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    }
    # Accept either a bare filename or a full URL.
    if not torrent_url.startswith("http"):
        filename = torrent_url
    else:
        filename = torrent_url.split("/")[-1]
    # Derive the wordlist slug so its page can be looked up on weakpass.com.
    wordlist_base = filename.replace('.torrent', '').replace('.7z', '').replace('.txt', '')
    wordlist_uri = f"https://weakpass.com/wordlists/{wordlist_base}"
    print(f"[+] Fetching wordlist page: {wordlist_uri}")
    r = requests.get(wordlist_uri, headers=headers)
    if r.status_code != 200:
        print(f"[!] Failed to fetch wordlist page: {wordlist_uri}")
        return None
    soup = BeautifulSoup(r.text, "html.parser")
    # The page payload is embedded as JSON in the #app div's data-page attribute.
    app_div = soup.find("div", id="app")
    if not app_div or not app_div.has_attr("data-page"):
        print(f"[!] Could not find app data on {wordlist_uri}")
        return None
    data_page_val = app_div["data-page"]
    if not isinstance(data_page_val, str):
        data_page_val = str(data_page_val)
    # Un-escape HTML-encoded quotes before parsing the embedded JSON.
    data_page_val = data_page_val.replace('&quot;', '"')
    try:
        data = json.loads(data_page_val)
        wordlist = data.get('props', {}).get('wordlist')
        wordlist_id = None
        torrent_link_from_data = None
        if wordlist:
            wordlist_id = wordlist.get('id')
            torrent_link_from_data = wordlist.get('torrent_link')
        else:
            # Fall back to scanning the listing payload for a matching entry,
            # first by exact link/name, then by slug substring.
            wordlists = data.get('props', {}).get('wordlists')
            if isinstance(wordlists, dict) and 'data' in wordlists:
                wordlists = wordlists['data']
            if isinstance(wordlists, list):
                for wl in wordlists:
                    if wl.get('torrent_link') == filename or wl.get('name') == filename:
                        wordlist_id = wl.get('id')
                        torrent_link_from_data = wl.get('torrent_link')
                        break
                    if wordlist_base in wl.get('name', ''):
                        wordlist_id = wl.get('id')
                        torrent_link_from_data = wl.get('torrent_link')
                        break
    except Exception as e:
        print(f"[!] Failed to parse data-page JSON: {e}")
        return None
    if not (torrent_link_from_data and wordlist_id):
        print(f"[!] No torrent link or id found in wordlist data for (unknown).")
        return None
    # Relative torrent links are served from the /download endpoint.
    if not torrent_link_from_data.startswith('http'):
        torrent_link = f"https://weakpass.com/download/{wordlist_id}/{torrent_link_from_data}"
    else:
        torrent_link = torrent_link_from_data
    print(f"[+] Downloading .torrent file from: {torrent_link}")
    r2 = requests.get(torrent_link, headers=headers, stream=True)
    content_type = r2.headers.get("Content-Type", "")
    local_filename = os.path.join(save_dir, filename if filename.endswith('.torrent') else filename + '.torrent')
    # An HTML content type means we received an error page instead of torrent data.
    if r2.status_code == 200 and not content_type.startswith("text/html"):
        with open(local_filename, 'wb') as f:
            for chunk in r2.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Saved to {local_filename}")
    else:
        print(f"Failed to download a valid torrent file: {torrent_link}")
        try:
            html = r2.content.decode(errors="replace")
            print("--- Begin HTML Debug Output ---")
            print(html[:2000])
            print("--- End HTML Debug Output ---")
        except Exception as e:
            print(f"Could not decode response for debug: {e}")
        return None
    # Without transmission-cli we can only keep the .torrent file itself.
    if shutil.which("transmission-cli") is None:
        print("[ERROR] transmission-cli is not installed or not in your PATH.")
        print("Please install it with: brew install transmission-cli (on macOS) or your package manager.")
        print(f"Torrent file saved at {local_filename}, but download will not start until transmission-cli is available.")
        return local_filename
    def run_transmission(torrent_file, output_dir):
        # Runs in a background thread: streams transmission-cli output, then
        # extracts any .7z archives the download produced.
        import subprocess
        import glob
        print(f"Starting transmission-cli for {torrent_file}...")
        try:
            proc = subprocess.Popen([
                "transmission-cli",
                "-w", output_dir,
                torrent_file
            ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True)
            if proc.stdout is not None:
                for line in proc.stdout:
                    print(line, end='')
            proc.wait()
            if proc.returncode != 0:
                print(f"transmission-cli failed for {torrent_file} (exit {proc.returncode})")
                return
            else:
                print(f"Download complete for {torrent_file}")
            sevenz_files = glob.glob(os.path.join(output_dir, '*.7z'))
            if not sevenz_files:
                print("[i] No .7z files found to extract.")
                return
            for zfile in sevenz_files:
                print(f"[+] Extracting {zfile} ...")
                sevenz_bin = shutil.which('7z') or shutil.which('7za')
                if not sevenz_bin:
                    print("[!] 7z or 7za not found in PATH. Please install p7zip-full or 7-zip to extract archives.")
                    continue
                try:
                    extract_result = subprocess.run([
                        sevenz_bin, 'x', '-y', zfile, f'-o{output_dir}'
                    ], capture_output=True, text=True)
                    print(extract_result.stdout)
                    if extract_result.returncode == 0:
                        print(f"[+] Extraction complete: {zfile}")
                    else:
                        print(f"[!] Extraction failed for {zfile}: {extract_result.stderr}")
                except Exception as e:
                    print(f"[!] Error extracting {zfile}: {e}")
        except Exception as e:
            print(f"Error running transmission-cli: {e}")
    # Fire-and-forget: the download/extract runs while the menu stays responsive.
    t = threading.Thread(target=run_transmission, args=(local_filename, save_dir))
    t.start()
    print(f"transmission-cli launched in background for {local_filename}")
    return local_filename
def weakpass_wordlist_menu():
    """Interactive menu: refresh the Weakpass cache and pick a wordlist to download.

    Refreshes weakpass_wordlists.json, then pages through rank-7 wordlists in
    batches of 100 and downloads the torrent for the selected entry.
    """
    fetch_all_weakpass_wordlists_multithreaded()
    try:
        with open("weakpass_wordlists.json", "r", encoding="utf-8") as f:
            all_wordlists = json.load(f)
    except Exception as e:
        print(f"Failed to load local wordlist cache: {e}")
        return
    page = 0
    batch_size = 100
    while True:
        # Only show top-ranked (rank == 7) wordlists.
        filtered_wordlists = [wl for wl in all_wordlists if str(wl.get('rank', '')) == '7']
        start_idx = page * batch_size
        end_idx = start_idx + batch_size
        page_wordlists = filtered_wordlists[start_idx:end_idx]
        if not page_wordlists:
            print("No more wordlists.")
            if page > 0:
                page -= 1
            # NOTE(review): if the filtered list is empty while page == 0 this
            # loops forever printing "No more wordlists." — confirm and
            # consider returning in that case.
            continue
        # Render the batch in 3 columns, filling column-major.
        col_width = 45
        cols = 3
        print("\nEach entry shows: [number]. [wordlist name] [effectiveness score] [rank]")
        print(f"Available Wordlists (Batch {page+1}):")
        rows = (len(page_wordlists) + cols - 1) // cols
        lines = [''] * rows
        for idx, wl in enumerate(page_wordlists):
            col = idx // rows
            row = idx % rows
            # Prefer 'effectiveness' but fall back to 'downloads' (older cache entries).
            effectiveness = wl.get('effectiveness', wl.get('downloads', ''))
            rank = wl.get('rank', '')
            entry = f"{start_idx+idx+1:3d}. {wl['name'][:25]:<25} {effectiveness:<8} {rank:<2}"
            lines[row] += entry.ljust(col_width)
        for line in lines:
            print(line)
        sel = input("\nEnter the number to download, 'n' for next batch, 'p' for previous, or 'q' to cancel: ")
        if sel.lower() == 'q':
            print("Returning to menu...")
            return
        if sel.lower() == 'n':
            page += 1
            continue
        if sel.lower() == 'p' and page > 0:
            page -= 1
            continue
        try:
            # Displayed numbers are global (offset by start_idx); map back to
            # the index within the current batch.
            sel_idx = int(sel) - 1 - start_idx
            if 0 <= sel_idx < len(page_wordlists):
                torrent_url = page_wordlists[sel_idx]['torrent_url']
                download_torrent_file(torrent_url)
            else:
                print("Invalid selection.")
        except Exception as e:
            print(f"Error: {e}")

View File

@@ -6,4 +6,7 @@ readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"pytest>=8.3.4",
"requests>=2.31.0",
"beautifulsoup4>=4.12.0",
"ruff>=0.9.4",
]

View File

@@ -20,18 +20,47 @@ make install
```git clone https://github.com/trustedsec/hate_crack.git```
* Customize binary and wordlist paths in "config.json"
* Make sure that at least "rockyou.txt" is within your "wordlists" path
### Create Optimized Wordlists
wordlist_optimizer.py - parses all wordlists from `<input file list>`, sorts them by length and de-duplicates into `<output directory>`
```$ python wordlist_optimizer.py
usage: python wordlist_optimizer.py <input file list> <output directory>
$ python wordlist_optimizer.py wordlists.txt ../optimized_wordlists
```
-------------------------------------------------------------------
## Project Structure
Core logic is now split into modules under `hate_crack/`:
- `hate_crack/cli.py`: argparse helpers and config overrides.
- `hate_crack/api.py`: Hashview + Weakpass + Hashmob flows.
- `hate_crack/attacks.py`: menu attack handlers.
- `hate_crack/hashview.py`: Hashview API client.
- `hate_crack/hashmob_wordlist.py`: Hashmob wordlist utilities.
- `hate_crack/weakpass.py`: Weakpass wordlist utilities.
The top-level `hate_crack.py` remains the main entry point and orchestrates these modules.
-------------------------------------------------------------------
## Usage
`$ ./hate_crack.py
usage: python hate_crack.py <hash_file> <hash_type>`
`$ ./hate_crack.py`
```
usage: python hate_crack.py <hash_file> <hash_type> [options]
```
Common options:
- `--task <N>`: Run a specific menu task number (e.g., 1, 93).
- `--download-hashview`: Download hashes from Hashview before cracking.
- `--weakpass`: Download wordlists from Weakpass.
- `--hashmob`: Download wordlists from Hashmob.net.
- `--download-torrent <FILENAME>`: Download a specific Weakpass torrent file.
- `--download-all-torrents`: Download all available Weakpass torrents from cache.
- `--hashview-url <URL>` / `--hashview-api-key <KEY>`: Override Hashview settings.
- `--hcat-path <PATH>` / `--hcat-bin <BIN>`: Override hashcat path/binary.
- `--wordlists-dir <PATH>` / `--optimized-wordlists-dir <PATH>`: Override wordlist directories.
- `--pipal-path <PATH>`: Override pipal path.
- `--maxruntime <SECONDS>`: Override max runtime.
- `--bandrel-basewords <PATH>`: Override bandrel basewords file.
- `--debug`: Enable debug logging (writes `hate_crack.log` in repo root).
The <hash_type> is attained by running `hashcat --help`
@@ -72,12 +101,12 @@ pip install pytest pytest-mock requests
pytest -v
# Run specific test
pytest test_hashview.py -v
pytest tests/test_hashview.py -v
```
### Test Structure
- **test_hashview.py**: Comprehensive test suite for HashviewAPI class with mocked API responses, including:
- **tests/test_hashview.py**: Comprehensive test suite for HashviewAPI class with mocked API responses, including:
- Customer listing and data validation
- Authentication and authorization tests
- Hashfile upload functionality

View File

@@ -1,284 +0,0 @@
"""
Tests for Hashview integration - Mocked API calls for CI/CD
"""
import pytest
import sys
import os
import json
import tempfile
from unittest.mock import Mock, patch, MagicMock
# Add the parent directory to the path to import hate_crack
sys.path.insert(0, os.path.dirname(__file__))
from hate_crack import HashviewAPI
# Test configuration - these are mock values, not real credentials
HASHVIEW_URL = 'https://hashview.example.com'
HASHVIEW_API_KEY = 'test-api-key-123'
class TestHashviewAPI:
    """Test suite for HashviewAPI class with mocked API calls.

    No network access occurs: each test replaces ``api.session`` with a
    MagicMock and stubs the JSON payloads the Hashview API would return.
    """
    @pytest.fixture
    def api(self):
        """Create a HashviewAPI instance with mocked session"""
        with patch('hate_crack.requests.Session') as mock_session_class:
            api = HashviewAPI(
                base_url=HASHVIEW_URL,
                api_key=HASHVIEW_API_KEY
            )
            # Replace the session with a mock
            api.session = MagicMock()
            yield api
    @pytest.fixture
    def test_hashfile(self):
        """Create a temporary test hashfile with NTLM hashes"""
        test_hashes = [
            "8846f7eaee8fb117ad06bdd830b7586c", # password (NTLM)
            "e19ccf75ee54e06b06a5907af13cef42", # 123456 (NTLM)
            "5835048ce94ad0564e29a924a03510ef", # 12345678 (NTLM)
        ]
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            hashfile_path = f.name
            for hash_val in test_hashes:
                f.write(hash_val + '\n')
        yield hashfile_path
        # Cleanup
        if os.path.exists(hashfile_path):
            os.unlink(hashfile_path)
    def test_list_customers_success(self, api):
        """Test successful customer listing with mocked API call"""
        # Mock the response - API returns 'users' as a JSON string
        mock_response = Mock()
        mock_response.json.return_value = {
            'users': json.dumps([
                {'id': 1, 'name': 'Test Customer 1', 'description': 'Test description 1'},
                {'id': 2, 'name': 'Test Customer 2', 'description': 'Test description 2'}
            ])
        }
        mock_response.raise_for_status = Mock()
        api.session.get.return_value = mock_response
        # Make API call
        result = api.list_customers()
        # Assertions
        assert result is not None
        assert 'customers' in result
        assert isinstance(result['customers'], list)
        assert len(result['customers']) == 2
        # Print results for visibility
        print(f"\nFound {len(result['customers'])} customers:")
        for customer in result['customers']:
            print(f" ID: {customer.get('id')}, Name: {customer.get('name')}, Description: {customer.get('description', 'N/A')}")
    def test_list_customers_returns_valid_data(self, api):
        """Test that customer data has expected structure"""
        # Mock the response - API returns 'users' as a JSON string
        mock_response = Mock()
        mock_response.json.return_value = {
            'users': json.dumps([
                {'id': 1, 'name': 'Test Customer', 'description': 'Test'}
            ])
        }
        mock_response.raise_for_status = Mock()
        api.session.get.return_value = mock_response
        result = api.list_customers()
        assert 'customers' in result
        # If there are customers, validate structure
        if result['customers']:
            for customer in result['customers']:
                assert 'id' in customer
                assert 'name' in customer
                # Description is optional
    def test_connection_and_auth(self, api):
        """Test that we can connect and authenticate"""
        # Mock successful response - API returns 'users' as a JSON string
        mock_response = Mock()
        mock_response.json.return_value = {
            'users': json.dumps([
                {'id': 1, 'name': 'Test Customer'}
            ])
        }
        mock_response.raise_for_status = Mock()
        api.session.get.return_value = mock_response
        result = api.list_customers()
        assert result is not None
        # Valid response should have 'customers' key
        assert 'customers' in result, "Valid authentication should return customers data"
        print(f"\n✓ Successfully connected to {HASHVIEW_URL}")
        print(f"✓ Authentication successful")
    def test_invalid_api_key_fails(self):
        """Test that an invalid API key results in authentication failure"""
        with patch('hate_crack.requests.Session') as mock_session_class:
            # Create API instance with invalid API key
            invalid_api = HashviewAPI(
                base_url=HASHVIEW_URL,
                api_key="invalid-api-key-123-this-should-fail"
            )
            # Mock error response
            mock_session = MagicMock()
            mock_response = Mock()
            mock_response.json.return_value = {
                'type': 'Error',
                'msg': 'You are not authorized to perform this action',
                'status': 401
            }
            mock_response.raise_for_status = Mock()
            mock_session.get.return_value = mock_response
            invalid_api.session = mock_session
            # Attempt to list customers with invalid key
            result = invalid_api.list_customers()
            # API returns 200 but with error message in response body
            assert result is not None
            assert 'type' in result
            assert result['type'] == 'Error'
            assert 'msg' in result
            assert 'not authorized' in result['msg'].lower()
            print(f"\n✓ Invalid API key correctly rejected")
            print(f" Error message: {result['msg']}")
    def test_upload_hashfile(self, api, test_hashfile):
        """Test uploading a hashfile to Hashview"""
        print("\n[Test] Uploading hashfile...")
        # Mock list_customers response - API returns 'users' as a JSON string
        mock_customers_response = Mock()
        mock_customers_response.json.return_value = {
            'users': json.dumps([{'id': 1, 'name': 'Test Customer'}])
        }
        mock_customers_response.raise_for_status = Mock()
        # Mock upload_hashfile response
        mock_upload_response = Mock()
        mock_upload_response.json.return_value = {
            'hashfile_id': 4567,
            'msg': 'Hashfile added'
        }
        mock_upload_response.raise_for_status = Mock()
        # Set up session mock to return different responses
        api.session.get.return_value = mock_customers_response
        api.session.post.return_value = mock_upload_response
        # Get first customer
        customers_result = api.list_customers()
        customer_id = customers_result['customers'][0]['id']
        # Upload hashfile
        hash_type = 1000 # NTLM
        file_format = 5 # hash_only
        hashfile_name = "test_hashfile_automated"
        upload_result = api.upload_hashfile(
            test_hashfile,
            customer_id,
            hash_type,
            file_format,
            hashfile_name
        )
        assert upload_result is not None, "No upload result returned"
        assert 'hashfile_id' in upload_result, "No hashfile_id returned"
        print(f" ✓ Hashfile uploaded successfully")
        print(f" ✓ Hashfile ID: {upload_result['hashfile_id']}")
    def test_create_job_workflow(self, api, test_hashfile):
        """Test creating a job in Hashview (option 2 complete workflow)"""
        print("\n" + "="*60)
        print("Testing Option 2: Create Job Workflow")
        print("="*60)
        # Mock responses for different endpoints - API returns 'users' as a JSON string
        mock_customers_response = Mock()
        mock_customers_response.json.return_value = {
            'users': json.dumps([{'id': 1, 'name': 'Test Customer'}])
        }
        mock_customers_response.raise_for_status = Mock()
        mock_upload_response = Mock()
        mock_upload_response.json.return_value = {
            'hashfile_id': 4567,
            'msg': 'Hashfile added'
        }
        mock_upload_response.raise_for_status = Mock()
        mock_job_response = Mock()
        mock_job_response.json.return_value = {
            'job_id': 789,
            'msg': 'Job added'
        }
        mock_job_response.raise_for_status = Mock()
        # Configure session mock
        api.session.get.return_value = mock_customers_response
        # post is called twice: first for the upload, then for job creation.
        api.session.post.side_effect = [mock_upload_response, mock_job_response]
        # Step 1: Get test customer
        print("\n[Step 1] Getting test customer...")
        customers_result = api.list_customers()
        test_customer = customers_result['customers'][0]
        customer_id = test_customer['id']
        print(f" ✓ Using customer ID: {customer_id} ({test_customer['name']})")
        # Step 2: Upload hashfile
        print("\n[Step 2] Uploading hashfile...")
        hash_type = 1000 # NTLM
        file_format = 5 # hash_only
        hashfile_name = "test_hashfile_automated"
        upload_result = api.upload_hashfile(
            test_hashfile,
            customer_id,
            hash_type,
            file_format,
            hashfile_name
        )
        hashfile_id = upload_result['hashfile_id']
        print(f" ✓ Hashfile ID: {hashfile_id}")
        # Step 3: Create job
        print("\n[Step 3] Creating job...")
        job_name = "test_job_automated"
        job_result = api.create_job(
            name=job_name,
            hashfile_id=hashfile_id,
            customer_id=customer_id
        )
        assert job_result is not None, "No job result returned"
        print(f" ✓ Job created successfully")
        if 'job_id' in job_result:
            print(f" ✓ Job ID: {job_result['job_id']}")
        print("\n" + "="*60)
        print("✓ Option 2 (Create Job) is READY and WORKING!")
        print("="*60)
# Allow running this test module directly (outside of a pytest invocation).
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# tests package init

22
tests/conftest.py Normal file
View File

@@ -0,0 +1,22 @@
import importlib.util
import sys
from pathlib import Path
import pytest
def load_hate_crack_module(monkeypatch):
    """Import the repo-root hate_crack.py fresh, with heavy init skipped.

    Sets HATE_CRACK_SKIP_INIT=1 (via monkeypatch, so it is undone after the
    test) and evicts any previously loaded copy before executing the script.

    Returns:
        module: A freshly executed instance of the root script.
    """
    monkeypatch.setenv("HATE_CRACK_SKIP_INIT", "1")
    name = "hate_crack_script"
    # Drop any cached copy so each test gets a clean module.
    sys.modules.pop(name, None)
    script = Path(__file__).resolve().parents[1] / "hate_crack.py"
    spec = importlib.util.spec_from_file_location(name, script)
    fresh = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(fresh)
    return fresh
@pytest.fixture
def hc_module(monkeypatch):
    """Pytest fixture yielding a freshly loaded root hate_crack module."""
    return load_hate_crack_module(monkeypatch)

20
tests/run_checks.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
import subprocess
import sys
def run(cmd, label):
    """Announce *cmd* under *label*, execute it, and return its exit code."""
    banner = f"\n==> {label}: {' '.join(cmd)}"
    print(banner)
    return subprocess.call(cmd)
def main():
    """Run lint, then the test suite; return the first nonzero exit code."""
    # Lint first so we fail fast on style issues.
    lint_rc = run([sys.executable, "-m", "ruff", "check", "."], "lint")
    if lint_rc != 0:
        return lint_rc
    return run([sys.executable, "-m", "pytest"], "pytest")
# Propagate the combined lint+test exit status to the shell.
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,18 @@
import shutil
import warnings
import pytest
def _require_executable(name):
    """Fail the current test (with a RuntimeWarning) when *name* is not on PATH."""
    if shutil.which(name) is not None:
        return
    warnings.warn(f"Missing required dependency: {name}", RuntimeWarning)
    pytest.fail(f"Required dependency not installed: {name}")
def test_dependency_7z_installed():
    # Environment check: the crack workflow shells out to 7z for archive extraction.
    _require_executable("7z")
def test_dependency_transmission_cli_installed():
    # Environment check: torrent downloads require transmission-cli on PATH.
    _require_executable("transmission-cli")

View File

@@ -0,0 +1,31 @@
def test_hashmob_connectivity_mocked(monkeypatch, capsys):
    """Hashmob resource listing is fetched, filtered to wordlists, and printed."""
    from hate_crack import hashmob_wordlist as hm
    class FakeResp:
        # Minimal stand-in for a requests.Response object.
        def raise_for_status(self):
            return None
        def json(self):
            return [
                {"type": "wordlist", "name": "List A", "information": "Info A"},
                {"type": "wordlist", "name": "List B", "information": "Info B"},
                {"type": "other", "name": "Ignore", "information": "Nope"},
            ]
    def fake_get(url, headers=None, timeout=None):
        # Verify the module hits the expected endpoint with the API-key header.
        assert url == "https://hashmob.net/api/v2/resource"
        assert headers == {"api-key": "test-key"}
        return FakeResp()
    monkeypatch.setattr(hm, "get_api_key", lambda: "test-key")
    monkeypatch.setattr(hm.requests, "get", fake_get)
    result = hm.download_hashmob_wordlist_list()
    # Non-wordlist resources must be filtered out.
    assert len(result) == 2
    assert result[0]["name"] == "List A"
    assert result[1]["name"] == "List B"
    captured = capsys.readouterr()
    assert "Available Hashmob Wordlists:" in captured.out
    assert "List A" in captured.out
    assert "List B" in captured.out

358
tests/test_hashview.py Normal file
View File

@@ -0,0 +1,358 @@
"""
Tests for Hashview integration - Mocked API calls for CI/CD
"""
import pytest
import sys
import os
import json
import tempfile
from unittest.mock import Mock, patch, MagicMock
# Add the parent directory to the path to import hate_crack
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from hate_crack.hashview import HashviewAPI
# Test configuration - these are mock values, not real credentials
HASHVIEW_URL = 'https://hashview.example.com'
HASHVIEW_API_KEY = 'test-api-key-123'
class TestHashviewAPI:
"""Test suite for HashviewAPI class with mocked API calls"""
@pytest.fixture
def api(self):
"""Create a HashviewAPI instance with mocked session"""
with patch('requests.Session') as mock_session_class:
api = HashviewAPI(
base_url=HASHVIEW_URL,
api_key=HASHVIEW_API_KEY
)
# Replace the session with a mock
api.session = MagicMock()
yield api
@pytest.fixture
def test_hashfile(self):
"""Create a temporary test hashfile with NTLM hashes"""
test_hashes = [
"8846f7eaee8fb117ad06bdd830b7586c", # password (NTLM)
"e19ccf75ee54e06b06a5907af13cef42", # 123456 (NTLM)
"5835048ce94ad0564e29a924a03510ef", # 12345678 (NTLM)
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
hashfile_path = f.name
for hash_val in test_hashes:
f.write(hash_val + '\n')
yield hashfile_path
# Cleanup
def test_list_hashfiles_success(self, api):
"""Test successful hashfile listing with mocked API call"""
mock_response = Mock()
mock_response.json.return_value = {
'hashfiles': json.dumps([
{'id': 1, 'customer_id': 1, 'name': 'hashfile1.txt'},
{'id': 2, 'customer_id': 2, 'name': 'hashfile2.txt'}
])
}
mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
result = api.list_hashfiles()
assert isinstance(result, list)
assert len(result) == 2
assert result[0]['name'] == 'hashfile1.txt'
def test_list_hashfiles_empty(self, api):
"""Test hashfile listing returns empty list if no hashfiles"""
mock_response = Mock()
mock_response.json.return_value = {}
mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
result = api.list_hashfiles()
assert result == []
def test_get_customer_hashfiles(self, api):
"""Test filtering hashfiles by customer_id"""
api.list_hashfiles = Mock(return_value=[
{'id': 1, 'customer_id': 1, 'name': 'hashfile1.txt'},
{'id': 2, 'customer_id': 2, 'name': 'hashfile2.txt'},
{'id': 3, 'customer_id': 1, 'name': 'hashfile3.txt'}
])
result = api.get_customer_hashfiles(1)
assert len(result) == 2
assert all(hf['customer_id'] == 1 for hf in result)
def test_display_customers_multicolumn_empty(self, api, capsys):
"""Test display_customers_multicolumn with no customers"""
api.display_customers_multicolumn([])
captured = capsys.readouterr()
assert "No customers found" in captured.out
def test_upload_cracked_hashes_success(self, api, tmp_path):
"""Test uploading cracked hashes with valid lines"""
cracked_file = tmp_path / "cracked.txt"
cracked_file.write_text("8846f7eaee8fb117ad06bdd830b7586c:password\n"
"e19ccf75ee54e06b06a5907af13cef42:123456\n"
"31d6cfe0d16ae931b73c59d7e0c089c0:should_skip\n"
"invalidline\n")
mock_response = Mock()
mock_response.json.return_value = {'imported': 2}
mock_response.raise_for_status = Mock()
api.session.post.return_value = mock_response
result = api.upload_cracked_hashes(str(cracked_file), hash_type='1000')
assert 'imported' in result
assert result['imported'] == 2
def test_upload_cracked_hashes_api_error(self, api, tmp_path):
"""Test uploading cracked hashes with API error response"""
cracked_file = tmp_path / "cracked.txt"
cracked_file.write_text("8846f7eaee8fb117ad06bdd830b7586c:password\n")
mock_response = Mock()
mock_response.json.return_value = {'type': 'Error', 'msg': 'Some error'}
mock_response.raise_for_status = Mock()
api.session.post.return_value = mock_response
with pytest.raises(Exception) as excinfo:
api.upload_cracked_hashes(str(cracked_file), hash_type='1000')
assert "Hashview API Error" in str(excinfo.value)
def test_upload_cracked_hashes_invalid_json(self, api, tmp_path):
"""Test uploading cracked hashes with invalid JSON response"""
cracked_file = tmp_path / "cracked.txt"
cracked_file.write_text("8846f7eaee8fb117ad06bdd830b7586c:password\n")
mock_response = Mock()
mock_response.json.side_effect = json.JSONDecodeError("Expecting value", "", 0)
mock_response.text = "not a json"
mock_response.raise_for_status = Mock()
api.session.post.return_value = mock_response
with pytest.raises(Exception) as excinfo:
api.upload_cracked_hashes(str(cracked_file), hash_type='1000')
assert "Invalid API response" in str(excinfo.value)
def test_create_customer_success(self, api):
"""Test creating a customer"""
mock_response = Mock()
mock_response.json.return_value = {'id': 10, 'name': 'New Customer'}
mock_response.raise_for_status = Mock()
api.session.post.return_value = mock_response
result = api.create_customer("New Customer")
assert result['id'] == 10
assert result['name'] == "New Customer"
def test_download_left_hashes(self, api, tmp_path):
"""Test downloading left hashes writes file"""
mock_response = Mock()
mock_response.content = b"hash1\nhash2\n"
mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
output_file = tmp_path / "left_1_2.txt"
result = api.download_left_hashes(1, 2, output_file=str(output_file))
assert os.path.exists(result['output_file'])
with open(result['output_file'], 'rb') as f:
content = f.read()
assert content == b"hash1\nhash2\n"
assert result['size'] == len(content)
def test_create_job_workflow(self, api, test_hashfile):
"""Test creating a job in Hashview (option 2 complete workflow)"""
print("\n" + "="*60)
print("Testing Option 2: Create Job Workflow")
print("="*60)
# Mock responses for different endpoints - API returns 'users' as a JSON string
mock_customers_response = Mock()
mock_customers_response.json.return_value = {
'users': json.dumps([{'id': 1, 'name': 'Test Customer'}])
}
mock_customers_response.raise_for_status = Mock()
mock_upload_response = Mock()
mock_upload_response.json.return_value = {
'hashfile_id': 4567,
'msg': 'Hashfile added'
}
mock_upload_response.raise_for_status = Mock()
mock_job_response = Mock()
mock_job_response.json.return_value = {
'job_id': 789,
'msg': 'Job added'
}
mock_job_response.raise_for_status = Mock()
# Configure session mock
api.session.get.return_value = mock_customers_response
api.session.post.side_effect = [mock_upload_response, mock_job_response]
# Step 1: Get test customer
print("\n[Step 1] Getting test customer...")
customers_result = api.list_customers()
test_customer = customers_result['customers'][0]
customer_id = test_customer['id']
print(f" ✓ Using customer ID: {customer_id} ({test_customer['name']})")
# Step 2: Upload hashfile
print("\n[Step 2] Uploading hashfile...")
hash_type = 1000 # NTLM
file_format = 5 # hash_only
hashfile_name = "test_hashfile_automated"
upload_result = api.upload_hashfile(
test_hashfile,
customer_id,
hash_type,
file_format,
hashfile_name
)
hashfile_id = upload_result['hashfile_id']
print(f" ✓ Hashfile ID: {hashfile_id}")
# Step 3: Create job
print("\n[Step 3] Creating job...")
job_name = "test_job_automated"
job_result = api.create_job(
name=job_name,
hashfile_id=hashfile_id,
customer_id=customer_id
)
assert job_result is not None, "No job result returned"
print(" ✓ Job created successfully")
if 'job_id' in job_result:
print(f" ✓ Job ID: {job_result['job_id']}")
print("\n" + "="*60)
print("✓ Option 2 (Create Job) is READY and WORKING!")
print("="*60)
def test_list_hashfiles_success(self, api):
"""Test successful hashfile listing with mocked API call"""
mock_response = Mock()
mock_response.json.return_value = {
'hashfiles': json.dumps([
{'id': 1, 'customer_id': 1, 'name': 'hashfile1.txt'},
{'id': 2, 'customer_id': 2, 'name': 'hashfile2.txt'}
])
}
mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
result = api.list_hashfiles()
assert isinstance(result, list)
assert len(result) == 2
assert result[0]['name'] == 'hashfile1.txt'
def test_list_hashfiles_empty(self, api):
"""Test hashfile listing returns empty list if no hashfiles"""
mock_response = Mock()
mock_response.json.return_value = {}
mock_response.raise_for_status = Mock()
api.session.get.return_value = mock_response
result = api.list_hashfiles()
assert result == []
def test_get_customer_hashfiles(self, api):
    """Only hashfiles belonging to the requested customer id are returned."""
    inventory = [
        {'id': 1, 'customer_id': 1, 'name': 'hashfile1.txt'},
        {'id': 2, 'customer_id': 2, 'name': 'hashfile2.txt'},
        {'id': 3, 'customer_id': 1, 'name': 'hashfile3.txt'},
    ]
    api.list_hashfiles = Mock(return_value=inventory)

    filtered = api.get_customer_hashfiles(1)

    assert len(filtered) == 2
    assert all(entry['customer_id'] == 1 for entry in filtered)
def test_display_customers_multicolumn_empty(self, api, capsys):
    """An empty customer list prints a 'No customers found' notice."""
    api.display_customers_multicolumn([])

    printed = capsys.readouterr()
    assert "No customers found" in printed.out
def test_upload_cracked_hashes_success(self, api, tmp_path):
    """Uploading a cracked file returns the API's imported count."""
    cracked_file = tmp_path / "cracked.txt"
    cracked_file.write_text(
        "8846f7eaee8fb117ad06bdd830b7586c:password\n"
        "e19ccf75ee54e06b06a5907af13cef42:123456\n"
        "31d6cfe0d16ae931b73c59d7e0c089c0:should_skip\n"
        "invalidline\n"
    )
    upload_response = Mock()
    upload_response.json.return_value = {'imported': 2}
    upload_response.raise_for_status = Mock()
    api.session.post.return_value = upload_response

    result = api.upload_cracked_hashes(str(cracked_file), hash_type='1000')

    assert 'imported' in result
    assert result['imported'] == 2
def test_upload_cracked_hashes_api_error(self, api, tmp_path):
    """An error payload from the API is surfaced as a raised exception."""
    cracked_file = tmp_path / "cracked.txt"
    cracked_file.write_text("8846f7eaee8fb117ad06bdd830b7586c:password\n")

    error_response = Mock()
    error_response.json.return_value = {'type': 'Error', 'msg': 'Some error'}
    error_response.raise_for_status = Mock()
    api.session.post.return_value = error_response

    with pytest.raises(Exception) as excinfo:
        api.upload_cracked_hashes(str(cracked_file), hash_type='1000')
    assert "Hashview API Error" in str(excinfo.value)
def test_upload_cracked_hashes_invalid_json(self, api, tmp_path):
    """A non-JSON API response raises an 'Invalid API response' error."""
    cracked_file = tmp_path / "cracked.txt"
    cracked_file.write_text("8846f7eaee8fb117ad06bdd830b7586c:password\n")

    bad_response = Mock()
    bad_response.json.side_effect = json.JSONDecodeError("Expecting value", "", 0)
    bad_response.text = "not a json"
    bad_response.raise_for_status = Mock()
    api.session.post.return_value = bad_response

    with pytest.raises(Exception) as excinfo:
        api.upload_cracked_hashes(str(cracked_file), hash_type='1000')
    assert "Invalid API response" in str(excinfo.value)
def test_create_customer_success(self, api):
    """Creating a customer returns the new record from the API."""
    created = {'id': 10, 'name': 'New Customer'}
    post_response = Mock()
    post_response.json.return_value = created
    post_response.raise_for_status = Mock()
    api.session.post.return_value = post_response

    result = api.create_customer("New Customer")

    assert result['id'] == 10
    assert result['name'] == "New Customer"
def test_download_left_hashes(self, api, tmp_path):
    """Downloaded left-hash bytes are written to disk and sized correctly."""
    payload = b"hash1\nhash2\n"
    download_response = Mock()
    download_response.content = payload
    download_response.raise_for_status = Mock()
    api.session.get.return_value = download_response

    target = tmp_path / "left_1_2.txt"
    result = api.download_left_hashes(1, 2, output_file=str(target))

    assert os.path.exists(result['output_file'])
    with open(result['output_file'], 'rb') as handle:
        written = handle.read()
    assert written == payload
    assert result['size'] == len(written)
# Allow running this test module directly (outside a pytest invocation).
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,38 @@
from pathlib import Path
def test_generate_session_id_sanitizes(hc_module):
    """Session ids come from the hash file name with unsafe chars replaced."""
    hc_module.hcatHashFile = "/tmp/my hash@file(1).txt"
    session_id = hc_module.generate_session_id()
    assert session_id == "my_hash_file_1_"
def test_line_count(hc_module, tmp_path):
    """lineCount reports the number of newline-terminated lines in a file."""
    sample = tmp_path / "lines.txt"
    sample.write_text("a\nb\nc\n", encoding="utf-8")
    assert hc_module.lineCount(str(sample)) == 3
def test_verify_wordlist_dir_resolves(hc_module, tmp_path):
    """A bare filename is resolved against the given wordlist directory."""
    wordlist_dir = tmp_path / "wordlists"
    wordlist_dir.mkdir()
    entry = wordlist_dir / "list.txt"
    entry.write_text("one\n", encoding="utf-8")

    resolved = hc_module.verify_wordlist_dir(str(wordlist_dir), "list.txt")
    assert resolved == str(entry)
def test_verify_wordlist_dir_prefers_absolute(hc_module, tmp_path):
    """An absolute wordlist path wins over the directory argument."""
    absolute_list = tmp_path / "absolute.txt"
    absolute_list.write_text("one\n", encoding="utf-8")

    resolved = hc_module.verify_wordlist_dir("/does/not/matter", str(absolute_list))
    assert resolved == str(absolute_list)
def test_convert_hex(hc_module, tmp_path):
    """$HEX[...] entries are decoded while plain entries pass through."""
    source = tmp_path / "hex.txt"
    source.write_text("$HEX[68656c6c6f]\nplain\n", encoding="utf-8")
    assert hc_module.convert_hex(str(source)) == ["hello", "plain"]

View File

@@ -0,0 +1,53 @@
import builtins
from pathlib import Path
# Directory holding the recorded menu-output snapshots these tests compare against.
SNAPSHOT_DIR = Path(__file__).resolve().parent / "fixtures" / "menu_outputs"
def _snapshot_text(out, err):
    """Combine captured stdout/stderr into the canonical snapshot format."""
    return "STDOUT:\n" + out + "STDERR:\n" + err
def _assert_snapshot(name, capsys):
    """Compare the output captured so far against the stored snapshot *name*."""
    out_err = capsys.readouterr()
    expected = (SNAPSHOT_DIR / f"{name}.txt").read_text(encoding="utf-8")
    assert _snapshot_text(out_err.out, out_err.err) == expected
def _input_sequence(values):
    """Build an input() replacement that replays *values* in order."""
    pending = iter(values)

    def _fake_input(_prompt=""):
        # Each call consumes the next scripted answer; raises StopIteration
        # if the test asks for more input than was provided.
        return next(pending)

    return _fake_input
def _setup_globals(hc, tmp_path):
    """Seed the hate_crack module globals that the menu code expects."""
    hc.hcatHashType = "1000"
    hc.hcatHashFile = "hashes"
    hc.hcatHashFileOrig = hc.hcatHashFile
    hc.hcatWordlists = str(tmp_path / "wordlists")
    hc.hcatOptimizedWordlists = str(tmp_path / "optimized")
    hc.hcatPath = str(tmp_path / "hcat")
    hc.hcatHybridlist = ["hybrid1.txt", "hybrid2.txt"]
    # All attack counters start from a clean slate.
    for counter in (
        "hcatBruteCount",
        "hcatDictionaryCount",
        "hcatMaskCount",
        "hcatFingerprintCount",
        "hcatCombinationCount",
        "hcatHybridCount",
        "hcatExtraCount",
    ):
        setattr(hc, counter, 0)
def test_hybrid_crack_snapshot(hc_module, monkeypatch, tmp_path, capsys):
    """The hybrid_crack menu output matches the recorded snapshot."""
    hc = hc_module
    _setup_globals(hc, tmp_path)
    # Stub the actual attack and script the single Enter keypress.
    monkeypatch.setattr(hc, "hcatHybrid", lambda *a, **kw: print("hcatHybrid called"))
    monkeypatch.setattr(builtins, "input", _input_sequence([""]))

    hc.hybrid_crack()

    _assert_snapshot("hybrid_crack", capsys)

View File

@@ -0,0 +1,31 @@
import importlib
def test_import_hashview_module():
    """The hashview module imports cleanly and exposes HashviewAPI."""
    hashview = importlib.import_module("hate_crack.hashview")
    assert hasattr(hashview, "HashviewAPI")
def test_import_hashmob_module():
    """The hashmob module imports cleanly and exposes its download entry point."""
    hashmob = importlib.import_module("hate_crack.hashmob_wordlist")
    assert hasattr(hashmob, "list_and_download_official_wordlists")
def test_import_weakpass_module():
    """The weakpass module imports cleanly and exposes its menu entry point."""
    weakpass = importlib.import_module("hate_crack.weakpass")
    assert hasattr(weakpass, "weakpass_wordlist_menu")
def test_import_cli_module():
    """The cli module imports cleanly and exposes add_common_args."""
    cli = importlib.import_module("hate_crack.cli")
    assert hasattr(cli, "add_common_args")
def test_import_api_module():
    """The api module imports cleanly and exposes its Hashview download helper."""
    api = importlib.import_module("hate_crack.api")
    assert hasattr(api, "download_hashes_from_hashview")
def test_import_attacks_module():
    """The attacks module imports cleanly and exposes quick_crack."""
    attacks = importlib.import_module("hate_crack.attacks")
    assert hasattr(attacks, "quick_crack")

View File

@@ -1,88 +0,0 @@
#!/usr/bin/env python3
import sys
import os
import subprocess
import shutil
import pathlib
# Help
def usage():
    """Print the command-line usage string for this script."""
    print(f"usage: python {sys.argv[0]} <input file list> <output directory>")
def lineCount(file):
    """Return the number of lines in *file*, or 0 if it cannot be opened.

    Any OS-level failure to open the file (missing, unreadable) is treated
    as an empty file, preserving the original best-effort behavior while
    replacing the bare ``except:`` that also hid programming errors.

    The original implementation also leaked the file handle; ``with``
    guarantees it is closed.
    """
    try:
        with open(file) as handle:
            return sum(1 for _ in handle)
    except OSError:
        return 0
# Main guts
def main():
    """Merge many wordlists into per-length "optimized" files in <output directory>.

    Command line (read from sys.argv):
        argv[1]: text file listing wordlist paths, one per line
        argv[2]: destination directory for the optimized output

    Relies on the hashcat-utils binaries `splitlen` (split words into one
    file per password length) and `rli` (remove lines already present in
    another file), invoked through the shell with relative paths — so this
    must be run from the directory containing hashcat-utils/.
    """
    try:
        # Validate arguments; a missing argv index jumps to the IndexError
        # handler below, which prints usage and exits.
        if not os.path.isfile(sys.argv[1]):
            print('{0} is not a valid file.\n'.format(sys.argv[1]))
            sys.exit()
        if not os.path.isdir(sys.argv[2]):
            create_directory = input('{0} is not a directory. Do you want to create it? (Y or N)'.format(sys.argv[2]))
            if create_directory.upper() == 'Y':
                try:
                    pathlib.Path(sys.argv[2]).mkdir(parents=True, exist_ok=True)
                except PermissionError:
                    print('You do not have the correct permissions to receate the directory. Please try a different path or create manually')
                    sys.exit()
            else:
                print('Please specify a valid directory and try again')
                sys.exit()
        # NOTE(review): input_list is never closed; acceptable for a
        # short-lived script, but a `with` block would be cleaner.
        input_list = open(sys.argv[1], "r")
        destination = sys.argv[2]
    except IndexError:
        usage()
        sys.exit()
    # hashcat-utils ships .app binaries for macOS and .bin elsewhere.
    if sys.platform == 'darwin':
        splitlen_bin = "hashcat-utils/bin/splitlen.app"
        rli_bin = "hashcat-utils/bin/rli.app"
    else:
        splitlen_bin = "hashcat-utils/bin/splitlen.bin"
        rli_bin = "hashcat-utils/bin/rli.bin"
    # Get list of wordlists from <input file list> argument
    for wordlist in input_list:
        print(wordlist.strip())
        # Parse wordlists by password length into "optimized" <output directory>.
        # The first wordlist into an empty destination is split directly; later
        # ones are split into /tmp/splitlen so duplicates can be removed first.
        # NOTE(review): shell=True with unquoted, user-supplied paths — names
        # containing spaces or shell metacharacters will break or be
        # interpreted by the shell.
        if len(os.listdir(destination)) == 0:
            splitlenProcess = subprocess.Popen("%s %s < %s" % (splitlen_bin, destination, wordlist), shell=True).wait()
        else:
            if not os.path.isdir("/tmp/splitlen"):
                os.mkdir("/tmp/splitlen")
            splitlenProcess = subprocess.Popen("%s /tmp/splitlen < %s" % (splitlen_bin, wordlist), shell=True).wait()
            # Copy unique passwords into "optimized" <output directory>
            for file in os.listdir("/tmp/splitlen"):
                if not os.path.isfile(destination + "/" + file):
                    # New length bucket: copy the whole file over.
                    shutil.copyfile("/tmp/splitlen/" + file, destination + "/" + file)
                else:
                    # Existing bucket: rli writes only the words not already
                    # present into /tmp/splitlen.out, which we then append.
                    rliProcess = subprocess.Popen("%s /tmp/splitlen/%s /tmp/splitlen.out %s/%s" % (rli_bin, file, destination, file), shell=True).wait()
                    if lineCount("/tmp/splitlen.out") > 0:
                        destination_file = open(destination + "/" + file, "a")
                        splitlen_file = open("/tmp/splitlen.out", "r")
                        destination_file.write(splitlen_file.read())
                        destination_file.close()
                        splitlen_file.close()
    # Clean Up
    if os.path.isdir("/tmp/splitlen"):
        shutil.rmtree('/tmp/splitlen')
    if os.path.isfile("/tmp/splitlen.out"):
        os.remove("/tmp/splitlen.out")
# Standard boilerplate: invoke main() only when executed as a script.
if __name__ == '__main__':
    main()