diff --git a/capa/features/extractors/__init__.py b/capa/features/extractors/__init__.py index b44adbc6..85d1bd49 100644 --- a/capa/features/extractors/__init__.py +++ b/capa/features/extractors/__init__.py @@ -39,6 +39,15 @@ class FeatureExtractor(object): # super(FeatureExtractor, self).__init__() + @abc.abstractmethod + def get_base_address(self): + """ + fetch the preferred load address at which the sample was analyzed. + + returns: int + """ + raise NotImplementedError + @abc.abstractmethod def extract_file_features(self): """ diff --git a/capa/features/extractors/ida/__init__.py b/capa/features/extractors/ida/__init__.py index 378a5c90..5091fb34 100644 --- a/capa/features/extractors/ida/__init__.py +++ b/capa/features/extractors/ida/__init__.py @@ -43,6 +43,9 @@ class IdaFeatureExtractor(FeatureExtractor): def __init__(self): super(IdaFeatureExtractor, self).__init__() + def get_base_address(self): + return idaapi.get_imagebase() + def extract_file_features(self): for feature, va in capa.features.extractors.ida.file.extract_features(): yield feature, va diff --git a/capa/features/extractors/viv/__init__.py b/capa/features/extractors/viv/__init__.py index 1e27083d..82950418 100644 --- a/capa/features/extractors/viv/__init__.py +++ b/capa/features/extractors/viv/__init__.py @@ -44,6 +44,10 @@ class VivisectFeatureExtractor(FeatureExtractor): self.vw = vw self.path = path + def get_base_address(self): + # assume there is only one file loaded into the vw + return list(self.vw.filemeta.values())[0]["imagebase"] + def extract_file_features(self): for feature, va in capa.features.extractors.viv.file.extract_features(self.vw, self.path): yield feature, va diff --git a/capa/ida/helpers/__init__.py b/capa/ida/helpers/__init__.py index 7a230000..06997137 100644 --- a/capa/ida/helpers/__init__.py +++ b/capa/ida/helpers/__init__.py @@ -1,7 +1,9 @@ import logging +import datetime import idc import idaapi +import idautils logger = logging.getLogger("capa") @@ -48,3 +50,26 @@ def 
get_func_start_ea(ea): """ """ f = idaapi.get_func(ea) return f if f is None else f.start_ea + + +def collect_metadata(): + """ + collect metadata describing the sample open in IDA and this analysis, for the `meta` section of the result document. + + returns: Dict[str, Any] + """ + # imported locally so that `capa.version` is in scope for the version field below. + import capa.version + + return { + "timestamp": datetime.datetime.now().isoformat(), + "version": capa.version.__version__, + # "argv" is not relevant here + "sample": { + "md5": idautils.GetInputFileMD5(), + # "sha1" not easily accessible + "sha256": idaapi.retrieve_input_file_sha256(), + "path": idaapi.get_input_file_path(), + }, + "analysis": {"format": idaapi.get_file_type_name(), "extractor": "ida", "base_address": idaapi.get_imagebase(),}, + } diff --git a/capa/ida/ida_capa_explorer.py b/capa/ida/ida_capa_explorer.py index 004e9a96..6ecf6404 100644 --- a/capa/ida/ida_capa_explorer.py +++ b/capa/ida/ida_capa_explorer.py @@ -363,12 +363,8 @@ class CapaExplorerForm(idaapi.PluginForm): logger.info("analysis completed.") - doc = capa.render.convert_capabilities_to_result_document(rules, capabilities) - - import json - - with open("C:\\Users\\spring\\Desktop\\hmm.json", "w") as twitter_data_file: - json.dump(doc, twitter_data_file, indent=4, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder) + meta = capa.ida.helpers.collect_metadata() + doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) self.model_data.render_capa_doc(doc) self.render_capa_doc_summary(doc) diff --git a/capa/main.py b/capa/main.py index 34264ede..ee582724 100644 --- a/capa/main.py +++ b/capa/main.py @@ -4,8 +4,10 @@ capa - detect capabilities in programs. 
""" import os import sys +import hashlib import logging import os.path +import datetime import collections import tqdm @@ -320,6 +322,36 @@ def get_rules(rule_path): return rules +def collect_metadata(argv, path, format, extractor): + md5 = hashlib.md5() + sha1 = hashlib.sha1() + sha256 = hashlib.sha256() + + with open(path, "rb") as f: + buf = f.read() + + md5.update(buf) + sha1.update(buf) + sha256.update(buf) + + return { + "timestamp": datetime.datetime.now().isoformat(), + "version": capa.version.__version__, + "argv": argv, + "sample": { + "md5": md5.hexdigest(), + "sha1": sha1.hexdigest(), + "sha256": sha256.hexdigest(), + "path": os.path.normpath(path), + }, + "analysis": { + "format": format, + "extractor": extractor.__class__.__name__, + "base_address": extractor.get_base_address(), + }, + } + + def main(argv=None): if argv is None: argv = sys.argv[1:] @@ -420,9 +452,11 @@ def main(argv=None): taste = f.read(8) if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): + format = "freeze" with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: + format = args.format try: extractor = get_extractor(args.sample, args.format) except UnsupportedFormatError: @@ -446,6 +480,8 @@ def main(argv=None): logger.error("-" * 80) return -1 + meta = collect_metadata(argv, args.sample, format, extractor) + capabilities = find_capabilities(rules, extractor) if has_file_limitation(rules, capabilities): @@ -460,13 +496,13 @@ def main(argv=None): # renderers should use coloring and assume it will be stripped out if necessary. 
colorama.init() if args.json: - print(capa.render.render_json(rules, capabilities)) + print(capa.render.render_json(meta, rules, capabilities)) elif args.vverbose: - print(capa.render.render_vverbose(rules, capabilities)) + print(capa.render.render_vverbose(meta, rules, capabilities)) elif args.verbose: - print(capa.render.render_verbose(rules, capabilities)) + print(capa.render.render_verbose(meta, rules, capabilities)) else: - print(capa.render.render_default(rules, capabilities)) + print(capa.render.render_default(meta, rules, capabilities)) colorama.deinit() logger.info("done.") @@ -475,11 +511,12 @@ def main(argv=None): def ida_main(): + import capa.ida.helpers + import capa.features.extractors.ida + logging.basicConfig(level=logging.INFO) logging.getLogger().setLevel(logging.INFO) - import capa.ida.helpers - if not capa.ida.helpers.is_supported_file_type(): return -1 @@ -500,18 +537,15 @@ def ida_main(): logger.debug("default rule path (source method): %s", rules_path) rules = get_rules(rules_path) - import capa.rules - rules = capa.rules.RuleSet(rules) - import capa.features.extractors.ida - capabilities = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor()) if has_file_limitation(rules, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") - render_capabilities_default(rules, capabilities) + meta = capa.ida.helpers.collect_metadata() + print(capa.render.render_default(meta, rules, capabilities)) def is_runtime_ida(): diff --git a/capa/render/__init__.py b/capa/render/__init__.py index 2dabb022..5b625217 100644 --- a/capa/render/__init__.py +++ b/capa/render/__init__.py @@ -175,7 +175,7 @@ def convert_match_to_result_document(rules, capabilities, result): return doc -def convert_capabilities_to_result_document(rules, capabilities): +def convert_capabilities_to_result_document(meta, rules, capabilities): """ convert the given rule set and capabilities result to a common, 
Python-native data structure. this format can be directly emitted to JSON, or passed to the other `render_*` routines @@ -187,22 +187,29 @@ def convert_capabilities_to_result_document(rules, capabilities): ```json { - $rule-name: { - "meta": {...copied from rule.meta...}, - "matches: { - $address: {...match details...}, - ... - } - }, - ... + "meta": {...}, + "rules": { + $rule-name: { + "meta": {...copied from rule.meta...}, + "matches": { + $address: {...match details...}, + ... + } + }, + ... + } } ``` Args: + meta (Dict[str, Any]): rules (RuleSet): capabilities (Dict[str, List[Tuple[int, Result]]]): """ - doc = {} + doc = { + "meta": meta, + "rules": {}, + } for rule_name, matches in capabilities.items(): rule = rules[rule_name] @@ -210,7 +217,7 @@ def convert_capabilities_to_result_document(rules, capabilities): if rule.meta.get("capa/subscope-rule"): continue - doc[rule_name] = { + doc["rules"][rule_name] = { "meta": dict(rule.meta), "source": rule.definition, "matches": { @@ -221,35 +228,37 @@ def convert_capabilities_to_result_document(rules, capabilities): return doc -def render_vverbose(rules, capabilities): +def render_vverbose(meta, rules, capabilities): # there's an import loop here # if capa.render imports capa.render.vverbose # and capa.render.vverbose import capa.render (implicitly, as a submodule) # so, defer the import until routine is called, breaking the import loop. 
import capa.render.vverbose - doc = convert_capabilities_to_result_document(rules, capabilities) + doc = convert_capabilities_to_result_document(meta, rules, capabilities) return capa.render.vverbose.render_vverbose(doc) -def render_verbose(rules, capabilities): +def render_verbose(meta, rules, capabilities): # break import loop import capa.render.verbose - doc = convert_capabilities_to_result_document(rules, capabilities) + doc = convert_capabilities_to_result_document(meta, rules, capabilities) return capa.render.verbose.render_verbose(doc) -def render_default(rules, capabilities): +def render_default(meta, rules, capabilities): # break import loop import capa.render.verbose import capa.render.default - doc = convert_capabilities_to_result_document(rules, capabilities) + doc = convert_capabilities_to_result_document(meta, rules, capabilities) return capa.render.default.render_default(doc) class CapaJsonObjectEncoder(json.JSONEncoder): + """JSON encoder that emits Python sets as sorted lists""" + def default(self, obj): if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, six.string_types): return json.JSONEncoder.default(self, obj) @@ -260,7 +269,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder): return json.JSONEncoder.default(self, obj) -def render_json(rules, capabilities): +def render_json(meta, rules, capabilities): return json.dumps( - convert_capabilities_to_result_document(rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True, + convert_capabilities_to_result_document(meta, rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True, ) diff --git a/capa/render/default.py b/capa/render/default.py index a93bdda7..c46ef0cf 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -5,6 +5,8 @@ import tabulate import capa.render.utils as rutils +tabulate.PRESERVE_WHITESPACE = True + def width(s, character_count): """pad the given string to at least `character_count`""" @@ -14,6 +16,19 @@ def width(s, 
character_count): return s +def render_meta(doc, ostream): + rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)] + + for k in ("timestamp", "version"): + rows.append((width(k, 22), width(doc["meta"][k], 60))) + + for k in ("path", "md5"): + rows.append((k, doc["meta"]["sample"][k])) + + ostream.write(tabulate.tabulate(rows, tablefmt="psql")) + ostream.write("\n") + + def render_capabilities(doc, ostream): """ example:: @@ -90,8 +105,10 @@ def render_attack(doc, ostream): def render_default(doc): - ostream = six.StringIO() + ostream = rutils.StringIO() + render_meta(doc, ostream) + ostream.write("\n") render_attack(doc, ostream) ostream.write("\n") render_capabilities(doc, ostream) diff --git a/capa/render/utils.py b/capa/render/utils.py index 670c1bcf..6ef1e854 100644 --- a/capa/render/utils.py +++ b/capa/render/utils.py @@ -20,7 +20,7 @@ def hex(n): def capability_rules(doc): """enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc).""" for (_, _, rule) in sorted( - map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc.values()) + map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc["rules"].values()) ): if rule["meta"].get("lib"): continue diff --git a/capa/render/verbose.py b/capa/render/verbose.py index cf13402f..24e9cd3c 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -23,6 +23,21 @@ import capa.render.utils as rutils def render_verbose(doc): ostream = rutils.StringIO() + rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)] + for k in ("timestamp", "version"): + rows.append((k, doc["meta"][k])) + + for k in ("path", "md5", "sha1", "sha256"): + rows.append((k, doc["meta"]["sample"][k])) + + for k in ("format", "extractor"): + rows.append((k.replace("_", " "), doc["meta"]["analysis"][k])) + + rows.append(("base address", 
rutils.hex(doc["meta"]["analysis"]["base_address"]))) + + ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) + ostream.write("\n") + for rule in rutils.capability_rules(doc): count = len(rule["matches"]) if count == 1: @@ -43,7 +58,7 @@ def render_verbose(doc): rows.append((key, v)) if rule["meta"]["scope"] != capa.rules.FILE_SCOPE: - locations = doc[rule["meta"]["name"]]["matches"].keys() + locations = doc["rules"][rule["meta"]["name"]]["matches"].keys() rows.append(("matches", "\n".join(map(rutils.hex, locations)))) ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index a0182b11..2fe96367 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -141,6 +141,22 @@ def render_match(ostream, match, indent=0, mode=MODE_SUCCESS): def render_vverbose(doc): ostream = rutils.StringIO() + rows = [(rutils.bold("Capa Report for"), rutils.bold(doc["meta"]["sample"]["md5"]),)] + for k in ("timestamp", "version"): + rows.append((k, doc["meta"][k])) + + for k in ("path", "md5", "sha1", "sha256"): + rows.append((k, doc["meta"]["sample"][k])) + + for k in ("format", "extractor"): + rows.append((k.replace("_", " "), doc["meta"]["analysis"][k])) + + rows.append(("base address", rutils.hex(doc["meta"]["analysis"]["base_address"]))) + + ostream.writeln(rutils.bold("Capa Report for " + doc["meta"]["sample"]["md5"])) + ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) + ostream.write("\n") + for rule in rutils.capability_rules(doc): count = len(rule["matches"]) if count == 1: @@ -165,7 +181,7 @@ def render_vverbose(doc): ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) if rule["meta"]["scope"] == capa.rules.FILE_SCOPE: - matches = list(doc[rule["meta"]["name"]]["matches"].values()) + matches = list(doc["rules"][rule["meta"]["name"]]["matches"].values()) if len(matches) != 1: # i think there should only ever be one match per file-scope rule, # because we do the file-scope 
evaluation a single time. @@ -174,7 +190,7 @@ def render_vverbose(doc): raise RuntimeError("unexpected file scope match count: " + len(matches)) render_match(ostream, matches[0], indent=0) else: - for location, match in sorted(doc[rule["meta"]["name"]]["matches"].items()): + for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()): ostream.write(rule["meta"]["scope"]) ostream.write(" @ ") ostream.writeln(rutils.hex(location)) diff --git a/setup.py b/setup.py index 84ea4f26..3522e8e2 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,9 @@ setuptools.setup( entry_points={"console_scripts": ["capa=capa.main:main",]}, include_package_data=True, install_requires=requirements, - extras_require={"dev": ["pytest", "pytest-sugar", "pycodestyle", "black", "isort"]}, + extras_require={ + "dev": ["pytest", "pytest-sugar", "pytest-instafail", "pytest-cov", "pycodestyle", "black", "isort"] + }, zip_safe=False, keywords="capa", classifiers=[