freeze: use pydantic for (de)serialization

also, move null extractor to its own namespace
This commit is contained in:
Willi Ballenthin
2022-05-25 15:09:31 -06:00
parent b1fa5be7b1
commit adb425aeb3
6 changed files with 414 additions and 385 deletions

View File

@@ -18,43 +18,43 @@ import capa.features.insn
import capa.features.common
import capa.features.freeze
import capa.features.basicblock
import capa.features.extractors.null
import capa.features.extractors.base_extractor
from capa.features.address import AbsoluteVirtualAddress
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
{
"base address": AbsoluteVirtualAddress(0x401000),
"file features": [
(AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")),
],
"functions": {
AbsoluteVirtualAddress(0x401000): {
"features": [
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")),
],
"basic blocks": {
AbsoluteVirtualAddress(0x401000): {
"features": [
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")),
],
"instructions": {
AbsoluteVirtualAddress(0x401000): {
"features": [
(AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")),
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")),
],
},
AbsoluteVirtualAddress(0x401002): {
"features": [
(AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")),
],
},
},
EXTRACTOR = capa.features.extractors.null.NullFeatureExtractor(
base_address=AbsoluteVirtualAddress(0x401000),
global_features=[],
file_features=[
(AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")),
],
functions={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.FunctionFeatures(
features=[
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")),
],
basic_blocks={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.BasicBlockFeatures(
features=[
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")),
],
instructions={
AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.InstructionFeatures(
features=[
(AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")),
(AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")),
],
),
AbsoluteVirtualAddress(0x401002): capa.features.extractors.null.InstructionFeatures(
features=[
(AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")),
],
),
},
},
),
},
},
}
),
},
)
@@ -97,34 +97,33 @@ def test_null_feature_extractor():
def compare_extractors(a, b):
"""
args:
a (capa.features.extractors.NullFeatureExtractor)
b (capa.features.extractors.NullFeatureExtractor)
"""
assert list(a.extract_file_features()) == list(b.extract_file_features())
assert addresses(a.get_functions()) == addresses(b.get_functions())
for f in a.get_functions():
assert addresses(a.get_basic_blocks(f)) == addresses(b.get_basic_blocks(f))
assert list(a.extract_function_features(f)) == list(b.extract_function_features(f))
assert list(sorted(set(a.extract_function_features(f)))) == list(sorted(set(b.extract_function_features(f))))
for bb in a.get_basic_blocks(f):
assert addresses(a.get_instructions(f, bb)) == addresses(b.get_instructions(f, bb))
assert list(a.extract_basic_block_features(f, bb)) == list(b.extract_basic_block_features(f, bb))
assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list(sorted(set(b.extract_basic_block_features(f, bb))))
for insn in a.get_instructions(f, bb):
assert list(a.extract_insn_features(f, bb, insn)) == list(b.extract_insn_features(f, bb, insn))
try:
assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list(sorted(set(b.extract_insn_features(f, bb, insn))))
except Exception as e:
from IPython import embed; embed()
raise
def test_freeze_s_roundtrip():
def test_freeze_str_roundtrip():
load = capa.features.freeze.loads
dump = capa.features.freeze.dumps
reanimated = load(dump(EXTRACTOR))
compare_extractors(EXTRACTOR, reanimated)
def test_freeze_b_roundtrip():
def test_freeze_bytes_roundtrip():
load = capa.features.freeze.load
dump = capa.features.freeze.dump
reanimated = load(dump(EXTRACTOR))
@@ -132,9 +131,7 @@ def test_freeze_b_roundtrip():
def roundtrip_feature(feature):
serialize = capa.features.freeze.serialize_feature
deserialize = capa.features.freeze.deserialize_feature
assert feature == deserialize(serialize(feature))
assert feature == capa.features.freeze.Feature.from_capa(feature).to_capa()
def test_serialize_features():