mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 11:07:53 -08:00
json, render: work with and serialize addresses
This commit is contained in:
@@ -64,7 +64,7 @@ def find_subrule_matches(doc):
|
||||
matches.add(node["node"]["feature"]["match"])
|
||||
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for node in rule["matches"].values():
|
||||
for address, node in rule["matches"]:
|
||||
rec(node)
|
||||
|
||||
return matches
|
||||
|
||||
@@ -6,12 +6,67 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import copy
|
||||
from typing import Any, List
|
||||
|
||||
import capa.engine
|
||||
import capa.render.utils
|
||||
import capa.features.common
|
||||
import capa.features.address
|
||||
from capa.rules import RuleSet
|
||||
from capa.engine import MatchResults
|
||||
from capa.helpers import assert_never
|
||||
from capa.features.address import Address
|
||||
|
||||
|
||||
def serialize_address(a: Address) -> Any:
    """
    Convert an address object into a tagged tuple of plain values.

    The first element of the returned tuple is a string naming the kind of
    address; any remaining elements are the values read from the address:

      - ("absolute", int)
      - ("relative", int)
      - ("file", int)
      - ("dn token", token)
      - ("dn token offset", token, offset)
      - ("no address",)

    Args:
      a (Address): an instance of one of the concrete address classes,
        or the NO_ADDRESS value.

    Raises:
      ValueError: if `a` is an Address that matches none of the kinds above.
    """
    addresses = capa.features.address

    # the order of these checks mirrors the original if/elif chain.
    if isinstance(a, addresses.AbsoluteVirtualAddress):
        return ("absolute", int(a))

    if isinstance(a, addresses.RelativeVirtualAddress):
        return ("relative", int(a))

    if isinstance(a, addresses.FileOffsetAddress):
        return ("file", int(a))

    if isinstance(a, addresses.DNTokenAddress):
        return ("dn token", a.token)

    if isinstance(a, addresses.DNTokenOffsetAddress):
        return ("dn token offset", a.token, a.offset)

    if a == addresses.NO_ADDRESS:
        return ("no address",)

    if isinstance(a, addresses.Address):
        # an Address that is none of the concrete kinds handled above.
        raise ValueError("don't use an Address instance directly")

    # not an Address at all.
    assert_never(a)
|
||||
|
||||
|
||||
def deserialize_address(doc: List[Any]) -> Address:
    """
    Rebuild an address object from its tagged sequence form.

    The first element of `doc` is a string tag selecting the kind of address;
    the following elements are passed to the matching constructor:

      - ["absolute", value]
      - ["relative", value]
      - ["file", value]
      - ["dn token", token]
      - ["dn token offset", token, offset]
      - ["no address"]

    Args:
      doc (List[Any]): the tag followed by the address values.

    Returns:
      Address: the reconstructed address, or NO_ADDRESS for the "no address" tag.
    """
    addresses = capa.features.address
    tag = doc[0]

    if tag == "absolute":
        return addresses.AbsoluteVirtualAddress(doc[1])

    if tag == "relative":
        return addresses.RelativeVirtualAddress(doc[1])

    if tag == "file":
        return addresses.FileOffsetAddress(doc[1])

    if tag == "dn token":
        return addresses.DNTokenAddress(doc[1])

    if tag == "dn token offset":
        return addresses.DNTokenOffsetAddress(doc[1], doc[2])

    if tag == "no address":
        return addresses.NO_ADDRESS

    # unrecognized tag.
    assert_never(tag)
