Merge pull request #1291 from mandiant/rules-cache

cache rule set across invocations of capa
This commit is contained in:
Willi Ballenthin
2023-01-25 17:52:34 +01:00
committed by GitHub
17 changed files with 474 additions and 115 deletions

View File

@@ -19,6 +19,7 @@ a = Analysis(
# i.e. ./.github/pyinstaller
("../../rules", "rules"),
("../../sigs", "sigs"),
("../../cache", "cache"),
# capa.render.default uses tabulate that depends on wcwidth.
# it seems wcwidth uses a json file `version.json`
# and this doesn't get picked up by pyinstaller automatically.

View File

@@ -42,6 +42,8 @@ jobs:
run: python -m pip install --upgrade pip setuptools
- name: Install capa with build requirements
run: pip install -e .[build]
- name: Cache the rule set
run: python ./scripts/cache-ruleset.py ./rules/ ./cache/
- name: Build standalone executable
run: pyinstaller --log-level DEBUG .github/pyinstaller/pyinstaller.spec
- name: Does it run (PE)?

View File

@@ -13,6 +13,7 @@
- dotnet: emit namespace/class features for type references #1242 @mike-hunhoff
- dotnet: extract dotnet and pe format #1187 @mr-tz
- don't render all library rule matches in vverbose output #1174 @mr-tz
- cache the rule set across invocations for better performance #1212 @williballenthin
### Breaking Changes
- remove SMDA backend #1062 @williballenthin

View File

@@ -8,26 +8,18 @@
from __future__ import annotations
import copy
import itertools
import collections
from typing import Set, Dict, List, Tuple, Union, Optional
import capa.engine
from capa.rules import Rule, Scope, RuleSet
from capa.rules import Scope, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.features.address import NO_ADDRESS, Address
from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
class CapaExplorerRuleSetCache:
def __init__(self, rules: List[Rule]):
# capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules
self.rules: List[Rule] = copy.deepcopy(rules)
self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules))
class CapaRuleGenFeatureCacheNode:
def __init__(
self,

View File

@@ -27,7 +27,7 @@ import capa.render.json
import capa.features.common
import capa.render.result_document
import capa.features.extractors.ida.extractor
from capa.rules import Rule
from capa.rules import Rule, RuleSet
from capa.engine import FeatureSet
from capa.ida.plugin.icon import QICON
from capa.ida.plugin.view import (
@@ -36,7 +36,7 @@ from capa.ida.plugin.view import (
CapaExplorerRulegenPreview,
CapaExplorerRulegenFeatures,
)
from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache
from capa.ida.plugin.cache import CapaRuleGenFeatureCache
from capa.ida.plugin.error import UserCancelledError
from capa.ida.plugin.hooks import CapaExplorerIdaHooks
from capa.ida.plugin.model import CapaExplorerDataModel
@@ -160,7 +160,7 @@ class CapaExplorerForm(idaapi.PluginForm):
# caches used to speed up capa explorer analysis - these must be init to None
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None
self.ruleset_cache: Optional[capa.rules.RuleSet] = None
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
self.rulegen_current_function: Optional[FunctionHandle] = None
@@ -525,8 +525,7 @@ class CapaExplorerForm(idaapi.PluginForm):
meta["prev_base"] = idaapi.get_imagebase()
self.model_data.reset()
def load_capa_rules(self):
"""load capa rules from directory specified by user, either using IDA UI or settings"""
def ensure_capa_settings_rule_path(self):
try:
# resolve rules directory - check self and settings first, then ask user
if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")):
@@ -567,56 +566,26 @@ class CapaExplorerForm(idaapi.PluginForm):
logger.info("User cancelled analysis.")
return False
if not os.path.exists(path):
logger.error("rule path %s does not exist or cannot be accessed" % path)
return False
return True
def load_capa_rules(self):
"""load capa rules from directory specified by user, either using IDA UI or settings"""
if not self.ensure_capa_settings_rule_path():
return False
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
try:
# following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows
# user to cancel analysis from IDA UI
if not os.path.exists(rule_path):
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
rule_paths: List[str] = []
if os.path.isfile(rule_path):
rule_paths.append(rule_path)
elif os.path.isdir(rule_path):
for root, dirs, files in os.walk(rule_path):
if ".git" in root:
# the .github directory contains CI config in capa-rules
# this includes some .yml files
# these are not rules
# additionally, .git has files that are not .yml and generate the warning
# skip those too
continue
for file in files:
if not file.endswith(".yml"):
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file)
continue
rule_path = os.path.join(root, file)
rule_paths.append(rule_path)
rules: List[Rule] = []
total_paths: int = len(rule_paths)
for (i, rule_path) in enumerate(rule_paths):
update_wait_box(
"loading capa rules from %s (%d of %d)"
% (settings.user[CAPA_SETTINGS_RULE_PATH], i + 1, total_paths)
)
def on_load_rule(rule_path, i, total):
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total))
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")
try:
rule = capa.rules.Rule.from_yaml_file(rule_path)
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = rule_path
if capa.main.is_nursery_rule_path(rule_path):
rule.meta["capa/nursery"] = True
rules.append(rule)
# cache rules and rule set
self.ruleset_cache = CapaExplorerRuleSetCache(rules)
self.ruleset_cache = capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -686,6 +655,10 @@ class CapaExplorerForm(idaapi.PluginForm):
# function should handle exceptions and return False
if not self.load_capa_rules():
return False
assert self.ruleset_cache is not None
# matching operations may update rule instances,
# so we'll work with a local copy of the ruleset.
ruleset = copy.deepcopy(self.ruleset_cache)
if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
@@ -694,17 +667,10 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("extracting features")
try:
# just generated above
assert self.ruleset_cache is not None
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
capabilities, counts = capa.main.find_capabilities(
self.ruleset_cache.ruleset, extractor, disable_progress=True
)
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
meta["analysis"].update(counts)
meta["analysis"]["layout"] = capa.main.compute_layout(
self.ruleset_cache.ruleset, extractor, capabilities
)
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
@@ -735,7 +701,7 @@ class CapaExplorerForm(idaapi.PluginForm):
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False):
if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False):
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
except Exception as e:
logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True)
@@ -748,9 +714,7 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("rendering results")
try:
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
meta, self.ruleset_cache.ruleset, capabilities
)
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(meta, ruleset, capabilities)
except Exception as e:
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
return False
@@ -759,12 +723,10 @@ class CapaExplorerForm(idaapi.PluginForm):
# either the results are cached and the doc already exists,
# or the doc was just created above
assert self.resdoc_cache is not None
# same with rules cache, either it's cached or it was just loaded
assert self.ruleset_cache is not None
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
self.set_view_status_label(
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
)
except Exception as e:
logger.error("Failed to render results (error: %s)", e, exc_info=True)
@@ -809,6 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm):
else:
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')
assert self.ruleset_cache is not None
# matching operations may update rule instances,
# so we'll work with a local copy of the ruleset.
ruleset = copy.deepcopy(self.ruleset_cache)
# clear feature cache
if self.rulegen_feature_cache is not None:
self.rulegen_feature_cache = None
@@ -868,18 +835,16 @@ class CapaExplorerForm(idaapi.PluginForm):
all_function_features: FeatureSet = collections.defaultdict(set)
try:
assert self.ruleset_cache is not None
if self.rulegen_current_function is not None:
_, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities(
self.ruleset_cache.ruleset, self.rulegen_current_function
ruleset, self.rulegen_current_function
)
all_function_features.update(
self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function)
)
for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()):
rule = self.ruleset_cache.ruleset[name]
rule = ruleset[name]
if rule.is_subscope_rule():
continue
for (addr, _) in result:
@@ -896,13 +861,11 @@ class CapaExplorerForm(idaapi.PluginForm):
all_file_features: FeatureSet = collections.defaultdict(set)
try:
assert self.ruleset_cache is not None
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset)
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset)
all_file_features.update(self.rulegen_feature_cache.get_all_file_features())
for (name, result) in file_matches.items():
rule = self.ruleset_cache.ruleset[name]
rule = ruleset[name]
if rule.is_subscope_rule():
continue
for (addr, _) in result:
@@ -928,7 +891,7 @@ class CapaExplorerForm(idaapi.PluginForm):
self.view_rulegen_features.load_features(all_file_features, all_function_features)
self.set_view_status_label(
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules))
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
)
except Exception as e:
logger.error("Failed to render views (error: %s)", e, exc_info=True)
@@ -1037,7 +1000,9 @@ class CapaExplorerForm(idaapi.PluginForm):
return
# we must create a deep copy of rules because any rule matching operations modify the original rule
rules = copy.deepcopy(self.ruleset_cache.rules)
# the ruleset may derive subscope rules from the source rules loaded from disk.
# by ignoring them, we reconstruct the collection of rules provided by the user.
rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()])
rules.append(rule)
try:

