mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 19:12:01 -08:00
freeze: update freeze format v3
This commit is contained in:
@@ -9,10 +9,11 @@ Unless required by applicable law or agreed to in writing, software distributed
|
||||
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and limitations under the License.
|
||||
"""
|
||||
import json
|
||||
import zlib
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import List, Tuple, Union
|
||||
from typing import List, Tuple, Union, Literal
|
||||
|
||||
from pydantic import Field, BaseModel, ConfigDict
|
||||
|
||||
@@ -39,6 +40,8 @@ from capa.features.extractors.base_extractor import (
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CURRENT_VERSION = 3
|
||||
|
||||
|
||||
class HashableModel(BaseModel):
|
||||
model_config = ConfigDict(frozen=True)
|
||||
@@ -325,9 +328,10 @@ class Extractor(BaseModel):
|
||||
|
||||
|
||||
class Freeze(BaseModel):
|
||||
version: int = 2
|
||||
version: int = CURRENT_VERSION
|
||||
base_address: Address = Field(alias="base address")
|
||||
sample_hashes: SampleHashes
|
||||
flavor: Literal["static", "dynamic"]
|
||||
extractor: Extractor
|
||||
features: Features
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
@@ -423,9 +427,10 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
|
||||
# Mypy is unable to recognise `global_` as a argument due to alias
|
||||
|
||||
freeze = Freeze(
|
||||
version=3,
|
||||
version=CURRENT_VERSION,
|
||||
base_address=Address.from_capa(extractor.get_base_address()),
|
||||
sample_hashes=extractor.get_sample_hashes(),
|
||||
flavor="static",
|
||||
extractor=Extractor(name=extractor.__class__.__name__),
|
||||
features=features,
|
||||
) # type: ignore
|
||||
@@ -527,24 +532,27 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
|
||||
base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS
|
||||
|
||||
freeze = Freeze(
|
||||
version=3,
|
||||
version=CURRENT_VERSION,
|
||||
base_address=Address.from_capa(base_addr),
|
||||
sample_hashes=extractor.get_sample_hashes(),
|
||||
flavor="dynamic",
|
||||
extractor=Extractor(name=extractor.__class__.__name__),
|
||||
features=features,
|
||||
) # type: ignore
|
||||
# Mypy is unable to recognise `base_address` as a argument due to alias
|
||||
|
||||
return freeze.json()
|
||||
return freeze.model_dump_json()
|
||||
|
||||
|
||||
def loads_static(s: str) -> StaticFeatureExtractor:
|
||||
"""deserialize a set of features (as a NullStaticFeatureExtractor) from a string."""
|
||||
freeze = Freeze.model_validate_json(s)
|
||||
if freeze.version != 3:
|
||||
if freeze.version != CURRENT_VERSION:
|
||||
raise ValueError(f"unsupported freeze format version: {freeze.version}")
|
||||
|
||||
assert freeze.flavor == "static"
|
||||
assert isinstance(freeze.features, StaticFeatures)
|
||||
|
||||
return null.NullStaticFeatureExtractor(
|
||||
base_address=freeze.base_address.to_capa(),
|
||||
sample_hashes=freeze.sample_hashes,
|
||||
@@ -573,11 +581,13 @@ def loads_static(s: str) -> StaticFeatureExtractor:
|
||||
|
||||
def loads_dynamic(s: str) -> DynamicFeatureExtractor:
|
||||
"""deserialize a set of features (as a NullDynamicFeatureExtractor) from a string."""
|
||||
freeze = Freeze.parse_raw(s)
|
||||
if freeze.version != 3:
|
||||
freeze = Freeze.model_validate_json(s)
|
||||
if freeze.version != CURRENT_VERSION:
|
||||
raise ValueError(f"unsupported freeze format version: {freeze.version}")
|
||||
|
||||
assert freeze.flavor == "dynamic"
|
||||
assert isinstance(freeze.features, DynamicFeatures)
|
||||
|
||||
return null.NullDynamicFeatureExtractor(
|
||||
base_address=freeze.base_address.to_capa(),
|
||||
sample_hashes=freeze.sample_hashes,
|
||||
@@ -605,42 +615,51 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor:
|
||||
|
||||
|
||||
MAGIC = "capa0000".encode("ascii")
|
||||
STATIC_MAGIC = MAGIC + "-static".encode("ascii")
|
||||
DYNAMIC_MAGIC = MAGIC + "-dynamic".encode("ascii")
|
||||
|
||||
|
||||
def dumps(extractor: FeatureExtractor) -> str:
|
||||
"""serialize the given extractor to a string."""
|
||||
if isinstance(extractor, StaticFeatureExtractor):
|
||||
doc = dumps_static(extractor)
|
||||
elif isinstance(extractor, DynamicFeatureExtractor):
|
||||
doc = dumps_dynamic(extractor)
|
||||
else:
|
||||
raise ValueError("Invalid feature extractor")
|
||||
|
||||
return doc
|
||||
|
||||
|
||||
def dump(extractor: FeatureExtractor) -> bytes:
|
||||
"""serialize the given extractor to a byte array."""
|
||||
if isinstance(extractor, StaticFeatureExtractor):
|
||||
return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8"))
|
||||
elif isinstance(extractor, DynamicFeatureExtractor):
|
||||
return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8"))
|
||||
else:
|
||||
raise ValueError("Invalid feature extractor")
|
||||
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
|
||||
|
||||
|
||||
def is_freeze(buf: bytes) -> bool:
|
||||
return buf[: len(MAGIC)] == MAGIC
|
||||
|
||||
|
||||
def is_static_freeze(buf: bytes) -> bool:
|
||||
return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC
|
||||
def loads(s: str):
|
||||
doc = json.loads(s)
|
||||
|
||||
if doc["version"] != CURRENT_VERSION:
|
||||
raise ValueError(f"unsupported freeze format version: {doc['version']}")
|
||||
|
||||
def is_dynamic_freeze(buf: bytes) -> bool:
|
||||
return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC
|
||||
if doc["flavor"] == "static":
|
||||
return loads_static(s)
|
||||
elif doc["flavor"] == "dynamic":
|
||||
return loads_dynamic(s)
|
||||
else:
|
||||
raise ValueError(f"unsupported freeze format flavor: {doc['flavor']}")
|
||||
|
||||
|
||||
def load(buf: bytes):
|
||||
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
|
||||
if not is_freeze(buf):
|
||||
raise ValueError("missing magic header")
|
||||
if is_static_freeze(buf):
|
||||
return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8"))
|
||||
elif is_dynamic_freeze(buf):
|
||||
return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8"))
|
||||
else:
|
||||
raise ValueError("invalid magic header")
|
||||
|
||||
s = zlib.decompress(buf[len(MAGIC) :]).decode("utf-8")
|
||||
|
||||
return loads(s)
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
|
||||
Reference in New Issue
Block a user