NOTE: line breaks and indentation were restored from a whitespace-collapsed copy of
this patch; leading whitespace on context lines is a best-effort reconstruction.
The leftover `devtools.debug` import and call were dropped and hunk headers recomputed.
The `index` / `similarity index` lines are carried over unchanged and are now stale.

diff --git a/capa/features/freeze.py b/capa/features/freeze/__init__.py
similarity index 92%
rename from capa/features/freeze.py
rename to capa/features/freeze/__init__.py
index 521196e4..4162e4ee 100644
--- a/capa/features/freeze.py
+++ b/capa/features/freeze/__init__.py
@@ -11,10 +11,9 @@ See the License for the specific language governing permissions and limitations
 """
 import zlib
 import logging
-import collections
 from enum import Enum
-from typing import Any, Set, Dict, List, Type, Tuple
+from typing import Any, Set, Dict, List
 
 from pydantic import Field, BaseModel
 
 import capa.helpers
@@ -25,6 +24,7 @@ import capa.features.address
 import capa.features.basicblock
 import capa.features.extractors.base_extractor
 from capa.helpers import assert_never
+from capa.features.freeze.features import Feature, feature_from_capa
 
 logger = logging.getLogger(__name__)
 
@@ -101,26 +101,6 @@ class Address(HashableModel):
             assert_never(self.type)
 
 
-KNOWN_FEATURES: Dict[str, Type[capa.features.common.Feature]] = {
-    F.__name__: F for F in capa.features.common.Feature.__subclasses__()
-}
-KNOWN_FEATURES.update({F.__name__: F for F in capa.features.insn._Operand.__subclasses__()})  # type: ignore
-
-
-class Feature(HashableModel):
-    name: str
-    args: Tuple[Any, ...]
-
-    @classmethod
-    def from_capa(cls, f: capa.features.common.Feature) -> "Feature":
-        name, args = f.freeze_serialize()
-        return cls(name=name, args=tuple(args))
-
-    def to_capa(self) -> capa.features.common.Feature:
-        F = KNOWN_FEATURES[self.name]
-        return F.freeze_deserialize(self.args)
-
-
 class GlobalFeature(HashableModel):
     feature: Feature
 
@@ -222,7 +202,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
     for feature, _ in extractor.extract_global_features():
         global_features.append(
             GlobalFeature(
-                feature=Feature.from_capa(feature),
+                feature=feature_from_capa(feature),
             )
         )
 
@@ -230,7 +210,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
     for feature, address in extractor.extract_file_features():
         file_features.append(
             FileFeature(
-                feature=Feature.from_capa(feature),
+                feature=feature_from_capa(feature),
                 address=Address.from_capa(address),
             )
         )
@@ -242,7 +222,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
             FunctionFeature(
                 function=faddr,
                 address=Address.from_capa(addr),
-                feature=Feature.from_capa(feature),
+                feature=feature_from_capa(feature),
             )
             for feature, addr in extractor.extract_function_features(f)
         ]
@@ -254,7 +234,7 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
                 BasicBlockFeature(
                     basic_block=bbaddr,
                     address=Address.from_capa(addr),
-                    feature=Feature.from_capa(feature),
+                    feature=feature_from_capa(feature),
                 )
                 for feature, addr in extractor.extract_basic_block_features(f, bb)
             ]
@@ -266,10 +246,11 @@ def dumps(extractor: capa.features.extractors.base_extractor.FeatureExtractor) -
                     InstructionFeature(
                         instruction=iaddr,
                         address=Address.from_capa(addr),
-                        feature=Feature.from_capa(feature),
+                        feature=feature_from_capa(feature),
                     )
                     for feature, addr in extractor.extract_insn_features(f, bb, insn)
                 ]
+
                 instructions.append(
                     InstructionFeatures(
                         address=iaddr,
diff --git a/capa/features/freeze/features.py b/capa/features/freeze/features.py
new file mode 100644
index 00000000..e16a169f
--- /dev/null
+++ b/capa/features/freeze/features.py
@@ -0,0 +1,274 @@
+import binascii
+from typing import Any, Union
+
+from pydantic import Field, BaseModel
+
+import capa.features.file
+import capa.features.insn
+import capa.features.common
+import capa.features.basicblock
+
+
+class FeatureModel(BaseModel):
+    """Base class for the serializable (pydantic) counterparts of capa features."""
+
+    class Config:
+        frozen = True
+        # aliased fields (e.g. `import_`) are populated by field name in feature_from_capa()
+        allow_population_by_field_name = True
+
+    def to_capa(self) -> capa.features.common.Feature:
+        """
+        Convert this model back into the equivalent capa feature.
+
+        Raises NotImplementedError when the concrete subclass has no branch below.
+        """
+        if isinstance(self, OSFeature):
+            return capa.features.common.OS(self.os)
+
+        elif isinstance(self, ArchFeature):
+            return capa.features.common.Arch(self.arch)
+
+        elif isinstance(self, FormatFeature):
+            return capa.features.common.Format(self.format)
+
+        elif isinstance(self, MatchedRuleFeature):
+            return capa.features.common.MatchedRule(self.match)
+
+        elif isinstance(
+            self,
+            CharacteristicFeature,
+        ):
+            return capa.features.common.Characteristic(self.characteristic)
+
+        elif isinstance(self, ExportFeature):
+            return capa.features.file.Export(self.export)
+
+        elif isinstance(self, ImportFeature):
+            return capa.features.file.Import(self.import_)
+
+        elif isinstance(self, SectionFeature):
+            return capa.features.file.Section(self.section)
+
+        elif isinstance(self, FunctionNameFeature):
+            return capa.features.file.FunctionName(self.function_name)
+
+        elif isinstance(self, StringFeature):
+            return capa.features.common.String(self.string)
+
+        elif isinstance(self, BasicBlockFeature):
+            return capa.features.basicblock.BasicBlock()
+
+        elif isinstance(self, APIFeature):
+            return capa.features.insn.API(self.api)
+
+        elif isinstance(self, NumberFeature):
+            return capa.features.insn.Number(self.number)
+
+        elif isinstance(self, BytesFeature):
+            return capa.features.common.Bytes(binascii.unhexlify(self.bytes))
+
+        elif isinstance(self, OffsetFeature):
+            return capa.features.insn.Offset(self.offset)
+
+        elif isinstance(self, MnemonicFeature):
+            return capa.features.insn.Mnemonic(self.mnemonic)
+
+        elif isinstance(self, OperandNumberFeature):
+            return capa.features.insn.OperandNumber(
+                self.index,
+                self.operand_number,
+            )
+
+        elif isinstance(self, OperandOffsetFeature):
+            return capa.features.insn.OperandOffset(
+                self.index,
+                self.operand_offset,
+            )
+
+        else:
+            raise NotImplementedError(f"Feature.to_capa({type(self)}) not implemented")
+
+
+def feature_from_capa(f: capa.features.common.Feature) -> "Feature":
+    """
+    Convert a capa feature into its serializable model.
+
+    Bytes values are stored as an ASCII hex string.
+    Raises NotImplementedError for feature types that have no branch below.
+    """
+    if isinstance(f, capa.features.common.OS):
+        return OSFeature(os=f.value)
+
+    elif isinstance(f, capa.features.common.Arch):
+        return ArchFeature(arch=f.value)
+
+    elif isinstance(f, capa.features.common.Format):
+        return FormatFeature(format=f.value)
+
+    elif isinstance(f, capa.features.common.MatchedRule):
+        return MatchedRuleFeature(match=f.value)
+
+    elif isinstance(f, capa.features.common.Characteristic):
+        return CharacteristicFeature(characteristic=f.value)
+
+    elif isinstance(f, capa.features.file.Export):
+        return ExportFeature(export=f.value)
+
+    elif isinstance(f, capa.features.file.Import):
+        return ImportFeature(import_=f.value)
+
+    elif isinstance(f, capa.features.file.Section):
+        return SectionFeature(section=f.value)
+
+    elif isinstance(f, capa.features.file.FunctionName):
+        return FunctionNameFeature(function_name=f.value)
+
+    elif isinstance(f, capa.features.common.String):
+        return StringFeature(string=f.value)
+
+    elif isinstance(f, capa.features.basicblock.BasicBlock):
+        return BasicBlockFeature()
+
+    elif isinstance(f, capa.features.insn.API):
+        return APIFeature(api=f.value)
+
+    elif isinstance(f, capa.features.insn.Number):
+        return NumberFeature(number=f.value)
+
+    elif isinstance(f, capa.features.common.Bytes):
+        buf = f.value
+        assert isinstance(buf, bytes)
+        return BytesFeature(bytes=binascii.hexlify(buf).decode("ascii"))
+
+    elif isinstance(f, capa.features.insn.Offset):
+        return OffsetFeature(offset=f.value)
+
+    elif isinstance(f, capa.features.insn.Mnemonic):
+        return MnemonicFeature(mnemonic=f.value)
+
+    elif isinstance(f, capa.features.insn.OperandNumber):
+        return OperandNumberFeature(index=f.index, operand_number=f.value)
+
+    elif isinstance(f, capa.features.insn.OperandOffset):
+        return OperandOffsetFeature(index=f.index, operand_offset=f.value)
+
+    else:
+        raise NotImplementedError(f"feature_from_capa({type(f)}) not implemented")
+
+
+class OSFeature(FeatureModel):
+    type: str = "os"
+    os: str
+
+
+class ArchFeature(FeatureModel):
+    type: str = "arch"
+    arch: str
+
+
+class FormatFeature(FeatureModel):
+    type: str = "format"
+    format: str
+
+
+class MatchedRuleFeature(FeatureModel):
+    type: str = "match"
+    match: str
+
+
+class CharacteristicFeature(FeatureModel):
+    type: str = "characteristic"
+    characteristic: str
+
+
+class ExportFeature(FeatureModel):
+    type: str = "export"
+    export: str
+
+
+class ImportFeature(FeatureModel):
+    type: str = "import"
+    import_: str = Field(alias="import")
+
+
+class SectionFeature(FeatureModel):
+    type: str = "section"
+    section: str
+
+
+class FunctionNameFeature(FeatureModel):
+    type: str = "function name"
+    function_name: str = Field(alias="function name")
+
+
+class StringFeature(FeatureModel):
+    type: str = "string"
+    string: str
+
+
+class BasicBlockFeature(FeatureModel):
+    type: str = "basic block"
+
+
+class APIFeature(FeatureModel):
+    type: str = "api"
+    api: str
+
+
+class NumberFeature(FeatureModel):
+    type: str = "number"
+    number: Union[int, float]
+
+
+class BytesFeature(FeatureModel):
+    type: str = "bytes"
+    bytes: str
+
+
+class OffsetFeature(FeatureModel):
+    type: str = "offset"
+    offset: int
+
+
+class MnemonicFeature(FeatureModel):
+    type: str = "mnemonic"
+    mnemonic: str
+
+
+class OperandNumberFeature(FeatureModel):
+    type: str = "operand number"
+    index: int
+    operand_number: int = Field(alias="operand number")
+
+
+class OperandOffsetFeature(FeatureModel):
+    type: str = "operand offset"
+    index: int
+    operand_offset: int = Field(alias="operand offset")
+
+
+Feature = Union[
+    OSFeature,
+    ArchFeature,
+    FormatFeature,
+    MatchedRuleFeature,
+    CharacteristicFeature,
+    ExportFeature,
+    ImportFeature,
+    SectionFeature,
+    FunctionNameFeature,
+    StringFeature,
+    APIFeature,
+    NumberFeature,
+    BytesFeature,
+    OffsetFeature,
+    MnemonicFeature,
+    OperandNumberFeature,
+    OperandOffsetFeature,
+    # BasicBlockFeature has to go last: pydantic fails to serialize correctly otherwise.
+    # presumably because it has no required field, so when pydantic tries the
+    # Union members in order it would accept data meant for any other member.
+    # TODO: confirm against the pydantic documentation on Union validation.
+    BasicBlockFeature,
+]
diff --git a/tests/test_freeze.py b/tests/test_freeze.py
index d2ecc58c..bb449097 100644
--- a/tests/test_freeze.py
+++ b/tests/test_freeze.py
@@ -131,7 +131,7 @@ def test_freeze_bytes_roundtrip():
 
 
 def roundtrip_feature(feature):
-    assert feature == capa.features.freeze.Feature.from_capa(feature).to_capa()
+    assert feature == capa.features.freeze.feature_from_capa(feature).to_capa()
 
 
 def test_serialize_features():
@@ -146,6 +146,7 @@ def test_serialize_features():
     roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk"))
     roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process"))
     roundtrip_feature(capa.features.file.Import("#11"))
+    roundtrip_feature(capa.features.insn.OperandOffset(0, 0x8))
 
 
 def test_freeze_sample(tmpdir, z9324d_extractor):