diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 39bab168..835c1b41 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -43,7 +43,6 @@ import capa.rules import capa.engine import capa.features import capa.features.insn -from capa.features.common import String logger = logging.getLogger("capa2yara") diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index bd860af5..b4690e58 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -2,6 +2,7 @@ import json import collections +from typing import Any, Dict import capa.main import capa.rules @@ -10,52 +11,48 @@ import capa.features import capa.render.json import capa.render.utils as rutils import capa.render.default -import capa.render.result_document +import capa.render.result_document as rd +import capa.features.freeze.features as frzf from capa.engine import * -# edit this to set the path for file to analyze and rule directory -RULES_PATH = "/tmp/capa/rules/" - -# load rules from disk -rules = capa.rules.RuleSet(capa.main.get_rules([RULES_PATH], disable_progress=True)) # == Render ddictionary helpers -def render_meta(doc, ostream): - ostream["md5"] = doc["meta"]["sample"]["md5"] - ostream["sha1"] = doc["meta"]["sample"]["sha1"] - ostream["sha256"] = doc["meta"]["sample"]["sha256"] - ostream["path"] = doc["meta"]["sample"]["path"] +def render_meta(doc: rd.ResultDocument, result): + result["md5"] = doc.meta.sample.md5 + result["sha1"] = doc.meta.sample.sha1 + result["sha256"] = doc.meta.sample.sha256 + result["path"] = doc.meta.sample.path -def find_subrule_matches(doc): +def find_subrule_matches(doc: rd.ResultDocument) -> Set[str]: """ collect the rule names that have been matched as a subrule match. this way we can avoid displaying entries for things that are too specific. """ matches = set([]) - def rec(node): - if not node["success"]: + def rec(node: rd.Match): + if not node.success: # there's probably a bug here for rules that do `not: match: ...` # but we don't have any examples of this yet return - elif node["node"]["type"] == "statement": - for child in node["children"]: + elif isinstance(node.node, rd.StatementNode): + for child in node.children: rec(child) - elif node["node"]["type"] == "feature": - if node["node"]["feature"]["type"] == "match": - matches.add(node["node"]["feature"]["match"]) + elif isinstance(node.node, rd.FeatureNode): + if isinstance(node.node.feature, frzf.MatchFeature): + matches.add(node.node.feature.match) for rule in rutils.capability_rules(doc): - for node in rule["matches"].values(): + for _, node in rule.matches: rec(node) return matches -def render_capabilities(doc, ostream): +def render_capabilities(doc: rd.ResultDocument, result): """ example:: {'CAPABILITY': {'accept command line arguments': 'host-interaction/cli', @@ -68,25 +65,25 @@ def render_capabilities(doc, ostream): """ subrule_matches = find_subrule_matches(doc) - ostream["CAPABILITY"] = dict() + result["CAPABILITY"] = dict() for rule in rutils.capability_rules(doc): - if rule["meta"]["name"] in subrule_matches: + if rule.meta.name in subrule_matches: # rules that are also matched by other rules should not get rendered by default. # this cuts down on the amount of output while giving approx the same detail. # see #224 continue - count = len(rule["matches"]) + count = len(rule.matches) if count == 1: - capability = rule["meta"]["name"] + capability = rule.meta.name else: - capability = "%s (%d matches)" % (rule["meta"]["name"], count) + capability = "%s (%d matches)" % (rule.meta.name, count) - ostream["CAPABILITY"].setdefault(rule["meta"]["namespace"], list()) - ostream["CAPABILITY"][rule["meta"]["namespace"]].append(capability) + result["CAPABILITY"].setdefault(rule.meta.namespace, list()) + result["CAPABILITY"][rule.meta.namespace].append(capability) -def render_attack(doc, ostream): +def render_attack(doc, result): """ example:: {'ATT&CK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'], @@ -99,13 +96,13 @@ def render_attack(doc, ostream): 'EXECUTION': ['Shared Modules [T1129]']} } """ - ostream["ATTCK"] = dict() + result["ATTCK"] = dict() tactics = collections.defaultdict(set) for rule in rutils.capability_rules(doc): - if not rule["meta"].get("att&ck"): + if not rule.meta.attack: continue - for attack in rule["meta"]["att&ck"]: - tactics[attack["tactic"]].add((attack["technique"], attack.get("subtechnique"), attack["id"])) + for attack in rule.meta.attack: + tactics[attack.tactic].add((attack.technique, attack.subtechnique, attack.id)) for tactic, techniques in sorted(tactics.items()): inner_rows = [] @@ -114,10 +111,10 @@ def render_attack(doc, ostream): inner_rows.append("%s %s" % (technique, id)) else: inner_rows.append("%s::%s %s" % (technique, subtechnique, id)) - ostream["ATTCK"].setdefault(tactic.upper(), inner_rows) + result["ATTCK"].setdefault(tactic.upper(), inner_rows) -def render_mbc(doc, ostream): +def render_mbc(doc, result): """ example:: {'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check ' @@ -132,14 +129,14 @@ def render_mbc(doc, ostream): '[C0021.004]']} } """ - ostream["MBC"] = dict() + result["MBC"] = dict() objectives = collections.defaultdict(set) for rule in rutils.capability_rules(doc): - if not rule["meta"].get("mbc"): + if not rule.meta.mbc: continue - for mbc in rule["meta"]["mbc"]: - objectives[mbc["objective"]].add((mbc["behavior"], mbc.get("method"), mbc["id"])) + for mbc in rule.meta.mbc: + objectives[mbc.objective].add((mbc.behavior, mbc.method, mbc.id)) for objective, behaviors in sorted(objectives.items()): inner_rows = [] @@ -148,34 +145,37 @@ def render_mbc(doc, ostream): inner_rows.append("%s [%s]" % (behavior, id)) else: inner_rows.append("%s::%s [%s]" % (behavior, method, id)) - ostream["MBC"].setdefault(objective.upper(), inner_rows) + result["MBC"].setdefault(objective.upper(), inner_rows) -def render_dictionary(doc): - ostream = dict() - render_meta(doc, ostream) - render_attack(doc, ostream) - render_mbc(doc, ostream) - render_capabilities(doc, ostream) +def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]: + result = dict() + render_meta(doc, result) + render_attack(doc, result) + render_mbc(doc, result) + render_capabilities(doc, result) - return ostream + return result # ==== render dictionary helpers -def capa_details(file_path, output_format="dictionary"): +def capa_details(rules_path, file_path, output_format="dictionary"): + # load rules from disk + rules = capa.rules.RuleSet(capa.main.get_rules([rules_path], disable_progress=True)) + # extract features and find capabilities extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor) + meta = capa.main.collect_metadata([], file_path, rules_path, extractor) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities) capa_output = False if output_format == "dictionary": # ...as python dictionary, simplified as textable but in dictionary - doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities) capa_output = render_dictionary(doc) elif output_format == "json": # render results @@ -186,3 +186,22 @@ def capa_details(file_path, output_format="dictionary"): capa_output = capa.render.default.render(meta, rules, capabilities) return capa_output + + +if __name__ == "__main__": + import sys + import os.path + import argparse + + RULES_PATH = os.path.join(os.path.dirname(__file__), "..", "rules") + + parser = argparse.ArgumentParser(description="Extract capabilities from a file") + parser.add_argument("file", help="file to extract capabilities from") + parser.add_argument("--rules", help="path to rules directory", default=os.path.abspath(RULES_PATH)) + parser.add_argument( + "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary" + ) + args = parser.parse_args() + + print(capa_details(args.rules, args.file, args.output)) + sys.exit(0) diff --git a/tests/test_scripts.py b/tests/test_scripts.py index ff443844..b519f89e 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -41,6 +41,7 @@ def get_rule_path(): pytest.param("show-capabilities-by-function.py", [get_file_path()]), pytest.param("show-features.py", [get_file_path()]), pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]), + pytest.param("capa_as_library.py", [get_file_path()]), ], ) def test_scripts(script, args):