diff --git a/capa/engine.py b/capa/engine.py
index b28a15a2..0b45dc06 100644
--- a/capa/engine.py
+++ b/capa/engine.py
@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Set, Dict, List, Tuple, Mapping, Iterable
 import capa.perf
 import capa.features.common
 from capa.features.common import Result, Feature
+from capa.features.address import Address
 
 if TYPE_CHECKING:
     # circular import, otherwise
@@ -26,7 +27,7 @@ if TYPE_CHECKING:
 # to collect the locations of a feature, do: `features[Number(0x10)]`
 #
 # aliased here so that the type can be documented and xref'd.
-FeatureSet = Dict[Feature, Set[int]]
+FeatureSet = Dict[Feature, Set[Address]]
 
 
 class Statement:
@@ -257,10 +258,10 @@ class Subscope(Statement):
 # inspect(match_details)
 #
 # aliased here so that the type can be documented and xref'd.
-MatchResults = Mapping[str, List[Tuple[int, Result]]]
+MatchResults = Mapping[str, List[Tuple[Address, Result]]]
 
 
-def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[int]):
+def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[Address]):
     """
     record into the given featureset that the given rule matched at the given locations.
 
@@ -277,7 +278,7 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
             namespace, _, _ = namespace.rpartition("/")
 
 
-def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
+def match(rules: List["capa.rules.Rule"], features: FeatureSet, addr: Address) -> Tuple[FeatureSet, MatchResults]:
     """
     match the given rules against the given features,
     returning an updated set of features and the matches.
@@ -315,10 +316,10 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
             # sanity check
             assert bool(res) is True
 
-            results[rule.name].append((va, res))
+            results[rule.name].append((addr, res))
             # we need to update the current `features`
             # because subsequent iterations of this loop may use newly added features,
             # such as rule or namespace matches.
-            index_rule_matches(features, rule, [va])
+            index_rule_matches(features, rule, [addr])
 
     return (features, results)
diff --git a/capa/features/address.py b/capa/features/address.py
new file mode 100644
index 00000000..5d96b75c
--- /dev/null
+++ b/capa/features/address.py
@@ -0,0 +1,106 @@
+import abc
+
+from dncil.clr.token import Token
+
+
+class Address(abc.ABC):
+    """
+    location at which a feature or rule match is recorded.
+
+    concrete address kinds must be sortable, hashable, and renderable.
+    """
+
+    @abc.abstractmethod
+    def __lt__(self, other):
+        # implement < so that addresses can be sorted from low to high
+        ...
+
+    @abc.abstractmethod
+    def __hash__(self):
+        # implement hash so that addresses can be used in sets and dicts
+        ...
+
+    @abc.abstractmethod
+    def __str__(self):
+        # implement str so the address can be rendered in capa output
+        ...
+
+
+class AbsoluteVirtualAddress(int, Address):
+    """an absolute memory address"""
+
+    def __new__(cls, v):
+        # zero is a legal address; only negative values are rejected.
+        assert v >= 0
+        return int.__new__(cls, v)
+
+
+class RelativeVirtualAddress(int, Address):
+    """a memory address relative to a base address"""
+
+    pass
+
+
+class FileOffsetAddress(int, Address):
+    """an address relative to the start of a file"""
+
+    def __new__(cls, v):
+        # offset zero is the first byte of the file, so it must be accepted.
+        assert v >= 0
+        return int.__new__(cls, v)
+
+
+class DNTokenAddress(Token, Address):
+    """a .NET token"""
+
+    # NOTE(review): Address declares __lt__, __hash__ and __str__ abstract and
+    # comes before `object` in the MRO, so this class is only instantiable if
+    # Token itself defines all three - confirm against dncil.
+    pass
+
+
+class DNTokenOffsetAddress(Address):
+    """an offset into an object specified by a .NET token"""
+
+    def __init__(self, token: Token, rva: int):
+        # an offset of zero addresses the start of the object.
+        assert rva >= 0
+        self.token = token
+        self.rva = rva
+
+    def _key(self):
+        # order, compare, and hash by token first, then by offset within it.
+        # NOTE(review): assumes dncil's Token exposes its raw integer as `.value` - confirm.
+        return (self.token.value, self.rva)
+
+    def __eq__(self, other):
+        if not isinstance(other, DNTokenOffsetAddress):
+            return NotImplemented
+        return self._key() == other._key()
+
+    def __lt__(self, other):
+        if not isinstance(other, DNTokenOffsetAddress):
+            return NotImplemented
+        return self._key() < other._key()
+
+    def __hash__(self):
+        return hash(self._key())
+
+    def __str__(self):
+        return "token(0x%x)+(0x%x)" % self._key()
+
+
+class _NoAddress(Address):
+    """placeholder for matches without a meaningful location, such as file scope"""
+
+    def __lt__(self, other):
+        return False
+
+    def __hash__(self):
+        return hash(0)
+
+    def __str__(self):
+        return "no address"
+
+
+NO_ADDRESS = _NoAddress()
diff --git a/capa/features/common.py b/capa/features/common.py
index ed2b288e..e6c5f675 100644
--- a/capa/features/common.py
+++ b/capa/features/common.py
@@ -11,7 +11,7 @@ import abc
 import codecs
 import logging
 import collections
