freeze: model each feature separately

This commit is contained in:
Willi Ballenthin
2022-05-25 17:12:02 -06:00
parent eb6de90059
commit 3879e33cce
3 changed files with 269 additions and 28 deletions

View File

@@ -11,10 +11,10 @@ See the License for the specific language governing permissions and limitations
"""
import zlib
import logging
import collections
from enum import Enum
from typing import Any, Set, Dict, List, Type, Tuple
from typing import Any, Set, Dict, List
from devtools import debug
from pydantic import Field, BaseModel
import capa.helpers
@@ -25,6 +25,7 @@ import capa.features.address
import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.helpers import assert_never
from capa.features.freeze.features import Feature, feature_from_capa
logger = logging.getLogger(__name__)
@@ -101,26 +102,6 @@ class Address(HashableModel):
assert_never(self.type)
KNOWN_FEATURES: Dict[str, Type[capa.features.common.Feature]] = {
F.__name__: F for F in capa.features.common.Feature.__subclasses__()
}
KNOWN_FEATURES.update({F.__name__: F for F in capa.features.insn._Operand.__subclasses__()}) # type: ignore
class Feature(HashableModel):
name: str
args: Tuple[Any, ...]
@classmethod
def from_capa(cls, f: capa.features.common.Feature) -> "Feature":
name, args = f.freeze_serialize()
return cls(name=name, args=tuple(args))
def to_capa(self) -> capa.features.common.Feature:
F = KNOWN_FEATURES[self.name]
return F.freeze_deserialize(self.args)
class GlobalFeature(HashableModel):
feature: Feature
@@ -222,7 +203,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
for feature, _ in extractor.extract_global_features():
global_features.append(
GlobalFeature(
feature=Feature.from_capa(feature),
feature=feature_from_capa(feature),
)
)
@@ -230,7 +211,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
for feature, address in extractor.extract_file_features():
file_features.append(
FileFeature(
feature=Feature.from_capa(feature),
feature=feature_from_capa(feature),
address=Address.from_capa(address),
)
)
@@ -242,7 +223,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
FunctionFeature(
function=faddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
feature=feature_from_capa(feature),
)
for feature, addr in extractor.extract_function_features(f)
]
@@ -254,7 +235,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
BasicBlockFeature(
basic_block=bbaddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
feature=feature_from_capa(feature),
)
for feature, addr in extractor.extract_basic_block_features(f, bb)
]
@@ -266,10 +247,11 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
InstructionFeature(
instruction=iaddr,
address=Address.from_capa(addr),
feature=Feature.from_capa(feature),
feature=debug(feature_from_capa(feature)),
)
for feature, addr in extractor.extract_insn_features(f, bb, insn)
]
instructions.append(
InstructionFeatures(
address=iaddr,

View File

@@ -0,0 +1,258 @@
import binascii
from typing import Any, Union
from pydantic import Field, BaseModel
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
class FeatureModel(BaseModel):
    """Common base for the serializable (frozen) feature models.

    Every concrete feature model in this module derives from this class and
    inherits `to_capa()`, which rebuilds the matching capa feature object.
    """

    class Config:
        # instances are immutable once constructed
        frozen = True
        # permit construction via the Python field name (e.g. `import_`)
        # in addition to the declared alias (e.g. "import")
        allow_population_by_field_name = True

    def to_capa(self) -> capa.features.common.Feature:
        """Convert this model back into the corresponding capa feature.

        Returns:
            the capa feature instance built from this model's fields.

        Raises:
            NotImplementedError: when the concrete model type is not handled.
        """
        # global features
        if isinstance(self, OSFeature):
            return capa.features.common.OS(self.os)

        if isinstance(self, ArchFeature):
            return capa.features.common.Arch(self.arch)

        if isinstance(self, FormatFeature):
            return capa.features.common.Format(self.format)

        if isinstance(self, MatchedRuleFeature):
            return capa.features.common.MatchedRule(self.match)

        if isinstance(self, CharacteristicFeature):
            return capa.features.common.Characteristic(self.characteristic)

        # file features
        if isinstance(self, ExportFeature):
            return capa.features.file.Export(self.export)

        if isinstance(self, ImportFeature):
            return capa.features.file.Import(self.import_)

        if isinstance(self, SectionFeature):
            return capa.features.file.Section(self.section)

        if isinstance(self, FunctionNameFeature):
            return capa.features.file.FunctionName(self.function_name)

        if isinstance(self, StringFeature):
            return capa.features.common.String(self.string)

        # the basic block feature carries no value
        if isinstance(self, BasicBlockFeature):
            return capa.features.basicblock.BasicBlock()

        # instruction features
        if isinstance(self, APIFeature):
            return capa.features.insn.API(self.api)

        if isinstance(self, NumberFeature):
            return capa.features.insn.Number(self.number)

        if isinstance(self, BytesFeature):
            # the model stores the buffer hex-encoded; decode it back to raw bytes
            raw = binascii.unhexlify(self.bytes)
            return capa.features.common.Bytes(raw)

        if isinstance(self, OffsetFeature):
            return capa.features.insn.Offset(self.offset)

        if isinstance(self, MnemonicFeature):
            return capa.features.insn.Mnemonic(self.mnemonic)

        if isinstance(self, OperandNumberFeature):
            return capa.features.insn.OperandNumber(self.index, self.operand_number)

        if isinstance(self, OperandOffsetFeature):
            return capa.features.insn.OperandOffset(self.index, self.operand_offset)

        raise NotImplementedError(f"Feature.to_capa({type(self)}) not implemented")
def feature_from_capa(f: capa.features.common.Feature) -> "Feature":
    """Build the serializable feature model for a capa feature.

    Args:
        f: the capa feature to convert.

    Returns:
        the model instance that mirrors `f`.

    Raises:
        NotImplementedError: when the type of `f` is not handled.
    """
    # the checks are kept in their original order, since isinstance() also
    # matches subclasses of the tested capa feature type.
    if isinstance(f, capa.features.common.OS):
        return OSFeature(os=f.value)

    if isinstance(f, capa.features.common.Arch):
        return ArchFeature(arch=f.value)

    if isinstance(f, capa.features.common.Format):
        return FormatFeature(format=f.value)

    if isinstance(f, capa.features.common.MatchedRule):
        return MatchedRuleFeature(match=f.value)

    if isinstance(f, capa.features.common.Characteristic):
        return CharacteristicFeature(characteristic=f.value)

    if isinstance(f, capa.features.file.Export):
        return ExportFeature(export=f.value)

    if isinstance(f, capa.features.file.Import):
        return ImportFeature(import_=f.value)

    if isinstance(f, capa.features.file.Section):
        return SectionFeature(section=f.value)

    if isinstance(f, capa.features.file.FunctionName):
        return FunctionNameFeature(function_name=f.value)

    if isinstance(f, capa.features.common.String):
        return StringFeature(string=f.value)

    if isinstance(f, capa.features.basicblock.BasicBlock):
        return BasicBlockFeature()

    if isinstance(f, capa.features.insn.API):
        return APIFeature(api=f.value)

    if isinstance(f, capa.features.insn.Number):
        return NumberFeature(number=f.value)

    if isinstance(f, capa.features.common.Bytes):
        raw = f.value
        assert isinstance(raw, bytes)
        # store the buffer as an ASCII hex string
        encoded = binascii.hexlify(raw).decode("ascii")
        return BytesFeature(bytes=encoded)

    if isinstance(f, capa.features.insn.Offset):
        return OffsetFeature(offset=f.value)

    if isinstance(f, capa.features.insn.Mnemonic):
        return MnemonicFeature(mnemonic=f.value)

    if isinstance(f, capa.features.insn.OperandNumber):
        return OperandNumberFeature(index=f.index, operand_number=f.value)

    if isinstance(f, capa.features.insn.OperandOffset):
        return OperandOffsetFeature(index=f.index, operand_offset=f.value)

    raise NotImplementedError(f"feature_from_capa({type(f)}) not implemented")
class OSFeature(FeatureModel):
    """Model of `capa.features.common.OS`; `os` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "os"
    os: str
