From 39319c57a4d7450c87bcc9bb4e670fb3f79d759b Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 16 Dec 2024 15:51:35 +0000 Subject: [PATCH] sequence: documentation and tests sequence: add more tests --- capa/capabilities/dynamic.py | 10 +- capa/features/extractors/base_extractor.py | 6 +- capa/rules/__init__.py | 2 +- tests/test_dynamic_sequence_scope.py | 111 ++++++++++++++++++++- 4 files changed, 117 insertions(+), 12 deletions(-) diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index 47029cb4..39b538c5 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -18,13 +18,12 @@ import itertools import collections from dataclasses import dataclass -from capa.features.address import NO_ADDRESS import capa.perf import capa.features.freeze as frz import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults -from capa.features.common import Feature +from capa.features.address import _NoAddress from capa.capabilities.common import Capabilities, find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor @@ -137,9 +136,10 @@ def find_thread_capabilities( if len(sequence_feature_sets) == SEQUENCE_SIZE: overflowing_feature_set = sequence_feature_sets.popleft() - # these are the top-level features that will no longer have any associated addresses. for feature, vas in overflowing_feature_set.items(): - if vas == { NO_ADDRESS, }: + if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress): + # `vas == { NO_ADDRESS }` without the garbage. + # # ignore the common case of global features getting added/removed/trimmed repeatedly, # like arch/os/format. continue @@ -238,7 +238,7 @@ def find_process_capabilities( def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress: bool = False ) -> Capabilities: all_process_matches: MatchResults = collections.defaultdict(list) all_thread_matches: MatchResults = collections.defaultdict(list) diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 35c9b665..1be52d06 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -488,11 +488,11 @@ class DynamicFeatureExtractor: raise NotImplementedError() -def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> DynamicFeatureExtractor: +def ProcessFilter(extractor: DynamicFeatureExtractor, pids: set[int]) -> DynamicFeatureExtractor: original_get_processes = extractor.get_processes def filtered_get_processes(self): - yield from (f for f in original_get_processes() if f.address.pid in processes) + yield from (f for f in original_get_processes() if f.address.pid in pids) # we make a copy of the original extractor object and then update its get_processes() method with the decorated filter one. # this is in order to preserve the original extractor object's get_processes() method, in case it is used elsewhere in the code. @@ -504,7 +504,7 @@ def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> Dynamic return new_extractor -def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set) -> DynamicFeatureExtractor: +def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set[Address]) -> DynamicFeatureExtractor: original_get_threads = extractor.get_threads def filtered_get_threads(self, ph: ProcessHandle): diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 2b0813b6..2ca4e0bc 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -643,7 +643,7 @@ def build_statements(d, scopes: Scopes): ) elif key == "sequence": - if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD)): + if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE)): raise InvalidRule("sequence subscope supported only for the process and thread scopes") if len(d[key]) != 1: diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 4b423fe0..09ba62bc 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -27,11 +27,14 @@ # ... import textwrap +from typing import Iterator from functools import lru_cache +import pytest import fixtures import capa.main +import capa.rules import capa.capabilities.dynamic from capa.features.extractors.base_extractor import ThreadFilter, DynamicFeatureExtractor @@ -62,7 +65,7 @@ def get_0000a657_thread3064(): return extractor -def get_call_ids(matches): +def get_call_ids(matches) -> Iterator[int]: for address, _ in matches: yield address.id @@ -96,7 +99,7 @@ def test_dynamic_call_scope(): assert 8 in get_call_ids(matches[r.name]) -# match the first 5-tuple sequence. +# match the first sequence. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) # thread: 3064 @@ -147,7 +150,7 @@ def test_dynamic_sequence_scope(): # call 14: RtlAddVectoredExceptionHandler(1921490089, 0) # call 15: GetSystemTime() # call 16: NtAllocateVirtualMemory(no, 4, 786432, 4784128, 4294967295) -def test_dynamic_sequence_scope2(): +def test_dynamic_sequence_scope_length(): extractor = get_0000a657_thread3064() rule = textwrap.dedent( @@ -178,6 +181,108 @@ def test_dynamic_sequence_scope2(): assert r.name not in capabilities.matches +# show that you can use a call subscope in sequence rules. +# +# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) +# thread: 3064 +# ... +# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll) +# ... +def test_dynamic_sequence_call_subscope(): + extractor = get_0000a657_thread3064() + + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - call: + - and: + - api: LdrGetProcedureAddress + - string: AddVectoredExceptionHandler + """ + ) + + r = capa.rules.Rule.from_yaml(rule) + ruleset = capa.rules.RuleSet([r]) + + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 11 in get_call_ids(capabilities.matches[r.name]) + + +# show that you can use a sequence subscope in sequence rules. +# +# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052) +# thread: 3064 +# ... +# call 10: LdrGetDllHandle(1974337536, kernel32.dll) +# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll) +# call 12: LdrGetDllHandle(1974337536, kernel32.dll) +# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll) +# ... +def test_dynamic_sequence_scope_sequence_subscope(): + extractor = get_0000a657_thread3064() + + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - sequence: + - description: resolve add VEH # should match at 11 + - and: + - api: LdrGetDllHandle + - api: LdrGetProcedureAddress + - string: AddVectoredExceptionHandler + - sequence: + - description: resolve remove VEH # should match at 13 + - and: + - api: LdrGetDllHandle + - api: LdrGetProcedureAddress + - string: RemoveVectoredExceptionHandler + """ + ) + + r = capa.rules.Rule.from_yaml(rule) + ruleset = capa.rules.RuleSet([r]) + + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 13 in get_call_ids(capabilities.matches[r.name]) + + +# show that you can't use thread subscope in sequence rules. +def test_dynamic_sequence_scope_thread_subscope(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: unsupported + dynamic: sequence + features: + - and: + - thread: + - string: "foo" + """ + ) + + with pytest.raises(capa.rules.InvalidRule): + capa.rules.Rule.from_yaml(rule) + + # show how you might use a sequence rule: to match a small window for a collection of features. # # proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)