-from typing import TYPE_CHECKING, Set, Dict, List, Union
+from typing import TYPE_CHECKING, Set, Dict, List, Union, Optional, Sequence
 
 if TYPE_CHECKING:
     # circular import, otherwise
@@ -20,6 +20,7 @@ if TYPE_CHECKING:
 import capa.perf
 import capa.features
 import capa.features.extractors.elf
+from capa.features.address import Address
 
 logger = logging.getLogger(__name__)
 MAX_BYTES_FEATURE_SIZE = 0x100
@@ -70,20 +71,13 @@ class Result:
         success: bool,
         statement: Union["capa.engine.Statement", "Feature"],
         children: List["Result"],
-        locations=None,
+        locations: Optional[Set[Address]] = None,
     ):
-        """
-        args:
-          success (bool)
-          statement (capa.engine.Statement or capa.features.Feature)
-          children (list[Result])
-          locations (iterable[VA])
-        """
         super(Result, self).__init__()
         self.success = success
         self.statement = statement
         self.children = children
-        self.locations = locations if locations is not None else ()
+        self.locations = locations if locations is not None else set()
 
     def __eq__(self, other):
         if isinstance(other, bool):
@@ -137,10 +131,10 @@ class Feature(abc.ABC):
     def __repr__(self):
         return str(self)
 
-    def evaluate(self, ctx: Dict["Feature", Set[int]], **kwargs) -> Result:
+    def evaluate(self, ctx: Dict["Feature", Set[Address]], **kwargs) -> Result:
         capa.perf.counters["evaluate.feature"] += 1
         capa.perf.counters["evaluate.feature." + self.name] += 1
-        return Result(self in ctx, self, [], locations=ctx.get(self, []))
+        return Result(self in ctx, self, [], locations=ctx.get(self, set()))
 
     def freeze_serialize(self):
         return (self.__class__.__name__, [self.value])
diff --git a/capa/main.py b/capa/main.py
index 5b306c7c..cdb4a164 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -63,6 +63,7 @@ from capa.features.common import (
     FORMAT_DOTNET,
     FORMAT_FREEZE,
 )
+from capa.features.address import NO_ADDRESS
 from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
 
 RULES_PATH_DEFAULT_STRING = "(embedded rules)"
@@ -224,7 +225,7 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
 
     file_features.update(function_features)
 
-    _, matches = ruleset.match(Scope.FILE, file_features, 0x0)
+    _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS)
     return matches, len(file_features)
 
 
diff --git a/capa/rules.py b/capa/rules.py
index f42b56a4..537014e0 100644
--- a/capa/rules.py
+++ b/capa/rules.py
@@ -12,7 +12,6 @@ import uuid
 import codecs
 import logging
 import binascii
-import functools
 import collections
 from enum import Enum
 
@@ -40,6 +39,7 @@ import capa.features.common
 import capa.features.basicblock
 from capa.engine import Statement, FeatureSet
 from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature
+from capa.features.address import Address
 
 logger = logging.getLogger(__name__)
 
@@ -1284,7 +1284,7 @@ class RuleSet:
                     break
         return RuleSet(list(rules_filtered))
 
-    def match(self, scope: Scope, features: FeatureSet, va: int) -> Tuple[FeatureSet, ceng.MatchResults]:
+    def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]:
         """
         match rules from this ruleset at the given scope against the given features.
 
@@ -1316,7 +1316,7 @@ class RuleSet:
         # first, match against the set of rules that have at least one
         # feature shared with our feature set.
         candidate_rules = [self.rules[name] for name in candidate_rule_names]
-        features2, easy_matches = ceng.match(candidate_rules, features, va)
+        features2, easy_matches = ceng.match(candidate_rules, features, addr)
 
         # note that we've stored the updated feature set in `features2`.
         # this contains a superset of the features in `features`;
@@ -1335,7 +1335,7 @@ class RuleSet:
         # that we can't really make any guesses about.
         # these are rules with hard features, like substring/regex/bytes and match statements.
         hard_rules = [self.rules[name] for name in hard_rule_names]
-        features3, hard_matches = ceng.match(hard_rules, features2, va)
+        features3, hard_matches = ceng.match(hard_rules, features2, addr)
 
         # note that above, we probably are skipping matching a bunch of
         # rules that definitely would never hit.