add abstract DynamicExtractor class

This commit is contained in:
Yacine Elhamer
2023-06-08 23:15:29 +00:00
parent 0559e61af1
commit 35e53e9691

View File

@@ -8,7 +8,7 @@
import abc
import dataclasses
from typing import Any, Dict, Tuple, Union, Iterator
from typing import Any, Dict, Tuple, Union, Iterator, TextIO, BinaryIO
from dataclasses import dataclass
import capa.features.address
@@ -262,3 +262,105 @@ class FeatureExtractor:
Tuple[Feature, Address]: feature and its location
"""
raise NotImplementedError()
@dataclass
class ProcessHandle:
"""
reference to a process extracted by the sandbox.
Attributes:
pid: process id
inner: sandbox-specific data
"""
pid: int
inner: Any
@dataclass
class ThreadHandle:
"""
reference to a thread extracted by the sandbox.
Attributes:
tid: thread id
inner: sandbox-specific data
"""
tid: int
inner: Any
class DynamicExtractor(FeatureExtractor):
"""
DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample.
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
for a specific sandbox.
This class is not instantiated directly; it is the base class for other implementations.
"""
def __init__(self):
super().__init__()
@abc.abstractmethod
def get_processes(self) -> Iterator[ProcessHandle]:
"""
Yields all the child-processes of a parent one.
Attributes:
ph: parent process
"""
raise NotImplementedError()
@abc.abstractmethod
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
"""
Yields all the features of a process. These include:
- file features of the process' image
- inter-process injection
- detected dynamic DLL loading
"""
raise NotImplementedError()
@abc.abstractmethod
def get_threads(self, ph: ProcessHandle) -> Iterator[ProcessHandle]:
"""
Yields all the threads that a process created.
Attributes:
ph: parent process
"""
raise NotImplementedError()
@abc.abstractmethod
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
"""
Yields all the features of a thread. These include:
- sequenced api traces
- files/registris interacted with
- network activity
"""
raise NotImplementedError()
@abc.abstractclassmethod
def from_trace(cls, trace: TextIO) -> "DynamicExtractor":
"""
Most sandboxes provide reports in a serialized text format (i.e. JSON for Cuckoo and CAPE).
This routine takes a file descriptor of such report (analysis trace) and returns a corresponding DynamicExtractor object.
"""
raise NotImplementedError()
@abc.abstractclassmethod
def submit_sample(cls, sample: BinaryIO, api: Dict[str, str]) -> "DynamicExtractor":
"""
This routine takes a sample and submits it for analysis to the provided api. The trace should then ideally be passed to the from_trace() method.
Attributes:
sample: file descriptor of the sample
api: contains information such as the uri, api key, etc.
"""
raise NotImplementedError()