wip: pydantic result document

Willi Ballenthin
2022-05-31 07:40:50 -06:00
parent 8080752815
commit 42e2c53e5e
6 changed files with 669 additions and 519 deletions


@@ -11,7 +11,8 @@ import collections
import tabulate
import capa.render.utils as rutils
import capa.render.result_document
import capa.render.result_document as rd
import capa.features.freeze as frz
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import StringIO
@@ -27,50 +28,49 @@ def width(s: str, character_count: int) -> str:
return s
def render_meta(doc, ostream: StringIO):
def render_meta(doc: rd.ResultDocument, ostream: StringIO):
rows = [
(width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)),
("sha1", doc["meta"]["sample"]["sha1"]),
("sha256", doc["meta"]["sample"]["sha256"]),
("os", doc["meta"]["analysis"]["os"]),
("format", doc["meta"]["analysis"]["format"]),
("arch", doc["meta"]["analysis"]["arch"]),
("path", doc["meta"]["sample"]["path"]),
(width("md5", 22), width(doc.meta.sample.md5, 82)),
("sha1", doc.meta.sample.sha1),
("sha256", doc.meta.sample.sha256),
("os", doc.meta.analysis.os),
("format", doc.meta.analysis.format),
("arch", doc.meta.analysis.arch),
("path", doc.meta.sample.path),
]
ostream.write(tabulate.tabulate(rows, tablefmt="psql"))
ostream.write("\n")
def find_subrule_matches(doc):
def find_subrule_matches(doc: rd.ResultDocument):
"""
collect the rule names that have been matched as a subrule match.
this way we can avoid displaying entries for things that are too specific.
"""
matches = set([])
def rec(node):
if not node["success"]:
def rec(node: rd.Match):
if not node.success:
# there's probably a bug here for rules that do `not: match: ...`
# but we don't have any examples of this yet
return
elif node["node"]["type"] == "statement":
for child in node["children"]:
elif isinstance(node, rd.StatementNode):
for child in node.children:
rec(child)
elif node["node"]["type"] == "feature":
if node["node"]["feature"]["type"] == "match":
matches.add(node["node"]["feature"]["match"])
elif isinstance(node, rd.FeatureNode) and isinstance(node.feature, frz.features.MatchFeature):
matches.add(node.node.feature.match)
for rule in rutils.capability_rules(doc):
for address, node in rule["matches"]:
for address, node in rule.matches:
rec(node)
return matches
def render_capabilities(doc, ostream: StringIO):
def render_capabilities(doc: rd.ResultDocument, ostream: StringIO):
"""
example::
@@ -86,18 +86,18 @@ def render_capabilities(doc, ostream: StringIO):
rows = []
for rule in rutils.capability_rules(doc):
if rule["meta"]["name"] in subrule_matches:
if rule.meta.name in subrule_matches:
# rules that are also matched by other rules should not get rendered by default.
# this cuts down on the amount of output while giving approx the same detail.
# see #224
continue
count = len(rule["matches"])
count = len(rule.matches)
if count == 1:
capability = rutils.bold(rule["meta"]["name"])
capability = rutils.bold(rule.meta.name)
else:
capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count)
rows.append((capability, rule["meta"]["namespace"]))
capability = "%s (%d matches)" % (rutils.bold(rule.meta.name), count)
rows.append((capability, rule.meta.namespace))
if rows:
ostream.write(
@@ -108,7 +108,7 @@ def render_capabilities(doc, ostream: StringIO):
ostream.writeln(rutils.bold("no capabilities found"))
def render_attack(doc, ostream: StringIO):
def render_attack(doc: rd.ResultDocument, ostream: StringIO):
"""
example::
@@ -126,11 +126,8 @@ def render_attack(doc, ostream: StringIO):
"""
tactics = collections.defaultdict(set)
for rule in rutils.capability_rules(doc):
if not rule["meta"].get("att&ck"):
continue
for attack in rule["meta"]["att&ck"]:
tactics[attack["tactic"]].add((attack["technique"], attack.get("subtechnique"), attack["id"]))
for attack in rule.meta.attack:
tactics[attack.tactic].add((attack.technique, attack.subtechnique, attack.id))
rows = []
for tactic, techniques in sorted(tactics.items()):
@@ -156,7 +153,7 @@ def render_attack(doc, ostream: StringIO):
ostream.write("\n")
def render_mbc(doc, ostream: StringIO):
def render_mbc(doc: rd.ResultDocument, ostream: StringIO):
"""
example::
@@ -172,11 +169,8 @@ def render_mbc(doc, ostream: StringIO):
"""
objectives = collections.defaultdict(set)
for rule in rutils.capability_rules(doc):
if not rule["meta"].get("mbc"):
continue
for mbc in rule["meta"]["mbc"]:
objectives[mbc["objective"]].add((mbc["behavior"], mbc.get("method"), mbc["id"]))
for mbc in rule.meta.mbc:
objectives[mbc.objective].add((mbc.behavior, mbc.method, mbc.id))
rows = []
for objective, behaviors in sorted(objectives.items()):
@@ -200,7 +194,7 @@ def render_mbc(doc, ostream: StringIO):
ostream.write("\n")
def render_default(doc):
def render_default(doc: rd.ResultDocument):
ostream = rutils.StringIO()
render_meta(doc, ostream)
@@ -215,5 +209,5 @@ def render_default(doc):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
return render_default(doc)


@@ -5,29 +5,10 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.result_document import convert_capabilities_to_result_document
class CapaJsonObjectEncoder(json.JSONEncoder):
"""JSON encoder that emits Python sets as sorted lists"""
def default(self, obj):
if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, str):
return json.JSONEncoder.default(self, obj)
elif isinstance(obj, set):
return list(sorted(obj))
else:
# probably will TypeError
return json.JSONEncoder.default(self, obj)
import capa.render.result_document as rd
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
return json.dumps(
convert_capabilities_to_result_document(meta, rules, capabilities),
cls=CapaJsonObjectEncoder,
sort_keys=True,
)
return rd.ResultDocument.from_capa(meta, rules, capabilities).json(exclude_none=True)
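A minimal sketch of the round trip this enables (not part of this commit; it assumes pydantic v1, where BaseModel provides .json() and .parse_raw()):

import capa.render.result_document as rd

def load_result_document(js: str) -> rd.ResultDocument:
    # parse a previously emitted JSON document back into the typed model,
    # so saved results can be re-rendered without re-running analysis
    return rd.ResultDocument.parse_raw(js)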


@@ -5,386 +5,576 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import copy
from typing import Any, List
import datetime
from typing import Any, Dict, Tuple, Union, Optional
from pydantic import Field, BaseModel
import capa.rules
import capa.engine
import capa.render.utils
import capa.features.common
import capa.features.freeze as frz
import capa.features.address
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.helpers import assert_never
from capa.features.address import Address
def serialize_address(a: Address) -> Any:
if isinstance(a, capa.features.address.AbsoluteVirtualAddress):
return ("absolute", int(a))
elif isinstance(a, capa.features.address.RelativeVirtualAddress):
return ("relative", int(a))
elif isinstance(a, capa.features.address.FileOffsetAddress):
return ("file", int(a))
elif isinstance(a, capa.features.address.DNTokenAddress):
return ("dn token", a.token)
elif isinstance(a, capa.features.address.DNTokenOffsetAddress):
return ("dn token offset", a.token, a.offset)
elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress):
return ("no address",)
elif isinstance(a, capa.features.address.Address) and not issubclass(type(a), capa.features.address.Address):
raise ValueError("don't use an Address instance directly")
elif isinstance(a, capa.features.address.Address):
raise ValueError("don't use an Address instance directly")
else:
assert_never(a)
class FrozenModel(BaseModel):
class Config:
frozen = True
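A quick sketch of what frozen = True buys here (illustrative only, assuming pydantic v1.9+, where Config.frozen implies allow_mutation = False and generates __hash__): instances compare by value and can be stored in sets.

# _Color is a hypothetical model, not part of this commit
class _Color(FrozenModel):
    name: str

a = _Color(name="red")
b = _Color(name="red")
assert a == b            # models compare by field values
assert len({a, b}) == 1  # frozen models are hashable, so they can live in sets
# a.name = "blue"        # would raise TypeError: the instance is immutable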
def deserialize_address(doc: List[Any]) -> Address:
atype = doc[0]
if atype == "absolute":
return capa.features.address.AbsoluteVirtualAddress(doc[1])
elif atype == "relative":
return capa.features.address.RelativeVirtualAddress(doc[1])
elif atype == "file":
return capa.features.address.FileOffsetAddress(doc[1])
elif atype == "dn token":
return capa.features.address.DNTokenAddress(doc[1])
elif atype == "dn token offset":
return capa.features.address.DNTokenOffsetAddress(doc[1], doc[2])
elif atype == "no address":
return capa.features.address.NO_ADDRESS
else:
assert_never(atype)
class Sample(FrozenModel):
md5: str
sha1: str
sha256: str
path: str
def convert_statement_to_result_document(statement):
"""
"statement": {
"type": "or"
},
"statement": {
"max": 9223372036854775808,
"min": 2,
"type": "range"
},
"""
statement_type = statement.name.lower()
result = {"type": statement_type}
if statement.description:
result["description"] = statement.description
if statement_type == "some" and statement.count == 0:
result["type"] = "optional"
elif statement_type == "some":
result["count"] = statement.count
elif statement_type == "range":
result["min"] = statement.min
result["max"] = statement.max
result["child"] = convert_feature_to_result_document(statement.child)
elif statement_type == "subscope":
result["subscope"] = statement.scope
return result
class BasicBlockLayout(FrozenModel):
address: frz.Address
def convert_feature_to_result_document(feature):
"""
"feature": {
"number": 6,
"type": "number"
},
class FunctionLayout(FrozenModel):
address: frz.Address
matched_basic_blocks: Tuple[BasicBlockLayout, ...]
"feature": {
"api": "ws2_32.WSASocket",
"type": "api"
},
"feature": {
"match": "create TCP socket",
"type": "match"
},
class Layout(FrozenModel):
functions: Tuple[FunctionLayout, ...]
class LibraryFunction(FrozenModel):
address: frz.Address
name: str
class FunctionFeatureCount(FrozenModel):
address: frz.Address
count: int
class FeatureCounts(FrozenModel):
file: int
functions: Tuple[FunctionFeatureCount, ...]
class Analysis(FrozenModel):
format: str
arch: str
os: str
extractor: str
rules: Tuple[str, ...]
base_address: frz.Address
layout: Layout
feature_counts: FeatureCounts
library_functions: Tuple[LibraryFunction, ...]
class Metadata(FrozenModel):
timestamp: datetime.datetime
version: str
argv: Tuple[str, ...]
sample: Sample
analysis: Analysis
@classmethod
def from_capa(cls, meta: Any) -> "Metadata":
return cls(
timestamp=meta["timestamp"],
version=meta["version"],
argv=meta["argv"],
sample=Sample(
md5=meta["sample"]["md5"],
sha1=meta["sample"]["sha1"],
sha256=meta["sample"]["sha256"],
path=meta["sample"]["path"],
),
analysis=Analysis(
format=meta["analysis"]["format"],
arch=meta["analysis"]["arch"],
os=meta["analysis"]["os"],
extractor=meta["analysis"]["extractor"],
rules=meta["analysis"]["rules"],
base_address=frz.Address.from_capa(meta["analysis"]["base_address"]),
layout=Layout(
functions=[
FunctionLayout(
address=frz.Address.from_capa(address),
matched_basic_blocks=[
BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in f["matched_basic_blocks"]
],
)
for address, f in meta["analysis"]["layout"]["functions"].items()
]
),
feature_counts=FeatureCounts(
file=meta["analysis"]["feature_counts"]["file"],
functions=[
FunctionFeatureCount(address=frz.Address.from_capa(address), count=count)
for address, count in meta["analysis"]["feature_counts"]["functions"].items()
],
),
library_functions=[
LibraryFunction(address=frz.Address.from_capa(address), name=name)
for address, name in meta["analysis"]["library_functions"].items()
],
),
)
class StatementModel(FrozenModel):
...
class AndStatement(StatementModel):
type = "and"
description: Optional[str]
class OrStatement(StatementModel):
type = "or"
description: Optional[str]
class NotStatement(StatementModel):
type = "not"
description: Optional[str]
class SomeStatement(StatementModel):
type = "some"
description: Optional[str]
count: int
class OptionalStatement(StatementModel):
type = "optional"
description: Optional[str]
class RangeStatement(StatementModel):
type = "range"
description: Optional[str]
min: int
max: int
child: frz.Feature
class SubscopeStatement(StatementModel):
type = "subscope"
description: Optional[str]
scope: capa.rules.Scope
Statement = Union[
AndStatement,
OrStatement,
NotStatement,
SomeStatement,
OptionalStatement,
RangeStatement,
SubscopeStatement,
]
class StatementNode(FrozenModel):
type = "statement"
statement: Statement
def statement_from_capa(node: capa.engine.Statement) -> StatementNode:
if isinstance(node, capa.engine.And):
return StatementNode(
statement=AndStatement(
description=node.description,
)
)
elif isinstance(node, capa.engine.Or):
return StatementNode(
statement=OrStatement(
description=node.description,
)
)
elif isinstance(node, capa.engine.Not):
return StatementNode(
statement=NotStatement(
description=node.description,
)
)
elif isinstance(node, capa.engine.Some):
if node.count == 0:
return StatementNode(
statement=OptionalStatement(
description=node.description,
)
)
"feature": {
"characteristic": [
"loop",
true
],
"type": "characteristic"
},
"""
result = {"type": feature.name, feature.name: feature.get_value_str()}
if feature.description:
result["description"] = feature.description
if feature.name in ("regex", "substring"):
if feature.matches:
# regex feature matches are a dict from the capture group to a list of location addresses
result["matches"] = {k: list(map(serialize_address, vs)) for k, vs in feature.matches.items()}
else:
# there were no matches
pass
return StatementNode(
statement=SomeStatement(
description=node.description,
count=node.count,
)
)
return result
elif isinstance(node, capa.engine.Range):
return StatementNode(
statement=RangeStatement(
description=node.description,
min=node.min,
max=node.max,
child=frz.feature_from_capa(node.child),
)
)
elif isinstance(node, capa.engine.Subscope):
return StatementNode(
statement=SubscopeStatement(
description=node.description,
scope=capa.rules.Scope(node.scope),
)
)
def convert_node_to_result_document(node):
"""
"node": {
"type": "statement",
"statement": { ... }
},
"node": {
"type": "feature",
"feature": { ... }
},
"""
if isinstance(node, capa.engine.Statement):
return {
"type": "statement",
"statement": convert_statement_to_result_document(node),
}
elif isinstance(node, capa.features.common.Feature):
return {
"type": "feature",
"feature": convert_feature_to_result_document(node),
}
else:
raise RuntimeError("unexpected match node type")
raise NotImplementedError(f"statement_from_capa({type(node)}) not implemented")
def convert_match_to_result_document(rules, capabilities, result):
class FeatureNode(FrozenModel):
type = "feature"
feature: frz.Feature
Node = Union[StatementNode, FeatureNode]
def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> Node:
if isinstance(node, capa.engine.Statement):
return StatementNode(statement=statement_from_capa(node))
elif isinstance(node, capa.engine.Feature):
return FeatureNode(feature=frz.feature_from_capa(node))
else:
assert_never(node)
class Match(BaseModel):
"""
convert the given Result instance into a common, Python-native data structure.
this will become part of the "result document" format that can be emitted to JSON.
args:
success: did the node match?
node: the logic node or feature node.
children: any children of the logic node. not relevant for features, can be empty.
locations: where the feature matched. not relevant for logic nodes (except range), can be empty.
captures: captured values from the string/regex feature, and the locations of those values.
"""
doc = {
"success": bool(result.success),
"node": convert_node_to_result_document(result.statement),
"children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children],
}
# logic expressions, like `and`, don't have locations - their children do.
# so only add `locations` to feature nodes.
if isinstance(result.statement, capa.features.common.Feature):
if bool(result.success):
doc["locations"] = list(map(serialize_address, result.locations))
elif isinstance(result.statement, capa.engine.Range):
if bool(result.success):
doc["locations"] = list(map(serialize_address, result.locations))
success: bool
node: Node
children: Tuple["Match", ...]
locations: Tuple[frz.Address, ...]
captures: Dict[str, Tuple[frz.Address, ...]]
# if we have a `match` statement, then we're referencing another rule or namespace.
# this could be an external rule (written by a human), or
# a rule generated to support a subscope (basic block, etc.)
# we still want to include the matching logic in this tree.
#
# so, we need to lookup the other rule results
# and then filter those down to the address used here.
# finally, splice that logic into this tree.
if (
doc["node"]["type"] == "feature"
and doc["node"]["feature"]["type"] == "match"
# only add subtree on success,
# because there won't be results for the other rule on failure.
and doc["success"]
):
@classmethod
def from_capa(
cls,
rules: RuleSet,
capabilities: MatchResults,
result: capa.engine.Result,
) -> "Match":
success = bool(result)
name = doc["node"]["feature"]["match"]
node = node_from_capa(result.statement)
if name in rules:
# this is a rule that we're matching
#
# pull matches from the referenced rule into our tree here.
rule_name = doc["node"]["feature"]["match"]
children = [Match.from_capa(rules, capabilities, child) for child in result.children]
# logic expressions, like `and`, don't have locations - their children do.
# so only add `locations` to feature nodes.
locations = []
if isinstance(node, FeatureNode) and success:
locations = list(map(frz.Address.from_capa, result.locations))
elif isinstance(node, StatementNode) and isinstance(node.statement, RangeStatement) and success:
locations = list(map(frz.Address.from_capa, result.locations))
captures = {}
if isinstance(result.statement, (capa.features.common._MatchedSubstring, capa.features.common._MatchedRegex)):
captures = {
capture: list(map(frz.Address.from_capa, locs)) for capture, locs in result.statement.matches.items()
}
# if we have a `match` statement, then we're referencing another rule or namespace.
# this could be an external rule (written by a human), or
# a rule generated to support a subscope (basic block, etc.)
# we still want to include the matching logic in this tree.
#
# so, we need to lookup the other rule results
# and then filter those down to the address used here.
# finally, splice that logic into this tree.
if (
isinstance(node, FeatureNode)
and isinstance(node.feature, frz.features.MatchFeature)
# only add subtree on success,
# because there won't be results for the other rule on failure.
and success
):
name = node.feature.match
if name in rules:
# this is a rule that we're matching
#
# pull matches from the referenced rule into our tree here.
rule_name = name
rule = rules[rule_name]
rule_matches = {address: result for (address, result) in capabilities[rule_name]}
if rule.is_subscope_rule():
# for a subscope rule, fixup the node to be a scope node, rather than a match feature node.
#
# e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block`
#
# note! replace `node`
node = StatementNode(
statement=SubscopeStatement(
scope=rule.meta["scope"],
)
)
for location in result.locations:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
else:
# this is a namespace that we're matching
#
# check for all rules in the namespace,
# seeing if they matched.
# if so, pull their matches into our match tree here.
ns_name = name
ns_rules = rules.rules_by_namespace[ns_name]
for rule in ns_rules:
if rule.name in capabilities:
# the rule matched, so splice results into our tree here.
#
# note, there's a shortcoming in our result document schema here:
# we lose the name of the rule that matched in a namespace.
# for example, if we have a statement: `match: runtime/dotnet`
# and we get matches, we can say the following:
#
# match: runtime/dotnet @ 0x0
# or:
# import: mscoree._CorExeMain @ 0x402000
#
# however, we lose the fact that it was rule
# "compiled to the .NET platform"
# that contained this logic and did the match.
#
# we could introduce an intermediate node here.
# this would be a breaking change and require updates to the renderers.
# in the meantime, the above might be sufficient.
rule_matches = {address: result for (address, result) in capabilities[rule.name]}
for location in result.locations:
# result.locations contains all matches for the given namespace.
# for example, the feature might be `match: anti-analysis/packer`
# which matches against "generic unpacker" and "UPX".
# in this case, result.locations contains locations for *both* of these.
#
# rule_matches contains the matches for the specific rule.
# this is a subset of result.locations.
#
# so, grab only the locations for the current rule.
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
return cls(
success=success,
node=node,
children=children,
locations=locations,
captures=captures,
)
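An illustrative walker over the resulting tree (a hypothetical helper, not part of this commit), showing how isinstance checks and attribute access replace the old match["node"]["type"] lookups:

def count_matched_features(match: Match) -> int:
    # count FeatureNode entries that evaluated successfully, recursing over children
    total = 0
    if match.success and isinstance(match.node, FeatureNode):
        total += 1
    for child in match.children:
        total += count_matched_features(child)
    return total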
def parse_parts_id(s: str):
id = ""
parts = s.split("::")
if len(parts) > 0:
last = parts.pop()
last, _, id = last.rpartition(" ")
id = id.lstrip("[").rstrip("]")
parts.append(last)
return parts, id
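For example (illustrative), parsing a canonical ATT&CK string yields the parts and the bracketed identifier:

parts, id = parse_parts_id("Execution::Command and Scripting Interpreter::Python [T1059.006]")
assert parts == ["Execution", "Command and Scripting Interpreter", "Python"]
assert id == "T1059.006"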
class AttackSpec(FrozenModel):
"""
given an ATT&CK spec like: `Tactic::Technique::Subtechnique [Identifier]`
e.g., `Execution::Command and Scripting Interpreter::Python [T1059.006]`
args:
tactic: like `Tactic` above, perhaps "Execution"
technique: like `Technique` above, perhaps "Command and Scripting Interpreter"
subtechnique: like `Subtechnique` above, perhaps "Python"
id: like `Identifier` above, perhaps "T1059.006"
"""
parts: Tuple[str, ...]
tactic: str
technique: str
subtechnique: str
id: str
@classmethod
def from_str(cls, s) -> "AttackSpec":
tactic = ""
technique = ""
subtechnique = ""
parts, id = parse_parts_id(s)
if len(parts) > 0:
tactic = parts[0]
if len(parts) > 1:
technique = parts[1]
if len(parts) > 2:
subtechnique = parts[2]
return cls(
parts=parts,
tactic=tactic,
technique=technique,
subtechnique=subtechnique,
id=id,
)
class MBCSpec(FrozenModel):
"""
given an MBC spec like: `Objective::Behavior::Method [Identifier]`
e.g., `Collection::Input Capture::Mouse Events [E1056.m01]`
args:
objective: like `Objective` above, perhaps "Collection"
behavior: like `Behavior` above, perhaps "Input Capture"
method: like `Method` above, perhaps "Mouse Events"
id: like `Identifier` above, perhaps "E1056.m01"
"""
parts: Tuple[str, ...]
objective: str
behavior: str
method: str
id: str
@classmethod
def from_str(cls, s) -> "MBCSpec":
objective = ""
behavior = ""
method = ""
parts, id = parse_parts_id(s)
if len(parts) > 0:
objective = parts[0]
if len(parts) > 1:
behavior = parts[1]
if len(parts) > 2:
method = parts[2]
return cls(
parts=parts,
objective=objective,
behavior=behavior,
method=method,
id=id,
)
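Illustrative use of the parser above on the canonical MBC form from the docstring:

spec = MBCSpec.from_str("Collection::Input Capture::Mouse Events [E1056.m01]")
assert spec.objective == "Collection"
assert spec.behavior == "Input Capture"
assert spec.method == "Mouse Events"
assert spec.id == "E1056.m01"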
class MaecMetadata(FrozenModel):
analysis_conclusion: Optional[str] = Field(None, alias="analysis-conclusion")
analysis_conclusion_ov: Optional[str] = Field(None, alias="analysis-conclusion-ov")
malware_family: Optional[str] = Field(None, alias="malware-family")
malware_category: Optional[str] = Field(None, alias="malware-category")
malware_category_ov: Optional[str] = Field(None, alias="malware-category-ov")
class Config:
frozen = True
allow_population_by_field_name = True
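A small sketch of how the dashed aliases behave (assuming pydantic v1: parse_obj reads the aliased keys, while allow_population_by_field_name also accepts the Python field names):

m1 = MaecMetadata.parse_obj({"malware-category": "launcher"})  # populated via the alias
m2 = MaecMetadata(malware_category="launcher")                 # populated via the field name
assert m1.malware_category == m2.malware_category == "launcher"
# m1.json(by_alias=True, exclude_none=True) emits the dashed key again: {"malware-category": "launcher"}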
class RuleMetadata(FrozenModel):
name: str
namespace: Optional[str]
authors: Tuple[str, ...]
scope: capa.rules.Scope
attack: Tuple[AttackSpec, ...] = Field(alias="att&ck")
mbc: Tuple[MBCSpec, ...]
references: Tuple[str, ...]
examples: Tuple[str, ...]
description: str
lib: bool = Field(False, alias="lib")
is_subscope_rule: bool = Field(False, alias="capa/subscope")
maec: MaecMetadata
@classmethod
def from_capa(cls, rule: capa.rules.Rule) -> "RuleMetadata":
return cls(
name=rule.meta.get("name"),
namespace=rule.meta.get("namespace"),
authors=rule.meta.get("authors"),
scope=capa.rules.Scope(rule.meta.get("scope")),
attack=list(map(AttackSpec.from_str, rule.meta.get("att&ck", []))),
mbc=list(map(MBCSpec.from_str, rule.meta.get("mbc", []))),
references=rule.meta.get("references", []),
examples=rule.meta.get("examples", []),
description=rule.meta.get("description", ""),
lib=rule.meta.get("lib", False),
is_subscope_rule=rule.meta.get("capa/subscope", False),
maec=MaecMetadata(
analysis_conclusion=rule.meta.get("maec/analysis-conclusion"),
analysis_conclusion_ov=rule.meta.get("maec/analysis-conclusion-ov"),
malware_family=rule.meta.get("maec/malware-family"),
malware_category=rule.meta.get("maec/malware-category"),
malware_category_ov=rule.meta.get("maec/malware-category-ov"),
)
)
class Config:
frozen = True
allow_population_by_field_name = True
class RuleMatches(BaseModel):
"""
args:
meta: the metadata from the rule
source: the raw rule text
"""
meta: RuleMetadata
source: str
matches: Tuple[Tuple[frz.Address, Match], ...]
class ResultDocument(BaseModel):
meta: Metadata
rules: Dict[str, RuleMatches]
@classmethod
def from_capa(cls, meta, rules: RuleSet, capabilities: MatchResults) -> "ResultDocument":
rule_matches: Dict[str, RuleMatches] = {}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
rule_matches = {address: result for (address, result) in capabilities[rule_name]}
if rule.meta.get("capa/subscope-rule"):
# for a subscope rule, fixup the node to be a scope node, rather than a match feature node.
#
# e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block`
scope = rule.meta["scope"]
doc["node"] = {
"type": "statement",
"statement": {
"type": "subscope",
"subscope": scope,
},
}
continue
for location in result.locations:
doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location]))
else:
# this is a namespace that we're matching
#
# check for all rules in the namespace,
# seeing if they matched.
# if so, pull their matches into our match tree here.
ns_name = doc["node"]["feature"]["match"]
ns_rules = rules.rules_by_namespace[ns_name]
rule_matches[rule_name] = RuleMatches(
meta=RuleMetadata.from_capa(rule),
source=rule.definition,
matches=[
(frz.Address.from_capa(addr), Match.from_capa(rules, capabilities, match))
for addr, match in matches
],
)
for rule in ns_rules:
if rule.name in capabilities:
# the rule matched, so splice results into our tree here.
#
# note, there's a shortcoming in our result document schema here:
# we lose the name of the rule that matched in a namespace.
# for example, if we have a statement: `match: runtime/dotnet`
# and we get matches, we can say the following:
#
# match: runtime/dotnet @ 0x0
# or:
# import: mscoree._CorExeMain @ 0x402000
#
# however, we lose the fact that it was rule
# "compiled to the .NET platform"
# that contained this logic and did the match.
#
# we could introduce an intermediate node here.
# this would be a breaking change and require updates to the renderers.
# in the meantime, the above might be sufficient.
rule_matches = {address: result for (address, result) in capabilities[rule.name]}
for location in result.locations:
# doc[locations] contains all matches for the given namespace.
# for example, the feature might be `match: anti-analysis/packer`
# which matches against "generic unpacker" and "UPX".
# in this case, doc[locations] contains locations for *both* of these.
#
# rule_matches contains the matches for the specific rule.
# this is a subset of doc[locations].
#
# so, grab only the locations for current rule.
if location in rule_matches:
doc["children"].append(
convert_match_to_result_document(rules, capabilities, rule_matches[location])
)
return doc
def convert_meta_to_result_document(meta):
# make a copy so that we don't modify the given parameter
meta = copy.deepcopy(meta)
attacks = meta.get("att&ck", [])
meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks]
mbcs = meta.get("mbc", [])
meta["mbc"] = [parse_canonical_mbc(mbc) for mbc in mbcs]
return meta
def parse_canonical_attack(attack: str):
"""
parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]`
"""
tactic = ""
technique = ""
subtechnique = ""
parts, id = capa.render.utils.parse_parts_id(attack)
if len(parts) > 0:
tactic = parts[0]
if len(parts) > 1:
technique = parts[1]
if len(parts) > 2:
subtechnique = parts[2]
return {
"parts": parts,
"id": id,
"tactic": tactic,
"technique": technique,
"subtechnique": subtechnique,
}
def parse_canonical_mbc(mbc: str):
"""
parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]`
"""
objective = ""
behavior = ""
method = ""
parts, id = capa.render.utils.parse_parts_id(mbc)
if len(parts) > 0:
objective = parts[0]
if len(parts) > 1:
behavior = parts[1]
if len(parts) > 2:
method = parts[2]
return {
"parts": parts,
"id": id,
"objective": objective,
"behavior": behavior,
"method": method,
}
def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities: MatchResults):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `capa.render.*.render()` routines
to render as text.
see examples of substructures in above routines.
"""
meta["analysis"]["base_address"] = serialize_address(meta["analysis"]["base_address"])
meta["analysis"]["feature_counts"]["functions"] = [
{"address": serialize_address(address), "count": count}
for address, count in meta["analysis"]["feature_counts"]["functions"].items()
]
meta["analysis"]["library_functions"] = [
{"address": serialize_address(address), "name": name}
for address, name in meta["analysis"]["library_functions"].items()
]
meta["analysis"]["layout"]["functions"] = [
{
"address": serialize_address(faddr),
"matched_basic_blocks": list({"address": serialize_address(bb)} for bb in f["matched_basic_blocks"]),
}
for faddr, f in meta["analysis"]["layout"]["functions"].items()
]
doc = {
"meta": meta,
"rules": {},
}
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("capa/subscope-rule"):
continue
rule_meta = convert_meta_to_result_document(rule.meta)
doc["rules"][rule_name] = {
"meta": rule_meta,
"source": rule.definition,
"matches": [
[serialize_address(addr), convert_match_to_result_document(rules, capabilities, match)]
for (addr, match) in matches
],
}
return doc
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
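An illustrative consumer of the finished document (a hypothetical helper, not part of this commit), mirroring how the renderers now use attribute access rather than nested dict lookups:

def summarize(doc: "ResultDocument") -> None:
    # print each matched rule with its namespace and match count
    for name, rule in doc.rules.items():
        print("%s (%s): %d match(es)" % (name, rule.meta.namespace or "-", len(rule.matches)))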


@@ -7,9 +7,12 @@
# See the License for the specific language governing permissions and limitations under the License.
import io
from typing import Iterator, Union
import termcolor
import capa.render.result_document as rd
def bold(s: str) -> str:
"""draw attention to the given string"""
@@ -29,42 +32,31 @@ def hex(n: int) -> str:
return "0x%X" % n
def parse_parts_id(s: str):
id = ""
parts = s.split("::")
if len(parts) > 0:
last = parts.pop()
last, _, id = last.rpartition(" ")
id = id.lstrip("[").rstrip("]")
parts.append(last)
return parts, id
def format_parts_id(data):
def format_parts_id(data: Union[rd.AttackSpec, rd.MBCSpec]):
"""
format canonical representation of ATT&CK/MBC parts and ID
"""
return "%s [%s]" % ("::".join(data["parts"]), data["id"])
return "%s [%s]" % ("::".join(data.parts), data.id)
def capability_rules(doc):
def capability_rules(doc: rd.ResultDocument) -> Iterator[rd.RuleMatches]:
"""enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc)."""
for (_, _, rule) in sorted(
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc["rules"].values())
map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())
):
if rule["meta"].get("lib"):
if rule.meta.lib:
continue
if rule["meta"].get("capa/subscope"):
if rule.meta.is_subscope_rule:
continue
if rule["meta"].get("maec/analysis-conclusion"):
if rule.meta.maec.analysis_conclusion:
continue
if rule["meta"].get("maec/analysis-conclusion-ov"):
if rule.meta.maec.analysis_conclusion_ov:
continue
if rule["meta"].get("maec/malware-family"):
if rule.meta.maec.malware_family:
continue
if rule["meta"].get("maec/malware-category"):
if rule.meta.maec.malware_category:
continue
if rule["meta"].get("maec/malware-category-ov"):
if rule.meta.maec.malware_category_ov:
continue
yield rule


@@ -22,6 +22,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import tabulate
import dnfile.mdtable
@@ -29,39 +30,32 @@ import capa.rules
import capa.render.utils as rutils
import capa.render.result_document
import capa.render.result_document as rd
import capa.features.freeze as frz
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.features.address import (
NO_ADDRESS,
Address,
DNTokenAddress,
FileOffsetAddress,
DNTokenOffsetAddress,
AbsoluteVirtualAddress,
RelativeVirtualAddress,
)
def format_address(address: Address) -> str:
if isinstance(address, AbsoluteVirtualAddress):
return rutils.hex(int(address))
elif isinstance(address, RelativeVirtualAddress):
return f"base address+{rutils.hex(int(address))}"
elif isinstance(address, FileOffsetAddress):
return f"file+{rutils.hex(int(address))}"
elif isinstance(address, DNTokenAddress):
return str(address)
elif isinstance(address, DNTokenOffsetAddress):
name = dnfile.mdtable.ClrMetaDataTableFactory._table_number_map[address.token.table].name
rid = address.token.rid
return f"{name}[{rid}]+{rutils.hex(int(address.offset))}"
elif address == NO_ADDRESS:
def format_address(address: frz.Address) -> str:
if address.type == frz.AddressType.ABSOLUTE:
return rutils.hex(address.value)
elif address.type == frz.AddressType.RELATIVE:
return f"base address+{rutils.hex(address.value)}"
elif address.type == frz.AddressType.FILE:
return f"file+{rutils.hex(address.value)}"
elif address.type == frz.AddressType.DN_TOKEN:
return str(address.value)
elif address.type == frz.AddressType.DN_TOKEN_OFFSET:
token, offset = address.value
name = dnfile.mdtable.ClrMetaDataTableFactory._table_number_map[token.table].name
rid = token.rid
return f"{name}[{rid}]+{rutils.hex(offset)}"
elif address.type == frz.AddressType.NO_ADDRESS:
return "global"
else:
raise ValueError("unexpected address type")
def render_meta(ostream, doc):
def render_meta(ostream, doc: rd.ResultDocument):
"""
like:
@@ -81,31 +75,31 @@ def render_meta(ostream, doc):
total feature count 1918
"""
rows = [
("md5", doc["meta"]["sample"]["md5"]),
("sha1", doc["meta"]["sample"]["sha1"]),
("sha256", doc["meta"]["sample"]["sha256"]),
("path", doc["meta"]["sample"]["path"]),
("timestamp", doc["meta"]["timestamp"]),
("capa version", doc["meta"]["version"]),
("os", doc["meta"]["analysis"]["os"]),
("format", doc["meta"]["analysis"]["format"]),
("arch", doc["meta"]["analysis"]["arch"]),
("extractor", doc["meta"]["analysis"]["extractor"]),
("base address", format_address(rd.deserialize_address(doc["meta"]["analysis"]["base_address"]))),
("rules", "\n".join(doc["meta"]["analysis"]["rules"])),
("function count", len(doc["meta"]["analysis"]["feature_counts"]["functions"])),
("library function count", len(doc["meta"]["analysis"]["library_functions"])),
("md5", doc.meta.sample.md5),
("sha1", doc.meta.sample.sha1),
("sha256", doc.meta.sample.sha256),
("path", doc.meta.sample.path),
("timestamp", doc.meta.timestamp),
("capa version", doc.meta.version),
("os", doc.meta.analysis.os),
("format", doc.meta.analysis.format),
("arch", doc.meta.analysis.arch),
("extractor", doc.meta.analysis.extractor),
("base address", format_address(doc.meta.analysis.base_address)),
("rules", "\n".join(doc.meta.analysis.rules)),
("function count", len(doc.meta.analysis.feature_counts.functions)),
("library function count", len(doc.meta.analysis.library_functions)),
(
"total feature count",
doc["meta"]["analysis"]["feature_counts"]["file"]
+ sum(map(lambda f: f["count"], doc["meta"]["analysis"]["feature_counts"]["functions"])),
doc.meta.analysis.feature_counts.file
+ sum(map(lambda f: f.count, doc.meta.analysis.feature_counts.functions)),
),
]
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
def render_rules(ostream, doc):
def render_rules(ostream, doc: rd.ResultDocument):
"""
like:
@@ -118,28 +112,29 @@ def render_rules(ostream, doc):
"""
had_match = False
for rule in rutils.capability_rules(doc):
count = len(rule["matches"])
count = len(rule.matches)
if count == 1:
capability = rutils.bold(rule["meta"]["name"])
capability = rutils.bold(rule.meta.name)
else:
capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count)
capability = "%s (%d matches)" % (rutils.bold(rule.meta.name), count)
ostream.writeln(capability)
had_match = True
rows = []
for key in ("namespace", "description", "scope"):
if key == "name" or key not in rule["meta"]:
v = getattr(rule.meta, key)
if not v:
continue
v = rule["meta"][key]
if isinstance(v, list) and len(v) == 1:
v = v[0]
rows.append((key, v))
if rule["meta"]["scope"] != capa.rules.FILE_SCOPE:
locations = list(map(lambda m: m[0], doc["rules"][rule["meta"]["name"]]["matches"]))
rows.append(("matches", "\n".join(map(lambda d: format_address(rd.deserialize_address(d)), locations))))
if rule.meta.scope != capa.rules.FILE_SCOPE:
locations = list(map(lambda m: m[0], doc.rules[rule.meta.name].matches))
rows.append(("matches", "\n".join(map(format_address, locations))))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")
@@ -148,7 +143,7 @@ def render_rules(ostream, doc):
ostream.writeln(rutils.bold("no capabilities found"))
def render_verbose(doc):
def render_verbose(doc: rd.ResultDocument):
ostream = rutils.StringIO()
render_meta(ostream, doc)
@@ -161,5 +156,4 @@ def render_verbose(doc):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_verbose(doc)
return render_verbose(rd.ResultDocument.from_capa(meta, rules, capabilities))


@@ -12,6 +12,8 @@ import capa.rules
import capa.render.utils as rutils
import capa.render.verbose
import capa.features.common
import capa.features.freeze as frz
import capa.features.freeze.features as frzf
import capa.render.result_document as rd
from capa.rules import RuleSet
from capa.engine import MatchResults
@@ -23,22 +25,22 @@ def render_locations(ostream, match):
# it's possible to have an empty locations array here,
# such as when we're in MODE_FAILURE and showing the logic
# under a `not` statement (which will have no matched locations).
locations = list(sorted(match.get("locations", [])))
locations = list(sorted(match.locations))
if len(locations) == 1:
ostream.write(" @ ")
ostream.write(v.format_address(rd.deserialize_address(locations[0])))
ostream.write(v.format_address(locations[0]))
elif len(locations) > 1:
ostream.write(" @ ")
if len(locations) > 4:
# don't display too many locations, because it becomes very noisy.
# probably only the first handful of locations will be useful for inspection.
ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations[0:4])))
ostream.write(", ".join(map(v.format_address, locations[0:4])))
ostream.write(", and %d more..." % (len(locations) - 4))
else:
ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations)))
ostream.write(", ".join(map(v.format_address, locations)))
def render_statement(ostream, match, statement, indent=0):
def render_statement(ostream, match: rd.Match, statement: rd.Statement, indent=0):
ostream.write(" " * indent)
if statement["type"] in ("and", "or", "optional", "not", "subscope"):
if statement["type"] == "subscope":
@@ -99,7 +101,7 @@ def render_string_value(s):
return '"%s"' % capa.features.common.escape_string(s)
def render_feature(ostream, match, feature, indent=0):
def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0):
ostream.write(" " * indent)
key = feature["type"]
@@ -142,11 +144,11 @@ def render_feature(ostream, match, feature, indent=0):
ostream.write("\n")
def render_node(ostream, match, node, indent=0):
if node["type"] == "statement":
render_statement(ostream, match, node["statement"], indent=indent)
elif node["type"] == "feature":
render_feature(ostream, match, node["feature"], indent=indent)
def render_node(ostream, match: rd.Match, node: rd.Node, indent=0):
if isinstance(node, rd.StatementNode):
render_statement(ostream, match, node.statement, indent=indent)
elif isinstance(node, rd.FeatureNode):
render_feature(ostream, match, node.feature, indent=indent)
else:
raise RuntimeError("unexpected node type: " + str(node))
@@ -159,11 +161,11 @@ MODE_SUCCESS = "success"
MODE_FAILURE = "failure"
def render_match(ostream, match, indent=0, mode=MODE_SUCCESS):
def render_match(ostream, match: rd.Match, indent=0, mode=MODE_SUCCESS):
child_mode = mode
if mode == MODE_SUCCESS:
# display only nodes that evaluated successfully.
if not match["success"]:
if not match.success:
return
# optional statement with no successful children is empty
if match["node"].get("statement", {}).get("type") == "optional" and not any(
@@ -188,13 +190,13 @@ def render_match(ostream, match, indent=0, mode=MODE_SUCCESS):
else:
raise RuntimeError("unexpected mode: " + mode)
render_node(ostream, match, match["node"], indent=indent)
render_node(ostream, match, match.node, indent=indent)
for child in match["children"]:
for child in match.children:
render_match(ostream, child, indent=indent + 1, mode=child_mode)
def render_rules(ostream, doc):
def render_rules(ostream, doc: rd.ResultDocument):
"""
like:
@@ -211,37 +213,37 @@ def render_rules(ostream, doc):
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
"""
functions_by_bb = {}
for finfo in doc["meta"]["analysis"]["layout"]["functions"]:
faddress = rd.deserialize_address(finfo["address"])
for finfo in doc.meta.analysis.layout.functions:
faddress = finfo.address.to_capa()
for bb in finfo["matched_basic_blocks"]:
bbaddress = rd.deserialize_address(bb["address"])
for bb in finfo.matched_basic_blocks:
bbaddress = bb.address.to_capa()
functions_by_bb[bbaddress] = faddress
had_match = False
for (_, _, rule) in sorted(
map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc["rules"].values())
map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())
):
# default scope hides things like lib rules, malware-category rules, etc.
# but in vverbose mode, we really want to show everything.
#
# still ignore subscope rules because they're stitched into the final document.
if rule["meta"].get("capa/subscope"):
if rule.meta.is_subscope_rule:
continue
count = len(rule["matches"])
count = len(rule.matches)
if count == 1:
capability = rutils.bold(rule["meta"]["name"])
capability = rutils.bold(rule.meta.name)
else:
capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count)
capability = "%s (%d matches)" % (rutils.bold(rule.meta.name), count)
ostream.writeln(capability)
had_match = True
rows = []
for key in capa.rules.META_KEYS:
if key == "name" or key not in rule["meta"]:
if key == "name":
continue
if key == "examples":
@@ -251,11 +253,11 @@ def render_rules(ostream, doc):
# so, don't make the output messy by showing the examples.
continue
v = rule["meta"][key]
v = getattr(rule.meta, key)
if not v:
continue
if key in ("att&ck", "mbc"):
if key in ("attack", "mbc"):
v = [rutils.format_parts_id(vv) for vv in v]
if isinstance(v, list) and len(v) == 1:
@@ -266,8 +268,8 @@ def render_rules(ostream, doc):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
if rule["meta"]["scope"] == capa.rules.FILE_SCOPE:
matches = doc["rules"][rule["meta"]["name"]]["matches"]
if rule.meta.scope == capa.rules.FILE_SCOPE:
matches = doc.rules[rule.meta.name].matches
if len(matches) != 1:
# i think there should only ever be one match per file-scope rule,
# because we do the file-scope evaluation a single time.
@@ -277,15 +279,13 @@ def render_rules(ostream, doc):
first_address, first_match = matches[0]
render_match(ostream, first_match, indent=0)
else:
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"]):
location = rd.deserialize_address(location)
ostream.write(rule["meta"]["scope"])
for location, match in sorted(doc.rules[rule.meta.name].matches):
ostream.write(rule.meta.scope)
ostream.write(" @ ")
ostream.write(capa.render.verbose.format_address(location))
if rule["meta"]["scope"] == capa.rules.BASIC_BLOCK_SCOPE:
ostream.write(" in function " + capa.render.verbose.format_address(functions_by_bb[location]))
if rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
ostream.write(" in function " + capa.render.verbose.format_address(functions_by_bb[location.to_capa()]))
ostream.write("\n")
render_match(ostream, match, indent=1)
@@ -295,7 +295,7 @@ def render_rules(ostream, doc):
ostream.writeln(rutils.bold("no capabilities found"))
def render_vverbose(doc):
def render_vverbose(doc: rd.ResultDocument):
ostream = rutils.StringIO()
capa.render.verbose.render_meta(ostream, doc)
@@ -308,5 +308,4 @@ def render_vverbose(doc):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_vverbose(doc)
return render_vverbose(rd.ResultDocument.from_capa(meta, rules, capabilities))