diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index d0eb720c..360868da 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -382,6 +382,7 @@ def load(buf: bytes) -> capa.features.extractors.base_extractor.FeatureExtractor def main(argv=None): import sys import argparse + from pathlib import Path import capa.main @@ -398,8 +399,7 @@ def main(argv=None): extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) - with open(args.output, "wb") as f: - f.write(dump(extractor)) + Path(args.output).write_bytes(dump(extractor)) return 0 diff --git a/capa/helpers.py b/capa/helpers.py index c03e0553..e4dc850f 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -10,6 +10,7 @@ import inspect import logging import contextlib from typing import NoReturn +from pathlib import Path import tqdm @@ -31,8 +32,8 @@ def hex(n: int) -> str: return f"0x{(n):X}" -def get_file_taste(sample_path: str) -> bytes: - if not os.path.exists(sample_path): +def get_file_taste(sample_path: Path) -> bytes: + if not sample_path.exists(): raise IOError(f"sample path {sample_path} does not exist or cannot be accessed") with open(sample_path, "rb") as f: taste = f.read(8) diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index d66bfdd0..520e81c2 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -9,7 +9,8 @@ import json import logging import datetime import contextlib -from typing import Optional +from typing import List, Optional +from pathlib import Path import idc import idaapi @@ -119,7 +120,7 @@ def get_file_sha256(): return sha256 -def collect_metadata(rules): +def collect_metadata(rules: List[Path]): """ """ md5 = get_file_md5() sha256 = get_file_sha256() @@ -156,7 +157,7 @@ def collect_metadata(rules): arch=arch, os=os, extractor="ida", - rules=rules, + rules=tuple(rules), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), layout=rdoc.Layout( functions=tuple() diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 07fbe69f..1b8bd235 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -11,6 +11,7 @@ import logging import itertools import collections from typing import Any, List, Optional +from pathlib import Path import idaapi import ida_kernwin @@ -629,7 +630,7 @@ class CapaExplorerForm(idaapi.PluginForm): if not self.ensure_capa_settings_rule_path(): return False - rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") + rule_path: Path = Path(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")) try: def on_load_rule(_, i, total): @@ -769,7 +770,7 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("extracting features") try: - meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) + meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) capabilities, counts = capa.main.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) diff --git a/capa/main.py b/capa/main.py index bdf0cec3..61a11b80 100644 --- a/capa/main.py +++ b/capa/main.py @@ -21,6 +21,7 @@ import itertools import contextlib import collections from typing import Any, Dict, List, Tuple, Callable +from pathlib import Path import halo import tqdm @@ -428,40 +429,38 @@ def is_running_standalone() -> bool: return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") -def get_default_root() -> str: +def get_default_root() -> Path: """ - get the file system path to the default resources 
directory. - under PyInstaller, this comes from _MEIPASS. - under source, this is the root directory of the project. + Get the file system path to the default resources directory. + Under PyInstaller, this comes from _MEIPASS. + Under source, this is the root directory of the project. """ if is_running_standalone(): - # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. - # its injected by pyinstaller. - # so we'll fetch this attribute dynamically. - return getattr(sys, "_MEIPASS") - else: - return os.path.join(os.path.dirname(__file__), "..") + try: + meipass = Path(getattr(sys, "_MEIPASS")) + return meipass + except AttributeError: + pass + # Return the root directory of the project when not running from a PyInstaller'd executable + return Path(__file__).resolve().parent.parent -def get_default_signatures() -> List[str]: +def get_default_signatures() -> List[Path]: """ - compute a list of file system paths to the default FLIRT signatures. + Compute a list of file system paths to the default FLIRT signatures. """ - sigs_path = os.path.join(get_default_root(), "sigs") + sigs_path = get_default_root() / "sigs" logger.debug("signatures path: %s", sigs_path) ret = [] - for root, _, files in os.walk(sigs_path): - for file in files: - if not (file.endswith(".pat") or file.endswith(".pat.gz") or file.endswith(".sig")): - continue - - ret.append(os.path.join(root, file)) + for file in sigs_path.rglob("*"): + if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"): + ret.append(file) return ret -def get_workspace(path, format_, sigpaths): +def get_workspace(path, format_, sigpaths: List[Path]): """ load the program at the given path into a vivisect workspace using the given format. also apply the given FLIRT signatures. @@ -499,7 +498,7 @@ def get_workspace(path, format_, sigpaths): else: raise ValueError("unexpected format: " + format_) - viv_utils.flirt.register_flirt_signature_analyzers(vw, sigpaths) + viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths]) vw.analyze() @@ -513,7 +512,7 @@ def get_extractor( format_: str, os_: str, backend: str, - sigpaths: List[str], + sigpaths: List[Path], should_save_workspace=False, disable_progress=False, ) -> FeatureExtractor: @@ -614,42 +613,34 @@ def is_nursery_rule_path(path: str) -> bool: return "nursery" in path -def collect_rule_file_paths(rule_paths: List[str]) -> List[str]: +def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: """ - collect all rule file paths, including those in subdirectories. + Collect all rule file paths, including those in subdirectories. 
""" rule_file_paths = [] for rule_path in rule_paths: - if not os.path.exists(rule_path): + if not rule_path.exists(): raise IOError(f"rule path {rule_path} does not exist or cannot be accessed") - if os.path.isfile(rule_path): + if rule_path.is_file(): rule_file_paths.append(rule_path) - elif os.path.isdir(rule_path): + elif rule_path.is_dir(): logger.debug("reading rules from directory %s", rule_path) for root, _, files in os.walk(rule_path): if ".git" in root: - # the .github directory contains CI config in capa-rules - # this includes some .yml files - # these are not rules - # additionally, .git has files that are not .yml and generate the warning - # skip those too + # Skip certain directories within the walk continue for file in files: if not file.endswith(".yml"): if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): - # expect to see .git* files, readme.md, format.md, and maybe a .git directory - # other things maybe are rules, but are mis-named. logger.warning("skipping non-.yml file: %s", file) continue - rule_path = os.path.join(root, file) - rule_file_paths.append(rule_path) - + rule_file_paths.append(Path(root) / file) return rule_file_paths # TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+ -RulePath = str +RulePath = Path def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None: @@ -669,17 +660,13 @@ def get_rules( """ if cache_dir is None: cache_dir = capa.rules.cache.get_default_cache_directory() - # rule_paths may contain directory paths, # so search for file paths recursively. rule_file_paths = collect_rule_file_paths(rule_paths) # this list is parallel to `rule_file_paths`: # rule_file_paths[i] corresponds to rule_contents[i]. - rule_contents = [] - for file_path in rule_file_paths: - with open(file_path, "rb") as f: - rule_contents.append(f.read()) + rule_contents = [file_path.read_bytes() for file_path in rule_file_paths] ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents) if ruleset is not None: @@ -696,8 +683,8 @@ def get_rules( except capa.rules.InvalidRule: raise else: - rule.meta["capa/path"] = path - if is_nursery_rule_path(path): + rule.meta["capa/path"] = str(path) + if is_nursery_rule_path(str(path)): rule.meta["capa/nursery"] = True rules.append(rule) @@ -710,27 +697,24 @@ def get_rules( return ruleset -def get_signatures(sigs_path): - if not os.path.exists(sigs_path): +def get_signatures(sigs_path: Path) -> List[Path]: + if not sigs_path.exists(): raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") - paths = [] - if os.path.isfile(sigs_path): + paths: List[Path] = [] + if sigs_path.is_file(): paths.append(sigs_path) - elif os.path.isdir(sigs_path): - logger.debug("reading signatures from directory %s", os.path.abspath(os.path.normpath(sigs_path))) - for root, _, files in os.walk(sigs_path): - for file in files: - if file.endswith((".pat", ".pat.gz", ".sig")): - sig_path = os.path.join(root, file) - paths.append(sig_path) + elif sigs_path.is_dir(): + logger.debug("reading signatures from directory %s", sigs_path.resolve()) + for file in sigs_path.rglob("*"): + if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"): + paths.append(file) - # nicely normalize and format path so that debugging messages are clearer - paths = [os.path.abspath(os.path.normpath(path)) for path in paths] + # Convert paths to their absolute and normalized forms + paths = [path.resolve().absolute() for path in paths] - # load signatures in deterministic order: the 
alphabetic sorting of filename. - # this means that `0_sigs.pat` loads before `1_sigs.pat`. - paths = sorted(paths, key=os.path.basename) + # Sort paths in deterministic order based on filename + paths = sorted(paths, key=lambda path: path.name) for path in paths: logger.debug("found signature file: %s", path) @@ -743,23 +727,20 @@ def collect_metadata( sample_path: str, format_: str, os_: str, - rules_path: List[str], + rules_path: List[Path], extractor: capa.features.extractors.base_extractor.FeatureExtractor, ) -> rdoc.Metadata: md5 = hashlib.md5() sha1 = hashlib.sha1() sha256 = hashlib.sha256() - with open(sample_path, "rb") as f: - buf = f.read() + buf = Path(sample_path).read_bytes() md5.update(buf) sha1.update(buf) sha256.update(buf) - if rules_path != [RULES_PATH_DEFAULT_STRING]: - rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path] - + rules_path = [r.resolve().absolute() for r in rules_path] format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ arch = get_arch(sample_path) os_ = get_os(sample_path) if os_ == OS_AUTO else os_ @@ -1011,7 +992,7 @@ def handle_common_args(args): raise RuntimeError("unexpected --color value: " + args.color) if hasattr(args, "rules"): - rules_paths: List[str] = [] + rules_paths: List[Path] = [] if args.rules == [RULES_PATH_DEFAULT_STRING]: logger.debug("-" * 80) @@ -1021,9 +1002,9 @@ def handle_common_args(args): logger.debug(" https://github.com/mandiant/capa-rules") logger.debug("-" * 80) - default_rule_path = os.path.join(get_default_root(), "rules") + default_rule_path = get_default_root() / "rules" - if not os.path.exists(default_rule_path): + if not default_rule_path.exists(): # when a users installs capa via pip, # this pulls down just the source code - not the default rules. # i'm not sure the default rules should even be written to the library directory, @@ -1035,10 +1016,9 @@ def handle_common_args(args): rules_paths.append(default_rule_path) args.is_default_rules = True else: - rules_paths = args.rules - - if RULES_PATH_DEFAULT_STRING in rules_paths: - rules_paths.remove(RULES_PATH_DEFAULT_STRING) + for rule in args.rules: + if RULES_PATH_DEFAULT_STRING != rule: + rules_paths.append(Path(rule)) for rule_path in rules_paths: logger.debug("using rules path: %s", rule_path) @@ -1056,8 +1036,9 @@ def handle_common_args(args): ) logger.debug("-" * 80) - sigs_path = os.path.join(get_default_root(), "sigs") - if not os.path.exists(sigs_path): + sigs_path = get_default_root() / "sigs" + + if not sigs_path.exists(): logger.error( "Using default signature path, but it doesn't exist. 
" "Please install the signatures first: " @@ -1065,7 +1046,7 @@ def handle_common_args(args): ) raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") else: - sigs_path = args.signatures + sigs_path = Path(args.signatures) logger.debug("using signatures path: %s", sigs_path) args.signatures = sigs_path @@ -1118,7 +1099,7 @@ def main(argv=None): return ret try: - _ = get_file_taste(args.sample) + _ = get_file_taste(Path(args.sample)) except IOError as e: # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we # handle the IOError separately and reach into the args @@ -1138,7 +1119,7 @@ def main(argv=None): try: if is_running_standalone() and args.is_default_rules: - cache_dir = os.path.join(get_default_root(), "cache") + cache_dir = get_default_root() / "cache" else: cache_dir = capa.rules.cache.get_default_cache_directory() @@ -1223,8 +1204,7 @@ def main(argv=None): if format_ == FORMAT_FREEZE: # freeze format deserializes directly into an extractor - with open(args.sample, "rb") as f: - extractor = frz.load(f.read()) + extractor = frz.load(Path(args.sample).read_bytes()) else: # all other formats we must create an extractor, # such as viv, binary ninja, etc. workspaces @@ -1312,7 +1292,7 @@ def ida_main(): logger.debug(" https://github.com/mandiant/capa-rules") logger.debug("-" * 80) - rules_path = os.path.join(get_default_root(), "rules") + rules_path = get_default_root() / "rules" logger.debug("rule path: %s", rules_path) rules = get_rules([rules_path]) diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index 66a06c82..a5a3f04d 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -136,7 +136,8 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata: arch=meta.analysis.arch, os=meta.analysis.os, extractor=meta.analysis.extractor, - rules=meta.analysis.rules, + # TODO convert analysis.rule type to Path in capa_pb2.Metadata + rules=list(str(rule) for rule in meta.analysis.rules), base_address=addr_to_pb2(meta.analysis.base_address), layout=capa_pb2.Layout( functions=[ diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 21957ddf..79ea63b6 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -8,6 +8,7 @@ import datetime import collections from typing import Any, Dict, List, Tuple, Union, Optional +from pathlib import Path from pydantic import Field, BaseModel @@ -73,7 +74,7 @@ class Analysis(Model): arch: str os: str extractor: str - rules: Tuple[str, ...] + rules: Tuple[Path, ...] 
base_address: frz.Address layout: Layout feature_counts: FeatureCounts diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 536e7242..fea63d07 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -91,7 +91,7 @@ def render_meta(ostream, doc: rd.ResultDocument): ("arch", doc.meta.analysis.arch), ("extractor", doc.meta.analysis.extractor), ("base address", format_address(doc.meta.analysis.base_address)), - ("rules", "\n".join(doc.meta.analysis.rules)), + ("rules", "\n".join(tuple(str(rule) for rule in doc.meta.analysis.rules))), ("function count", len(doc.meta.analysis.feature_counts.functions)), ("library function count", len(doc.meta.analysis.library_functions)), ( diff --git a/capa/rules/cache.py b/capa/rules/cache.py index e2bdfc70..2d49c407 100644 --- a/capa/rules/cache.py +++ b/capa/rules/cache.py @@ -3,8 +3,8 @@ import zlib import pickle import hashlib import logging -import os.path from typing import List, Optional +from pathlib import Path from dataclasses import dataclass import capa.rules @@ -36,7 +36,7 @@ def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier: return hash.hexdigest() -def get_default_cache_directory() -> str: +def get_default_cache_directory() -> Path: # ref: https://github.com/mandiant/capa/issues/1212#issuecomment-1361259813 # # Linux: $XDG_CACHE_HOME/capa/ @@ -45,22 +45,22 @@ def get_default_cache_directory() -> str: # ref: https://stackoverflow.com/a/8220141/87207 if sys.platform == "linux" or sys.platform == "linux2": - directory = os.environ.get("XDG_CACHE_HOME", os.path.join(os.environ["HOME"], ".cache", "capa")) + directory = Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache" / "capa")) elif sys.platform == "darwin": - directory = os.path.join(os.environ["HOME"], "Library", "Caches", "capa") + directory = Path.home() / "Library" / "Caches" / "capa" elif sys.platform == "win32": - directory = os.path.join(os.environ["LOCALAPPDATA"], "flare", "capa", "cache") + directory = Path(os.environ["LOCALAPPDATA"]) / "flare" / "capa" / "cache" else: raise NotImplementedError(f"unsupported platform: {sys.platform}") - os.makedirs(directory, exist_ok=True) + directory.mkdir(parents=True, exist_ok=True) return directory -def get_cache_path(cache_dir: str, id: CacheIdentifier) -> str: +def get_cache_path(cache_dir: Path, id: CacheIdentifier) -> Path: filename = "capa-" + id[:8] + ".cache" - return os.path.join(cache_dir, filename) + return cache_dir / filename MAGIC = b"capa" @@ -102,30 +102,29 @@ def compute_ruleset_cache_identifier(ruleset: capa.rules.RuleSet) -> CacheIdenti return compute_cache_identifier(rule_contents) -def cache_ruleset(cache_dir: str, ruleset: capa.rules.RuleSet): +def cache_ruleset(cache_dir: Path, ruleset: capa.rules.RuleSet): """ - cache the given ruleset to disk, using the given cache directory. - this can subsequently be reloaded via `load_cached_ruleset`, + Cache the given ruleset to disk, using the given cache directory. + This can subsequently be reloaded via `load_cached_ruleset`, assuming the capa version and rule content does not change. - callers should use this function to avoid the performance overhead + Callers should use this function to avoid the performance overhead of validating rules on each run. 
""" id = compute_ruleset_cache_identifier(ruleset) path = get_cache_path(cache_dir, id) - if os.path.exists(path): - logger.debug("rule set already cached to %s", path) + if path.exists(): + logger.debug("Rule set already cached to %s", path) return cache = RuleCache(id, ruleset) - with open(path, "wb") as f: - f.write(cache.dump()) + path.write_bytes(cache.dump()) - logger.debug("rule set cached to %s", path) + logger.debug("Rule set cached to %s", path) return -def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]: +def load_cached_ruleset(cache_dir: Path, rule_contents: List[bytes]) -> Optional[capa.rules.RuleSet]: """ load a cached ruleset from disk, using the given cache directory. the raw rule contents are required here to prove that the rules haven't changed @@ -136,20 +135,19 @@ def load_cached_ruleset(cache_dir: str, rule_contents: List[bytes]) -> Optional[ """ id = compute_cache_identifier(rule_contents) path = get_cache_path(cache_dir, id) - if not os.path.exists(path): + if not path.exists(): logger.debug("rule set cache does not exist: %s", path) return None logger.debug("loading rule set from cache: %s", path) - with open(path, "rb") as f: - buf = f.read() + buf = path.read_bytes() try: cache = RuleCache.load(buf) except AssertionError: logger.debug("rule set cache is invalid: %s", path) # delete the cache that seems to be invalid. - os.remove(path) + path.unlink() return None else: return cache.ruleset diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 9752c996..15c0e190 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -37,6 +37,7 @@ import logging import argparse import datetime import itertools +from pathlib import Path import capa.main import capa.rules @@ -711,7 +712,7 @@ def main(argv=None): logging.getLogger("capa2yara").setLevel(level) try: - rules = capa.main.get_rules([args.rules]) + rules = capa.main.get_rules([Path(args.rules)]) namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules)) if args.tag: diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 57657018..452fdc53 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -3,6 +3,7 @@ import json import collections from typing import Any, Dict +from pathlib import Path import capa.main import capa.rules @@ -171,7 +172,7 @@ def capa_details(rules_path, file_path, output_format="dictionary"): capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor) + meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] @@ -199,11 +200,11 @@ if __name__ == "__main__": import os.path import argparse - RULES_PATH = os.path.join(os.path.dirname(__file__), "..", "rules") + RULES_PATH = capa.main.get_default_root() / "rules" parser = argparse.ArgumentParser(description="Extract capabilities from a file") parser.add_argument("file", help="file to extract capabilities from") - parser.add_argument("--rules", help="path to rules directory", default=os.path.abspath(RULES_PATH)) + parser.add_argument("--rules", help="path to rules directory", 
default=RULES_PATH) parser.add_argument( "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary" ) diff --git a/scripts/detect_duplicate_features.py b/scripts/detect_duplicate_features.py index ff21fd0b..88476b9d 100644 --- a/scripts/detect_duplicate_features.py +++ b/scripts/detect_duplicate_features.py @@ -1,6 +1,7 @@ import sys import logging import argparse +from pathlib import Path import capa.main import capa.rules @@ -89,7 +90,7 @@ def main(): args = parser.parse_args() new_rule_path = args.new_rule - rules_path = args.rules + rules_path = [Path(rule) for rule in args.rules] result = find_overlapping_rules(new_rule_path, rules_path) diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 09d125d8..8efb664f 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -34,6 +34,7 @@ import timeit import logging import argparse import subprocess +from pathlib import Path import tqdm import tabulate @@ -81,7 +82,7 @@ def main(argv=None): capa.main.handle_common_args(args) try: - taste = capa.helpers.get_file_taste(args.sample) + taste = capa.helpers.get_file_taste(Path(args.sample)) except IOError as e: logger.error("%s", str(e)) return -1 diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index b58c7568..dfd0a31e 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -54,6 +54,7 @@ import logging import argparse import collections from typing import Dict +from pathlib import Path import colorama @@ -136,7 +137,7 @@ def main(argv=None): capa.main.handle_common_args(args) try: - taste = get_file_taste(args.sample) + taste = get_file_taste(Path(args.sample)) except IOError as e: logger.error("%s", str(e)) return -1 diff --git a/scripts/show-features.py b/scripts/show-features.py index bb83bad9..3400da7a 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -67,8 +67,8 @@ Example:: import os import sys import logging -import os.path import argparse +from pathlib import Path import capa.main import capa.rules @@ -102,7 +102,7 @@ def main(argv=None): capa.main.handle_common_args(args) try: - taste = capa.helpers.get_file_taste(args.sample) + taste = capa.helpers.get_file_taste(Path(args.sample)) except IOError as e: logger.error("%s", str(e)) return -1 diff --git a/setup.py b/setup.py index 4a67c68c..a2fb0f5d 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. 
import os +from pathlib import Path import setuptools @@ -34,14 +35,11 @@ requirements = [ # this sets __version__ # via: http://stackoverflow.com/a/7071358/87207 # and: http://stackoverflow.com/a/2073599/87207 -with open(os.path.join("capa", "version.py"), "r") as f: - exec(f.read()) +exec(Path("capa/version.py").read_text()) # via: https://packaging.python.org/guides/making-a-pypi-friendly-readme/ -this_directory = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(this_directory, "README.md"), "r") as f: - long_description = f.read() +long_description = (Path(__file__).resolve().parent / "README.md").read_text() setuptools.setup( diff --git a/tests/fixtures.py b/tests/fixtures.py index 84e40209..3fcc6b29 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -14,6 +14,7 @@ import itertools import contextlib import collections from typing import Set, Dict +from pathlib import Path from functools import lru_cache import pytest @@ -44,9 +45,9 @@ from capa.features.address import Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor -CD = os.path.dirname(__file__) -DOTNET_DIR = os.path.join(CD, "data", "dotnet") -DNFILE_TESTFILES = os.path.join(DOTNET_DIR, "dnfile-testfiles") +CD = Path(__file__).resolve().parent +DOTNET_DIR = CD / "data" / "dotnet" +DNFILE_TESTFILES = DOTNET_DIR / "dnfile-testfiles" @contextlib.contextmanager @@ -94,11 +95,11 @@ def get_viv_extractor(path): import capa.features.extractors.viv.extractor sigpaths = [ - os.path.join(CD, "data", "sigs", "test_aulldiv.pat"), - os.path.join(CD, "data", "sigs", "test_aullrem.pat.gz"), - os.path.join(CD, "..", "sigs", "1_flare_msvc_rtf_32_64.sig"), - os.path.join(CD, "..", "sigs", "2_flare_msvc_atlmfc_32_64.sig"), - os.path.join(CD, "..", "sigs", "3_flare_common_libs.sig"), + CD / "data" / "sigs" / "test_aulldiv.pat", + CD / "data" / "sigs" / "test_aullrem.pat.gz", + CD.parent / "sigs" / "1_flare_msvc_rtf_32_64.sig", + CD.parent / "sigs" / "2_flare_msvc_atlmfc_32_64.sig", + CD.parent / "sigs" / "3_flare_common_libs.sig", ] if "raw32" in path: diff --git a/tests/test_binja_features.py b/tests/test_binja_features.py index 04c8a49e..c5c3d33f 100644 --- a/tests/test_binja_features.py +++ b/tests/test_binja_features.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import logging +from pathlib import Path import fixtures from fixtures import * @@ -52,8 +53,8 @@ def test_binja_feature_counts(sample, scope, feature, expected): @pytest.mark.skipif(binja_present is False, reason="Skip binja tests if the binaryninja Python API is not installed") def test_standalone_binja_backend(): - CD = os.path.dirname(__file__) - test_path = os.path.join(CD, "..", "tests", "data", "Practical Malware Analysis Lab 01-01.exe_") + CD = Path(__file__).resolve().parent + test_path = CD / ".." 
/ "tests" / "data" / "Practical Malware Analysis Lab 01-01.exe_" assert capa.main.main([test_path, "-b", capa.main.BACKEND_BINJA]) == 0 diff --git a/tests/test_ida_features.py b/tests/test_ida_features.py index 99e7d5a1..77897194 100644 --- a/tests/test_ida_features.py +++ b/tests/test_ida_features.py @@ -46,14 +46,14 @@ import io import sys import inspect import logging -import os.path import binascii import traceback +from pathlib import Path import pytest try: - sys.path.append(os.path.dirname(__file__)) + sys.path.append(str(Path(__file__).parent)) import fixtures from fixtures import * finally: diff --git a/tests/test_proto.py b/tests/test_proto.py index 64c950bc..da7cc686 100644 --- a/tests/test_proto.py +++ b/tests/test_proto.py @@ -144,7 +144,7 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata): assert meta.analysis.arch == dst.analysis.arch assert meta.analysis.os == dst.analysis.os assert meta.analysis.extractor == dst.analysis.extractor - assert list(meta.analysis.rules) == dst.analysis.rules + assert list(str(r) for r in meta.analysis.rules) == dst.analysis.rules assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions) diff --git a/tests/test_rule_cache.py b/tests/test_rule_cache.py index b52e2577..4acea1b3 100644 --- a/tests/test_rule_cache.py +++ b/tests/test_rule_cache.py @@ -76,12 +76,12 @@ def test_ruleset_cache_save_load(): path = capa.rules.cache.get_cache_path(cache_dir, id) try: - os.remove(path) + path.unlink() except OSError: pass capa.rules.cache.cache_ruleset(cache_dir, rs) - assert os.path.exists(path) + assert path.exists() assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is not None @@ -93,23 +93,23 @@ def test_ruleset_cache_invalid(): cache_dir = capa.rules.cache.get_default_cache_directory() path = capa.rules.cache.get_cache_path(cache_dir, id) try: - os.remove(path) + path.unlink() except OSError: pass capa.rules.cache.cache_ruleset(cache_dir, rs) - assert os.path.exists(path) + assert path.exists() - with open(path, "rb") as f: - buf = f.read() + buf = path.read_bytes() - # corrupt the magic header + # Corrupt the magic header buf = b"x" + buf[1:] - with open(path, "wb") as f: - f.write(buf) + # Write the modified contents back to the file + path.write_bytes(buf) - assert os.path.exists(path) + # Check if the file still exists + assert path.exists() assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None # the invalid cache should be deleted - assert not os.path.exists(path) + assert not path.exists() diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 2d8fefac..35b46341 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -10,27 +10,28 @@ import os import sys import textwrap import subprocess +from pathlib import Path import pytest from fixtures import * -CD = os.path.dirname(__file__) +CD = Path(__file__).resolve().parent def get_script_path(s): - return os.path.join(CD, "..", "scripts", s) + return CD / ".." / "scripts" / s def get_file_path(): - return os.path.join(CD, "data", "9324d1a8ae37a36ae560c37448c9705a.exe_") + return CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_" def get_rules_path(): - return os.path.join(CD, "..", "rules") + return CD / ".." 
/ "rules" def get_rule_path(): - return os.path.join(get_rules_path(), "lib", "allocate-memory.yml") + return get_rules_path() / "lib" / "allocate-memory.yml" @pytest.mark.parametrize( @@ -52,13 +53,17 @@ def test_scripts(script, args): assert p.returncode == 0 -def test_bulk_process(tmpdir): +def test_bulk_process(tmp_path): # create test directory to recursively analyze - t = tmpdir.mkdir("test") - with open(os.path.join(CD, "data", "ping_täst.exe_"), "rb") as f: - t.join("test.exe_").write_binary(f.read()) + t = tmp_path / "test" + t.mkdir() - p = run_program(get_script_path("bulk-process.py"), [t.dirname]) + source_file = Path(__file__).resolve().parent / "data" / "ping_täst.exe_" + dest_file = t / "test.exe_" + + dest_file.write_bytes(source_file.read_bytes()) + + p = run_program(get_script_path("bulk-process.py"), [t.parent]) assert p.returncode == 0 @@ -68,19 +73,18 @@ def run_program(script_path, args): return subprocess.run(args, stdout=subprocess.PIPE) -def test_proto_conversion(tmpdir): - t = tmpdir.mkdir("proto-test") +def test_proto_conversion(tmp_path): + t = tmp_path / "proto-test" + t.mkdir() + json_file = Path(__file__).resolve().parent / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json" - json = os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json") - - p = run_program(get_script_path("proto-from-results.py"), [json]) + p = run_program(get_script_path("proto-from-results.py"), [json_file]) assert p.returncode == 0 - pb = os.path.join(t, "pma.pb") - with open(pb, "wb") as f: - f.write(p.stdout) + pb_file = t / "pma.pb" + pb_file.write_bytes(p.stdout) - p = run_program(get_script_path("proto-to-results.py"), [pb]) + p = run_program(get_script_path("proto-to-results.py"), [pb_file]) assert p.returncode == 0 assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')