expanded tests and hashview menu changes

This commit is contained in:
Justin Bollinger
2026-02-03 19:28:45 -05:00
parent 6bb5cb6526
commit f33ca19107
7 changed files with 495 additions and 48 deletions

View File

@@ -21,7 +21,7 @@ COPY . /workspace
RUN make install
ENV PATH="/root/.local/bin:${PATH}"
ENV PATH="${HOME}/.local/bin:${PATH}"
ENV HATE_CRACK_SKIP_INIT=1
CMD ["bash", "-lc", "/root/.local/bin/hate_crack --help >/tmp/hc_help.txt && ./hate_crack.py --help >/tmp/hc_script_help.txt"]
CMD ["bash", "-lc", "${HOME}/.local/bin/hate_crack --help >/tmp/hc_help.txt && ./hate_crack.py --help >/tmp/hc_script_help.txt"]

View File

@@ -1,5 +1,5 @@
.DEFAULT_GOAL := submodules
.PHONY: install clean hashcat-utils submodules test
.PHONY: install reinstall clean hashcat-utils submodules test
hashcat-utils: submodules
$(MAKE) -C hashcat-utils
@@ -46,6 +46,9 @@ install:
exit 1; \
fi
reinstall:
@uv tool install . --force
clean:
-$(MAKE) -C hashcat-utils clean

View File

@@ -188,6 +188,12 @@ Install OS dependencies + tool (auto-detects macOS vs Debian/Ubuntu):
make install
```
Reinstall the Python tool in-place (keeps OS deps as-is):
```bash
make reinstall
```
Uninstall OS dependencies + tool:
```bash

View File