class ArchFeature(FeatureModel):
    """Model of `capa.features.common.Arch`; `arch` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "arch"
    arch: str
class FormatFeature(FeatureModel):
    """Model of `capa.features.common.Format`; `format` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "format"
    format: str
class MatchedRuleFeature(FeatureModel):
    """Model of `capa.features.common.MatchedRule`; `match` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "match"
    match: str
class CharacteristicFeature(FeatureModel):
    """Model of `capa.features.common.Characteristic`; `characteristic` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "characteristic"
    characteristic: str
class ExportFeature(FeatureModel):
    """Model of `capa.features.file.Export`; `export` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "export"
    export: str
class ImportFeature(FeatureModel):
    """Model of `capa.features.file.Import`; `import_` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "import"
    # `import` is a Python keyword, so the attribute carries a trailing
    # underscore and is exposed under the alias "import".
    import_: str = Field(alias="import")
class SectionFeature(FeatureModel):
    """Model of `capa.features.file.Section`; `section` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "section"
    section: str
class FunctionNameFeature(FeatureModel):
    """Model of `capa.features.file.FunctionName`; `function_name` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "function name"
    # exposed under an alias that contains a space
    function_name: str = Field(alias="function name")
class StringFeature(FeatureModel):
    """Model of `capa.features.common.String`; `string` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "string"
    string: str
class BasicBlockFeature(FeatureModel):
    """Model of `capa.features.basicblock.BasicBlock`, which carries no value."""

    # tag naming the kind of feature; this model has no other fields
    type: str = "basic block"
class APIFeature(FeatureModel):
    """Model of `capa.features.insn.API`; `api` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "api"
    api: str
