freeze: use address abstraction

This commit is contained in:
Willi Ballenthin
2022-05-24 12:30:06 -06:00
parent fc9681f6d5
commit 6b6dd70110
2 changed files with 171 additions and 131 deletions

View File

@@ -1,40 +1,43 @@
"""
capa freeze file format: `| capa0000 | + zlib(utf-8(json(...)))`
json format:
freeze document schema:
{
'version': 1,
'base address': int(base address),
'functions': {
int(function va): {
int(basic block va): [int(instruction va), ...]
...
},
...
},
'version': 2,
'base address': address(base address),
'functions': [
[address(function): [
[address(basic block): [
address(instruction),
address(instruction),
...]
],
...]
],
...],
'scopes': {
'global': [
(str(name), [any(arg), ...], int(va), ()),
(str(name), [any(arg), ...], address(_), ()),
...
],
'file': [
(str(name), [any(arg), ...], int(va), ()),
(str(name), [any(arg), ...], address(_), ()),
...
],
'function': [
(str(name), [any(arg), ...], int(va), (int(function va), )),
(str(name), [any(arg), ...], address(function), (address(function), )),
...
],
'basic block': [
(str(name), [any(arg), ...], int(va), (int(function va),
int(basic block va))),
(str(name), [any(arg), ...], address(basic block), (address(function),
address(basic block))),
...
],
'instruction': [
(str(name), [any(arg), ...], int(va), (int(function va),
int(basic block va),
int(instruction va))),
(str(name), [any(arg), ...], address(instruction), (address(function),
address(basic block),
address(instruction))),
...
],
}
@@ -57,9 +60,12 @@ import capa.helpers
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.address
import capa.features.basicblock
import capa.features.extractors.base_extractor
from typing import Any
from capa.features.address import Address
from capa.features.common import Feature
from capa.helpers import assert_never
logger = logging.getLogger(__name__)
@@ -71,27 +77,67 @@ def serialize_feature(feature):
KNOWN_FEATURES: Dict[str, Type[Feature]] = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()}
KNOWN_FEATURES.update({F.__name__: F for F in capa.features.insn._Operand.__subclasses__()}) # type: ignore
def deserialize_feature(doc):
F = KNOWN_FEATURES[doc[0]]
return F.freeze_deserialize(doc[1])
def dumps(extractor):
def serialize_address(a: Address) -> Any:
    """
    convert an address into a document that can be embedded in the freeze JSON.

    each concrete address type becomes a tuple whose first element is a type tag
    and whose remaining elements are the payload:

      AbsoluteVirtualAddress -> ("absolute", int(a))
      RelativeVirtualAddress -> ("relative", int(a))
      FileOffsetAddress      -> ("file", int(a))
      DNTokenAddress         -> ("dn token", a.token)
      DNTokenOffsetAddress   -> ("dn token offset", a.token, a.offset)

    NO_ADDRESS is the one exception: it becomes the bare string "no address".

    args:
      a: the address to serialize.

    returns:
      Any: a tagged tuple, or the string "no address".
    """
    if isinstance(a, capa.features.address.AbsoluteVirtualAddress):
        return ("absolute", int(a))
    elif isinstance(a, capa.features.address.RelativeVirtualAddress):
        return ("relative", int(a))
    elif isinstance(a, capa.features.address.FileOffsetAddress):
        return ("file", int(a))
    elif isinstance(a, capa.features.address.DNTokenAddress):
        return ("dn token", a.token)
    elif isinstance(a, capa.features.address.DNTokenOffsetAddress):
        return ("dn token offset", a.token, a.offset)
    elif a == capa.features.address.NO_ADDRESS:
        # deliberately a bare string, not a one-element tuple: the reader
        # compares the whole document against "no address", so the on-disk
        # shape must stay exactly this.
        return "no address"
    else:
        # none of the known address types matched.
        # NOTE(review): assert_never comes from capa.helpers and is not shown
        # here; presumably it fails loudly - confirm.
        assert_never(a)
def deserialize_address(doc: Any) -> Address:
    """
    rebuild an address from a document found in the freeze JSON.

    args:
      doc: either the bare string "no address", or a sequence whose first
        element is one of the type tags "absolute", "relative", "file",
        "dn token", "dn token offset", followed by that type's payload.

    returns:
      Address: an instance of the address type named by the tag,
        or NO_ADDRESS for the string "no address".
    """
    # NO_ADDRESS is stored as a bare string rather than a tagged sequence.
    # recognize it before indexing: otherwise `doc[0]` is just the character
    # "n" and the right branch is only reached by failing every tag comparison.
    if doc == "no address":
        return capa.features.address.NO_ADDRESS

    atype = doc[0]
    if atype == "absolute":
        return capa.features.address.AbsoluteVirtualAddress(doc[1])
    elif atype == "relative":
        return capa.features.address.RelativeVirtualAddress(doc[1])
    elif atype == "file":
        return capa.features.address.FileOffsetAddress(doc[1])
    elif atype == "dn token":
        return capa.features.address.DNTokenAddress(doc[1])
    elif atype == "dn token offset":
        return capa.features.address.DNTokenOffsetAddress(doc[1], doc[2])
    else:
        # unknown tag.
        # NOTE(review): assert_never comes from capa.helpers and is not shown
        # here; presumably it fails loudly - confirm.
        assert_never(atype)
def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> str:
"""
serialize the given extractor to a string
args:
extractor: capa.features.extractors.base_extractor.FeatureExtractor:
returns:
str: the serialized features.
"""
hex = capa.helpers.hex
ret = {
"version": 1,
"base address": extractor.get_base_address(),
"functions": {},
"version": 2,
"base address": serialize_address(extractor.get_base_address()),
"functions": [],
"scopes": {
"global": [],
"file": [],
@@ -100,84 +146,95 @@ def dumps(extractor):
"instruction": [],
},
}
for feature, va in extractor.extract_global_features():
ret["scopes"]["global"].append(serialize_feature(feature) + (hex(va), ()))
for feature, addr in extractor.extract_global_features():
ret["scopes"]["global"].append(serialize_feature(feature) + (serialize_address(addr), ()))
for feature, va in extractor.extract_file_features():
ret["scopes"]["file"].append(serialize_feature(feature) + (hex(va), ()))
for feature, addr in extractor.extract_file_features():
ret["scopes"]["file"].append(serialize_feature(feature) + (serialize_address(addr), ()))
for f in extractor.get_functions():
ret["functions"][hex(f)] = {}
faddr = serialize_address(f.address)
for feature, va in extractor.extract_function_features(f):
ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),)))
for feature, addr in extractor.extract_function_features(f):
ret["scopes"]["function"].append(serialize_feature(feature) + (serialize_address(addr), (faddr,)))
fentries = []
for bb in extractor.get_basic_blocks(f):
ret["functions"][hex(f)][hex(bb)] = []
bbaddr = serialize_address(bb.address)
for feature, va in extractor.extract_basic_block_features(f, bb):
for feature, addr in extractor.extract_basic_block_features(f, bb):
ret["scopes"]["basic block"].append(
serialize_feature(feature)
+ (
hex(va),
serialize_address(addr),
(
hex(f),
hex(bb),
faddr,
bbaddr,
),
)
)
for insnva, insn in sorted(
[(int(insn), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0]
):
ret["functions"][hex(f)][hex(bb)].append(hex(insnva))
bbentries = []
for insn in extractor.get_instructions(f, bb):
iaddr = serialize_address(insn.address)
for feature, va in extractor.extract_insn_features(f, bb, insn):
for feature, addr in extractor.extract_insn_features(f, bb, insn):
ret["scopes"]["instruction"].append(
serialize_feature(feature)
+ (
hex(va),
serialize_address(addr),
(
hex(f),
hex(bb),
hex(insnva),
faddr,
bbaddr,
iaddr,
),
)
)
bbentries.append(iaddr)
fentries.append((bbaddr, bbentries))
ret["functions"].append((faddr, fentries))
return json.dumps(ret)
def loads(s):
def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
doc = json.loads(s)
if doc.get("version") != 1:
if doc.get("version") != 2:
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
features = {
"base address": doc.get("base address"),
"base address": deserialize_address(doc.get("base address")),
"global features": [],
"file features": [],
"functions": {},
}
for fva, function in doc.get("functions", {}).items():
fva = int(fva, 0x10)
features["functions"][fva] = {
for pair in doc.get("functions", []):
faddr, function = pair
faddr = deserialize_address(faddr)
features["functions"][faddr] = {
"features": [],
"basic blocks": {},
}
for bbva, bb in function.items():
bbva = int(bbva, 0x10)
features["functions"][fva]["basic blocks"][bbva] = {
for pair in function:
bbaddr, bb = pair
bbaddr = deserialize_address(bbaddr)
features["functions"][faddr]["basic blocks"][bbaddr] = {
"features": [],
"instructions": {},
}
for insnva in bb:
insnva = int(insnva, 0x10)
features["functions"][fva]["basic blocks"][bbva]["instructions"][insnva] = {
for iaddr in bb:
iaddr = deserialize_address(iaddr)
features["functions"][faddr]["basic blocks"][bbaddr]["instructions"][iaddr] = {
"features": [],
}
@@ -187,16 +244,16 @@ def loads(s):
# ^^^^^^^^^^^^^ ^^^^^^^^^ ^^^^^^^^^^ ^^^^^^^^^^^^^^
# feature name args addr func/bb/insn
for feature in doc.get("scopes", {}).get("global", []):
va, loc = feature[2:]
va = int(va, 0x10)
addr, loc = feature[2:]
addr = deserialize_address(addr)
feature = deserialize_feature(feature[:2])
features["global features"].append((va, feature))
features["global features"].append((addr, feature))
for feature in doc.get("scopes", {}).get("file", []):
va, loc = feature[2:]
va = int(va, 0x10)
addr, loc = feature[2:]
addr = deserialize_address(addr)
feature = deserialize_feature(feature[:2])
features["file features"].append((va, feature))
features["file features"].append((addr, feature))
for feature in doc.get("scopes", {}).get("function", []):
# fetch the pair like:
@@ -204,9 +261,10 @@ def loads(s):
# ('0x401000', ('0x401000', ))
# ^^^^^^^^^^ ^^^^^^^^^^^^^^
# addr func/bb/insn
va, loc = feature[2:]
va = int(va, 0x10)
loc = [int(lo, 0x10) for lo in loc]
addr, loc = feature[2:]
addr = deserialize_address(addr)
loc = list(map(deserialize_address, loc))
faddr, = loc
# decode the feature from the pair like:
#
@@ -214,21 +272,23 @@ def loads(s):
# ^^^^^^^^^^^^^ ^^^^^^^^^
# feature name args
feature = deserialize_feature(feature[:2])
features["functions"][loc[0]]["features"].append((va, feature))
features["functions"][faddr]["features"].append((addr, feature))
for feature in doc.get("scopes", {}).get("basic block", []):
va, loc = feature[2:]
va = int(va, 0x10)
loc = [int(lo, 0x10) for lo in loc]
addr, loc = feature[2:]
addr = deserialize_address(addr)
loc = list(map(deserialize_address, loc))
faddr, bbaddr = loc
feature = deserialize_feature(feature[:2])
features["functions"][loc[0]]["basic blocks"][loc[1]]["features"].append((va, feature))
features["functions"][faddr]["basic blocks"][bbaddr]["features"].append((addr, feature))
for feature in doc.get("scopes", {}).get("instruction", []):
va, loc = feature[2:]
va = int(va, 0x10)
loc = [int(lo, 0x10) for lo in loc]
addr, loc = feature[2:]
addr = deserialize_address(addr)
loc = list(map(deserialize_address, loc))
faddr, bbaddr, iaddr = loc
feature = deserialize_feature(feature[:2])
features["functions"][loc[0]]["basic blocks"][loc[1]]["instructions"][loc[2]]["features"].append((va, feature))
features["functions"][faddr]["basic blocks"][bbaddr]["instructions"][iaddr]["features"].append((addr, feature))
return capa.features.extractors.base_extractor.NullFeatureExtractor(features)
@@ -236,7 +296,7 @@ def loads(s):
MAGIC = "capa0000".encode("ascii")
def dump(extractor):
def dump(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> bytes:
"""serialize the given extractor to a byte array."""
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
@@ -245,7 +305,7 @@ def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC
def load(buf):
def load(buf: bytes) -> capa.features.extractors.base_extractor.FeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf):
raise ValueError("missing magic header")

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import textwrap
from typing import List
from fixtures import *
@@ -18,33 +19,34 @@ import capa.features.common
import capa.features.freeze
import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.features.address import AbsoluteVirtualAddress
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
{
"base address": 0x401000,
"base address": AbsoluteVirtualAddress(0x401000),
"file features": [
(0x402345, capa.features.common.Characteristic("embedded pe")),
(AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")),
],
"functions": {
0x401000: {
AbsoluteVirtualAddress(0x401000): {
"features": [
(0x401000, capa.features.common.Characteristic("indirect call")),
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")),
],
"basic blocks": {
0x401000: {
AbsoluteVirtualAddress(0x401000): {
"features": [
(0x401000, capa.features.common.Characteristic("tight loop")),
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")),
],
"instructions": {
0x401000: {
AbsoluteVirtualAddress(0x401000): {
"features": [
(0x401000, capa.features.insn.Mnemonic("xor")),
(0x401000, capa.features.common.Characteristic("nzxor")),
(AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")),
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")),
],
},
0x401002: {
AbsoluteVirtualAddress(0x401002): {
"features": [
(0x401002, capa.features.insn.Mnemonic("mov")),
(AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")),
],
},
},
@@ -56,10 +58,17 @@ EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
)
def addresses(s) -> List[Address]:
    """
    collect the `.address` attribute of every item in `s`, in ascending order.

    args:
      s: an iterable of objects that each expose an `address` attribute.

    returns:
      List[Address]: a new, sorted list of those addresses.
    """
    # sorted() already returns a fresh list, so no list() wrapper is needed.
    return sorted(i.address for i in s)
def test_null_feature_extractor():
assert list(EXTRACTOR.get_functions()) == [0x401000]
assert list(EXTRACTOR.get_basic_blocks(0x401000)) == [0x401000]
assert list(EXTRACTOR.get_instructions(0x401000, 0x0401000)) == [0x401000, 0x401002]
fh = FunctionHandle(AbsoluteVirtualAddress(0x401000), None)
bbh = BBHandle(AbsoluteVirtualAddress(0x401000), None)
assert addresses(EXTRACTOR.get_functions()) == [AbsoluteVirtualAddress(0x401000)]
assert addresses(EXTRACTOR.get_basic_blocks(fh)) == [AbsoluteVirtualAddress(0x401000)]
assert addresses(EXTRACTOR.get_instructions(fh, bbh)) == [AbsoluteVirtualAddress(0x401000), AbsoluteVirtualAddress(0x401002)]
rules = capa.rules.RuleSet(
[
@@ -90,50 +99,21 @@ def compare_extractors(a, b):
a (capa.features.extractors.NullFeatureExtractor)
b (capa.features.extractors.NullFeatureExtractor)
"""
# TODO: ordering of these things probably doesn't work yet
assert list(a.extract_file_features()) == list(b.extract_file_features())
assert list(a.get_functions()) == list(b.get_functions())
assert addresses(a.get_functions()) == addresses(b.get_functions())
for f in a.get_functions():
assert list(a.get_basic_blocks(f)) == list(b.get_basic_blocks(f))
assert addresses(a.get_basic_blocks(f)) == addresses(b.get_basic_blocks(f))
assert list(a.extract_function_features(f)) == list(b.extract_function_features(f))
for bb in a.get_basic_blocks(f):
assert list(a.get_instructions(f, bb)) == list(b.get_instructions(f, bb))
assert addresses(a.get_instructions(f, bb)) == addresses(b.get_instructions(f, bb))
assert list(a.extract_basic_block_features(f, bb)) == list(b.extract_basic_block_features(f, bb))
for insn in a.get_instructions(f, bb):
assert list(a.extract_insn_features(f, bb, insn)) == list(b.extract_insn_features(f, bb, insn))
def compare_extractors_viv_null(viv_ext, null_ext):
"""
almost identical to compare_extractors but adds casts to ints since the VivisectFeatureExtractor returns objects
and NullFeatureExtractor returns ints
args:
viv_ext (capa.features.extractors.viv.extractor.VivisectFeatureExtractor)
null_ext (capa.features.extractors.NullFeatureExtractor)
"""
assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features())
assert list(map(int, viv_ext.get_functions())) == list(null_ext.get_functions())
for f in viv_ext.get_functions():
assert list(map(int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(int(f)))
assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(int(f)))
for bb in viv_ext.get_basic_blocks(f):
assert list(map(int, viv_ext.get_instructions(f, bb))) == list(null_ext.get_instructions(int(f), int(bb)))
assert list(viv_ext.extract_basic_block_features(f, bb)) == list(
null_ext.extract_basic_block_features(int(f), int(bb))
)
for insn in viv_ext.get_instructions(f, bb):
assert list(viv_ext.extract_insn_features(f, bb, insn)) == list(
null_ext.extract_insn_features(int(f), int(bb), int(insn))
)
def test_freeze_s_roundtrip():
load = capa.features.freeze.loads
dump = capa.features.freeze.dumps
@@ -184,4 +164,4 @@ def test_freeze_load_sample(tmpdir, z9324d_extractor):
with open(o.strpath, "rb") as f:
null_extractor = capa.features.freeze.load(f.read())
compare_extractors_viv_null(z9324d_extractor, null_extractor)
compare_extractors(z9324d_extractor, null_extractor)