Merge branch 'process-thread-addresses' of https://github.com/yelhamer/capa into yelhamer-process-thread-addresses

This commit is contained in:
Yacine Elhamer
2023-07-11 12:07:20 +01:00
14 changed files with 357 additions and 44 deletions

View File

@@ -10,10 +10,14 @@
- Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer - Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer
- Add a new process scope for the dynamic analysis flavor #1517 @yelhamer - Add a new process scope for the dynamic analysis flavor #1517 @yelhamer
- Add a new thread scope for the dynamic analysis flavor #1517 @yelhamer - Add a new thread scope for the dynamic analysis flavor #1517 @yelhamer
- use fancy box drawing characters for default output #1586 @williballenthin - use fancy box drawing characters for default output #1586 @williballenthin
- use [pre-commit](https://pre-commit.com/) to invoke linters #1579 @williballenthin - use [pre-commit](https://pre-commit.com/) to invoke linters #1579 @williballenthin
- publish via PyPI trusted publishing #1491 @williballenthin - publish via PyPI trusted publishing #1491 @williballenthin
- migrate to pyproject.toml #1301 @williballenthin - migrate to pyproject.toml #1301 @williballenthin
- Add ProcessAddress and ThreadAddress #1612 @yelhamer
### Breaking Changes ### Breaking Changes
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat

View File

@@ -36,6 +36,54 @@ class AbsoluteVirtualAddress(int, Address):
return int.__hash__(self) return int.__hash__(self)
class ProcessAddress(Address):
    """an address of a process in a dynamic execution trace

    a process is identified by the pair (ppid, pid); both values take part in
    hashing, equality and ordering.

    args:
      pid: the process id; must be greater than zero.
      ppid: the parent process id; must not be negative. defaults to 0,
        in which case it is left out of the textual representation.
    """

    def __init__(self, pid: int, ppid: int = 0):
        # assertions are kept (rather than raising ValueError) so that callers
        # observe the same failure mode as before.
        assert ppid >= 0
        assert pid > 0
        self.ppid = ppid
        self.pid = pid

    def __repr__(self):
        parent = f"ppid: {self.ppid}, " if self.ppid > 0 else ""
        return f"process({parent}pid: {self.pid})"

    def __hash__(self):
        return hash((self.ppid, self.pid))

    def __eq__(self, other):
        # return NotImplemented instead of asserting: equality is invoked implicitly
        # (dict/set lookups on hash collisions, `in` checks, comparisons with
        # other address kinds) and must not crash on a foreign operand.
        if not isinstance(other, ProcessAddress):
            return NotImplemented
        return (self.ppid, self.pid) == (other.ppid, other.pid)

    def __lt__(self, other):
        # ordering against a foreign type is undefined; NotImplemented lets python
        # try the reflected operation and raise a clear TypeError otherwise.
        if not isinstance(other, ProcessAddress):
            return NotImplemented
        return (self.ppid, self.pid) < (other.ppid, other.pid)
class ThreadAddress(Address):
    """addresses a thread in a dynamic execution trace

    a thread is identified by the address of its owning process plus its
    thread id; both take part in hashing, equality and ordering.

    args:
      process: the address of the process that owns the thread.
      tid: the thread id; must not be negative.
    """

    def __init__(self, process: ProcessAddress, tid: int):
        # assertion is kept (rather than raising ValueError) so that callers
        # observe the same failure mode as before.
        assert tid >= 0
        self.process = process
        self.tid = tid

    def __repr__(self):
        # NOTE(review): only the tid is rendered; threads of different processes
        # with the same tid print identically. kept as-is because the text is
        # user-visible output.
        return f"thread(tid: {self.tid})"

    def __hash__(self):
        return hash((self.process, self.tid))

    def __eq__(self, other):
        # return NotImplemented instead of asserting: equality is invoked implicitly
        # (dict/set lookups on hash collisions, `in` checks, comparisons with
        # other address kinds) and must not crash on a foreign operand.
        if not isinstance(other, ThreadAddress):
            return NotImplemented
        return (self.process, self.tid) == (other.process, other.tid)

    def __lt__(self, other):
        # ordering against a foreign type is undefined; NotImplemented lets python
        # try the reflected operation and raise a clear TypeError otherwise.
        if not isinstance(other, ThreadAddress):
            return NotImplemented
        return (self.process, self.tid) < (other.process, other.tid)
class DynamicAddress(Address): class DynamicAddress(Address):
"""an address from a dynamic analysis trace""" """an address from a dynamic analysis trace"""

View File

@@ -15,7 +15,7 @@ from typing_extensions import TypeAlias
import capa.features.address import capa.features.address
from capa.features.common import Feature from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress
# feature extractors may reference functions, BBs, insns by opaque handle values. # feature extractors may reference functions, BBs, insns by opaque handle values.
# you can use the `.address` property to get and render the address of the feature. # you can use the `.address` property to get and render the address of the feature.
@@ -278,7 +278,7 @@ class ProcessHandle:
inner: sandbox-specific data inner: sandbox-specific data
""" """
pid: int address: ProcessAddress
inner: Any inner: Any
@@ -292,7 +292,7 @@ class ThreadHandle:
inner: sandbox-specific data inner: sandbox-specific data
""" """
tid: int address: ThreadAddress
inner: Any inner: Any

View File

@@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_ import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process import capa.features.extractors.cape.process
from capa.features.common import Feature from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -30,7 +30,7 @@ class CapeExtractor(DynamicFeatureExtractor):
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
def get_base_address(self) -> Address: def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# value according to the PE header, the actual trace may use a different imagebase # value according to the PE header, the actual trace may use a different imagebase
return AbsoluteVirtualAddress(self.static["pe"]["imagebase"]) return AbsoluteVirtualAddress(self.static["pe"]["imagebase"])

View File

@@ -11,7 +11,7 @@ from typing import Dict, Tuple, Iterator
from capa.features.file import Export, Import, Section from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle from capa.features.extractors.base_extractor import ProcessHandle
@@ -24,8 +24,9 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]:
""" """
def rec(process): def rec(process):
inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]} address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"])
yield ProcessHandle(pid=process["pid"], inner=inner) inner: Dict[str, str] = {"name": process["name"]}
yield ProcessHandle(address=address, inner=inner)
for child in process["children"]: for child in process["children"]:
yield from rec(child) yield from rec(child)

