initial commit: add ProcessAddress and ThreadAddress

This commit is contained in:
Yacine Elhamer
2023-07-10 12:14:53 +01:00
parent e140fba5df
commit 5aa1a1afc7
9 changed files with 335 additions and 31 deletions
+52
View File
@@ -36,6 +36,58 @@ class AbsoluteVirtualAddress(int, Address):
return int.__hash__(self)
class ProcessAddress(Address):
    """
    addresses a process in a dynamic execution trace

    args:
      pid: the process id; must be positive.
      ppid: the parent process id; must be non-negative. defaults to 0,
        in which case it is omitted from the rendered representation.
    """

    def __init__(self, pid: int, ppid: int = 0):
        assert ppid >= 0
        assert pid > 0
        self.ppid = ppid
        self.pid = pid

    def __repr__(self):
        # only render the parent pid when one was provided (non-zero).
        parent = f"ppid: {self.ppid}, " if self.ppid > 0 else ""
        return f"process({parent}pid: {self.pid})"

    def __hash__(self):
        return hash((self.ppid, self.pid))

    def __eq__(self, other):
        assert isinstance(other, ProcessAddress)
        # compare on exactly the key used by __hash__ and __lt__.
        # ignoring ppid whenever *our* ppid is 0 would make equality asymmetric
        # (a == b but b != a) and let equal objects hash differently,
        # which breaks lookups when these addresses are used as dict keys.
        return (self.ppid, self.pid) == (other.ppid, other.pid)

    def __lt__(self, other):
        return (self.ppid, self.pid) < (other.ppid, other.pid)
class ThreadAddress(Address):
    """
    addresses a thread in a dynamic execution trace

    args:
      process: the address of the process that owns the thread;
        its ppid and pid are copied onto this address.
      tid: the thread id; must be non-negative.
    """

    def __init__(self, process: ProcessAddress, tid: int):
        assert tid >= 0
        self.ppid = process.ppid
        self.pid = process.pid
        self.tid = tid

    def _key(self):
        # the single identity tuple shared by hashing, equality and ordering.
        return (self.ppid, self.pid, self.tid)

    def __repr__(self):
        # only the thread id is rendered; the owning process is not shown.
        return "thread(tid: {})".format(self.tid)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        assert isinstance(other, ThreadAddress)
        mine = self._key()
        theirs = (other.ppid, other.pid, other.tid)
        return mine == theirs

    def __lt__(self, other):
        mine = self._key()
        theirs = (other.ppid, other.pid, other.tid)
        return mine < theirs
