type: capa.rules

William Ballenthin
2021-06-09 23:39:07 -06:00
parent 459df37b13
commit 3e79dfd0e7


@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import re
import uuid
import codecs
@@ -19,10 +20,9 @@ try:
except ImportError:
from backports.functools_lru_cache import lru_cache
import io
import yaml
import ruamel.yaml
from typing import List, Any, Union, Dict, Iterator, Set
import capa.rules
import capa.engine
@@ -32,7 +32,8 @@ import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
from capa.features.common import MAX_BYTES_FEATURE_SIZE
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature
from capa.rules import Rule
logger = logging.getLogger(__name__)
@@ -143,7 +144,7 @@ class InvalidRuleSet(ValueError):
return str(self)
def ensure_feature_valid_for_scope(scope, feature):
def ensure_feature_valid_for_scope(scope: str, feature: Feature):
if isinstance(feature, capa.features.common.Characteristic):
if capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]:
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
@@ -151,14 +152,14 @@ def ensure_feature_valid_for_scope(scope, feature):
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
def parse_int(s):
def parse_int(s: str) -> int:
if s.startswith("0x"):
return int(s, 0x10)
else:
return int(s, 10)
def parse_range(s):
def parse_range(s: str):
"""
parse a string "(0, 1)" into a range (min, max).
min and/or max may be None to indicate an unbound range.
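A quick, illustrative check of these two helpers; only the bounded range form from the docstring is exercised, since the spelling of an unbounded range is not shown in this hunk.

import capa.rules

assert capa.rules.parse_int("0x10") == 16           # hex path: int(s, 0x10)
assert capa.rules.parse_int("16") == 16              # decimal path: int(s, 10)
assert capa.rules.parse_range("(0, 1)") == (0, 1)    # per the docstring above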
@@ -196,7 +197,7 @@ def parse_range(s):
return min, max
def parse_feature(key):
def parse_feature(key: str):
# keep this in sync with supported features
if key == "api":
return capa.features.insn.API
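parse_feature maps a rule key to its feature constructor. An illustrative use, where "kernel32.CreateFile" is a made-up API value rather than anything taken from this diff:

import capa.rules

Api = capa.rules.parse_feature("api")    # -> capa.features.insn.API, per the branch above
feature = Api("kernel32.CreateFile")     # construct the feature as the rule parser would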
@@ -247,10 +248,7 @@ def parse_feature(key):
DESCRIPTION_SEPARATOR = " = "
def parse_description(s, value_type, description=None):
"""
s can be an int or a string
"""
def parse_description(s: Union[str, int], value_type: str, description=None):
if value_type != "string" and isinstance(s, str) and DESCRIPTION_SEPARATOR in s:
if description:
raise InvalidRule(
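The separator lets a rule author attach an inline description to a non-string value, e.g. number: 0x10 = buffer size. A hedged sketch of the intended behavior; the (value, description) return shape is my reading of the surrounding parser code, not something shown in this hunk.

import capa.rules

# "number" is a non-string value type, so " = " splits the value from its description
value, description = capa.rules.parse_description("0x10 = buffer size", "number")
assert description == "buffer size"
# value is parsed from "0x10" according to the value type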
@@ -318,7 +316,7 @@ def pop_statement_description_entry(d):
return description["description"]
def build_statements(d, scope):
def build_statements(d, scope: str):
if len(d.keys()) > 2:
raise InvalidRule("too many statements")
@@ -425,11 +423,11 @@ def build_statements(d, scope):
return feature
def first(s):
def first(s: List[Any]) -> Any:
return s[0]
def second(s):
def second(s: List[Any]) -> Any:
return s[1]
@@ -754,50 +752,37 @@ class Rule(object):
return doc
def get_rules_with_scope(rules, scope):
def get_rules_with_scope(rules, scope) -> List[Rule]:
"""
from the given collection of rules, select those with the given scope.
args:
rules (List[capa.rules.Rule]):
scope (str): one of the capa.rules.*_SCOPE constants.
returns:
List[capa.rules.Rule]:
`scope` is one of the capa.rules.*_SCOPE constants.
"""
return list(rule for rule in rules if rule.scope == scope)
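Typical use, assuming the FILE_SCOPE constant exposed by capa.rules; the empty list stands in for rules loaded elsewhere, e.g. via Rule.from_yaml_file.

import capa.rules

rules = []  # stand-in for a loaded List[Rule]
file_rules = capa.rules.get_rules_with_scope(rules, capa.rules.FILE_SCOPE)
assert file_rules == []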
def get_rules_and_dependencies(rules, rule_name):
def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Rule]:
"""
from the given collection of rules, select a rule and its dependencies (transitively).
args:
rules (List[Rule]):
rule_name (str):
yields:
Rule:
"""
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
rules_by_name = {rule.name: rule for rule in rules}
wanted = set([rule_name])
def rec(rule):
wanted.add(rule.name)
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
rec(rules_by_name[dep])
rec(rules[rule_name])
rec(rules_by_name[rule_name])
for rule in rules.values():
for rule in rules_by_name.values():
if rule.name in wanted:
yield rule
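The rules_by_name rename keeps the List[Rule] parameter from being rebound to a dict, which a type checker would otherwise flag. The index-then-walk pattern itself is simple; here is a self-contained sketch with a hypothetical MiniRule stand-in (no namespace matching, unlike the real function):

from typing import Dict, Iterator, List, NamedTuple

class MiniRule(NamedTuple):  # hypothetical stand-in for capa.rules.Rule
    name: str
    deps: List[str]

def rules_and_dependencies(rules: List[MiniRule], rule_name: str) -> Iterator[MiniRule]:
    rules_by_name: Dict[str, MiniRule] = {r.name: r for r in rules}
    wanted = set()

    def rec(rule: MiniRule) -> None:
        if rule.name in wanted:
            return
        wanted.add(rule.name)
        for dep in rule.deps:
            rec(rules_by_name[dep])  # walk dependencies transitively

    rec(rules_by_name[rule_name])
    for rule in rules:
        if rule.name in wanted:
            yield rule

rules = [MiniRule("a", ["b"]), MiniRule("b", []), MiniRule("c", [])]
assert [r.name for r in rules_and_dependencies(rules, "a")] == ["a", "b"]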
def ensure_rules_are_unique(rules):
def ensure_rules_are_unique(rules: List[Rule]) -> None:
seen = set([])
for rule in rules:
if rule.name in seen:
@@ -805,7 +790,7 @@ def ensure_rules_are_unique(rules):
seen.add(rule.name)
def ensure_rule_dependencies_are_met(rules):
def ensure_rule_dependencies_are_met(rules: List[Rule]) -> None:
"""
raise an exception if a rule dependency does not exist.
@@ -815,14 +800,14 @@ def ensure_rule_dependencies_are_met(rules):
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
for rule in rules.values():
rules_by_name = {rule.name: rule for rule in rules}
for rule in rules_by_name.values():
for dep in rule.get_dependencies(namespaces):
if dep not in rules:
if dep not in rules_by_name:
raise InvalidRule('rule "%s" depends on missing rule "%s"' % (rule.name, dep))
def index_rules_by_namespace(rules):
def index_rules_by_namespace(rules: List[Rule]) -> Dict[str, List[Rule]]:
"""
compute the rules that fit into each namespace found within the given rules.
@@ -836,11 +821,6 @@ def index_rules_by_namespace(rules):
c2/shell: [create reverse shell]
c2/file-transfer: [download and write a file]
c2: [create reverse shell, download and write a file]
Args:
rules (List[Rule]):
Returns: Dict[str, List[Rule]]
"""
namespaces = collections.defaultdict(list)
@@ -856,7 +836,7 @@ def index_rules_by_namespace(rules):
return dict(namespaces)
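The prefix indexing described in the docstring above can be sketched without the Rule class; the (name, namespace) tuples are hypothetical stand-ins:

import collections
from typing import Dict, List

rules = [
    ("create reverse shell", "c2/shell"),
    ("download and write a file", "c2/file-transfer"),
]

namespaces: Dict[str, List[str]] = collections.defaultdict(list)
for name, namespace in rules:
    # index the rule under every prefix of its namespace, e.g. "c2" and "c2/shell"
    parts = namespace.split("/")
    for i in range(len(parts)):
        namespaces["/".join(parts[: i + 1])].append(name)

assert namespaces["c2/shell"] == ["create reverse shell"]
assert namespaces["c2"] == ["create reverse shell", "download and write a file"]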
def topologically_order_rules(rules):
def topologically_order_rules(rules: List[Rule]) -> List[Rule]:
"""
order the given rules such that dependencies show up before dependents.
this means that as we match rules, we can add features for the matches, and these
@@ -867,7 +847,7 @@ def topologically_order_rules(rules):
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
rules_by_name = {rule.name: rule for rule in rules}
seen = set([])
ret = []
@@ -876,12 +856,12 @@ def topologically_order_rules(rules):
return
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
rec(rules_by_name[dep])
ret.append(rule)
seen.add(rule.name)
for rule in rules.values():
for rule in rules_by_name.values():
rec(rule)
return ret
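The ordering logic is a post-order depth-first walk: dependencies are appended before the rules that need them. A compact, self-contained sketch using the same hypothetical MiniRule shape as above:

from typing import Dict, List, NamedTuple, Set

class MiniRule(NamedTuple):  # hypothetical stand-in for capa.rules.Rule
    name: str
    deps: List[str]

def topo_order(rules: List[MiniRule]) -> List[MiniRule]:
    rules_by_name: Dict[str, MiniRule] = {r.name: r for r in rules}
    seen: Set[str] = set()
    ret: List[MiniRule] = []

    def rec(rule: MiniRule) -> None:
        if rule.name in seen:
            return
        for dep in rule.deps:
            rec(rules_by_name[dep])  # visit dependencies first
        ret.append(rule)
        seen.add(rule.name)

    for rule in rules:
        rec(rule)
    return ret

rules = [MiniRule("dependent", ["dependency"]), MiniRule("dependency", [])]
assert [r.name for r in topo_order(rules)] == ["dependency", "dependent"]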
@@ -902,7 +882,7 @@ class RuleSet(object):
capa.engine.match(ruleset.file_rules, ...)
"""
def __init__(self, rules):
def __init__(self, rules: List[Rule]):
super(RuleSet, self).__init__()
ensure_rules_are_unique(rules)
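A hedged construction sketch; it assumes Rule.from_yaml accepts rule YAML as a string, and the rule content itself is made up for illustration:

import textwrap
import capa.rules

rule_yaml = textwrap.dedent(
    """
    rule:
      meta:
        name: example rule
        scope: function
      features:
        - or:
          - api: CreateFileA
          - api: CreateFileW
    """
)
ruleset = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])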
@@ -949,7 +929,7 @@ class RuleSet(object):
continue
scope_rules.update(get_rules_and_dependencies(rules, rule.name))
return get_rules_with_scope(topologically_order_rules(scope_rules), scope)
return get_rules_with_scope(topologically_order_rules(list(scope_rules)), scope)
@staticmethod
def _extract_subscope_rules(rules):
@@ -973,7 +953,7 @@ class RuleSet(object):
return done
def filter_rules_by_meta(self, tag):
def filter_rules_by_meta(self, tag: str) -> "RuleSet":
"""
return new rule set with rules filtered based on all meta field values, adds all dependency rules
apply tag-based rule filter assuming that all required rules are loaded
@@ -982,7 +962,7 @@ class RuleSet(object):
TODO handle circular dependencies?
TODO support -t=metafield <k>
"""
rules = self.rules.values()
rules = list(self.rules.values())
rules_filtered = set([])
for rule in rules:
for k, v in rule.meta.items():
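Continuing the RuleSet sketch above, a hedged example of the filter; the "persistence" tag is arbitrary and, per the docstring, is matched against the rules' meta field values:

# returns a new RuleSet with the matching rules plus their dependencies
filtered = ruleset.filter_rules_by_meta("persistence")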