View File

@@ -23,6 +23,6 @@ def find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str
""" """
for process in processes: for process in processes:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: if ph.address.ppid == process["parent_id"] and ph.address.pid == process["process_id"]:
return process return process
return {} return {}

View File

@@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_ import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process import capa.features.extractors.cape.process
from capa.features.common import String, Feature from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address from capa.features.address import NO_ADDRESS, Address, ThreadAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -28,7 +28,8 @@ def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
threads: List = process["threads"] threads: List = process["threads"]
for thread in threads: for thread in threads:
yield ThreadHandle(int(thread), inner={}) address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread))
yield ThreadHandle(address=address, inner={})
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
@@ -42,10 +43,9 @@ def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple
if not environ: if not environ:
return return
for value in environ.values(): for variable, value in environ.items():
if not value: if value:
continue yield String(value), ph.address
yield String(value), NO_ADDRESS
def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -35,7 +35,7 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
calls: List[Dict[str, Any]] = process["calls"] calls: List[Dict[str, Any]] = process["calls"]
tid = str(th.tid) tid = str(th.address.tid)
for call in calls: for call in calls:
if call["thread_id"] != tid: if call["thread_id"] != tid:
continue continue

View File

@@ -1,9 +1,17 @@
from typing import Dict, List, Tuple from typing import Dict, List, Tuple, Union, TypeAlias
from dataclasses import dataclass from dataclasses import dataclass
from capa.features.common import Feature from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
ThreadHandle,
ProcessHandle,
FunctionHandle,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)
@dataclass @dataclass
@@ -24,7 +32,7 @@ class FunctionFeatures:
@dataclass @dataclass
class NullFeatureExtractor(StaticFeatureExtractor): class NullStaticFeatureExtractor(StaticFeatureExtractor):
""" """
An extractor that extracts some user-provided features. An extractor that extracts some user-provided features.
@@ -70,3 +78,51 @@ class NullFeatureExtractor(StaticFeatureExtractor):
def extract_insn_features(self, f, bb, insn): def extract_insn_features(self, f, bb, insn):
for address, feature in self.functions[f.address].basic_blocks[bb.address].instructions[insn.address].features: for address, feature in self.functions[f.address].basic_blocks[bb.address].instructions[insn.address].features:
yield feature, address yield feature, address
@dataclass
class ThreadFeatures:
    """the user-provided features that belong to a single thread."""

    # (address, feature) pairs, in the order they should be yielded.
    features: List[Tuple[Address, Feature]]
