diff --git a/CHANGELOG.md b/CHANGELOG.md index 931b408f..6d28f23d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,14 @@ - Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer - Add a new process scope for the dynamic analysis flavor #1517 @yelhamer - Add a new thread scope for the dynamic analysis flavor #1517 @yelhamer +<<<<<<< HEAD - use fancy box drawing characters for default output #1586 @williballenthin - use [pre-commit](https://pre-commit.com/) to invoke linters #1579 @williballenthin - publish via PyPI trusted publishing #1491 @williballenthin - migrate to pyproject.toml #1301 @williballenthin +======= +- Add ProcessesAddress and ThreadAddress #1612 @yelhamer +>>>>>>> 64a16314abef9647f1729a4b5c3e2c21c41e1f9f ### Breaking Changes - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat diff --git a/capa/features/address.py b/capa/features/address.py index e6bf88ff..d2706e99 100644 --- a/capa/features/address.py +++ b/capa/features/address.py @@ -36,6 +36,54 @@ class AbsoluteVirtualAddress(int, Address): return int.__hash__(self) +class ProcessAddress(Address): + """an address of a process in a dynamic execution trace""" + + def __init__(self, pid: int, ppid: int = 0): + assert ppid >= 0 + assert pid > 0 + self.ppid = ppid + self.pid = pid + + def __repr__(self): + return "process(%s%s)" % ( + f"ppid: {self.ppid}, " if self.ppid > 0 else "", + f"pid: {self.pid}", + ) + + def __hash__(self): + return hash((self.ppid, self.pid)) + + def __eq__(self, other): + assert isinstance(other, ProcessAddress) + return (self.ppid, self.pid) == (other.ppid, other.pid) + + def __lt__(self, other): + return (self.ppid, self.pid) < (other.ppid, other.pid) + + +class ThreadAddress(Address): + """addresses a thread in a dynamic execution trace""" + + def __init__(self, process: ProcessAddress, tid: int): + assert tid >= 0 + self.process = process + self.tid = tid + + def __repr__(self): + return f"thread(tid: {self.tid})" + + def __hash__(self): + return hash((self.process, self.tid)) + + def __eq__(self, other): + assert isinstance(other, ThreadAddress) + return (self.process, self.tid) == (other.process, other.tid) + + def __lt__(self, other): + return (self.process, self.tid) < (other.process, other.tid) + + class DynamicAddress(Address): """an address from a dynamic analysis trace""" diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 7cac8bbc..836e7216 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -15,7 +15,7 @@ from typing_extensions import TypeAlias import capa.features.address from capa.features.common import Feature -from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress # feature extractors may reference functions, BBs, insns by opaque handle values. # you can use the `.address` property to get and render the address of the feature. @@ -278,7 +278,7 @@ class ProcessHandle: inner: sandbox-specific data """ - pid: int + address: ProcessAddress inner: Any @@ -292,7 +292,7 @@ class ThreadHandle: inner: sandbox-specific data """ - tid: int + address: ThreadAddress inner: Any diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index beeb22fd..5cf00484 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process from capa.features.common import Feature -from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger(__name__) @@ -30,7 +30,7 @@ class CapeExtractor(DynamicFeatureExtractor): self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) - def get_base_address(self) -> Address: + def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]: # value according to the PE header, the actual trace may use a different imagebase return AbsoluteVirtualAddress(self.static["pe"]["imagebase"]) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index f27e3077..5cacb5f6 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -11,7 +11,7 @@ from typing import Dict, Tuple, Iterator from capa.features.file import Export, Import, Section from capa.features.common import String, Feature -from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress +from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress from capa.features.extractors.helpers import generate_symbols from capa.features.extractors.base_extractor import ProcessHandle @@ -24,8 +24,9 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]: """ def rec(process): - inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]} - yield ProcessHandle(pid=process["pid"], inner=inner) + address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"]) + inner: Dict[str, str] = {"name": process["name"]} + yield ProcessHandle(address=address, inner=inner) for child in process["children"]: yield from rec(child) diff --git a/capa/features/extractors/cape/helpers.py b/capa/features/extractors/cape/helpers.py index fad9be0e..6595c0b1 100644 --- a/capa/features/extractors/cape/helpers.py +++ b/capa/features/extractors/cape/helpers.py @@ -23,6 +23,6 @@ def find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str """ for process in processes: - if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: + if ph.address.ppid == process["parent_id"] and ph.address.pid == process["process_id"]: return process return {} diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py index ec2cd124..f384e1d6 100644 --- a/capa/features/extractors/cape/process.py +++ b/capa/features/extractors/cape/process.py @@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process from capa.features.common import String, Feature -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import NO_ADDRESS, Address, ThreadAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) @@ -28,7 +28,8 @@ def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]: threads: List = process["threads"] for thread in threads: - yield ThreadHandle(int(thread), inner={}) + address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread)) + yield ThreadHandle(address=address, inner={}) def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: @@ -42,10 +43,9 @@ def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple if not environ: return - for value in environ.values(): - if not value: - continue - yield String(value), NO_ADDRESS + for variable, value in environ.items(): + if value: + yield String(value), ph.address def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index d9439d2c..bc45a169 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -35,7 +35,7 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) - process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) calls: List[Dict[str, Any]] = process["calls"] - tid = str(th.tid) + tid = str(th.address.tid) for call in calls: if call["thread_id"] != tid: continue diff --git a/capa/features/extractors/null.py b/capa/features/extractors/null.py index 6f58d1b4..ec002c00 100644 --- a/capa/features/extractors/null.py +++ b/capa/features/extractors/null.py @@ -1,9 +1,17 @@ -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Union, TypeAlias from dataclasses import dataclass from capa.features.common import Feature -from capa.features.address import NO_ADDRESS, Address -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor +from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress +from capa.features.extractors.base_extractor import ( + BBHandle, + InsnHandle, + ThreadHandle, + ProcessHandle, + FunctionHandle, + StaticFeatureExtractor, + DynamicFeatureExtractor, +) @dataclass @@ -24,7 +32,7 @@ class FunctionFeatures: @dataclass -class NullFeatureExtractor(StaticFeatureExtractor): +class NullStaticFeatureExtractor(StaticFeatureExtractor): """ An extractor that extracts some user-provided features. @@ -70,3 +78,51 @@ class NullFeatureExtractor(StaticFeatureExtractor): def extract_insn_features(self, f, bb, insn): for address, feature in self.functions[f.address].basic_blocks[bb.address].instructions[insn.address].features: yield feature, address + + +@dataclass +class ThreadFeatures: + features: List[Tuple[Address, Feature]] + + +@dataclass +class ProcessFeatures: + features: List[Tuple[Address, Feature]] + threads: Dict[Address, ThreadFeatures] + + +@dataclass +class NullDynamicFeatureExtractor(DynamicFeatureExtractor): + base_address: Address + global_features: List[Feature] + file_features: List[Tuple[Address, Feature]] + processes: Dict[Address, ProcessFeatures] + + def extract_global_features(self): + for feature in self.global_features: + yield feature, NO_ADDRESS + + def extract_file_features(self): + for address, feature in self.file_features: + yield feature, address + + def get_processes(self): + for address in sorted(self.processes.keys()): + assert isinstance(address, ProcessAddress) + yield ProcessHandle(address=address, inner={}) + + def extract_process_features(self, p): + for addr, feature in self.processes[p.address].features: + yield feature, addr + + def get_threads(self, p): + for address in sorted(self.processes[p].threads.keys()): + assert isinstance(address, ThreadAddress) + yield ThreadHandle(address=address, inner={}) + + def extract_thread_features(self, p, t): + for addr, feature in self.processes[p.address].threads[t.address].features: + yield feature, addr + + +NullFeatureExtractor: TypeAlias = Union[NullStaticFeatureExtractor, NullDynamicFeatureExtractor] diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index d42da1f0..39bf9415 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations import zlib import logging from enum import Enum -from typing import List, Tuple, Union +from typing import Any, List, Tuple, Union, TypeAlias from pydantic import Field, BaseModel @@ -23,9 +23,10 @@ import capa.features.insn import capa.features.common import capa.features.address import capa.features.basicblock +import capa.features.extractors.null as null from capa.helpers import assert_never from capa.features.freeze.features import Feature, feature_from_capa -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor logger = logging.getLogger(__name__) @@ -41,13 +42,15 @@ class AddressType(str, Enum): FILE = "file" DN_TOKEN = "dn token" DN_TOKEN_OFFSET = "dn token offset" + PROCESS = "process" + THREAD = "thread" DYNAMIC = "dynamic" NO_ADDRESS = "no address" class Address(HashableModel): type: AddressType - value: Union[int, Tuple[int, int], None] + value: Union[int, Tuple[int, ...], None] @classmethod def from_capa(cls, a: capa.features.address.Address) -> "Address": @@ -66,6 +69,12 @@ class Address(HashableModel): elif isinstance(a, capa.features.address.DNTokenOffsetAddress): return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset)) + elif isinstance(a, capa.features.address.ProcessAddress): + return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid)) + + elif isinstance(a, capa.features.address.ThreadAddress): + return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid)) + elif isinstance(a, capa.features.address.DynamicAddress): return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address)) @@ -105,6 +114,22 @@ class Address(HashableModel): assert isinstance(offset, int) return capa.features.address.DNTokenOffsetAddress(token, offset) + elif self.type is AddressType.PROCESS: + assert isinstance(self.value, tuple) + ppid, pid = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + return capa.features.address.ProcessAddress(ppid=ppid, pid=pid) + + elif self.type is AddressType.THREAD: + assert isinstance(self.value, tuple) + ppid, pid, tid = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + proc_addr = capa.features.address.ProcessAddress(ppid=ppid, pid=pid) + return capa.features.address.ThreadAddress(proc_addr, tid=tid) + elif self.type is AddressType.NO_ADDRESS: return capa.features.address.NO_ADDRESS @@ -135,6 +160,34 @@ class FileFeature(HashableModel): feature: Feature +class ProcessFeature(HashableModel): + """ + args: + process: the address of the process to which this feature belongs. + address: the address at which this feature is found. + + process != address because, e.g., the feature may be found *within* the scope (process). + """ + + process: Address + address: Address + feature: Feature + + +class ThreadFeature(HashableModel): + """ + args: + thread: the address of the thread to which this feature belongs. + address: the address at which this feature is found. + + thread != address because, e.g., the feature may be found *within* the scope (thread). + """ + + thread: Address + address: Address + feature: Feature + + class FunctionFeature(HashableModel): """ args: @@ -203,7 +256,18 @@ class FunctionFeatures(BaseModel): allow_population_by_field_name = True -class Features(BaseModel): +class ThreadFeatures(BaseModel): + address: Address + features: Tuple[ThreadFeature, ...] + + +class ProcessFeatures(BaseModel): + address: Address + features: Tuple[ProcessFeature, ...] + threads: Tuple[ThreadFeatures, ...] + + +class StaticFeatures(BaseModel): global_: Tuple[GlobalFeature, ...] = Field(alias="global") file: Tuple[FileFeature, ...] functions: Tuple[FunctionFeatures, ...] @@ -212,6 +276,18 @@ class Features(BaseModel): allow_population_by_field_name = True +class DynamicFeatures(BaseModel): + global_: Tuple[GlobalFeature, ...] = Field(alias="global") + file: Tuple[FileFeature, ...] + processes: Tuple[ProcessFeatures, ...] + + class Config: + allow_population_by_field_name = True + + +Features: TypeAlias = Union[StaticFeatures, DynamicFeatures] + + class Extractor(BaseModel): name: str version: str = capa.version.__version__ @@ -230,11 +306,11 @@ class Freeze(BaseModel): allow_population_by_field_name = True -def dumps(extractor: StaticFeatureExtractor) -> str: +def dumps_static(extractor: StaticFeatureExtractor) -> str: """ serialize the given extractor to a string """ - + assert isinstance(extractor, StaticFeatureExtractor) global_features: List[GlobalFeature] = [] for feature, _ in extractor.extract_global_features(): global_features.append( @@ -313,7 +389,7 @@ def dumps(extractor: StaticFeatureExtractor) -> str: # Mypy is unable to recognise `basic_blocks` as a argument due to alias ) - features = Features( + features = StaticFeatures( global_=global_features, file=tuple(file_features), functions=tuple(function_features), @@ -331,15 +407,98 @@ def dumps(extractor: StaticFeatureExtractor) -> str: return freeze.json() -def loads(s: str) -> StaticFeatureExtractor: - """deserialize a set of features (as a NullFeatureExtractor) from a string.""" - import capa.features.extractors.null as null +def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str: + """ + serialize the given extractor to a string + """ + global_features: List[GlobalFeature] = [] + for feature, _ in extractor.extract_global_features(): + global_features.append( + GlobalFeature( + feature=feature_from_capa(feature), + ) + ) + file_features: List[FileFeature] = [] + for feature, address in extractor.extract_file_features(): + file_features.append( + FileFeature( + feature=feature_from_capa(feature), + address=Address.from_capa(address), + ) + ) + + process_features: List[ProcessFeatures] = [] + for p in extractor.get_processes(): + paddr = Address.from_capa(p.address) + pfeatures = [ + ProcessFeature( + process=paddr, + address=Address.from_capa(addr), + feature=feature_from_capa(feature), + ) + for feature, addr in extractor.extract_process_features(p) + ] + + threads = [] + for t in extractor.get_threads(p): + taddr = Address.from_capa(t.address) + tfeatures = [ + ThreadFeature( + basic_block=taddr, + address=Address.from_capa(addr), + feature=feature_from_capa(feature), + ) # type: ignore + # Mypy is unable to recognise `basic_block` as a argument due to alias + for feature, addr in extractor.extract_thread_features(p, t) + ] + + threads.append( + ThreadFeatures( + address=taddr, + features=tuple(tfeatures), + ) + ) + + process_features.append( + ProcessFeatures( + address=paddr, + features=tuple(pfeatures), + threads=threads, + ) # type: ignore + # Mypy is unable to recognise `basic_blocks` as a argument due to alias + ) + + features = DynamicFeatures( + global_=global_features, + file=tuple(file_features), + processes=tuple(process_features), + ) # type: ignore + # Mypy is unable to recognise `global_` as a argument due to alias + + # workaround around mypy issue: https://github.com/python/mypy/issues/1424 + get_base_addr = getattr(extractor, "get_base_addr", None) + base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS + + freeze = Freeze( + version=2, + base_address=Address.from_capa(base_addr), + extractor=Extractor(name=extractor.__class__.__name__), + features=features, + ) # type: ignore + # Mypy is unable to recognise `base_address` as a argument due to alias + + return freeze.json() + + +def loads_static(s: str) -> StaticFeatureExtractor: + """deserialize a set of features (as a NullFeatureExtractor) from a string.""" freeze = Freeze.parse_raw(s) if freeze.version != 2: raise ValueError(f"unsupported freeze format version: {freeze.version}") - return null.NullFeatureExtractor( + assert isinstance(freeze.features, StaticFeatures) + return null.NullStaticFeatureExtractor( base_address=freeze.base_address.to_capa(), global_features=[f.feature.to_capa() for f in freeze.features.global_], file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], @@ -364,24 +523,69 @@ def loads(s: str) -> StaticFeatureExtractor: ) +def loads_dynamic(s: str) -> DynamicFeatureExtractor: + """deserialize a set of features (as a NullFeatureExtractor) from a string.""" + freeze = Freeze.parse_raw(s) + if freeze.version != 2: + raise ValueError(f"unsupported freeze format version: {freeze.version}") + + assert isinstance(freeze.features, DynamicFeatures) + return null.NullDynamicFeatureExtractor( + base_address=freeze.base_address.to_capa(), + global_features=[f.feature.to_capa() for f in freeze.features.global_], + file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], + processes={ + p.address.to_capa(): null.ProcessFeatures( + features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in p.features], + threads={ + t.address.to_capa(): null.ThreadFeatures( + features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in t.features], + ) + for t in p.threads + }, + ) + for p in freeze.features.processes + }, + ) + + MAGIC = "capa0000".encode("ascii") +STATIC_MAGIC = MAGIC + "-static".encode("ascii") +DYNAMIC_MAGIC = MAGIC + "-dynamic".encode("ascii") def dump(extractor: FeatureExtractor) -> bytes: """serialize the given extractor to a byte array.""" - assert isinstance(extractor, StaticFeatureExtractor) - return MAGIC + zlib.compress(dumps(extractor).encode("utf-8")) + if isinstance(extractor, StaticFeatureExtractor): + return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8")) + elif isinstance(extractor, DynamicFeatureExtractor): + return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8")) + else: + raise ValueError("Invalid feature extractor") def is_freeze(buf: bytes) -> bool: return buf[: len(MAGIC)] == MAGIC -def load(buf: bytes) -> StaticFeatureExtractor: +def is_static(buf: bytes) -> bool: + return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC + + +def is_dynamic(buf: bytes) -> bool: + return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC + + +def load(buf: bytes): """deserialize a set of features (as a NullFeatureExtractor) from a byte array.""" if not is_freeze(buf): raise ValueError("missing magic header") - return loads(zlib.decompress(buf[len(MAGIC) :]).decode("utf-8")) + if is_static(buf): + return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8")) + elif is_dynamic(buf): + return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8")) + else: + raise ValueError("invalid magic header") def main(argv=None): diff --git a/capa/main.py b/capa/main.py index 8ff1a9ac..366dfdae 100644 --- a/capa/main.py +++ b/capa/main.py @@ -799,6 +799,7 @@ def collect_metadata( format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ arch = get_arch(sample_path) os_ = get_os(sample_path) if os_ == OS_AUTO else os_ + base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else NO_ADDRESS return rdoc.Metadata( timestamp=datetime.datetime.now(), @@ -816,7 +817,7 @@ def collect_metadata( os=os_, extractor=extractor.__class__.__name__, rules=tuple(rules_path), - base_address=frz.Address.from_capa(extractor.get_base_address()), + base_address=frz.Address.from_capa(base_addr), layout=rdoc.Layout( functions=(), # this is updated after capabilities have been collected. @@ -1270,7 +1271,6 @@ def main(argv=None): # freeze format deserializes directly into an extractor with open(args.sample, "rb") as f: extractor: FeatureExtractor = frz.load(f.read()) - assert isinstance(extractor, StaticFeatureExtractor) else: # all other formats we must create an extractor, # such as viv, binary ninja, etc. workspaces diff --git a/scripts/show-features.py b/scripts/show-features.py index a47997f2..5076189d 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -250,7 +250,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor): if is_global_feature(feature): continue - print(f" thread: {t.tid} {format_address(addr)}: {feature}") + print(f" {t.address} {format_address(addr)}: {feature}") def ida_main(): diff --git a/tests/fixtures.py b/tests/fixtures.py index 1b36d236..75b611b7 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -420,14 +420,14 @@ def sample(request): def get_process(extractor, ppid: int, pid: int) -> ProcessHandle: for ph in extractor.get_processes(): - if ph.inner["ppid"] == ppid and ph.pid == pid: - return ProcessHandle(pid, {"ppid": ppid}) + if ph.address.ppid == ppid and ph.address.pid == pid: + return ph raise ValueError("process not found") def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle: for th in extractor.get_threads(ph): - if th.tid == tid: + if th.address.tid == tid: return th raise ValueError("thread not found") diff --git a/tests/test_freeze.py b/tests/test_static_freeze.py similarity index 97% rename from tests/test_freeze.py rename to tests/test_static_freeze.py index 62a1aacd..60a806a1 100644 --- a/tests/test_freeze.py +++ b/tests/test_static_freeze.py @@ -24,7 +24,7 @@ import capa.features.extractors.base_extractor from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import BBHandle, FunctionHandle -EXTRACTOR = capa.features.extractors.null.NullFeatureExtractor( +EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor( base_address=AbsoluteVirtualAddress(0x401000), global_features=[], file_features=[ @@ -119,8 +119,8 @@ def compare_extractors(a, b): def test_freeze_str_roundtrip(): - load = capa.features.freeze.loads - dump = capa.features.freeze.dumps + load = capa.features.freeze.loads_static + dump = capa.features.freeze.dumps_static reanimated = load(dump(EXTRACTOR)) compare_extractors(EXTRACTOR, reanimated)