import sys import timeit import logging import argparse import subprocess import tqdm import tabulate import capa.main import capa.perf import capa.rules import capa.engine import capa.helpers import capa.features import capa.features.common import capa.features.freeze logger = logging.getLogger("capa.profile") def main(argv=None): if argv is None: argv = sys.argv[1:] label = subprocess.run( "git show --pretty=oneline --abbrev-commit | head -n 1", shell=True, capture_output=True, text=True ).stdout.strip() is_dirty = ( subprocess.run( "git status | grep 'modified: ' | grep -v 'rules' | grep -v 'tests/data'", shell=True, capture_output=True, text=True, ).stdout != "" ) if is_dirty: label += " (dirty)" parser = argparse.ArgumentParser(description="Profile capa performance") capa.main.install_common_args(parser, wanted={"format", "sample", "signatures", "rules"}) parser.add_argument("--number", type=int, default=3, help="batch size of profile collection") parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection") parser.add_argument("--label", type=str, default=label, help="description of the profile collection") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) try: taste = capa.helpers.get_file_taste(args.sample) except IOError as e: logger.error("%s", str(e)) return -1 try: with capa.main.timing("load rules"): rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True)) except (IOError) as e: logger.error("%s", str(e)) return -1 try: sig_paths = capa.main.get_signatures(args.signatures) except (IOError) as e: logger.error("%s", str(e)) return -1 if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: extractor = capa.main.get_extractor( args.sample, args.format, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False ) with tqdm.tqdm(total=args.number * args.repeat) as pbar: def do_iteration(): capa.perf.reset() capa.main.find_capabilities(rules, extractor, disable_progress=True) pbar.update(1) samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) logger.debug("perf: find capabilities: min: %0.2fs" % (min(samples) / float(args.number))) logger.debug("perf: find capabilities: avg: %0.2fs" % (sum(samples) / float(args.repeat) / float(args.number))) logger.debug("perf: find capabilities: max: %0.2fs" % (max(samples) / float(args.number))) for (counter, count) in capa.perf.counters.most_common(): logger.debug("perf: counter: {:}: {:,}".format(counter, count)) print( tabulate.tabulate( [ ( args.label, "{:,}".format(capa.perf.counters["evaluate.feature"]), "%0.2fs" % (sum(samples) / float(args.repeat) / float(args.number)), "%0.2fs" % (min(samples) / float(args.number)), "%0.2fs" % (max(samples) / float(args.number)), ) ], headers=["label", "count(evaluations)", "avg(time)", "min(time)", "max(time)"], tablefmt="github", ) ) return 0 if __name__ == "__main__": sys.exit(main())