Files
hacktricks-cloud/scripts/translator.py
Carlos Polop f327002a99 trans
2024-12-31 17:53:49 +01:00

382 lines
15 KiB
Python

import argparse
import os
from openai import OpenAI #pip3 install openai
import time
import shutil
import tempfile
import subprocess
import sys
import tiktoken
import concurrent.futures
from tqdm import tqdm #pip3 install tqdm
import traceback
MASTER_BRANCH = "master"
VERBOSE = True
MAX_TOKENS = 10000 #gpt-4-1106-preview
def reportTokens(prompt, model):
encoding = tiktoken.encoding_for_model(model)
# print number of tokens in light gray, with first 50 characters of prompt in green. if truncated, show that it is truncated
#print("\033[37m" + str(len(encoding.encode(prompt))) + " tokens\033[0m" + " in prompt: " + "\033[92m" + prompt[:50] + "\033[0m" + ("..." if len(prompt) > 50 else ""))
return len(encoding.encode(prompt))
def check_git_dir(path):
if os.path.isdir(os.path.join(path, '.git')):
return True
return False
def get_branch_files(branch):
"""Get a list of all files in a branch."""
command = f"git ls-tree -r --name-only {branch}"
result = subprocess.run(command.split(), stdout=subprocess.PIPE)
files = result.stdout.decode().splitlines()
return set(files)
def delete_unique_files(branch):
"""Delete files that are unique to branch2."""
# Get the files in each branch
files_branch1 = get_branch_files(MASTER_BRANCH)
files_branch2 = get_branch_files(branch)
# Find the files that are in branch2 but not in branch1
unique_files = files_branch2 - files_branch1
if unique_files:
# Switch to the second branch
subprocess.run(["git", "checkout", branch])
# Delete the unique files from the second branch
for file in unique_files:
subprocess.run(["git", "rm", file])
subprocess.run(["git", "checkout", MASTER_BRANCH])
print(f"[+] Deleted {len(unique_files)} files from branch: {branch}")
def cp_translation_to_repo_dir_and_check_gh_branch(branch, temp_folder, translate_files):
branch_exists = subprocess.run(['git', 'show-ref', '--verify', '--quiet', 'refs/heads/' + branch])
# If branch doesn't exist, create it
if branch_exists.returncode != 0:
subprocess.run(['git', 'checkout', '-b', branch])
else:
subprocess.run(['git', 'checkout', branch])
# Walk through source directory
for dirpath, dirnames, filenames in os.walk(temp_folder):
# Compute destination path
dest_path = os.path.join(os.getcwd(), os.path.relpath(dirpath, temp_folder))
# Create directory structure in destination, if not already present
if not os.path.exists(dest_path):
os.makedirs(dest_path)
# Copy each file from source to destination
for file_name in filenames:
src_file = os.path.join(dirpath, file_name)
shutil.copy2(src_file, dest_path)
print(f"Translated files copied to branch: {branch}")
if translate_files:
subprocess.run(['git', 'add', "-A"])
subprocess.run(['git', 'commit', '-m', f"Translated {translate_files} to {branch}"[:72]])
subprocess.run(['git', 'checkout', MASTER_BRANCH])
print("Commit created and moved to master branch")
else:
print("No commiting anything, leaving in language branch")
def translate_text(language, text, file_path, model, cont=0, slpitted=False, client=None):
if not text:
return text
messages = [
{"role": "system", "content": "You are a professional hacker, translator and writer. You write everything super clear and as concise as possible without loosing information. Do not return invalid Unicode output."},
{"role": "system", "content": f"The following is content from a hacking book about hacking techiques. The following content is from the file {file_path}. Translate the relevant English text to {language} and return the translation keeping excatly the same markdown and html syntax. Do not translate things like code, hacking technique names, hacking word, cloud/SaaS platform names (like Workspace, aws, gcp...), the word 'leak', pentesting, and markdown tags. Also don't add any extra stuff apart from the translation and markdown syntax."},
{"role": "user", "content": text},
]
try:
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0
)
except Exception as e:
print("Python Exception: " + str(e))
if cont > 6:
print(f"Page {file_path} could not be translated due to count with text: {text}\nReturning text as is.")
return text
if "exceeded your current quota" in str(e).lower():
print("Critical error: Quota exceeded")
exit(1)
if "is currently overloaded" in str(e).lower():
print("Overloaded, waiting 30 seconds")
time.sleep(30)
elif "timeout" in str(e).lower():
print("Timeout, waiting 30 seconds")
cont += 1
time.sleep(30)
elif "rate limit" in str(e).lower():
print("Rate limit, waiting 60 seconds")
cont += 1
time.sleep(60)
elif "maximum context length" in str(e).lower() or "generated invalid unicode output" in str(e).lower():
if "maximum context length" in str(e).lower():
print("Maximum context length, splitting text in two and translating separately")
elif "generated invalid unicode output" in str(e).lower():
print("Invalid unicode error detected.")
if slpitted:
#print(f"Page {file_path} could not be translated with text: {text}")
print(f"Page {file_path} could not be translated.\nReturning text as is.")
return text
text1 = text.split('\n')[:len(text.split('\n'))//2]
text2 = text.split('\n')[len(text.split('\n'))//2:]
return translate_text(language, '\n'.join(text1), file_path, model, cont, False, client) + '\n' + translate_text(language, '\n'.join(text2), file_path, model, cont, True, client)
print("Retrying translation")
return translate_text(language, text, file_path, model, cont, False, client)
response_message = response.choices[0].message.content.strip()
# Sometimes chatgpt modified the number of "#" at the beginning of the text, so we need to fix that. This is specially important for the first line of the MD that mucst have only 1 "#"
cont2 = 0
while (text.startswith('# ') and not response_message[cont2:].startswith('# ')):
cont2 += 1
if cont2 > 3:
cont2 = 0
print(f"Error with initial '#', something went wrong, recheck: {response_message[:30]}")
break
response_message = response_message[cont2:]
return response_message
def split_text(text, model):
global MAX_TOKENS
lines = text.split('\n')
chunks = []
chunk = ''
in_code_block = False
for line in lines:
# If we are in a code block, just add the code to the chunk
if line.startswith('```'):
# If we are in a code block, finish it with the "```"
if in_code_block:
chunk += line + '\n'
in_code_block = not in_code_block
chunks.append(chunk.strip())
chunk = ''
# If a code block is started, add the "```" to the chunk
if in_code_block:
chunk += line + '\n'
continue
if (line.startswith('#') and reportTokens(chunk + "\n" + line.strip(), model) > MAX_TOKENS*0.8) or \
reportTokens(chunk + "\n" + line.strip(), model) > MAX_TOKENS:
chunks.append(chunk.strip())
chunk = ''
chunk += line.strip() + '\n'
chunks.append(chunk.strip())
return chunks
def copy_gitbook_dir(source_path, dest_path):
folder_name = ".gitbook/"
source_folder = os.path.join(source_path, folder_name)
destination_folder = os.path.join(dest_path, folder_name)
if not os.path.exists(source_folder):
print(f"Error: {source_folder} does not exist.")
else:
# Copy the .gitbook folder
shutil.copytree(source_folder, destination_folder)
print(f"Copied .gitbook folder from {source_folder} to {destination_folder}")
def copy_summary(source_path, dest_path):
file_name = "src/SUMMARY.md"
source_filepath = os.path.join(source_path, file_name)
dest_filepath = os.path.join(dest_path, file_name)
shutil.copy2(source_filepath, dest_filepath)
print("[+] Copied SUMMARY.md")
def translate_file(language, file_path, file_dest_path, model, client):
global VERBOSE
if file_path.endswith('SUMMARY.md'):
return
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
content_chunks = split_text(content, model)
translated_content = ''
start_time = time.time()
for chunk in content_chunks:
# Don't trasnlate code blocks
if chunk.startswith('```'):
translated_content += chunk + '\n'
else:
translated_content += translate_text(language, chunk, file_path, model, cont=0, slpitted=False, client=client) + '\n'
elapsed_time = time.time() - start_time
# make sure directory exists
os.makedirs(os.path.dirname(file_dest_path), exist_ok=True)
with open(file_dest_path, 'w', encoding='utf-8') as f:
f.write(translated_content)
#if VERBOSE:
print(f"Page {file_path} translated in {elapsed_time:.2f} seconds")
def translate_directory(language, source_path, dest_path, model, num_threads, client):
all_markdown_files = []
for subdir, dirs, files in os.walk(source_path):
for file in files:
if file.endswith('.md') and file != "SUMMARY.md":
source_filepath = os.path.join(subdir, file)
dest_filepath = os.path.join(dest_path, os.path.relpath(source_filepath, source_path))
all_markdown_files.append((source_filepath, dest_filepath))
print(f"Translating {len(all_markdown_files)} files")
#with tqdm(total=len(all_markdown_files), desc="Translating Files") as pbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
for source_filepath, dest_filepath in all_markdown_files:
if os.path.exists(dest_filepath):
continue
os.makedirs(os.path.dirname(dest_filepath), exist_ok=True)
future = executor.submit(translate_file, language, source_filepath, dest_filepath, model, client)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
future.result()
#pbar.update()
except Exception as exc:
tb = traceback.format_exc()
print(f'Translation generated an exception: {exc}')
print("Traceback:", tb)
if __name__ == "__main__":
print("- Version 1.1.1")
# Set up argparse
parser = argparse.ArgumentParser(description='Translate gitbook and copy to a new branch.')
parser.add_argument('-d', '--directory', action='store_true', help='Translate a full directory.')
parser.add_argument('-l', '--language', required=True, help='Target language for translation.')
parser.add_argument('-b', '--branch', required=True, help='Branch name to copy translated files.')
parser.add_argument('-k', '--api-key', required=True, help='API key to use.')
parser.add_argument('-m', '--model', default="gpt-4o-mini", help='The openai model to use. By default: gpt-4o-mini')
parser.add_argument('-o', '--org-id', help='The org ID to use (if not set the default one will be used).')
parser.add_argument('-f', '--file-paths', help='If this is set, only the indicated files will be translated (" , " separated).')
parser.add_argument('-n', '--dont-cd', action='store_false', help="If this is true, the script won't change the current directory.")
parser.add_argument('-t', '--threads', default=5, type=int, help="Number of threads to use to translate a directory.")
#parser.add_argument('-v', '--verbose', action='store_false', help="Get the time it takes to translate each page.")
args = parser.parse_args()
source_folder = os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))
dest_folder = tempfile.mkdtemp()
language = args.language.capitalize()
branch = args.branch
model = args.model
org_id = args.org_id
num_threads = args.threads
#VERBOSE = args.verbose
client = OpenAI(
api_key=args.api_key,
organization=org_id
)
# Start with the current directory.
current_dir = os.getcwd()
# Check if model is gpt-3.5
if "gpt-3.5" in model:
MAX_TOKENS = 2000
# Check the current directory
if check_git_dir(current_dir):
print('Found .git directory in current directory: ' + current_dir)
else:
# Check the parent directory
parent_dir = os.path.dirname(current_dir)
if check_git_dir(parent_dir):
print('Found .git directory in parent directory: ' + parent_dir)
# Change the current working directory to the parent directory
os.chdir(parent_dir)
print('Current working directory has been changed to: ' + os.getcwd())
else:
print('No .git directory found in current or parent directory. Exiting.')
exit(1)
current_dir = os.getcwd()
print(f"The translated files will be copied to {current_dir}, make sure this is the expected folder.")
if not args.dont_cd:
# Change to the parent directory
os.chdir(source_folder)
translate_files = None # Need to initialize it here to avoid error
if args.file_paths:
# Translate only the indicated file
translate_files = [f for f in args.file_paths.split(' , ') if f]
for file_path in translate_files:
#with tqdm(total=len(all_markdown_files), desc="Translating Files") as pbar:
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
future = executor.submit(translate_file, language, file_path, os.path.join(dest_folder, file_path), model, client)
futures.append(future)
for future in concurrent.futures.as_completed(futures):
try:
future.result()
#pbar.update()
except Exception as exc:
print(f'Translation generated an exception: {exc}')
# Delete possibly removed files from the master branch
delete_unique_files(branch)
elif args.directory:
# Translate everything
translate_directory(language, source_folder, dest_folder, model, num_threads, client)
else:
print("You need to indicate either a directory or a list of files to translate.")
exit(1)
# Copy summary
copy_summary(source_folder, dest_folder)
# Copy .gitbook folder
copy_gitbook_dir(source_folder, dest_folder)
# Create the branch and copy the translated files
cp_translation_to_repo_dir_and_check_gh_branch(branch, dest_folder, translate_files)