mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 19:12:01 -08:00
linter: add typing
This commit is contained in:
133
scripts/lint.py
133
scripts/lint.py
@@ -26,6 +26,8 @@ import argparse
|
||||
import itertools
|
||||
import posixpath
|
||||
import contextlib
|
||||
from typing import Set, Dict, List
|
||||
from dataclasses import field, dataclass
|
||||
|
||||
import tqdm
|
||||
import termcolor
|
||||
@@ -37,6 +39,8 @@ import capa.rules
|
||||
import capa.engine
|
||||
import capa.features.insn
|
||||
import capa.features.common
|
||||
from capa.rules import Rule, RuleSet
|
||||
from capa.features.common import Feature
|
||||
|
||||
logger = logging.getLogger("lint")
|
||||
|
||||
@@ -53,6 +57,23 @@ def green(s):
|
||||
return termcolor.colored(s, "green")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Context:
|
||||
"""
|
||||
attributes:
|
||||
samples: mapping from content hash (MD5, SHA, etc.) to file path.
|
||||
rules: rules to inspect
|
||||
is_thorough: should inspect long-running lints
|
||||
capabilities_by_sample: cache of results, indexed by file path.
|
||||
"""
|
||||
|
||||
# TODO: map this to Path
|
||||
samples: Dict[str, str]
|
||||
rules: RuleSet
|
||||
is_thorough: bool
|
||||
capabilities_by_sample: Dict[str, Set[str]] = field(default_factory=dict)
|
||||
|
||||
|
||||
class Lint:
|
||||
WARN = orange("WARN")
|
||||
FAIL = red("FAIL")
|
||||
@@ -61,7 +82,7 @@ class Lint:
|
||||
level = FAIL
|
||||
recommendation = ""
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return False
|
||||
|
||||
|
||||
@@ -69,7 +90,7 @@ class NameCasing(Lint):
|
||||
name = "rule name casing"
|
||||
recommendation = "Rename rule using to start with lower case letters"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return rule.name[0] in string.ascii_uppercase and rule.name[1] not in string.ascii_uppercase
|
||||
|
||||
|
||||
@@ -78,7 +99,7 @@ class FilenameDoesntMatchRuleName(Lint):
|
||||
recommendation = "Rename rule file to match the rule name"
|
||||
recommendation_template = 'Rename rule file to match the rule name, expected: "{:s}", found: "{:s}"'
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
expected = rule.name
|
||||
expected = expected.lower()
|
||||
expected = expected.replace(" ", "-")
|
||||
@@ -100,7 +121,7 @@ class MissingNamespace(Lint):
|
||||
name = "missing rule namespace"
|
||||
recommendation = "Add meta.namespace so that the rule is emitted correctly"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return (
|
||||
"namespace" not in rule.meta
|
||||
and not is_nursery_rule(rule)
|
||||
@@ -113,7 +134,7 @@ class NamespaceDoesntMatchRulePath(Lint):
|
||||
name = "file path doesn't match rule namespace"
|
||||
recommendation = "Move rule to appropriate directory or update the namespace"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
# let the other lints catch namespace issues
|
||||
if "namespace" not in rule.meta:
|
||||
return False
|
||||
@@ -131,7 +152,7 @@ class MissingScope(Lint):
|
||||
name = "missing scope"
|
||||
recommendation = "Add meta.scope so that the scope is explicit (defaults to `function`)"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return "scope" not in rule.meta
|
||||
|
||||
|
||||
@@ -139,7 +160,7 @@ class InvalidScope(Lint):
|
||||
name = "invalid scope"
|
||||
recommendation = "Use only file, function, or basic block rule scopes"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return rule.meta.get("scope") not in ("file", "function", "basic block")
|
||||
|
||||
|
||||
@@ -147,7 +168,7 @@ class MissingAuthor(Lint):
|
||||
name = "missing author"
|
||||
recommendation = "Add meta.author so that users know who to contact with questions"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return "author" not in rule.meta
|
||||
|
||||
|
||||
@@ -155,7 +176,7 @@ class MissingExamples(Lint):
|
||||
name = "missing examples"
|
||||
recommendation = "Add meta.examples so that the rule can be tested and verified"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
return (
|
||||
"examples" not in rule.meta
|
||||
or not isinstance(rule.meta["examples"], list)
|
||||
@@ -168,7 +189,7 @@ class MissingExampleOffset(Lint):
|
||||
name = "missing example offset"
|
||||
recommendation = "Add offset of example function"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if rule.meta.get("scope") in ("function", "basic block"):
|
||||
examples = rule.meta.get("examples")
|
||||
if isinstance(examples, list):
|
||||
@@ -182,7 +203,7 @@ class ExampleFileDNE(Lint):
|
||||
name = "referenced example doesn't exist"
|
||||
recommendation = "Add the referenced example to samples directory ($capa-root/tests/data or supplied via --samples)"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if not rule.meta.get("examples"):
|
||||
# let the MissingExamples lint catch this case, don't double report.
|
||||
return False
|
||||
@@ -191,7 +212,7 @@ class ExampleFileDNE(Lint):
|
||||
for example in rule.meta.get("examples", []):
|
||||
if example:
|
||||
example_id = example.partition(":")[0]
|
||||
if example_id in ctx["samples"]:
|
||||
if example_id in ctx.samples:
|
||||
found = True
|
||||
break
|
||||
|
||||
@@ -201,12 +222,20 @@ class ExampleFileDNE(Lint):
|
||||
DEFAULT_SIGNATURES = capa.main.get_default_signatures()
|
||||
|
||||
|
||||
def get_sample_capabilities(ctx, path):
|
||||
extractor = capa.main.get_extractor(
|
||||
path, "auto", capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
|
||||
)
|
||||
capabilities, _ = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
|
||||
return capabilities
|
||||
|
||||
|
||||
class DoesntMatchExample(Lint):
|
||||
name = "doesn't match on referenced example"
|
||||
recommendation = "Fix the rule logic or provide a different example"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
if not ctx["is_thorough"]:
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if not ctx.is_thorough:
|
||||
return False
|
||||
|
||||
examples = rule.meta.get("examples", [])
|
||||
@@ -216,17 +245,14 @@ class DoesntMatchExample(Lint):
|
||||
for example in examples:
|
||||
example_id = example.partition(":")[0]
|
||||
try:
|
||||
path = ctx["samples"][example_id]
|
||||
path = ctx.samples[example_id]
|
||||
except KeyError:
|
||||
# lint ExampleFileDNE will catch this.
|
||||
# don't double report.
|
||||
continue
|
||||
|
||||
try:
|
||||
extractor = capa.main.get_extractor(
|
||||
path, "auto", capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
|
||||
)
|
||||
capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True)
|
||||
capabilities = get_sample_capabilities(ctx, path)
|
||||
except Exception as e:
|
||||
logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e)
|
||||
return True
|
||||
@@ -241,7 +267,7 @@ class StatementWithSingleChildStatement(Lint):
|
||||
recommendation_template = "remove the superfluous parent statement: {:s}"
|
||||
violation = False
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
self.violation = False
|
||||
|
||||
def rec(statement, is_root=False):
|
||||
@@ -264,7 +290,7 @@ class OrStatementWithAlwaysTrueChild(Lint):
|
||||
recommendation_template = "clarify the rule logic, e.g. by moving the always True child statement: {:s}"
|
||||
violation = False
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
self.violation = False
|
||||
|
||||
def rec(statement):
|
||||
@@ -287,7 +313,7 @@ class UnusualMetaField(Lint):
|
||||
recommendation = "Remove the meta field"
|
||||
recommendation_template = 'Remove the meta field: "{:s}"'
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
for key in rule.meta.keys():
|
||||
if key in capa.rules.META_KEYS:
|
||||
continue
|
||||
@@ -303,7 +329,7 @@ class LibRuleNotInLibDirectory(Lint):
|
||||
name = "lib rule not found in lib directory"
|
||||
recommendation = "Move the rule to the `lib` subdirectory of the rules path"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if is_nursery_rule(rule):
|
||||
return False
|
||||
|
||||
@@ -317,7 +343,7 @@ class LibRuleHasNamespace(Lint):
|
||||
name = "lib rule has a namespace"
|
||||
recommendation = "Remove the namespace from the rule"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if "lib" not in rule.meta:
|
||||
return False
|
||||
|
||||
@@ -328,9 +354,10 @@ class FeatureStringTooShort(Lint):
|
||||
name = "feature string too short"
|
||||
recommendation = 'capa only extracts strings with length >= 4; will not match on "{:s}"'
|
||||
|
||||
def check_features(self, ctx, features):
|
||||
def check_features(self, ctx: Context, features: List[Feature]):
|
||||
for feature in features:
|
||||
if isinstance(feature, (capa.features.common.String, capa.features.common.Substring)):
|
||||
assert isinstance(feature.value, str)
|
||||
if len(feature.value) < 4:
|
||||
self.recommendation = self.recommendation.format(feature.value)
|
||||
return True
|
||||
@@ -345,9 +372,10 @@ class FeatureNegativeNumber(Lint):
|
||||
'representation; will not match on "{:d}"'
|
||||
)
|
||||
|
||||
def check_features(self, ctx, features):
|
||||
def check_features(self, ctx: Context, features: List[Feature]):
|
||||
for feature in features:
|
||||
if isinstance(feature, (capa.features.insn.Number,)):
|
||||
assert isinstance(feature.value, int)
|
||||
if feature.value < 0:
|
||||
self.recommendation = self.recommendation_template.format(feature.value)
|
||||
return True
|
||||
@@ -362,9 +390,10 @@ class FeatureNtdllNtoskrnlApi(Lint):
|
||||
"module requirement to improve detection"
|
||||
)
|
||||
|
||||
def check_features(self, ctx, features):
|
||||
def check_features(self, ctx: Context, features: List[Feature]):
|
||||
for feature in features:
|
||||
if isinstance(feature, capa.features.insn.API):
|
||||
assert isinstance(feature.value, str)
|
||||
modname, _, impname = feature.value.rpartition(".")
|
||||
|
||||
if modname == "ntdll":
|
||||
@@ -419,7 +448,7 @@ class FormatLineFeedEOL(Lint):
|
||||
name = "line(s) end with CRLF (\\r\\n)"
|
||||
recommendation = "convert line endings to LF (\\n) for example using dos2unix"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if len(rule.definition.split("\r\n")) > 0:
|
||||
return False
|
||||
return True
|
||||
@@ -429,7 +458,7 @@ class FormatSingleEmptyLineEOF(Lint):
|
||||
name = "EOF format"
|
||||
recommendation = "end file with a single empty line"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
if rule.definition.endswith("\n") and not rule.definition.endswith("\n\n"):
|
||||
return False
|
||||
return True
|
||||
@@ -439,7 +468,7 @@ class FormatIncorrect(Lint):
|
||||
name = "rule format incorrect"
|
||||
recommendation_template = "use scripts/capafmt.py or adjust as follows\n{:s}"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
actual = rule.definition
|
||||
expected = capa.rules.Rule.from_yaml(rule.definition, use_ruamel=True).to_yaml()
|
||||
|
||||
@@ -459,7 +488,7 @@ class FormatIncorrect(Lint):
|
||||
class FormatStringQuotesIncorrect(Lint):
|
||||
name = "rule string quotes incorrect"
|
||||
|
||||
def check_rule(self, ctx, rule):
|
||||
def check_rule(self, ctx: Context, rule: Rule):
|
||||
events = capa.rules.Rule._get_ruamel_yaml_parser().parse(rule.definition)
|
||||
for key in events:
|
||||
if isinstance(key, ruamel.yaml.ScalarEvent) and key.value == "string":
|
||||
@@ -499,13 +528,13 @@ class FormatStringQuotesIncorrect(Lint):
|
||||
return False
|
||||
|
||||
|
||||
def run_lints(lints, ctx, rule):
|
||||
def run_lints(lints, ctx: Context, rule: Rule):
|
||||
for lint in lints:
|
||||
if lint.check_rule(ctx, rule):
|
||||
yield lint
|
||||
|
||||
|
||||
def run_feature_lints(lints, ctx, features):
|
||||
def run_feature_lints(lints, ctx: Context, features: List[Feature]):
|
||||
for lint in lints:
|
||||
if lint.check_features(ctx, features):
|
||||
yield lint
|
||||
@@ -517,7 +546,7 @@ NAME_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def lint_name(ctx, rule):
|
||||
def lint_name(ctx: Context, rule: Rule):
|
||||
return run_lints(NAME_LINTS, ctx, rule)
|
||||
|
||||
|
||||
@@ -527,7 +556,7 @@ SCOPE_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def lint_scope(ctx, rule):
|
||||
def lint_scope(ctx: Context, rule: Rule):
|
||||
return run_lints(SCOPE_LINTS, ctx, rule)
|
||||
|
||||
|
||||
@@ -544,14 +573,14 @@ META_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def lint_meta(ctx, rule):
|
||||
def lint_meta(ctx: Context, rule: Rule):
|
||||
return run_lints(META_LINTS, ctx, rule)
|
||||
|
||||
|
||||
FEATURE_LINTS = (FeatureStringTooShort(), FeatureNegativeNumber(), FeatureNtdllNtoskrnlApi())
|
||||
|
||||
|
||||
def lint_features(ctx, rule):
|
||||
def lint_features(ctx: Context, rule: Rule):
|
||||
features = get_features(ctx, rule)
|
||||
return run_feature_lints(FEATURE_LINTS, ctx, features)
|
||||
|
||||
@@ -564,7 +593,7 @@ FORMAT_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def lint_format(ctx, rule):
|
||||
def lint_format(ctx: Context, rule: Rule):
|
||||
return run_lints(FORMAT_LINTS, ctx, rule)
|
||||
|
||||
|
||||
@@ -572,11 +601,11 @@ def get_normpath(path):
|
||||
return posixpath.normpath(path).replace(os.sep, "/")
|
||||
|
||||
|
||||
def get_features(ctx, rule):
|
||||
def get_features(ctx: Context, rule: Rule):
|
||||
# get features from rule and all dependencies including subscopes and matched rules
|
||||
features = []
|
||||
namespaces = ctx["rules"].rules_by_namespace
|
||||
deps = [ctx["rules"].rules[dep] for dep in rule.get_dependencies(namespaces)]
|
||||
namespaces = ctx.rules.rules_by_namespace
|
||||
deps = [ctx.rules.rules[dep] for dep in rule.get_dependencies(namespaces)]
|
||||
for r in [rule] + deps:
|
||||
features.extend(get_rule_features(r))
|
||||
return features
|
||||
@@ -603,7 +632,7 @@ LOGIC_LINTS = (
|
||||
)
|
||||
|
||||
|
||||
def lint_logic(ctx, rule):
|
||||
def lint_logic(ctx: Context, rule: Rule):
|
||||
return run_lints(LOGIC_LINTS, ctx, rule)
|
||||
|
||||
|
||||
@@ -616,7 +645,7 @@ def is_nursery_rule(rule):
|
||||
return rule.meta.get("capa/nursery")
|
||||
|
||||
|
||||
def lint_rule(ctx, rule):
|
||||
def lint_rule(ctx: Context, rule: Rule):
|
||||
logger.debug(rule.name)
|
||||
|
||||
violations = list(
|
||||
@@ -728,21 +757,15 @@ def redirecting_print_to_tqdm():
|
||||
inspect.builtins.print = old_print
|
||||
|
||||
|
||||
def lint(ctx, rules):
|
||||
def lint(ctx: Context):
|
||||
"""
|
||||
Args:
|
||||
samples (Dict[string, string]): map from sample id to path.
|
||||
for each sample, record sample id of sha256, md5, and filename.
|
||||
see `collect_samples(path)`.
|
||||
rules (List[Rule]): the rules to lint.
|
||||
|
||||
Returns: Dict[string, Tuple(int, int)]
|
||||
- # lints failed
|
||||
- # lints warned
|
||||
"""
|
||||
ret = {}
|
||||
|
||||
with tqdm.contrib.logging.tqdm_logging_redirect(rules.rules.items(), unit="rule") as pbar:
|
||||
with tqdm.contrib.logging.tqdm_logging_redirect(ctx.rules.rules.items(), unit="rule") as pbar:
|
||||
with redirecting_print_to_tqdm():
|
||||
for name, rule in pbar:
|
||||
if rule.meta.get("capa/subscope-rule", False):
|
||||
@@ -842,13 +865,9 @@ def main(argv=None):
|
||||
|
||||
samples = collect_samples(args.samples)
|
||||
|
||||
ctx = {
|
||||
"samples": samples,
|
||||
"rules": rules,
|
||||
"is_thorough": args.thorough,
|
||||
}
|
||||
ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough)
|
||||
|
||||
results_by_name = lint(ctx, rules)
|
||||
results_by_name = lint(ctx)
|
||||
failed_rules = []
|
||||
warned_rules = []
|
||||
for name, (fail_count, warn_count) in results_by_name.items():
|
||||
|
||||
Reference in New Issue
Block a user