From 6b633efdba8d829dd15a82a601008bede072bb8c Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Wed, 25 May 2022 15:28:02 -0600 Subject: [PATCH] freeze: fix schema to support overlapping functions --- capa/features/freeze.py | 160 +++++++++++++++++----------------------- tests/test_freeze.py | 12 +-- 2 files changed, 72 insertions(+), 100 deletions(-) diff --git a/capa/features/freeze.py b/capa/features/freeze.py index 286f4f65..521196e4 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -13,7 +13,7 @@ import zlib import logging import collections from enum import Enum -from typing import Any, Dict, List, Type, Tuple, Set +from typing import Any, Set, Dict, List, Type, Tuple from pydantic import Field, BaseModel @@ -139,6 +139,7 @@ class FunctionFeature(HashableModel): function != address because, e.g., the feature may be found *within* the scope (function). versus right at its starting address. """ + function: Address address: Address feature: Feature @@ -153,6 +154,7 @@ class BasicBlockFeature(HashableModel): basic_block != address because, e.g., the feature may be found *within* the scope (basic block). versus right at its starting address. """ + basic_block: Address address: Address feature: Feature @@ -167,44 +169,44 @@ class InstructionFeature(HashableModel): instruction != address because, e.g., the feature may be found *within* the scope (basic block), versus right at its starting address. """ + instruction: Address address: Address feature: Feature -class Features(BaseModel): - global_: List[GlobalFeature] = Field(alias="global") - file: List[FileFeature] - function: List[FunctionFeature] - basic_block: List[BasicBlockFeature] = Field(alias="basic block") - instruction: List[InstructionFeature] +class InstructionFeatures(BaseModel): + address: Address + features: List[InstructionFeature] + + +class BasicBlockFeatures(BaseModel): + address: Address + features: List[BasicBlockFeature] + instructions: List[InstructionFeatures] + + +class FunctionFeatures(BaseModel): + address: Address + features: List[FunctionFeature] + basic_blocks: List[BasicBlockFeatures] = Field(alias="basic block") class Config: allow_population_by_field_name = True -class InstructionLayout(BaseModel): - address: Address +class Features(BaseModel): + global_: List[GlobalFeature] = Field(alias="global") + file: List[FileFeature] + functions: List[FunctionFeatures] - -class BasicBlockLayout(BaseModel): - address: Address - instructions: List[InstructionLayout] - - -class FunctionLayout(BaseModel): - address: Address - basic_blocks: List[BasicBlockLayout] - - -class Layout(BaseModel): - functions: List[FunctionLayout] + class Config: + allow_population_by_field_name = True class Freeze(BaseModel): version: int = 2 base_address: Address = Field(alias="base address") - layout: Layout features: Features class Config: @@ -233,85 +235,73 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) - ) ) - function_features: Set[FunctionFeature] = set() - basic_block_features: Set[BasicBlockFeature] = set() - instruction_features: Set[InstructionFeature] = set() - function_layouts: List[FunctionLayout] = [] - + function_features: List[FunctionFeatures] = [] for f in extractor.get_functions(): faddr = Address.from_capa(f.address) + ffeatures = [ + FunctionFeature( + function=faddr, + address=Address.from_capa(addr), + feature=Feature.from_capa(feature), + ) + for feature, addr in extractor.extract_function_features(f) + ] - for feature, addr in extractor.extract_function_features(f): - function_features.add( - FunctionFeature( - function=faddr, + basic_blocks = [] + for bb in extractor.get_basic_blocks(f): + bbaddr = Address.from_capa(bb.address) + bbfeatures = [ + BasicBlockFeature( + basic_block=bbaddr, address=Address.from_capa(addr), feature=Feature.from_capa(feature), ) - ) + for feature, addr in extractor.extract_basic_block_features(f, bb) + ] - basic_block_layouts: List[BasicBlockLayout] = [] - for bb in extractor.get_basic_blocks(f): - bbaddr = Address.from_capa(bb.address) - - for feature, addr in extractor.extract_basic_block_features(f, bb): - basic_block_features.add( - BasicBlockFeature( - basic_block=bbaddr, + instructions = [] + for insn in extractor.get_instructions(f, bb): + iaddr = Address.from_capa(insn.address) + ifeatures = [ + InstructionFeature( + instruction=iaddr, address=Address.from_capa(addr), feature=Feature.from_capa(feature), ) - ) - - instruction_layouts: List[InstructionLayout] = [] - for insn in extractor.get_instructions(f, bb): - iaddr = Address.from_capa(insn.address) - - for feature, addr in extractor.extract_insn_features(f, bb, insn): - instruction_features.add( - InstructionFeature( - instruction=iaddr, - address=Address.from_capa(addr), - feature=Feature.from_capa(feature), - ) - ) - - instruction_layouts.append( - InstructionLayout( + for feature, addr in extractor.extract_insn_features(f, bb, insn) + ] + instructions.append( + InstructionFeatures( address=iaddr, + features=ifeatures, ) ) - basic_block_layouts.append( - BasicBlockLayout( + basic_blocks.append( + BasicBlockFeatures( address=bbaddr, - instructions=instruction_layouts, + features=bbfeatures, + instructions=instructions, ) ) - function_layouts.append( - FunctionLayout( + function_features.append( + FunctionFeatures( address=faddr, - basic_blocks=basic_block_layouts, + features=ffeatures, + basic_blocks=basic_blocks, ) ) - layout = Layout( - functions=function_layouts, - ) - features = Features( global_=global_features, file=file_features, - function=list(function_features), - basic_block=list(basic_block_features), - instruction=list(instruction_features), + functions=function_features, ) freeze = Freeze( version=2, base_address=Address.from_capa(extractor.get_base_address()), - layout=layout, features=features, ) @@ -326,37 +316,19 @@ def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor: if freeze.version != 2: raise ValueError("unsupported freeze format version: %d", freeze.version) - function_features_by_address: Dict[ - capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]] - ] = collections.defaultdict(list) - for f in freeze.features.function: - function_features_by_address[f.function.to_capa()].append((f.address.to_capa(), f.feature.to_capa())) - - basic_block_features_by_address: Dict[ - capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]] - ] = collections.defaultdict(list) - for bb in freeze.features.basic_block: - basic_block_features_by_address[bb.basic_block.to_capa()].append((bb.address.to_capa(), bb.feature.to_capa())) - - instruction_features_by_address: Dict[ - capa.features.address.Address, List[Tuple[capa.features.address.Address, capa.features.common.Feature]] - ] = collections.defaultdict(list) - for i in freeze.features.instruction: - instruction_features_by_address[i.instruction.to_capa()].append((i.address.to_capa(), i.feature.to_capa())) - return null.NullFeatureExtractor( base_address=freeze.base_address.to_capa(), global_features=[f.feature.to_capa() for f in freeze.features.global_], file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], functions={ f.address.to_capa(): null.FunctionFeatures( - features=function_features_by_address.get(f.address.to_capa(), []), + features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in f.features], basic_blocks={ bb.address.to_capa(): null.BasicBlockFeatures( - features=basic_block_features_by_address.get(bb.address.to_capa(), []), + features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in bb.features], instructions={ i.address.to_capa(): null.InstructionFeatures( - features=instruction_features_by_address.get(i.address.to_capa(), []), + features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in i.features] ) for i in bb.instructions }, @@ -364,7 +336,7 @@ def loads(s: str) -> capa.features.extractors.base_extractor.FeatureExtractor: for bb in f.basic_blocks }, ) - for f in freeze.layout.functions + for f in freeze.features.functions }, ) diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 56bf7405..d2ecc58c 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -106,14 +106,14 @@ def compare_extractors(a, b): for bb in a.get_basic_blocks(f): assert addresses(a.get_instructions(f, bb)) == addresses(b.get_instructions(f, bb)) - assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list(sorted(set(b.extract_basic_block_features(f, bb)))) + assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list( + sorted(set(b.extract_basic_block_features(f, bb))) + ) for insn in a.get_instructions(f, bb): - try: - assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list(sorted(set(b.extract_insn_features(f, bb, insn)))) - except Exception as e: - from IPython import embed; embed() - raise + assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list( + sorted(set(b.extract_insn_features(f, bb, insn))) + ) def test_freeze_str_roundtrip():