Changes os.path to pathlib.Path usage

Changed the types of args.rules and args.signatures in handle_common_args.
This commit is contained in:
Aayush Goel
2023-07-06 05:12:50 +05:30
parent 66e2a225d2
commit c0d712acea
22 changed files with 165 additions and 173 deletions

View File

@@ -382,6 +382,7 @@ def load(buf: bytes) -> capa.features.extractors.base_extractor.FeatureExtractor
def main(argv=None): def main(argv=None):
import sys import sys
import argparse import argparse
from pathlib import Path
import capa.main import capa.main
@@ -398,8 +399,7 @@ def main(argv=None):
extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False)
with open(args.output, "wb") as f: Path(args.output).write_bytes(dump(extractor))
f.write(dump(extractor))
return 0 return 0

View File

@@ -10,6 +10,7 @@ import inspect
import logging import logging
import contextlib import contextlib
from typing import NoReturn from typing import NoReturn
from pathlib import Path
import tqdm import tqdm
@@ -31,8 +32,8 @@ def hex(n: int) -> str:
return f"0x{(n):X}" return f"0x{(n):X}"
def get_file_taste(sample_path: str) -> bytes: def get_file_taste(sample_path: Path) -> bytes:
if not os.path.exists(sample_path): if not sample_path.exists():
raise IOError(f"sample path {sample_path} does not exist or cannot be accessed") raise IOError(f"sample path {sample_path} does not exist or cannot be accessed")
with open(sample_path, "rb") as f: with open(sample_path, "rb") as f:
taste = f.read(8) taste = f.read(8)

View File

@@ -9,7 +9,8 @@ import json
import logging import logging
import datetime import datetime
import contextlib import contextlib
from typing import Optional from typing import List, Optional
from pathlib import Path
import idc import idc
import idaapi import idaapi
@@ -119,7 +120,7 @@ def get_file_sha256():
return sha256 return sha256
def collect_metadata(rules): def collect_metadata(rules: List[Path]):
""" """ """ """
md5 = get_file_md5() md5 = get_file_md5()
sha256 = get_file_sha256() sha256 = get_file_sha256()
@@ -156,7 +157,7 @@ def collect_metadata(rules):
arch=arch, arch=arch,
os=os, os=os,
extractor="ida", extractor="ida",
rules=rules, rules=tuple(rules),
base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
layout=rdoc.Layout( layout=rdoc.Layout(
functions=tuple() functions=tuple()

View File

@@ -11,6 +11,7 @@ import logging
import itertools import itertools
import collections import collections
from typing import Any, List, Optional from typing import Any, List, Optional
from pathlib import Path
import idaapi import idaapi
import ida_kernwin import ida_kernwin
@@ -629,7 +630,7 @@ class CapaExplorerForm(idaapi.PluginForm):
if not self.ensure_capa_settings_rule_path(): if not self.ensure_capa_settings_rule_path():
return False return False
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") rule_path: Path = Path(settings.user.get(CAPA_SETTINGS_RULE_PATH, ""))
try: try:
def on_load_rule(_, i, total): def on_load_rule(_, i, total):
@@ -769,7 +770,7 @@ class CapaExplorerForm(idaapi.PluginForm):
update_wait_box("extracting features") update_wait_box("extracting features")
try: try:
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])])
capabilities, counts = capa.main.find_capabilities( capabilities, counts = capa.main.find_capabilities(
ruleset, self.feature_extractor, disable_progress=True ruleset, self.feature_extractor, disable_progress=True
) )

View File