View File

@@ -20,7 +20,7 @@ import textwrap
import itertools
import contextlib
import collections
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Tuple, Callable
import halo
import tqdm
@@ -33,6 +33,7 @@ import capa.rules
import capa.engine
import capa.version
import capa.render.json
import capa.rules.cache
import capa.render.default
import capa.render.verbose
import capa.features.common
@@ -561,7 +562,10 @@ def is_nursery_rule_path(path: str) -> bool:
return "nursery" in path
def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
def collect_rule_file_paths(rule_paths: List[str]) -> List[str]:
"""
collect all rule file paths, including those in subdirectories.
"""
rule_file_paths = []
for rule_path in rule_paths:
if not os.path.exists(rule_path):
@@ -589,28 +593,69 @@ def get_rules(rule_paths: List[str], disable_progress=False) -> List[Rule]:
rule_path = os.path.join(root, file)
rule_file_paths.append(rule_path)
return rule_file_paths
# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
RulePath = str
def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
    """
    default `on_load_rule` callback: intentionally does nothing.

    args:
      _path: path of the rule about to be loaded (ignored).
      i: index of the rule about to be loaded (ignored).
      _total: total number of rules to load (ignored).
    """
    return None
def get_rules(
rule_paths: List[RulePath],
cache_dir=None,
on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
) -> RuleSet:
"""
args:
rule_paths: list of paths to rules files or directories containing rules files
cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
"""
if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory()
# rule_paths may contain directory paths,
# so search for file paths recursively.
rule_file_paths = collect_rule_file_paths(rule_paths)
# this list is parallel to `rule_file_paths`:
# rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = []
for file_path in rule_file_paths:
with open(file_path, "rb") as f:
rule_contents.append(f.read())
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None:
return ruleset
rules = [] # type: List[Rule]
pbar = tqdm.tqdm
if disable_progress:
# do not use tqdm to avoid unnecessary side effects when caller intends
# to disable progress completely
pbar = lambda s, *args, **kwargs: s
total_rule_count = len(rule_file_paths)
for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)):
on_load_rule(path, i, total_rule_count)
for rule_file_path in pbar(list(rule_file_paths), desc="loading ", unit=" rules"):
try:
rule = capa.rules.Rule.from_yaml_file(rule_file_path)
rule = capa.rules.Rule.from_yaml(content.decode("utf-8"))
except capa.rules.InvalidRule:
raise
else:
rule.meta["capa/path"] = rule_file_path
if is_nursery_rule_path(rule_file_path):
rule.meta["capa/path"] = path
if is_nursery_rule_path(path):
rule.meta["capa/nursery"] = True
rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)
return rules
ruleset = capa.rules.RuleSet(rules)
capa.rules.cache.cache_ruleset(cache_dir, ruleset)
return ruleset
def get_signatures(sigs_path):
@@ -849,6 +894,9 @@ def handle_common_args(args):
- rules: file system path to rule files.
- signatures: file system path to signature files.
the following field may be added:
- is_default_rules: if the default rules were used.
args:
args (argparse.Namespace): parsed arguments that included at least `install_common_args` args.
"""
@@ -908,6 +956,7 @@ def handle_common_args(args):
return E_MISSING_RULES
rules_paths.append(default_rule_path)
args.is_default_rules = True
else:
rules_paths = args.rules
@@ -917,6 +966,8 @@ def handle_common_args(args):
for rule_path in rules_paths:
logger.debug("using rules path: %s", rule_path)
args.is_default_rules = False
args.rules = rules_paths
if hasattr(args, "signatures"):
@@ -1002,7 +1053,12 @@ def main(argv=None):
return E_INVALID_FILE_TYPE
try:
rules = capa.rules.RuleSet(get_rules(args.rules, disable_progress=args.quiet))
if is_running_standalone() and args.is_default_rules:
cache_dir = os.path.join(get_default_root(), "cache")
else:
cache_dir = capa.rules.cache.get_default_cache_directory()
rules = get_rules(args.rules, cache_dir=cache_dir)
logger.debug(
"successfully loaded %s rules",
@@ -1149,7 +1205,7 @@ def ida_main():
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
rules = capa.rules.RuleSet(get_rules([rules_path]))
rules = get_rules([rules_path])
meta = capa.ida.helpers.collect_metadata([rules_path])

View File

@@ -743,7 +743,7 @@ class Rule:
return self.statement.evaluate(features, short_circuit=short_circuit)
@classmethod
def from_dict(cls, d, definition) -> "Rule":
def from_dict(cls, d: Dict[str, Any], definition: str) -> "Rule":
meta = d["rule"]["meta"]
name = meta["name"]
# if scope is not specified, default to function scope.
@@ -813,7 +813,7 @@ class Rule:
return y
@classmethod
def from_yaml(cls, s, use_ruamel=False) -> "Rule":
def from_yaml(cls, s: str, use_ruamel=False) -> "Rule":
if use_ruamel:
# ruamel enables nice formatting and doc roundtripping with comments
doc = cls._get_ruamel_yaml_parser().load(s)
@@ -1081,6 +1081,14 @@ class RuleSet:
ensure_rules_are_unique(rules)
# in the next step we extract subscope rules,
# which may inflate the number of rules tracked in this ruleset.
# so record number of rules initially provided to this ruleset.
#
# this number is really only meaningful to the user,
# who may compare it against the number of files on their file system.
self.source_rule_count = len(rules)
rules = self._extract_subscope_rules(rules)
ensure_rule_dependencies_are_met(rules)

155
capa/rules/cache.py Normal file
View File

@@ -0,0 +1,155 @@
import sys
import zlib
import pickle
import hashlib
import logging
import os.path
from typing import List, Optional
from dataclasses import dataclass
import capa.rules
import capa.helpers
import capa.version
logger = logging.getLogger(__name__)
# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
CacheIdentifier = str
def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier:
    """
    derive the cache identifier for a collection of raw rule contents.

    the identifier is the SHA-256 hex digest computed over the capa version string
    followed by the sorted SHA-256 hex digests of each rule buffer (each item NUL-terminated),
    so the order in which the rule buffers are provided does not affect the result.
    """
    digest = hashlib.sha256()

    # the version changes with each release,
    # so cache identifiers will never collide across releases.
    digest.update(capa.version.__version__.encode("utf-8"))
    digest.update(b"\x00")

    # sort the per-rule digests so that the ordering of `rule_content` is irrelevant.
    per_rule_digests = [hashlib.sha256(buf).hexdigest() for buf in rule_content]
    per_rule_digests.sort()

    for rule_digest in per_rule_digests:
        digest.update(rule_digest.encode("ascii") + b"\x00")

    return digest.hexdigest()
def get_default_cache_directory() -> str:
    """
    compute the default, per-user directory used to store cached rule sets,
    creating it (and any missing parents) if it doesn't exist yet.

    returns:
      the path to the cache directory.

    raises:
      NotImplementedError: when running on an unsupported platform.
      KeyError: on Windows, when %LOCALAPPDATA% is not set.
    """
    # ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813
    #
    # Linux:   $XDG_CACHE_HOME/capa/ (falling back to ~/.cache/capa/)
    # Windows: %LOCALAPPDATA%\flare\capa\cache
    # MacOS:   ~/Library/Caches/capa

    # ref: https://stackoverflow.com/a/8220141/87207
    if sys.platform.startswith("linux"):
        # an unset *or empty* XDG_CACHE_HOME falls back to ~/.cache.
        # in both cases we use our own "capa" subdirectory,
        # rather than writing cache files directly into the shared cache root.
        cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache")
        directory = os.path.join(cache_home, "capa")
    elif sys.platform == "darwin":
        directory = os.path.join(os.path.expanduser("~"), "Library", "Caches", "capa")
    elif sys.platform == "win32":
        directory = os.path.join(os.environ["LOCALAPPDATA"], "flare", "capa", "cache")
    else:
        raise NotImplementedError(f"unsupported platform: {sys.platform}")

    os.makedirs(directory, exist_ok=True)
    return directory
def get_cache_path(cache_dir: str, id: CacheIdentifier) -> str:
    """
    compute the path of the cache file for the given cache identifier within `cache_dir`.

    only the first eight characters of the identifier are used in the file name.
    """
    short_id = id[:8]
    return os.path.join(cache_dir, f"capa-{short_id}.cache")
MAGIC = b"capa"
VERSION = b"\x00\x00\x00\x01"
@dataclass
class RuleCache:
    """
    a rule set together with the cache identifier it was stored under.

    serialized layout, as produced by `dump`:

      MAGIC | VERSION | identifier (ASCII) | zlib-compressed pickle of this object
    """

    id: CacheIdentifier
    ruleset: capa.rules.RuleSet

    def dump(self):
        """serialize this cache entry to bytes, see the class docstring for the layout."""
        return MAGIC + VERSION + self.id.encode("ascii") + zlib.compress(pickle.dumps(self))

    @staticmethod
    def load(data):
        """
        deserialize a cache entry from bytes produced by `dump`.

        NOTE: the payload is unpickled, which can execute arbitrary code,
        so only load data from a cache directory that you trust.

        raises:
          AssertionError: when the data is not a valid cache entry:
            wrong magic/version, corrupt payload, unexpected payload type, or identifier mismatch.
        """
        # these checks are raised explicitly rather than via `assert`
        # so that validation is not stripped when running under `python -O`,
        # while keeping the exception type that existing callers already handle.
        if not data.startswith(MAGIC + VERSION):
            raise AssertionError("unexpected rule cache magic or version")

        try:
            # the identifier occupies the 0x40 bytes following the 8-byte header.
            expected_id = data[0x8:0x48].decode("ascii")
            cache = pickle.loads(zlib.decompress(data[0x48:]))
        except (UnicodeDecodeError, zlib.error, pickle.UnpicklingError, EOFError) as e:
            # truncated or corrupted data: report it the same way as any other invalid cache.
            raise AssertionError(f"corrupt rule cache: {e}") from e

        if not isinstance(cache, RuleCache):
            raise AssertionError("unexpected rule cache payload type")

        if cache.id != expected_id:
            raise AssertionError("rule cache identifier mismatch")

        return cache
def get_ruleset_content(ruleset: capa.rules.RuleSet) -> List[bytes]:
    """
    collect the UTF-8 encoded definitions of the rules in the given ruleset,
    leaving out subscope rules.
    """
    return [rule.definition.encode("utf-8") for rule in ruleset.rules.values() if not rule.is_subscope_rule()]
def compute_ruleset_cache_identifier(ruleset: capa.rules.RuleSet) -> CacheIdentifier:
    """compute the cache identifier of the given ruleset from the content of its non-subscope rules."""
    return compute_cache_identifier(get_ruleset_content(ruleset))
def cache_ruleset(cache_dir: str, ruleset: capa.rules.RuleSet):
    """
    cache the given ruleset to disk, using the given cache directory.
    this can subsequently be reloaded via `load_cached_ruleset`,
    assuming the capa version and rule content do not change.

    callers should use this function to avoid the performance overhead
    of validating rules on each run.

    args:
      cache_dir: existing directory in which to store the cache file.
      ruleset: the rule set to cache.

    raises:
      OSError: when the cache file cannot be written.
    """
    cache_id = compute_ruleset_cache_identifier(ruleset)
    path = get_cache_path(cache_dir, cache_id)
    if os.path.exists(path):
        logger.debug("rule set already cached to %s", path)
        return

    data = RuleCache(cache_id, ruleset).dump()

    # write to a temporary sibling file and then rename it into place,
    # so that an interrupted write never leaves a truncated cache file at `path`.
    tmp_path = f"{path}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "wb") as f:
            f.write(data)
        os.replace(tmp_path, path)
    finally:
        # after a successful replace the temporary file no longer exists;
        # this only cleans up after a failed or interrupted write.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                logger.debug("failed to remove temporary cache file: %s", tmp_path)

    logger.debug("rule set cached to %s", path)
def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]:
    """
    load a cached ruleset from disk, using the given cache directory.
    the raw rule contents are required here to prove that the rules haven't changed
    and to avoid stale cache entries.

    callers should use this function to avoid the performance overhead
    of validating rules on each run.

    args:
      cache_dir: directory in which cache files are stored.
      rule_contents: raw contents of the rules that the cached ruleset must correspond to.

    returns:
      the cached rule set, or None when there is no usable cache entry.
    """
    cache_id = compute_cache_identifier(rule_contents)
    path = get_cache_path(cache_dir, cache_id)

    # open directly, rather than checking for existence first,
    # so a cache file removed in between is still handled as a cache miss.
    try:
        with open(path, "rb") as f:
            buf = f.read()
    except FileNotFoundError:
        logger.debug("rule set cache does not exist: %s", path)
        return None

    logger.debug("loading rule set from cache: %s", path)

    try:
        cache = RuleCache.load(buf)
    except Exception:
        # the cache is only an optimization, so any failure to deserialize it
        # (bad header, corrupt compressed data, unpickling error, ...)
        # is treated as a cache miss rather than a fatal error.
        logger.debug("rule set cache is invalid: %s", path, exc_info=True)
        # delete the cache that seems to be invalid.
        try:
            os.remove(path)
        except OSError:
            logger.debug("failed to remove invalid rule set cache: %s", path)
        return None

    return cache.ruleset

View File

@@ -152,7 +152,7 @@ def main(argv=None):
capa.main.handle_common_args(args)
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules))
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))

67
scripts/cache-ruleset.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Create a cache of the given rules.
This is only really intended to be used by CI to pre-cache rulesets
that will be distributed within PyInstaller binaries.
Usage:
$ python scripts/cache-ruleset.py rules/ /path/to/cache/directory
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import time
import logging
import argparse
import capa.main
import capa.rules
import capa.engine
import capa.helpers
import capa.rules.cache
import capa.features.insn
logger = logging.getLogger("cache-ruleset")
def main(argv=None):
    """
    load the rules at the given path(s) and ensure a cache of them exists in the given cache directory.

    args:
      argv: command line arguments, excluding the program name; defaults to `sys.argv[1:]`.

    returns:
      -1 when the rules fail to load, otherwise None.
    """
    argv = sys.argv[1:] if argv is None else argv

    parser = argparse.ArgumentParser(description="Cache ruleset.")
    capa.main.install_common_args(parser)
    parser.add_argument("rules", type=str, action="append", help="Path to rules")
    parser.add_argument("cache", type=str, help="Path to cache directory")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    capa_log_level = logging.DEBUG if args.debug else logging.ERROR
    logging.getLogger("capa").setLevel(capa_log_level)

    try:
        os.makedirs(args.cache, exist_ok=True)
        # loading the rules also writes the cache file when it doesn't exist yet.
        rules = capa.main.get_rules(args.rules, cache_dir=args.cache)
        logger.info("successfully loaded %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1

    # recompute where the cache file should be and confirm that it is really there.
    content = capa.rules.cache.get_ruleset_content(rules)
    cache_id = capa.rules.cache.compute_cache_identifier(content)
    path = capa.rules.cache.get_cache_path(args.cache, cache_id)

    assert os.path.exists(path)
    logger.info("cached to: %s", path)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -709,9 +709,8 @@ def main(argv=None):
logging.getLogger("capa2yara").setLevel(level)
try:
rules_ = capa.main.get_rules([args.rules], disable_progress=True)
namespaces = capa.rules.index_rules_by_namespace(rules_)
rules = capa.rules.RuleSet(rules_)
rules = capa.main.get_rules([args.rules])
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)

View File

@@ -161,7 +161,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers
def capa_details(rules_path, file_path, output_format="dictionary"):
# load rules from disk
rules = capa.rules.RuleSet(capa.main.get_rules([rules_path], disable_progress=True))
rules = capa.main.get_rules([rules_path])
# extract features and find capabilities
extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)

View File

@@ -917,12 +917,11 @@ def lint(ctx: Context):
"""
ret = {}
with tqdm.contrib.logging.tqdm_logging_redirect(ctx.rules.rules.items(), unit="rule") as pbar:
source_rules = [rule for rule in ctx.rules.rules.values() if not rule.is_subscope_rule()]
with tqdm.contrib.logging.tqdm_logging_redirect(source_rules, unit="rule") as pbar:
with redirecting_print_to_tqdm():
for name, rule in pbar:
if rule.is_subscope_rule():
continue
for rule in pbar:
name = rule.name
pbar.set_description(width("linting rule: %s" % (name), 48))
ret[name] = lint_rule(ctx, rule)
@@ -998,8 +997,8 @@ def main(argv=None):
time0 = time.time()
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True))
logger.info("successfully loaded %s rules", len(rules))
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", rules.source_rule_count)
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)
logger.debug("selected %s rules", len(rules))