@dataclass
class ProcessFeatures:
    """the user-provided features that belong to a single process, plus its threads."""

    # (address, feature) pairs found at process scope.
    features: List[Tuple[Address, Feature]]
    # per-thread features, keyed by the address of the thread.
    threads: Dict[Address, ThreadFeatures]
@dataclass
class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
    """
    An extractor that yields user-provided features for a dynamic trace.

    It performs no analysis of its own: every method simply replays the
    features stored in the fields below.

    args:
      base_address: the base address to report for the sample.
      global_features: features yielded at global scope (with NO_ADDRESS).
      file_features: (address, feature) pairs yielded at file scope.
      processes: per-process features, keyed by the address of the process.
    """

    base_address: Address
    global_features: List[Feature]
    file_features: List[Tuple[Address, Feature]]
    processes: Dict[Address, ProcessFeatures]

    def extract_global_features(self):
        for feature in self.global_features:
            yield feature, NO_ADDRESS

    def extract_file_features(self):
        for address, feature in self.file_features:
            yield feature, address

    def get_processes(self):
        # sorted so that handles are produced in a stable order.
        for address in sorted(self.processes):
            assert isinstance(address, ProcessAddress)
            yield ProcessHandle(address=address, inner={})

    def extract_process_features(self, p):
        for addr, feature in self.processes[p.address].features:
            yield feature, addr

    def get_threads(self, p):
        # `processes` is keyed by the process *address*, not by the handle itself;
        # look it up the same way the other methods do.
        for address in sorted(self.processes[p.address].threads):
            assert isinstance(address, ThreadAddress)
            yield ThreadHandle(address=address, inner={})

    def extract_thread_features(self, p, t):
        for addr, feature in self.processes[p.address].threads[t.address].features:
            yield feature, addr
NullFeatureExtractor: TypeAlias = Union[NullStaticFeatureExtractor, NullDynamicFeatureExtractor]

View File

