diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py
index 5a0b7ce1..854e928a 100644
--- a/capa/features/extractors/cape/extractor.py
+++ b/capa/features/extractors/cape/extractor.py
@@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread
 import capa.features.extractors.cape.global_
 import capa.features.extractors.cape.process
 from capa.features.common import Feature
-from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
+from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
 from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor
 
 logger = logging.getLogger(__name__)
@@ -30,7 +30,7 @@ class CapeExtractor(DynamicFeatureExtractor):
 
         self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
 
-    def get_base_address(self) -> Address:
+    def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
         # value according to the PE header, the actual trace may use a different imagebase
         return AbsoluteVirtualAddress(self.static["pe"]["imagebase"])
 
diff --git a/capa/features/extractors/null.py b/capa/features/extractors/null.py
index facaa692..ec002c00 100644
--- a/capa/features/extractors/null.py
+++ b/capa/features/extractors/null.py
@@ -88,7 +88,7 @@ class ThreadFeatures:
 @dataclass
 class ProcessFeatures:
     features: List[Tuple[Address, Feature]]
-    threads: Dict[ThreadAddress, ThreadFeatures]
+    threads: Dict[Address, ThreadFeatures]
 
 
 @dataclass
@@ -96,7 +96,7 @@ class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
     base_address: Address
     global_features: List[Feature]
     file_features: List[Tuple[Address, Feature]]
-    processes: Dict[ProcessAddress, ProcessFeatures]
+    processes: Dict[Address, ProcessFeatures]
 
     def extract_global_features(self):
         for feature in self.global_features:
@@ -108,6 +108,7 @@ class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
 
     def get_processes(self):
         for address in sorted(self.processes.keys()):
+            assert isinstance(address, ProcessAddress)
             yield ProcessHandle(address=address, inner={})
 
     def extract_process_features(self, p):
@@ -116,6 +117,7 @@ class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
 
     def get_threads(self, p):
         for address in sorted(self.processes[p].threads.keys()):
+            assert isinstance(address, ThreadAddress)
             yield ThreadHandle(address=address, inner={})
 
     def extract_thread_features(self, p, t):
diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py
index 066efec3..97c77185 100644
--- a/capa/features/freeze/__init__.py
+++ b/capa/features/freeze/__init__.py
@@ -476,9 +476,13 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
     )  # type: ignore
     # Mypy is unable to recognise `global_` as a argument due to alias
 
+    # workaround for mypy issue: https://github.com/python/mypy/issues/1424
+    get_base_addr = getattr(extractor, "get_base_address", None)
+    base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS
+
     freeze = Freeze(
         version=2,
-        base_address=Address.from_capa(extractor.get_base_address()) if hasattr(extractor, "get_base_address") else 0,
+        base_address=Address.from_capa(base_addr),
         extractor=Extractor(name=extractor.__class__.__name__),
         features=features,
     )  # type: ignore
@@ -493,6 +497,7 @@ def loads_static(s: str) -> StaticFeatureExtractor:
     if freeze.version != 2:
         raise ValueError(f"unsupported freeze format version: {freeze.version}")
 
+    assert isinstance(freeze.features, StaticFeatures)
     return null.NullStaticFeatureExtractor(
         base_address=freeze.base_address.to_capa(),
         global_features=[f.feature.to_capa() for f in freeze.features.global_],
@@ -571,7 +576,7 @@ def is_dynamic(buf: bytes) -> bool:
     return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC
 
 
-def load(buf: bytes) -> null.NullFeatureExtractor:
+def load(buf: bytes):
     """deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
     if not is_freeze(buf):
         raise ValueError("missing magic header")
diff --git a/capa/main.py b/capa/main.py
index c6627fc8..7332ea48 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -800,7 +800,7 @@ def collect_metadata(
     format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
     arch = get_arch(sample_path)
     os_ = get_os(sample_path) if os_ == OS_AUTO else os_
-    base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else None
+    base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else NO_ADDRESS
 
     return rdoc.Metadata(
         timestamp=datetime.datetime.now(),
diff --git a/scripts/show-features.py b/scripts/show-features.py
index 4054307a..2d9a3de2 100644
--- a/scripts/show-features.py
+++ b/scripts/show-features.py
@@ -252,7 +252,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
                 if is_global_feature(feature):
                     continue
 
-                print(f" thread: {t.tid} {format_address(addr)}: {feature}")
+                print(f" {t.address} {format_address(addr)}: {feature}")
 
 
 def ida_main():