|
||||
|
||||
|
||||
def convert_statement_to_result_document(statement):
|
||||
@@ -74,7 +129,13 @@ def convert_feature_to_result_document(feature):
|
||||
if feature.description:
|
||||
result["description"] = feature.description
|
||||
if feature.name in ("regex", "substring"):
|
||||
result["matches"] = feature.matches
|
||||
if feature.matches:
|
||||
# regex feature matches are a dict from the capture group to list of location addresses
|
||||
result["matches"] = {k: list(map(serialize_address, vs)) for k, vs in feature.matches.items()}
|
||||
else:
|
||||
# there were no matches
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@@ -120,10 +181,10 @@ def convert_match_to_result_document(rules, capabilities, result):
|
||||
# so only add `locations` to feature nodes.
|
||||
if isinstance(result.statement, capa.features.common.Feature):
|
||||
if bool(result.success):
|
||||
doc["locations"] = result.locations
|
||||
doc["locations"] = list(map(serialize_address, result.locations))
|
||||
elif isinstance(result.statement, capa.engine.Range):
|
||||
if bool(result.success):
|
||||
doc["locations"] = result.locations
|
||||
doc["locations"] = list(map(serialize_address, result.locations))
|
||||
|
||||
# if we have a `match` statement, then we're referencing another rule or namespace.
|
||||
# this could be an external rule (written by a human), or
|
||||
@@ -164,7 +225,7 @@ def convert_match_to_result_document(rules, capabilities, result):
|
||||
},
|
||||
}
|
||||
|
||||
for location in doc["locations"]:
|
||||
for location in result.locations:
|
||||
doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location]))
|
||||
else:
|
||||
# this is a namespace that we're matching
|
||||
@@ -196,7 +257,7 @@ def convert_match_to_result_document(rules, capabilities, result):
|
||||
# this would be a breaking change and require updates to the renderers.
|
||||
# in the meantime, the above might be sufficient.
|
||||
rule_matches = {address: result for (address, result) in capabilities[rule.name]}
|
||||
for location in doc["locations"]:
|
||||
for location in result.locations:
|
||||
# doc[locations] contains all matches for the given namespace.
|
||||
# for example, the feature might be `match: anti-analysis/packer`
|
||||
# which matches against "generic unpacker" and "UPX".
|
||||
@@ -280,30 +341,27 @@ def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities:
|
||||
to render as text.
|
||||
|
||||
see examples of substructures in above routines.
|
||||
|
||||
schema:
|
||||
|
||||
```json
|
||||
{
|
||||
"meta": {...},
|
||||
"rules": {
|
||||
$rule-name: {
|
||||
"meta": {...copied from rule.meta...},
|
||||
"matches": {
|
||||
$address: {...match details...},
|
||||
...
|
||||
}
|
||||
},
|
||||
...
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Args:
|
||||
meta (Dict[str, Any]):
|
||||
rules (RuleSet):
|
||||
capabilities (Dict[str, List[Tuple[int, Result]]]):
|
||||
"""
|
||||
meta["analysis"]["base_address"] = serialize_address(meta["analysis"]["base_address"])
|
||||
|
||||
meta["analysis"]["feature_counts"]["functions"] = [
|
||||
{"address": serialize_address(address), "count": count}
|
||||
for address, count in meta["analysis"]["feature_counts"]["functions"].items()
|
||||
]
|
||||
|
||||
meta["analysis"]["library_functions"] = [
|
||||
{"address": serialize_address(address), "name": name}
|
||||
for address, name in meta["analysis"]["library_functions"].items()
|
||||
]
|
||||
|
||||
meta["analysis"]["layout"]["functions"] = [
|
||||
{
|
||||
"address": serialize_address(faddr),
|
||||
"matched_basic_blocks": list({"address": serialize_address(bb)} for bb in f["matched_basic_blocks"]),
|
||||
}
|
||||
for faddr, f in meta["analysis"]["layout"]["functions"].items()
|
||||
]
|
||||
|
||||
doc = {
|
||||
"meta": meta,
|
||||
"rules": {},
|
||||
@@ -320,9 +378,10 @@ def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities:
|
||||
doc["rules"][rule_name] = {
|
||||
"meta": rule_meta,
|
||||
"source": rule.definition,
|
||||
"matches": {
|
||||
addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches
|
||||
},
|
||||
"matches": [
|
||||
[serialize_address(addr), convert_match_to_result_document(rules, capabilities, match)]
|
||||
for (addr, match) in matches
|
||||
],
|
||||
}
|
||||
|
||||
return doc
|
||||
|
||||
@@ -28,6 +28,7 @@ import dnfile.mdtable
|
||||
import capa.rules
|
||||
import capa.render.utils as rutils
|
||||
import capa.render.result_document
|
||||
import capa.render.result_document as rd
|
||||
from capa.rules import RuleSet
|
||||
from capa.engine import MatchResults
|
||||
from capa.features.address import (
|
||||
@@ -90,14 +91,14 @@ def render_meta(ostream, doc):
|
||||
("format", doc["meta"]["analysis"]["format"]),
|
||||
("arch", doc["meta"]["analysis"]["arch"]),
|
||||
("extractor", doc["meta"]["analysis"]["extractor"]),
|
||||
("base address", hex(doc["meta"]["analysis"]["base_address"])),
|
||||
("base address", format_address(rd.deserialize_address(doc["meta"]["analysis"]["base_address"]))),
|
||||
("rules", "\n".join(doc["meta"]["analysis"]["rules"])),
|
||||
("function count", len(doc["meta"]["analysis"]["feature_counts"]["functions"])),
|
||||
("library function count", len(doc["meta"]["analysis"]["library_functions"])),
|
||||
(
|
||||
"total feature count",
|
||||
doc["meta"]["analysis"]["feature_counts"]["file"]
|
||||
+ sum(doc["meta"]["analysis"]["feature_counts"]["functions"].values()),
|
||||
+ sum(map(lambda f: f["count"], doc["meta"]["analysis"]["feature_counts"]["functions"])),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -137,8 +138,8 @@ def render_rules(ostream, doc):
|
||||
rows.append((key, v))
|
||||
|
||||
if rule["meta"]["scope"] != capa.rules.FILE_SCOPE:
|
||||
locations = doc["rules"][rule["meta"]["name"]]["matches"].keys()
|
||||
rows.append(("matches", "\n".join(map(format_address, locations))))
|
||||
locations = list(map(lambda m: m[0], doc["rules"][rule["meta"]["name"]]["matches"]))
|
||||
rows.append(("matches", "\n".join(map(lambda d: format_address(rd.deserialize_address(d)), locations))))
|
||||
|
||||
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
|
||||
ostream.write("\n")
|
||||
@@ -160,5 +161,5 @@ def render_verbose(doc):
|
||||
|
||||
|
||||
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
|
||||
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return render_verbose(doc)
|
||||
|
||||
@@ -12,9 +12,10 @@ import capa.rules
|
||||
import capa.render.utils as rutils
|
||||
import capa.render.verbose
|
||||
import capa.features.common
|
||||
import capa.render.result_document
|
||||
import capa.render.result_document as rd
|
||||
from capa.rules import RuleSet
|
||||
from capa.engine import MatchResults
|
||||
from capa.features.freeze import deserialize_address
|
||||
|
||||
|
||||
def render_locations(ostream, match):
|
||||
@@ -26,16 +27,16 @@ def render_locations(ostream, match):
|
||||
locations = list(sorted(match.get("locations", [])))
|
||||
if len(locations) == 1:
|
||||
ostream.write(" @ ")
|
||||
ostream.write(v.format_address(locations[0]))
|
||||
ostream.write(v.format_address(rd.deserialize_address(locations[0])))
|
||||
elif len(locations) > 1:
|
||||
ostream.write(" @ ")
|
||||
if len(locations) > 4:
|
||||
# don't display too many locations, because it becomes very noisy.
|
||||
# probably only the first handful of locations will be useful for inspection.
|
||||
ostream.write(", ".join(map(v.format_address, locations[0:4])))
|
||||
ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations[0:4])))
|
||||
ostream.write(", and %d more..." % (len(locations) - 4))
|
||||
else:
|
||||
ostream.write(", ".join(map(v.format_address, locations)))
|
||||
ostream.write(", ".join(map(lambda d: v.format_address(rd.deserialize_address(d)), locations)))
|
||||
|
||||
|
||||
def render_statement(ostream, match, statement, indent=0):
|
||||
@@ -211,9 +212,12 @@ def render_rules(ostream, doc):
|
||||
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
|
||||
"""
|
||||
functions_by_bb = {}
|
||||
for function, info in doc["meta"]["analysis"]["layout"]["functions"].items():
|
||||
for bb in info["matched_basic_blocks"]:
|
||||
functions_by_bb[bb] = function
|
||||
for finfo in doc["meta"]["analysis"]["layout"]["functions"]:
|
||||
faddress = rd.deserialize_address(finfo["address"])
|
||||
|
||||
for bb in finfo["matched_basic_blocks"]:
|
||||
bbaddress = rd.deserialize_address(bb["address"])
|
||||
functions_by_bb[bbaddress] = faddress
|
||||
|
||||
had_match = False
|
||||
|
||||
@@ -264,16 +268,19 @@ def render_rules(ostream, doc):
|
||||
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
|
||||
|
||||
if rule["meta"]["scope"] == capa.rules.FILE_SCOPE:
|
||||
matches = list(doc["rules"][rule["meta"]["name"]]["matches"].values())
|
||||
matches = doc["rules"][rule["meta"]["name"]]["matches"]
|
||||
if len(matches) != 1:
|
||||
# i think there should only ever be one match per file-scope rule,
|
||||
# because we do the file-scope evaluation a single time.
|
||||
# but i'm not 100% sure if this is/will always be true.
|
||||
# so, lets be explicit about our assumptions and raise an exception if they fail.
|
||||
raise RuntimeError("unexpected file scope match count: %d" % (len(matches)))
|
||||
render_match(ostream, matches[0], indent=0)
|
||||
first_address, first_match = matches[0]
|
||||
render_match(ostream, first_match, indent=0)
|
||||
else:
|
||||
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()):
|
||||
for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"]):
|
||||
location = rd.deserialize_address(location)
|
||||
|
||||
ostream.write(rule["meta"]["scope"])
|
||||
ostream.write(" @ ")
|
||||
ostream.write(capa.render.verbose.format_address(location))
|
||||
@@ -302,5 +309,5 @@ def render_vverbose(doc):
|
||||
|
||||
|
||||
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
|
||||
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
doc = rd.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
return render_vverbose(doc)
|
||||
|
||||
@@ -438,6 +438,9 @@ def test_json_meta(capsys):
|
||||
assert capa.main.main([path, "-j"]) == 0
|
||||
std = capsys.readouterr()
|
||||
std_json = json.loads(std.out)
|
||||
# remember: json can't have integer keys :-(
|
||||
assert str(0x10001010) in std_json["meta"]["analysis"]["layout"]["functions"]
|
||||
assert 0x10001179 in std_json["meta"]["analysis"]["layout"]["functions"][str(0x10001010)]["matched_basic_blocks"]
|
||||
|
||||
assert ["absolute", 0x10001010] in map(lambda f: f["address"], std_json["meta"]["analysis"]["layout"]["functions"])
|
||||
|
||||
for addr, info in std_json["meta"]["analysis"]["layout"]["functions"]:
|
||||
if addr == ["absolute", 0x10001010]:
|
||||
assert {"address": ["absolute", 0x10001179]} in info["matched_basic_blocks"]
|
||||
|
||||
Reference in New Issue
Block a user