RuleSet: add flavor-based rule filtering

This commit is contained in:
Yacine Elhamer
2023-07-27 10:24:08 +01:00
parent b843382065
commit d6aced5ec7
3 changed files with 65 additions and 30 deletions

View File

@@ -458,6 +458,13 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_CAPE = "cape"
# input formats for which main() selects the static execution context,
# so that rules are filtered on their static scope.
STATIC_FORMATS = (
FORMAT_SC32,
FORMAT_SC64,
FORMAT_PE,
FORMAT_ELF,
FORMAT_DOTNET,
)
# input formats for which main() selects the dynamic execution context,
# so that rules are filtered on their dynamic scope.
DYNAMIC_FORMATS = (FORMAT_CAPE,)
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"

View File

@@ -20,6 +20,7 @@ import textwrap
import itertools
import contextlib
import collections
from enum import Enum
from typing import Any, Dict, List, Tuple, Callable, Optional
from pathlib import Path
@@ -78,6 +79,8 @@ from capa.features.common import (
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
STATIC_FORMATS,
DYNAMIC_FORMATS,
)
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import (
@@ -113,6 +116,15 @@ E_UNSUPPORTED_IDA_VERSION = 19
logger = logging.getLogger("capa")
class ExecutionContext(str, Enum):
    """
    the kind of analysis being performed: static or dynamic.

    the enum mixes in `str`, so each member is also a string and
    compares equal to its plain value (e.g. `ExecutionContext.STATIC == "static"`).
    """

    STATIC = "static"
    DYNAMIC = "dynamic"


# module-level aliases for the two members.
# get_rules() compares its `analysis_context` argument against these (by identity)
# to decide whether to keep rules with a static scope or with a dynamic scope.
STATIC_CONTEXT = ExecutionContext.STATIC
DYNAMIC_CONTEXT = ExecutionContext.DYNAMIC
@contextlib.contextmanager
def timing(msg: str):
t0 = time.time()
@@ -823,6 +835,7 @@ def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
analysis_context: ExecutionContext | None = None,
) -> RuleSet:
"""
args:
@@ -861,7 +874,14 @@ def get_rules(
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)
ruleset = capa.rules.RuleSet(rules)
# filter rules according to the execution context
if analysis_context is STATIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.static)
elif analysis_context is DYNAMIC_CONTEXT:
ruleset = capa.rules.RuleSet(rules, rules_filter_func=lambda rule: rule.scopes.dynamic)
else:
# default: load all rules
ruleset = capa.rules.RuleSet(rules)
capa.rules.cache.cache_ruleset(cache_dir, ruleset)
@@ -1382,7 +1402,15 @@ def main(argv: Optional[List[str]] = None):
else:
cache_dir = capa.rules.cache.get_default_cache_directory()
rules = get_rules(args.rules, cache_dir=cache_dir)
if format_ in STATIC_FORMATS:
analysis_context = STATIC_CONTEXT
elif format_ in DYNAMIC_FORMATS:
analysis_context = DYNAMIC_CONTEXT
else:
# freeze or result formats
analysis_context = None
rules = get_rules(args.rules, cache_dir=cache_dir, analysis_context=analysis_context)
logger.debug(
"successfully loaded %s rules",

View File

@@ -91,7 +91,6 @@ INSTRUCTION_SCOPE = Scope.INSTRUCTION.value
# used only to specify supported features per scope.
# not used to validate rules.
GLOBAL_SCOPE = "global"
DEV_SCOPE = "dev"
# these literals are used to check if the flavor
@@ -108,7 +107,6 @@ DYNAMIC_SCOPES = (
GLOBAL_SCOPE,
PROCESS_SCOPE,
THREAD_SCOPE,
DEV_SCOPE,
)
@@ -117,7 +115,7 @@ class Scopes:
static: Union[str, None] = None
dynamic: Union[str, None] = None
@lru_cache # type: ignore
@lru_cache() # type: ignore
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
@@ -237,12 +235,6 @@ SUPPORTED_FEATURES: Dict[str, Set] = {
capa.features.common.Class,
capa.features.common.Namespace,
},
DEV_SCOPE: {
# TODO(yelhamer): this is a temporary scope. remove it after support
# for the legacy scope keyword has been added (to rendering).
# https://github.com/mandiant/capa/pull/1580
capa.features.insn.API,
},
}
# global scope features are available in all other scopes
@@ -259,10 +251,6 @@ SUPPORTED_FEATURES[PROCESS_SCOPE].update(SUPPORTED_FEATURES[THREAD_SCOPE])
SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE].update(SUPPORTED_FEATURES[INSTRUCTION_SCOPE])
# all basic block scope features are also function scope features
SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE])
# dynamic-dev scope contains all features
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FILE_SCOPE])
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FUNCTION_SCOPE])
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[PROCESS_SCOPE])
class InvalidRule(ValueError):
@@ -576,7 +564,7 @@ def build_statements(d, scopes: Scopes):
)
elif key == "function":
if (FILE_SCOPE not in scopes) and (DEV_SCOPE not in scopes):
if FILE_SCOPE not in scopes:
raise InvalidRule("function subscope supported only for file scope")
if len(d[key]) != 1:
@@ -587,7 +575,7 @@ def build_statements(d, scopes: Scopes):
)
elif key == "basic block":
if (FUNCTION_SCOPE not in scopes) and (DEV_SCOPE not in scopes):
if FUNCTION_SCOPE not in scopes:
raise InvalidRule("basic block subscope supported only for function scope")
if len(d[key]) != 1:
@@ -598,7 +586,7 @@ def build_statements(d, scopes: Scopes):
)
elif key == "instruction":
if all(map(lambda s: s not in scopes, (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE, DEV_SCOPE))):
if all(map(lambda s: s not in scopes, (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE))):
raise InvalidRule("instruction subscope supported only for function and basic block scope")
if len(d[key]) == 1:
@@ -820,13 +808,19 @@ class Rule:
# the name is a randomly generated, hopefully unique value.
# ideally, this won't ever be rendered to a user.
name = self.name + "/" + uuid.uuid4().hex
if subscope.scope in STATIC_SCOPES:
scopes = Scopes(static=subscope.scope)
elif subscope.scope in DYNAMIC_SCOPES:
scopes = Scopes(dynamic=subscope.scope)
else:
raise InvalidRule(f"scope {subscope.scope} is not a valid subscope")
new_rule = Rule(
name,
Scopes(subscope.scope, DEV_SCOPE),
scopes,
subscope.child,
{
"name": name,
"scopes": asdict(Scopes(subscope.scope, DEV_SCOPE)),
"scopes": asdict(scopes),
# these derived rules are never meant to be inspected separately,
# they are dependencies for the parent rule,
# so mark it as such.
@@ -890,7 +884,11 @@ class Rule:
# this is probably the mode that rule authors will start with.
# each rule has two scopes, a static-flavor scope, and a
# dynamic-flavor one. which one is used depends on the analysis type.
scopes: Scopes = Scopes.from_dict(meta.get("scopes", {"static": "function", "dynamic": "process"}))
scopes_ = meta.get("scopes", {"static": "function", "dynamic": "process"})
if not isinstance(scopes_, dict):
raise InvalidRule("the scopes field must contain a dictionary specifying the scopes")
scopes: Scopes = Scopes.from_dict(scopes_)
statements = d["rule"]["features"]
# the rule must start with a single logic node.
@@ -907,14 +905,7 @@ class Rule:
if not isinstance(meta.get("mbc", []), list):
raise InvalidRule("MBC mapping must be a list")
# TODO(yelhamer): once we've decided on the desired format for mixed-scope statements,
# we should go back and update this accordingly to either:
# - generate one englobing statement.
# - generate two respective statements and store them appropriately
# https://github.com/mandiant/capa/pull/1580
statement = build_statements(statements[0], scopes)
return cls(name, scopes, statement, meta, definition)
return cls(name, scopes, build_statements(statements[0], scopes), meta, definition)
@staticmethod
@lru_cache()
@@ -1220,9 +1211,18 @@ class RuleSet:
capa.engine.match(ruleset.file_rules, ...)
"""
def __init__(self, rules: List[Rule]):
def __init__(
self,
rules: List[Rule],
rules_filter_func=None,
):
super().__init__()
if rules_filter_func:
# this allows for filtering the ruleset based on
# the execution context (static or dynamic)
rules = list(filter(rules_filter_func, rules))
ensure_rules_are_unique(rules)
# in the next step we extract subscope rules,