Files
capa/capa/engine.py
Ana Maria Martinez Gomez 3cd97ae9f2 [copyright + license] Fix headers
Replace the header from source code files using the following script:
```Python
for dir_path, dir_names, file_names in os.walk("capa"):
    for file_name in file_names:
        # header are only in `.py` and `.toml` files
        if file_name[-3:] not in (".py", "oml"):
            continue
        file_path = f"{dir_path}/{file_name}"
        f = open(file_path, "rb+")
        content = f.read()
        m = re.search(OLD_HEADER, content)
        if not m:
            continue
        print(f"{file_path}: {m.group('year')}")
        content = content.replace(m.group(0), NEW_HEADER % m.group("year"))
        f.seek(0)
        f.write(content)
```

Some files had the copyright headers inside a `"""` comment and needed
manual changes before applying the script. `hook-vivisect.py` and
`pyinstaller.spec` didn't include the license in the header and also
needed manual changes.

The old header had the confusing sentence `All rights reserved`, which
does not make sense for an open source license. Replace the header by
the default Google header that corrects this issue and keep capa
consistent with other Google projects.

Adapt the linter to work with the new header.

Replace also the copyright text in the `web/public/index.html` file for
consistency.
2025-01-15 08:52:42 -07:00

347 lines
13 KiB
Python

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import collections
from typing import TYPE_CHECKING, Union, Mapping, Iterable, Iterator
import capa.perf
import capa.features.common
from capa.features.common import Result, Feature
from capa.features.address import Address
if TYPE_CHECKING:
# circular import, otherwise
import capa.rules
# a collection of features and the locations at which they are found.
#
# used throughout matching as the context in which features are searched:
# to check if a feature exists, do: `Number(0x10) in features`.
# to collect the locations of a feature, do: `features[Number(0x10)]`
#
# aliased here so that the type can be documented and xref'd.
FeatureSet = dict[Feature, set[Address]]
class Statement:
"""
superclass for structural nodes, such as and/or/not.
this exists to provide a default impl for `__str__` and `__repr__`,
and to declare the interface method `evaluate`
"""
def __init__(self, description=None):
super().__init__()
self.name = self.__class__.__name__
self.description = description
def __str__(self):
name = self.name.lower()
children = ",".join(map(str, self.get_children()))
if self.description:
return f"{name}({children} = {self.description})"
else:
return f"{name}({children})"
def __repr__(self):
return str(self)
def evaluate(self, features: FeatureSet, short_circuit=True) -> Result:
"""
classes that inherit `Statement` must implement `evaluate`
args:
short_circuit (bool): if true, then statements like and/or/some may short circuit.
"""
raise NotImplementedError()
def get_children(self) -> Iterator[Union["Statement", Feature]]:
if hasattr(self, "child"):
# this really confuses mypy because the property may not exist
# since its defined in the subclasses.
child = self.child # type: ignore
assert isinstance(child, (Statement, Feature))
yield child
if hasattr(self, "children"):
for child in self.children:
assert isinstance(child, (Statement, Feature))
yield child
def replace_child(self, existing, new):
if hasattr(self, "child"):
# this really confuses mypy because the property may not exist
# since its defined in the subclasses.
if self.child is existing: # type: ignore
self.child = new
if hasattr(self, "children"):
children = self.children
for i, child in enumerate(children):
if child is existing:
children[i] = new
class And(Statement):
"""
match if all of the children evaluate to True.
the order of evaluation is dictated by the property
`And.children` (type: list[Statement|Feature]).
a query optimizer may safely manipulate the order of these children.
"""
def __init__(self, children, description=None):
super().__init__(description=description)
self.children = children
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.and"] += 1
if short_circuit:
results = []
for child in self.children:
result = child.evaluate(features, short_circuit=short_circuit)
results.append(result)
if not result:
# short circuit
return Result(False, self, results)
return Result(True, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = all(results)
return Result(success, self, results)
class Or(Statement):
"""
match if any of the children evaluate to True.
the order of evaluation is dictated by the property
`Or.children` (type: list[Statement|Feature]).
a query optimizer may safely manipulate the order of these children.
"""
def __init__(self, children, description=None):
super().__init__(description=description)
self.children = children
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.or"] += 1
if short_circuit:
results = []
for child in self.children:
result = child.evaluate(features, short_circuit=short_circuit)
results.append(result)
if result:
# short circuit as soon as we hit one match
return Result(True, self, results)
return Result(False, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = any(results)
return Result(success, self, results)
class Not(Statement):
"""match only if the child evaluates to False."""
def __init__(self, child, description=None):
super().__init__(description=description)
self.child = child
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.not"] += 1
results = [self.child.evaluate(features, short_circuit=short_circuit)]
success = not results[0]
return Result(success, self, results)
class Some(Statement):
"""
match if at least N of the children evaluate to True.
the order of evaluation is dictated by the property
`Some.children` (type: list[Statement|Feature]).
a query optimizer may safely manipulate the order of these children.
"""
def __init__(self, count, children, description=None):
super().__init__(description=description)
self.count = count
self.children = children
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.some"] += 1
if short_circuit:
results = []
satisfied_children_count = 0
for child in self.children:
result = child.evaluate(features, short_circuit=short_circuit)
results.append(result)
if result:
satisfied_children_count += 1
if satisfied_children_count >= self.count:
# short circuit as soon as we hit the threshold
return Result(True, self, results)
return Result(False, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
# note that here we cast the child result as a bool
# because we've overridden `__bool__` above.
#
# we can't use `if child is True` because the instance is not True.
success = sum([1 for child in results if bool(child) is True]) >= self.count
return Result(success, self, results)
class Range(Statement):
"""match if the child is contained in the feature set with a count in the given range."""
def __init__(self, child, min=None, max=None, description=None):
super().__init__(description=description)
self.child = child
self.min = min if min is not None else 0
self.max = max if max is not None else (1 << 64 - 1)
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.range"] += 1
count = len(features.get(self.child, []))
if self.min == 0 and count == 0:
return Result(True, self, [])
return Result(self.min <= count <= self.max, self, [], locations=features.get(self.child))
def __str__(self):
if self.max == (1 << 64 - 1):
return f"range({str(self.child)}, min={self.min}, max=infinity)"
else:
return f"range({str(self.child)}, min={self.min}, max={self.max})"
class Subscope(Statement):
"""
a subscope element is a placeholder in a rule - it should not be evaluated directly.
the engine should preprocess rules to extract subscope statements into their own rules.
"""
def __init__(self, scope, child, description=None):
super().__init__(description=description)
self.scope = scope
self.child = child
def evaluate(self, features: FeatureSet, short_circuit=True):
raise ValueError("cannot evaluate a subscope directly!")
# mapping from rule name to list of: (location of match, result object)
#
# used throughout matching and rendering to collection the results
# of statement evaluation and their locations.
#
# to check if a rule matched, do: `"TCP client" in matches`.
# to find where a rule matched, do: `map(first, matches["TCP client"])`
# to see how a rule matched, do:
#
# for address, match_details in matches["TCP client"]:
# inspect(match_details)
#
# aliased here so that the type can be documented and xref'd.
MatchResults = Mapping[str, list[tuple[Address, Result]]]
def get_rule_namespaces(rule: "capa.rules.Rule") -> Iterator[str]:
namespace = rule.meta.get("namespace")
if namespace:
while namespace:
yield namespace
namespace, _, _ = namespace.rpartition("/")
def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[Address]):
"""
record into the given featureset that the given rule matched at the given locations.
naively, this is just adding a MatchedRule feature;
however, we also want to record matches for the rule's namespaces.
updates `features` in-place. doesn't modify the remaining arguments.
"""
features[capa.features.common.MatchedRule(rule.name)].update(locations)
for namespace in get_rule_namespaces(rule):
features[capa.features.common.MatchedRule(namespace)].update(locations)
def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
"""
match the given rules against the given features,
returning an updated set of features and the matches.
the updated features are just like the input,
but extended to include the match features (e.g. names of rules that matched).
the given feature set is not modified; an updated copy is returned.
the given list of rules must be ordered topologically by dependency,
or else `match` statements will not be handled correctly.
this routine should be fairly optimized, but is not guaranteed to be the fastest matcher possible.
it has a particularly convenient signature: (rules, features) -> matches
other strategies can be imagined that match differently; implement these elsewhere.
specifically, this routine does "top down" matching of the given rules against the feature set.
"""
results: MatchResults = collections.defaultdict(list)
# copy features so that we can modify it
# without affecting the caller (keep this function pure)
#
# note: copy doesn't notice this is a defaultdict, so we'll recreate that manually.
features = collections.defaultdict(set, copy.copy(features))
for rule in rules:
res = rule.evaluate(features, short_circuit=True)
if res:
# we first matched the rule with short circuiting enabled.
# this is much faster than without short circuiting.
# however, we want to collect all results thoroughly,
# so once we've found a match quickly,
# go back and capture results without short circuiting.
res = rule.evaluate(features, short_circuit=False)
# sanity check
assert bool(res) is True
results[rule.name].append((addr, res))
# we need to update the current `features`
# because subsequent iterations of this loop may use newly added features,
# such as rule or namespace matches.
index_rule_matches(features, rule, [addr])
return (features, results)