@@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations
import zlib import zlib
import logging import logging
from enum import Enum from enum import Enum
from typing import List, Tuple, Union from typing import Any, List, Tuple, Union, TypeAlias
from pydantic import Field, BaseModel from pydantic import Field, BaseModel
@@ -23,9 +23,10 @@ import capa.features.insn
import capa.features.common import capa.features.common
import capa.features.address import capa.features.address
import capa.features.basicblock import capa.features.basicblock
import capa.features.extractors.null as null
from capa.helpers import assert_never from capa.helpers import assert_never
from capa.features.freeze.features import Feature, feature_from_capa from capa.features.freeze.features import Feature, feature_from_capa
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -41,13 +42,15 @@ class AddressType(str, Enum):
FILE = "file" FILE = "file"
DN_TOKEN = "dn token" DN_TOKEN = "dn token"
DN_TOKEN_OFFSET = "dn token offset" DN_TOKEN_OFFSET = "dn token offset"
PROCESS = "process"
THREAD = "thread"
DYNAMIC = "dynamic" DYNAMIC = "dynamic"
NO_ADDRESS = "no address" NO_ADDRESS = "no address"
class Address(HashableModel): class Address(HashableModel):
type: AddressType type: AddressType
value: Union[int, Tuple[int, int], None] value: Union[int, Tuple[int, ...], None]
@classmethod @classmethod
def from_capa(cls, a: capa.features.address.Address) -> "Address": def from_capa(cls, a: capa.features.address.Address) -> "Address":
@@ -66,6 +69,12 @@ class Address(HashableModel):
elif isinstance(a, capa.features.address.DNTokenOffsetAddress): elif isinstance(a, capa.features.address.DNTokenOffsetAddress):
return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset)) return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset))
elif isinstance(a, capa.features.address.ProcessAddress):
return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid))
elif isinstance(a, capa.features.address.ThreadAddress):
return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid))
elif isinstance(a, capa.features.address.DynamicAddress): elif isinstance(a, capa.features.address.DynamicAddress):
return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address)) return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address))
@@ -105,6 +114,22 @@ class Address(HashableModel):
assert isinstance(offset, int) assert isinstance(offset, int)
return capa.features.address.DNTokenOffsetAddress(token, offset) return capa.features.address.DNTokenOffsetAddress(token, offset)
elif self.type is AddressType.PROCESS:
assert isinstance(self.value, tuple)
ppid, pid = self.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
return capa.features.address.ProcessAddress(ppid=ppid, pid=pid)
elif self.type is AddressType.THREAD:
assert isinstance(self.value, tuple)
ppid, pid, tid = self.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
proc_addr = capa.features.address.ProcessAddress(ppid=ppid, pid=pid)
return capa.features.address.ThreadAddress(proc_addr, tid=tid)
elif self.type is AddressType.NO_ADDRESS: elif self.type is AddressType.NO_ADDRESS:
return capa.features.address.NO_ADDRESS return capa.features.address.NO_ADDRESS
@@ -135,6 +160,34 @@ class FileFeature(HashableModel):
feature: Feature feature: Feature
class ProcessFeature(HashableModel):
    """
    a single feature recorded at process scope.

    args:
        process: the address of the process that owns this feature.
        address: the address at which the feature itself was found.
            this may differ from `process`, e.g., when the feature
            is located somewhere *within* the process scope.
        feature: the feature that was found.
    """

    process: Address
    address: Address
    feature: Feature
class ThreadFeature(HashableModel):
    """
    a single feature recorded at thread scope.

    args:
        thread: the address of the thread that owns this feature.
        address: the address at which the feature itself was found.
            this may differ from `thread`, e.g., when the feature
            is located somewhere *within* the thread scope.
        feature: the feature that was found.
    """

    thread: Address
    address: Address
    feature: Feature
class FunctionFeature(HashableModel): class FunctionFeature(HashableModel):
""" """
args: args:
@@ -203,7 +256,18 @@ class FunctionFeatures(BaseModel):
allow_population_by_field_name = True allow_population_by_field_name = True
class Features(BaseModel): class ThreadFeatures(BaseModel):
address: Address
features: Tuple[ThreadFeature, ...]
class ProcessFeatures(BaseModel):
    """all serialized features of one process, including those of its threads."""

    # the address of the process itself.
    address: Address
    # features found at process scope.
    features: Tuple[ProcessFeature, ...]
    # one entry per thread of this process.
    threads: Tuple[ThreadFeatures, ...]