class DynamicAddress(Address):
"""an address from a dynamic analysis trace"""
+3 -3
View File
@@ -15,7 +15,7 @@ from typing_extensions import TypeAlias
import capa.features.address
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress
# feature extractors may reference functions, BBs, insns by opaque handle values.
# you can use the `.address` property to get and render the address of the feature.
@@ -278,7 +278,7 @@ class ProcessHandle:
inner: sandbox-specific data
"""
pid: int
address: ProcessAddress
inner: Any
@@ -292,7 +292,7 @@ class ThreadHandle:
inner: sandbox-specific data
"""
tid: int
address: ThreadAddress
inner: Any
+5 -3
View File
@@ -11,7 +11,7 @@ from typing import Dict, Tuple, Iterator
from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle
@@ -24,8 +24,10 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]:
"""
def rec(process):
    """
    yield a handle for `process`, then recursively for each of its children (pre-order).

    `process` is a dict read via the keys "pid", "parent_id", "name" and "children".
    the pid/ppid pair is carried by the handle's address; only the name stays in `inner`.
    """
    address: ProcessAddress = ProcessAddress(pid=process["pid"], ppid=process["parent_id"])
    inner: Dict[str, str] = {"name": process["name"]}
    # note: no debug print here; library code must not write to stdout for every process.
    yield ProcessHandle(address=address, inner=inner)

    for child in process["children"]:
        yield from rec(child)
+1 -1
View File
@@ -23,6 +23,6 @@ def find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str
"""
for process in processes:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
if ph.address.ppid == process["parent_id"] and ph.address.pid == process["process_id"]:
return process
return {}
+3 -2
View File
@@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.address import NO_ADDRESS, Address, ThreadAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
@@ -28,7 +28,8 @@ def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
threads: List = process["threads"]
for thread in threads:
yield ThreadHandle(int(thread), inner={})
address: ThreadAddress = ThreadAddress(process=ph.address, tid=int(thread))
yield ThreadHandle(address=address, inner={})
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
+1 -1
View File
@@ -35,7 +35,7 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
calls: List[Dict[str, Any]] = process["calls"]
tid = str(th.tid)
tid = str(th.address.tid)
for call in calls:
if call["thread_id"] != tid:
continue
+57 -3
View File
@@ -1,9 +1,17 @@
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Union, TypeAlias
from dataclasses import dataclass
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor
from capa.features.extractors.base_extractor import (
BBHandle,
InsnHandle,
ThreadHandle,
ProcessHandle,
FunctionHandle,
StaticFeatureExtractor,
DynamicFeatureExtractor,
)
@dataclass
@@ -24,7 +32,7 @@ class FunctionFeatures:
@dataclass
class NullFeatureExtractor(StaticFeatureExtractor):
class NullStaticFeatureExtractor(StaticFeatureExtractor):
"""
An extractor that extracts some user-provided features.
@@ -70,3 +78,49 @@ class NullFeatureExtractor(StaticFeatureExtractor):
def extract_insn_features(self, f, bb, insn):
for address, feature in self.functions[f.address].basic_blocks[bb.address].instructions[insn.address].features:
yield feature, address
@dataclass
class ThreadFeatures:
    """features recorded for a single thread."""

    # (address, feature) pairs; the extractor yields them back as (feature, address).
    features: List[Tuple[Address, Feature]]
@dataclass
class ProcessFeatures:
    """features recorded for a single process, plus the features of its threads."""

    # (address, feature) pairs found at process scope.
    features: List[Tuple[Address, Feature]]
    # per-thread features, keyed by the thread's address.
    threads: Dict[Address, ThreadFeatures]
@dataclass
class NullDynamicFeatureExtractor(DynamicFeatureExtractor):
    """
    An extractor that extracts some user-provided features for a dynamic trace.

    The features are stored in memory, keyed by process address and then
    by thread address, and are simply replayed by the `extract_*` methods.
    Processes and threads are enumerated in sorted address order.
    """

    base_address: Address
    global_features: List[Feature]
    file_features: List[Tuple[Address, Feature]]
    processes: Dict[Address, ProcessFeatures]

    def extract_global_features(self):
        # global features have no meaningful location.
        for feature in self.global_features:
            yield feature, NO_ADDRESS

    def extract_file_features(self):
        for address, feature in self.file_features:
            yield feature, address

    def get_processes(self):
        # handles are identified solely by their address; they have no separate `pid` field.
        for address in sorted(self.processes.keys()):
            yield ProcessHandle(address=address, inner={})

    def extract_process_features(self, p):
        for addr, feature in self.processes[p.address].features:
            yield feature, addr

    def get_threads(self, p):
        # `self.processes` is keyed by address, so index with `p.address`, not the handle itself.
        for address in sorted(self.processes[p.address].threads.keys()):
            yield ThreadHandle(address=address, inner={})

    def extract_thread_features(self, p, t):
        for addr, feature in self.processes[p.address].threads[t.address].features:
            yield feature, addr
NullFeatureExtractor: TypeAlias = Union[NullStaticFeatureExtractor, NullDynamicFeatureExtractor]
+211 -16
View File
@@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations
import zlib
import logging
from enum import Enum
from typing import Any, List, Tuple, Union
from typing import Any, List, Tuple, Union, TypeAlias
from pydantic import Field, BaseModel
@@ -23,9 +23,10 @@ import capa.features.insn
import capa.features.common
import capa.features.address
import capa.features.basicblock
import capa.features.extractors.null as null
from capa.helpers import assert_never
from capa.features.freeze.features import Feature, feature_from_capa
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger(__name__)
@@ -41,13 +42,15 @@ class AddressType(str, Enum):
FILE = "file"
DN_TOKEN = "dn token"
DN_TOKEN_OFFSET = "dn token offset"
PROCESS = "process"
THREAD = "thread"
DYNAMIC = "dynamic"
NO_ADDRESS = "no address"
class Address(HashableModel):
type: AddressType
value: Union[int, Tuple[int, int], None]
value: Union[int, Tuple[int, int], Tuple[int, int, int], None]
@classmethod
def from_capa(cls, a: capa.features.address.Address) -> "Address":
@@ -66,6 +69,12 @@ class Address(HashableModel):
elif isinstance(a, capa.features.address.DNTokenOffsetAddress):
return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset))
elif isinstance(a, capa.features.address.ProcessAddress):
return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid))
elif isinstance(a, capa.features.address.ThreadAddress):
return cls(type=AddressType.THREAD, value=(a.ppid, a.pid, a.tid))
elif isinstance(a, capa.features.address.DynamicAddress):
return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address))
@@ -104,7 +113,17 @@ class Address(HashableModel):
assert isinstance(token, int)
assert isinstance(offset, int)
return capa.features.address.DNTokenOffsetAddress(token, offset)
elif self.type is AddressType.PROCESS:
assert isinstance(self.value, tuple)
ppid, pid = self.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
elif self.type is AddressType.THREAD:
assert isinstance(self.value, tuple)
ppid, pid, tid = self.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
elif self.type is AddressType.NO_ADDRESS:
return capa.features.address.NO_ADDRESS
@@ -135,6 +154,36 @@ class FileFeature(HashableModel):
feature: Feature
class ProcessFeature(HashableModel):
    """
    args:
      process: the address of the process to which this feature belongs.
      address: the address at which this feature is found.

    process != address because, e.g., the feature may be found *within* the scope (process),
    versus right at its starting address.
    """

    process: Address
    address: Address
    feature: Feature
class ThreadFeature(HashableModel):
    """
    args:
      thread: the address of the thread to which this feature belongs.
      address: the address at which this feature is found.

    thread != address because, e.g., the feature may be found *within* the scope (thread),
    versus right at its starting address.
    """

    thread: Address
    address: Address
    feature: Feature
class FunctionFeature(HashableModel):
"""
args:
@@ -203,7 +252,18 @@ class FunctionFeatures(BaseModel):
allow_population_by_field_name = True
class Features(BaseModel):
class ThreadFeatures(BaseModel):
    """serialized features of a single thread."""

    # the address of the thread itself.
    address: Address
    # the features recorded for this thread.
    features: Tuple[ThreadFeature, ...]
