From b4870b120e36adee6eec10470239c637e6c71457 Mon Sep 17 00:00:00 2001 From: Aayush Goel <81844215+Aayush-Goel-04@users.noreply.github.com> Date: Sat, 3 Jun 2023 15:33:49 +0530 Subject: [PATCH] Remove from_capa API for MetaData --- capa/ida/helpers.py | 67 ++++++------- capa/ida/plugin/form.py | 7 +- capa/main.py | 121 ++++++++++++----------- capa/render/result_document.py | 74 +------------- scripts/bulk-process.py | 6 +- scripts/capa_as_library.py | 7 +- scripts/show-capabilities-by-function.py | 6 +- 7 files changed, 112 insertions(+), 176 deletions(-) diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index 46e8907e..d66bfdd0 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -22,7 +22,8 @@ import capa import capa.version import capa.render.utils as rutils import capa.features.common -import capa.render.result_document +import capa.features.freeze +import capa.render.result_document as rdoc from capa.features.address import AbsoluteVirtualAddress logger = logging.getLogger("capa") @@ -140,38 +141,34 @@ def collect_metadata(rules): else: os = "unknown os" - return capa.render.result_document.Metadata.from_capa( - { - "timestamp": datetime.datetime.now().isoformat(), - "argv": [], - "sample": { - "md5": md5, - "sha1": "", # not easily accessible - "sha256": sha256, - "path": idaapi.get_input_file_path(), - }, - "analysis": { - "format": idaapi.get_file_type_name(), - "arch": arch, - "os": os, - "extractor": "ida", - "rules": rules, - "base_address": idaapi.get_imagebase(), - "layout": { - # this is updated after capabilities have been collected. - # will look like: - # - # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } - }, - # ignore these for now - not used by IDA plugin. - "feature_counts": { - "file": {}, - "functions": {}, - }, - "library_functions": {}, - }, - "version": capa.version.__version__, - } + return rdoc.Metadata( + timestamp=datetime.datetime.now(), + version=capa.version.__version__, + argv=(), + sample=rdoc.Sample( + md5=md5, + sha1="", # not easily accessible + sha256=sha256, + path=idaapi.get_input_file_path(), + ), + analysis=rdoc.Analysis( + format=idaapi.get_file_type_name(), + arch=arch, + os=os, + extractor="ida", + rules=rules, + base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), + layout=rdoc.Layout( + functions=tuple() + # this is updated after capabilities have been collected. + # will look like: + # + # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } + ), + # ignore these for now - not used by IDA plugin. + feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()), + library_functions=tuple(), + ), ) @@ -219,12 +216,12 @@ def idb_contains_cached_results() -> bool: return False -def load_and_verify_cached_results() -> Optional[capa.render.result_document.ResultDocument]: +def load_and_verify_cached_results() -> Optional[rdoc.ResultDocument]: """verifies that cached results have valid (mapped) addresses for the current database""" logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE) n = netnode.Netnode(CAPA_NETNODE) - doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS])) + doc = rdoc.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS])) for rule in rutils.capability_rules(doc): for location_, _ in rule.matches: diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index f74c0c76..72b33a66 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -771,8 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm): try: meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True) - meta.analysis.__dict__.update(counts) - meta.analysis.__dict__.update(capa.main.compute_layout(ruleset, extractor, capabilities)) + + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + meta.analysis.layout = capa.main.compute_layout(ruleset, extractor, capabilities) + except UserCancelledError: logger.info("User cancelled analysis.") return False diff --git a/capa/main.py b/capa/main.py index 722597a5..76e7bffc 100644 --- a/capa/main.py +++ b/capa/main.py @@ -38,7 +38,7 @@ import capa.rules.cache import capa.render.default import capa.render.verbose import capa.features.common -import capa.features.freeze +import capa.features.freeze as frz import capa.render.vverbose import capa.features.extractors import capa.render.result_document @@ -247,13 +247,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro all_bb_matches = collections.defaultdict(list) # type: MatchResults all_insn_matches = collections.defaultdict(list) # type: MatchResults - meta = { - "feature_counts": { - "file": 0, - "functions": {}, - }, - "library_functions": {}, - } # type: Dict[str, Any] + feature_counts = rdoc.FeatureCounts(file=0, functions=tuple()) + library_functions: Tuple[rdoc.LibraryFunction, ...] = tuple() with redirecting_print_to_tqdm(disable_progress): with tqdm.contrib.logging.logging_redirect_tqdm(): @@ -272,8 +267,10 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) logger.debug("skipping library function 0x%x (%s)", f.address, function_name) - meta["library_functions"][f.address] = function_name - n_libs = len(meta["library_functions"]) + library_functions += ( + rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), + ) + n_libs = len(library_functions) percentage = round(100 * (n_libs / n_funcs)) if isinstance(pb, tqdm.tqdm): pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") @@ -282,7 +279,9 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( ruleset, extractor, f ) - meta["feature_counts"]["functions"][f.address] = feature_count + feature_counts.functions += ( + rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + ) logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count) for rule_name, res in function_matches.items(): @@ -303,7 +302,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro capa.engine.index_rule_matches(function_and_lower_features, rule, locations) all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - meta["feature_counts"]["file"] = feature_count + feature_counts.file = feature_count matches = { rule_name: results @@ -317,11 +316,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro all_file_matches.items(), ) } - meta["feature_counts"] = rdoc.FeatureCounts.from_capa(meta["feature_counts"]) - meta["library_functions"] = tuple( - rdoc.LibraryFunction(address=capa.features.freeze.Address.from_capa(address), name=name) - for address, name in meta["library_functions"].items() - ) + + meta = { + "feature_counts": feature_counts, + "library_functions": library_functions, + } return matches, meta @@ -765,39 +764,37 @@ def collect_metadata( arch = get_arch(sample_path) os_ = get_os(sample_path) if os_ == OS_AUTO else os_ - return rdoc.Metadata.from_capa( - { - "timestamp": datetime.datetime.now().isoformat(), - "version": capa.version.__version__, - "argv": argv, - "sample": { - "md5": md5.hexdigest(), - "sha1": sha1.hexdigest(), - "sha256": sha256.hexdigest(), - "path": os.path.normpath(sample_path), - }, - "analysis": { - "format": format_, - "arch": arch, - "os": os_, - "extractor": extractor.__class__.__name__, - "rules": rules_path, - "base_address": extractor.get_base_address(), - "layout": { - "functions": {}, - # this is updated after capabilities have been collected. - # will look like: - # - # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } - }, - "feature_counts": {"file": 0, "functions": {}}, - "library_functions": {}, - }, - } + return rdoc.Metadata( + timestamp=datetime.datetime.now(), + version=capa.version.__version__, + argv=tuple(argv) if argv else None, + sample=rdoc.Sample( + md5=md5.hexdigest(), + sha1=sha1.hexdigest(), + sha256=sha256.hexdigest(), + path=os.path.normpath(sample_path), + ), + analysis=rdoc.Analysis( + format=format_, + arch=arch, + os=os_, + extractor=extractor.__class__.__name__, + rules=tuple(rules_path), + base_address=frz.Address.from_capa(extractor.get_base_address()), + layout=rdoc.Layout( + functions=tuple(), + # this is updated after capabilities have been collected. + # will look like: + # + # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } + ), + feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()), + library_functions=tuple(), + ), ) -def compute_layout(rules, extractor, capabilities) -> Dict[str, rdoc.Layout]: +def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: """ compute a metadata structure that links basic blocks to the functions in which they're found. @@ -822,19 +819,21 @@ def compute_layout(rules, extractor, capabilities) -> Dict[str, rdoc.Layout]: assert addr in functions_by_bb matched_bbs.add(addr) - layout = { - "functions": { - f: { - "matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs] - # this object is open to extension in the future, + layout = rdoc.Layout( + functions=tuple( + rdoc.FunctionLayout( + address=frz.Address.from_capa(f), + matched_basic_blocks=tuple( + rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs + ) # this object is open to extension in the future, # such as with the function name, etc. - } + ) for f, bbs in bbs_by_function.items() if len([bb for bb in bbs if bb in matched_bbs]) > 0 - } - } + ) + ) - return {"layout": rdoc.Layout.from_capa(layout)} + return layout def install_common_args(parser, wanted=None): @@ -1226,7 +1225,7 @@ def main(argv=None): if format_ == FORMAT_FREEZE: # freeze format deserializes directly into an extractor with open(args.sample, "rb") as f: - extractor = capa.features.freeze.load(f.read()) + extractor = frz.load(f.read()) else: # all other formats we must create an extractor, # such as viv, binary ninja, etc. workspaces @@ -1268,8 +1267,9 @@ def main(argv=None): capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta.analysis.__dict__.update(counts) - meta.analysis.__dict__.update(compute_layout(rules, extractor, capabilities)) + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + meta.analysis.layout = compute_layout(rules, extractor, capabilities) if has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary @@ -1321,7 +1321,8 @@ def ida_main(): capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) - meta.analysis.__dict__.update(counts) + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] if has_file_limitation(rules, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") diff --git a/capa/render/result_document.py b/capa/render/result_document.py index d1656e1f..11faf145 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -24,7 +24,6 @@ from capa.helpers import assert_never class FrozenModel(BaseModel): class Config: - frozen = True extra = "forbid" @@ -47,20 +46,6 @@ class FunctionLayout(FrozenModel): class Layout(FrozenModel): functions: Tuple[FunctionLayout, ...] - @classmethod - def from_capa(cls, layout: dict) -> "Layout": - return cls( - functions=tuple( - FunctionLayout( - address=frz.Address.from_capa(address), - matched_basic_blocks=tuple( - BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"] - ), - ) - for address, f in layout["functions"].items() - ) - ) - class LibraryFunction(FrozenModel): address: frz.Address @@ -76,16 +61,6 @@ class FeatureCounts(FrozenModel): file: int functions: Tuple[FunctionFeatureCount, ...] - @classmethod - def from_capa(cls, feature_counts: dict) -> "FeatureCounts": - return cls( - file=feature_counts["file"], - functions=tuple( - FunctionFeatureCount(address=frz.Address.from_capa(address), count=count) - for address, count in feature_counts["functions"].items() - ), - ) - class Analysis(FrozenModel): format: str @@ -106,50 +81,6 @@ class Metadata(FrozenModel): sample: Sample analysis: Analysis - @classmethod - def from_capa(cls, meta: dict) -> "Metadata": - return cls( - timestamp=meta["timestamp"], - version=meta["version"], - argv=meta["argv"] if "argv" in meta else None, - sample=Sample( - md5=meta["sample"]["md5"], - sha1=meta["sample"]["sha1"], - sha256=meta["sample"]["sha256"], - path=meta["sample"]["path"], - ), - analysis=Analysis( - format=meta["analysis"]["format"], - arch=meta["analysis"]["arch"], - os=meta["analysis"]["os"], - extractor=meta["analysis"]["extractor"], - rules=meta["analysis"]["rules"], - base_address=frz.Address.from_capa(meta["analysis"]["base_address"]), - layout=Layout( - functions=tuple( - FunctionLayout( - address=frz.Address.from_capa(address), - matched_basic_blocks=tuple( - BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"] - ), - ) - for address, f in meta["analysis"]["layout"]["functions"].items() - ) - ), - feature_counts=FeatureCounts( - file=meta["analysis"]["feature_counts"]["file"], - functions=tuple( - FunctionFeatureCount(address=frz.Address.from_capa(address), count=count) - for address, count in meta["analysis"]["feature_counts"]["functions"].items() - ), - ), - library_functions=tuple( - LibraryFunction(address=frz.Address.from_capa(address), name=name) - for address, name in meta["analysis"]["library_functions"].items() - ), - ), - ) - class CompoundStatementType: AND = "and" @@ -648,10 +579,7 @@ class ResultDocument(FrozenModel): ), ) - if isinstance(meta, Metadata): - return ResultDocument(meta=meta, rules=rule_matches) - - return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches) + return ResultDocument(meta=meta, rules=rule_matches) def to_capa(self) -> Tuple[Metadata, Dict]: capabilities: Dict[ diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 6c9114da..5e3ed0a1 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -131,8 +131,10 @@ def get_capa_results(args): meta = capa.main.collect_metadata([], path, format, os_, [], extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) - meta.analysis.__dict__.update(counts) - meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) + + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) doc = rd.ResultDocument.from_capa(meta, rules, capabilities) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index f97dc2de..57657018 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -172,10 +172,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"): # collect metadata (used only to make rendering more complete) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor) - meta.analysis.__dict__.update(counts) - meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) + + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) capa_output: Any = False + if output_format == "dictionary": # ...as python dictionary, simplified as textable but in dictionary doc = rd.ResultDocument.from_capa(meta, rules, capabilities) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 5ed38a46..b58c7568 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -178,8 +178,10 @@ def main(argv=None): meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor) - meta.analysis.__dict__.update(counts) - meta.analysis.__dict__.update(capa.main.compute_layout(rules, extractor, capabilities)) + + meta.analysis.feature_counts = counts["feature_counts"] + meta.analysis.library_functions = counts["library_functions"] + meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) if capa.main.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary