class ProcessAddress(Address):
    """addresses a process in a dynamic execution trace

    a process is identified by the pair (ppid, pid).
    `ppid` defaults to 0, in which case `__repr__` omits it.

    equality, ordering, and hashing all use the same (ppid, pid) key, so that
    objects that compare equal always hash equal (required for use as dict keys
    and set members).
    """

    def __init__(self, pid: int, ppid: int = 0):
        assert ppid >= 0
        assert pid > 0
        self.ppid = ppid
        self.pid = pid

    def __repr__(self):
        parent = f"ppid: {self.ppid}, " if self.ppid > 0 else ""
        return f"process({parent}pid: {self.pid})"

    def _key(self):
        # single source of truth for __hash__/__eq__/__lt__
        return (self.ppid, self.pid)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # let Python fall back to the reflected comparison (and ultimately `False`)
        # rather than crashing when compared against another kind of address.
        if not isinstance(other, ProcessAddress):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other):
        if not isinstance(other, ProcessAddress):
            return NotImplemented
        return self._key() < other._key()


class ThreadAddress(Address):
    """addresses a thread in a dynamic execution trace

    the ppid and pid of the owning process are copied onto the thread address,
    so a thread is identified by the triple (ppid, pid, tid).
    """

    def __init__(self, process: ProcessAddress, tid: int):
        assert tid >= 0
        self.ppid = process.ppid
        self.pid = process.pid
        self.tid = tid

    def __repr__(self):
        return f"thread(tid: {self.tid})"

    def _key(self):
        # single source of truth for __hash__/__eq__/__lt__
        return (self.ppid, self.pid, self.tid)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        if not isinstance(other, ThreadAddress):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other):
        if not isinstance(other, ThreadAddress):
            return NotImplemented
        return self._key() < other._key()
def rec(process):
    """
    yield a ProcessHandle for `process`, then for each of its descendants (depth-first).

    args:
      process: a dict with "pid", "parent_id", "name", and "children" keys,
        where "children" is an iterable of dicts with the same layout.
    """
    address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"])
    inner: Dict[str, str] = {"name": process["name"]}
    # no printing here: this generator runs inside library code and must not write to stdout.
    yield ProcessHandle(address=address, inner=inner)
    for child in process["children"]:
        yield from rec(child)
find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str """ for process in processes: - if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: + if ph.address.ppid == process["parent_id"] and ph.address.pid == process["process_id"]: return process return {} diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py index 293401f6..cd29039e 100644 --- a/capa/features/extractors/cape/process.py +++ b/capa/features/extractors/cape/process.py @@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process from capa.features.common import String, Feature -from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress +from capa.features.address import NO_ADDRESS, Address, ThreadAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) @@ -28,7 +28,8 @@ def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]: threads: List = process["threads"] for thread in threads: - yield ThreadHandle(int(thread), inner={}) + address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread)) + yield ThreadHandle(address=address, inner={}) def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index 43820df5..003f2acf 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -35,7 +35,7 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) calls: List[Dict[str, Any]] = process["calls"] - tid = str(th.tid) + tid = str(th.address.tid) for call in calls: if call["thread_id"] != tid: continue diff --git 
@dataclass
class ThreadFeatures:
    """the features of a single thread, as (address, feature) pairs."""

    features: List[Tuple[Address, Feature]]


@dataclass
class ProcessFeatures:
    """the features of a single process, plus the features of its threads keyed by thread address."""

    features: List[Tuple[Address, Feature]]
    threads: Dict[Address, ThreadFeatures]


@dataclass
class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
    """
    An extractor that yields user-provided features for a dynamic trace.

    Features are stored as (address, feature) pairs and yielded as (feature, address).
    Processes and threads are enumerated in sorted address order,
    so their address keys must be mutually orderable.
    """

    base_address: Address
    global_features: List[Feature]
    file_features: List[Tuple[Address, Feature]]
    processes: Dict[Address, ProcessFeatures]

    def extract_global_features(self):
        for feature in self.global_features:
            yield feature, NO_ADDRESS

    def extract_file_features(self):
        for address, feature in self.file_features:
            yield feature, address

    def get_processes(self):
        # a ProcessHandle carries only `address` and `inner`; the pid is available via the address.
        for address in sorted(self.processes):
            yield ProcessHandle(address=address, inner={})

    def extract_process_features(self, p):
        for address, feature in self.processes[p.address].features:
            yield feature, address

    def get_threads(self, p):
        # `self.processes` is keyed by address, so index with the handle's address, not the handle.
        for address in sorted(self.processes[p.address].threads):
            yield ThreadHandle(address=address, inner={})

    def extract_thread_features(self, p, t):
        for address, feature in self.processes[p.address].threads[t.address].features:
            yield feature, address


NullFeatureExtractor: TypeAlias = Union[NullStaticFeatureExtractor, NullDynamicFeatureExtractor]
class ProcessFeature(HashableModel):
    """
    args:
        process: the address of the process to which this feature belongs.
        address: the address at which this feature is found.

    process != address because, e.g., the feature may be found *within* the scope (process),
    versus right at its starting address.
    """

    process: Address
    address: Address
    feature: Feature


class ThreadFeature(HashableModel):
    """
    args:
        thread: the address of the thread to which this feature belongs.
        address: the address at which this feature is found.

    thread != address because, e.g., the feature may be found *within* the scope (thread),
    versus right at its starting address.
    """

    thread: Address
    address: Address
    feature: Feature


class ThreadFeatures(BaseModel):
    """the features found within a single thread."""

    address: Address
    features: Tuple[ThreadFeature, ...]


class ProcessFeatures(BaseModel):
    """the features found within a single process, plus those of its threads."""

    address: Address
    features: Tuple[ProcessFeature, ...]
    threads: Tuple[ThreadFeatures, ...]


class DynamicFeatures(BaseModel):
    """all features of a dynamic trace: global, file, and per-process."""

    global_: Tuple[GlobalFeature, ...] = Field(alias="global")
    file: Tuple[FileFeature, ...]
    processes: Tuple[ProcessFeatures, ...]

    class Config:
        allow_population_by_field_name = True


Features: TypeAlias = Union[StaticFeatures, DynamicFeatures]


def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
    """
    serialize the given dynamic extractor to a JSON string.

    walks the extractor's global, file, process, and thread features
    and stores them in a version 2 `Freeze` document.

    args:
      extractor: the dynamic feature extractor to serialize.

    returns:
      the JSON encoding of the freeze document.
    """
    global_features = tuple(
        GlobalFeature(feature=feature_from_capa(feature)) for feature, _ in extractor.extract_global_features()
    )

    file_features = tuple(
        FileFeature(
            feature=feature_from_capa(feature),
            address=Address.from_capa(address),
        )
        for feature, address in extractor.extract_file_features()
    )

    process_features: List[ProcessFeatures] = []
    for p in extractor.get_processes():
        paddr = Address.from_capa(p.address)
        pfeatures = tuple(
            ProcessFeature(
                process=paddr,
                address=Address.from_capa(addr),
                feature=feature_from_capa(feature),
            )
            for feature, addr in extractor.extract_process_features(p)
        )

        threads: List[ThreadFeatures] = []
        for t in extractor.get_threads(p):
            taddr = Address.from_capa(t.address)
            tfeatures = tuple(
                ThreadFeature(
                    # the scope field of ThreadFeature is `thread`.
                    thread=taddr,
                    address=Address.from_capa(addr),
                    feature=feature_from_capa(feature),
                )
                for feature, addr in extractor.extract_thread_features(p, t)
            )
            threads.append(ThreadFeatures(address=taddr, features=tfeatures))

        process_features.append(
            ProcessFeatures(
                address=paddr,
                features=pfeatures,
                threads=tuple(threads),
            )
        )

    features = DynamicFeatures(
        global_=global_features,
        file=file_features,
        processes=tuple(process_features),
    )  # type: ignore
    # Mypy is unable to recognise `global_` as a argument due to alias

    # not every extractor exposes a base address.
    # when it is missing, record an explicit "no address" rather than a bare int,
    # since `base_address` must be an Address model.
    get_base_address = getattr(extractor, "get_base_address", None)
    if callable(get_base_address):
        base_address = Address.from_capa(get_base_address())
    else:
        base_address = Address(type=AddressType.NO_ADDRESS, value=None)

    freeze = Freeze(
        version=2,
        base_address=base_address,
        extractor=Extractor(name=extractor.__class__.__name__),
        features=features,
    )  # type: ignore
    # Mypy is unable to recognise `base_address` as a argument due to alias

    return freeze.json()
def loads_dynamic(s: str) -> "DynamicFeatureExtractor":
    """
    deserialize a set of dynamic features (as a NullDynamicFeatureExtractor) from a string.

    raises:
      ValueError: if the freeze format version is not 2.
    """
    freeze = Freeze.parse_raw(s)
    if freeze.version != 2:
        raise ValueError(f"unsupported freeze format version: {freeze.version}")

    def pairs(items):
        # the null extractor stores features as (address, feature) pairs.
        return [(item.address.to_capa(), item.feature.to_capa()) for item in items]

    # NOTE(review): the dict keys rely on Address.to_capa() returning an address object
    # for process and thread addresses - confirm those branches return a value.
    processes = {
        p.address.to_capa(): null.ProcessFeatures(
            features=pairs(p.features),
            threads={t.address.to_capa(): null.ThreadFeatures(features=pairs(t.features)) for t in p.threads},
        )
        for p in freeze.features.processes
    }

    return null.NullDynamicFeatureExtractor(
        base_address=freeze.base_address.to_capa(),
        global_features=[f.feature.to_capa() for f in freeze.features.global_],
        file_features=pairs(freeze.features.file),
        processes=processes,
    )


# every freeze buffer starts with MAGIC; the next byte selects the flavor of the payload.
MAGIC = b"capa000"
STATIC_MAGIC = MAGIC + b"0"
DYNAMIC_MAGIC = MAGIC + b"1"


def dump(extractor: "FeatureExtractor") -> bytes:
    """
    serialize the given extractor to a byte array.

    the result is the flavor-specific magic header followed by the zlib-compressed, UTF-8 encoded JSON document.

    raises:
      ValueError: if the extractor is neither a static nor a dynamic feature extractor.
    """
    if isinstance(extractor, StaticFeatureExtractor):
        return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8"))
    elif isinstance(extractor, DynamicFeatureExtractor):
        # dynamic extractors must go through the dynamic serializer.
        return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8"))
    else:
        raise ValueError("Invalid feature extractor")


def is_freeze(buf: bytes) -> bool:
    """return True if the buffer starts with the freeze magic, regardless of flavor."""
    return buf[: len(MAGIC)] == MAGIC


def is_static(buf: bytes) -> bool:
    """return True if the buffer starts with the static freeze magic."""
    return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC


def is_dynamic(buf: bytes) -> bool:
    """return True if the buffer starts with the dynamic freeze magic."""
    return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC


def load(buf: bytes) -> "null.NullFeatureExtractor":
    """
    deserialize a set of features (as a NullFeatureExtractor) from a byte array.

    raises:
      ValueError: if the buffer does not start with the freeze magic ("missing magic header"),
        or starts with it but has an unknown flavor byte ("invalid magic header").
    """
    if not is_freeze(buf):
        raise ValueError("missing magic header")

    if is_static(buf):
        return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8"))
    elif is_dynamic(buf):
        return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8"))
    else:
        raise ValueError("invalid magic header")
@@ -1263,7 +1264,6 @@ def main(argv=None): # freeze format deserializes directly into an extractor with open(args.sample, "rb") as f: extractor: FeatureExtractor = frz.load(f.read()) - assert isinstance(extractor, StaticFeatureExtractor) else: # all other formats we must create an extractor, # such as viv, binary ninja, etc. workspaces