class StaticFeatures(BaseModel):
global_: Tuple[GlobalFeature, ...] = Field(alias="global") global_: Tuple[GlobalFeature, ...] = Field(alias="global")
file: Tuple[FileFeature, ...] file: Tuple[FileFeature, ...]
functions: Tuple[FunctionFeatures, ...] functions: Tuple[FunctionFeatures, ...]
@@ -212,6 +276,18 @@ class Features(BaseModel):
allow_population_by_field_name = True allow_population_by_field_name = True
class DynamicFeatures(BaseModel):
    """the serialized features of a dynamic extractor: global, file, and per-process."""

    # stored under the key "global"; the trailing underscore only exists
    # on the python attribute.
    global_: Tuple[GlobalFeature, ...] = Field(alias="global")
    file: Tuple[FileFeature, ...]
    processes: Tuple[ProcessFeatures, ...]

    class Config:
        # allow construction via `global_=...` in addition to the alias.
        allow_population_by_field_name = True
Features: TypeAlias = Union[StaticFeatures, DynamicFeatures]
class Extractor(BaseModel): class Extractor(BaseModel):
name: str name: str
version: str = capa.version.__version__ version: str = capa.version.__version__
@@ -230,11 +306,11 @@ class Freeze(BaseModel):
allow_population_by_field_name = True allow_population_by_field_name = True
def dumps(extractor: StaticFeatureExtractor) -> str: def dumps_static(extractor: StaticFeatureExtractor) -> str:
""" """
serialize the given extractor to a string serialize the given extractor to a string
""" """
assert isinstance(extractor, StaticFeatureExtractor)
global_features: List[GlobalFeature] = [] global_features: List[GlobalFeature] = []
for feature, _ in extractor.extract_global_features(): for feature, _ in extractor.extract_global_features():
global_features.append( global_features.append(
@@ -313,7 +389,7 @@ def dumps(extractor: StaticFeatureExtractor) -> str:
# Mypy is unable to recognise `basic_blocks` as a argument due to alias # Mypy is unable to recognise `basic_blocks` as a argument due to alias
) )
features = Features( features = StaticFeatures(
global_=global_features, global_=global_features,
file=tuple(file_features), file=tuple(file_features),
functions=tuple(function_features), functions=tuple(function_features),
@@ -331,15 +407,98 @@ def dumps(extractor: StaticFeatureExtractor) -> str:
return freeze.json() return freeze.json()
def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
    """
    serialize the given dynamic extractor to a string

    walks the extractor's global, file, process and thread scopes and
    collects every feature into a `Freeze` document (format version 2).

    args:
      extractor: the dynamic feature extractor to serialize.

    returns: the JSON text of the freeze document.
    """
    global_features: List[GlobalFeature] = [
        GlobalFeature(feature=feature_from_capa(feature)) for feature, _ in extractor.extract_global_features()
    ]

    file_features: List[FileFeature] = [
        FileFeature(
            feature=feature_from_capa(feature),
            address=Address.from_capa(address),
        )
        for feature, address in extractor.extract_file_features()
    ]

    process_features: List[ProcessFeatures] = []
    for p in extractor.get_processes():
        paddr = Address.from_capa(p.address)
        pfeatures = [
            ProcessFeature(
                process=paddr,
                address=Address.from_capa(addr),
                feature=feature_from_capa(feature),
            )
            for feature, addr in extractor.extract_process_features(p)
        ]

        threads: List[ThreadFeatures] = []
        for t in extractor.get_threads(p):
            taddr = Address.from_capa(t.address)
            tfeatures = [
                # the owning scope of a ThreadFeature is its `thread` field.
                ThreadFeature(
                    thread=taddr,
                    address=Address.from_capa(addr),
                    feature=feature_from_capa(feature),
                )
                for feature, addr in extractor.extract_thread_features(p, t)
            ]

            threads.append(
                ThreadFeatures(
                    address=taddr,
                    features=tuple(tfeatures),
                )
            )

        process_features.append(
            ProcessFeatures(
                address=paddr,
                features=tuple(pfeatures),
                threads=tuple(threads),
            )
        )

    features = DynamicFeatures(
        global_=global_features,
        file=tuple(file_features),
        processes=tuple(process_features),
    )  # type: ignore
    # Mypy is unable to recognise `global_` as a argument due to alias

    # workaround around mypy issue: https://github.com/python/mypy/issues/1424
    # not every dynamic extractor provides a base address, so look the method up
    # dynamically and fall back to NO_ADDRESS when it is missing.
    get_base_address = getattr(extractor, "get_base_address", None)
    base_addr = get_base_address() if get_base_address else capa.features.address.NO_ADDRESS

    freeze = Freeze(
        version=2,
        base_address=Address.from_capa(base_addr),
        extractor=Extractor(name=extractor.__class__.__name__),
        features=features,
    )  # type: ignore
    # Mypy is unable to recognise `base_address` as a argument due to alias

    return freeze.json()
def loads_static(s: str) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
freeze = Freeze.parse_raw(s) freeze = Freeze.parse_raw(s)
if freeze.version != 2: if freeze.version != 2:
raise ValueError(f"unsupported freeze format version: {freeze.version}") raise ValueError(f"unsupported freeze format version: {freeze.version}")
return null.NullFeatureExtractor( assert isinstance(freeze.features, StaticFeatures)
return null.NullStaticFeatureExtractor(
base_address=freeze.base_address.to_capa(), base_address=freeze.base_address.to_capa(),
global_features=[f.feature.to_capa() for f in freeze.features.global_], global_features=[f.feature.to_capa() for f in freeze.features.global_],
file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file],
@@ -364,24 +523,69 @@ def loads(s: str) -> StaticFeatureExtractor:
) )
def loads_dynamic(s: str) -> DynamicFeatureExtractor:
    """
    deserialize a set of dynamic features (as a NullDynamicFeatureExtractor) from a string.

    raises:
      ValueError: when the document's freeze format version is not 2.
    """
    frozen = Freeze.parse_raw(s)
    if frozen.version != 2:
        raise ValueError(f"unsupported freeze format version: {frozen.version}")

    assert isinstance(frozen.features, DynamicFeatures)

    base_address = frozen.base_address.to_capa()
    global_features = [entry.feature.to_capa() for entry in frozen.features.global_]
    file_features = [(entry.address.to_capa(), entry.feature.to_capa()) for entry in frozen.features.file]

    processes = {}
    for proc in frozen.features.processes:
        proc_key = proc.address.to_capa()
        proc_feats = [(entry.address.to_capa(), entry.feature.to_capa()) for entry in proc.features]

        threads = {}
        for thread in proc.threads:
            thread_key = thread.address.to_capa()
            thread_feats = [(entry.address.to_capa(), entry.feature.to_capa()) for entry in thread.features]
            threads[thread_key] = null.ThreadFeatures(features=thread_feats)

        processes[proc_key] = null.ProcessFeatures(features=proc_feats, threads=threads)

    return null.NullDynamicFeatureExtractor(
        base_address=base_address,
        global_features=global_features,
        file_features=file_features,
        processes=processes,
    )
MAGIC = "capa0000".encode("ascii") MAGIC = "capa0000".encode("ascii")
STATIC_MAGIC = MAGIC + "-static".encode("ascii")
DYNAMIC_MAGIC = MAGIC + "-dynamic".encode("ascii")
def dump(extractor: FeatureExtractor) -> bytes: def dump(extractor: FeatureExtractor) -> bytes:
"""serialize the given extractor to a byte array.""" """serialize the given extractor to a byte array."""
assert isinstance(extractor, StaticFeatureExtractor) if isinstance(extractor, StaticFeatureExtractor):
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8")) return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8"))
elif isinstance(extractor, DynamicFeatureExtractor):
return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8"))
else:
raise ValueError("Invalid feature extractor")
def is_freeze(buf: bytes) -> bool: def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC return buf[: len(MAGIC)] == MAGIC
def load(buf: bytes) -> StaticFeatureExtractor: def is_static(buf: bytes) -> bool:
return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC
def is_dynamic(buf: bytes) -> bool:
    """report whether the buffer begins with the magic header of a dynamic freeze."""
    header = buf[: len(DYNAMIC_MAGIC)]
    return header == DYNAMIC_MAGIC
