sequence: refactor into SequenceMatcher

contains the call ids for all the calls within the sequence, so we know
where to look for related matches.

sequence: refactor SequenceMatcher

sequence: don't use sequence addresses

sequence: remove sequence address
This commit is contained in:
Willi Ballenthin
2024-12-18 12:54:11 +00:00
committed by Willi Ballenthin
parent 39319c57a4
commit f55086c212
9 changed files with 296 additions and 91 deletions

View File

@@ -80,6 +80,89 @@ class ThreadCapabilities:
call_matches: MatchResults
class SequenceMatcher:
def __init__(self, ruleset: RuleSet):
super().__init__()
self.ruleset = ruleset
# matches found at the sequence scope.
self.matches: MatchResults = collections.defaultdict(list)
        # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
#
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
# merging all their features and doing a match.
#
# We track these features in two data structures:
# 1. a deque of those features found in the prior calls.
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
# 2. a live set of features seen in the sequence.
# As we pop from the deque, we remove features from the current set,
# and as we push to the deque, we insert features to the current set.
# With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
# The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
# (that is, runtime gets slower the larger SEQUENCE_SIZE is).
self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
self.current_features: FeatureSet = collections.defaultdict(set)
# the names of rules matched at the last sequence,
        # so that we can deduplicate long strings of the same match.
self.last_sequence_matches: set[str] = set()
def next(self, ch: CallHandle, call_features: FeatureSet):
# As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
# While we could rely on `deque.append` with `maxlen` set (which we provide above),
# we want to use the dropped item first, to remove the old features, so we manually pop it here.
if len(self.current_feature_sets) == SEQUENCE_SIZE:
overflowing_feature_set = self.current_feature_sets.popleft()
for feature, vas in overflowing_feature_set.items():
if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
# `vas == { NO_ADDRESS }` without the garbage.
#
# ignore the common case of global features getting added/removed/trimmed repeatedly,
# like arch/os/format.
continue
feature_vas = self.current_features[feature]
feature_vas.difference_update(vas)
if not feature_vas:
del self.current_features[feature]
# update the deque and set of features with the latest call's worth of features.
self.current_feature_sets.append(call_features)
for feature, vas in call_features.items():
self.current_features[feature].update(vas)
_, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, ch.address)
newly_encountered_rules = set(matches.keys()) - self.last_sequence_matches
        # don't emit match results for rules seen during the immediately preceding sequence.
#
# This means that we won't emit duplicate matches when there are multiple sequences
# that overlap a single matching event.
# It also handles the case of a tight loop containing matched logic;
# only the first match will be recorded.
#
# In theory, this means the result document doesn't have *every* possible match location,
# but in practice, humans will only be interested in the first handful anyways.
suppressed_rules = set(self.last_sequence_matches)
# however, if a newly encountered rule depends on a suppressed rule,
# don't suppress that rule match, or we won't be able to reconstruct the vverbose output.
# see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130
for new_rule in newly_encountered_rules:
suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace))
for rule_name, res in matches.items():
if rule_name in suppressed_rules:
continue
self.matches[rule_name].extend(res)
self.last_sequence_matches = set(matches.keys())
def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> ThreadCapabilities:
@@ -95,31 +178,11 @@ def find_thread_capabilities(
# might be found at different calls, that's ok.
call_matches: MatchResults = collections.defaultdict(list)
# matches found at the sequence scope.
sequence_matches: MatchResults = collections.defaultdict(list)
    # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
#
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
# merging all their features and doing a match.
#
# We track these features in two data structures:
# 1. a deque of those features found in the prior calls.
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
# 2. a live set of features seen in the sequence.
# As we pop from the deque, we remove features from the current set,
# and as we push to the deque, we insert features to the current set.
# With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
# The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
# (that is, runtime gets slower the larger SEQUENCE_SIZE is).
sequence_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
sequence_features: FeatureSet = collections.defaultdict(set)
# the names of rules matched at the last sequence,
    # so that we can deduplicate long strings of the same match.
last_sequence_matches: set[str] = set()
sequence_matcher = SequenceMatcher(ruleset)
call_count = 0
for ch in extractor.get_calls(ph, th):
call_count += 1
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
features[feature].update(vas)
@@ -127,50 +190,7 @@ def find_thread_capabilities(
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)
#
# sequence scope matching
#
# As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
# While we could rely on `deque.append` with `maxlen` set (which we provide above),
# we want to use the dropped item first, to remove the old features, so we manually pop it here.
if len(sequence_feature_sets) == SEQUENCE_SIZE:
overflowing_feature_set = sequence_feature_sets.popleft()
for feature, vas in overflowing_feature_set.items():
if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
# `vas == { NO_ADDRESS }` without the garbage.
#
# ignore the common case of global features getting added/removed/trimmed repeatedly,
# like arch/os/format.
continue
feature_vas = sequence_features[feature]
feature_vas.difference_update(vas)
if not feature_vas:
del sequence_features[feature]
# update the deque and set of features with the latest call's worth of features.
latest_features = call_capabilities.features
sequence_feature_sets.append(latest_features)
for feature, vas in latest_features.items():
sequence_features[feature].update(vas)
_, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address)
for rule_name, res in smatches.items():
if rule_name in last_sequence_matches:
            # don't emit match results for rules seen during the immediately preceding sequence.