class ProcessFeatures(BaseModel):
    """serialized features of a single process, including those of its threads."""

    # the address of the process itself.
    address: Address
    # the features recorded at process scope.
    features: Tuple[ProcessFeature, ...]
    # one entry per thread of this process.
    threads: Tuple[ThreadFeatures, ...]
class StaticFeatures(BaseModel):
global_: Tuple[GlobalFeature, ...] = Field(alias="global")
file: Tuple[FileFeature, ...]
functions: Tuple[FunctionFeatures, ...]
@@ -212,6 +272,18 @@ class Features(BaseModel):
allow_population_by_field_name = True
class DynamicFeatures(BaseModel):
    """all serialized features of a dynamic extractor: global, file, and per-process."""

    # `global` is a Python keyword, so the field is named `global_` and aliased to "global".
    global_: Tuple[GlobalFeature, ...] = Field(alias="global")
    file: Tuple[FileFeature, ...]
    processes: Tuple[ProcessFeatures, ...]

    class Config:
        # permit construction via the field name (`global_=...`) in addition to the alias.
        allow_population_by_field_name = True
Features: TypeAlias = Union[StaticFeatures, DynamicFeatures]
class Extractor(BaseModel):
name: str
version: str = capa.version.__version__
@@ -230,7 +302,7 @@ class Freeze(BaseModel):
allow_population_by_field_name = True
def dumps(extractor: StaticFeatureExtractor) -> str:
def dumps_static(extractor: StaticFeatureExtractor) -> str:
"""
serialize the given extractor to a string
"""
@@ -313,7 +385,7 @@ def dumps(extractor: StaticFeatureExtractor) -> str:
# Mypy is unable to recognise `basic_blocks` as an argument due to alias
)
features = Features(
features = StaticFeatures(
global_=global_features,
file=tuple(file_features),
functions=tuple(function_features),
@@ -331,15 +403,94 @@ def dumps(extractor: StaticFeatureExtractor) -> str:
return freeze.json()
def loads(s: str) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
import capa.features.extractors.null as null
def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
    """
    serialize the given dynamic extractor to a string

    walks the extractor scope by scope (global, file, process, thread),
    converts every feature and address to its freeze model,
    and returns the JSON rendering of the resulting `Freeze` document.
    """
    global_features: List[GlobalFeature] = []
    for feature, _ in extractor.extract_global_features():
        global_features.append(
            GlobalFeature(
                feature=feature_from_capa(feature),
            )
        )

    file_features: List[FileFeature] = []
    for feature, address in extractor.extract_file_features():
        file_features.append(
            FileFeature(
                feature=feature_from_capa(feature),
                address=Address.from_capa(address),
            )
        )

    process_features: List[ProcessFeatures] = []
    for p in extractor.get_processes():
        paddr = Address.from_capa(p.address)
        pfeatures = [
            ProcessFeature(
                process=paddr,
                address=Address.from_capa(addr),
                feature=feature_from_capa(feature),
            )
            for feature, addr in extractor.extract_process_features(p)
        ]

        threads = []
        for t in extractor.get_threads(p):
            taddr = Address.from_capa(t.address)
            tfeatures = [
                ThreadFeature(
                    # the scope field of ThreadFeature is named `thread`.
                    thread=taddr,
                    address=Address.from_capa(addr),
                    feature=feature_from_capa(feature),
                )
                for feature, addr in extractor.extract_thread_features(p, t)
            ]

            threads.append(
                ThreadFeatures(
                    address=taddr,
                    features=tuple(tfeatures),
                )
            )

        process_features.append(
            ProcessFeatures(
                address=paddr,
                features=tuple(pfeatures),
                threads=tuple(threads),
            )
        )

    features = DynamicFeatures(
        global_=global_features,
        file=tuple(file_features),
        processes=tuple(process_features),
    )  # type: ignore
    # Mypy is unable to recognise `global_` as an argument due to alias

    # not every extractor exposes a base address.
    # fall back to an explicit "no address" model rather than a bare int,
    # since the freeze document stores the base address as an Address.
    if hasattr(extractor, "get_base_address"):
        base_address = Address.from_capa(extractor.get_base_address())
    else:
        base_address = Address(type=AddressType.NO_ADDRESS, value=None)

    freeze = Freeze(
        version=2,
        base_address=base_address,
        extractor=Extractor(name=extractor.__class__.__name__),
        features=features,
    )  # type: ignore
    # Mypy is unable to recognise `base_address` as an argument due to alias

    return freeze.json()
def loads_static(s: str) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
freeze = Freeze.parse_raw(s)
if freeze.version != 2:
raise ValueError(f"unsupported freeze format version: {freeze.version}")
return null.NullFeatureExtractor(
return null.NullStaticFeatureExtractor(
base_address=freeze.base_address.to_capa(),
global_features=[f.feature.to_capa() for f in freeze.features.global_],
file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file],
@@ -364,24 +515,68 @@ def loads(s: str) -> StaticFeatureExtractor:
)
MAGIC = "capa0000".encode("ascii")
def loads_dynamic(s: str) -> DynamicFeatureExtractor:
    """
    deserialize a set of features (as a NullDynamicFeatureExtractor) from a string.

    raises:
      ValueError: when the freeze document does not declare format version 2.
    """
    freeze = Freeze.parse_raw(s)
    if freeze.version != 2:
        raise ValueError(f"unsupported freeze format version: {freeze.version}")

    base_address = freeze.base_address.to_capa()
    global_features = [gf.feature.to_capa() for gf in freeze.features.global_]
    file_features = [(ff.address.to_capa(), ff.feature.to_capa()) for ff in freeze.features.file]

    # rebuild the nested process -> thread -> features mapping, keyed by capa addresses.
    processes = {}
    for proc in freeze.features.processes:
        proc_address = proc.address.to_capa()
        proc_features = [(pf.address.to_capa(), pf.feature.to_capa()) for pf in proc.features]

        threads = {}
        for thread in proc.threads:
            thread_address = thread.address.to_capa()
            thread_features = [(tf.address.to_capa(), tf.feature.to_capa()) for tf in thread.features]
            threads[thread_address] = null.ThreadFeatures(features=thread_features)

        processes[proc_address] = null.ProcessFeatures(features=proc_features, threads=threads)

    return null.NullDynamicFeatureExtractor(
        base_address=base_address,
        global_features=global_features,
        file_features=file_features,
        processes=processes,
    )
MAGIC = "capa000".encode("ascii")
STATIC_MAGIC = MAGIC + "0".encode("ascii")
DYNAMIC_MAGIC = MAGIC + "1".encode("ascii")
def dump(extractor: FeatureExtractor) -> bytes:
    """
    serialize the given extractor to a byte array.

    the result is a magic header identifying the extractor flavor (static or dynamic)
    followed by the zlib-compressed, UTF-8 encoded JSON freeze document.

    raises:
      ValueError: when the extractor is neither a static nor a dynamic feature extractor.
    """
    if isinstance(extractor, StaticFeatureExtractor):
        return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8"))
    elif isinstance(extractor, DynamicFeatureExtractor):
        # dynamic extractors must go through the dynamic serializer;
        # the static one walks functions/basic blocks, which they do not provide.
        return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8"))
    else:
        raise ValueError("Invalid feature extractor")
def is_freeze(buf: bytes) -> bool:
    """return True when `buf` begins with the freeze magic prefix shared by all flavors."""
    header = buf[: len(MAGIC)]
    return header == MAGIC
def load(buf: bytes) -> StaticFeatureExtractor:
def is_static(buf: bytes) -> bool:
    """return True when `buf` begins with the magic header of a static freeze."""
    header = buf[: len(STATIC_MAGIC)]
    return header == STATIC_MAGIC
def is_dynamic(buf: bytes) -> bool:
    """return True when `buf` begins with the magic header of a dynamic freeze."""
    header = buf[: len(DYNAMIC_MAGIC)]
    return header == DYNAMIC_MAGIC
def load(buf: bytes) -> null.NullFeatureExtractor:
    """
    deserialize a set of features (as a NullFeatureExtractor) from a byte array.

    the magic header selects the deserializer: static freezes yield a static
    null extractor, dynamic freezes yield a dynamic one.

    raises:
      ValueError: when the buffer lacks the freeze magic prefix,
        or the prefix is present but names neither the static nor the dynamic flavor.
    """
    if not is_freeze(buf):
        raise ValueError("missing magic header")

    # dispatch on the flavor-specific header; the payload after it is zlib-compressed UTF-8 JSON.
    if is_static(buf):
        return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8"))
    elif is_dynamic(buf):
        return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8"))
    else:
        raise ValueError("invalid magic header")
def main(argv=None):
+2 -2
View File
@@ -800,6 +800,7 @@ def collect_metadata(
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
arch = get_arch(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_
base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else None
return rdoc.Metadata(
timestamp=datetime.datetime.now(),
@@ -817,7 +818,7 @@ def collect_metadata(
os=os_,
extractor=extractor.__class__.__name__,
rules=tuple(rules_path),
base_address=frz.Address.from_capa(extractor.get_base_address()),
base_address=frz.Address.from_capa(base_addr),
layout=rdoc.Layout(
functions=tuple(),
# this is updated after capabilities have been collected.
@@ -1263,7 +1264,6 @@ def main(argv=None):
# freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f:
extractor: FeatureExtractor = frz.load(f.read())
assert isinstance(extractor, StaticFeatureExtractor)
else:
# all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces