diff --git a/hate_crack/api.py b/hate_crack/api.py index e6f058f..f3955eb 100644 --- a/hate_crack/api.py +++ b/hate_crack/api.py @@ -593,9 +593,13 @@ class HashviewAPI: if data: hashtype = data.get("hashtype") or data.get("hash_type") or data.get("type") if self.debug: - print(f"[DEBUG] get_hashfile_details({hashfile_id}): raw data={data}, hashtype={hashtype}") + print( + f"[DEBUG] get_hashfile_details({hashfile_id}): raw data={data}, hashtype={hashtype}" + ) elif self.debug: - print(f"[DEBUG] get_hashfile_details({hashfile_id}): no data returned. raw response: {resp.text}") + print( + f"[DEBUG] get_hashfile_details({hashfile_id}): no data returned. raw response: {resp.text}" + ) return { "hashfile_id": hashfile_id, "hashtype": hashtype, @@ -640,10 +644,12 @@ class HashviewAPI: customer_hfs = [ hf for hf in all_hashfiles if int(hf.get("customer_id", 0)) == customer_id ] - + if self.debug: - print(f"[DEBUG] get_customer_hashfiles({customer_id}): found {len(customer_hfs)} hashfiles") - + print( + f"[DEBUG] get_customer_hashfiles({customer_id}): found {len(customer_hfs)} hashfiles" + ) + # Fetch hash types for any hashfiles missing them for hf in customer_hfs: if not (hf.get("hashtype") or hf.get("hash_type")): @@ -657,13 +663,19 @@ class HashviewAPI: if hashtype: hf["hash_type"] = hashtype if self.debug: - print(f"[DEBUG] Updated hashfile {hf_id} with hash_type={hashtype}") + print( + f"[DEBUG] Updated hashfile {hf_id} with hash_type={hashtype}" + ) elif self.debug: - print(f"[DEBUG] No hashtype found in details for {hf_id}: {details}") + print( + f"[DEBUG] No hashtype found in details for {hf_id}: {details}" + ) except Exception as e: if self.debug: - print(f"[DEBUG] Exception fetching hash_type for {hf_id}: {e}") - + print( + f"[DEBUG] Exception fetching hash_type for {hf_id}: {e}" + ) + return customer_hfs def get_customer_hashfiles_with_hashtype(self, customer_id, target_hashtype="1000"): @@ -789,7 +801,9 @@ class HashviewAPI: resp.raise_for_status() return resp.json() - def download_left_hashes(self, customer_id, hashfile_id, output_file=None, hash_type=None): + def download_left_hashes( + self, customer_id, hashfile_id, output_file=None, hash_type=None + ): import sys import subprocess @@ -821,40 +835,48 @@ class HashviewAPI: # If content-length is not provided, just print size at end if total == 0: print(f"Downloaded {downloaded} bytes.") - + # Try to download found file and process with hashcat combined_count = 0 combined_file = None out_dir = os.path.dirname(output_abs) or os.getcwd() found_file = os.path.join(out_dir, f"found_{customer_id}_{hashfile_id}.txt") - + try: # Try to download the found file found_url = f"{self.base_url}/v1/hashfiles/{hashfile_id}/found" - found_resp = self.session.get(found_url, headers=self._auth_headers(), stream=True, timeout=30) - + found_resp = self.session.get( + found_url, headers=self._auth_headers(), stream=True, timeout=30 + ) + # Only proceed if we successfully downloaded the found file (ignore 404s) if found_resp.status_code == 404: # No found file available, that's okay pass else: found_resp.raise_for_status() - + # Write the found file temporarily with open(found_file, "wb") as f: for chunk in found_resp.iter_content(chunk_size=8192): if chunk: f.write(chunk) - + # Split found file into hashes and clears - found_hashes_file = os.path.join(out_dir, f"found_hashes_{customer_id}_{hashfile_id}.txt") - found_clears_file = os.path.join(out_dir, f"found_clears_{customer_id}_{hashfile_id}.txt") - + found_hashes_file = os.path.join( + out_dir, f"found_hashes_{customer_id}_{hashfile_id}.txt" + ) + found_clears_file = os.path.join( + out_dir, f"found_clears_{customer_id}_{hashfile_id}.txt" + ) + hashes_count = 0 clears_count = 0 - - with open(found_hashes_file, "w", encoding="utf-8") as hf, \ - open(found_clears_file, "w", encoding="utf-8") as cf: + + with ( + open(found_hashes_file, "w", encoding="utf-8") as hf, + open(found_clears_file, "w", encoding="utf-8") as cf, + ): with open(found_file, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() @@ -866,56 +888,74 @@ class HashviewAPI: cf.write(clear_part + "\n") hashes_count += 1 clears_count += 1 - - print(f"Split found file into {hashes_count} hashes and {clears_count} clears") - + + print( + f"Split found file into {hashes_count} hashes and {clears_count} clears" + ) + # Run hashcat to combine them combined_file = output_abs + ".out" try: # Execute hashcat: hashcat -m hash_type found_hashes found_clears --outfile output.out --outfile-format=1,2 tuning_args = get_hcat_tuning_args() - + # Create temporary outfile for hashcat temp_outfile = output_abs + ".tmp" - + if self.debug: - print(f"[DEBUG] download_left_hashes: hash_type={hash_type}, type={type(hash_type)}") - + print( + f"[DEBUG] download_left_hashes: hash_type={hash_type}, type={type(hash_type)}" + ) + # Build command with hash type if provided cmd = ["hashcat", *tuning_args] if hash_type: cmd.extend(["-m", str(hash_type)]) - cmd.extend([ - found_hashes_file, - found_clears_file, - "--outfile", - temp_outfile, - "--outfile-format=1,2", - ]) - + cmd.extend( + [ + found_hashes_file, + found_clears_file, + "--outfile", + temp_outfile, + "--outfile-format=1,2", + ] + ) + if self.debug: print(f"[DEBUG] Running command: {' '.join(cmd)}") - + print(f"Running: {' '.join(cmd)}") - - result = subprocess.run(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True, timeout=300) - + + result = subprocess.run( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + text=True, + timeout=300, + ) + if result.returncode != 0: print(f"Warning: hashcat exited with code {result.returncode}") if result.stderr: print(f" stderr: {result.stderr}") - + # Append the output to the combined file if os.path.exists(temp_outfile): - with open(temp_outfile, "r", encoding="utf-8", errors="ignore") as tmp_f: + with open( + temp_outfile, "r", encoding="utf-8", errors="ignore" + ) as tmp_f: with open(combined_file, "a", encoding="utf-8") as out_f: out_f.write(tmp_f.read()) - + # Count lines appended - with open(combined_file, "r", encoding="utf-8", errors="ignore") as f: + with open( + combined_file, "r", encoding="utf-8", errors="ignore" + ) as f: combined_count = len(f.readlines()) - print(f"✓ Appended cracked hashes to {combined_file} (total lines: {combined_count})") - + print( + f"✓ Appended cracked hashes to {combined_file} (total lines: {combined_count})" + ) + # Clean up temp file try: os.remove(temp_outfile) @@ -923,14 +963,14 @@ class HashviewAPI: pass else: print("Note: No cracked hashes found") - + except FileNotFoundError: print("✗ Error: hashcat not found in PATH") except subprocess.TimeoutExpired: print("✗ Error: hashcat execution timed out") except Exception as e: print(f"✗ Error running hashcat: {e}") - + # Clean up temporary files (keep when debug is enabled) if not self.debug: files_to_delete = [found_file, found_hashes_file, found_clears_file] @@ -943,7 +983,7 @@ class HashviewAPI: print(f"Warning: Could not delete {temp_file}: {e}") else: print("Debug enabled: keeping found and split files") - + except Exception as e: # If there's any error downloading found file, just skip it print(f"Note: Could not download found hashes: {e}") @@ -954,7 +994,7 @@ class HashviewAPI: os.remove(found_file) except Exception: pass - + return { "output_file": output_file, "size": downloaded, @@ -962,7 +1002,9 @@ class HashviewAPI: "combined_file": combined_file, } - def download_found_hashes(self, customer_id, hashfile_id, output_file=None, hash_type=None): + def download_found_hashes( + self, customer_id, hashfile_id, output_file=None, hash_type=None + ): import sys import subprocess @@ -971,7 +1013,7 @@ class HashviewAPI: resp.raise_for_status() if output_file is None: output_file = f"found_{customer_id}_{hashfile_id}.txt" - + total = int(resp.headers.get("content-length", 0)) downloaded = 0 chunk_size = 8192 @@ -996,17 +1038,23 @@ class HashviewAPI: # Split found file into hashes and clears output_dir = os.path.dirname(os.path.abspath(output_file)) or os.getcwd() - found_hashes_file = os.path.join(output_dir, f"found_hashes_{customer_id}_{hashfile_id}.txt") - found_clears_file = os.path.join(output_dir, f"found_clears_{customer_id}_{hashfile_id}.txt") - + found_hashes_file = os.path.join( + output_dir, f"found_hashes_{customer_id}_{hashfile_id}.txt" + ) + found_clears_file = os.path.join( + output_dir, f"found_clears_{customer_id}_{hashfile_id}.txt" + ) + hashes_count = 0 clears_count = 0 combined_count = 0 combined_file = None - + try: - with open(found_hashes_file, "w", encoding="utf-8") as hf, \ - open(found_clears_file, "w", encoding="utf-8") as cf: + with ( + open(found_hashes_file, "w", encoding="utf-8") as hf, + open(found_clears_file, "w", encoding="utf-8") as cf, + ): with open(output_file, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() @@ -1018,55 +1066,71 @@ class HashviewAPI: cf.write(clear_part + "\n") hashes_count += 1 clears_count += 1 - - print(f"✓ Split found file into {hashes_count} hashes and {clears_count} clears") - + + print( + f"✓ Split found file into {hashes_count} hashes and {clears_count} clears" + ) + # Run hashcat to combine them into an output file combined_file = output_file + ".out" try: tuning_args = get_hcat_tuning_args() - + # Create temporary outfile for hashcat temp_outfile = output_file + ".tmp" - + if self.debug: - print(f"[DEBUG] download_found_hashes: hash_type={hash_type}, type={type(hash_type)}") - + print( + f"[DEBUG] download_found_hashes: hash_type={hash_type}, type={type(hash_type)}" + ) + # Build command with hash type if provided cmd = ["hashcat", *tuning_args] if hash_type: cmd.extend(["-m", str(hash_type)]) - cmd.extend([ - found_hashes_file, - found_clears_file, - "--outfile", - temp_outfile, - "--outfile-format=1,2", - ]) - + cmd.extend( + [ + found_hashes_file, + found_clears_file, + "--outfile", + temp_outfile, + "--outfile-format=1,2", + ] + ) + if self.debug: print(f"[DEBUG] Running command: {' '.join(cmd)}") - + print(f"Running: {' '.join(cmd)}") - - result = subprocess.run(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True, timeout=300) - + + result = subprocess.run( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + text=True, + timeout=300, + ) + if result.returncode != 0: print(f"Warning: hashcat exited with code {result.returncode}") if result.stderr: print(f" stderr: {result.stderr}") - + # Write the output file if os.path.exists(temp_outfile): - with open(temp_outfile, "r", encoding="utf-8", errors="ignore") as tmp_f: + with open( + temp_outfile, "r", encoding="utf-8", errors="ignore" + ) as tmp_f: with open(combined_file, "w", encoding="utf-8") as out_f: out_f.write(tmp_f.read()) - + # Count lines in output - with open(combined_file, "r", encoding="utf-8", errors="ignore") as f: + with open( + combined_file, "r", encoding="utf-8", errors="ignore" + ) as f: combined_count = len(f.readlines()) print(f"✓ Created {combined_file} (total lines: {combined_count})") - + # Clean up temp file try: os.remove(temp_outfile) @@ -1074,14 +1138,14 @@ class HashviewAPI: pass else: print("Note: No cracked hashes generated") - + except FileNotFoundError: print("✗ Error: hashcat not found in PATH") except subprocess.TimeoutExpired: print("✗ Error: hashcat execution timed out") except Exception as e: print(f"✗ Error running hashcat: {e}") - + # Clean up temporary files (keep when debug is enabled) if not self.debug: files_to_delete = [found_hashes_file, found_clears_file] @@ -1094,10 +1158,10 @@ class HashviewAPI: print(f"Warning: Could not delete {temp_file}: {e}") else: print("Debug enabled: keeping split files") - + except Exception as e: print(f"✗ Error splitting found file: {e}") - + return { "output_file": output_file, "size": downloaded, @@ -1135,18 +1199,24 @@ class HashviewAPI: except (json.JSONDecodeError, ValueError): raise Exception(f"Invalid API response: {resp.text[:200]}") - def download_wordlist(self, wordlist_id, output_file=None, *, update_dynamic: bool = False): + def download_wordlist( + self, wordlist_id, output_file=None, *, update_dynamic: bool = False + ): import sys import re if int(wordlist_id) == 1 and update_dynamic: update_url = f"{self.base_url}/v1/updateWordlist/{wordlist_id}" try: - update_resp = self.session.get(update_url, headers=self._auth_headers(), timeout=30) + update_resp = self.session.get( + update_url, headers=self._auth_headers(), timeout=30 + ) update_resp.raise_for_status() except Exception as exc: if self.debug: - print(f"Warning: failed to update dynamic wordlist {wordlist_id}: {exc}") + print( + f"Warning: failed to update dynamic wordlist {wordlist_id}: {exc}" + ) url = f"{self.base_url}/v1/wordlists/{wordlist_id}" resp = self.session.get(url, headers=self._auth_headers(), stream=True) @@ -1156,7 +1226,9 @@ class HashviewAPI: output_file = "dynamic-all.txt.gz" else: content_disp = resp.headers.get("content-disposition", "") - match = re.search(r"filename=\"?([^\";]+)\"?", content_disp, re.IGNORECASE) + match = re.search( + r"filename=\"?([^\";]+)\"?", content_disp, re.IGNORECASE + ) if match: output_file = os.path.basename(match.group(1)) else: @@ -1269,7 +1341,7 @@ def download_hashes_from_hashview( customer_raw = _safe_input("\nEnter customer ID or N to create new: ").strip() if customer_raw.lower() == "q": raise ValueError("cancelled") - + if customer_raw.lower() == "n": customer_name = _safe_input("Enter customer name: ").strip() if customer_name.lower() == "q": @@ -1316,7 +1388,9 @@ def download_hashes_from_hashview( print_fn(f"Total: {len(hashfile_map)} hashfile(s)") else: print_fn(f"\nNo hashfiles found for customer ID {customer_id}") - print_fn("This customer needs to have hashfiles uploaded before downloading left hashes.") + print_fn( + "This customer needs to have hashfiles uploaded before downloading left hashes." + ) print_fn("Please use the Hashview menu to upload a hashfile first.") raise ValueError("No hashfiles available for download") except ValueError: diff --git a/hate_crack/attacks.py b/hate_crack/attacks.py index 5dc08aa..1d3b5dc 100644 --- a/hate_crack/attacks.py +++ b/hate_crack/attacks.py @@ -89,9 +89,7 @@ def quick_crack(ctx: Any) -> None: except ValueError: print("Please enter a valid number.") - rule_files = sorted( - f for f in os.listdir(ctx.rulesDirectory) if f != ".DS_Store" - ) + rule_files = sorted(f for f in os.listdir(ctx.rulesDirectory) if f != ".DS_Store") if not rule_files: download_rules = ( input("\nNo rules found. Download rules from Hashmob now? (Y/n): ") @@ -182,9 +180,7 @@ def loopback_attack(ctx: Any) -> None: rule_choice = None selected_hcatRules = [] - rule_files = sorted( - f for f in os.listdir(ctx.rulesDirectory) if f != ".DS_Store" - ) + rule_files = sorted(f for f in os.listdir(ctx.rulesDirectory) if f != ".DS_Store") if not rule_files: download_rules = ( input("\nNo rules found. Download rules from Hashmob now? (Y/n): ") @@ -275,7 +271,9 @@ def extensive_crack(ctx: Any) -> None: hcatTargetTime = 4 * 60 * 60 ctx.hcatTopMask(ctx.hcatHashType, ctx.hcatHashFile, hcatTargetTime) ctx.hcatRecycle(ctx.hcatHashType, ctx.hcatHashFile, ctx.hcatMaskCount) - ctx.hcatFingerprint(ctx.hcatHashType, ctx.hcatHashFile, 7, run_hybrid_on_expanded=False) + ctx.hcatFingerprint( + ctx.hcatHashType, ctx.hcatHashFile, 7, run_hybrid_on_expanded=False + ) ctx.hcatRecycle(ctx.hcatHashType, ctx.hcatHashFile, ctx.hcatFingerprintCount) ctx.hcatCombination(ctx.hcatHashType, ctx.hcatHashFile) ctx.hcatRecycle(ctx.hcatHashType, ctx.hcatHashFile, ctx.hcatCombinationCount) @@ -386,10 +384,12 @@ def combinator_crack(ctx: Any) -> None: return wordlists = valid_wordlists - - wordlists = [ctx._resolve_wordlist_path(wl, ctx.hcatWordlists) for wl in wordlists[:2]] - print(f"\nStarting combinator attack with 2 wordlists:") + wordlists = [ + ctx._resolve_wordlist_path(wl, ctx.hcatWordlists) for wl in wordlists[:2] + ] + + print("\nStarting combinator attack with 2 wordlists:") print(f" Wordlist 1: {wordlists[0]}") print(f" Wordlist 2: {wordlists[1]}") print(f"Hash type: {ctx.hcatHashType}") diff --git a/hate_crack/main.py b/hate_crack/main.py index 525a634..9c8ccc2 100755 --- a/hate_crack/main.py +++ b/hate_crack/main.py @@ -68,7 +68,7 @@ from hate_crack import attacks as _attacks # noqa: E402 # Import HashcatRosetta for rule analysis functionality try: - sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'HashcatRosetta')) + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "HashcatRosetta")) from hashcat_rosetta.formatting import display_rule_opcodes_summary except ImportError: display_rule_opcodes_summary = None @@ -298,6 +298,7 @@ def _append_potfile_arg(cmd, *, use_potfile_path=True, potfile_path=None): if pot: cmd.append(f"--potfile-path={pot}") + try: rulesDirectory = config_parser["rules_directory"] except KeyError as e: @@ -471,7 +472,9 @@ except KeyError as e: e ) ) - hcatDebugLogPath = os.path.expanduser(default_config.get("hcatDebugLogPath", "./hashcat_debug")) + hcatDebugLogPath = os.path.expanduser( + default_config.get("hcatDebugLogPath", "./hashcat_debug") + ) hcatExpanderBin = "expander.bin" hcatCombinatorBin = "combinator.bin" @@ -710,7 +713,7 @@ def _debug_cmd(cmd): def _add_debug_mode_for_rules(cmd): """Add debug mode arguments to hashcat command if rules are being used. - + This function detects if rules are present in the command (by looking for -r flags) and adds --debug-mode=1 and --debug-file= if rules are found. Debug log path is configurable via hcatDebugLogPath in config.json @@ -718,14 +721,16 @@ def _add_debug_mode_for_rules(cmd): if "-r" in cmd: # Create debug output directory if it doesn't exist os.makedirs(hcatDebugLogPath, exist_ok=True) - + # Create a debug output filename based on the session ID or hash file debug_filename = os.path.join(hcatDebugLogPath, "hashcat_debug.log") if "--session" in cmd: session_idx = cmd.index("--session") + 1 if session_idx < len(cmd): - debug_filename = os.path.join(hcatDebugLogPath, f"hashcat_debug_{cmd[session_idx]}.log") - + debug_filename = os.path.join( + hcatDebugLogPath, f"hashcat_debug_{cmd[session_idx]}.log" + ) + cmd.extend(["--debug-mode", "4", "--debug-file", debug_filename]) return cmd @@ -1063,7 +1068,9 @@ def hcatQuickDictionary( if hcatChains: cmd.extend(shlex.split(hcatChains)) cmd.extend(shlex.split(hcatTuning)) - _append_potfile_arg(cmd, use_potfile_path=use_potfile_path, potfile_path=potfile_path) + _append_potfile_arg( + cmd, use_potfile_path=use_potfile_path, potfile_path=potfile_path + ) cmd = _add_debug_mode_for_rules(cmd) _debug_cmd(cmd) hcatProcess = subprocess.Popen(cmd) @@ -1143,7 +1150,10 @@ def hcatTopMask(hcatHashType, hcatHashFile, hcatTargetTime): # Fingerprint Attack def hcatFingerprint( - hcatHashType, hcatHashFile, expander_len: int = 7, run_hybrid_on_expanded: bool = False + hcatHashType, + hcatHashFile, + expander_len: int = 7, + run_hybrid_on_expanded: bool = False, ): global hcatFingerprintCount global hcatProcess @@ -1179,7 +1189,9 @@ def hcatFingerprint( expander_stdout = expander_proc.stdout if expander_stdout is None: raise RuntimeError("expander stdout pipe was not created") - sort_proc = subprocess.Popen(["sort", "-u"], stdin=expander_stdout, stdout=dst) + sort_proc = subprocess.Popen( + ["sort", "-u"], stdin=expander_stdout, stdout=dst + ) hcatProcess = sort_proc expander_stdout.close() try: @@ -1234,7 +1246,7 @@ def hcatCombination(hcatHashType, hcatHashFile, wordlists=None): # Ensure wordlists is a list with at least 2 items if not isinstance(wordlists, list): wordlists = [wordlists] - + if len(wordlists) < 2: print("[!] Combinator attack requires at least 2 wordlists.") return @@ -1247,7 +1259,7 @@ def hcatCombination(hcatHashType, hcatHashFile, wordlists=None): resolved_wordlists.append(resolved) else: print(f"[!] Wordlist not found: {resolved}") - + if len(resolved_wordlists) < 2: print("[!] Could not find 2 valid wordlists. Aborting combinator attack.") return @@ -1964,26 +1976,37 @@ def hashview_api(): print("\n" + "=" * 60) print("What would you like to do?") print("=" * 60) - + # Build dynamic menu based on state menu_options = [] if hcatHashFile: - menu_options.append(("upload_cracked", "Upload Cracked Hashes from current session")) + menu_options.append( + ("upload_cracked", "Upload Cracked Hashes from current session") + ) menu_options.append(("upload_wordlist", "Upload Wordlist")) menu_options.append(("download_wordlist", "Download Wordlist")) - menu_options.append(("download_left", "Download Left Hashes (with automatic merge if found)")) - menu_options.append(("download_found", "Download Found Hashes (with automatic split)")) + menu_options.append( + ( + "download_left", + "Download Left Hashes (with automatic merge if found)", + ) + ) + menu_options.append( + ("download_found", "Download Found Hashes (with automatic split)") + ) if hcatHashFile: - menu_options.append(("upload_hashfile_job", "Upload Hashfile and Create Job")) + menu_options.append( + ("upload_hashfile_job", "Upload Hashfile and Create Job") + ) menu_options.append(("back", "Back to Main Menu")) - + # Display menu with dynamic numbering for i, (option_key, option_text) in enumerate(menu_options, 1): if option_key == "back": print(f"\t(99) {option_text}") else: print(f"\t({i}) {option_text}") - + # Create mapping of display numbers to option keys option_map = {} display_num = 1 @@ -1993,19 +2016,21 @@ def hashview_api(): option_map["99"] = "back" choice = input("\nSelect an option: ") - + if choice not in option_map: print("Invalid option. Please try again.") continue - + option_key = option_map[choice] if option_key == "upload_cracked": # Upload cracked hashes if not hcatHashFile: - print("\n✗ Error: No hashfile is currently set. This option is not available.") + print( + "\n✗ Error: No hashfile is currently set. This option is not available." + ) continue - + print("\n" + "-" * 60) print("Upload Cracked Hashes") print("-" * 60) @@ -2152,14 +2177,21 @@ def hashview_api(): print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.") continue - api_name = wordlist_map.get(wordlist_id) if "wordlist_map" in locals() else None + api_name = ( + wordlist_map.get(wordlist_id) + if "wordlist_map" in locals() + else None + ) api_filename = "dynamic-all.txt.gz" if wordlist_id == 1 else api_name - prompt_suffix = f" (API filename: {api_filename})" if api_filename else " (API filename)" + prompt_suffix = ( + f" (API filename: {api_filename})" + if api_filename + else " (API filename)" + ) output_file = ( input( f"Enter output file name{prompt_suffix} or press Enter to use API filename: " - ) - .strip() + ).strip() or None ) if output_file is None and wordlist_id == 1: @@ -2226,7 +2258,9 @@ def hashview_api(): continue # Use hashfile from original command if available - hashfile_path = hcatHashFileOrig # Use original path, not the modified one + hashfile_path = ( + hcatHashFileOrig # Use original path, not the modified one + ) if not hashfile_path or not os.path.exists(hashfile_path): hashfile_path = select_file_with_autocomplete( "Enter path to hashfile (TAB to autocomplete)" @@ -2236,11 +2270,11 @@ def hashview_api(): hashfile_path = hashfile_path[0] if hashfile_path else None if isinstance(hashfile_path, str): hashfile_path = hashfile_path.strip() - + if not hashfile_path or not os.path.exists(hashfile_path): print(f"Error: File not found: {hashfile_path}") continue - + # Use hash type from original command if available, otherwise prompt if hcatHashType and str(hcatHashType).isdigit(): hash_type = int(hcatHashType) @@ -2251,23 +2285,34 @@ def hashview_api(): # Auto-detect file format based on content file_format = 5 # Default to hash_only try: - with open(hashfile_path, 'r', encoding='utf-8', errors='ignore') as f: + with open( + hashfile_path, "r", encoding="utf-8", errors="ignore" + ) as f: first_line = f.readline().strip() if first_line: # Check for pwdump format (username:hash or username:rid:lmhash:nthash) - parts = first_line.split(':') + parts = first_line.split(":") if len(parts) >= 4: # Likely pwdump format (username:rid:lmhash:nthash) file_format = 0 - elif len(parts) == 2 and not all(c in '0123456789abcdefABCDEF' for c in parts[0]): + elif len(parts) == 2 and not all( + c in "0123456789abcdefABCDEF" for c in parts[0] + ): # Likely user:hash format (first part is not all hex) file_format = 4 # Otherwise default to 5 (hash_only) except Exception: file_format = 5 # Default if detection fails - + print(f"\nAuto-detected file format: {file_format} ", end="") - format_names = {0: "pwdump", 1: "NetNTLM", 2: "kerberos", 3: "shadow", 4: "user:hash", 5: "hash_only"} + format_names = { + 0: "pwdump", + 1: "NetNTLM", + 2: "kerberos", + 3: "shadow", + 4: "user:hash", + 5: "hash_only", + } print(f"({format_names.get(file_format, 'unknown')})") # Default hashfile name to the basename of the file @@ -2373,7 +2418,9 @@ def hashview_api(): print( f"\n✓ Success: {result.get('msg', 'Customer created')}" ) - customer_id = result.get("customer_id") or result.get("id") + customer_id = result.get( + "customer_id" + ) or result.get("id") if not customer_id: print("\n✗ Error: Customer ID not returned.") continue @@ -2400,7 +2447,9 @@ def hashview_api(): ) if not customer_hashfiles: - print(f"\nNo hashfiles found for customer ID {customer_id}") + print( + f"\nNo hashfiles found for customer ID {customer_id}" + ) continue print("\n" + "=" * 120) @@ -2412,14 +2461,18 @@ def hashview_api(): for hf in customer_hashfiles: hf_id = hf.get("id") hf_name = hf.get("name", "N/A") - hf_type = hf.get("hash_type") or hf.get("hashtype") or "N/A" + hf_type = ( + hf.get("hash_type") or hf.get("hashtype") or "N/A" + ) if hf_id is None: continue # Truncate long names to fit within 120 columns if len(str(hf_name)) > 96: hf_name = str(hf_name)[:93] + "..." if debug_mode: - print(f"[DEBUG] Hashfile {hf_id}: hash_type={hf.get('hash_type')}, hashtype={hf.get('hashtype')}, combined={hf_type}") + print( + f"[DEBUG] Hashfile {hf_id}: hash_type={hf.get('hash_type')}, hashtype={hf.get('hashtype')}, combined={hf_type}" + ) print(f"{hf_id:<10} {hf_type:<10} {hf_name:<96}") hashfile_map[int(hf_id)] = hf_type print("=" * 120) @@ -2430,13 +2483,19 @@ def hashview_api(): while True: try: - hashfile_id_input = input("\nEnter hashfile ID: ").strip() + hashfile_id_input = input( + "\nEnter hashfile ID: " + ).strip() hashfile_id = int(hashfile_id_input) except ValueError: - print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.") + print( + "\n✗ Error: Invalid ID entered. Please enter a numeric ID." + ) continue if hashfile_id not in hashfile_map: - print("\n✗ Error: Hashfile ID not in the list. Please try again.") + print( + "\n✗ Error: Hashfile ID not in the list. Please try again." + ) continue break break @@ -2447,13 +2506,17 @@ def hashview_api(): # Get hash type for hashcat from the hashfile map selected_hash_type = hashfile_map.get(hashfile_id) if debug_mode: - print(f"[DEBUG] selected_hash_type from map: {selected_hash_type}") + print( + f"[DEBUG] selected_hash_type from map: {selected_hash_type}" + ) if not selected_hash_type or selected_hash_type == "N/A": try: details = api_harness.get_hashfile_details(hashfile_id) selected_hash_type = details.get("hashtype") if debug_mode: - print(f"[DEBUG] selected_hash_type from get_hashfile_details: {selected_hash_type}") + print( + f"[DEBUG] selected_hash_type from get_hashfile_details: {selected_hash_type}" + ) except Exception as e: if debug_mode: print(f"[DEBUG] Error fetching hashfile details: {e}") @@ -2461,9 +2524,14 @@ def hashview_api(): # Download the left hashes if debug_mode: - print(f"[DEBUG] Calling download_left_hashes with hash_type={selected_hash_type}") + print( + f"[DEBUG] Calling download_left_hashes with hash_type={selected_hash_type}" + ) download_result = api_harness.download_left_hashes( - customer_id, hashfile_id, output_file, hash_type=selected_hash_type + customer_id, + hashfile_id, + output_file, + hash_type=selected_hash_type, ) print(f"\n✓ Success: Downloaded {download_result['size']} bytes") print(f" File: {download_result['output_file']}") @@ -2519,7 +2587,9 @@ def hashview_api(): print( f"\n✓ Success: {result.get('msg', 'Customer created')}" ) - customer_id = result.get("customer_id") or result.get("id") + customer_id = result.get( + "customer_id" + ) or result.get("id") if not customer_id: print("\n✗ Error: Customer ID not returned.") continue @@ -2546,7 +2616,9 @@ def hashview_api(): ) if not customer_hashfiles: - print(f"\nNo hashfiles found for customer ID {customer_id}") + print( + f"\nNo hashfiles found for customer ID {customer_id}" + ) continue print("\n" + "=" * 120) @@ -2558,14 +2630,18 @@ def hashview_api(): for hf in customer_hashfiles: hf_id = hf.get("id") hf_name = hf.get("name", "N/A") - hf_type = hf.get("hash_type") or hf.get("hashtype") or "N/A" + hf_type = ( + hf.get("hash_type") or hf.get("hashtype") or "N/A" + ) if hf_id is None: continue # Truncate long names to fit within 120 columns if len(str(hf_name)) > 96: hf_name = str(hf_name)[:93] + "..." if debug_mode: - print(f"[DEBUG] Hashfile {hf_id}: hash_type={hf.get('hash_type')}, hashtype={hf.get('hashtype')}, combined={hf_type}") + print( + f"[DEBUG] Hashfile {hf_id}: hash_type={hf.get('hash_type')}, hashtype={hf.get('hashtype')}, combined={hf_type}" + ) print(f"{hf_id:<10} {hf_type:<10} {hf_name:<96}") hashfile_map[int(hf_id)] = hf_type print("=" * 120) @@ -2576,13 +2652,19 @@ def hashview_api(): while True: try: - hashfile_id_input = input("\nEnter hashfile ID: ").strip() + hashfile_id_input = input( + "\nEnter hashfile ID: " + ).strip() hashfile_id = int(hashfile_id_input) except ValueError: - print("\n✗ Error: Invalid ID entered. Please enter a numeric ID.") + print( + "\n✗ Error: Invalid ID entered. Please enter a numeric ID." + ) continue if hashfile_id not in hashfile_map: - print("\n✗ Error: Hashfile ID not in the list. Please try again.") + print( + "\n✗ Error: Hashfile ID not in the list. Please try again." + ) continue break break @@ -2593,13 +2675,17 @@ def hashview_api(): # Get hash type for hashcat from the hashfile map selected_hash_type = hashfile_map.get(hashfile_id) if debug_mode: - print(f"[DEBUG] selected_hash_type from map: {selected_hash_type}") + print( + f"[DEBUG] selected_hash_type from map: {selected_hash_type}" + ) if not selected_hash_type or selected_hash_type == "N/A": try: details = api_harness.get_hashfile_details(hashfile_id) selected_hash_type = details.get("hashtype") if debug_mode: - print(f"[DEBUG] selected_hash_type from get_hashfile_details: {selected_hash_type}") + print( + f"[DEBUG] selected_hash_type from get_hashfile_details: {selected_hash_type}" + ) except Exception as e: if debug_mode: print(f"[DEBUG] Error fetching hashfile details: {e}") @@ -2607,7 +2693,9 @@ def hashview_api(): # Download the found hashes if debug_mode: - print(f"[DEBUG] Calling download_found_hashes with hash_type={selected_hash_type}") + print( + f"[DEBUG] Calling download_found_hashes with hash_type={selected_hash_type}" + ) download_result = api_harness.download_found_hashes( customer_id, hashfile_id, output_file ) @@ -2887,25 +2975,25 @@ def analyze_rules(): print("\nError: HashcatRosetta formatting module not found.") print("Make sure HashcatRosetta submodule is properly initialized.") return - - print("\n" + "="*60) + + print("\n" + "=" * 60) print("Rule Opcode Analyzer") - print("="*60) - + print("=" * 60) + # Get rule file path from user rule_file = input("\nEnter path to rule file: ").strip() - + if not rule_file: print("No rule file specified.") return - + # Expand user path rule_file = os.path.expanduser(rule_file) - + if not os.path.isfile(rule_file): print(f"Error: Rule file not found: {rule_file}") return - + try: display_rule_opcodes_summary(rule_file) print() @@ -3087,7 +3175,9 @@ def main(): "--hashfile-id", required=True, type=int, help="Hashfile ID" ) hv_download_left.add_argument( - "--hash-type", default=None, help="Hash type for hashcat (e.g., 1000 for NTLM)" + "--hash-type", + default=None, + help="Hash type for hashcat (e.g., 1000 for NTLM)", ) hv_download_found = hashview_subparsers.add_parser( @@ -3137,7 +3227,7 @@ def main(): # Removed add_common_args(parser) since config items are now only set via config file argv = sys.argv[1:] - + hashview_subcommands = [ "upload-cracked", "upload-wordlist", @@ -3147,9 +3237,13 @@ def main(): ] has_hashview_flag = "--hashview" in argv has_hashview_subcommand = any(cmd in argv for cmd in hashview_subcommands) - + # Handle custom help for --hashview (without subcommand) - if has_hashview_flag and not has_hashview_subcommand and ("--help" in argv or "-h" in argv): + if ( + has_hashview_flag + and not has_hashview_subcommand + and ("--help" in argv or "-h" in argv) + ): # Build the full parser to get hashview help temp_parser, hashview_parser = _build_parser( include_positional=False, @@ -3158,7 +3252,7 @@ def main(): if hashview_parser: hashview_parser.print_help() sys.exit(0) - + # If --hashview flag is used with a subcommand, convert to subcommand format for parser if has_hashview_flag and has_hashview_subcommand: # Remove --hashview flag and insert "hashview" as subcommand @@ -3170,7 +3264,7 @@ def main(): break else: argv = argv_temp # Fallback if subcommand not found - + use_subcommand_parser = "hashview" in argv parser, hashview_parser = _build_parser( include_positional=not use_subcommand_parser, @@ -3400,7 +3494,12 @@ def main(): elif choice == "5": sys.exit(0) else: - if args.download_hashview or args.weakpass or args.hashmob or args.rules: + if ( + args.download_hashview + or args.weakpass + or args.hashmob + or args.rules + ): sys.exit(0) # At this point, a hashfile must be loaded @@ -3460,7 +3559,10 @@ def main(): # NetNTLMv2-ESS format is similar, with Enhanced Session Security pwdump_format = False # Try to detect if it's NetNTLMv2-ESS (has specific markers) - if re.search(r"^.+::.+:.+:[a-f0-9A-F]{16}:[a-f0-9A-F]{32}:[a-f0-9A-F]+$", hcatHashFileLine): + if re.search( + r"^.+::.+:.+:[a-f0-9A-F]{16}:[a-f0-9A-F]{32}:[a-f0-9A-F]+$", + hcatHashFileLine, + ): print("NetNTLMv2-ESS format detected") print("Note: Hash type should be 5600 for NetNTLMv2-ESS hashes") else: diff --git a/tests/test_fingerprint_expander_and_hybrid.py b/tests/test_fingerprint_expander_and_hybrid.py index 9d1c36a..e4700da 100644 --- a/tests/test_fingerprint_expander_and_hybrid.py +++ b/tests/test_fingerprint_expander_and_hybrid.py @@ -9,7 +9,9 @@ def test_fingerprint_crack_prompts_for_expander_len_and_enables_hybrid(monkeypat seen = {} - def fake_hcatFingerprint(hash_type, hash_file, expander_len, run_hybrid_on_expanded=False): + def fake_hcatFingerprint( + hash_type, hash_file, expander_len, run_hybrid_on_expanded=False + ): seen["hash_type"] = hash_type seen["hash_file"] = hash_file seen["expander_len"] = expander_len @@ -87,7 +89,9 @@ def test_hcatFingerprint_uses_selected_expander_and_calls_hybrid(monkeypatch, tm # Run with expander24 and ensure secondary hybrid gets the expanded file. monkeypatch.setattr(hc_main, "hcatHashFile", str(hashfile), raising=False) - hc_main.hcatFingerprint("1000", str(hashfile), expander_len=24, run_hybrid_on_expanded=True) + hc_main.hcatFingerprint( + "1000", str(hashfile), expander_len=24, run_hybrid_on_expanded=True + ) assert any( isinstance(args[0], str) and args[0].endswith("expander24.bin") diff --git a/tests/test_hashview.py b/tests/test_hashview.py index dae489e..3f5181f 100644 --- a/tests/test_hashview.py +++ b/tests/test_hashview.py @@ -261,7 +261,7 @@ class TestHashviewAPI: content = f.read() assert content == b"hash1\nhash2\n" assert result["size"] == len(content) - + # Verify auth headers were passed in the left hashes download call call_args_list = api.session.get.call_args_list left_call = [c for c in call_args_list if "left" in str(c)][0] @@ -307,7 +307,7 @@ class TestHashviewAPI: content = f.read() assert content == b"hash1:pass1\nhash2:pass2\n" assert result["size"] == len(content) - + # Verify auth headers were passed in the found hashes download call call_args_list = api.session.get.call_args_list found_call = [c for c in call_args_list if "found" in str(c)][0] @@ -350,7 +350,7 @@ class TestHashviewAPI: content = f.read() assert content == b"gzipdata" assert result["size"] == len(content) - + # Verify auth headers were passed in the download call # session.get should be called with headers containing the auth cookie call_args_list = api.session.get.call_args_list @@ -365,7 +365,9 @@ class TestHashviewAPI: """Live test for Hashview wordlist listing with auth headers.""" # Only run this test if explicitly enabled if os.environ.get("HASHVIEW_TEST_REAL", "").lower() not in ("1", "true", "yes"): - pytest.skip("Set HASHVIEW_TEST_REAL=1 to run live Hashview list_wordlists test.") + pytest.skip( + "Set HASHVIEW_TEST_REAL=1 to run live Hashview list_wordlists test." + ) # For live tests, prefer explicit env vars so developers don't accidentally # hit a config.json default/localhost target. @@ -373,7 +375,7 @@ class TestHashviewAPI: hashview_api_key = os.environ.get("HASHVIEW_API_KEY") if not hashview_url or not hashview_api_key: pytest.skip("Missing HASHVIEW_URL/HASHVIEW_API_KEY env vars.") - + # Only proceed if the server is actually reachable try: import socket @@ -383,7 +385,9 @@ class TestHashviewAPI: host = parsed.hostname port = parsed.port if not host: - pytest.skip(f"Could not parse hostname from hashview_url: {hashview_url!r}") + pytest.skip( + f"Could not parse hostname from hashview_url: {hashview_url!r}" + ) if port is None: port = 443 if parsed.scheme == "https" else 80 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -394,7 +398,7 @@ class TestHashviewAPI: pytest.skip(f"Hashview server not reachable at {host}:{port}") except Exception as e: pytest.skip(f"Could not check Hashview server availability: {e}") - + real_api = HashviewAPI(hashview_url, hashview_api_key) wordlists = real_api.list_wordlists() assert isinstance(wordlists, list) @@ -557,35 +561,37 @@ class TestHashviewAPI: pwdump_file.write_text( "Administrator:500:aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::\n" ) - + # Test user:hash format (2 parts, non-hex username) userhash_file = tmp_path / "userhash.txt" userhash_file.write_text("user123:5f4dcc3b5aa765d61d8327deb882cf99\n") - + # Test hash_only format (default) hashonly_file = tmp_path / "hashonly.txt" hashonly_file.write_text("5f4dcc3b5aa765d61d8327deb882cf99\n") - + # Test hex:hash format (should be hash_only since first part is all hex) hexhash_file = tmp_path / "hexhash.txt" hexhash_file.write_text("abcdef123456:5f4dcc3b5aa765d61d8327deb882cf99\n") - + # Detection logic (same as in main.py) def detect_format(filepath): file_format = 5 # Default to hash_only try: - with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: + with open(filepath, "r", encoding="utf-8", errors="ignore") as f: first_line = f.readline().strip() if first_line: - parts = first_line.split(':') + parts = first_line.split(":") if len(parts) >= 4: file_format = 0 # pwdump - elif len(parts) == 2 and not all(c in '0123456789abcdefABCDEF' for c in parts[0]): + elif len(parts) == 2 and not all( + c in "0123456789abcdefABCDEF" for c in parts[0] + ): file_format = 4 # user:hash except Exception: file_format = 5 return file_format - + # Verify detection assert detect_format(pwdump_file) == 0, "Should detect pwdump format" assert detect_format(userhash_file) == 4, "Should detect user:hash format" @@ -599,75 +605,86 @@ class TestHashviewAPI: other_cwd = tmp_path / "other_cwd" other_cwd.mkdir() monkeypatch.chdir(other_cwd) - + # Mock left hashes download mock_left_response = Mock() mock_left_response.content = b"uncracked_hash1\nuncracked_hash2\n" mock_left_response.raise_for_status = Mock() mock_left_response.headers = {"content-length": "0"} - + def iter_content_left(chunk_size=8192): yield mock_left_response.content - + mock_left_response.iter_content = iter_content_left - + # Mock found hashes download mock_found_response = Mock() - mock_found_response.content = b"found_hash1:found_password1\nfound_hash2:found_password2\n" + mock_found_response.content = ( + b"found_hash1:found_password1\nfound_hash2:found_password2\n" + ) mock_found_response.raise_for_status = Mock() mock_found_response.headers = {"content-length": "0"} - + def iter_content_found(chunk_size=8192): yield mock_found_response.content - + mock_found_response.iter_content = iter_content_found - + # Set up session.get to return different responses api.session.get.side_effect = [mock_left_response, mock_found_response] - + # Download left hashes (should auto-download and split found for hashcat) left_file = tmp_path / "left_1_2.txt" result = api.download_left_hashes(1, 2, output_file=str(left_file)) - + # Verify left file was created assert os.path.exists(result["output_file"]) - + # Verify found file was downloaded and deleted found_file = tmp_path / "found_1_2.txt" - assert not os.path.exists(found_file), "Found file should be deleted after split" + assert not os.path.exists(found_file), ( + "Found file should be deleted after split" + ) assert not (other_cwd / "found_1_2.txt").exists() - + # Verify split files were created and deleted found_hashes_file = tmp_path / "found_hashes_1_2.txt" found_clears_file = tmp_path / "found_clears_1_2.txt" - assert not os.path.exists(str(found_hashes_file)), "Split hashes file should be deleted" - assert not os.path.exists(str(found_clears_file)), "Split clears file should be deleted" + assert not os.path.exists(str(found_hashes_file)), ( + "Split hashes file should be deleted" + ) + assert not os.path.exists(str(found_clears_file)), ( + "Split clears file should be deleted" + ) + def test_download_left_id_matching(self, api, tmp_path): """Test that found hashes only merge when customer_id and hashfile_id match""" # Create .out file with specific IDs out_file = tmp_path / "left_1_2.txt.out" out_file.write_text("existing_hash:password\n") - + # Mock left hashes download for different IDs mock_response = Mock() mock_response.content = b"hash1\nhash2\n" mock_response.raise_for_status = Mock() mock_response.headers = {"content-length": "0"} - + def iter_content(chunk_size=8192): yield mock_response.content - + mock_response.iter_content = iter_content api.session.get.return_value = mock_response - + # Download left hashes with different IDs (3_4 instead of 1_2) left_file = tmp_path / "left_3_4.txt" api.download_left_hashes(3, 4, output_file=str(left_file)) - + # Verify the different IDs' .out file wasn't affected - with open(str(out_file), 'r') as f: + with open(str(out_file), "r") as f: content = f.read() - assert content == "existing_hash:password\n", "Different ID's .out file should be unchanged" + assert content == "existing_hash:password\n", ( + "Different ID's .out file should be unchanged" + ) def test_download_left_tolerates_missing_found(self, api, tmp_path): """Test that 404 on found hash download doesn't fail the workflow""" @@ -676,34 +693,35 @@ class TestHashviewAPI: mock_left_response.content = b"hash1\nhash2\n" mock_left_response.raise_for_status = Mock() mock_left_response.headers = {"content-length": "0"} - + def iter_content(chunk_size=8192): yield mock_left_response.content - + mock_left_response.iter_content = iter_content - + # Mock 404 response for found download from requests.exceptions import HTTPError + mock_found_response = Mock() mock_found_response.status_code = 404 - + def raise_404(): response = Mock() response.status_code = 404 raise HTTPError("404 Not Found", response=response) - + mock_found_response.raise_for_status = raise_404 - + # Set up session.get to return different responses api.session.get.side_effect = [mock_left_response, mock_found_response] - + # Download left hashes (should complete despite 404 on found) left_file = tmp_path / "left_1_2.txt" result = api.download_left_hashes(1, 2, output_file=str(left_file)) - + # Verify left file was created successfully assert os.path.exists(result["output_file"]) - with open(result["output_file"], 'rb') as f: + with open(result["output_file"], "rb") as f: content = f.read() assert content == b"hash1\nhash2\n" @@ -711,34 +729,39 @@ class TestHashviewAPI: """Test that original hashfile path is preserved before _ensure_hashfile_in_cwd""" import sys import os - sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + + sys.path.insert( + 0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + ) from hate_crack.main import _ensure_hashfile_in_cwd - + # Create a test hashfile in a different directory test_dir = tmp_path / "subdir" test_dir.mkdir() test_file = test_dir / "test.txt" test_file.write_text("hash1\nhash2\n") - + original_path = str(test_file) - + # Save current directory orig_cwd = os.getcwd() try: # Change to tmp_path os.chdir(str(tmp_path)) - + # Call _ensure_hashfile_in_cwd result_path = _ensure_hashfile_in_cwd(original_path) - + # The result should be different from original (in cwd now) # But original_path should still exist and be unchanged assert os.path.exists(original_path), "Original file should still exist" assert os.path.exists(result_path), "Result file should exist" - + # If they're different, result should be in cwd if result_path != original_path: - assert os.path.dirname(result_path) == str(tmp_path), "Result should be in cwd" + assert os.path.dirname(result_path) == str(tmp_path), ( + "Result should be in cwd" + ) finally: os.chdir(orig_cwd) diff --git a/tests/test_hashview_cli_subcommands_subprocess.py b/tests/test_hashview_cli_subcommands_subprocess.py index 888bb6f..1e31c88 100644 --- a/tests/test_hashview_cli_subcommands_subprocess.py +++ b/tests/test_hashview_cli_subcommands_subprocess.py @@ -47,10 +47,10 @@ def _ensure_customer_one(): if not url or not key: pytest.skip("Missing hashview_url/hashview_api_key in config.json or env.") api = HashviewAPI(url, key) - + # Get customer ID from environment or default to 1 customer_id = int(os.environ.get("HASHVIEW_CUSTOMER_ID", "1")) - + try: customers_result = api.list_customers() except Exception as exc: diff --git a/tests/test_utils.py b/tests/test_utils.py index 4e0e405..c0b3397 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -24,7 +24,8 @@ def test_setup_logging_adds_single_streamhandler(tmp_path): stream_handlers = [ h for h in logger.handlers - if isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler) + if isinstance(h, logging.StreamHandler) + and not isinstance(h, logging.FileHandler) ] assert len(stream_handlers) == 1 file_handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)]