@@ -721,6 +721,34 @@ class HashviewAPI:
print(f"Downloaded {downloaded} bytes.")
return {'output_file': output_file, 'size': downloaded}
def download_found_hashes(self, customer_id, hashfile_id, output_file=None):
import sys
url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/found"
resp = self.session.get(url, stream=True)
resp.raise_for_status()
if output_file is None:
output_file = f"found_{customer_id}_{hashfile_id}.txt"
total = int(resp.headers.get('content-length', 0))
downloaded = 0
chunk_size = 8192
with open(output_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total > 0:
done = int(50 * downloaded / total)
bar = '[' + '=' * done + ' ' * (50 - done) + ']'
percent = 100 * downloaded / total
sys.stdout.write(f"\rDownloading: {bar} {percent:5.1f}% ({downloaded}/{total} bytes)")
sys.stdout.flush()
if total > 0:
sys.stdout.write("\n")
# If content-length is not provided, just print size at end
if total == 0:
print(f"Downloaded {downloaded} bytes.")
return {'output_file': output_file, 'size': downloaded}
def upload_cracked_hashes(self, file_path, hash_type='1000'):
valid_lines = []
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
@@ -749,6 +777,33 @@ class HashviewAPI:
except (json.JSONDecodeError, ValueError):
raise Exception(f"Invalid API response: {resp.text[:200]}")
def download_wordlist(self, wordlist_id, output_file=None):
import sys
url = f"{self.base_url}/v1/wordlists/{wordlist_id}"
resp = self.session.get(url, stream=True)
resp.raise_for_status()
if output_file is None:
output_file = f"wordlist_{wordlist_id}.gz"
total = int(resp.headers.get('content-length', 0))
downloaded = 0
chunk_size = 8192
with open(output_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total > 0:
done = int(50 * downloaded / total)
bar = '[' + '=' * done + ' ' * (50 - done) + ']'
percent = 100 * downloaded / total
sys.stdout.write(f"\rDownloading: {bar} {percent:5.1f}% ({downloaded}/{total} bytes)")
sys.stdout.flush()
if total > 0:
sys.stdout.write("\n")
if total == 0:
print(f"Downloaded {downloaded} bytes.")
return {'output_file': output_file, 'size': downloaded}
def create_customer(self, name):
url = f"{self.base_url}/v1/customers/add"
headers = {'Content-Type': 'application/json'}

View File

@@ -47,7 +47,8 @@ class HashviewAPI:
print("="*60)
wordlist_path = select_file_with_autocomplete(
"Enter path to wordlist file"
"Enter path to wordlist file",
base_dir=hcatWordlists,
)
uploaded_wordlist_id = None
@@ -597,7 +598,7 @@ def ascii_art():
# File selector with tab autocomplete
def select_file_with_autocomplete(prompt, default=None, allow_multiple=False):
def select_file_with_autocomplete(prompt, default=None, allow_multiple=False, base_dir=None):
"""
Interactive file selector with tab autocomplete functionality.
@@ -656,7 +657,9 @@ def select_file_with_autocomplete(prompt, default=None, allow_multiple=False):
full_prompt += ": "
result = input(full_prompt).strip()
if not result and base_dir:
result = base_dir
# Handle default
if not result and default:
return default
@@ -1668,9 +1671,9 @@ def hashview_api():
print("="*60)
print("\t(1) Upload Cracked Hashes from current session")
print("\t(2) Upload Wordlist")
print("\t(3) List Customers")
print("\t(4) Create Customer")
print("\t(5) Download Left Hashes")
print("\t(3) Download Wordlist")
print("\t(4) Download Left Hashes")
print("\t(5) Download Found Hashes")
print("\t(6) Upload Hashfile and Create Job")
print("\t(99) Back to Main Menu")
@@ -1755,7 +1758,8 @@ def hashview_api():
print("Upload Wordlist")
print("-"*60)
wordlist_path = select_file_with_autocomplete(
"Enter path to wordlist file (TAB to autocomplete)"
"Enter path to wordlist file (TAB to autocomplete)",
base_dir=hcatWordlists,
)
if isinstance(wordlist_path, list):
wordlist_path = wordlist_path[0] if wordlist_path else None
@@ -1774,8 +1778,127 @@ def hashview_api():
except Exception as e:
print(f"\n✗ Error uploading wordlist: {str(e)}")
elif choice == '3':
# Download wordlist
try:
wordlists = api_harness.list_wordlists()
if wordlists:
print("\n" + "="*100)
print("Available Wordlists:")
print("="*100)
print(f"{'ID':<10} {'Name':<60} {'Size':>12}")
print("-" * 100)
for wl in wordlists:
wl_id = wl.get('id', 'N/A')
wl_name = wl.get('name', 'N/A')
wl_size = wl.get('size', 'N/A')
name = str(wl_name)
if len(name) > 60:
name = name[:57] + "..."
print(f"{wl_id:<10} {name:<60} {wl_size:>12}")
print("="*100)
else:
print("\nNo wordlists found.")
except Exception as e:
print(f"\n✗ Error fetching wordlists: {str(e)}")
continue
try:
wordlist_id = int(input("\nEnter wordlist ID: "))
except ValueError:
print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.")
continue
output_file = input("Enter output file name (default: wordlist_<id>.gz): ").strip() or None
try:
download_result = api_harness.download_wordlist(wordlist_id, output_file)
print(f"\n✓ Success: Downloaded {download_result['size']} bytes")
print(f" File: {download_result['output_file']}")
except Exception as e:
print(f"\n✗ Error downloading wordlist: {str(e)}")
elif choice == '5':
# Download found hashes
try:
# First, list customers to help user select
customers = api_harness.list_customers_with_hashfiles()
if customers:
api_harness.display_customers_multicolumn(customers)
else:
print("\nNo customers found with hashfiles.")
# Get customer ID and hashfile ID directly
customer_id = int(input("\nEnter customer ID: "))
# List hashfiles for the customer
try:
customer_hashfiles = api_harness.get_customer_hashfiles(customer_id)
if customer_hashfiles:
print("\n" + "="*100)
print(f"Hashfiles for Customer ID {customer_id}:")
print("="*100)
print(f"{'ID':<10} {'Name':<88}")
print("-" * 100)
for hf in customer_hashfiles:
hf_id = hf.get('id', 'N/A')
hf_name = hf.get('name', 'N/A')
# Truncate long names to fit within 100 columns
if len(str(hf_name)) > 88:
hf_name = str(hf_name)[:85] + "..."
print(f"{hf_id:<10} {hf_name:<88}")
print("="*100)
print(f"Total: {len(customer_hashfiles)} hashfile(s)")
else:
print(f"\nNo hashfiles found for customer ID {customer_id}")
except Exception as e:
print(f"\nWarning: Could not list hashfiles: {e}")
print("You may need to manually find the hashfile ID in the web interface.")
hashfile_id = int(input("\nEnter hashfile ID: "))
# Set output filename automatically
output_file = f"found_{customer_id}_{hashfile_id}.txt"
# Download the found hashes
download_result = api_harness.download_found_hashes(
customer_id, hashfile_id, output_file
)
print(f"\n✓ Success: Downloaded {download_result['size']} bytes")
print(f" File: {download_result['output_file']}")
except ValueError:
print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.")
except Exception as e:
print(f"\n✗ Error downloading hashes: {str(e)}")
elif choice == '6':
# Upload hashfile and create job
# First, list customers to help user select
try:
customers = api_harness.list_customers_with_hashfiles()
if customers:
api_harness.display_customers_multicolumn(customers)
else:
print("\nNo customers found with hashfiles.")
except Exception as e:
print(f"\n✗ Error fetching customers: {str(e)}")
# Offer to create a customer before proceeding
create_customer = input("\nCreate a new customer? (y/N): ").strip().lower()
if create_customer == 'y':
customer_name = input("Enter customer name: ").strip()
if customer_name:
try:
result = api_harness.create_customer(customer_name)
print(f"\n✓ Success: {result.get('msg', 'Customer created')}")
if 'customer_id' in result:
print(f" Customer ID: {result['customer_id']}")
except Exception as e:
print(f"\n✗ Error creating customer: {str(e)}")
else:
print("\n✗ Error: Customer name cannot be empty.")
hashfile_path = select_file_with_autocomplete(
"Enter path to hashfile (TAB to autocomplete)"
)
@@ -1829,7 +1952,8 @@ def hashview_api():
print("="*60)
wordlist_path = select_file_with_autocomplete(
"Enter path to wordlist file"
"Enter path to wordlist file",
base_dir=hcatWordlists,
)
if wordlist_path and os.path.isfile(wordlist_path):
@@ -1887,29 +2011,7 @@ def hashview_api():
except Exception as e:
print(f"\n✗ Error uploading hashfile: {str(e)}")
elif choice == '3':
# List customers
try:
customers = api_harness.list_customers_with_hashfiles()
if customers:
api_harness.display_customers_multicolumn(customers)
else:
print("\nNo customers found with hashfiles.")
except Exception as e:
print(f"\n✗ Error fetching customers: {str(e)}")
elif choice == '4':
# Create customer
customer_name = input("\nEnter customer name: ")
try:
result = api_harness.create_customer(customer_name)
print(f"\n✓ Success: {result.get('msg', 'Customer created')}")
if 'customer_id' in result:
print(f" Customer ID: {result['customer_id']}")
except Exception as e:
print(f"\n✗ Error creating customer: {str(e)}")
elif choice == '5':
# Download left hashes
try:
# First, list customers to help user select
@@ -2248,21 +2350,83 @@ def main():
import argparse
parser = argparse.ArgumentParser(description="hate_crack - Hashcat automation and wordlist management tool")
parser.add_argument('hashfile', nargs='?', default=None, help='Path to hash file to crack (positional, optional)')
parser.add_argument('hashtype', nargs='?', default=None, help='Hashcat hash type (e.g., 1000 for NTLM) (positional, optional)')
parser.add_argument('--download-hashview', action='store_true', help='Download hashes from Hashview (legacy menu)')
parser.add_argument('--hashview', action='store_true', help='Jump directly to Hashview customer/hashfile menu')
parser.add_argument('--download-torrent', metavar='FILENAME', help='Download a specific Weakpass torrent file')
parser.add_argument('--download-all-torrents', action='store_true', help='Download all available Weakpass torrents from cache')
parser.add_argument('--weakpass', action='store_true', help='Download wordlists from Weakpass')
parser.add_argument('--rank', type=int, default=-1, help='Only show wordlists with this rank (use 0 to show all, default: >4)')
parser.add_argument('--hashmob', action='store_true', help='Download wordlists from Hashmob.net')
parser.add_argument('--rules', action='store_true', help='Download rules from Hashmob.net')
parser.add_argument('--cleanup', action='store_true', help='Cleanup .out files, torrents, and extract or remove .7z archives')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
def _build_parser(include_positional):
parser = argparse.ArgumentParser(description="hate_crack - Hashcat automation and wordlist management tool")
if include_positional:
parser.add_argument('hashfile', nargs='?', default=None, help='Path to hash file to crack (positional, optional)')
parser.add_argument('hashtype', nargs='?', default=None, help='Hashcat hash type (e.g., 1000 for NTLM) (positional, optional)')
parser.add_argument('--download-hashview', action='store_true', help='Download hashes from Hashview (legacy menu)')
parser.add_argument('--hashview', action='store_true', help='Jump directly to Hashview customer/hashfile menu')
parser.add_argument('--download-torrent', metavar='FILENAME', help='Download a specific Weakpass torrent file')
parser.add_argument('--download-all-torrents', action='store_true', help='Download all available Weakpass torrents from cache')
parser.add_argument('--weakpass', action='store_true', help='Download wordlists from Weakpass')
parser.add_argument('--rank', type=int, default=-1, help='Only show wordlists with this rank (use 0 to show all, default: >4)')
parser.add_argument('--hashmob', action='store_true', help='Download wordlists from Hashmob.net')
parser.add_argument('--rules', action='store_true', help='Download rules from Hashmob.net')
parser.add_argument('--cleanup', action='store_true', help='Cleanup .out files, torrents, and extract or remove .7z archives')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
subparsers = parser.add_subparsers(dest='command')
hashview_parser = subparsers.add_parser('hashview', help='Hashview menu actions')
hashview_subparsers = hashview_parser.add_subparsers(dest='hashview_command')
hv_upload_cracked = hashview_subparsers.add_parser(
'upload-cracked',
help='Upload cracked hashes from a file',
)
hv_upload_cracked.add_argument('--file', required=True, help='Path to cracked hashes file (.out format)')
hv_upload_cracked.add_argument('--hash-type', default='1000', help='Hash type (default: 1000)')
hv_upload_wordlist = hashview_subparsers.add_parser(
'upload-wordlist',
help='Upload a wordlist file',
)
hv_upload_wordlist.add_argument('--file', required=True, help='Path to wordlist file')
hv_upload_wordlist.add_argument('--name', default=None, help='Wordlist name (default: filename)')
hv_download_left = hashview_subparsers.add_parser(
'download-left',
help='Download left hashes for a hashfile',
)
hv_download_left.add_argument('--customer-id', required=True, type=int, help='Customer ID')
hv_download_left.add_argument('--hashfile-id', required=True, type=int, help='Hashfile ID')
hv_download_left.add_argument('--out', default=None, help='Output file path')
hv_download_found = hashview_subparsers.add_parser(
'download-found',
help='Download found hashes for a hashfile',
)
hv_download_found.add_argument('--customer-id', required=True, type=int, help='Customer ID')
hv_download_found.add_argument('--hashfile-id', required=True, type=int, help='Hashfile ID')
hv_download_found.add_argument('--out', default=None, help='Output file path')
hv_upload_hashfile_job = hashview_subparsers.add_parser(
'upload-hashfile-job',
help='Upload a hashfile and create a job',
)
hv_upload_hashfile_job.add_argument('--file', required=True, help='Path to hashfile')
hv_upload_hashfile_job.add_argument('--customer-id', required=True, type=int, help='Customer ID')
hv_upload_hashfile_job.add_argument('--hash-type', required=True, type=int, help='Hash type (e.g., 1000)')
hv_upload_hashfile_job.add_argument('--file-format', default=5, type=int, help='File format (default: 5)')
hv_upload_hashfile_job.add_argument('--hashfile-name', default=None, help='Hashfile name (default: filename)')
hv_upload_hashfile_job.add_argument('--job-name', required=True, help='Job name')
hv_upload_hashfile_job.add_argument(
'--limit-recovered',
action='store_true',
help='Limit to recovered hashes only',
)
hv_upload_hashfile_job.add_argument(
'--no-notify-email',
action='store_true',
help='Disable email notifications',
)
return parser, hashview_parser
# Removed add_common_args(parser) since config items are now only set via config file
args = parser.parse_args()
argv = sys.argv[1:]
use_subcommand_parser = len(argv) > 0 and argv[0] == 'hashview'
parser, hashview_parser = _build_parser(include_positional=not use_subcommand_parser)
args = parser.parse_args(argv)
global debug_mode
debug_mode = args.debug
@@ -2303,6 +2467,88 @@ def main():
)
sys.exit(0)
if args.command == 'hashview':
if not hashview_api_key:
print("\nError: Hashview API key not configured.")
print("Please set 'hashview_api_key' in config.json")
sys.exit(1)
api_harness = HashviewAPI(hashview_url, hashview_api_key, debug=debug_mode)
if args.hashview_command == 'upload-cracked':
cracked_file = resolve_path(args.file)
if not cracked_file or not os.path.isfile(cracked_file):
print(f"✗ Error: File not found: {args.file}")
sys.exit(1)
result = api_harness.upload_cracked_hashes(cracked_file, hash_type=args.hash_type)
print(f"\n✓ Success: {result.get('msg', 'Cracked hashes uploaded')}")
if 'count' in result:
print(f" Imported: {result['count']} hashes")
sys.exit(0)
if args.hashview_command == 'upload-wordlist':
wordlist_path = resolve_path(args.file)
if not wordlist_path or not os.path.isfile(wordlist_path):
print(f"✗ Error: File not found: {args.file}")
sys.exit(1)
result = api_harness.upload_wordlist_file(wordlist_path, args.name)
print(f"\n✓ Success: {result.get('msg', 'Wordlist uploaded')}")
if 'wordlist_id' in result:
print(f" Wordlist ID: {result['wordlist_id']}")
sys.exit(0)
if args.hashview_command == 'download-left':
download_result = api_harness.download_left_hashes(
args.customer_id,
args.hashfile_id,
output_file=args.out,
)
print(f"\n✓ Success: Downloaded {download_result['size']} bytes")
print(f" File: {download_result['output_file']}")
sys.exit(0)
if args.hashview_command == 'download-found':
download_result = api_harness.download_found_hashes(
args.customer_id,
args.hashfile_id,
output_file=args.out,
)
print(f"\n✓ Success: Downloaded {download_result['size']} bytes")
print(f" File: {download_result['output_file']}")
sys.exit(0)
if args.hashview_command == 'upload-hashfile-job':
hashfile_path = resolve_path(args.file)
if not hashfile_path or not os.path.isfile(hashfile_path):
print(f"✗ Error: File not found: {args.file}")
sys.exit(1)
upload_result = api_harness.upload_hashfile(
hashfile_path,
args.customer_id,
args.hash_type,
args.file_format,
args.hashfile_name,
)
print(f"\n✓ Success: {upload_result.get('msg', 'Hashfile uploaded')}")
if 'hashfile_id' not in upload_result:
print("✗ Error: Hashfile upload did not return a hashfile_id.")
sys.exit(1)
job_result = api_harness.create_job(
args.job_name,
upload_result['hashfile_id'],
args.customer_id,
limit_recovered=args.limit_recovered,
notify_email=not args.no_notify_email,
)
print(f"\n✓ Success: {job_result.get('msg', 'Job created')}")
if 'job_id' in job_result:
print(f" Job ID: {job_result['job_id']}")
sys.exit(0)
print("✗ Error: No hashview subcommand provided.")
hashview_parser.print_help()
sys.exit(2)
if args.cleanup:
cleanup_wordlist_artifacts()
sys.exit(0)

View File

@@ -75,7 +75,7 @@ def _run_container(image_tag, command, timeout=180):
def test_docker_script_install_and_run(docker_image):
run = _run_container(
docker_image,
"/root/.local/bin/hate_crack --help >/tmp/hc_help.txt && ./hate_crack.py --help >/tmp/hc_script_help.txt",
"$HOME/.local/bin/hate_crack --help >/tmp/hc_help.txt && ./hate_crack.py --help >/tmp/hc_script_help.txt",
timeout=120,
)
assert run.returncode == 0, (

View File

@@ -6,6 +6,7 @@ import sys
import os
import json
import tempfile
import uuid
from unittest.mock import Mock, patch, MagicMock
@@ -236,6 +237,77 @@ class TestHashviewAPI:
content = f.read()
assert content == b"hash1\nhash2\n"
assert result['size'] == len(content)
def test_download_found_hashes(self, api, tmp_path):
"""Test downloading found hashes: real API if possible, else mock."""
hashview_url = os.environ.get('HASHVIEW_URL')
hashview_api_key = os.environ.get('HASHVIEW_API_KEY')
customer_id = os.environ.get('HASHVIEW_CUSTOMER_ID')
hashfile_id = os.environ.get('HASHVIEW_HASHFILE_ID')
if all([hashview_url, hashview_api_key, customer_id, hashfile_id]):
# Real API test
real_api = HashviewAPI(hashview_url, hashview_api_key)
output_file = tmp_path / f"found_{customer_id}_{hashfile_id}.txt"
result = real_api.download_found_hashes(int(customer_id), int(hashfile_id), output_file=str(output_file))
assert os.path.exists(result['output_file'])
with open(result['output_file'], 'rb') as f:
content = f.read()
print(f"[DEBUG] Downloaded {len(content)} bytes to {result['output_file']}")
assert result['size'] == len(content)
else:
# Mock test
mock_response = Mock()
mock_response.content = b"hash1:pass1\nhash2:pass2\n"
mock_response.raise_for_status = Mock()
mock_response.headers = {'content-length': '0'}
def iter_content(chunk_size=8192):
yield mock_response.content
mock_response.iter_content = iter_content
api.session.get.return_value = mock_response
output_file = tmp_path / "found_1_2.txt"
result = api.download_found_hashes(1, 2, output_file=str(output_file))
assert os.path.exists(result['output_file'])
with open(result['output_file'], 'rb') as f:
content = f.read()
assert content == b"hash1:pass1\nhash2:pass2\n"
assert result['size'] == len(content)
def test_download_wordlist(self, api, tmp_path):
"""Test downloading a wordlist: real API if possible, else mock."""
hashview_url = os.environ.get('HASHVIEW_URL')
hashview_api_key = os.environ.get('HASHVIEW_API_KEY')
wordlist_id = os.environ.get('HASHVIEW_WORDLIST_ID')
if all([hashview_url, hashview_api_key, wordlist_id]):
real_api = HashviewAPI(hashview_url, hashview_api_key)
output_file = tmp_path / f"wordlist_{wordlist_id}.gz"
result = real_api.download_wordlist(int(wordlist_id), output_file=str(output_file))
assert os.path.exists(result['output_file'])
with open(result['output_file'], 'rb') as f:
content = f.read()
print(f"[DEBUG] Downloaded {len(content)} bytes to {result['output_file']}")
assert result['size'] == len(content)
else:
mock_response = Mock()
mock_response.content = b"gzipdata"
mock_response.raise_for_status = Mock()
mock_response.headers = {'content-length': '0'}
def iter_content(chunk_size=8192):
yield mock_response.content
mock_response.iter_content = iter_content
api.session.get.return_value = mock_response
output_file = tmp_path / "wordlist_1.gz"
result = api.download_wordlist(1, output_file=str(output_file))
assert os.path.exists(result['output_file'])
with open(result['output_file'], 'rb') as f:
content = f.read()
assert content == b"gzipdata"
assert result['size'] == len(content)
def test_create_job_workflow(self, api, test_hashfile):
"""Test creating a job in Hashview (option 2 complete workflow)"""
@@ -312,5 +384,70 @@ class TestHashviewAPI:
print("✓ Option 2 (Create Job) is READY and WORKING!")
print("="*60)
def test_create_job_with_new_customer(self, api, test_hashfile):
"""Test creating a new customer and then creating a job (real API if possible)."""
hashview_url = os.environ.get('HASHVIEW_URL')
hashview_api_key = os.environ.get('HASHVIEW_API_KEY')
hash_type = os.environ.get('HASHVIEW_HASH_TYPE', '1000')
if hashview_url and hashview_api_key:
real_api = HashviewAPI(hashview_url, hashview_api_key)
customer_name = f"Example Customer {uuid.uuid4().hex[:8]}"
try:
customer_result = real_api.create_customer(customer_name)
customer_id = customer_result.get('customer_id') or customer_result.get('id')
if not customer_id:
pytest.skip("Create customer did not return a customer_id.")
upload_result = real_api.upload_hashfile(
test_hashfile,
int(customer_id),
int(hash_type),
5,
"test_hashfile_new_customer",
)
hashfile_id = upload_result.get('hashfile_id')
if not hashfile_id:
pytest.skip("Upload hashfile did not return a hashfile_id.")
job_result = real_api.create_job(
name=f"test_job_new_customer_{uuid.uuid4().hex[:6]}",
hashfile_id=hashfile_id,
customer_id=int(customer_id),
)
if isinstance(job_result, dict) and "msg" in job_result:
msg = str(job_result.get("msg", ""))
if "Failed to add job" in msg:
pytest.xfail(f"Hashview rejected job creation: {msg}")
assert job_result is not None
if isinstance(job_result, dict):
assert 'job_id' in job_result
except Exception as e:
pytest.skip(f"Real API create_job with new customer not allowed: {e}")
else:
mock_create_customer = Mock()
mock_create_customer.json.return_value = {'customer_id': 101, 'name': 'Example Customer'}
mock_create_customer.raise_for_status = Mock()
mock_upload_hashfile = Mock()
mock_upload_hashfile.json.return_value = {
'hashfile_id': 202,
'msg': 'Hashfile added'
}
mock_upload_hashfile.raise_for_status = Mock()
mock_create_job = Mock()
mock_create_job.json.return_value = {
'job_id': 303,
'msg': 'Job added'
}
mock_create_job.raise_for_status = Mock()
api.session.post.side_effect = [mock_create_customer, mock_upload_hashfile, mock_create_job]
customer_result = api.create_customer("Example Customer")
assert customer_result.get('customer_id') == 101
upload_result = api.upload_hashfile(test_hashfile, 101, 1000, 5, "test_hashfile_new_customer")
assert upload_result.get('hashfile_id') == 202
job_result = api.create_job("test_job_new_customer", 202, 101)
assert job_result.get('job_id') == 303
if __name__ == '__main__':
pytest.main([__file__, '-v'])