From 6b6dd70110511c65fd00b650cbbb09b06c9f111b Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 24 May 2022 12:30:06 -0600 Subject: [PATCH] freeze: use address abstraction --- capa/features/freeze.py | 226 +++++++++++++++++++++++++--------------- tests/test_freeze.py | 76 +++++--------- 2 files changed, 171 insertions(+), 131 deletions(-) diff --git a/capa/features/freeze.py b/capa/features/freeze.py index ff465f77..32a64da4 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -1,40 +1,43 @@ """ capa freeze file format: `| capa0000 | + zlib(utf-8(json(...)))` -json format: +freeze document schema: { - 'version': 1, - 'base address': int(base address), - 'functions': { - int(function va): { - int(basic block va): [int(instruction va), ...] - ... - }, - ... - }, + 'version': 2, + 'base address': address(base address), + 'functions': [ + [address(function), [ + [address(basic block), [ + address(instruction), + address(instruction), + ...] + ], + ...] + ], + ...], 'scopes': { 'global': [ - (str(name), [any(arg), ...], int(va), ()), + (str(name), [any(arg), ...], address(_), ()), ... }, 'file': [ - (str(name), [any(arg), ...], int(va), ()), + (str(name), [any(arg), ...], address(_), ()), ... }, 'function': [ - (str(name), [any(arg), ...], int(va), (int(function va), )), + (str(name), [any(arg), ...], address(function), (address(function), )), ... ], 'basic block': [ - (str(name), [any(arg), ...], int(va), (int(function va), - int(basic block va))), + (str(name), [any(arg), ...], address(basic block), (address(function), + address(basic block))), ... ], 'instruction': [ - (str(name), [any(arg), ...], int(va), (int(function va), - int(basic block va), - int(instruction va))), + (str(name), [any(arg), ...], address(instruction), (address(function), + address(basic block), + address(instruction))), ... 
], } @@ -57,9 +60,12 @@ import capa.helpers import capa.features.file import capa.features.insn import capa.features.common +import capa.features.address import capa.features.basicblock import capa.features.extractors.base_extractor +from capa.features.address import Address from capa.features.common import Feature +from capa.helpers import assert_never logger = logging.getLogger(__name__) @@ -71,27 +77,67 @@ def serialize_feature(feature): KNOWN_FEATURES: Dict[str, Type[Feature]] = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()} KNOWN_FEATURES.update({F.__name__: F for F in capa.features.insn._Operand.__subclasses__()}) # type: ignore - def deserialize_feature(doc): F = KNOWN_FEATURES[doc[0]] return F.freeze_deserialize(doc[1]) -def dumps(extractor): +def serialize_address(a: Address) -> any: + if isinstance(a, capa.features.address.AbsoluteVirtualAddress): + return ("absolute", int(a)) + + elif isinstance(a, capa.features.address.RelativeVirtualAddress): + return ("relative", int(a)) + + elif isinstance(a, capa.features.address.FileOffsetAddress): + return ("file", int(a)) + + elif isinstance(a, capa.features.address.DNTokenAddress): + return ("dn token", a.token) + + elif isinstance(a, capa.features.address.DNTokenOffsetAddress): + return ("dn token offset", a.token, a.offset) + + elif a == capa.features.address.NO_ADDRESS: + return ("no address") + + else: + assert_never(a) + + +def deserialize_address(doc: any) -> Address: + atype = doc[0] + + if atype == "absolute": + return capa.features.address.AbsoluteVirtualAddress(doc[1]) + + elif atype == "relative": + return capa.features.address.RelativeVirtualAddress(doc[1]) + + elif atype == "file": + return capa.features.address.FileOffsetAddress(doc[1]) + + elif atype == "dn token": + return capa.features.address.DNTokenAddress(doc[1]) + + elif atype == "dn token offset": + return capa.features.address.DNTokenOffsetAddress(doc[1], doc[2]) + + elif doc == "no address": + return 
capa.features.address.NO_ADDRESS + + else: + assert_never(atype) + + +def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> str: """ serialize the given extractor to a string - - args: - extractor: capa.features.extractors.base_extractor.FeatureExtractor: - - returns: - str: the serialized features. """ - hex = capa.helpers.hex ret = { - "version": 1, - "base address": extractor.get_base_address(), - "functions": {}, + "version": 2, + "base address": serialize_address(extractor.get_base_address()), + "functions": [], "scopes": { "global": [], "file": [], @@ -100,84 +146,95 @@ def dumps(extractor): "instruction": [], }, } - for feature, va in extractor.extract_global_features(): - ret["scopes"]["global"].append(serialize_feature(feature) + (hex(va), ())) + for feature, addr in extractor.extract_global_features(): + ret["scopes"]["global"].append(serialize_feature(feature) + (serialize_address(addr), ())) - for feature, va in extractor.extract_file_features(): - ret["scopes"]["file"].append(serialize_feature(feature) + (hex(va), ())) + for feature, addr in extractor.extract_file_features(): + ret["scopes"]["file"].append(serialize_feature(feature) + (serialize_address(addr), ())) for f in extractor.get_functions(): - ret["functions"][hex(f)] = {} + faddr = serialize_address(f.address) - for feature, va in extractor.extract_function_features(f): - ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),))) + for feature, addr in extractor.extract_function_features(f): + ret["scopes"]["function"].append(serialize_feature(feature) + (serialize_address(addr), (faddr,))) + fentries = [] for bb in extractor.get_basic_blocks(f): - ret["functions"][hex(f)][hex(bb)] = [] + bbaddr = serialize_address(bb.address) - for feature, va in extractor.extract_basic_block_features(f, bb): + for feature, addr in extractor.extract_basic_block_features(f, bb): ret["scopes"]["basic block"].append( serialize_feature(feature) + ( - hex(va), 
+ serialize_address(addr), ( - hex(f), - hex(bb), + faddr, + bbaddr, ), ) ) - for insnva, insn in sorted( - [(int(insn), insn) for insn in extractor.get_instructions(f, bb)], key=lambda p: p[0] - ): - ret["functions"][hex(f)][hex(bb)].append(hex(insnva)) + bbentries = [] + for insn in extractor.get_instructions(f, bb): + iaddr = serialize_address(insn.address) - for feature, va in extractor.extract_insn_features(f, bb, insn): + for feature, addr in extractor.extract_insn_features(f, bb, insn): ret["scopes"]["instruction"].append( serialize_feature(feature) + ( - hex(va), + serialize_address(addr), ( - hex(f), - hex(bb), - hex(insnva), + faddr, + bbaddr, + iaddr, ), ) ) + + bbentries.append(iaddr) + + fentries.append((bbaddr, bbentries)) + + ret["functions"].append((faddr, fentries)) + return json.dumps(ret) -def loads(s): +def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor: """deserialize a set of features (as a NullFeatureExtractor) from a string.""" doc = json.loads(s) - if doc.get("version") != 1: + if doc.get("version") != 2: raise ValueError("unsupported freeze format version: %d" % (doc.get("version"))) features = { - "base address": doc.get("base address"), + "base address": deserialize_address(doc.get("base address")), "global features": [], "file features": [], "functions": {}, } - for fva, function in doc.get("functions", {}).items(): - fva = int(fva, 0x10) - features["functions"][fva] = { + for pair in doc.get("functions", []): + faddr, function = pair + + faddr = deserialize_address(faddr) + features["functions"][faddr] = { "features": [], "basic blocks": {}, } - for bbva, bb in function.items(): - bbva = int(bbva, 0x10) - features["functions"][fva]["basic blocks"][bbva] = { + for pair in function: + bbaddr, bb = pair + + bbaddr = deserialize_address(bbaddr) + features["functions"][faddr]["basic blocks"][bbaddr] = { "features": [], "instructions": {}, } - for insnva in bb: - insnva = int(insnva, 0x10) - 
features["functions"][fva]["basic blocks"][bbva]["instructions"][insnva] = { + for iaddr in bb: + iaddr = deserialize_address(iaddr) + features["functions"][faddr]["basic blocks"][bbaddr]["instructions"][iaddr] = { "features": [], } @@ -187,16 +244,16 @@ def loads(s): # ^^^^^^^^^^^^^ ^^^^^^^^^ ^^^^^^^^^^ ^^^^^^^^^^^^^^ # feature name args addr func/bb/insn for feature in doc.get("scopes", {}).get("global", []): - va, loc = feature[2:] - va = int(va, 0x10) + addr, loc = feature[2:] + addr = deserialize_address(addr) feature = deserialize_feature(feature[:2]) - features["global features"].append((va, feature)) + features["global features"].append((addr, feature)) for feature in doc.get("scopes", {}).get("file", []): - va, loc = feature[2:] - va = int(va, 0x10) + addr, loc = feature[2:] + addr = deserialize_address(addr) feature = deserialize_feature(feature[:2]) - features["file features"].append((va, feature)) + features["file features"].append((addr, feature)) for feature in doc.get("scopes", {}).get("function", []): # fetch the pair like: @@ -204,9 +261,10 @@ def loads(s): # ('0x401000', ('0x401000', )) # ^^^^^^^^^^ ^^^^^^^^^^^^^^ # addr func/bb/insn - va, loc = feature[2:] - va = int(va, 0x10) - loc = [int(lo, 0x10) for lo in loc] + addr, loc = feature[2:] + addr = deserialize_address(addr) + loc = list(map(deserialize_address, loc)) + faddr, = loc # decode the feature from the pair like: # @@ -214,21 +272,23 @@ def loads(s): # ^^^^^^^^^^^^^ ^^^^^^^^^ # feature name args feature = deserialize_feature(feature[:2]) - features["functions"][loc[0]]["features"].append((va, feature)) + features["functions"][faddr]["features"].append((addr, feature)) for feature in doc.get("scopes", {}).get("basic block", []): - va, loc = feature[2:] - va = int(va, 0x10) - loc = [int(lo, 0x10) for lo in loc] + addr, loc = feature[2:] + addr = deserialize_address(addr) + loc = list(map(deserialize_address, loc)) + faddr, bbaddr = loc feature = deserialize_feature(feature[:2]) - 
features["functions"][loc[0]]["basic blocks"][loc[1]]["features"].append((va, feature)) + features["functions"][faddr]["basic blocks"][bbaddr]["features"].append((addr, feature)) for feature in doc.get("scopes", {}).get("instruction", []): - va, loc = feature[2:] - va = int(va, 0x10) - loc = [int(lo, 0x10) for lo in loc] + addr, loc = feature[2:] + addr = deserialize_address(addr) + loc = list(map(deserialize_address, loc)) + faddr, bbaddr, iaddr = loc feature = deserialize_feature(feature[:2]) - features["functions"][loc[0]]["basic blocks"][loc[1]]["instructions"][loc[2]]["features"].append((va, feature)) + features["functions"][faddr]["basic blocks"][bbaddr]["instructions"][iaddr]["features"].append((addr, feature)) return capa.features.extractors.base_extractor.NullFeatureExtractor(features) @@ -236,7 +296,7 @@ def loads(s): MAGIC = "capa0000".encode("ascii") -def dump(extractor): +def dump(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -> bytes: """serialize the given extractor to a byte array.""" return MAGIC + zlib.compress(dumps(extractor).encode("utf-8")) @@ -245,7 +305,7 @@ def is_freeze(buf: bytes) -> bool: return buf[: len(MAGIC)] == MAGIC -def load(buf): +def load(buf: bytes) -> capa.features.extractors.base_extractor.FeatureExtractor: """deserialize a set of features (as a NullFeatureExtractor) from a byte array.""" if not is_freeze(buf): raise ValueError("missing magic header") diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 0b3ba949..e9f86f07 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
import textwrap +from typing import List from fixtures import * @@ -18,33 +19,34 @@ import capa.features.common import capa.features.freeze import capa.features.basicblock import capa.features.extractors.base_extractor +from capa.features.address import AbsoluteVirtualAddress EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor( { - "base address": 0x401000, + "base address": AbsoluteVirtualAddress(0x401000), "file features": [ - (0x402345, capa.features.common.Characteristic("embedded pe")), + (AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")), ], "functions": { - 0x401000: { + AbsoluteVirtualAddress(0x401000): { "features": [ - (0x401000, capa.features.common.Characteristic("indirect call")), + (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")), ], "basic blocks": { - 0x401000: { + AbsoluteVirtualAddress(0x401000): { "features": [ - (0x401000, capa.features.common.Characteristic("tight loop")), + (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")), ], "instructions": { - 0x401000: { + AbsoluteVirtualAddress(0x401000): { "features": [ - (0x401000, capa.features.insn.Mnemonic("xor")), - (0x401000, capa.features.common.Characteristic("nzxor")), + (AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")), + (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")), ], }, - 0x401002: { + AbsoluteVirtualAddress(0x401002): { "features": [ - (0x401002, capa.features.insn.Mnemonic("mov")), + (AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")), ], }, }, @@ -56,10 +58,17 @@ EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor( ) +def addresses(s) -> List[Address]: + return list(sorted(map(lambda i: i.address, s))) + + def test_null_feature_extractor(): - assert list(EXTRACTOR.get_functions()) == [0x401000] - assert list(EXTRACTOR.get_basic_blocks(0x401000)) == [0x401000] - 
assert list(EXTRACTOR.get_instructions(0x401000, 0x0401000)) == [0x401000, 0x401002] + fh = FunctionHandle(AbsoluteVirtualAddress(0x401000), None) + bbh = BBHandle(AbsoluteVirtualAddress(0x401000), None) + + assert addresses(EXTRACTOR.get_functions()) == [AbsoluteVirtualAddress(0x401000)] + assert addresses(EXTRACTOR.get_basic_blocks(fh)) == [AbsoluteVirtualAddress(0x401000)] + assert addresses(EXTRACTOR.get_instructions(fh, bbh)) == [AbsoluteVirtualAddress(0x401000), AbsoluteVirtualAddress(0x401002)] rules = capa.rules.RuleSet( [ @@ -90,50 +99,21 @@ def compare_extractors(a, b): a (capa.features.extractors.NullFeatureExtractor) b (capa.features.extractors.NullFeatureExtractor) """ - - # TODO: ordering of these things probably doesn't work yet - assert list(a.extract_file_features()) == list(b.extract_file_features()) - assert list(a.get_functions()) == list(b.get_functions()) + + assert addresses(a.get_functions()) == addresses(b.get_functions()) for f in a.get_functions(): - assert list(a.get_basic_blocks(f)) == list(b.get_basic_blocks(f)) + assert addresses(a.get_basic_blocks(f)) == addresses(b.get_basic_blocks(f)) assert list(a.extract_function_features(f)) == list(b.extract_function_features(f)) for bb in a.get_basic_blocks(f): - assert list(a.get_instructions(f, bb)) == list(b.get_instructions(f, bb)) + assert addresses(a.get_instructions(f, bb)) == addresses(b.get_instructions(f, bb)) assert list(a.extract_basic_block_features(f, bb)) == list(b.extract_basic_block_features(f, bb)) for insn in a.get_instructions(f, bb): assert list(a.extract_insn_features(f, bb, insn)) == list(b.extract_insn_features(f, bb, insn)) -def compare_extractors_viv_null(viv_ext, null_ext): - """ - almost identical to compare_extractors but adds casts to ints since the VivisectFeatureExtractor returns objects - and NullFeatureExtractor returns ints - - args: - viv_ext (capa.features.extractors.viv.extractor.VivisectFeatureExtractor) - null_ext 
(capa.features.extractors.NullFeatureExtractor) - """ - assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features()) - assert list(map(int, viv_ext.get_functions())) == list(null_ext.get_functions()) - for f in viv_ext.get_functions(): - assert list(map(int, viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(int(f))) - assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(int(f))) - - for bb in viv_ext.get_basic_blocks(f): - assert list(map(int, viv_ext.get_instructions(f, bb))) == list(null_ext.get_instructions(int(f), int(bb))) - assert list(viv_ext.extract_basic_block_features(f, bb)) == list( - null_ext.extract_basic_block_features(int(f), int(bb)) - ) - - for insn in viv_ext.get_instructions(f, bb): - assert list(viv_ext.extract_insn_features(f, bb, insn)) == list( - null_ext.extract_insn_features(int(f), int(bb), int(insn)) - ) - - def test_freeze_s_roundtrip(): load = capa.features.freeze.loads dump = capa.features.freeze.dumps @@ -184,4 +164,4 @@ def test_freeze_load_sample(tmpdir, z9324d_extractor): with open(o.strpath, "rb") as f: null_extractor = capa.features.freeze.load(f.read()) - compare_extractors_viv_null(z9324d_extractor, null_extractor) + compare_extractors(z9324d_extractor, null_extractor)