#
# This means that we won't emit duplicate matches when there are multiple sequences
# that overlap a single matching event.
# It also handles the case of a tight loop containing matched logic;
# only the first match will be recorded.
#
# In theory, this means the result document doesn't have *every* possible match location,
# but in practice, humans will only be interested in the first handful anyways.
continue
sequence_matches[rule_name].extend(res)
last_sequence_matches = set(smatches.keys())
sequence_matcher.next(ch, call_capabilities.features)
for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
features[feature].add(va)
@@ -183,13 +203,22 @@ def find_thread_capabilities(
for va, _ in res:
capa.engine.index_rule_matches(features, rule, [va])
return ThreadCapabilities(features, matches, sequence_matches, call_matches)
logger.debug(
"analyzed thread %d[%d] with %d events, %d features, and %d matches",
th.address.process.pid,
th.address.tid,
call_count,
len(features),
len(matches) + len(sequence_matcher.matches) + len(call_matches),
)
return ThreadCapabilities(features, matches, sequence_matcher.matches, call_matches)
@dataclass
class ProcessCapabilities:
process_matches: MatchResults
thread_matches: MatchResults
sequence_matches: MatchResults
call_matches: MatchResults
feature_count: int
@@ -234,7 +263,14 @@ def find_process_capabilities(
process_features[feature].add(va)
_, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
return ProcessCapabilities(process_matches, thread_matches, call_matches, len(process_features))
logger.debug(
"analyzed process %d and extracted %d features with %d matches",
ph.address.pid,
len(process_features),
len(process_matches),
)
return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features))
def find_dynamic_capabilities(
@@ -262,7 +298,6 @@ def find_dynamic_capabilities(
address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count
),
)
logger.debug("analyzed %s and extracted %d features", p.address, process_capabilities.feature_count)
for rule_name, res in process_capabilities.process_matches.items():
all_process_matches[rule_name].extend(res)

View File

@@ -114,8 +114,7 @@ class DynamicCallAddress(Address):
return hash((self.thread, self.id))
def __eq__(self, other):
assert isinstance(other, DynamicCallAddress)
return (self.thread, self.id) == (other.thread, other.id)
return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id)
def __lt__(self, other):
assert isinstance(other, DynamicCallAddress)

View File

@@ -64,7 +64,14 @@ class AddressType(str, Enum):
class Address(HashableModel):
type: AddressType
value: Union[int, tuple[int, ...], None] = None # None default value to support deserialization of NO_ADDRESS
value: Union[
# for absolute, relative, file
int,
# for DNToken, Process, Thread, Call
tuple[int, ...],
# for NO_ADDRESS,
None,
] = None # None default value to support deserialization of NO_ADDRESS
@classmethod
def from_capa(cls, a: capa.features.address.Address) -> "Address":

View File

@@ -1,11 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: capa/render/proto/capa.proto
# Protobuf Python Version: 5.28.3
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
28,
3,
'',
'capa/render/proto/capa.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -18,15 +29,15 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x63\x61pa/ren
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'capa.render.proto.capa_pb2', _globals)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_globals['_MATCH_CAPTURESENTRY']._options = None
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_MATCH_CAPTURESENTRY']._loaded_options = None
_globals['_MATCH_CAPTURESENTRY']._serialized_options = b'8\001'
_globals['_METADATA'].fields_by_name['analysis']._options = None
_globals['_METADATA'].fields_by_name['analysis']._loaded_options = None
_globals['_METADATA'].fields_by_name['analysis']._serialized_options = b'\030\001'
_globals['_RESULTDOCUMENT_RULESENTRY']._options = None
_globals['_RESULTDOCUMENT_RULESENTRY']._loaded_options = None
_globals['_RESULTDOCUMENT_RULESENTRY']._serialized_options = b'8\001'
_globals['_RULEMETADATA'].fields_by_name['scope']._options = None
_globals['_RULEMETADATA'].fields_by_name['scope']._loaded_options = None
_globals['_RULEMETADATA'].fields_by_name['scope']._serialized_options = b'\030\001'
_globals['_ADDRESSTYPE']._serialized_start=9062
_globals['_ADDRESSTYPE']._serialized_end=9336

View File

@@ -1,6 +1,19 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
Copyright 2023 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import builtins

View File