View File

@@ -88,7 +88,7 @@ def main(argv=None):
try:
with capa.main.timing("load rules"):
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules, disable_progress=True))
rules = capa.main.get_rules(args.rules)
except (IOError) as e:
logger.error("%s", str(e))
return -1

View File

@@ -141,7 +141,7 @@ def main(argv=None):
return -1
try:
rules = capa.rules.RuleSet(capa.main.get_rules(args.rules))
rules = capa.main.get_rules(args.rules)
logger.info("successfully loaded %s rules", len(rules))
if args.tag:
rules = rules.filter_rules_by_meta(args.tag)

115
tests/test_rule_cache.py Normal file
View File

@@ -0,0 +1,115 @@
# Copyright (C) 2023 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import textwrap
import capa.rules
import capa.rules.cache
# fixture: a function-scope rule named "test rule" whose features are
# an `and` over the numbers 1 and 2. shared by the tests below.
R1 = capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule
authors:
- user@domain.com
scope: function
examples:
- foo1234
- bar5678
features:
- and:
- number: 1
- number: 2
"""
)
)
# fixture: a second function-scope rule, named "test rule 2", whose features are
# an `and` over the numbers 3 and 4. its content differs from R1's,
# so rule sets that include it are expected to get a different cache identifier.
R2 = capa.rules.Rule.from_yaml(
textwrap.dedent(
"""
rule:
meta:
name: test rule 2
authors:
- user@domain.com
scope: function
examples:
- foo1234
- bar5678
features:
- and:
- number: 3
- number: 4
"""
)
)
def test_ruleset_cache_ids():
    """rule sets built from different source rules must get different cache identifiers."""
    one_rule_content = capa.rules.cache.get_ruleset_content(capa.rules.RuleSet([R1]))
    two_rule_content = capa.rules.cache.get_ruleset_content(capa.rules.RuleSet([R1, R2]))

    one_rule_id = capa.rules.cache.compute_cache_identifier(one_rule_content)
    two_rule_id = capa.rules.cache.compute_cache_identifier(two_rule_content)

    assert one_rule_id != two_rule_id
def test_ruleset_cache_save_load():
    """a rule set written with `cache_ruleset` can be read back with `load_cached_ruleset`."""
    import tempfile

    rs = capa.rules.RuleSet([R1])
    content = capa.rules.cache.get_ruleset_content(rs)

    cache_id = capa.rules.cache.compute_cache_identifier(content)
    assert cache_id is not None

    # use a throwaway directory so that the test neither depends on
    # nor leaves files behind in the user's real cache directory.
    with tempfile.TemporaryDirectory() as cache_dir:
        path = capa.rules.cache.get_cache_path(cache_dir, cache_id)
        assert not os.path.exists(path)

        capa.rules.cache.cache_ruleset(cache_dir, rs)
        assert os.path.exists(path)

        assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is not None
def test_ruleset_cache_invalid():
    """a cache file with a corrupted header is rejected and removed from disk."""
    import tempfile

    rs = capa.rules.RuleSet([R1])
    content = capa.rules.cache.get_ruleset_content(rs)
    cache_id = capa.rules.cache.compute_cache_identifier(content)

    # use a throwaway directory so that the test neither depends on
    # nor modifies the user's real cache directory.
    with tempfile.TemporaryDirectory() as cache_dir:
        path = capa.rules.cache.get_cache_path(cache_dir, cache_id)
        assert not os.path.exists(path)

        capa.rules.cache.cache_ruleset(cache_dir, rs)
        assert os.path.exists(path)

        with open(path, "rb") as f:
            buf = f.read()

        # corrupt the magic header
        buf = b"x" + buf[1:]

        with open(path, "wb") as f:
            f.write(buf)

        assert os.path.exists(path)
        assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None
        # the invalid cache should be deleted
        assert not os.path.exists(path)

View File

@@ -31,7 +31,6 @@ from capa.features.common import (
Substring,
FeatureAccess,
)
from capa.features.address import AbsoluteVirtualAddress
ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002)