def load(buf: bytes):
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array.""" """deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf): if not is_freeze(buf):
raise ValueError("missing magic header") raise ValueError("missing magic header")
return loads(zlib.decompress(buf[len(MAGIC) :]).decode("utf-8")) if is_static(buf):
return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8"))
elif is_dynamic(buf):
return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8"))
else:
raise ValueError("invalid magic header")
def main(argv=None): def main(argv=None):

View File

@@ -799,6 +799,7 @@ def collect_metadata(
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
arch = get_arch(sample_path) arch = get_arch(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_ os_ = get_os(sample_path) if os_ == OS_AUTO else os_
base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else NO_ADDRESS
return rdoc.Metadata( return rdoc.Metadata(
timestamp=datetime.datetime.now(), timestamp=datetime.datetime.now(),
@@ -816,7 +817,7 @@ def collect_metadata(
os=os_, os=os_,
extractor=extractor.__class__.__name__, extractor=extractor.__class__.__name__,
rules=tuple(rules_path), rules=tuple(rules_path),
base_address=frz.Address.from_capa(extractor.get_base_address()), base_address=frz.Address.from_capa(base_addr),
layout=rdoc.Layout( layout=rdoc.Layout(
functions=(), functions=(),
# this is updated after capabilities have been collected. # this is updated after capabilities have been collected.
@@ -1270,7 +1271,6 @@ def main(argv=None):
# freeze format deserializes directly into an extractor # freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f: with open(args.sample, "rb") as f:
extractor: FeatureExtractor = frz.load(f.read()) extractor: FeatureExtractor = frz.load(f.read())
assert isinstance(extractor, StaticFeatureExtractor)
else: else:
# all other formats we must create an extractor, # all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces # such as viv, binary ninja, etc. workspaces

View File

@@ -250,7 +250,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
if is_global_feature(feature): if is_global_feature(feature):
continue continue
print(f" thread: {t.tid} {format_address(addr)}: {feature}") print(f" {t.address} {format_address(addr)}: {feature}")
def ida_main(): def ida_main():

View File

@@ -420,14 +420,14 @@ def sample(request):
def get_process(extractor, ppid: int, pid: int) -> ProcessHandle: def get_process(extractor, ppid: int, pid: int) -> ProcessHandle:
for ph in extractor.get_processes(): for ph in extractor.get_processes():
if ph.inner["ppid"] == ppid and ph.pid == pid: if ph.address.ppid == ppid and ph.address.pid == pid:
return ProcessHandle(pid, {"ppid": ppid}) return ph
raise ValueError("process not found") raise ValueError("process not found")
def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle: def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle:
for th in extractor.get_threads(ph): for th in extractor.get_threads(ph):
if th.tid == tid: if th.address.tid == tid:
return th return th
raise ValueError("thread not found") raise ValueError("thread not found")

View File

@@ -24,7 +24,7 @@ import capa.features.extractors.base_extractor
from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
EXTRACTOR = capa.features.extractors.null.NullFeatureExtractor( EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor(
base_address=AbsoluteVirtualAddress(0x401000), base_address=AbsoluteVirtualAddress(0x401000),
global_features=[], global_features=[],
file_features=[ file_features=[
@@ -119,8 +119,8 @@ def compare_extractors(a, b):
def test_freeze_str_roundtrip(): def test_freeze_str_roundtrip():
load = capa.features.freeze.loads load = capa.features.freeze.loads_static
dump = capa.features.freeze.dumps dump = capa.features.freeze.dumps_static
reanimated = load(dump(EXTRACTOR)) reanimated = load(dump(EXTRACTOR))
compare_extractors(EXTRACTOR, reanimated) compare_extractors(EXTRACTOR, reanimated)