mirror of
https://github.com/mandiant/capa.git
synced 2025-12-22 15:16:22 -08:00
@@ -43,7 +43,6 @@ import capa.rules
|
|||||||
import capa.engine
|
import capa.engine
|
||||||
import capa.features
|
import capa.features
|
||||||
import capa.features.insn
|
import capa.features.insn
|
||||||
from capa.features.common import String
|
|
||||||
|
|
||||||
logger = logging.getLogger("capa2yara")
|
logger = logging.getLogger("capa2yara")
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
import collections
|
import collections
|
||||||
|
from typing import Any, Dict
|
||||||
|
|
||||||
import capa.main
|
import capa.main
|
||||||
import capa.rules
|
import capa.rules
|
||||||
@@ -10,52 +11,48 @@ import capa.features
|
|||||||
import capa.render.json
|
import capa.render.json
|
||||||
import capa.render.utils as rutils
|
import capa.render.utils as rutils
|
||||||
import capa.render.default
|
import capa.render.default
|
||||||
import capa.render.result_document
|
import capa.render.result_document as rd
|
||||||
|
import capa.features.freeze.features as frzf
|
||||||
from capa.engine import *
|
from capa.engine import *
|
||||||
|
|
||||||
# edit this to set the path for file to analyze and rule directory
|
|
||||||
RULES_PATH = "/tmp/capa/rules/"
|
|
||||||
|
|
||||||
# load rules from disk
|
|
||||||
rules = capa.rules.RuleSet(capa.main.get_rules([RULES_PATH], disable_progress=True))
|
|
||||||
|
|
||||||
# == Render ddictionary helpers
|
# == Render ddictionary helpers
|
||||||
def render_meta(doc, ostream):
|
def render_meta(doc: rd.ResultDocument, result):
|
||||||
ostream["md5"] = doc["meta"]["sample"]["md5"]
|
result["md5"] = doc.meta.sample.md5
|
||||||
ostream["sha1"] = doc["meta"]["sample"]["sha1"]
|
result["sha1"] = doc.meta.sample.sha1
|
||||||
ostream["sha256"] = doc["meta"]["sample"]["sha256"]
|
result["sha256"] = doc.meta.sample.sha256
|
||||||
ostream["path"] = doc["meta"]["sample"]["path"]
|
result["path"] = doc.meta.sample.path
|
||||||
|
|
||||||
|
|
||||||
def find_subrule_matches(doc):
|
def find_subrule_matches(doc: rd.ResultDocument) -> Set[str]:
|
||||||
"""
|
"""
|
||||||
collect the rule names that have been matched as a subrule match.
|
collect the rule names that have been matched as a subrule match.
|
||||||
this way we can avoid displaying entries for things that are too specific.
|
this way we can avoid displaying entries for things that are too specific.
|
||||||
"""
|
"""
|
||||||
matches = set([])
|
matches = set([])
|
||||||
|
|
||||||
def rec(node):
|
def rec(node: rd.Match):
|
||||||
if not node["success"]:
|
if not node.success:
|
||||||
# there's probably a bug here for rules that do `not: match: ...`
|
# there's probably a bug here for rules that do `not: match: ...`
|
||||||
# but we don't have any examples of this yet
|
# but we don't have any examples of this yet
|
||||||
return
|
return
|
||||||
|
|
||||||
elif node["node"]["type"] == "statement":
|
elif isinstance(node.node, rd.StatementNode):
|
||||||
for child in node["children"]:
|
for child in node.children:
|
||||||
rec(child)
|
rec(child)
|
||||||
|
|
||||||
elif node["node"]["type"] == "feature":
|
elif isinstance(node.node, rd.FeatureNode):
|
||||||
if node["node"]["feature"]["type"] == "match":
|
if isinstance(node.node.feature, frzf.MatchFeature):
|
||||||
matches.add(node["node"]["feature"]["match"])
|
matches.add(node.node.feature.match)
|
||||||
|
|
||||||
for rule in rutils.capability_rules(doc):
|
for rule in rutils.capability_rules(doc):
|
||||||
for node in rule["matches"].values():
|
for _, node in rule.matches:
|
||||||
rec(node)
|
rec(node)
|
||||||
|
|
||||||
return matches
|
return matches
|
||||||
|
|
||||||
|
|
||||||
def render_capabilities(doc, ostream):
|
def render_capabilities(doc: rd.ResultDocument, result):
|
||||||
"""
|
"""
|
||||||
example::
|
example::
|
||||||
{'CAPABILITY': {'accept command line arguments': 'host-interaction/cli',
|
{'CAPABILITY': {'accept command line arguments': 'host-interaction/cli',
|
||||||
@@ -68,25 +65,25 @@ def render_capabilities(doc, ostream):
|
|||||||
"""
|
"""
|
||||||
subrule_matches = find_subrule_matches(doc)
|
subrule_matches = find_subrule_matches(doc)
|
||||||
|
|
||||||
ostream["CAPABILITY"] = dict()
|
result["CAPABILITY"] = dict()
|
||||||
for rule in rutils.capability_rules(doc):
|
for rule in rutils.capability_rules(doc):
|
||||||
if rule["meta"]["name"] in subrule_matches:
|
if rule.meta.name in subrule_matches:
|
||||||
# rules that are also matched by other rules should not get rendered by default.
|
# rules that are also matched by other rules should not get rendered by default.
|
||||||
# this cuts down on the amount of output while giving approx the same detail.
|
# this cuts down on the amount of output while giving approx the same detail.
|
||||||
# see #224
|
# see #224
|
||||||
continue
|
continue
|
||||||
|
|
||||||
count = len(rule["matches"])
|
count = len(rule.matches)
|
||||||
if count == 1:
|
if count == 1:
|
||||||
capability = rule["meta"]["name"]
|
capability = rule.meta.name
|
||||||
else:
|
else:
|
||||||
capability = "%s (%d matches)" % (rule["meta"]["name"], count)
|
capability = "%s (%d matches)" % (rule.meta.name, count)
|
||||||
|
|
||||||
ostream["CAPABILITY"].setdefault(rule["meta"]["namespace"], list())
|
result["CAPABILITY"].setdefault(rule.meta.namespace, list())
|
||||||
ostream["CAPABILITY"][rule["meta"]["namespace"]].append(capability)
|
result["CAPABILITY"][rule.meta.namespace].append(capability)
|
||||||
|
|
||||||
|
|
||||||
def render_attack(doc, ostream):
|
def render_attack(doc, result):
|
||||||
"""
|
"""
|
||||||
example::
|
example::
|
||||||
{'ATT&CK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'],
|
{'ATT&CK': {'COLLECTION': ['Input Capture::Keylogging [T1056.001]'],
|
||||||
@@ -99,13 +96,13 @@ def render_attack(doc, ostream):
|
|||||||
'EXECUTION': ['Shared Modules [T1129]']}
|
'EXECUTION': ['Shared Modules [T1129]']}
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
ostream["ATTCK"] = dict()
|
result["ATTCK"] = dict()
|
||||||
tactics = collections.defaultdict(set)
|
tactics = collections.defaultdict(set)
|
||||||
for rule in rutils.capability_rules(doc):
|
for rule in rutils.capability_rules(doc):
|
||||||
if not rule["meta"].get("att&ck"):
|
if not rule.meta.attack:
|
||||||
continue
|
continue
|
||||||
for attack in rule["meta"]["att&ck"]:
|
for attack in rule.meta.attack:
|
||||||
tactics[attack["tactic"]].add((attack["technique"], attack.get("subtechnique"), attack["id"]))
|
tactics[attack.tactic].add((attack.technique, attack.subtechnique, attack.id))
|
||||||
|
|
||||||
for tactic, techniques in sorted(tactics.items()):
|
for tactic, techniques in sorted(tactics.items()):
|
||||||
inner_rows = []
|
inner_rows = []
|
||||||
@@ -114,10 +111,10 @@ def render_attack(doc, ostream):
|
|||||||
inner_rows.append("%s %s" % (technique, id))
|
inner_rows.append("%s %s" % (technique, id))
|
||||||
else:
|
else:
|
||||||
inner_rows.append("%s::%s %s" % (technique, subtechnique, id))
|
inner_rows.append("%s::%s %s" % (technique, subtechnique, id))
|
||||||
ostream["ATTCK"].setdefault(tactic.upper(), inner_rows)
|
result["ATTCK"].setdefault(tactic.upper(), inner_rows)
|
||||||
|
|
||||||
|
|
||||||
def render_mbc(doc, ostream):
|
def render_mbc(doc, result):
|
||||||
"""
|
"""
|
||||||
example::
|
example::
|
||||||
{'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check '
|
{'MBC': {'ANTI-BEHAVIORAL ANALYSIS': ['Debugger Detection::Timing/Delay Check '
|
||||||
@@ -132,14 +129,14 @@ def render_mbc(doc, ostream):
|
|||||||
'[C0021.004]']}
|
'[C0021.004]']}
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
ostream["MBC"] = dict()
|
result["MBC"] = dict()
|
||||||
objectives = collections.defaultdict(set)
|
objectives = collections.defaultdict(set)
|
||||||
for rule in rutils.capability_rules(doc):
|
for rule in rutils.capability_rules(doc):
|
||||||
if not rule["meta"].get("mbc"):
|
if not rule.meta.mbc:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for mbc in rule["meta"]["mbc"]:
|
for mbc in rule.meta.mbc:
|
||||||
objectives[mbc["objective"]].add((mbc["behavior"], mbc.get("method"), mbc["id"]))
|
objectives[mbc.objective].add((mbc.behavior, mbc.method, mbc.id))
|
||||||
|
|
||||||
for objective, behaviors in sorted(objectives.items()):
|
for objective, behaviors in sorted(objectives.items()):
|
||||||
inner_rows = []
|
inner_rows = []
|
||||||
@@ -148,34 +145,37 @@ def render_mbc(doc, ostream):
|
|||||||
inner_rows.append("%s [%s]" % (behavior, id))
|
inner_rows.append("%s [%s]" % (behavior, id))
|
||||||
else:
|
else:
|
||||||
inner_rows.append("%s::%s [%s]" % (behavior, method, id))
|
inner_rows.append("%s::%s [%s]" % (behavior, method, id))
|
||||||
ostream["MBC"].setdefault(objective.upper(), inner_rows)
|
result["MBC"].setdefault(objective.upper(), inner_rows)
|
||||||
|
|
||||||
|
|
||||||
def render_dictionary(doc):
|
def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
|
||||||
ostream = dict()
|
result = dict()
|
||||||
render_meta(doc, ostream)
|
render_meta(doc, result)
|
||||||
render_attack(doc, ostream)
|
render_attack(doc, result)
|
||||||
render_mbc(doc, ostream)
|
render_mbc(doc, result)
|
||||||
render_capabilities(doc, ostream)
|
render_capabilities(doc, result)
|
||||||
|
|
||||||
return ostream
|
return result
|
||||||
|
|
||||||
|
|
||||||
# ==== render dictionary helpers
|
# ==== render dictionary helpers
|
||||||
def capa_details(file_path, output_format="dictionary"):
|
def capa_details(rules_path, file_path, output_format="dictionary"):
|
||||||
|
# load rules from disk
|
||||||
|
rules = capa.rules.RuleSet(capa.main.get_rules([rules_path], disable_progress=True))
|
||||||
|
|
||||||
# extract features and find capabilities
|
# extract features and find capabilities
|
||||||
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)
|
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)
|
||||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||||
|
|
||||||
# collect metadata (used only to make rendering more complete)
|
# collect metadata (used only to make rendering more complete)
|
||||||
meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor)
|
meta = capa.main.collect_metadata([], file_path, rules_path, extractor)
|
||||||
meta["analysis"].update(counts)
|
meta["analysis"].update(counts)
|
||||||
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
|
||||||
|
|
||||||
capa_output = False
|
capa_output = False
|
||||||
if output_format == "dictionary":
|
if output_format == "dictionary":
|
||||||
# ...as python dictionary, simplified as textable but in dictionary
|
# ...as python dictionary, simplified as textable but in dictionary
|
||||||
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
|
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
||||||
capa_output = render_dictionary(doc)
|
capa_output = render_dictionary(doc)
|
||||||
elif output_format == "json":
|
elif output_format == "json":
|
||||||
# render results
|
# render results
|
||||||
@@ -186,3 +186,22 @@ def capa_details(file_path, output_format="dictionary"):
|
|||||||
capa_output = capa.render.default.render(meta, rules, capabilities)
|
capa_output = capa.render.default.render(meta, rules, capabilities)
|
||||||
|
|
||||||
return capa_output
|
return capa_output
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import sys
|
||||||
|
import os.path
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
RULES_PATH = os.path.join(os.path.dirname(__file__), "..", "rules")
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Extract capabilities from a file")
|
||||||
|
parser.add_argument("file", help="file to extract capabilities from")
|
||||||
|
parser.add_argument("--rules", help="path to rules directory", default=os.path.abspath(RULES_PATH))
|
||||||
|
parser.add_argument(
|
||||||
|
"--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
print(capa_details(args.rules, args.file, args.output))
|
||||||
|
sys.exit(0)
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ def get_rule_path():
|
|||||||
pytest.param("show-capabilities-by-function.py", [get_file_path()]),
|
pytest.param("show-capabilities-by-function.py", [get_file_path()]),
|
||||||
pytest.param("show-features.py", [get_file_path()]),
|
pytest.param("show-features.py", [get_file_path()]),
|
||||||
pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
|
pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
|
||||||
|
pytest.param("capa_as_library.py", [get_file_path()]),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_scripts(script, args):
|
def test_scripts(script, args):
|
||||||
|
|||||||
Reference in New Issue
Block a user