diff --git a/capa/features/common.py b/capa/features/common.py index 91764b45..ce26d567 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -458,6 +458,13 @@ FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_CAPE = "cape" +STATIC_FORMATS = ( + FORMAT_SC32, + FORMAT_SC64, + FORMAT_PE, + FORMAT_ELF, + FORMAT_DOTNET, +) DYNAMIC_FORMATS = (FORMAT_CAPE,) FORMAT_FREEZE = "freeze" FORMAT_RESULT = "result" diff --git a/capa/main.py b/capa/main.py index 9ce97bc9..7d5e7e9d 100644 --- a/capa/main.py +++ b/capa/main.py @@ -20,6 +20,7 @@ import textwrap import itertools import contextlib import collections +from enum import Enum from typing import Any, Dict, List, Tuple, Callable, Optional from pathlib import Path @@ -78,6 +79,8 @@ from capa.features.common import ( FORMAT_DOTNET, FORMAT_FREEZE, FORMAT_RESULT, + STATIC_FORMATS, + DYNAMIC_FORMATS, ) from capa.features.address import NO_ADDRESS, Address from capa.features.extractors.base_extractor import ( @@ -113,6 +116,15 @@ E_UNSUPPORTED_IDA_VERSION = 19 logger = logging.getLogger("capa") +class ExecutionContext(str, Enum): + STATIC = "static" + DYNAMIC = "dynamic" + + +STATIC_CONTEXT = ExecutionContext.STATIC +DYNAMIC_CONTEXT = ExecutionContext.DYNAMIC + + @contextlib.contextmanager def timing(msg: str): t0 = time.time() @@ -823,6 +835,7 @@ def get_rules( rule_paths: List[RulePath], cache_dir=None, on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, + analysis_context: ExecutionContext | None = None, ) -> RuleSet: """ args: @@ -861,7 +874,14 @@ def get_rules( rules.append(rule) logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes) - ruleset = capa.rules.RuleSet(rules) + # filter rules according to the execution context + if analysis_context is STATIC_CONTEXT: + ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.static) + elif analysis_context is DYNAMIC_CONTEXT: + ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.dynamic) + else: + # default: load all rules + ruleset = capa.rules.RuleSet(rules) capa.rules.cache.cache_ruleset(cache_dir, ruleset) @@ -1382,7 +1402,15 @@ def main(argv: Optional[List[str]] = None): else: cache_dir = capa.rules.cache.get_default_cache_directory() - rules = get_rules(args.rules, cache_dir=cache_dir) + if format_ in STATIC_FORMATS: + analysis_context = STATIC_CONTEXT + elif format_ in DYNAMIC_FORMATS: + analysis_context = DYNAMIC_CONTEXT + else: + # freeze or result formats + analysis_context = None + + rules = get_rules(args.rules, cache_dir=cache_dir, analysis_context=analysis_context) logger.debug( "successfully loaded %s rules", diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 8b675ee9..43fb9f7b 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -91,7 +91,6 @@ INSTRUCTION_SCOPE = Scope.INSTRUCTION.value # used only to specify supported features per scope. # not used to validate rules. GLOBAL_SCOPE = "global" -DEV_SCOPE = "dev" # these literals are used to check if the flavor @@ -108,7 +107,6 @@ DYNAMIC_SCOPES = ( GLOBAL_SCOPE, PROCESS_SCOPE, THREAD_SCOPE, - DEV_SCOPE, ) @@ -117,7 +115,7 @@ class Scopes: static: Union[str, None] = None dynamic: Union[str, None] = None - @lru_cache # type: ignore + @lru_cache() # type: ignore def __new__(cls, *args, **kwargs): return super().__new__(cls) @@ -237,12 +235,6 @@ SUPPORTED_FEATURES: Dict[str, Set] = { capa.features.common.Class, capa.features.common.Namespace, }, - DEV_SCOPE: { - # TODO(yelhamer): this is a temporary scope. remove it after support - # for the legacy scope keyword has been added (to rendering). - # https://github.com/mandiant/capa/pull/1580 - capa.features.insn.API, - }, } # global scope features are available in all other scopes @@ -259,10 +251,6 @@ SUPPORTED_FEATURES[PROCESS_SCOPE].update(SUPPORTED_FEATURES[THREAD_SCOPE]) SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE].update(SUPPORTED_FEATURES[INSTRUCTION_SCOPE]) # all basic block scope features are also function scope features SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE]) -# dynamic-dev scope contains all features -SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FILE_SCOPE]) -SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FUNCTION_SCOPE]) -SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[PROCESS_SCOPE]) class InvalidRule(ValueError): @@ -576,7 +564,7 @@ def build_statements(d, scopes: Scopes): ) elif key == "function": - if (FILE_SCOPE not in scopes) and (DEV_SCOPE not in scopes): + if FILE_SCOPE not in scopes: raise InvalidRule("function subscope supported only for file scope") if len(d[key]) != 1: @@ -587,7 +575,7 @@ def build_statements(d, scopes: Scopes): ) elif key == "basic block": - if (FUNCTION_SCOPE not in scopes) and (DEV_SCOPE not in scopes): + if FUNCTION_SCOPE not in scopes: raise InvalidRule("basic block subscope supported only for function scope") if len(d[key]) != 1: @@ -598,7 +586,7 @@ def build_statements(d, scopes: Scopes): ) elif key == "instruction": - if all(map(lambda s: s not in scopes, (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE, DEV_SCOPE))): + if all(map(lambda s: s not in scopes, (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE))): raise InvalidRule("instruction subscope supported only for function and basic block scope") if len(d[key]) == 1: @@ -820,13 +808,19 @@ class Rule: # the name is a randomly generated, hopefully unique value. # ideally, this won't every be rendered to a user. name = self.name + "/" + uuid.uuid4().hex + if subscope.scope in STATIC_SCOPES: + scopes = Scopes(static=subscope.scope) + elif subscope.scope in DYNAMIC_SCOPES: + scopes = Scopes(dynamic=subscope.scope) + else: + raise InvalidRule(f"scope {subscope.scope} is not a valid subscope") new_rule = Rule( name, - Scopes(subscope.scope, DEV_SCOPE), + scopes, subscope.child, { "name": name, - "scopes": asdict(Scopes(subscope.scope, DEV_SCOPE)), + "scopes": asdict(scopes), # these derived rules are never meant to be inspected separately, # they are dependencies for the parent rule, # so mark it as such. @@ -890,7 +884,11 @@ class Rule: # this is probably the mode that rule authors will start with. # each rule has two scopes, a static-flavor scope, and a # dynamic-flavor one. which one is used depends on the analysis type. - scopes: Scopes = Scopes.from_dict(meta.get("scopes", {"static": "function", "dynamic": "process"})) + scopes_ = meta.get("scopes", {"static": "function", "dynamic": "process"}) + if not isinstance(scopes_, dict): + raise InvalidRule("the scopes field must contain a dictionary specifying the scopes") + + scopes: Scopes = Scopes.from_dict(scopes_) statements = d["rule"]["features"] # the rule must start with a single logic node. @@ -907,14 +905,7 @@ class Rule: if not isinstance(meta.get("mbc", []), list): raise InvalidRule("MBC mapping must be a list") - # TODO(yelhamer): once we've decided on the desired format for mixed-scope statements, - # we should go back and update this accordingly to either: - # - generate one englobing statement. - # - generate two respective statements and store them approriately - # https://github.com/mandiant/capa/pull/1580 - - statement = build_statements(statements[0], scopes) - return cls(name, scopes, statement, meta, definition) + return cls(name, scopes, build_statements(statements[0], scopes), meta, definition) @staticmethod @lru_cache() @@ -1220,9 +1211,18 @@ class RuleSet: capa.engine.match(ruleset.file_rules, ...) """ - def __init__(self, rules: List[Rule]): + def __init__( + self, + rules: List[Rule], + rules_filter_func=None, + ): super().__init__() + if rules_filter_func: + # this allows for filtering the ruleset based on + # the execution context (static or dynamic) + rules = list(filter(rules_filter_func, rules)) + ensure_rules_are_unique(rules) # in the next step we extract subscope rules,