@@ -21,6 +21,7 @@ import itertools
import contextlib import contextlib
import collections import collections
from typing import Any, Dict, List, Tuple, Callable from typing import Any, Dict, List, Tuple, Callable
from pathlib import Path
import halo import halo
import tqdm import tqdm
@@ -428,40 +429,38 @@ def is_running_standalone() -> bool:
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
def get_default_root() -> str: def get_default_root() -> Path:
""" """
get the file system path to the default resources directory. Get the file system path to the default resources directory.
under PyInstaller, this comes from _MEIPASS. Under PyInstaller, this comes from _MEIPASS.
under source, this is the root directory of the project. Under source, this is the root directory of the project.
""" """
if is_running_standalone(): if is_running_standalone():
# pylance/mypy don't like `sys._MEIPASS` because this isn't standard. try:
# its injected by pyinstaller. meipass = Path(getattr(sys, "_MEIPASS"))
# so we'll fetch this attribute dynamically. return meipass
return getattr(sys, "_MEIPASS") except AttributeError:
else: pass
return os.path.join(os.path.dirname(__file__), "..") # Return the root directory of the project when not running from a PyInstaller'd executable
return Path(__file__).resolve().parent.parent
def get_default_signatures() -> List[str]: def get_default_signatures() -> List[Path]:
""" """
compute a list of file system paths to the default FLIRT signatures. Compute a list of file system paths to the default FLIRT signatures.
""" """
sigs_path = os.path.join(get_default_root(), "sigs") sigs_path = get_default_root() / "sigs"
logger.debug("signatures path: %s", sigs_path) logger.debug("signatures path: %s", sigs_path)
ret = [] ret = []
for root, _, files in os.walk(sigs_path): for file in sigs_path.rglob("*"):
for file in files: if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"):
if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")): ret.append(file)
continue
ret.append(os.path.join(root, file))
return ret return ret
def get_workspace(path, format_, sigpaths): def get_workspace(path, format_, sigpaths: List[Path]):
""" """
load the program at the given path into a vivisect workspace using the given format. load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures. also apply the given FLIRT signatures.
@@ -499,7 +498,7 @@ def get_workspace(path, format_, sigpaths):
else: else:
raise ValueError("unexpected format: " + format_) raise ValueError("unexpected format: " + format_)
viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths) viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
vw.analyze() vw.analyze()
@@ -513,7 +512,7 @@ def get_extractor(
format_: str, format_: str,
os_: str, os_: str,
backend: str, backend: str,
sigpaths: List[str], sigpaths: List[Path],
should_save_workspace=False, should_save_workspace=False,
disable_progress=False, disable_progress=False,
) -> FeatureExtractor: ) -> FeatureExtractor:
@@ -614,42 +613,34 @@ def is_nursery_rule_path(path: str) -> bool:
return "nursery" in path return "nursery" in path
def collect_rule_file_paths(rule_paths: List[str]) -> List[str]: def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
""" """
collect all rule file paths, including those in subdirectories. Collect all rule file paths, including those in subdirectories.
""" """
rule_file_paths = [] rule_file_paths = []
for rule_path in rule_paths: for rule_path in rule_paths:
if not os.path.exists(rule_path): if not rule_path.exists():
raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") raise IOError(f"rule path {rule_path} does not exist or cannot be accessed")
if os.path.isfile(rule_path): if rule_path.is_file():
rule_file_paths.append(rule_path) rule_file_paths.append(rule_path)
elif os.path.isdir(rule_path): elif rule_path.is_dir():
logger.debug("reading rules from directory %s", rule_path) logger.debug("reading rules from directory %s", rule_path)
for root, _, files in os.walk(rule_path): for root, _, files in os.walk(rule_path):
if ".git" in root: if ".git" in root:
# the .github directory contains CI config in capa-rules # Skip certain directories within the walk
# this includes some .yml files
# these are not rules
# additionally, .git has files that are not .yml and generate the warning
# skip those too
continue continue
for file in files: for file in files:
if not file.endswith(".yml"): if not file.endswith(".yml"):
if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
# expect to see .git* files, readme.md, format.md, and maybe a .git directory
# other things maybe are rules, but are mis-named.
logger.warning("skipping non-.yml file: %s", file) logger.warning("skipping non-.yml file: %s", file)
continue continue
rule_path = os.path.join(root, file) rule_file_paths.append(Path(root) / file)
rule_file_paths.append(rule_path)
return rule_file_paths return rule_file_paths
# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ # TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
RulePath = str RulePath = Path
def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
@@ -669,17 +660,13 @@ def get_rules(
""" """
if cache_dir is None: if cache_dir is None:
cache_dir = capa.rules.cache.get_default_cache_directory() cache_dir = capa.rules.cache.get_default_cache_directory()
# rule_paths may contain directory paths, # rule_paths may contain directory paths,
# so search for file paths recursively. # so search for file paths recursively.
rule_file_paths = collect_rule_file_paths(rule_paths) rule_file_paths = collect_rule_file_paths(rule_paths)
# this list is parallel to `rule_file_paths`: # this list is parallel to `rule_file_paths`:
# rule_file_paths[i] corresponds to rule_contents[i]. # rule_file_paths[i] corresponds to rule_contents[i].
rule_contents = [] rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]
for file_path in rule_file_paths:
with open(file_path, "rb") as f:
rule_contents.append(f.read())
ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
if ruleset is not None: if ruleset is not None:
@@ -696,8 +683,8 @@ def get_rules(
except capa.rules.InvalidRule: except capa.rules.InvalidRule:
raise raise
else: else:
rule.meta["capa/path"] = path rule.meta["capa/path"] = str(path)
if is_nursery_rule_path(path): if is_nursery_rule_path(str(path)):
rule.meta["capa/nursery"] = True rule.meta["capa/nursery"] = True
rules.append(rule) rules.append(rule)
@@ -710,27 +697,24 @@ def get_rules(
return ruleset return ruleset
def get_signatures(sigs_path): def get_signatures(sigs_path: Path) -> List[Path]:
if not os.path.exists(sigs_path): if not sigs_path.exists():
raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")
paths = [] paths: List[Path] = []
if os.path.isfile(sigs_path): if sigs_path.is_file():
paths.append(sigs_path) paths.append(sigs_path)
elif os.path.isdir(sigs_path): elif sigs_path.is_dir():
logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path))) logger.debug("reading signatures from directory %s", sigs_path.resolve())
for root, _, files in os.walk(sigs_path): for file in sigs_path.rglob("*"):
for file in files: if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"):
if file.endswith((".pat", ".pat.gz", ".sig")): paths.append(file)
sig_path = os.path.join(root, file)
paths.append(sig_path)
# nicely normalize and format path so that debugging messages are clearer # Convert paths to their absolute and normalized forms
paths = [os.path.abspath(os.path.normpath(path)) for path in paths] paths = [path.resolve().absolute() for path in paths]
# load signatures in deterministic order: the alphabetic sorting of filename. # Sort paths in deterministic order based on filename
# this means that `0_sigs.pat` loads before `1_sigs.pat`. paths = sorted(paths, key=lambda path: path.name)
paths = sorted(paths, key=os.path.basename)
for path in paths: for path in paths:
logger.debug("found signature file: %s", path) logger.debug("found signature file: %s", path)
@@ -743,23 +727,20 @@ def collect_metadata(
sample_path: str, sample_path: str,
format_: str, format_: str,
os_: str, os_: str,
rules_path: List[str], rules_path: List[Path],
extractor: capa.features.extractors.base_extractor.FeatureExtractor, extractor: capa.features.extractors.base_extractor.FeatureExtractor,
) -> rdoc.Metadata: ) -> rdoc.Metadata:
md5 = hashlib.md5() md5 = hashlib.md5()
sha1 = hashlib.sha1() sha1 = hashlib.sha1()
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
with open(sample_path, "rb") as f: buf = Path(sample_path).read_bytes()
buf = f.read()
md5.update(buf) md5.update(buf)
sha1.update(buf) sha1.update(buf)
sha256.update(buf) sha256.update(buf)
if rules_path != [RULES_PATH_DEFAULT_STRING]: rules_path = [r.resolve().absolute() for r in rules_path]
rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]
format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
arch = get_arch(sample_path) arch = get_arch(sample_path)
os_ = get_os(sample_path) if os_ == OS_AUTO else os_ os_ = get_os(sample_path) if os_ == OS_AUTO else os_
@@ -1011,7 +992,7 @@ def handle_common_args(args):
raise RuntimeError("unexpected --color value: " + args.color) raise RuntimeError("unexpected --color value: " + args.color)
if hasattr(args, "rules"): if hasattr(args, "rules"):
rules_paths: List[str] = [] rules_paths: List[Path] = []
if args.rules == [RULES_PATH_DEFAULT_STRING]: if args.rules == [RULES_PATH_DEFAULT_STRING]:
logger.debug("-" * 80) logger.debug("-" * 80)
@@ -1021,9 +1002,9 @@ def handle_common_args(args):
logger.debug(" https://github.com/mandiant/capa-rules") logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80) logger.debug("-" * 80)
default_rule_path = os.path.join(get_default_root(), "rules") default_rule_path = get_default_root() / "rules"
if not os.path.exists(default_rule_path): if not default_rule_path.exists():
# when a users installs capa via pip, # when a users installs capa via pip,
# this pulls down just the source code - not the default rules. # this pulls down just the source code - not the default rules.
# i'm not sure the default rules should even be written to the library directory, # i'm not sure the default rules should even be written to the library directory,
@@ -1035,10 +1016,9 @@ def handle_common_args(args):
rules_paths.append(default_rule_path) rules_paths.append(default_rule_path)
args.is_default_rules = True args.is_default_rules = True
else: else:
rules_paths = args.rules for rule in args.rules:
if RULES_PATH_DEFAULT_STRING != rule:
if RULES_PATH_DEFAULT_STRING in rules_paths: rules_paths.append(Path(rule))
rules_paths.remove(RULES_PATH_DEFAULT_STRING)
for rule_path in rules_paths: for rule_path in rules_paths:
logger.debug("using rules path: %s", rule_path) logger.debug("using rules path: %s", rule_path)
@@ -1056,8 +1036,9 @@ def handle_common_args(args):
) )
logger.debug("-" * 80) logger.debug("-" * 80)
sigs_path = os.path.join(get_default_root(), "sigs") sigs_path = get_default_root() / "sigs"
if not os.path.exists(sigs_path):
if not sigs_path.exists():
logger.error( logger.error(
"Using default signature path, but it doesn't exist. " "Using default signature path, but it doesn't exist. "
"Please install the signatures first: " "Please install the signatures first: "
@@ -1065,7 +1046,7 @@ def handle_common_args(args):
) )
raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")
else: else:
sigs_path = args.signatures sigs_path = Path(args.signatures)
logger.debug("using signatures path: %s", sigs_path) logger.debug("using signatures path: %s", sigs_path)
args.signatures = sigs_path args.signatures = sigs_path
@@ -1118,7 +1099,7 @@ def main(argv=None):
return ret return ret
try: try:
_ = get_file_taste(args.sample) _ = get_file_taste(Path(args.sample))
except IOError as e: except IOError as e:
# per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
# handle the IOError separately and reach into the args # handle the IOError separately and reach into the args
@@ -1138,7 +1119,7 @@ def main(argv=None):
try: try:
if is_running_standalone() and args.is_default_rules: if is_running_standalone() and args.is_default_rules:
cache_dir = os.path.join(get_default_root(), "cache") cache_dir = get_default_root() / "cache"
else: else:
cache_dir = capa.rules.cache.get_default_cache_directory() cache_dir = capa.rules.cache.get_default_cache_directory()
@@ -1223,8 +1204,7 @@ def main(argv=None):
if format_ == FORMAT_FREEZE: if format_ == FORMAT_FREEZE:
# freeze format deserializes directly into an extractor # freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f: extractor = frz.load(Path(args.sample).read_bytes())
extractor = frz.load(f.read())
else: else:
# all other formats we must create an extractor, # all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces # such as viv, binary ninja, etc. workspaces
@@ -1312,7 +1292,7 @@ def ida_main():
logger.debug(" https://github.com/mandiant/capa-rules") logger.debug(" https://github.com/mandiant/capa-rules")
logger.debug("-" * 80) logger.debug("-" * 80)
rules_path = os.path.join(get_default_root(), "rules") rules_path = get_default_root() / "rules"
logger.debug("rule path: %s", rules_path) logger.debug("rule path: %s", rules_path)
rules = get_rules([rules_path]) rules = get_rules([rules_path])

