diff --git a/.github/pyinstaller/pyinstaller.spec b/.github/pyinstaller/pyinstaller.spec index 8130684a..0e614055 100644 --- a/.github/pyinstaller/pyinstaller.spec +++ b/.github/pyinstaller/pyinstaller.spec @@ -19,6 +19,7 @@ a = Analysis( # i.e. ./.github/pyinstaller ("../../rules", "rules"), ("../../sigs", "sigs"), + ("../../cache", "cache"), # capa.render.default uses tabulate that depends on wcwidth. # it seems wcwidth uses a json file `version.json` # and this doesn't get picked up by pyinstaller automatically. diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e5f3821d..b294eaf4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,6 +42,8 @@ jobs: run: python -m pip install --upgrade pip setuptools - name: Install capa with build requirements run: pip install -e .[build] + - name: Cache the rule set + run: python ./scripts/cache-ruleset.py ./rules/ ./cache/ - name: Build standalone executable run: pyinstaller --log-level DEBUG .github/pyinstaller/pyinstaller.spec - name: Does it run (PE)? diff --git a/CHANGELOG.md b/CHANGELOG.md index 644d6839..6622fab5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - dotnet: emit namespace/class features for type references #1242 @mike-hunhoff - dotnet: extract dotnet and pe format #1187 @mr-tz - don't render all library rule matches in vverbose output #1174 @mr-tz +- cache the rule set across invocations for better performance #1212 @williballenthin ### Breaking Changes - remove SMDA backend #1062 @williballenthin diff --git a/capa/ida/plugin/cache.py b/capa/ida/plugin/cache.py index 4444d302..eec67734 100644 --- a/capa/ida/plugin/cache.py +++ b/capa/ida/plugin/cache.py @@ -8,26 +8,18 @@ from __future__ import annotations -import copy import itertools import collections from typing import Set, Dict, List, Tuple, Union, Optional import capa.engine -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS, Address from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle -class CapaExplorerRuleSetCache: - def __init__(self, rules: List[Rule]): - # capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules - self.rules: List[Rule] = copy.deepcopy(rules) - self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules)) - - class CapaRuleGenFeatureCacheNode: def __init__( self, diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 175eb550..ee1faa3e 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -27,7 +27,7 @@ import capa.render.json import capa.features.common import capa.render.result_document import capa.features.extractors.ida.extractor -from capa.rules import Rule +from capa.rules import Rule, RuleSet from capa.engine import FeatureSet from capa.ida.plugin.icon import QICON from capa.ida.plugin.view import ( @@ -36,7 +36,7 @@ from capa.ida.plugin.view import ( CapaExplorerRulegenPreview, CapaExplorerRulegenFeatures, ) -from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache +from capa.ida.plugin.cache import CapaRuleGenFeatureCache from capa.ida.plugin.error import UserCancelledError from capa.ida.plugin.hooks import CapaExplorerIdaHooks from capa.ida.plugin.model import CapaExplorerDataModel @@ -160,7 +160,7 @@ class CapaExplorerForm(idaapi.PluginForm): # caches used to speed up capa explorer analysis - these must be init to None self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None - self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None + self.ruleset_cache: Optional[capa.rules.RuleSet] = None self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None self.rulegen_current_function: Optional[FunctionHandle] = None @@ -525,8 +525,7 @@ class CapaExplorerForm(idaapi.PluginForm): meta["prev_base"] = idaapi.get_imagebase() self.model_data.reset() - def load_capa_rules(self): - """load capa rules from directory specified by user, either using IDA UI or settings""" + def ensure_capa_settings_rule_path(self): try: # resolve rules directory - check self and settings first, then ask user if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")): @@ -567,56 +566,26 @@ class CapaExplorerForm(idaapi.PluginForm): logger.info("User cancelled analysis.") return False + if not os.path.exists(path): + logger.error("rule path %s does not exist or cannot be accessed" % path) + return False + + return True + + def load_capa_rules(self): + """load capa rules from directory specified by user, either using IDA UI or settings""" + if not self.ensure_capa_settings_rule_path(): + return False + rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") try: - # following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows - # user to cancel analysis from IDA UI - if not os.path.exists(rule_path): - raise IOError("rule path %s does not exist or cannot be accessed" % rule_path) - rule_paths: List[str] = [] - if os.path.isfile(rule_path): - rule_paths.append(rule_path) - elif os.path.isdir(rule_path): - for root, dirs, files in os.walk(rule_path): - if ".git" in root: - # the .github directory contains CI config in capa-rules - # this includes some .yml files - # these are not rules - # additionally, .git has files that are not .yml and generate the warning - # skip those too - continue - for file in files: - if not file.endswith(".yml"): - if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): - # expect to see .git* files, readme.md, format.md, and maybe a .git directory - # other things maybe are rules, but are mis-named. - logger.warning("skipping non-.yml file: %s", file) - continue - rule_path = os.path.join(root, file) - rule_paths.append(rule_path) - - rules: List[Rule] = [] - total_paths: int = len(rule_paths) - for (i, rule_path) in enumerate(rule_paths): - update_wait_box( - "loading capa rules from %s (%d of %d)" - % (settings.user[CAPA_SETTINGS_RULE_PATH], i + 1, total_paths) - ) + def on_load_rule(rule_path, i, total): + update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total)) if ida_kernwin.user_cancelled(): raise UserCancelledError("user cancelled") - try: - rule = capa.rules.Rule.from_yaml_file(rule_path) - except capa.rules.InvalidRule: - raise - else: - rule.meta["capa/path"] = rule_path - if capa.main.is_nursery_rule_path(rule_path): - rule.meta["capa/nursery"] = True - rules.append(rule) - # cache rules and rule set - self.ruleset_cache = CapaExplorerRuleSetCache(rules) + self.ruleset_cache = capa.main.get_rules([rule_path], on_load_rule=on_load_rule) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -686,6 +655,10 @@ class CapaExplorerForm(idaapi.PluginForm): # function should handle exceptions and return False if not self.load_capa_rules(): return False + assert self.ruleset_cache is not None + # matching operations may update rule instances, + # so we'll work with a local copy of the ruleset. + ruleset = copy.deepcopy(self.ruleset_cache) if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") @@ -694,17 +667,10 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("extracting features") try: - # just generated above - assert self.ruleset_cache is not None - meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) - capabilities, counts = capa.main.find_capabilities( - self.ruleset_cache.ruleset, extractor, disable_progress=True - ) + capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True) meta["analysis"].update(counts) - meta["analysis"]["layout"] = capa.main.compute_layout( - self.ruleset_cache.ruleset, extractor, capabilities - ) + meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -735,7 +701,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False): + if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True) @@ -748,9 +714,7 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("rendering results") try: - self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa( - meta, self.ruleset_cache.ruleset, capabilities - ) + self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(meta, ruleset, capabilities) except Exception as e: logger.error("Failed to collect results (error: %s)", e, exc_info=True) return False @@ -759,12 +723,10 @@ class CapaExplorerForm(idaapi.PluginForm): # either the results are cached and the doc already exists, # or the doc was just created above assert self.resdoc_cache is not None - # same with rules cache, either it's cached or it was just loaded - assert self.ruleset_cache is not None self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked()) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count) ) except Exception as e: logger.error("Failed to render results (error: %s)", e, exc_info=True) @@ -809,6 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm): else: logger.info('Using cached capa rules, click "Reset" to load rules from disk.') + assert self.ruleset_cache is not None + # matching operations may update rule instances, + # so we'll work with a local copy of the ruleset. + ruleset = copy.deepcopy(self.ruleset_cache) + # clear feature cache if self.rulegen_feature_cache is not None: self.rulegen_feature_cache = None @@ -868,18 +835,16 @@ class CapaExplorerForm(idaapi.PluginForm): all_function_features: FeatureSet = collections.defaultdict(set) try: - assert self.ruleset_cache is not None - if self.rulegen_current_function is not None: _, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities( - self.ruleset_cache.ruleset, self.rulegen_current_function + ruleset, self.rulegen_current_function ) all_function_features.update( self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function) ) for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()): - rule = self.ruleset_cache.ruleset[name] + rule = ruleset[name] if rule.is_subscope_rule(): continue for (addr, _) in result: @@ -896,13 +861,11 @@ class CapaExplorerForm(idaapi.PluginForm): all_file_features: FeatureSet = collections.defaultdict(set) try: - assert self.ruleset_cache is not None - - _, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset) + _, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset) all_file_features.update(self.rulegen_feature_cache.get_all_file_features()) for (name, result) in file_matches.items(): - rule = self.ruleset_cache.ruleset[name] + rule = ruleset[name] if rule.is_subscope_rule(): continue for (addr, _) in result: @@ -928,7 +891,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.view_rulegen_features.load_features(all_file_features, all_function_features) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count) ) except Exception as e: logger.error("Failed to render views (error: %s)", e, exc_info=True) @@ -1037,7 +1000,9 @@ class CapaExplorerForm(idaapi.PluginForm): return # we must create a deep copy of rules because any rule matching operations modify the original rule - rules = copy.deepcopy(self.ruleset_cache.rules) + # the ruleset may derive subscope rules from the source rules loaded from disk. + # by ignoring them, we reconstruct the collection of rules provided by the user. + rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()]) rules.append(rule) try: diff --git a/capa/main.py b/capa/main.py index 1741fad9..81f7e47d 100644 --- a/capa/main.py +++ b/capa/main.py @@ -20,7 +20,7 @@ import textwrap import itertools import contextlib import collections -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Callable import halo import tqdm @@ -33,6 +33,7 @@ import capa.rules import capa.engine import capa.version import capa.render.json +import capa.rules.cache import capa.render.default import capa.render.verbose import capa.features.common @@ -561,7 +562,10 @@ def is_nursery_rule_path(path: str) -> bool: return "nursery" in path -def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]: +def collect_rule_file_paths(rule_paths: List[str]) -> List[str]: + """ + collect all rule file paths, including those in subdirectories. + """ rule_file_paths = [] for rule_path in rule_paths: if not os.path.exists(rule_path): @@ -589,28 +593,69 @@ def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]: rule_path = os.path.join(root, file) rule_file_paths.append(rule_path) + return rule_file_paths + + +# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ +RulePath = str + + +def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: + return + + +def get_rules( + rule_paths: List[RulePath], + cache_dir=None, + on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default, +) -> RuleSet: + """ + args: + rule_paths: list of paths to rules files or directories containing rules files + cache_dir: directory to use for caching rules, or will use the default detected cache directory if None + on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation + """ + if cache_dir is None: + cache_dir = capa.rules.cache.get_default_cache_directory() + + # rule_paths may contain directory paths, + # so search for file paths recursively. + rule_file_paths = collect_rule_file_paths(rule_paths) + + # this list is parallel to `rule_file_paths`: + # rule_file_paths[i] corresponds to rule_contents[i]. + rule_contents = [] + for file_path in rule_file_paths: + with open(file_path, "rb") as f: + rule_contents.append(f.read()) + + ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) + if ruleset is not None: + return ruleset + rules = [] # type: List[Rule] - pbar = tqdm.tqdm - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - pbar = lambda s, *args, **kwargs: s + total_rule_count = len(rule_file_paths) + for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)): + on_load_rule(path, i, total_rule_count) - for rule_file_path in pbar(list(rule_file_paths), desc="loading ", unit=" rules"): try: - rule = capa.rules.Rule.from_yaml_file(rule_file_path) + rule = capa.rules.Rule.from_yaml(content.decode("utf-8")) except capa.rules.InvalidRule: raise else: - rule.meta["capa/path"] = rule_file_path - if is_nursery_rule_path(rule_file_path): + rule.meta["capa/path"] = path + if is_nursery_rule_path(path): rule.meta["capa/nursery"] = True rules.append(rule) logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope) - return rules + ruleset = capa.rules.RuleSet(rules) + + capa.rules.cache.cache_ruleset(cache_dir, ruleset) + + return ruleset def get_signatures(sigs_path): @@ -849,6 +894,9 @@ def handle_common_args(args): - rules: file system path to rule files. - signatures: file system path to signature files. + the following field may be added: + - is_default_rules: if the default rules were used. + args: args (argparse.Namespace): parsed arguments that included at least `install_common_args` args. """ @@ -908,6 +956,7 @@ def handle_common_args(args): return E_MISSING_RULES rules_paths.append(default_rule_path) + args.is_default_rules = True else: rules_paths = args.rules @@ -917,6 +966,8 @@ def handle_common_args(args): for rule_path in rules_paths: logger.debug("using rules path: %s", rule_path) + args.is_default_rules = False + args.rules = rules_paths if hasattr(args, "signatures"): @@ -1002,7 +1053,12 @@ def main(argv=None): return E_INVALID_FILE_TYPE try: - rules = capa.rules.RuleSet(get_rules(args.rules, disable_progress=args.quiet)) + if is_running_standalone() and args.is_default_rules: + cache_dir = os.path.join(get_default_root(), "cache") + else: + cache_dir = capa.rules.cache.get_default_cache_directory() + + rules = get_rules(args.rules, cache_dir=cache_dir) logger.debug( "successfully loaded %s rules", @@ -1149,7 +1205,7 @@ def ida_main(): rules_path = os.path.join(get_default_root(), "rules") logger.debug("rule path: %s", rules_path) - rules = capa.rules.RuleSet(get_rules([rules_path])) + rules = get_rules([rules_path]) meta = capa.ida.helpers.collect_metadata([rules_path]) diff --git a/capa/rules.py b/capa/rules/__init__.py similarity index 99% rename from capa/rules.py rename to capa/rules/__init__.py index ac344334..7d98e25a 100644 --- a/capa/rules.py +++ b/capa/rules/__init__.py @@ -743,7 +743,7 @@ class Rule: return self.statement.evaluate(features, short_circuit=short_circuit) @classmethod - def from_dict(cls, d, definition) -> "Rule": + def from_dict(cls, d: Dict[str, Any], definition: str) -> "Rule": meta = d["rule"]["meta"] name = meta["name"] # if scope is not specified, default to function scope. @@ -813,7 +813,7 @@ class Rule: return y @classmethod - def from_yaml(cls, s, use_ruamel=False) -> "Rule": + def from_yaml(cls, s: str, use_ruamel=False) -> "Rule": if use_ruamel: # ruamel enables nice formatting and doc roundtripping with comments doc = cls._get_ruamel_yaml_parser().load(s) @@ -1081,6 +1081,14 @@ class RuleSet: ensure_rules_are_unique(rules) + # in the next step we extract subscope rules, + # which may inflate the number of rules tracked in this ruleset. + # so record number of rules initially provided to this ruleset. + # + # this number is really only meaningful to the user, + # who may compare it against the number of files on their file system. + self.source_rule_count = len(rules) + rules = self._extract_subscope_rules(rules) ensure_rule_dependencies_are_met(rules) diff --git a/capa/rules/cache.py b/capa/rules/cache.py new file mode 100644 index 00000000..e2bdfc70 --- /dev/null +++ b/capa/rules/cache.py @@ -0,0 +1,155 @@ +import sys +import zlib +import pickle +import hashlib +import logging +import os.path +from typing import List, Optional +from dataclasses import dataclass + +import capa.rules +import capa.helpers +import capa.version + +logger = logging.getLogger(__name__) + + +# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ +CacheIdentifier = str + + +def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier: + hash = hashlib.sha256() + + # note that this changes with each release, + # so cache identifiers will never collide across releases. + version = capa.version.__version__ + + hash.update(version.encode("utf-8")) + hash.update(b"\x00") + + rule_hashes = list(sorted([hashlib.sha256(buf).hexdigest() for buf in rule_content])) + for rule_hash in rule_hashes: + hash.update(rule_hash.encode("ascii")) + hash.update(b"\x00") + + return hash.hexdigest() + + +def get_default_cache_directory() -> str: + # ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813 + # + # Linux: $XDG_CACHE_HOME/capa/ + # Windows: %LOCALAPPDATA%\flare\capa\cache + # MacOS: ~/Library/Caches/capa + + # ref: https://stackoverflow.com/a/8220141/87207 + if sys.platform == "linux" or sys.platform == "linux2": + directory = os.environ.get("XDG_CACHE_HOME", os.path.join(os.environ["HOME"], ".cache", "capa")) + elif sys.platform == "darwin": + directory = os.path.join(os.environ["HOME"], "Library", "Caches", "capa") + elif sys.platform == "win32": + directory = os.path.join(os.environ["LOCALAPPDATA"], "flare", "capa", "cache") + else: + raise NotImplementedError(f"unsupported platform: {sys.platform}") + + os.makedirs(directory, exist_ok=True) + + return directory + + +def get_cache_path(cache_dir: str, id: CacheIdentifier) -> str: + filename = "capa-" + id[:8] + ".cache" + return os.path.join(cache_dir, filename) + + +MAGIC = b"capa" +VERSION = b"\x00\x00\x00\x01" + + +@dataclass +class RuleCache: + id: CacheIdentifier + ruleset: capa.rules.RuleSet + + def dump(self): + return MAGIC + VERSION + self.id.encode("ascii") + zlib.compress(pickle.dumps(self)) + + @staticmethod + def load(data): + assert data.startswith(MAGIC + VERSION) + + id = data[0x8:0x48].decode("ascii") + cache = pickle.loads(zlib.decompress(data[0x48:])) + + assert isinstance(cache, RuleCache) + assert cache.id == id + + return cache + + +def get_ruleset_content(ruleset: capa.rules.RuleSet) -> List[bytes]: + rule_contents = [] + for rule in ruleset.rules.values(): + if rule.is_subscope_rule(): + continue + rule_contents.append(rule.definition.encode("utf-8")) + return rule_contents + + +def compute_ruleset_cache_identifier(ruleset: capa.rules.RuleSet) -> CacheIdentifier: + rule_contents = get_ruleset_content(ruleset) + return compute_cache_identifier(rule_contents) + + +def cache_ruleset(cache_dir: str, ruleset: capa.rules.RuleSet): + """ + cache the given ruleset to disk, using the given cache directory. + this can subsequently be reloaded via `load_cached_ruleset`, + assuming the capa version and rule content does not change. + + callers should use this function to avoid the performance overhead + of validating rules on each run. + """ + id = compute_ruleset_cache_identifier(ruleset) + path = get_cache_path(cache_dir, id) + if os.path.exists(path): + logger.debug("rule set already cached to %s", path) + return + + cache = RuleCache(id, ruleset) + with open(path, "wb") as f: + f.write(cache.dump()) + + logger.debug("rule set cached to %s", path) + return + + +def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]: + """ + load a cached ruleset from disk, using the given cache directory. + the raw rule contents are required here to prove that the rules haven't changed + and to avoid stale cache entries. + + callers should use this function to avoid the performance overhead + of validating rules on each run. + """ + id = compute_cache_identifier(rule_contents) + path = get_cache_path(cache_dir, id) + if not os.path.exists(path): + logger.debug("rule set cache does not exist: %s", path) + return None + + logger.debug("loading rule set from cache: %s", path) + with open(path, "rb") as f: + buf = f.read() + + try: + cache = RuleCache.load(buf) + except AssertionError: + logger.debug("rule set cache is invalid: %s", path) + # delete the cache that seems to be invalid. + os.remove(path) + return None + else: + return cache.ruleset diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index b57928c6..c235116d 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -152,7 +152,7 @@ def main(argv=None): capa.main.handle_common_args(args) try: - rules = capa.rules.RuleSet(capa.main.get_rules(args.rules)) + rules = capa.main.get_rules(args.rules) logger.info("successfully loaded %s rules", len(rules)) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) diff --git a/scripts/cache-ruleset.py b/scripts/cache-ruleset.py new file mode 100644 index 00000000..a2a49bdb --- /dev/null +++ b/scripts/cache-ruleset.py @@ -0,0 +1,67 @@ +""" +Create a cache of the given rules. +This is only really intended to be used by CI to pre-cache rulesets +that will be distributed within PyInstaller binaries. + +Usage: + + $ python scripts/cache-ruleset.py rules/ /path/to/cache/directory + +Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +You may obtain a copy of the License at: [package root]/LICENSE.txt +Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +""" +import os +import sys +import time +import logging +import argparse + +import capa.main +import capa.rules +import capa.engine +import capa.helpers +import capa.rules.cache +import capa.features.insn + +logger = logging.getLogger("cache-ruleset") + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + parser = argparse.ArgumentParser(description="Cache ruleset.") + capa.main.install_common_args(parser) + parser.add_argument("rules", type=str, action="append", help="Path to rules") + parser.add_argument("cache", type=str, help="Path to cache directory") + args = parser.parse_args(args=argv) + capa.main.handle_common_args(args) + + if args.debug: + logging.getLogger("capa").setLevel(logging.DEBUG) + else: + logging.getLogger("capa").setLevel(logging.ERROR) + + try: + os.makedirs(args.cache, exist_ok=True) + rules = capa.main.get_rules(args.rules, cache_dir=args.cache) + logger.info("successfully loaded %s rules", len(rules)) + except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: + logger.error("%s", str(e)) + return -1 + + content = capa.rules.cache.get_ruleset_content(rules) + id = capa.rules.cache.compute_cache_identifier(content) + path = capa.rules.cache.get_cache_path(args.cache, id) + + assert os.path.exists(path) + logger.info("cached to: %s", path) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 7fd4ad41..8e477a6b 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -709,9 +709,8 @@ def main(argv=None): logging.getLogger("capa2yara").setLevel(level) try: - rules_ = capa.main.get_rules([args.rules], disable_progress=True) - namespaces = capa.rules.index_rules_by_namespace(rules_) - rules = capa.rules.RuleSet(rules_) + rules = capa.main.get_rules([args.rules]) + namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules)) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 07b408cc..1122969b 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -161,7 +161,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]: # ==== render dictionary helpers def capa_details(rules_path, file_path, output_format="dictionary"): # load rules from disk - rules = capa.rules.RuleSet(capa.main.get_rules([rules_path], disable_progress=True)) + rules = capa.main.get_rules([rules_path]) # extract features and find capabilities extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True) diff --git a/scripts/lint.py b/scripts/lint.py index 767cd0a8..2c0fa739 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -917,12 +917,11 @@ def lint(ctx: Context): """ ret = {} - with tqdm.contrib.logging.tqdm_logging_redirect(ctx.rules.rules.items(), unit="rule") as pbar: + source_rules = [rule for rule in ctx.rules.rules.values() if not rule.is_subscope_rule()] + with tqdm.contrib.logging.tqdm_logging_redirect(source_rules, unit="rule") as pbar: with redirecting_print_to_tqdm(): - for name, rule in pbar: - if rule.is_subscope_rule(): - continue - + for rule in pbar: + name = rule.name pbar.set_description(width("linting rule: %s" % (name), 48)) ret[name] = lint_rule(ctx, rule) @@ -998,8 +997,8 @@ def main(argv=None): time0 = time.time() try: - rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True)) - logger.info("successfully loaded %s rules", len(rules)) + rules = capa.main.get_rules(args.rules) + logger.info("successfully loaded %s rules", rules.source_rule_count) if args.tag: rules = rules.filter_rules_by_meta(args.tag) logger.debug("selected %s rules", len(rules)) diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 3d6b5e07..73caabb9 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -88,7 +88,7 @@ def main(argv=None): try: with capa.main.timing("load rules"): - rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True)) + rules = capa.main.get_rules(args.rules) except (IOError) as e: logger.error("%s", str(e)) return -1 diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index f4303fef..c96920d0 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -141,7 +141,7 @@ def main(argv=None): return -1 try: - rules = capa.rules.RuleSet(capa.main.get_rules(args.rules)) + rules = capa.main.get_rules(args.rules) logger.info("successfully loaded %s rules", len(rules)) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/tests/test_rule_cache.py b/tests/test_rule_cache.py new file mode 100644 index 00000000..fb11e5e7 --- /dev/null +++ b/tests/test_rule_cache.py @@ -0,0 +1,115 @@ +# Copyright (C) 2023 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import os +import textwrap + +import capa.rules +import capa.rules.cache + +R1 = capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + authors: + - user@domain.com + scope: function + examples: + - foo1234 + - bar5678 + features: + - and: + - number: 1 + - number: 2 + """ + ) +) + +R2 = capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule 2 + authors: + - user@domain.com + scope: function + examples: + - foo1234 + - bar5678 + features: + - and: + - number: 3 + - number: 4 + """ + ) +) + + +def test_ruleset_cache_ids(): + rs = capa.rules.RuleSet([R1]) + content = capa.rules.cache.get_ruleset_content(rs) + + rs2 = capa.rules.RuleSet([R1, R2]) + content2 = capa.rules.cache.get_ruleset_content(rs2) + + id = capa.rules.cache.compute_cache_identifier(content) + id2 = capa.rules.cache.compute_cache_identifier(content2) + assert id != id2 + + +def test_ruleset_cache_save_load(): + rs = capa.rules.RuleSet([R1]) + content = capa.rules.cache.get_ruleset_content(rs) + + id = capa.rules.cache.compute_cache_identifier(content) + assert id is not None + + cache_dir = capa.rules.cache.get_default_cache_directory() + + path = capa.rules.cache.get_cache_path(cache_dir, id) + try: + os.remove(path) + except OSError: + pass + + capa.rules.cache.cache_ruleset(cache_dir, rs) + assert os.path.exists(path) + + assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is not None + + +def test_ruleset_cache_invalid(): + rs = capa.rules.RuleSet([R1]) + content = capa.rules.cache.get_ruleset_content(rs) + id = capa.rules.cache.compute_cache_identifier(content) + cache_dir = capa.rules.cache.get_default_cache_directory() + path = capa.rules.cache.get_cache_path(cache_dir, id) + try: + os.remove(path) + except OSError: + pass + + capa.rules.cache.cache_ruleset(cache_dir, rs) + assert os.path.exists(path) + + with open(path, "rb") as f: + buf = f.read() + + # corrupt the magic header + buf = b"x" + buf[1:] + + with open(path, "wb") as f: + f.write(buf) + + assert os.path.exists(path) + assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None + # the invalid cache should be deleted + assert not os.path.exists(path) diff --git a/tests/test_rules.py b/tests/test_rules.py index 466ac306..fe154c39 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -31,7 +31,6 @@ from capa.features.common import ( Substring, FeatureAccess, ) -from capa.features.address import AbsoluteVirtualAddress ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002)