mirror of
https://github.com/mandiant/capa.git
synced 2025-12-21 23:00:29 -08:00
Merge pull request #51 from fireeye/match-namespaces
engine, rules: support matching namespaces, not just rule names
This commit is contained in:
@@ -221,6 +221,9 @@ def topologically_order_rules(rules):
|
||||
|
||||
assumes that the rule dependency graph is a DAG.
|
||||
'''
|
||||
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
|
||||
rules = list(rules)
|
||||
namespaces = capa.rules.index_rules_by_namespace(rules)
|
||||
rules = {rule.name: rule for rule in rules}
|
||||
seen = set([])
|
||||
ret = []
|
||||
@@ -229,7 +232,7 @@ def topologically_order_rules(rules):
|
||||
if rule.name in seen:
|
||||
return
|
||||
|
||||
for dep in rule.get_dependencies():
|
||||
for dep in rule.get_dependencies(namespaces):
|
||||
rec(rules[dep])
|
||||
|
||||
ret.append(rule)
|
||||
@@ -267,4 +270,10 @@ def match(rules, features, va):
|
||||
results[rule.name].append((va, res))
|
||||
features[capa.features.MatchedRule(rule.name)].add(va)
|
||||
|
||||
namespace = rule.meta.get('namespace')
|
||||
if namespace:
|
||||
while namespace:
|
||||
features[capa.features.MatchedRule(namespace)].add(va)
|
||||
namespace, _, _ = namespace.rpartition('/')
|
||||
|
||||
return (features, results)
|
||||
|
||||
@@ -424,12 +424,16 @@ class Rule(object):
|
||||
def __repr__(self):
|
||||
return 'Rule(scope=%s, name=%s)' % (self.scope, self.name)
|
||||
|
||||
def get_dependencies(self):
|
||||
def get_dependencies(self, namespaces):
|
||||
'''
|
||||
fetch the names of rules this rule relies upon.
|
||||
these are only the direct dependencies; a user must
|
||||
compute the transitive dependency graph themself, if they want it.
|
||||
|
||||
Args:
|
||||
namespaces(Dict[str, List[Rule]]): mapping from namespace name to rules in it.
|
||||
see `index_rules_by_namespace`.
|
||||
|
||||
Returns:
|
||||
List[str]: names of rules upon which this rule depends.
|
||||
'''
|
||||
@@ -437,7 +441,20 @@ class Rule(object):
|
||||
|
||||
def rec(statement):
|
||||
if isinstance(statement, capa.features.MatchedRule):
|
||||
deps.add(statement.rule_name)
|
||||
# we're not sure at this point if the `statement.rule_name` is
|
||||
# really a rule name or a namespace name (we use `MatchedRule` for both cases).
|
||||
# we'll give precedence to namespaces, and then assume if that does work,
|
||||
# that it must be a rule name.
|
||||
#
|
||||
# we don't expect any collisions between namespaces and rule names, but its possible.
|
||||
# most likely would be collision between top level namespace (e.g. `host-interaction`) and rule name.
|
||||
# but, namespaces tend to use `-` while rule names use ` `. so, unlikely, but possible.
|
||||
if statement.rule_name in namespaces:
|
||||
# matches a namespace, so take precedence and don't even check rule names.
|
||||
deps.update(map(lambda r: r.name, namespaces[statement.rule_name]))
|
||||
else:
|
||||
# not a namespace, assume its a rule name.
|
||||
deps.add(statement.rule_name)
|
||||
|
||||
elif isinstance(statement, Statement):
|
||||
for child in statement.get_children():
|
||||
@@ -635,12 +652,15 @@ def get_rules_and_dependencies(rules, rule_name):
|
||||
yields:
|
||||
Rule:
|
||||
'''
|
||||
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
|
||||
rules = list(rules)
|
||||
namespaces = index_rules_by_namespace(rules)
|
||||
rules = {rule.name: rule for rule in rules}
|
||||
wanted = set([rule_name])
|
||||
|
||||
def rec(rule):
|
||||
wanted.add(rule.name)
|
||||
for dep in rule.get_dependencies():
|
||||
for dep in rule.get_dependencies(namespaces):
|
||||
rec(rules[dep])
|
||||
|
||||
rec(rules[rule_name])
|
||||
@@ -665,13 +685,50 @@ def ensure_rule_dependencies_are_met(rules):
|
||||
raises:
|
||||
InvalidRule: if a dependency is not met.
|
||||
'''
|
||||
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
|
||||
rules = list(rules)
|
||||
namespaces = index_rules_by_namespace(rules)
|
||||
rules = {rule.name: rule for rule in rules}
|
||||
for rule in rules.values():
|
||||
for dep in rule.get_dependencies():
|
||||
for dep in rule.get_dependencies(namespaces):
|
||||
if dep not in rules:
|
||||
raise InvalidRule('rule "%s" depends on missing rule "%s"' % (rule.name, dep))
|
||||
|
||||
|
||||
def index_rules_by_namespace(rules):
|
||||
'''
|
||||
compute the rules that fit into each namespace found within the given rules.
|
||||
|
||||
for example, given:
|
||||
|
||||
- c2/shell :: create reverse shell
|
||||
- c2/file-transfer :: download and write a file
|
||||
|
||||
return the index:
|
||||
|
||||
c2/shell: [create reverse shell]
|
||||
c2/file-transfer: [download and write a file]
|
||||
c2: [create reverse shell, download and write a file]
|
||||
|
||||
Args:
|
||||
rules (List[Rule]):
|
||||
|
||||
Returns: Dict[str, List[Rule]]
|
||||
'''
|
||||
namespaces = collections.defaultdict(list)
|
||||
|
||||
for rule in rules:
|
||||
namespace = rule.meta.get('namespace')
|
||||
if not namespace:
|
||||
continue
|
||||
|
||||
while namespace:
|
||||
namespaces[namespace].append(rule)
|
||||
namespace, _, _ = namespace.rpartition('/')
|
||||
|
||||
return dict(namespaces)
|
||||
|
||||
|
||||
class RuleSet(object):
|
||||
'''
|
||||
a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes.
|
||||
|
||||
@@ -43,8 +43,10 @@ def test_some():
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}}) == False
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}}) == False
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}}) == True
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}}) == True
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}, Number(4): {1}}) == True
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate(
|
||||
{Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}}) == True
|
||||
assert Some(2, Number(1), Number(2), Number(3)).evaluate(
|
||||
{Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}, Number(4): {1}}) == True
|
||||
|
||||
|
||||
def test_complex():
|
||||
@@ -118,7 +120,7 @@ def test_match_matched_rules():
|
||||
features:
|
||||
- number: 100
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: test rule2
|
||||
@@ -126,15 +128,16 @@ def test_match_matched_rules():
|
||||
- match: test rule1
|
||||
''')),
|
||||
]
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule1') in features
|
||||
assert capa.features.MatchedRule('test rule2') in features
|
||||
|
||||
# the ordering of the rules must not matter,
|
||||
# the engine should match rules in an appropriate order.
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(reversed(rules)),
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule1') in features
|
||||
assert capa.features.MatchedRule('test rule2') in features
|
||||
|
||||
@@ -167,11 +170,11 @@ def test_regex():
|
||||
''')),
|
||||
]
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
{capa.features.insn.Number(100): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule') not in features
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String('aaaa'): {1}}, 0x0)
|
||||
{capa.features.String('aaaa'): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule') not in features
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
@@ -179,7 +182,7 @@ def test_regex():
|
||||
assert capa.features.MatchedRule('test rule') not in features
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String('abbbba'): {1}}, 0x0)
|
||||
{capa.features.String('abbbba'): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule') in features
|
||||
assert capa.features.MatchedRule('rule with implied wildcards') in features
|
||||
assert capa.features.MatchedRule('rule with anchor') not in features
|
||||
@@ -213,5 +216,57 @@ def test_regex_complex():
|
||||
''')),
|
||||
]
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.String(r'Hardware\Key\key with spaces\some value'): {1}}, 0x0)
|
||||
{capa.features.String(r'Hardware\Key\key with spaces\some value'): {1}}, 0x0)
|
||||
assert capa.features.MatchedRule('test rule') in features
|
||||
|
||||
|
||||
def test_match_namespace():
|
||||
rules = [
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: CreateFile API
|
||||
namespace: file/create/CreateFile
|
||||
features:
|
||||
- api: CreateFile
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: WriteFile API
|
||||
namespace: file/write
|
||||
features:
|
||||
- api: WriteFile
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: file-create
|
||||
features:
|
||||
- match: file/create
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: filesystem-any
|
||||
features:
|
||||
- match: file
|
||||
''')),
|
||||
]
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.API('CreateFile'): {1}},
|
||||
0x0)
|
||||
assert 'CreateFile API' in matches
|
||||
assert 'file-create' in matches
|
||||
assert 'filesystem-any' in matches
|
||||
assert capa.features.MatchedRule('file') in features
|
||||
assert capa.features.MatchedRule('file/create') in features
|
||||
assert capa.features.MatchedRule('file/create/CreateFile') in features
|
||||
|
||||
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
|
||||
{capa.features.insn.API('WriteFile'): {1}},
|
||||
0x0)
|
||||
assert 'WriteFile API' in matches
|
||||
assert 'file-create' not in matches
|
||||
assert 'filesystem-any' in matches
|
||||
|
||||
@@ -451,3 +451,49 @@ def test_filter_rules_missing_dependency():
|
||||
- match: rule 2
|
||||
''')),
|
||||
])
|
||||
|
||||
|
||||
def test_rules_namespace_dependencies():
|
||||
rules = [
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: rule 1
|
||||
namespace: ns1/nsA
|
||||
features:
|
||||
- api: CreateFile
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: rule 2
|
||||
namespace: ns1/nsB
|
||||
features:
|
||||
- api: CreateFile
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: rule 3
|
||||
features:
|
||||
- match: ns1/nsA
|
||||
''')),
|
||||
capa.rules.Rule.from_yaml(textwrap.dedent('''
|
||||
rule:
|
||||
meta:
|
||||
name: rule 4
|
||||
features:
|
||||
- match: ns1
|
||||
''')),
|
||||
]
|
||||
|
||||
r3 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, 'rule 3')))
|
||||
assert 'rule 1' in r3
|
||||
assert 'rule 2' not in r3
|
||||
assert 'rule 4' not in r3
|
||||
|
||||
r4 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, 'rule 4')))
|
||||
assert 'rule 1' in r4
|
||||
assert 'rule 2' in r4
|
||||
assert 'rule 3' not in r4
|
||||
|
||||
|
||||
Reference in New Issue
Block a user