View File

@@ -136,7 +136,8 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
arch=meta.analysis.arch, arch=meta.analysis.arch,
os=meta.analysis.os, os=meta.analysis.os,
extractor=meta.analysis.extractor, extractor=meta.analysis.extractor,
rules=meta.analysis.rules, # TODO convert analysis.rule type to Path in capa_pb2.Metadata
rules=list(str(rule) for rule in meta.analysis.rules),
base_address=addr_to_pb2(meta.analysis.base_address), base_address=addr_to_pb2(meta.analysis.base_address),
layout=capa_pb2.Layout( layout=capa_pb2.Layout(
functions=[ functions=[

View File

@@ -8,6 +8,7 @@
import datetime import datetime
import collections import collections
from typing import Any, Dict, List, Tuple, Union, Optional from typing import Any, Dict, List, Tuple, Union, Optional
from pathlib import Path
from pydantic import Field, BaseModel from pydantic import Field, BaseModel
@@ -73,7 +74,7 @@ class Analysis(Model):
arch: str arch: str
os: str os: str
extractor: str extractor: str
rules: Tuple[str, ...] rules: Tuple[Path, ...]
base_address: frz.Address base_address: frz.Address
layout: Layout layout: Layout
feature_counts: FeatureCounts feature_counts: FeatureCounts

View File

@@ -91,7 +91,7 @@ def render_meta(ostream, doc: rd.ResultDocument):
("arch", doc.meta.analysis.arch), ("arch", doc.meta.analysis.arch),
("extractor", doc.meta.analysis.extractor), ("extractor", doc.meta.analysis.extractor),
("base address", format_address(doc.meta.analysis.base_address)), ("base address", format_address(doc.meta.analysis.base_address)),
("rules", "\n".join(doc.meta.analysis.rules)), ("rules", "\n".join(tuple(str(rule) for rule in doc.meta.analysis.rules))),
("function count", len(doc.meta.analysis.feature_counts.functions)), ("function count", len(doc.meta.analysis.feature_counts.functions)),
("library function count", len(doc.meta.analysis.library_functions)), ("library function count", len(doc.meta.analysis.library_functions)),
( (

View File

@@ -3,8 +3,8 @@ import zlib
import pickle import pickle
import hashlib import hashlib
import logging import logging
import os.path
from typing import List, Optional from typing import List, Optional
from pathlib import Path
from dataclasses import dataclass from dataclasses import dataclass
import capa.rules import capa.rules
@@ -36,7 +36,7 @@ def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier:
return hash.hexdigest() return hash.hexdigest()
def get_default_cache_directory() -> str: def get_default_cache_directory() -> Path:
# ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813 # ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813
# #
# Linux: $XDG_CACHE_HOME/capa/ # Linux: $XDG_CACHE_HOME/capa/
@@ -45,22 +45,22 @@ def get_default_cache_directory() -> str:
# ref: https://stackoverflow.com/a/8220141/87207 # ref: https://stackoverflow.com/a/8220141/87207
if sys.platform == "linux" or sys.platform == "linux2": if sys.platform == "linux" or sys.platform == "linux2":
directory = os.environ.get("XDG_CACHE_HOME", os.path.join(os.environ["HOME"], ".cache", "capa")) directory = Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache" / "capa"))
elif sys.platform == "darwin": elif sys.platform == "darwin":
directory = os.path.join(os.environ["HOME"], "Library", "Caches", "capa") directory = Path.home() / "Library" / "Caches" / "capa"
elif sys.platform == "win32": elif sys.platform == "win32":
directory = os.path.join(os.environ["LOCALAPPDATA"], "flare", "capa", "cache") directory = Path(os.environ["LOCALAPPDATA"]) / "flare" / "capa" / "cache"
else: else:
raise NotImplementedError(f"unsupported platform: {sys.platform}") raise NotImplementedError(f"unsupported platform: {sys.platform}")
os.makedirs(directory, exist_ok=True) directory.mkdir(parents=True, exist_ok=True)
return directory return directory
def get_cache_path(cache_dir: str, id: CacheIdentifier) -> str: def get_cache_path(cache_dir: Path, id: CacheIdentifier) -> Path:
filename = "capa-" + id[:8] + ".cache" filename = "capa-" + id[:8] + ".cache"
return os.path.join(cache_dir, filename) return cache_dir / filename
MAGIC = b"capa" MAGIC = b"capa"
@@ -102,30 +102,29 @@ def compute_ruleset_cache_identifier(ruleset: capa.rules.RuleSet) -> CacheIdenti
return compute_cache_identifier(rule_contents) return compute_cache_identifier(rule_contents)
def cache_ruleset(cache_dir: str, ruleset: capa.rules.RuleSet): def cache_ruleset(cache_dir: Path, ruleset: capa.rules.RuleSet):
""" """
cache the given ruleset to disk, using the given cache directory. Cache the given ruleset to disk, using the given cache directory.
this can subsequently be reloaded via `load_cached_ruleset`, This can subsequently be reloaded via `load_cached_ruleset`,
assuming the capa version and rule content does not change. assuming the capa version and rule content does not change.
callers should use this function to avoid the performance overhead Callers should use this function to avoid the performance overhead
of validating rules on each run. of validating rules on each run.
""" """
id = compute_ruleset_cache_identifier(ruleset) id = compute_ruleset_cache_identifier(ruleset)
path = get_cache_path(cache_dir, id) path = get_cache_path(cache_dir, id)
if os.path.exists(path): if path.exists():
logger.debug("rule set already cached to %s", path) logger.debug("Rule set already cached to %s", path)
return return
cache = RuleCache(id, ruleset) cache = RuleCache(id, ruleset)
with open(path, "wb") as f: path.write_bytes(cache.dump())
f.write(cache.dump())
logger.debug("rule set cached to %s", path) logger.debug("Rule set cached to %s", path)
return return
def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]: def load_cached_ruleset(cache_dir: Path, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]:
""" """
load a cached ruleset from disk, using the given cache directory. load a cached ruleset from disk, using the given cache directory.
the raw rule contents are required here to prove that the rules haven't changed the raw rule contents are required here to prove that the rules haven't changed
@@ -136,20 +135,19 @@ def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[
""" """
id = compute_cache_identifier(rule_contents) id = compute_cache_identifier(rule_contents)
path = get_cache_path(cache_dir, id) path = get_cache_path(cache_dir, id)
if not os.path.exists(path): if not path.exists():
logger.debug("rule set cache does not exist: %s", path) logger.debug("rule set cache does not exist: %s", path)
return None return None
logger.debug("loading rule set from cache: %s", path) logger.debug("loading rule set from cache: %s", path)
with open(path, "rb") as f: buf = path.read_bytes()
buf = f.read()
try: try:
cache = RuleCache.load(buf) cache = RuleCache.load(buf)
except AssertionError: except AssertionError:
logger.debug("rule set cache is invalid: %s", path) logger.debug("rule set cache is invalid: %s", path)
# delete the cache that seems to be invalid. # delete the cache that seems to be invalid.
os.remove(path) path.unlink()
return None return None
else: else:
return cache.ruleset return cache.ruleset

View File

@@ -37,6 +37,7 @@ import logging
import argparse import argparse
import datetime import datetime
import itertools import itertools
from pathlib import Path
import capa.main import capa.main
import capa.rules import capa.rules
@@ -711,7 +712,7 @@ def main(argv=None):
logging.getLogger("capa2yara").setLevel(level) logging.getLogger("capa2yara").setLevel(level)
try: try:
rules = capa.main.get_rules([args.rules]) rules = capa.main.get_rules([Path(args.rules)])
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules)) logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules))
if args.tag: if args.tag:

View File

@@ -3,6 +3,7 @@
import json import json
import collections import collections
from typing import Any, Dict from typing import Any, Dict
from pathlib import Path
import capa.main import capa.main
import capa.rules import capa.rules
@@ -171,7 +172,7 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete) # collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor)
meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.feature_counts = counts["feature_counts"]
meta.analysis.library_functions = counts["library_functions"] meta.analysis.library_functions = counts["library_functions"]
@@ -199,11 +200,11 @@ if __name__ == "__main__":
import os.path import os.path
import argparse import argparse
RULES_PATH = os.path.join(os.path.dirname(__file__), "..", "rules") RULES_PATH = capa.main.get_default_root() / "rules"
parser = argparse.ArgumentParser(description="Extract capabilities from a file") parser = argparse.ArgumentParser(description="Extract capabilities from a file")
parser.add_argument("file", help="file to extract capabilities from") parser.add_argument("file", help="file to extract capabilities from")
parser.add_argument("--rules", help="path to rules directory", default=os.path.abspath(RULES_PATH)) parser.add_argument("--rules", help="path to rules directory", default=RULES_PATH)
parser.add_argument( parser.add_argument(
"--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary" "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
) )

View File

@@ -1,6 +1,7 @@
import sys import sys
import logging import logging
import argparse import argparse
from pathlib import Path
import capa.main import capa.main
import capa.rules import capa.rules
@@ -89,7 +90,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
new_rule_path = args.new_rule new_rule_path = args.new_rule
rules_path = args.rules rules_path = [Path(rule) for rule in args.rules]
result = find_overlapping_rules(new_rule_path, rules_path) result = find_overlapping_rules(new_rule_path, rules_path)

View File

@@ -34,6 +34,7 @@ import timeit
import logging import logging
import argparse import argparse
import subprocess import subprocess
from pathlib import Path
import tqdm import tqdm
import tabulate import tabulate
@@ -81,7 +82,7 @@ def main(argv=None):
capa.main.handle_common_args(args) capa.main.handle_common_args(args)
try: try:
taste = capa.helpers.get_file_taste(args.sample) taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e: except IOError as e:
logger.error("%s", str(e)) logger.error("%s", str(e))
return -1 return -1

View File

@@ -54,6 +54,7 @@ import logging
import argparse import argparse
import collections import collections
from typing import Dict from typing import Dict
from pathlib import Path
import colorama import colorama
@@ -136,7 +137,7 @@ def main(argv=None):
capa.main.handle_common_args(args) capa.main.handle_common_args(args)
try: try:
taste = get_file_taste(args.sample) taste = get_file_taste(Path(args.sample))
except IOError as e: except IOError as e:
logger.error("%s", str(e)) logger.error("%s", str(e))
return -1 return -1

View File

@@ -67,8 +67,8 @@ Example::
import os import os
import sys import sys
import logging import logging
import os.path
import argparse import argparse
from pathlib import Path
import capa.main import capa.main
import capa.rules import capa.rules
@@ -102,7 +102,7 @@ def main(argv=None):
capa.main.handle_common_args(args) capa.main.handle_common_args(args)
try: try:
taste = capa.helpers.get_file_taste(args.sample) taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e: except IOError as e:
logger.error("%s", str(e)) logger.error("%s", str(e))
return -1 return -1

View File

@@ -7,6 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License. # See the License for the specific language governing permissions and limitations under the License.
import os import os
from pathlib import Path
import setuptools import setuptools
@@ -34,14 +35,11 @@ requirements = [
# this sets __version__ # this sets __version__
# via: http://stackoverflow.com/a/7071358/87207 # via: http://stackoverflow.com/a/7071358/87207
# and: http://stackoverflow.com/a/2073599/87207 # and: http://stackoverflow.com/a/2073599/87207
with open(os.path.join("capa", "version.py"), "r") as f: exec(Path("capa/version.py").read_text())
exec(f.read())
# via: https://packaging.python.org/guides/making-a-pypi-friendly-readme/ # via: https://packaging.python.org/guides/making-a-pypi-friendly-readme/
this_directory = os.path.abspath(os.path.dirname(__file__)) long_description = (Path(__file__).resolve().parent / "README.md").read_text()
with open(os.path.join(this_directory, "README.md"), "r") as f:
long_description = f.read()
setuptools.setup( setuptools.setup(

View File

@@ -14,6 +14,7 @@ import itertools
import contextlib import contextlib
import collections import collections
from typing import Set, Dict from typing import Set, Dict
from pathlib import Path
from functools import lru_cache from functools import lru_cache
import pytest import pytest
@@ -44,9 +45,9 @@ from capa.features.address import Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor
CD = os.path.dirname(__file__) CD = Path(__file__).resolve().parent
DOTNET_DIR = os.path.join(CD, "data", "dotnet") DOTNET_DIR = CD / "data" / "dotnet"
DNFILE_TESTFILES = os.path.join(DOTNET_DIR, "dnfile-testfiles") DNFILE_TESTFILES = DOTNET_DIR / "dnfile-testfiles"
@contextlib.contextmanager @contextlib.contextmanager
@@ -94,11 +95,11 @@ def get_viv_extractor(path):
import capa.features.extractors.viv.extractor import capa.features.extractors.viv.extractor
sigpaths = [ sigpaths = [
os.path.join(CD, "data", "sigs", "test_aulldiv.pat"), CD / "data" / "sigs" / "test_aulldiv.pat",
os.path.join(CD, "data", "sigs", "test_aullrem.pat.gz"), CD / "data" / "sigs" / "test_aullrem.pat.gz",
os.path.join(CD, "..", "sigs", "1_flare_msvc_rtf_32_64.sig"), CD.parent / "sigs" / "1_flare_msvc_rtf_32_64.sig",
os.path.join(CD, "..", "sigs", "2_flare_msvc_atlmfc_32_64.sig"), CD.parent / "sigs" / "2_flare_msvc_atlmfc_32_64.sig",
os.path.join(CD, "..", "sigs", "3_flare_common_libs.sig"), CD.parent / "sigs" / "3_flare_common_libs.sig",
] ]
if "raw32" in path: if "raw32" in path:

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License. # See the License for the specific language governing permissions and limitations under the License.
import logging import logging
from pathlib import Path
import fixtures import fixtures
from fixtures import * from fixtures import *
@@ -52,8 +53,8 @@ def test_binja_feature_counts(sample, scope, feature, expected):
@pytest.mark.skipif(binja_present is False, reason="Skip binja tests if the binaryninja Python API is not installed") @pytest.mark.skipif(binja_present is False, reason="Skip binja tests if the binaryninja Python API is not installed")
def test_standalone_binja_backend(): def test_standalone_binja_backend():
CD = os.path.dirname(__file__) CD = Path(__file__).resolve().parent
test_path = os.path.join(CD, "..", "tests", "data", "Practical Malware Analysis Lab 01-01.exe_") test_path = CD / ".." / "tests" / "data" / "Practical Malware Analysis Lab 01-01.exe_"
assert capa.main.main([test_path, "-b", capa.main.BACKEND_BINJA]) == 0 assert capa.main.main([test_path, "-b", capa.main.BACKEND_BINJA]) == 0

View File

@@ -46,14 +46,14 @@ import io
import sys import sys
import inspect import inspect
import logging import logging
import os.path
import binascii import binascii
import traceback import traceback
from pathlib import Path
import pytest import pytest
try: try:
sys.path.append(os.path.dirname(__file__)) sys.path.append(str(Path(__file__).parent))
import fixtures import fixtures
from fixtures import * from fixtures import *
finally: finally:

View File

@@ -144,7 +144,7 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert meta.analysis.arch == dst.analysis.arch assert meta.analysis.arch == dst.analysis.arch
assert meta.analysis.os == dst.analysis.os assert meta.analysis.os == dst.analysis.os
assert meta.analysis.extractor == dst.analysis.extractor assert meta.analysis.extractor == dst.analysis.extractor
assert list(meta.analysis.rules) == dst.analysis.rules assert list(str(r) for r in meta.analysis.rules) == dst.analysis.rules
assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address
assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions) assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)

View File

@@ -76,12 +76,12 @@ def test_ruleset_cache_save_load():
path = capa.rules.cache.get_cache_path(cache_dir, id) path = capa.rules.cache.get_cache_path(cache_dir, id)
try: try:
os.remove(path) path.unlink()
except OSError: except OSError:
pass pass
capa.rules.cache.cache_ruleset(cache_dir, rs) capa.rules.cache.cache_ruleset(cache_dir, rs)
assert os.path.exists(path) assert path.exists()
assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is not None assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is not None
@@ -93,23 +93,23 @@ def test_ruleset_cache_invalid():
cache_dir = capa.rules.cache.get_default_cache_directory() cache_dir = capa.rules.cache.get_default_cache_directory()
path = capa.rules.cache.get_cache_path(cache_dir, id) path = capa.rules.cache.get_cache_path(cache_dir, id)
try: try:
os.remove(path) path.unlink()
except OSError: except OSError:
pass pass
capa.rules.cache.cache_ruleset(cache_dir, rs) capa.rules.cache.cache_ruleset(cache_dir, rs)
assert os.path.exists(path) assert path.exists()
with open(path, "rb") as f: buf = path.read_bytes()
buf = f.read()
# corrupt the magic header # Corrupt the magic header
buf = b"x" + buf[1:] buf = b"x" + buf[1:]
with open(path, "wb") as f: # Write the modified contents back to the file
f.write(buf) path.write_bytes(buf)
assert os.path.exists(path) # Check if the file still exists
assert path.exists()
assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None
# the invalid cache should be deleted # the invalid cache should be deleted
assert not os.path.exists(path) assert not path.exists()

View File

@@ -10,27 +10,28 @@ import os
import sys import sys
import textwrap import textwrap
import subprocess import subprocess
from pathlib import Path
import pytest import pytest
from fixtures import * from fixtures import *
CD = os.path.dirname(__file__) CD = Path(__file__).resolve().parent
def get_script_path(s): def get_script_path(s):
return os.path.join(CD, "..", "scripts", s) return CD / ".." / "scripts" / s
def get_file_path(): def get_file_path():
return os.path.join(CD, "data", "9324d1a8ae37a36ae560c37448c9705a.exe_") return CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_"
def get_rules_path(): def get_rules_path():
return os.path.join(CD, "..", "rules") return CD / ".." / "rules"
def get_rule_path(): def get_rule_path():
return os.path.join(get_rules_path(), "lib", "allocate-memory.yml") return get_rules_path() / "lib" / "allocate-memory.yml"
@pytest.mark.parametrize( @pytest.mark.parametrize(
@@ -52,13 +53,17 @@ def test_scripts(script, args):
assert p.returncode == 0 assert p.returncode == 0
def test_bulk_process(tmpdir): def test_bulk_process(tmp_path):
# create test directory to recursively analyze # create test directory to recursively analyze
t = tmpdir.mkdir("test") t = tmp_path / "test"
with open(os.path.join(CD, "data", "ping_täst.exe_"), "rb") as f: t.mkdir()
t.join("test.exe_").write_binary(f.read())
p = run_program(get_script_path("bulk-process.py"), [t.dirname]) source_file = Path(__file__).resolve().parent / "data" / "ping_täst.exe_"
dest_file = t / "test.exe_"
dest_file.write_bytes(source_file.read_bytes())
p = run_program(get_script_path("bulk-process.py"), [t.parent])
assert p.returncode == 0 assert p.returncode == 0
@@ -68,19 +73,18 @@ def run_program(script_path, args):
return subprocess.run(args, stdout=subprocess.PIPE) return subprocess.run(args, stdout=subprocess.PIPE)
def test_proto_conversion(tmpdir): def test_proto_conversion(tmp_path):
t = tmpdir.mkdir("proto-test") t = tmp_path / "proto-test"
t.mkdir()
json_file = Path(__file__).resolve().parent / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json"
json = os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json") p = run_program(get_script_path("proto-from-results.py"), [json_file])
p = run_program(get_script_path("proto-from-results.py"), [json])
assert p.returncode == 0 assert p.returncode == 0
pb = os.path.join(t, "pma.pb") pb_file = t / "pma.pb"
with open(pb, "wb") as f: pb_file.write_bytes(p.stdout)
f.write(p.stdout)
p = run_program(get_script_path("proto-to-results.py"), [pb]) p = run_program(get_script_path("proto-to-results.py"), [pb_file])
assert p.returncode == 0 assert p.returncode == 0
assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ') assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')