fix: dedent bulk-process.py main() body so explicit argv is used

The entire main() body was indented inside `if argv is None:`, causing
main() to silently return None when called with an explicit argv list.

Closes SURF-90.
This commit is contained in:
Willi Ballenthin
2026-04-22 22:23:57 +03:00
committed by Willi Ballenthin
parent a938c87fa4
commit 7d8714098c
3 changed files with 79 additions and 50 deletions
+58 -50
View File
@@ -160,69 +160,77 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend"})
parser.add_argument("input_directory", type=str, help="Path to directory of files to recursively analyze")
parser.add_argument(
"-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor"
)
parser.add_argument("--no-mp", action="store_true", help="disable subprocesses")
args = parser.parse_args(args=argv)
parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend"})
parser.add_argument("input_directory", type=str, help="Path to directory of files to recursively analyze")
parser.add_argument("-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor")
parser.add_argument("--no-mp", action="store_true", help="disable subprocesses")
args = parser.parse_args(args=argv)
samples = []
for file in Path(args.input_directory).rglob("*"):
samples.append(file)
samples = []
for file in Path(args.input_directory).rglob("*"):
samples.append(file)
cpu_count = multiprocessing.cpu_count()
cpu_count = multiprocessing.cpu_count()
def pmap(f, args, parallelism=cpu_count):
"""apply the given function f to the given args using subprocesses"""
return multiprocessing.Pool(parallelism).imap(f, args)
def pmap(f, args, parallelism=cpu_count):
"""apply the given function f to the given args using subprocesses"""
return multiprocessing.Pool(parallelism).imap(f, args)
def tmap(f, args, parallelism=cpu_count):
"""apply the given function f to the given args using threads"""
return multiprocessing.pool.ThreadPool(parallelism).imap(f, args)
def tmap(f, args, parallelism=cpu_count):
"""apply the given function f to the given args using threads"""
return multiprocessing.pool.ThreadPool(parallelism).imap(f, args)
def map(f, args, parallelism=None):
"""apply the given function f to the given args in the current thread"""
for arg in args:
yield f(arg)
def map(f, args, parallelism=None):
"""apply the given function f to the given args in the current thread"""
for arg in args:
yield f(arg)
if args.no_mp:
if args.parallelism == 1:
logger.debug("using current thread mapper")
mapper = map
else:
logger.debug("using threading mapper")
mapper = tmap
if args.no_mp:
if args.parallelism == 1:
logger.debug("using current thread mapper")
mapper = map
else:
logger.debug("using process mapper")
mapper = pmap
logger.debug("using threading mapper")
mapper = tmap
else:
logger.debug("using process mapper")
mapper = pmap
rules = args.rules
if rules == [capa.main.RULES_PATH_DEFAULT_STRING]:
rules = None
rules = args.rules
if rules == [capa.main.RULES_PATH_DEFAULT_STRING]:
rules = None
results = {}
for result in mapper(
get_capa_results,
[(rules, args.signatures, args.format, args.backend, args.os, str(sample)) for sample in samples],
parallelism=args.parallelism,
):
if result["status"] == "error":
logger.warning(result["error"])
elif result["status"] == "ok":
doc = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(exclude_none=True)
results[result["path"]] = json.loads(doc)
results = {}
for result in mapper(
get_capa_results,
[
(
rules,
args.signatures,
args.format,
args.backend,
args.os,
str(sample),
)
for sample in samples
],
parallelism=args.parallelism,
):
if result["status"] == "error":
logger.warning(result["error"])
elif result["status"] == "ok":
doc = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(exclude_none=True)
results[result["path"]] = json.loads(doc)
else:
raise ValueError(f"unexpected status: {result['status']}")
else:
raise ValueError(f"unexpected status: {result['status']}")
print(json.dumps(results))
print(json.dumps(results))
logger.info("done.")
logger.info("done.")
return 0
return 0
if __name__ == "__main__":