mirror of
https://github.com/mandiant/capa.git
synced 2026-01-14 05:46:41 -08:00
Merge branch 'master' into perf/rule-selection
This commit is contained in:
118
capa/engine.py
118
capa/engine.py
@@ -46,9 +46,12 @@ class Statement:
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
|
||||
def evaluate(self, features: FeatureSet) -> Result:
|
||||
def evaluate(self, features: FeatureSet, short_circuit=True) -> Result:
|
||||
"""
|
||||
classes that inherit `Statement` must implement `evaluate`
|
||||
|
||||
args:
|
||||
short_circuit (bool): if true, then statements like and/or/some may short circuit.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -73,35 +76,69 @@ class Statement:
|
||||
|
||||
|
||||
class And(Statement):
|
||||
"""match if all of the children evaluate to True."""
|
||||
"""
|
||||
match if all of the children evaluate to True.
|
||||
|
||||
the order of evaluation is dictated by the property
|
||||
`And.children` (type: List[Statement|Feature]).
|
||||
a query optimizer may safely manipulate the order of these children.
|
||||
"""
|
||||
|
||||
def __init__(self, children, description=None):
|
||||
super(And, self).__init__(description=description)
|
||||
self.children = children
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.and"] += 1
|
||||
|
||||
results = [child.evaluate(ctx) for child in self.children]
|
||||
success = all(results)
|
||||
return Result(success, self, results)
|
||||
if short_circuit:
|
||||
results = []
|
||||
for child in self.children:
|
||||
result = child.evaluate(ctx, short_circuit=short_circuit)
|
||||
results.append(result)
|
||||
if not result:
|
||||
# short circuit
|
||||
return Result(False, self, results)
|
||||
|
||||
return Result(True, self, results)
|
||||
else:
|
||||
results = [child.evaluate(ctx, short_circuit=short_circuit) for child in self.children]
|
||||
success = all(results)
|
||||
return Result(success, self, results)
|
||||
|
||||
|
||||
class Or(Statement):
|
||||
"""match if any of the children evaluate to True."""
|
||||
"""
|
||||
match if any of the children evaluate to True.
|
||||
|
||||
the order of evaluation is dictated by the property
|
||||
`Or.children` (type: List[Statement|Feature]).
|
||||
a query optimizer may safely manipulate the order of these children.
|
||||
"""
|
||||
|
||||
def __init__(self, children, description=None):
|
||||
super(Or, self).__init__(description=description)
|
||||
self.children = children
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.or"] += 1
|
||||
|
||||
results = [child.evaluate(ctx) for child in self.children]
|
||||
success = any(results)
|
||||
return Result(success, self, results)
|
||||
if short_circuit:
|
||||
results = []
|
||||
for child in self.children:
|
||||
result = child.evaluate(ctx, short_circuit=short_circuit)
|
||||
results.append(result)
|
||||
if result:
|
||||
# short circuit as soon as we hit one match
|
||||
return Result(True, self, results)
|
||||
|
||||
return Result(False, self, results)
|
||||
else:
|
||||
results = [child.evaluate(ctx, short_circuit=short_circuit) for child in self.children]
|
||||
success = any(results)
|
||||
return Result(success, self, results)
|
||||
|
||||
|
||||
class Not(Statement):
|
||||
@@ -111,34 +148,55 @@ class Not(Statement):
|
||||
super(Not, self).__init__(description=description)
|
||||
self.child = child
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.not"] += 1
|
||||
|
||||
results = [self.child.evaluate(ctx)]
|
||||
results = [self.child.evaluate(ctx, short_circuit=short_circuit)]
|
||||
success = not results[0]
|
||||
return Result(success, self, results)
|
||||
|
||||
|
||||
class Some(Statement):
|
||||
"""match if at least N of the children evaluate to True."""
|
||||
"""
|
||||
match if at least N of the children evaluate to True.
|
||||
|
||||
the order of evaluation is dictated by the property
|
||||
`Some.children` (type: List[Statement|Feature]).
|
||||
a query optimizer may safely manipulate the order of these children.
|
||||
"""
|
||||
|
||||
def __init__(self, count, children, description=None):
|
||||
super(Some, self).__init__(description=description)
|
||||
self.count = count
|
||||
self.children = children
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.some"] += 1
|
||||
|
||||
results = [child.evaluate(ctx) for child in self.children]
|
||||
# note that here we cast the child result as a bool
|
||||
# because we've overridden `__bool__` above.
|
||||
#
|
||||
# we can't use `if child is True` because the instance is not True.
|
||||
success = sum([1 for child in results if bool(child) is True]) >= self.count
|
||||
return Result(success, self, results)
|
||||
if short_circuit:
|
||||
results = []
|
||||
satisfied_children_count = 0
|
||||
for child in self.children:
|
||||
result = child.evaluate(ctx, short_circuit=short_circuit)
|
||||
results.append(result)
|
||||
if result:
|
||||
satisfied_children_count += 1
|
||||
|
||||
if satisfied_children_count >= self.count:
|
||||
# short circuit as soon as we hit the threshold
|
||||
return Result(True, self, results)
|
||||
|
||||
return Result(False, self, results)
|
||||
else:
|
||||
results = [child.evaluate(ctx, short_circuit=short_circuit) for child in self.children]
|
||||
# note that here we cast the child result as a bool
|
||||
# because we've overridden `__bool__` above.
|
||||
#
|
||||
# we can't use `if child is True` because the instance is not True.
|
||||
success = sum([1 for child in results if bool(child) is True]) >= self.count
|
||||
return Result(success, self, results)
|
||||
|
||||
|
||||
class Range(Statement):
|
||||
@@ -150,7 +208,7 @@ class Range(Statement):
|
||||
self.min = min if min is not None else 0
|
||||
self.max = max if max is not None else (1 << 64 - 1)
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, **kwargs):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.range"] += 1
|
||||
|
||||
@@ -178,7 +236,7 @@ class Subscope(Statement):
|
||||
self.scope = scope
|
||||
self.child = child
|
||||
|
||||
def evaluate(self, ctx):
|
||||
def evaluate(self, ctx, **kwargs):
|
||||
raise ValueError("cannot evaluate a subscope directly!")
|
||||
|
||||
|
||||
@@ -241,8 +299,18 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
|
||||
features = collections.defaultdict(set, copy.copy(features))
|
||||
|
||||
for rule in rules:
|
||||
res = rule.evaluate(features)
|
||||
res = rule.evaluate(features, short_circuit=True)
|
||||
if res:
|
||||
# we first matched the rule with short circuiting enabled.
|
||||
# this is much faster than without short circuiting.
|
||||
# however, we want to collect all results thoroughly,
|
||||
# so once we've found a match quickly,
|
||||
# go back and capture results without short circuiting.
|
||||
res = rule.evaluate(features, short_circuit=False)
|
||||
|
||||
# sanity check
|
||||
assert bool(res) is True
|
||||
|
||||
results[rule.name].append((va, res))
|
||||
# we need to update the current `features`
|
||||
# because subsequent iterations of this loop may use newly added features,
|
||||
|
||||
Reference in New Issue
Block a user