diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 10deb40c..84969c86 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -9,10 +9,11 @@ Unless required by applicable law or agreed to in writing, software distributed is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import json import zlib import logging from enum import Enum -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Literal from pydantic import Field, BaseModel, ConfigDict @@ -39,6 +40,8 @@ from capa.features.extractors.base_extractor import ( logger = logging.getLogger(__name__) +CURRENT_VERSION = 3 + class HashableModel(BaseModel): model_config = ConfigDict(frozen=True) @@ -325,9 +328,10 @@ class Extractor(BaseModel): class Freeze(BaseModel): - version: int = 2 + version: int = CURRENT_VERSION base_address: Address = Field(alias="base address") sample_hashes: SampleHashes + flavor: Literal["static", "dynamic"] extractor: Extractor features: Features model_config = ConfigDict(populate_by_name=True) @@ -423,9 +427,10 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str: # Mypy is unable to recognise `global_` as a argument due to alias freeze = Freeze( - version=3, + version=CURRENT_VERSION, base_address=Address.from_capa(extractor.get_base_address()), sample_hashes=extractor.get_sample_hashes(), + flavor="static", extractor=Extractor(name=extractor.__class__.__name__), features=features, ) # type: ignore @@ -527,24 +532,27 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str: base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS freeze = Freeze( - version=3, + version=CURRENT_VERSION, base_address=Address.from_capa(base_addr), sample_hashes=extractor.get_sample_hashes(), + flavor="dynamic", extractor=Extractor(name=extractor.__class__.__name__), features=features, ) # type: ignore # Mypy is unable to recognise `base_address` as a argument due to alias - return freeze.json() + return freeze.model_dump_json() def loads_static(s: str) -> StaticFeatureExtractor: """deserialize a set of features (as a NullStaticFeatureExtractor) from a string.""" freeze = Freeze.model_validate_json(s) - if freeze.version != 3: + if freeze.version != CURRENT_VERSION: raise ValueError(f"unsupported freeze format version: {freeze.version}") + assert freeze.flavor == "static" assert isinstance(freeze.features, StaticFeatures) + return null.NullStaticFeatureExtractor( base_address=freeze.base_address.to_capa(), sample_hashes=freeze.sample_hashes, @@ -573,11 +581,13 @@ def loads_static(s: str) -> StaticFeatureExtractor: def loads_dynamic(s: str) -> DynamicFeatureExtractor: """deserialize a set of features (as a NullDynamicFeatureExtractor) from a string.""" - freeze = Freeze.parse_raw(s) - if freeze.version != 3: + freeze = Freeze.model_validate_json(s) + if freeze.version != CURRENT_VERSION: raise ValueError(f"unsupported freeze format version: {freeze.version}") + assert freeze.flavor == "dynamic" assert isinstance(freeze.features, DynamicFeatures) + return null.NullDynamicFeatureExtractor( base_address=freeze.base_address.to_capa(), sample_hashes=freeze.sample_hashes, @@ -605,42 +615,51 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor: MAGIC = "capa0000".encode("ascii") -STATIC_MAGIC = MAGIC + "-static".encode("ascii") -DYNAMIC_MAGIC = MAGIC + "-dynamic".encode("ascii") + + +def dumps(extractor: FeatureExtractor) -> str: + """serialize the given extractor to a string.""" + if isinstance(extractor, StaticFeatureExtractor): + doc = dumps_static(extractor) + elif isinstance(extractor, DynamicFeatureExtractor): + doc = dumps_dynamic(extractor) + else: + raise ValueError("Invalid feature extractor") + + return doc def dump(extractor: FeatureExtractor) -> bytes: """serialize the given extractor to a byte array.""" - if isinstance(extractor, StaticFeatureExtractor): - return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8")) - elif isinstance(extractor, DynamicFeatureExtractor): - return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8")) - else: - raise ValueError("Invalid feature extractor") + return MAGIC + zlib.compress(dumps(extractor).encode("utf-8")) def is_freeze(buf: bytes) -> bool: return buf[: len(MAGIC)] == MAGIC -def is_static_freeze(buf: bytes) -> bool: - return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC +def loads(s: str): + doc = json.loads(s) + if doc["version"] != CURRENT_VERSION: + raise ValueError(f"unsupported freeze format version: {doc['version']}") -def is_dynamic_freeze(buf: bytes) -> bool: - return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC + if doc["flavor"] == "static": + return loads_static(s) + elif doc["flavor"] == "dynamic": + return loads_dynamic(s) + else: + raise ValueError(f"unsupported freeze format flavor: {doc['flavor']}") def load(buf: bytes): """deserialize a set of features (as a NullFeatureExtractor) from a byte array.""" if not is_freeze(buf): raise ValueError("missing magic header") - if is_static_freeze(buf): - return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8")) - elif is_dynamic_freeze(buf): - return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8")) - else: - raise ValueError("invalid magic header") + + s = zlib.decompress(buf[len(MAGIC) :]).decode("utf-8") + + return loads(s) def main(argv=None):