diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 3be983ed..e3b780d1 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -8,7 +8,7 @@ import abc import dataclasses -from typing import Any, Dict, Tuple, Union, Iterator +from typing import Any, Dict, Tuple, Union, Iterator, TextIO, BinaryIO from dataclasses import dataclass import capa.features.address @@ -262,3 +262,105 @@ class FeatureExtractor: Tuple[Feature, Address]: feature and its location """ raise NotImplementedError() + + +@dataclass +class ProcessHandle: + """ + reference to a process extracted by the sandbox. + + Attributes: + pid: process id + inner: sandbox-specific data + """ + + pid: int + inner: Any + + +@dataclass +class ThreadHandle: + """ + reference to a thread extracted by the sandbox. + + Attributes: + tid: thread id + inner: sandbox-specific data + """ + + tid: int + inner: Any + + +class DynamicExtractor(FeatureExtractor): + """ + DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample. + + Features are grouped mainly into threads that alongside their meta-features are also grouped into + processes (that also have their own features). Other scopes (such as function and file) may also apply + for a specific sandbox. + + This class is not instantiated directly; it is the base class for other implementations. + """ + + def __init__(self): + super().__init__() + + @abc.abstractmethod + def get_processes(self) -> Iterator[ProcessHandle]: + """ + Yields all the child-processes of a parent one. + + Attributes: + ph: parent process + """ + raise NotImplementedError() + + @abc.abstractmethod + def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: + """ + Yields all the features of a process. These include: + - file features of the process' image + - inter-process injection + - detected dynamic DLL loading + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_threads(self, ph: ProcessHandle) -> Iterator[ProcessHandle]: + """ + Yields all the threads that a process created. + + Attributes: + ph: parent process + """ + raise NotImplementedError() + + @abc.abstractmethod + def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: + """ + Yields all the features of a thread. These include: + - sequenced api traces + - files/registris interacted with + - network activity + """ + raise NotImplementedError() + + @abc.abstractclassmethod + def from_trace(cls, trace: TextIO) -> "DynamicExtractor": + """ + Most sandboxes provide reports in a serialized text format (i.e. JSON for Cuckoo and CAPE). + This routine takes a file descriptor of such report (analysis trace) and returns a corresponding DynamicExtractor object. + """ + raise NotImplementedError() + + @abc.abstractclassmethod + def submit_sample(cls, sample: BinaryIO, api: Dict[str, str]) -> "DynamicExtractor": + """ + This routine takes a sample and submits it for analysis to the provided api. The trace should then ideally be passed to the from_trace() method. + + Attributes: + sample: file descriptor of the sample + api: contains information such as the uri, api key, etc. + """ + raise NotImplementedError()