rename "sequence" scope to "span of calls" scope

pep8

fix ref

update submodules

update testfiles submodule

duplicate variable
This commit is contained in:
Willi Ballenthin
2025-01-17 12:44:47 +00:00
committed by Willi Ballenthin
parent 277504c7b7
commit cdc1cb7afd
13 changed files with 131 additions and 133 deletions

View File

@@ -4,11 +4,11 @@
### New Features
- add sequence scope to match features against a across a sliding window of API calls within a thread @williballenthin #2532
- add span-of-calls scope to match features across a sliding window of API calls within a thread @williballenthin #2532
### Breaking Changes
- add sequence scope to rule format
- add span-of-calls scope to rule format
- capabilities functions return dataclasses instead of tuples
### New Rules (2)

View File

@@ -32,11 +32,11 @@ from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, Pr
logger = logging.getLogger(__name__)
# The number of calls that make up a sequence.
# The number of calls that make up a span of calls.
#
# The larger this is, the more calls are grouped together to match rule logic.
# This means a longer chain can be recognized; however, it's a bit more expensive.
SEQUENCE_SIZE = 20
SPAN_SIZE = 20
@dataclass
@@ -45,10 +45,6 @@ class CallCapabilities:
matches: MatchResults
# The number of calls that make up a sequence.
SEQUENCE_SIZE = 5
def find_call_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle
) -> CallCapabilities:
@@ -78,44 +74,44 @@ def find_call_capabilities(
class ThreadCapabilities:
features: FeatureSet
thread_matches: MatchResults
sequence_matches: MatchResults
span_matches: MatchResults
call_matches: MatchResults
class SequenceMatcher:
class SpanOfCallsMatcher:
def __init__(self, ruleset: RuleSet):
super().__init__()
self.ruleset = ruleset
# matches found at the sequence scope.
# matches found at the span scope.
self.matches: MatchResults = collections.defaultdict(list)
# We matches sequences as the sliding window of calls with size SEQUENCE_SIZE.
# We match spans as the sliding window of calls with size SPAN_SIZE.
#
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
# For each call, we consider the window of SPAN_SIZE calls leading up to it,
# merging all their features and doing a match.
#
# We track these features in two data structures:
# 1. a deque of those features found in the prior calls.
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
# 2. a live set of features seen in the sequence.
# We'll append to it, and as it grows larger than SPAN_SIZE, the oldest items are removed.
# 2. a live set of features seen in the span.
# As we pop from the deque, we remove features from the current set,
# and as we push to the deque, we insert features to the current set.
# With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
# The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
# (that is, runtime gets slower the larger SEQUENCE_SIZE is).
self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
# With this approach, our algorithm performance is independent of SPAN_SIZE.
# The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SPAN_SIZE
# (that is, runtime gets slower the larger SPAN_SIZE is).
self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SPAN_SIZE)
self.current_features: FeatureSet = collections.defaultdict(set)
# the names of rules matched at the last sequence,
# the names of rules matched at the last span,
# so that we can deduplicate long strings of the same matches.
self.last_sequence_matches: set[str] = set()
self.last_span_matches: set[str] = set()
def next(self, ch: CallHandle, call_features: FeatureSet):
# As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
# While we could rely on `deque.append` with `maxlen` set (which we provide above),
# we want to use the dropped item first, to remove the old features, so we manually pop it here.
if len(self.current_feature_sets) == SEQUENCE_SIZE:
if len(self.current_feature_sets) == SPAN_SIZE:
overflowing_feature_set = self.current_feature_sets.popleft()
for feature, vas in overflowing_feature_set.items():
@@ -135,20 +131,20 @@ class SequenceMatcher:
for feature, vas in call_features.items():
self.current_features[feature] |= vas
_, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, ch.address)
_, matches = self.ruleset.match(Scope.SPAN_OF_CALLS, self.current_features, ch.address)
newly_encountered_rules = set(matches.keys()) - self.last_sequence_matches
newly_encountered_rules = set(matches.keys()) - self.last_span_matches
# don't emit match results for rules seen during the immediately preceeding sequence.
# don't emit match results for rules seen during the immediately preceding spans.
#
# This means that we won't emit duplicate matches when there are multiple sequences
# This means that we won't emit duplicate matches when there are multiple spans
# that overlap a single matching event.
# It also handles the case of a tight loop containing matched logic;
# only the first match will be recorded.
#
# In theory, this means the result document doesn't have *every* possible match location,
# but in practice, humans will only be interested in the first handful anyways.
suppressed_rules = set(self.last_sequence_matches)
suppressed_rules = set(self.last_span_matches)
# however, if a newly encountered rule depends on a suppressed rule,
# don't suppress that rule match, or we won't be able to reconstruct the vverbose output.
@@ -161,7 +157,7 @@ class SequenceMatcher:
continue
self.matches[rule_name].extend(res)
self.last_sequence_matches = set(matches.keys())
self.last_span_matches = set(matches.keys())
def find_thread_capabilities(
@@ -169,7 +165,7 @@ def find_thread_capabilities(
) -> ThreadCapabilities:
"""
find matches for the given rules within the given thread,
which includes matches for all the sequences and calls within it.
which includes matches for all the spans and calls within it.
"""
# all features found within this thread,
# includes features found within calls.
@@ -179,7 +175,7 @@ def find_thread_capabilities(
# might be found at different calls, that's ok.
call_matches: MatchResults = collections.defaultdict(list)
sequence_matcher = SequenceMatcher(ruleset)
span_matcher = SpanOfCallsMatcher(ruleset)
call_count = 0
for call_count, ch in enumerate(extractor.get_calls(ph, th)): # noqa: B007
@@ -190,7 +186,7 @@ def find_thread_capabilities(
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)
sequence_matcher.next(ch, call_capabilities.features)
span_matcher.next(ch, call_capabilities.features)
for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
features[feature].add(va)
@@ -209,16 +205,16 @@ def find_thread_capabilities(
th.address.tid,
call_count,
len(features),
len(matches) + len(sequence_matcher.matches) + len(call_matches),
len(matches) + len(span_matcher.matches) + len(call_matches),
)
return ThreadCapabilities(features, matches, sequence_matcher.matches, call_matches)
return ThreadCapabilities(features, matches, span_matcher.matches, call_matches)
@dataclass
class ProcessCapabilities:
process_matches: MatchResults
thread_matches: MatchResults
sequence_matches: MatchResults
span_matches: MatchResults
call_matches: MatchResults
feature_count: int
@@ -237,9 +233,9 @@ def find_process_capabilities(
# might be found at different threads, that's ok.
thread_matches: MatchResults = collections.defaultdict(list)
# matches found at the sequence scope.
# might be found at different sequences, that's ok.
sequence_matches: MatchResults = collections.defaultdict(list)
# matches found at the span-of-calls scope.
# might be found at different spans, that's ok.
span_matches: MatchResults = collections.defaultdict(list)
# matches found at the call scope.
# might be found at different calls, that's ok.
@@ -253,8 +249,8 @@ def find_process_capabilities(
for rule_name, res in thread_capabilities.thread_matches.items():
thread_matches[rule_name].extend(res)
for rule_name, res in thread_capabilities.sequence_matches.items():
sequence_matches[rule_name].extend(res)
for rule_name, res in thread_capabilities.span_matches.items():
span_matches[rule_name].extend(res)
for rule_name, res in thread_capabilities.call_matches.items():
call_matches[rule_name].extend(res)
@@ -270,7 +266,7 @@ def find_process_capabilities(
len(process_features),
len(process_matches),
)
return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features))
return ProcessCapabilities(process_matches, thread_matches, span_matches, call_matches, len(process_features))
def find_dynamic_capabilities(
@@ -278,7 +274,7 @@ def find_dynamic_capabilities(
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
all_sequence_matches: MatchResults = collections.defaultdict(list)
all_span_matches: MatchResults = collections.defaultdict(list)
all_call_matches: MatchResults = collections.defaultdict(list)
feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())
@@ -303,8 +299,8 @@ def find_dynamic_capabilities(
all_process_matches[rule_name].extend(res)
for rule_name, res in process_capabilities.thread_matches.items():
all_thread_matches[rule_name].extend(res)
for rule_name, res in process_capabilities.sequence_matches.items():
all_sequence_matches[rule_name].extend(res)
for rule_name, res in process_capabilities.span_matches.items():
all_span_matches[rule_name].extend(res)
for rule_name, res in process_capabilities.call_matches.items():
all_call_matches[rule_name].extend(res)
@@ -314,7 +310,7 @@ def find_dynamic_capabilities(
# mapping from feature (matched rule) to set of addresses at which it matched.
process_and_lower_features: FeatureSet = collections.defaultdict(set)
for rule_name, results in itertools.chain(
all_process_matches.items(), all_thread_matches.items(), all_sequence_matches.items(), all_call_matches.items()
all_process_matches.items(), all_thread_matches.items(), all_span_matches.items(), all_call_matches.items()
):
locations = {p[0] for p in results}
rule = ruleset[rule_name]
@@ -329,7 +325,7 @@ def find_dynamic_capabilities(
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_call_matches.items(),
all_sequence_matches.items(),
all_span_matches.items(),
all_thread_matches.items(),
all_process_matches.items(),
all_file_capabilities.matches.items(),

View File

@@ -163,8 +163,8 @@ def scope_to_pb2(scope: capa.rules.Scope) -> capa_pb2.Scope.ValueType:
return capa_pb2.Scope.SCOPE_PROCESS
elif scope == capa.rules.Scope.THREAD:
return capa_pb2.Scope.SCOPE_THREAD
elif scope == capa.rules.Scope.SEQUENCE:
return capa_pb2.Scope.SCOPE_SEQUENCE
elif scope == capa.rules.Scope.SPAN_OF_CALLS:
return capa_pb2.Scope.SCOPE_SPAN_OF_CALLS
elif scope == capa.rules.Scope.CALL:
return capa_pb2.Scope.SCOPE_CALL
else:
@@ -657,8 +657,8 @@ def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope:
return capa.rules.Scope.PROCESS
elif scope == capa_pb2.Scope.SCOPE_THREAD:
return capa.rules.Scope.THREAD
elif scope == capa_pb2.Scope.SCOPE_SEQUENCE:
return capa.rules.Scope.SEQUENCE
elif scope == capa_pb2.Scope.SCOPE_SPAN_OF_CALLS:
return capa.rules.Scope.SPAN_OF_CALLS
elif scope == capa_pb2.Scope.SCOPE_CALL:
return capa.rules.Scope.CALL
else:

View File

@@ -378,7 +378,7 @@ enum Scope {
SCOPE_PROCESS = 5;
SCOPE_THREAD = 6;
SCOPE_CALL = 7;
SCOPE_SEQUENCE = 8;
SCOPE_SPAN_OF_CALLS = 8;
}
message Scopes {

File diff suppressed because one or more lines are too long

View File

@@ -94,7 +94,7 @@ class _ScopeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumType
SCOPE_PROCESS: _Scope.ValueType # 5
SCOPE_THREAD: _Scope.ValueType # 6
SCOPE_CALL: _Scope.ValueType # 7
SCOPE_SEQUENCE: _Scope.ValueType # 8
SCOPE_SPAN_OF_CALLS: _Scope.ValueType # 8
class Scope(_Scope, metaclass=_ScopeEnumTypeWrapper): ...
@@ -106,7 +106,7 @@ SCOPE_INSTRUCTION: Scope.ValueType # 4
SCOPE_PROCESS: Scope.ValueType # 5
SCOPE_THREAD: Scope.ValueType # 6
SCOPE_CALL: Scope.ValueType # 7
SCOPE_SEQUENCE: Scope.ValueType # 8
SCOPE_SPAN_OF_CALLS: Scope.ValueType # 8
global___Scope = Scope
@typing.final

View File

@@ -399,12 +399,12 @@ class Match(FrozenModel):
if location in rule_matches:
# exact match, such as matching a call-scoped rule.
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
# we'd like to assert the scope of the current rule is "sequence"
# we'd like to assert the scope of the current rule is span-of-calls
# but we don't have that data here.
else:
# Sequence scopes can match each other, but they don't strictly contain each other,
# Span-of-calls scopes can match each other, but they don't strictly contain each other,
# like the way a function contains a basic block.
# So when we have a match within a sequence for another sequence, we need to look
# So when we have a match within a span for another span, we need to look
# for all the places it might be found.
#
# Despite the edge cases (like API hammering), this turns out to be pretty easy:

View File

@@ -126,7 +126,7 @@ def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str:
return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}"
def render_sequence(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> str:
def render_span_of_calls(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> str:
calls: list[capa.features.address.DynamicCallAddress] = [addr.to_capa() for addr in addrs] # type: ignore
for call in calls:
assert isinstance(call, capa.features.address.DynamicCallAddress)
@@ -328,7 +328,7 @@ def render_rules(console: Console, doc: rd.ResultDocument):
lines = [render_process(doc.meta.analysis.layout, loc) for loc in locations]
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
lines = [render_thread(doc.meta.analysis.layout, loc) for loc in locations]
elif rule.meta.scopes.dynamic in (capa.rules.Scope.CALL, capa.rules.Scope.SEQUENCE):
elif rule.meta.scopes.dynamic in (capa.rules.Scope.CALL, capa.rules.Scope.SPAN_OF_CALLS):
# because we're only in verbose mode, we won't show the full call details (name, args, retval)
# we'll only show the details of the thread in which the calls are found.
# so select the thread locations and render those.

View File

@@ -231,7 +231,7 @@ def render_feature(
# of the output, so don't re-render it again for each feature.
pass
elif isinstance(layout, rd.DynamicLayout) and isinstance(feature, frzf.MatchFeature):
# don't render copies of the sequence address for submatches
# don't render copies of the span of calls address for submatches
pass
else:
render_locations(console, layout, match.locations, indent)
@@ -312,13 +312,13 @@ def render_match(
render_match(console, layout, rule, child, indent=indent + 1, mode=child_mode)
def collect_sequence_locations(
def collect_span_of_calls_locations(
match: rd.Match,
mode=MODE_SUCCESS,
):
"""
Find all the (call, sequence) locations used in a given sequence match, recursively.
Useful to collect the events used to match a sequence scoped rule.
Find all the call locations used in a given span-of-calls match, recursively.
Useful to collect the events used to match a span-of-calls scoped rule.
"""
if isinstance(match.node, rd.StatementNode):
if (
@@ -327,7 +327,7 @@ def collect_sequence_locations(
):
child_mode = MODE_FAILURE if mode == MODE_SUCCESS else MODE_SUCCESS
for child in match.children:
yield from collect_sequence_locations(child, child_mode)
yield from collect_span_of_calls_locations(child, child_mode)
elif isinstance(match.node.statement, rd.RangeStatement):
for location in match.locations:
if location.type not in (frz.AddressType.CALL,):
@@ -337,7 +337,7 @@ def collect_sequence_locations(
yield location
else:
for child in match.children:
yield from collect_sequence_locations(child, mode)
yield from collect_span_of_calls_locations(child, mode)
elif isinstance(match.node, rd.FeatureNode):
for location in match.locations:
if location.type not in (frz.AddressType.CALL,):
@@ -488,9 +488,9 @@ def render_rules(console: Console, doc: rd.ResultDocument):
console.write(v.render_process(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
console.write(v.render_thread(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.SEQUENCE:
calls = sorted(set(collect_sequence_locations(match)))
console.write(hanging_indent(v.render_sequence(doc.meta.analysis.layout, calls), indent=1))
elif rule.meta.scopes.dynamic == capa.rules.Scope.SPAN_OF_CALLS:
calls = sorted(set(collect_span_of_calls_locations(match)))
console.write(hanging_indent(v.render_span_of_calls(doc.meta.analysis.layout, calls), indent=1))
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1))
else:

View File

@@ -86,7 +86,7 @@ class Scope(str, Enum):
FILE = "file"
PROCESS = "process"
THREAD = "thread"
SEQUENCE = "sequence"
SPAN_OF_CALLS = "span of calls"
CALL = "call"
FUNCTION = "function"
BASIC_BLOCK = "basic block"
@@ -115,7 +115,7 @@ DYNAMIC_SCOPES = {
Scope.GLOBAL,
Scope.PROCESS,
Scope.THREAD,
Scope.SEQUENCE,
Scope.SPAN_OF_CALLS,
Scope.CALL,
}
@@ -201,7 +201,7 @@ SUPPORTED_FEATURES: dict[str, set] = {
capa.features.common.MatchedRule,
},
Scope.THREAD: set(),
Scope.SEQUENCE: set(),
Scope.SPAN_OF_CALLS: set(),
Scope.CALL: {
capa.features.common.MatchedRule,
capa.features.common.Regex,
@@ -256,14 +256,14 @@ SUPPORTED_FEATURES[Scope.FUNCTION].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.FILE].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.PROCESS].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.THREAD].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.SEQUENCE].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.SPAN_OF_CALLS].update(SUPPORTED_FEATURES[Scope.GLOBAL])
SUPPORTED_FEATURES[Scope.CALL].update(SUPPORTED_FEATURES[Scope.GLOBAL])
# all call scope features are also sequence features
SUPPORTED_FEATURES[Scope.SEQUENCE].update(SUPPORTED_FEATURES[Scope.CALL])
# all sequence scope features (and therefore, call features) are also thread features
SUPPORTED_FEATURES[Scope.THREAD].update(SUPPORTED_FEATURES[Scope.SEQUENCE])
# all call scope features are also span-of-calls features
SUPPORTED_FEATURES[Scope.SPAN_OF_CALLS].update(SUPPORTED_FEATURES[Scope.CALL])
# all span-of-calls scope features (and therefore, call features) are also thread features
SUPPORTED_FEATURES[Scope.THREAD].update(SUPPORTED_FEATURES[Scope.SPAN_OF_CALLS])
# all thread scope features are also process features
SUPPORTED_FEATURES[Scope.PROCESS].update(SUPPORTED_FEATURES[Scope.THREAD])
@@ -622,7 +622,7 @@ def build_statements(d, scopes: Scopes):
elif key == "process":
if Scope.FILE not in scopes:
raise InvalidRule("process subscope supported only for file scope")
raise InvalidRule("`process` subscope supported only for `file` scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
@@ -633,7 +633,7 @@ def build_statements(d, scopes: Scopes):
elif key == "thread":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS)):
raise InvalidRule("thread subscope supported only for the process scope")
raise InvalidRule("`thread` subscope supported only for the `process` scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
@@ -642,20 +642,22 @@ def build_statements(d, scopes: Scopes):
Scope.THREAD, build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)), description=description
)
elif key == "sequence":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE)):
raise InvalidRule("sequence subscope supported only for the process and thread scopes")
elif key == "span of calls":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SPAN_OF_CALLS)):
raise InvalidRule("`span of calls` subscope supported only for the `process` and `thread` scopes")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
Scope.SEQUENCE, build_statements(d[key][0], Scopes(dynamic=Scope.SEQUENCE)), description=description
Scope.SPAN_OF_CALLS,
build_statements(d[key][0], Scopes(dynamic=Scope.SPAN_OF_CALLS)),
description=description,
)
elif key == "call":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE, Scope.CALL)):
raise InvalidRule("call subscope supported only for the process, thread, and call scopes")
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SPAN_OF_CALLS, Scope.CALL)):
raise InvalidRule("`call` subscope supported only for the `process`, `thread`, and `call` scopes")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
@@ -666,7 +668,7 @@ def build_statements(d, scopes: Scopes):
elif key == "function":
if Scope.FILE not in scopes:
raise InvalidRule("function subscope supported only for file scope")
raise InvalidRule("`function` subscope supported only for `file` scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
@@ -677,7 +679,7 @@ def build_statements(d, scopes: Scopes):
elif key == "basic block":
if Scope.FUNCTION not in scopes:
raise InvalidRule("basic block subscope supported only for function scope")
raise InvalidRule("`basic block` subscope supported only for `function` scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
@@ -688,7 +690,7 @@ def build_statements(d, scopes: Scopes):
elif key == "instruction":
if all(s not in scopes for s in (Scope.FUNCTION, Scope.BASIC_BLOCK)):
raise InvalidRule("instruction subscope supported only for function and basic block scope")
raise InvalidRule("`instruction` subscope supported only for `function` and `basic block` scope")
if len(d[key]) == 1:
statements = build_statements(d[key][0], Scopes(static=Scope.INSTRUCTION))
@@ -1401,7 +1403,7 @@ class RuleSet:
scopes = (
Scope.CALL,
Scope.SEQUENCE,
Scope.SPAN_OF_CALLS,
Scope.THREAD,
Scope.PROCESS,
Scope.INSTRUCTION,
@@ -1433,8 +1435,8 @@ class RuleSet:
return self.rules_by_scope[Scope.THREAD]
@property
def sequence_rules(self):
return self.rules_by_scope[Scope.SEQUENCE]
def span_of_calls_rules(self):
return self.rules_by_scope[Scope.SPAN_OF_CALLS]
@property
def call_rules(self):

View File

@@ -194,7 +194,7 @@ class InvalidDynamicScope(Lint):
"file",
"process",
"thread",
"sequence",
"span of calls",
"call",
"unsupported",
)

View File

@@ -99,12 +99,12 @@ def test_dynamic_call_scope():
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in matches
assert 8 in get_call_ids(matches[r.name])
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert 8 in get_call_ids(capabilities.matches[r.name])
# match the first sequence.
# match the first span.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -113,7 +113,7 @@ def test_dynamic_call_scope():
# call 10: LdrGetDllHandle(1974337536, kernel32.dll)
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
def test_dynamic_sequence_scope():
def test_dynamic_span_scope():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -123,7 +123,7 @@ def test_dynamic_sequence_scope():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- api: GetSystemTimeAsFileTime
@@ -137,12 +137,12 @@ def test_dynamic_sequence_scope():
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in matches
assert 12 in get_call_ids(matches[r.name])
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert 12 in get_call_ids(capabilities.matches[r.name])
# show that when the sequence is only 5 calls long (for example), it doesn't match beyond that 5-tuple.
# show that when the span is only 5 calls long (for example), it doesn't match beyond that 5-tuple.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -155,7 +155,7 @@ def test_dynamic_sequence_scope():
# call 14: RtlAddVectoredExceptionHandler(1921490089, 0)
# call 15: GetSystemTime()
# call 16: NtAllocateVirtualMemory(no, 4, 786432, 4784128, 4294967295)
def test_dynamic_sequence_scope_length():
def test_dynamic_span_scope_length():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -165,7 +165,7 @@ def test_dynamic_sequence_scope_length():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- api: GetSystemTimeAsFileTime
@@ -176,24 +176,24 @@ def test_dynamic_sequence_scope_length():
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
# patch SEQUENCE_SIZE since we may use a much larger value in the real world.
# patch SPAN_SIZE since we may use a much larger value in the real world.
from pytest import MonkeyPatch
with MonkeyPatch.context() as m:
m.setattr(capa.capabilities.dynamic, "SEQUENCE_SIZE", 5)
m.setattr(capa.capabilities.dynamic, "SPAN_SIZE", 5)
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name not in capabilities.matches
# show that you can use a call subscope in sequence rules.
# show that you can use a call subscope in span-of-calls rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_call_subscope():
def test_dynamic_span_call_subscope():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -203,7 +203,7 @@ def test_dynamic_sequence_call_subscope():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- call:
@@ -221,7 +221,7 @@ def test_dynamic_sequence_call_subscope():
assert 11 in get_call_ids(capabilities.matches[r.name])
# show that you can use a sequence subscope in sequence rules.
# show that you can use a span subscope in span rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -231,7 +231,7 @@ def test_dynamic_sequence_call_subscope():
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_scope_sequence_subscope():
def test_dynamic_span_scope_span_subscope():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -241,16 +241,16 @@ def test_dynamic_sequence_scope_sequence_subscope():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- sequence:
- span of calls:
- description: resolve add VEH # should match at 11
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
- sequence:
- span of calls:
- description: resolve remove VEH # should match at 13
- and:
- api: LdrGetDllHandle
@@ -267,8 +267,8 @@ def test_dynamic_sequence_scope_sequence_subscope():
assert 13 in get_call_ids(capabilities.matches[r.name])
# show that you can't use thread subscope in sequence rules.
def test_dynamic_sequence_scope_thread_subscope():
# show that you can't use thread subscope in span rules.
def test_dynamic_span_scope_thread_subscope():
rule = textwrap.dedent(
"""
rule:
@@ -276,7 +276,7 @@ def test_dynamic_sequence_scope_thread_subscope():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- thread:
@@ -288,7 +288,7 @@ def test_dynamic_sequence_scope_thread_subscope():
capa.rules.Rule.from_yaml(rule)
# show how you might use a sequence rule: to match a small window for a collection of features.
# show how you might use a span-of-calls rule: to match a small window for a collection of features.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -297,7 +297,7 @@ def test_dynamic_sequence_scope_thread_subscope():
# call 12: ...
# call 13: ...
# call 14: RtlAddVectoredExceptionHandler(1921490089, 0)
def test_dynamic_sequence_example():
def test_dynamic_span_example():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -307,7 +307,7 @@ def test_dynamic_sequence_example():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- call:
@@ -325,12 +325,12 @@ def test_dynamic_sequence_example():
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])
matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in matches
assert 14 in get_call_ids(matches[r.name])
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert 14 in get_call_ids(capabilities.matches[r.name])
# show how sequences that overlap a single event are handled.
# show how spans that overlap a single event are handled.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -342,7 +342,7 @@ def test_dynamic_sequence_example():
# call 14: ...
# call 15: ...
# ...
def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
def test_dynamic_span_multiple_spans_overlapping_single_event():
extractor = get_0000a657_thread3064()
rule = textwrap.dedent(
@@ -352,7 +352,7 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
name: test rule
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- call:
@@ -367,11 +367,11 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
# we only match the first overlapping sequence
# we only match the first overlapping span
assert [11] == list(get_call_ids(capabilities.matches[r.name]))
# show that you can use match statements in sequence rules.
# show that you can use match statements in span-of-calls rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
@@ -381,7 +381,7 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_scope_match_statements():
def test_dynamic_span_scope_match_statements():
extractor = get_0000a657_thread3064()
ruleset = capa.rules.RuleSet(
@@ -395,7 +395,7 @@ def test_dynamic_sequence_scope_match_statements():
namespace: linking/runtime-linking/veh
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- api: LdrGetDllHandle
@@ -413,7 +413,7 @@ def test_dynamic_sequence_scope_match_statements():
namespace: linking/runtime-linking/veh
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- api: LdrGetDllHandle
@@ -430,7 +430,7 @@ def test_dynamic_sequence_scope_match_statements():
name: resolve add and remove VEH
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- match: resolve add VEH
@@ -446,7 +446,7 @@ def test_dynamic_sequence_scope_match_statements():
name: has VEH runtime linking
scopes:
static: unsupported
dynamic: sequence
dynamic: span of calls
features:
- and:
- match: linking/runtime-linking/veh

View File

@@ -129,7 +129,7 @@ def test_scope_to_pb2():
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.INSTRUCTION) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.PROCESS) == capa_pb2.SCOPE_PROCESS
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.THREAD) == capa_pb2.SCOPE_THREAD
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.SEQUENCE) == capa_pb2.SCOPE_SEQUENCE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.SPAN_OF_CALLS) == capa_pb2.SCOPE_SPAN_OF_CALLS
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.CALL) == capa_pb2.SCOPE_CALL