freeze: fix schema to support overlapping functions

This commit is contained in:
Willi Ballenthin
2022-05-25 15:28:02 -06:00
parent 02cef8297c
commit 6b633efdba
2 changed files with 72 additions and 100 deletions

View File

@@ -13,7 +13,7 @@ import zlib
import logging
import collections
from enum import Enum
from typing import Any, Dict, List, Type, Tuple, Set
from typing import Any, Set, Dict, List, Type, Tuple
from pydantic import Field, BaseModel
@@ -139,6 +139,7 @@ class FunctionFeature(HashableModel):
function != address because, e.g., the feature may be found *within* the scope (function).
versus right at its starting address.
"""
function: Address
address: Address
feature: Feature
@@ -153,6 +154,7 @@ class BasicBlockFeature(HashableModel):
basic_block != address because, e.g., the feature may be found *within* the scope (basic block).
versus right at its starting address.
"""
basic_block: Address
address: Address
feature: Feature
@@ -167,44 +169,44 @@ class InstructionFeature(HashableModel):
instruction != address because, e.g., the feature may be found *within* the scope (basic block),
versus right at its starting address.
"""
instruction: Address
address: Address
feature: Feature
class Features(BaseModel):
global_: List[GlobalFeature] = Field(alias="global")
file: List[FileFeature]
function: List[FunctionFeature]
basic_block: List[BasicBlockFeature] = Field(alias="basic block")
instruction: List[InstructionFeature]
class InstructionFeatures(BaseModel):
address: Address
features: List[InstructionFeature]
class BasicBlockFeatures(BaseModel):
address: Address
features: List[BasicBlockFeature]
instructions: List[InstructionFeatures]
class FunctionFeatures(BaseModel):
address: Address
features: List[FunctionFeature]
basic_blocks: List[BasicBlockFeatures] = Field(alias="basic block")
class Config:
allow_population_by_field_name = True
class InstructionLayout(BaseModel):
address: Address
class Features(BaseModel):
global_: List[GlobalFeature] = Field(alias="global")
file: List[FileFeature]
functions: List[FunctionFeatures]
class BasicBlockLayout(BaseModel):
address: Address
instructions: List[InstructionLayout]
class FunctionLayout(BaseModel):
address: Address
basic_blocks: List[BasicBlockLayout]
class Layout(BaseModel):
functions: List[FunctionLayout]
class Config:
allow_population_by_field_name = True
class Freeze(BaseModel):
version: int = 2
base_address: Address = Field(alias="base address")
layout: Layout
features: Features
class Config:
@@ -233,85 +235,73 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
)
)
function_features: Set[FunctionFeature] = set()
basic_block_features: Set[BasicBlockFeature] = set()
instruction_features: Set[InstructionFeature] = set()
function_layouts: List[FunctionLayout] = []
function_features: List[FunctionFeatures] = []
for f in extractor.get_functions():
faddr = Address.from_capa(f.address)
for feature, addr in extractor.extract_function_features(f):
function_features.add(
ffeatures = [
FunctionFeature(
function=faddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
)
)
for feature, addr in extractor.extract_function_features(f)
]
basic_block_layouts: List[BasicBlockLayout] = []
basic_blocks = []
for bb in extractor.get_basic_blocks(f):
bbaddr = Address.from_capa(bb.address)
for feature, addr in extractor.extract_basic_block_features(f, bb):
basic_block_features.add(
bbfeatures = [
BasicBlockFeature(
basic_block=bbaddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
)
)
for feature, addr in extractor.extract_basic_block_features(f, bb)
]
instruction_layouts: List[InstructionLayout] = []
instructions = []
for insn in extractor.get_instructions(f, bb):
iaddr = Address.from_capa(insn.address)
for feature, addr in extractor.extract_insn_features(f, bb, insn):
instruction_features.add(
ifeatures = [
InstructionFeature(
instruction=iaddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
)
)
instruction_layouts.append(
InstructionLayout(
for feature, addr in extractor.extract_insn_features(f, bb, insn)
]
instructions.append(
InstructionFeatures(
address=iaddr,
features=ifeatures,
)
)
basic_block_layouts.append(
BasicBlockLayout(
basic_blocks.append(
BasicBlockFeatures(
address=bbaddr,
instructions=instruction_layouts,
features=bbfeatures,
instructions=instructions,
)
)
function_layouts.append(
FunctionLayout(
function_features.append(
FunctionFeatures(
address=faddr,
basic_blocks=basic_block_layouts,
features=ffeatures,
basic_blocks=basic_blocks,
)
)
layout = Layout(
functions=function_layouts,
)
features = Features(
global_=global_features,
file=file_features,
function=list(function_features),
basic_block=list(basic_block_features),
instruction=list(instruction_features),
functions=function_features,
)
freeze = Freeze(
version=2,
base_address=Address.from_capa(extractor.get_base_address()),
layout=layout,
features=features,
)
@@ -326,37 +316,19 @@ def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor:
if freeze.version != 2:
raise ValueError("unsupported freeze format version: %d", freeze.version)
function_features_by_address: Dict[
capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]]
] = collections.defaultdict(list)
for f in freeze.features.function:
function_features_by_address[f.function.to_capa()].append((f.address.to_capa(), f.feature.to_capa()))
basic_block_features_by_address: Dict[
capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]]
] = collections.defaultdict(list)
for bb in freeze.features.basic_block:
basic_block_features_by_address[bb.basic_block.to_capa()].append((bb.address.to_capa(), bb.feature.to_capa()))
instruction_features_by_address: Dict[
capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]]
] = collections.defaultdict(list)
for i in freeze.features.instruction:
instruction_features_by_address[i.instruction.to_capa()].append((i.address.to_capa(), i.feature.to_capa()))
return null.NullFeatureExtractor(
base_address=freeze.base_address.to_capa(),
global_features=[f.feature.to_capa() for f in freeze.features.global_],
file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file],
functions={
f.address.to_capa(): null.FunctionFeatures(
features=function_features_by_address.get(f.address.to_capa(), []),
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in f.features],
basic_blocks={
bb.address.to_capa(): null.BasicBlockFeatures(
features=basic_block_features_by_address.get(bb.address.to_capa(), []),
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in bb.features],
instructions={
i.address.to_capa(): null.InstructionFeatures(
features=instruction_features_by_address.get(i.address.to_capa(), []),
features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in i.features]
)
for i in bb.instructions
},
@@ -364,7 +336,7 @@ def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor:
for bb in f.basic_blocks
},
)
for f in freeze.layout.functions
for f in freeze.features.functions
},
)

View File

@@ -106,14 +106,14 @@ def compare_extractors(a, b):
for bb in a.get_basic_blocks(f):
assert addresses(a.get_instructions(f, bb)) == addresses(b.get_instructions(f, bb))
assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list(sorted(set(b.extract_basic_block_features(f, bb))))
assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list(
sorted(set(b.extract_basic_block_features(f, bb)))
)
for insn in a.get_instructions(f, bb):
try:
assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list(sorted(set(b.extract_insn_features(f, bb, insn))))
except Exception as e:
from IPython import embed; embed()
raise
assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list(
sorted(set(b.extract_insn_features(f, bb, insn)))
)
def test_freeze_str_roundtrip():