From cb2d1cde36f9b372c73705d3cabdb3af524b9ad4 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 27 Aug 2021 09:04:37 -0600 Subject: [PATCH] linter: add typing --- scripts/lint.py | 133 +++++++++++++++++++++++++++--------------------- 1 file changed, 76 insertions(+), 57 deletions(-) diff --git a/scripts/lint.py b/scripts/lint.py index 79677688..74911520 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -26,6 +26,8 @@ import argparse import itertools import posixpath import contextlib +from typing import Set, Dict, List +from dataclasses import field, dataclass import tqdm import termcolor @@ -37,6 +39,8 @@ import capa.rules import capa.engine import capa.features.insn import capa.features.common +from capa.rules import Rule, RuleSet +from capa.features.common import Feature logger = logging.getLogger("lint") @@ -53,6 +57,23 @@ def green(s): return termcolor.colored(s, "green") +@dataclass +class Context: + """ + attributes: + samples: mapping from content hash (MD5, SHA, etc.) to file path. + rules: rules to inspect + is_thorough: should inspect long-running lints + capabilities_by_sample: cache of results, indexed by file path. + """ + + # TODO: map this to Path + samples: Dict[str, str] + rules: RuleSet + is_thorough: bool + capabilities_by_sample: Dict[str, Set[str]] = field(default_factory=dict) + + class Lint: WARN = orange("WARN") FAIL = red("FAIL") @@ -61,7 +82,7 @@ class Lint: level = FAIL recommendation = "" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return False @@ -69,7 +90,7 @@ class NameCasing(Lint): name = "rule name casing" recommendation = "Rename rule using to start with lower case letters" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return rule.name[0] in string.ascii_uppercase and rule.name[1] not in string.ascii_uppercase @@ -78,7 +99,7 @@ class FilenameDoesntMatchRuleName(Lint): recommendation = "Rename rule file to match the rule name" recommendation_template = 'Rename rule file to match the rule name, expected: "{:s}", found: "{:s}"' - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): expected = rule.name expected = expected.lower() expected = expected.replace(" ", "-") @@ -100,7 +121,7 @@ class MissingNamespace(Lint): name = "missing rule namespace" recommendation = "Add meta.namespace so that the rule is emitted correctly" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return ( "namespace" not in rule.meta and not is_nursery_rule(rule) @@ -113,7 +134,7 @@ class NamespaceDoesntMatchRulePath(Lint): name = "file path doesn't match rule namespace" recommendation = "Move rule to appropriate directory or update the namespace" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): # let the other lints catch namespace issues if "namespace" not in rule.meta: return False @@ -131,7 +152,7 @@ class MissingScope(Lint): name = "missing scope" recommendation = "Add meta.scope so that the scope is explicit (defaults to `function`)" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return "scope" not in rule.meta @@ -139,7 +160,7 @@ class InvalidScope(Lint): name = "invalid scope" recommendation = "Use only file, function, or basic block rule scopes" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return rule.meta.get("scope") not in ("file", "function", "basic block") @@ -147,7 +168,7 @@ class MissingAuthor(Lint): name = "missing author" recommendation = "Add meta.author so that users know who to contact with questions" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return "author" not in rule.meta @@ -155,7 +176,7 @@ class MissingExamples(Lint): name = "missing examples" recommendation = "Add meta.examples so that the rule can be tested and verified" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): return ( "examples" not in rule.meta or not isinstance(rule.meta["examples"], list) @@ -168,7 +189,7 @@ class MissingExampleOffset(Lint): name = "missing example offset" recommendation = "Add offset of example function" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if rule.meta.get("scope") in ("function", "basic block"): examples = rule.meta.get("examples") if isinstance(examples, list): @@ -182,7 +203,7 @@ class ExampleFileDNE(Lint): name = "referenced example doesn't exist" recommendation = "Add the referenced example to samples directory ($capa-root/tests/data or supplied via --samples)" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if not rule.meta.get("examples"): # let the MissingExamples lint catch this case, don't double report. return False @@ -191,7 +212,7 @@ class ExampleFileDNE(Lint): for example in rule.meta.get("examples", []): if example: example_id = example.partition(":")[0] - if example_id in ctx["samples"]: + if example_id in ctx.samples: found = True break @@ -201,12 +222,20 @@ class ExampleFileDNE(Lint): DEFAULT_SIGNATURES = capa.main.get_default_signatures() +def get_sample_capabilities(ctx, path): + extractor = capa.main.get_extractor( + path, "auto", capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True + ) + capabilities, _ = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True) + return capabilities + + class DoesntMatchExample(Lint): name = "doesn't match on referenced example" recommendation = "Fix the rule logic or provide a different example" - def check_rule(self, ctx, rule): - if not ctx["is_thorough"]: + def check_rule(self, ctx: Context, rule: Rule): + if not ctx.is_thorough: return False examples = rule.meta.get("examples", []) @@ -216,17 +245,14 @@ class DoesntMatchExample(Lint): for example in examples: example_id = example.partition(":")[0] try: - path = ctx["samples"][example_id] + path = ctx.samples[example_id] except KeyError: # lint ExampleFileDNE will catch this. # don't double report. continue try: - extractor = capa.main.get_extractor( - path, "auto", capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True - ) - capabilities, meta = capa.main.find_capabilities(ctx["rules"], extractor, disable_progress=True) + capabilities = get_sample_capabilities(ctx, path) except Exception as e: logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e) return True @@ -241,7 +267,7 @@ class StatementWithSingleChildStatement(Lint): recommendation_template = "remove the superfluous parent statement: {:s}" violation = False - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): self.violation = False def rec(statement, is_root=False): @@ -264,7 +290,7 @@ class OrStatementWithAlwaysTrueChild(Lint): recommendation_template = "clarify the rule logic, e.g. by moving the always True child statement: {:s}" violation = False - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): self.violation = False def rec(statement): @@ -287,7 +313,7 @@ class UnusualMetaField(Lint): recommendation = "Remove the meta field" recommendation_template = 'Remove the meta field: "{:s}"' - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): for key in rule.meta.keys(): if key in capa.rules.META_KEYS: continue @@ -303,7 +329,7 @@ class LibRuleNotInLibDirectory(Lint): name = "lib rule not found in lib directory" recommendation = "Move the rule to the `lib` subdirectory of the rules path" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if is_nursery_rule(rule): return False @@ -317,7 +343,7 @@ class LibRuleHasNamespace(Lint): name = "lib rule has a namespace" recommendation = "Remove the namespace from the rule" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if "lib" not in rule.meta: return False @@ -328,9 +354,10 @@ class FeatureStringTooShort(Lint): name = "feature string too short" recommendation = 'capa only extracts strings with length >= 4; will not match on "{:s}"' - def check_features(self, ctx, features): + def check_features(self, ctx: Context, features: List[Feature]): for feature in features: if isinstance(feature, (capa.features.common.String, capa.features.common.Substring)): + assert isinstance(feature.value, str) if len(feature.value) < 4: self.recommendation = self.recommendation.format(feature.value) return True @@ -345,9 +372,10 @@ class FeatureNegativeNumber(Lint): 'representation; will not match on "{:d}"' ) - def check_features(self, ctx, features): + def check_features(self, ctx: Context, features: List[Feature]): for feature in features: if isinstance(feature, (capa.features.insn.Number,)): + assert isinstance(feature.value, int) if feature.value < 0: self.recommendation = self.recommendation_template.format(feature.value) return True @@ -362,9 +390,10 @@ class FeatureNtdllNtoskrnlApi(Lint): "module requirement to improve detection" ) - def check_features(self, ctx, features): + def check_features(self, ctx: Context, features: List[Feature]): for feature in features: if isinstance(feature, capa.features.insn.API): + assert isinstance(feature.value, str) modname, _, impname = feature.value.rpartition(".") if modname == "ntdll": @@ -419,7 +448,7 @@ class FormatLineFeedEOL(Lint): name = "line(s) end with CRLF (\\r\\n)" recommendation = "convert line endings to LF (\\n) for example using dos2unix" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if len(rule.definition.split("\r\n")) > 0: return False return True @@ -429,7 +458,7 @@ class FormatSingleEmptyLineEOF(Lint): name = "EOF format" recommendation = "end file with a single empty line" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): if rule.definition.endswith("\n") and not rule.definition.endswith("\n\n"): return False return True @@ -439,7 +468,7 @@ class FormatIncorrect(Lint): name = "rule format incorrect" recommendation_template = "use scripts/capafmt.py or adjust as follows\n{:s}" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): actual = rule.definition expected = capa.rules.Rule.from_yaml(rule.definition, use_ruamel=True).to_yaml() @@ -459,7 +488,7 @@ class FormatIncorrect(Lint): class FormatStringQuotesIncorrect(Lint): name = "rule string quotes incorrect" - def check_rule(self, ctx, rule): + def check_rule(self, ctx: Context, rule: Rule): events = capa.rules.Rule._get_ruamel_yaml_parser().parse(rule.definition) for key in events: if isinstance(key, ruamel.yaml.ScalarEvent) and key.value == "string": @@ -499,13 +528,13 @@ class FormatStringQuotesIncorrect(Lint): return False -def run_lints(lints, ctx, rule): +def run_lints(lints, ctx: Context, rule: Rule): for lint in lints: if lint.check_rule(ctx, rule): yield lint -def run_feature_lints(lints, ctx, features): +def run_feature_lints(lints, ctx: Context, features: List[Feature]): for lint in lints: if lint.check_features(ctx, features): yield lint @@ -517,7 +546,7 @@ NAME_LINTS = ( ) -def lint_name(ctx, rule): +def lint_name(ctx: Context, rule: Rule): return run_lints(NAME_LINTS, ctx, rule) @@ -527,7 +556,7 @@ SCOPE_LINTS = ( ) -def lint_scope(ctx, rule): +def lint_scope(ctx: Context, rule: Rule): return run_lints(SCOPE_LINTS, ctx, rule) @@ -544,14 +573,14 @@ META_LINTS = ( ) -def lint_meta(ctx, rule): +def lint_meta(ctx: Context, rule: Rule): return run_lints(META_LINTS, ctx, rule) FEATURE_LINTS = (FeatureStringTooShort(), FeatureNegativeNumber(), FeatureNtdllNtoskrnlApi()) -def lint_features(ctx, rule): +def lint_features(ctx: Context, rule: Rule): features = get_features(ctx, rule) return run_feature_lints(FEATURE_LINTS, ctx, features) @@ -564,7 +593,7 @@ FORMAT_LINTS = ( ) -def lint_format(ctx, rule): +def lint_format(ctx: Context, rule: Rule): return run_lints(FORMAT_LINTS, ctx, rule) @@ -572,11 +601,11 @@ def get_normpath(path): return posixpath.normpath(path).replace(os.sep, "/") -def get_features(ctx, rule): +def get_features(ctx: Context, rule: Rule): # get features from rule and all dependencies including subscopes and matched rules features = [] - namespaces = ctx["rules"].rules_by_namespace - deps = [ctx["rules"].rules[dep] for dep in rule.get_dependencies(namespaces)] + namespaces = ctx.rules.rules_by_namespace + deps = [ctx.rules.rules[dep] for dep in rule.get_dependencies(namespaces)] for r in [rule] + deps: features.extend(get_rule_features(r)) return features @@ -603,7 +632,7 @@ LOGIC_LINTS = ( ) -def lint_logic(ctx, rule): +def lint_logic(ctx: Context, rule: Rule): return run_lints(LOGIC_LINTS, ctx, rule) @@ -616,7 +645,7 @@ def is_nursery_rule(rule): return rule.meta.get("capa/nursery") -def lint_rule(ctx, rule): +def lint_rule(ctx: Context, rule: Rule): logger.debug(rule.name) violations = list( @@ -728,21 +757,15 @@ def redirecting_print_to_tqdm(): inspect.builtins.print = old_print -def lint(ctx, rules): +def lint(ctx: Context): """ - Args: - samples (Dict[string, string]): map from sample id to path. - for each sample, record sample id of sha256, md5, and filename. - see `collect_samples(path)`. - rules (List[Rule]): the rules to lint. - Returns: Dict[string, Tuple(int, int)] - # lints failed - # lints warned """ ret = {} - with tqdm.contrib.logging.tqdm_logging_redirect(rules.rules.items(), unit="rule") as pbar: + with tqdm.contrib.logging.tqdm_logging_redirect(ctx.rules.rules.items(), unit="rule") as pbar: with redirecting_print_to_tqdm(): for name, rule in pbar: if rule.meta.get("capa/subscope-rule", False): @@ -842,13 +865,9 @@ def main(argv=None): samples = collect_samples(args.samples) - ctx = { - "samples": samples, - "rules": rules, - "is_thorough": args.thorough, - } + ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough) - results_by_name = lint(ctx, rules) + results_by_name = lint(ctx) failed_rules = [] warned_rules = [] for name, (fail_count, warn_count) in results_by_name.items():