@@ -29,6 +29,7 @@ import capa.features.freeze.features as frzf
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.helpers import assert_never, load_json_from_path
from capa.features.address import DynamicCallAddress
if TYPE_CHECKING:
from capa.capabilities.common import Capabilities
@@ -392,7 +393,33 @@ class Match(FrozenModel):
)
for location in result.locations:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
# keep this in sync with the copy below
if isinstance(location, DynamicCallAddress):
if location in rule_matches:
# exact match, such as matching a call-scoped rule.
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
# we'd like to assert the scope of the current rule is "sequence"
# but we don't have that data here.
else:
# Sequence scopes can match each other, but they don't strictly contain each other,
# like the way a function contains a basic block.
# So when we have a match within a sequence for another sequence, we need to look
# for all the places it might be found.
#
# Despite the edge cases (like API hammering), this turns out to be pretty easy:
# collect the most recent match (with the given name) prior to the wanted location.
matches_in_thread = sorted([
(a.id, m) for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
])
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
else:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
else:
# this is a namespace that we're matching
#
@@ -433,8 +460,23 @@ class Match(FrozenModel):
# this is a subset of doc[locations].
#
# so, grab only the locations for current rule.
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
# keep this in sync with the block above.
if isinstance(location, DynamicCallAddress):
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
else:
matches_in_thread = sorted([
(a.id, m) for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
])
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
else:
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
return cls(
success=success,

View File

@@ -43,7 +43,6 @@ from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import Console
def format_address(address: frz.Address) -> str:
if address.type == frz.AddressType.ABSOLUTE:
assert isinstance(address.value, int)

View File

@@ -230,7 +230,8 @@ def render_feature(
# if we're in call scope, then the call will have been rendered at the top
# of the output, so don't re-render it again for each feature.
pass
elif isinstance(feature, (frzf.OSFeature, frzf.ArchFeature, frzf.FormatFeature)):
elif isinstance(layout, rd.DynamicLayout) and isinstance(feature, frzf.MatchFeature):
# don't render copies of the sequence address for submatches
pass
else:
render_locations(console, layout, match.locations, indent)
@@ -311,13 +312,13 @@ def render_match(
render_match(console, layout, rule, child, indent=indent + 1, mode=child_mode)
def collect_call_locations(
def collect_sequence_locations(
match: rd.Match,
mode=MODE_SUCCESS,
):
"""
Find all the DynamicCallAddress locations in the given match, recursively.
Useful to collect the calls used to match a sequence scoped rule.
Find all the (call, sequence) locations used in a given sequence match, recursively.
Useful to collect the events used to match a sequence scoped rule.
"""
if isinstance(match.node, rd.StatementNode):
if (
@@ -326,13 +327,13 @@ def collect_call_locations(
):
child_mode = MODE_FAILURE if mode == MODE_SUCCESS else MODE_SUCCESS
for child in match.children:
yield from collect_call_locations(child, child_mode)
yield from collect_sequence_locations(child, child_mode)
else:
for child in match.children:
yield from collect_call_locations(child, mode)
yield from collect_sequence_locations(child, mode)
elif isinstance(match.node, rd.FeatureNode):
for location in match.locations:
if location.type != frz.AddressType.CALL:
if location.type not in (frz.AddressType.CALL, ):
continue
if mode == MODE_FAILURE:
continue
@@ -481,7 +482,7 @@ def render_rules(console: Console, doc: rd.ResultDocument):
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
console.write(v.render_thread(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.SEQUENCE:
calls = sorted(set(collect_call_locations(match)))
calls = sorted(set(collect_sequence_locations(match)))
console.write(hanging_indent(v.render_sequence(doc.meta.analysis.layout, calls), indent=1))
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1))

View File

@@ -364,3 +364,101 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
assert r.name in capabilities.matches
# we only match the first overlapping sequence
assert [11] == list(get_call_ids(capabilities.matches[r.name]))
# show that you can use match statements in sequence rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 10: LdrGetDllHandle(1974337536, kernel32.dll)
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_scope_match_statements():
extractor = get_0000a657_thread3064()
ruleset = capa.rules.RuleSet(
[
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: resolve add VEH
namespace: linking/runtime-linking/veh
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
"""
)
),
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: resolve remove VEH
namespace: linking/runtime-linking/veh
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: RemoveVectoredExceptionHandler
"""
)
),
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: resolve add and remove VEH
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- match: resolve add VEH
- match: resolve remove VEH
"""
)
),
capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: has VEH runtime linking
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- match: linking/runtime-linking/veh
"""
)
),
]
)
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
# basic functionality, already known to work
assert "resolve add VEH" in capabilities.matches
assert "resolve remove VEH" in capabilities.matches
# requires `match: <rule name>` to be working
assert "resolve add and remove VEH" in capabilities.matches
# requires `match: <namespace>` to be working
assert "has VEH runtime linking" in capabilities.matches