diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py
index 39b538c5..a4e97be6 100644
--- a/capa/capabilities/dynamic.py
+++ b/capa/capabilities/dynamic.py
@@ -80,6 +80,89 @@ class ThreadCapabilities:
     call_matches: MatchResults
 
 
+class SequenceMatcher:
+    def __init__(self, ruleset: RuleSet):
+        super().__init__()
+        self.ruleset = ruleset
+
+        # matches found at the sequence scope.
+        self.matches: MatchResults = collections.defaultdict(list)
+
+        # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
+        #
+        # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
+        # merging all their features and doing a match.
+        #
+        # We track these features in two data structures:
+        #  1. a deque of those features found in the prior calls.
+        #     We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
+        #  2. a live set of features seen in the sequence.
+        #     As we pop from the deque, we remove features from the current set,
+        #     and as we push to the deque, we insert features into the current set.
+        # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
+        # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
+        # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
+        self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
+        self.current_features: FeatureSet = collections.defaultdict(set)
+
+        # the names of rules matched at the last sequence,
+        # so that we can deduplicate long strings of the same match.
+        self.last_sequence_matches: set[str] = set()
+
+    def next(self, ch: CallHandle, call_features: FeatureSet):
+        # As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
+        # While we could rely on `deque.append` with `maxlen` set (which we provide above),
+        # we want to use the dropped item first, to remove the old features, so we manually pop it here.
+        if len(self.current_feature_sets) == SEQUENCE_SIZE:
+            overflowing_feature_set = self.current_feature_sets.popleft()
+
+            for feature, vas in overflowing_feature_set.items():
+                if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
+                    # `vas == { NO_ADDRESS }` without the garbage.
+                    #
+                    # ignore the common case of global features getting added/removed/trimmed repeatedly,
+                    # like arch/os/format.
+                    continue
+
+                feature_vas = self.current_features[feature]
+                feature_vas.difference_update(vas)
+                if not feature_vas:
+                    del self.current_features[feature]
+
+        # update the deque and set of features with the latest call's worth of features.
+        self.current_feature_sets.append(call_features)
+        for feature, vas in call_features.items():
+            self.current_features[feature].update(vas)
+
+        _, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, ch.address)
+
+        newly_encountered_rules = set(matches.keys()) - self.last_sequence_matches
+
+        # don't emit match results for rules seen during the immediately preceding sequence.
+        #
+        # This means that we won't emit duplicate matches when there are multiple sequences
+        # that overlap a single matching event.
+        # It also handles the case of a tight loop containing matched logic;
+        # only the first match will be recorded.
+        #
+        # In theory, this means the result document doesn't have *every* possible match location,
+        # but in practice, humans will only be interested in the first handful anyways.
+        suppressed_rules = set(self.last_sequence_matches)
+
+        # however, if a newly encountered rule depends on a suppressed rule,
+        # don't suppress that rule match, or we won't be able to reconstruct the vverbose output.
+        # see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130
+        for new_rule in newly_encountered_rules:
+            suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace))
+
+        for rule_name, res in matches.items():
+            if rule_name in suppressed_rules:
+                continue
+            self.matches[rule_name].extend(res)
+
+        self.last_sequence_matches = set(matches.keys())
+
+
 def find_thread_capabilities(
     ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
 ) -> ThreadCapabilities:
@@ -95,31 +178,11 @@ def find_thread_capabilities(
     # might be found at different calls, that's ok.
     call_matches: MatchResults = collections.defaultdict(list)
 
-    # matches found at the sequence scope.
-    sequence_matches: MatchResults = collections.defaultdict(list)
-
-    # We matches sequences as the sliding window of calls with size SEQUENCE_SIZE.
-    #
-    # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
-    # merging all their features and doing a match.
-    #
-    # We track these features in two data structures:
-    #  1. a deque of those features found in the prior calls.
-    #     We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
-    #  2. a live set of features seen in the sequence.
-    #     As we pop from the deque, we remove features from the current set,
-    #     and as we push to the deque, we insert features to the current set.
-    # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
-    # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
-    # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
-    sequence_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
-    sequence_features: FeatureSet = collections.defaultdict(set)
-
-    # the names of rules matched at the last sequence,
-    # so that we can deduplicate long strings of the same matche.
-    last_sequence_matches: set[str] = set()
+    sequence_matcher = SequenceMatcher(ruleset)
 
+    call_count = 0
     for ch in extractor.get_calls(ph, th):
+        call_count += 1
         call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
         for feature, vas in call_capabilities.features.items():
             features[feature].update(vas)
@@ -127,50 +190,7 @@ def find_thread_capabilities(
         for rule_name, res in call_capabilities.matches.items():
             call_matches[rule_name].extend(res)
 
-        #
-        # sequence scope matching
-        #
-        # As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
-        # While we could rely on `deque.append` with `maxlen` set (which we provide above),
-        # we want to use the dropped item first, to remove the old features, so we manually pop it here.
-        if len(sequence_feature_sets) == SEQUENCE_SIZE:
-            overflowing_feature_set = sequence_feature_sets.popleft()
-
-            for feature, vas in overflowing_feature_set.items():
-                if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
-                    # `vas == { NO_ADDRESS }` without the garbage.
-                    #
-                    # ignore the common case of global features getting added/removed/trimmed repeatedly,
-                    # like arch/os/format.
-                    continue
-
-                feature_vas = sequence_features[feature]
-                feature_vas.difference_update(vas)
-                if not feature_vas:
-                    del sequence_features[feature]
-
-        # update the deque and set of features with the latest call's worth of features.
-        latest_features = call_capabilities.features
-        sequence_feature_sets.append(latest_features)
-        for feature, vas in latest_features.items():
-            sequence_features[feature].update(vas)
-
-        _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address)
-        for rule_name, res in smatches.items():
-            if rule_name in last_sequence_matches:
-                # don't emit match results for rules seen during the immediately preceeding sequence.
-                #
-                # This means that we won't emit duplicate matches when there are multiple sequences
-                # that overlap a single matching event.
-                # It also handles the case of a tight loop containing matched logic;
-                # only the first match will be recorded.
-                #
-                # In theory, this means the result document doesn't have *every* possible match location,
-                # but in practice, humans will only be interested in the first handful anyways.
-                continue
-            sequence_matches[rule_name].extend(res)
-
-        last_sequence_matches = set(smatches.keys())
+        sequence_matcher.next(ch, call_capabilities.features)
 
     for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
         features[feature].add(va)
@@ -183,13 +203,22 @@ def find_thread_capabilities(
         for va, _ in res:
             capa.engine.index_rule_matches(features, rule, [va])
 
-    return ThreadCapabilities(features, matches, sequence_matches, call_matches)
+    logger.debug(
+        "analyzed thread %d[%d] with %d events, %d features, and %d matches",
+        th.address.process.pid,
+        th.address.tid,
+        call_count,
+        len(features),
+        len(matches) + len(sequence_matcher.matches) + len(call_matches),
+    )
+    return ThreadCapabilities(features, matches, sequence_matcher.matches, call_matches)
 
 
 @dataclass
 class ProcessCapabilities:
     process_matches: MatchResults
     thread_matches: MatchResults
+    sequence_matches: MatchResults
     call_matches: MatchResults
     feature_count: int
 
@@ -234,7 +263,14 @@ def find_process_capabilities(
         process_features[feature].add(va)
 
     _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
-    return ProcessCapabilities(process_matches, thread_matches, call_matches, len(process_features))
+
+    logger.debug(
+        "analyzed process %d and extracted %d features with %d matches",
+        ph.address.pid,
+        len(process_features),
+        len(process_matches),
+    )
+    return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features))
 
 
 def find_dynamic_capabilities(
@@ -262,7 +298,6 @@ def find_dynamic_capabilities(
                 address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count
             ),
         )
-        logger.debug("analyzed %s and extracted %d features", p.address, process_capabilities.feature_count)
 
         for rule_name, res in process_capabilities.process_matches.items():
             all_process_matches[rule_name].extend(res)
diff --git a/capa/features/address.py b/capa/features/address.py
index 447b936f..eb708a3d 100644
--- a/capa/features/address.py
+++ b/capa/features/address.py
@@ -114,8 +114,7 @@ class DynamicCallAddress(Address):
         return hash((self.thread, self.id))
 
     def __eq__(self, other):
-        assert isinstance(other, DynamicCallAddress)
-        return (self.thread, self.id) == (other.thread, other.id)
+        return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id)
 
     def __lt__(self, other):
         assert isinstance(other, DynamicCallAddress)
diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py
index 08612f8e..2e12d2ff 100644
--- a/capa/features/freeze/__init__.py
+++ b/capa/features/freeze/__init__.py
@@ -64,7 +64,14 @@ class AddressType(str, Enum):
 
 class Address(HashableModel):
     type: AddressType
-    value: Union[int, tuple[int, ...], None] = None  # None default value to support deserialization of NO_ADDRESS
+    value: Union[
+        # for absolute, relative, file
+        int,
+        # for DNToken, Process, Thread, Call
+        tuple[int, ...],
+        # for NO_ADDRESS
+        None,
+    ] = None  # None default value to support deserialization of NO_ADDRESS
 
     @classmethod
     def from_capa(cls, a: capa.features.address.Address) -> "Address":
diff --git a/capa/render/proto/capa_pb2.py b/capa/render/proto/capa_pb2.py
index 35d01793..e695a27a 100644
--- a/capa/render/proto/capa_pb2.py
+++ b/capa/render/proto/capa_pb2.py
@@ -1,11 +1,22 @@
 # -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler. DO NOT EDIT!
+# NO CHECKED-IN PROTOBUF GENCODE
 # source: capa/render/proto/capa.proto
+# Protobuf Python Version: 5.28.3
 """Generated protocol buffer code."""
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import runtime_version as _runtime_version
 from google.protobuf import symbol_database as _symbol_database
 from google.protobuf.internal import builder as _builder
+_runtime_version.ValidateProtobufRuntimeVersion(
+    _runtime_version.Domain.PUBLIC,
+    5,
+    28,
+    3,
+    '',
+    'capa/render/proto/capa.proto'
+)
 # @@protoc_insertion_point(imports)
 
 _sym_db = _symbol_database.Default()
@@ -18,15 +29,15 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x63\x61pa/ren
 _globals = globals()
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'capa.render.proto.capa_pb2', _globals)
-if _descriptor._USE_C_DESCRIPTORS == False:
-  DESCRIPTOR._options = None
-  _globals['_MATCH_CAPTURESENTRY']._options = None
+if not _descriptor._USE_C_DESCRIPTORS:
+  DESCRIPTOR._loaded_options = None
+  _globals['_MATCH_CAPTURESENTRY']._loaded_options = None
   _globals['_MATCH_CAPTURESENTRY']._serialized_options = b'8\001'
-  _globals['_METADATA'].fields_by_name['analysis']._options = None
+  _globals['_METADATA'].fields_by_name['analysis']._loaded_options = None
   _globals['_METADATA'].fields_by_name['analysis']._serialized_options = b'\030\001'
-  _globals['_RESULTDOCUMENT_RULESENTRY']._options = None
+  _globals['_RESULTDOCUMENT_RULESENTRY']._loaded_options = None
   _globals['_RESULTDOCUMENT_RULESENTRY']._serialized_options = b'8\001'
-  _globals['_RULEMETADATA'].fields_by_name['scope']._options = None
+  _globals['_RULEMETADATA'].fields_by_name['scope']._loaded_options = None
   _globals['_RULEMETADATA'].fields_by_name['scope']._serialized_options = b'\030\001'
   _globals['_ADDRESSTYPE']._serialized_start=9062
   _globals['_ADDRESSTYPE']._serialized_end=9336
diff --git a/capa/render/proto/capa_pb2.pyi b/capa/render/proto/capa_pb2.pyi
index c881561a..06f84617 100644
--- a/capa/render/proto/capa_pb2.pyi
+++ b/capa/render/proto/capa_pb2.pyi
@@ -1,6 +1,19 @@
 """
 @generated by mypy-protobuf. Do not edit manually!
 isort:skip_file
+Copyright 2023 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 """
 
 import builtins
diff --git a/capa/render/result_document.py b/capa/render/result_document.py
index 8bf6a948..b2b35c00 100644
--- a/capa/render/result_document.py
+++ b/capa/render/result_document.py
@@ -29,6 +29,7 @@ import capa.features.freeze.features as frzf
 from capa.rules import RuleSet
 from capa.engine import MatchResults
 from capa.helpers import assert_never, load_json_from_path
+from capa.features.address import DynamicCallAddress
 
 if TYPE_CHECKING:
     from capa.capabilities.common import Capabilities
@@ -392,7 +393,33 @@ class Match(FrozenModel):
                     )
 
                 for location in result.locations:
-                    children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
+
+                    # keep this in sync with the copy below
+                    if isinstance(location, DynamicCallAddress):
+                        if location in rule_matches:
+                            # exact match, such as matching a call-scoped rule.
+                            children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
+                            # we'd like to assert the scope of the current rule is "sequence"
+                            # but we don't have that data here.
+                        else:
+                            # Sequence scopes can match each other, but they don't strictly contain each other,
+                            # like the way a function contains a basic block.
+                            # So when we have a match within a sequence for another sequence, we need to look
+                            # for all the places it might be found.
+                            #
+                            # Despite the edge cases (like API hammering), this turns out to be pretty easy:
+                            # collect the most recent match (with the given name) prior to the wanted location.
+                            matches_in_thread = sorted([
+                                (a.id, m)
+                                for a, m in rule_matches.items()
+                                if isinstance(a, DynamicCallAddress)
+                                and a.thread == location.thread and a.id <= location.id
+                            ])
+                            _, most_recent_match = matches_in_thread[-1]
+                            children.append(Match.from_capa(rules, capabilities, most_recent_match))
+
+                    else:
+                        children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
             else:
                 # this is a namespace that we're matching
                 #
@@ -433,8 +460,23 @@ class Match(FrozenModel):
                     # this is a subset of doc[locations].
                    #
                    # so, grab only the locations for current rule.
-                    if location in rule_matches:
-                        children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
+
+                    # keep this in sync with the block above.
+                    if isinstance(location, DynamicCallAddress):
+                        if location in rule_matches:
+                            children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
+                        else:
+                            matches_in_thread = sorted([
+                                (a.id, m)
+                                for a, m in rule_matches.items()
+                                if isinstance(a, DynamicCallAddress)
+                                and a.thread == location.thread and a.id <= location.id
+                            ])
+                            _, most_recent_match = matches_in_thread[-1]
+                            children.append(Match.from_capa(rules, capabilities, most_recent_match))
+                    else:
+                        if location in rule_matches:
+                            children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
 
         return cls(
             success=success,
diff --git a/capa/render/verbose.py b/capa/render/verbose.py
index a89db503..81bf93e0 100644
--- a/capa/render/verbose.py
+++ b/capa/render/verbose.py
@@ -43,7 +43,6 @@ from capa.rules import RuleSet
 from capa.engine import MatchResults
 from capa.render.utils import Console
 
-
 def format_address(address: frz.Address) -> str:
     if address.type == frz.AddressType.ABSOLUTE:
         assert isinstance(address.value, int)
diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py
index 5a2f1718..3b907a37 100644
--- a/capa/render/vverbose.py
+++ b/capa/render/vverbose.py
@@ -230,7 +230,8 @@ def render_feature(
         # if we're in call scope, then the call will have been rendered at the top
         # of the output, so don't re-render it again for each feature.
         pass
-    elif isinstance(feature, (frzf.OSFeature, frzf.ArchFeature, frzf.FormatFeature)):
+    elif isinstance(layout, rd.DynamicLayout) and isinstance(feature, frzf.MatchFeature):
+        # don't render copies of the sequence address for submatches
        pass
    else:
        render_locations(console, layout, match.locations, indent)
@@ -311,13 +312,13 @@ def render_match(
            render_match(console, layout, rule, child, indent=indent + 1, mode=child_mode)
 
 
-def collect_call_locations(
+def collect_sequence_locations(
     match: rd.Match,
     mode=MODE_SUCCESS,
 ):
     """
-    Find all the DynamicCallAddress locations in the given match, recursively.
-    Useful to collect the calls used to match a sequence scoped rule.
+    Find all the (call, sequence) locations used in a given sequence match, recursively.
+    Useful to collect the events used to match a sequence scoped rule.
""" if isinstance(match.node, rd.StatementNode): if ( @@ -326,13 +327,13 @@ def collect_call_locations( ): child_mode = MODE_FAILURE if mode == MODE_SUCCESS else MODE_SUCCESS for child in match.children: - yield from collect_call_locations(child, child_mode) + yield from collect_sequence_locations(child, child_mode) else: for child in match.children: - yield from collect_call_locations(child, mode) + yield from collect_sequence_locations(child, mode) elif isinstance(match.node, rd.FeatureNode): for location in match.locations: - if location.type != frz.AddressType.CALL: + if location.type not in (frz.AddressType.CALL, ): continue if mode == MODE_FAILURE: continue @@ -481,7 +482,7 @@ def render_rules(console: Console, doc: rd.ResultDocument): elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD: console.write(v.render_thread(doc.meta.analysis.layout, location)) elif rule.meta.scopes.dynamic == capa.rules.Scope.SEQUENCE: - calls = sorted(set(collect_call_locations(match))) + calls = sorted(set(collect_sequence_locations(match))) console.write(hanging_indent(v.render_sequence(doc.meta.analysis.layout, calls), indent=1)) elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL: console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1)) diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 09ba62bc..8d5341e0 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -364,3 +364,101 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event(): assert r.name in capabilities.matches # we only match the first overlapping sequence assert [11] == list(get_call_ids(capabilities.matches[r.name])) + + +# show that you can use match statements in sequence rules. +# +# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) +# thread: 3064 +# ... +# call 10: LdrGetDllHandle(1974337536, kernel32.dll) +# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll) +# call 12: LdrGetDllHandle(1974337536, kernel32.dll) +# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll) +# ... 
+def test_dynamic_sequence_scope_match_statements():
+    extractor = get_0000a657_thread3064()
+
+    ruleset = capa.rules.RuleSet(
+        [
+            capa.rules.Rule.from_yaml(
+                textwrap.dedent(
+                    """
+                    rule:
+                      meta:
+                        name: resolve add VEH
+                        namespace: linking/runtime-linking/veh
+                        scopes:
+                          static: unsupported
+                          dynamic: sequence
+                      features:
+                        - and:
+                          - api: LdrGetDllHandle
+                          - api: LdrGetProcedureAddress
+                          - string: AddVectoredExceptionHandler
+                    """
+                )
+            ),
+            capa.rules.Rule.from_yaml(
+                textwrap.dedent(
+                    """
+                    rule:
+                      meta:
+                        name: resolve remove VEH
+                        namespace: linking/runtime-linking/veh
+                        scopes:
+                          static: unsupported
+                          dynamic: sequence
+                      features:
+                        - and:
+                          - api: LdrGetDllHandle
+                          - api: LdrGetProcedureAddress
+                          - string: RemoveVectoredExceptionHandler
+                    """
+                )
+            ),
+            capa.rules.Rule.from_yaml(
+                textwrap.dedent(
+                    """
+                    rule:
+                      meta:
+                        name: resolve add and remove VEH
+                        scopes:
+                          static: unsupported
+                          dynamic: sequence
+                      features:
+                        - and:
+                          - match: resolve add VEH
+                          - match: resolve remove VEH
+                    """
+                )
+            ),
+            capa.rules.Rule.from_yaml(
+                textwrap.dedent(
+                    """
+                    rule:
+                      meta:
+                        name: has VEH runtime linking
+                        scopes:
+                          static: unsupported
+                          dynamic: sequence
+                      features:
+                        - and:
+                          - match: linking/runtime-linking/veh
+                    """
+                )
+            ),
+        ]
+    )
+
+    capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
+
+    # basic functionality, already known to work
+    assert "resolve add VEH" in capabilities.matches
+    assert "resolve remove VEH" in capabilities.matches
+
+    # requires `match: <rule name>` to be working
+    assert "resolve add and remove VEH" in capabilities.matches
+
+    # requires `match: <rule namespace>` to be working
+    assert "has VEH runtime linking" in capabilities.matches
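For readers skimming the patch: the core idea in SequenceMatcher above is that advancing the call window costs only the features of the call entering and the call leaving it, rather than re-merging all SEQUENCE_SIZE feature sets, and that a rule which already matched in the immediately preceding window is not reported again. The standalone sketch below models that bookkeeping with plain sets and a Counter. It is illustrative only: the name WindowedFeatures, the window size of 5, and the toy rule check are assumptions for this example and not capa's API (capa's real implementation tracks feature-to-address mappings and calls ruleset.match() on the merged set).

# sketch only: not part of the patch above; see the hedging note in the preceding paragraph.
import collections
from collections.abc import Hashable, Iterable

SEQUENCE_SIZE = 5  # assumed window size for illustration


class WindowedFeatures:
    """Maintain the union of the feature sets from the last SEQUENCE_SIZE calls."""

    def __init__(self) -> None:
        self.window: collections.deque[set[Hashable]] = collections.deque(maxlen=SEQUENCE_SIZE)
        self.counts: collections.Counter[Hashable] = collections.Counter()

    def push(self, features: Iterable[Hashable]) -> None:
        features = set(features)
        if len(self.window) == SEQUENCE_SIZE:
            # evict the oldest call's features before appending,
            # mirroring the manual popleft() in SequenceMatcher.next().
            for f in self.window.popleft():
                self.counts[f] -= 1
                if not self.counts[f]:
                    del self.counts[f]
        self.window.append(features)
        self.counts.update(features)

    def current(self) -> set[Hashable]:
        # the live merged view: maintaining it costs only the entering/leaving features.
        return set(self.counts)


if __name__ == "__main__":
    w = WindowedFeatures()
    last_matches: set[str] = set()
    calls = [{"api: LdrGetDllHandle"}, {"api: LdrGetProcedureAddress", "string: AddVectoredExceptionHandler"}]
    for i, call_features in enumerate(calls * 4):
        w.push(call_features)
        # stand-in for ruleset.match(): a toy "rule" fires when both APIs are in the window.
        hits = {"resolve VEH"} if {"api: LdrGetDllHandle", "api: LdrGetProcedureAddress"} <= w.current() else set()
        # report only rules that did not match the immediately preceding window,
        # like SequenceMatcher's last_sequence_matches deduplication.
        for name in hits - last_matches:
            print(f"call {i}: new match: {name}")
        last_matches = hits

Running the sketch prints a single match at the second call and stays quiet for the repeated overlapping windows that follow, which is the same deduplication behavior exercised by test_dynamic_sequence_multiple_sequences_overlapping_single_event in the diff above.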