class NumberFeature(FeatureModel):
    """Model of `capa.features.insn.Number`; `number` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "number"
    # either an integer or a floating point value is accepted
    number: Union[int, float]
class BytesFeature(FeatureModel):
    """Model of `capa.features.common.Bytes`."""

    # tag naming the kind of feature
    type: str = "bytes"
    # the byte sequence as a hex string: feature_from_capa() hexlifies the
    # raw buffer and to_capa() unhexlifies it again.
    bytes: str
class OffsetFeature(FeatureModel):
    """Model of `capa.features.insn.Offset`; `offset` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "offset"
    offset: int
class MnemonicFeature(FeatureModel):
    """Model of `capa.features.insn.Mnemonic`; `mnemonic` holds the feature's value."""

    # tag naming the kind of feature
    type: str = "mnemonic"
    mnemonic: str
class OperandNumberFeature(FeatureModel):
    """Model of `capa.features.insn.OperandNumber`."""

    # tag naming the kind of feature
    type: str = "operand number"
    # the `index` attribute of the capa feature
    index: int
    # the feature's value, exposed under an alias that contains a space
    operand_number: int = Field(alias="operand number")
class OperandOffsetFeature(FeatureModel):
    """Model of `capa.features.insn.OperandOffset`."""

    # tag naming the kind of feature
    type: str = "operand offset"
    # the `index` attribute of the capa feature
    index: int
    # the feature's value, exposed under an alias that contains a space
    operand_offset: int = Field(alias="operand offset")
# Union of every concrete feature model.
# The order of the members is significant; do not sort or regroup them.
Feature = Union[
    OSFeature,
    ArchFeature,
    FormatFeature,
    MatchedRuleFeature,
    CharacteristicFeature,
    ExportFeature,
    ImportFeature,
    SectionFeature,
    FunctionNameFeature,
    StringFeature,
    APIFeature,
    NumberFeature,
    BytesFeature,
    OffsetFeature,
    MnemonicFeature,
    OperandNumberFeature,
    OperandOffsetFeature,
    # BasicBlockFeature must stay last: pydantic fails to serialize correctly otherwise.
    # NOTE(review): presumably this is because the model has no required value field,
    # so pydantic (which appears to try union members in order) would accept any
    # other feature's data as a BasicBlockFeature if it came earlier - TODO confirm.
    BasicBlockFeature,
]

View File

@@ -131,7 +131,7 @@ def test_freeze_bytes_roundtrip():
def roundtrip_feature(feature):
assert feature == capa.features.freeze.Feature.from_capa(feature).to_capa()
assert feature == capa.features.freeze.feature_from_capa(feature).to_capa()
def test_serialize_features():
@@ -146,6 +146,7 @@ def test_serialize_features():
roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk"))
roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process"))
roundtrip_feature(capa.features.file.Import("#11"))
roundtrip_feature(capa.features.insn.OperandOffset(0, 0x8))
def test_freeze_sample(tmpdir, z9324d_extractor):