rules: use Scope enum instead of constants

This commit is contained in:
Willi Ballenthin
2023-08-25 12:54:57 +00:00
parent 164b08276c
commit a734358377
8 changed files with 95 additions and 106 deletions

View File

@@ -500,13 +500,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
location = location_.to_capa()
parent2: CapaExplorerDataItem
if capa.rules.FILE_SCOPE in rule.meta.scopes:
if capa.rules.Scope.FILE in rule.meta.scopes:
parent2 = parent
elif capa.rules.FUNCTION_SCOPE in rule.meta.scopes:
elif capa.rules.Scope.FUNCTION in rule.meta.scopes:
parent2 = CapaExplorerFunctionItem(parent, location)
elif capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
elif capa.rules.Scope.BASIC_BLOCK in rule.meta.scopes:
parent2 = CapaExplorerBlockItem(parent, location)
elif capa.rules.INSTRUCTION_SCOPE in rule.meta.scopes:
elif capa.rules.Scope.INSTRUCTION in rule.meta.scopes:
parent2 = CapaExplorerInstructionItem(parent, location)
else:
raise RuntimeError("unexpected rule scope: " + str(rule.meta.scopes.static))

View File

@@ -1056,7 +1056,7 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
matched_threads = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if capa.rules.THREAD_SCOPE in rule.scopes:
if capa.rules.Scope.THREAD in rule.scopes:
for addr, _ in matches:
assert addr in processes_by_thread
matched_threads.add(addr)
@@ -1099,7 +1099,7 @@ def compute_static_layout(rules, extractor: StaticFeatureExtractor, capabilities
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if capa.rules.BASIC_BLOCK_SCOPE in rule.scopes:
if capa.rules.Scope.BASIC_BLOCK in rule.scopes:
for addr, _ in matches:
assert addr in functions_by_bb
matched_bbs.add(addr)

View File

@@ -214,7 +214,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
rows.append((key, v))
if capa.rules.FILE_SCOPE not in rule.meta.scopes:
if capa.rules.Scope.FILE not in rule.meta.scopes:
locations = [m[0] for m in doc.rules[rule.meta.name].matches]
rows.append(("matches", "\n".join(map(format_address, locations))))

View File

@@ -357,7 +357,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
if capa.rules.FILE_SCOPE in rule.meta.scopes:
if capa.rules.Scope.FILE in rule.meta.scopes:
matches = doc.rules[rule.meta.name].matches
if len(matches) != 1:
# i think there should only ever be one match per file-scope rule,
@@ -379,13 +379,13 @@ def render_rules(ostream, doc: rd.ResultDocument):
ostream.write(" @ ")
ostream.write(capa.render.verbose.format_address(location))
if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
if capa.rules.Scope.BASIC_BLOCK in rule.meta.scopes:
ostream.write(
" in function "
+ capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))
)
if capa.rules.THREAD_SCOPE in rule.meta.scopes:
if capa.rules.Scope.THREAD in rule.meta.scopes:
ostream.write(
" in process "
+ capa.render.verbose.format_address(

View File

@@ -82,46 +82,37 @@ class Scope(str, Enum):
BASIC_BLOCK = "basic block"
INSTRUCTION = "instruction"
FILE_SCOPE = Scope.FILE.value
PROCESS_SCOPE = Scope.PROCESS.value
THREAD_SCOPE = Scope.THREAD.value
CALL_SCOPE = Scope.CALL.value
FUNCTION_SCOPE = Scope.FUNCTION.value
BASIC_BLOCK_SCOPE = Scope.BASIC_BLOCK.value
INSTRUCTION_SCOPE = Scope.INSTRUCTION.value
# used only to specify supported features per scope.
# not used to validate rules.
GLOBAL_SCOPE = "global"
GLOBAL = "global"
# these sets are used to check that the scope given for each
# flavor (static or dynamic) of a rule is valid for that flavor.
STATIC_SCOPES = {Scope.FILE, Scope.GLOBAL, Scope.FUNCTION, Scope.BASIC_BLOCK, Scope.INSTRUCTION}
DYNAMIC_SCOPES = {Scope.FILE, Scope.GLOBAL, Scope.PROCESS, Scope.THREAD, Scope.CALL}
@dataclass
class Scopes:
# when None, the scope is not supported by a rule
static: Optional[str] = None
static: Optional[Scope] = None
# when None, the scope is not supported by a rule
dynamic: Optional[str] = None
dynamic: Optional[Scope] = None
def __contains__(self, scope: Union[Scope, str]) -> bool:
assert isinstance(scope, (Scope, str))
def __contains__(self, scope: Scope) -> bool:
return (scope == self.static) or (scope == self.dynamic)
def __repr__(self) -> str:
@@ -135,56 +126,55 @@ class Scopes:
raise ValueError("invalid rules class. at least one scope must be specified")
@classmethod
def from_dict(cls, scopes: Dict[str, str]) -> "Scopes":
    """
    build a Scopes instance from the `scopes` mapping of a rule's meta section.

    the mapping must contain exactly the keys "static" and "dynamic".
    the values "unsupported" and "unspecified" mean the flavor has no scope
    and are stored as None. any other value must be found in
    STATIC_SCOPES or DYNAMIC_SCOPES, respectively.

    raises:
      InvalidRule: when a key is missing, an unexpected key is present,
        neither flavor names a scope, or a scope is not valid for its flavor.
    """
    # make local copy so we don't make changes outside of this routine.
    # we'll use the value None to indicate the scope is not supported.
    scopes_: Dict[str, Optional[str]] = dict(scopes)

    # mark non-specified scopes as invalid
    if "static" not in scopes_:
        raise InvalidRule("static scope must be provided")
    if "dynamic" not in scopes_:
        raise InvalidRule("dynamic scope must be provided")

    # check the syntax of the meta `scopes` field
    if sorted(scopes_) != ["dynamic", "static"]:
        raise InvalidRule("scope flavors can be either static or dynamic")

    # "unsupported" explicitly disables a flavor.
    #
    # "unspecified" is used to indicate a rule is yet to be migrated.
    # TODO(williballenthin): this scope term should be removed once all rules have been migrated.
    # https://github.com/mandiant/capa/issues/1747
    for flavor in ("static", "dynamic"):
        if scopes_[flavor] in ("unsupported", "unspecified"):
            scopes_[flavor] = None

    static = scopes_["static"]
    dynamic = scopes_["dynamic"]

    if (not static) and (not dynamic):
        raise InvalidRule("invalid scopes value. At least one scope must be specified")

    # check that all the specified scopes are valid
    if static and static not in STATIC_SCOPES:
        raise InvalidRule(f"{static} is not a valid static scope")

    if dynamic and dynamic not in DYNAMIC_SCOPES:
        raise InvalidRule(f"{dynamic} is not a valid dynamic scope")

    # only convert the flavors that name a scope:
    # `Scope(None)` raises ValueError, which would reject every rule
    # that supports just one of the two flavors.
    return cls(
        static=Scope(static) if static else None,
        dynamic=Scope(dynamic) if dynamic else None,
    )
SUPPORTED_FEATURES: Dict[str, Set] = {
GLOBAL_SCOPE: {
Scope.GLOBAL: {
# these will be added to other scopes, see below.
capa.features.common.OS,
capa.features.common.Arch,
capa.features.common.Format,
},
FILE_SCOPE: {
Scope.FILE: {
capa.features.common.MatchedRule,
capa.features.file.Export,
capa.features.file.Import,
@@ -197,11 +187,11 @@ SUPPORTED_FEATURES: Dict[str, Set] = {
capa.features.common.Characteristic("mixed mode"),
capa.features.common.Characteristic("forwarded export"),
},
PROCESS_SCOPE: {
Scope.PROCESS: {
capa.features.common.MatchedRule,
},
THREAD_SCOPE: set(),
CALL_SCOPE: {
Scope.THREAD: set(),
Scope.CALL: {
capa.features.common.MatchedRule,
capa.features.common.Regex,
capa.features.common.String,
@@ -209,7 +199,7 @@ SUPPORTED_FEATURES: Dict[str, Set] = {
capa.features.insn.API,
capa.features.insn.Number,
},
FUNCTION_SCOPE: {
Scope.FUNCTION: {
capa.features.common.MatchedRule,
capa.features.basicblock.BasicBlock,
capa.features.common.Characteristic("calls from"),
@@ -218,13 +208,13 @@ SUPPORTED_FEATURES: Dict[str, Set] = {
capa.features.common.Characteristic("recursive call"),
# plus basic block scope features, see below
},
BASIC_BLOCK_SCOPE: {
Scope.BASIC_BLOCK: {
capa.features.common.MatchedRule,
capa.features.common.Characteristic("tight loop"),
capa.features.common.Characteristic("stack string"),
# plus instruction scope features, see below
},
INSTRUCTION_SCOPE: {
Scope.INSTRUCTION: {
capa.features.common.MatchedRule,
capa.features.insn.API,
capa.features.insn.Property,
@@ -249,24 +239,24 @@ SUPPORTED_FEATURES: Dict[str, Set] = {
}
# global scope features are available in all other scopes
for _scope in (
    Scope.INSTRUCTION,
    Scope.BASIC_BLOCK,
    Scope.FUNCTION,
    Scope.FILE,
    Scope.PROCESS,
    Scope.THREAD,
    Scope.CALL,
):
    SUPPORTED_FEATURES[_scope].update(SUPPORTED_FEATURES[Scope.GLOBAL])
del _scope

# features of a scope are also features of the scope listed before it.
# the pairs are applied in order, so a source is fully populated
# before it is merged into the next target.
for _target, _source in (
    # all call scope features are also thread features
    (Scope.THREAD, Scope.CALL),
    # all thread scope features are also process features
    (Scope.PROCESS, Scope.THREAD),
    # all instruction scope features are also basic block features
    (Scope.BASIC_BLOCK, Scope.INSTRUCTION),
    # all basic block scope features are also function scope features
    (Scope.FUNCTION, Scope.BASIC_BLOCK),
):
    SUPPORTED_FEATURES[_target].update(SUPPORTED_FEATURES[_source])
del _target, _source
class InvalidRule(ValueError):
@@ -558,66 +548,66 @@ def build_statements(d, scopes: Scopes):
return ceng.Some(0, [build_statements(dd, scopes) for dd in d[key]], description=description)
elif key == "process":
if FILE_SCOPE not in scopes:
if Scope.FILE not in scopes:
raise InvalidRule("process subscope supported only for file scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
PROCESS_SCOPE, build_statements(d[key][0], Scopes(dynamic=PROCESS_SCOPE)), description=description
Scope.PROCESS, build_statements(d[key][0], Scopes(dynamic=Scope.PROCESS)), description=description
)
elif key == "thread":
if all(s not in scopes for s in (FILE_SCOPE, PROCESS_SCOPE)):
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS)):
raise InvalidRule("thread subscope supported only for the process scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
THREAD_SCOPE, build_statements(d[key][0], Scopes(dynamic=THREAD_SCOPE)), description=description
Scope.THREAD, build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)), description=description
)
elif key == "call":
if all(s not in scopes for s in (FILE_SCOPE, PROCESS_SCOPE, THREAD_SCOPE)):
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD)):
raise InvalidRule("call subscope supported only for the process and thread scopes")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
CALL_SCOPE, build_statements(d[key][0], Scopes(dynamic=CALL_SCOPE)), description=description
Scope.CALL, build_statements(d[key][0], Scopes(dynamic=Scope.CALL)), description=description
)
elif key == "function":
if FILE_SCOPE not in scopes:
if Scope.FILE not in scopes:
raise InvalidRule("function subscope supported only for file scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
FUNCTION_SCOPE, build_statements(d[key][0], Scopes(static=FUNCTION_SCOPE)), description=description
Scope.FUNCTION, build_statements(d[key][0], Scopes(static=Scope.FUNCTION)), description=description
)
elif key == "basic block":
if FUNCTION_SCOPE not in scopes:
if Scope.FUNCTION not in scopes:
raise InvalidRule("basic block subscope supported only for function scope")
if len(d[key]) != 1:
raise InvalidRule("subscope must have exactly one child statement")
return ceng.Subscope(
BASIC_BLOCK_SCOPE, build_statements(d[key][0], Scopes(static=BASIC_BLOCK_SCOPE)), description=description
Scope.BASIC_BLOCK, build_statements(d[key][0], Scopes(static=Scope.BASIC_BLOCK)), description=description
)
elif key == "instruction":
if all(s not in scopes for s in (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE)):
if all(s not in scopes for s in (Scope.FUNCTION, Scope.BASIC_BLOCK)):
raise InvalidRule("instruction subscope supported only for function and basic block scope")
if len(d[key]) == 1:
statements = build_statements(d[key][0], Scopes(static=INSTRUCTION_SCOPE))
statements = build_statements(d[key][0], Scopes(static=Scope.INSTRUCTION))
else:
# for instruction subscopes, we support a shorthand in which the top level AND is implied.
# the following are equivalent:
@@ -631,9 +621,9 @@ def build_statements(d, scopes: Scopes):
# - arch: i386
# - mnemonic: cmp
#
statements = ceng.And([build_statements(dd, Scopes(static=INSTRUCTION_SCOPE)) for dd in d[key]])
statements = ceng.And([build_statements(dd, Scopes(static=Scope.INSTRUCTION)) for dd in d[key]])
return ceng.Subscope(INSTRUCTION_SCOPE, statements, description=description)
return ceng.Subscope(Scope.INSTRUCTION, statements, description=description)
elif key.startswith("count(") and key.endswith(")"):
# e.g.:
@@ -1140,10 +1130,9 @@ class Rule:
return doc
def get_rules_with_scope(rules, scope: Scope) -> List[Rule]:
    """
    pick the rules from the given collection that have the given scope.

    a rule is picked when `scope in rule.scopes` holds.
    the result is a new list that keeps the iteration order of `rules`.
    """
    selected = []
    for rule in rules:
        if scope in rule.scopes:
            selected.append(rule)
    return selected
@@ -1295,13 +1284,13 @@ class RuleSet:
rules = capa.optimizer.optimize_rules(rules)
self.file_rules = self._get_rules_for_scope(rules, FILE_SCOPE)
self.process_rules = self._get_rules_for_scope(rules, PROCESS_SCOPE)
self.thread_rules = self._get_rules_for_scope(rules, THREAD_SCOPE)
self.call_rules = self._get_rules_for_scope(rules, CALL_SCOPE)
self.function_rules = self._get_rules_for_scope(rules, FUNCTION_SCOPE)
self.basic_block_rules = self._get_rules_for_scope(rules, BASIC_BLOCK_SCOPE)
self.instruction_rules = self._get_rules_for_scope(rules, INSTRUCTION_SCOPE)
self.file_rules = self._get_rules_for_scope(rules, Scope.FILE)
self.process_rules = self._get_rules_for_scope(rules, Scope.PROCESS)
self.thread_rules = self._get_rules_for_scope(rules, Scope.THREAD)
self.call_rules = self._get_rules_for_scope(rules, Scope.CALL)
self.function_rules = self._get_rules_for_scope(rules, Scope.FUNCTION)
self.basic_block_rules = self._get_rules_for_scope(rules, Scope.BASIC_BLOCK)
self.instruction_rules = self._get_rules_for_scope(rules, Scope.INSTRUCTION)
self.rules = {rule.name: rule for rule in rules}
self.rules_by_namespace = index_rules_by_namespace(rules)

View File

@@ -115,10 +115,10 @@ def render_matches_by_function(doc: rd.ResultDocument):
matches_by_function = collections.defaultdict(set)
for rule in rutils.capability_rules(doc):
if capa.rules.FUNCTION_SCOPE in rule.meta.scopes:
if capa.rules.Scope.FUNCTION in rule.meta.scopes:
for addr, _ in rule.matches:
matches_by_function[addr].add(rule.meta.name)
elif capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
elif capa.rules.Scope.BASIC_BLOCK in rule.meta.scopes:
for addr, _ in rule.matches:
function = functions_by_bb[addr]
matches_by_function[function].add(rule.meta.name)

View File

@@ -116,10 +116,10 @@ def test_addr_to_pb2():
def test_scope_to_pb2():
    # check that each static Scope member maps to the expected protobuf scope constant.
    # members are passed directly: looking up an enum by one of its own members
    # returns that same member, so wrapping them in `Scope(...)` adds nothing.
    assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FILE) == capa_pb2.SCOPE_FILE
    assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FUNCTION) == capa_pb2.SCOPE_FUNCTION
    assert capa.render.proto.scope_to_pb2(capa.rules.Scope.BASIC_BLOCK) == capa_pb2.SCOPE_BASIC_BLOCK
    assert capa.render.proto.scope_to_pb2(capa.rules.Scope.INSTRUCTION) == capa_pb2.SCOPE_INSTRUCTION
def cmp_optional(a: Any, b: Any) -> bool:

View File

@@ -40,7 +40,7 @@ ADDR4 = capa.features.address.AbsoluteVirtualAddress(0x401004)
def test_rule_ctor():
r = capa.rules.Rule(
"test rule", capa.rules.Scopes(capa.rules.FUNCTION_SCOPE, capa.rules.FILE_SCOPE), Or([Number(1)]), {}
"test rule", capa.rules.Scopes(capa.rules.Scope.FUNCTION, capa.rules.Scope.FILE), Or([Number(1)]), {}
)
assert bool(r.evaluate({Number(0): {ADDR1}})) is False
assert bool(r.evaluate({Number(1): {ADDR2}})) is True