diff --git a/capa/helpers.py b/capa/helpers.py index bd55e2b4..daa51aa1 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -35,8 +35,8 @@ def hex(n: int) -> str: def get_file_taste(sample_path: Path) -> bytes: if not sample_path.exists(): raise IOError(f"sample path {sample_path} does not exist or cannot be accessed") - with open(sample_path, "rb") as f: - taste = f.read(8) + with sample_path.open("rb") as f: + taste = f.read(8) return taste diff --git a/capa/main.py b/capa/main.py index 74557715..ef665ac5 100644 --- a/capa/main.py +++ b/capa/main.py @@ -431,23 +431,26 @@ def is_running_standalone() -> bool: def get_default_root() -> Path: """ - Get the file system path to the default resources directory. - Under PyInstaller, this comes from _MEIPASS. - Under source, this is the root directory of the project. + get the file system path to the default resources directory. + under PyInstaller, this comes from _MEIPASS. + under source, this is the root directory of the project. """ if is_running_standalone(): + # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. + # it's injected by pyinstaller. + # so we'll fetch this attribute dynamically. try: meipass = Path(getattr(sys, "_MEIPASS")) return meipass except AttributeError: pass - # Return the root directory of the project when not running from a PyInstaller'd executable + # return the root directory of the project when not running from a PyInstaller'd executable return Path(__file__).resolve().parent.parent def get_default_signatures() -> List[Path]: """ - Compute a list of file system paths to the default FLIRT signatures. + compute a list of file system paths to the default FLIRT signatures. 
""" sigs_path = get_default_root() / "sigs" logger.debug("signatures path: %s", sigs_path) @@ -600,7 +603,7 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: return file_extractors -def is_nursery_rule_path(path: str) -> bool: +def is_nursery_rule_path(path: Path) -> bool: """ The nursery is a spot for rules that have not yet been fully polished. For example, they may not have references to public example of a technique. @@ -610,12 +613,12 @@ def is_nursery_rule_path(path: str) -> bool: When nursery rules are loaded, their metadata section should be updated with: `nursery=True`. """ - return "nursery" in path + return "nursery" in path.parts def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: """ - Collect all rule file paths, including those in subdirectories. + collect all rule file paths, including those in subdirectories. """ rule_file_paths = [] for rule_path in rule_paths: @@ -690,8 +693,7 @@ def get_rules( raise else: rule.meta["capa/path"] = path.as_posix() - if is_nursery_rule_path(path.as_posix()): - rule.meta["capa/nursery"] = True + rule.meta["capa/nursery"] = is_nursery_rule_path(path) rules.append(rule) logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope) @@ -719,7 +721,8 @@ def get_signatures(sigs_path: Path) -> List[Path]: # Convert paths to their absolute and normalized forms paths = [path.resolve().absolute() for path in paths] - # Sort paths in deterministic order based on filename + # load signatures in deterministic order: the alphabetic sorting of filename. + # this means that `0_sigs.pat` loads before `1_sigs.pat`. 
paths = sorted(paths, key=lambda path: path.name) for path in paths: diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index 10df3b1a..4bbe66f8 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -137,7 +137,6 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata: arch=meta.analysis.arch, os=meta.analysis.os, extractor=meta.analysis.extractor, - # TODO convert analysis.rule type to Path in capa_pb2.Metadata rules=list(meta.analysis.rules), base_address=addr_to_pb2(meta.analysis.base_address), layout=capa_pb2.Layout( diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 654cb000..944365e6 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -7,8 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import datetime import collections -from typing import Any, Dict, List, Tuple, Union, Optional -from pathlib import Path +from typing import Dict, List, Tuple, Union, Optional from pydantic import Field, BaseModel diff --git a/capa/rules/cache.py b/capa/rules/cache.py index 9ff9fbb7..628ddfe6 100644 --- a/capa/rules/cache.py +++ b/capa/rules/cache.py @@ -105,23 +105,23 @@ def compute_ruleset_cache_identifier(ruleset: capa.rules.RuleSet) -> CacheIdenti def cache_ruleset(cache_dir: Path, ruleset: capa.rules.RuleSet): """ - Cache the given ruleset to disk, using the given cache directory. - This can subsequently be reloaded via `load_cached_ruleset`, + cache the given ruleset to disk, using the given cache directory. + this can subsequently be reloaded via `load_cached_ruleset`, assuming the capa version and rule content does not change. - Callers should use this function to avoid the performance overhead + callers should use this function to avoid the performance overhead of validating rules on each run. 
""" id = compute_ruleset_cache_identifier(ruleset) path = get_cache_path(cache_dir, id) if path.exists(): - logger.debug("Rule set already cached to %s", path) + logger.debug("rule set already cached to %s", path) return cache = RuleCache(id, ruleset) path.write_bytes(cache.dump()) - logger.debug("Rule set cached to %s", path) + logger.debug("rule set cached to %s", path) return diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 44058153..7f71d067 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -169,9 +169,9 @@ def main(argv=None): return -1 samples = [] - for base, directories, files in os.walk(args.input): - for file in files: - samples.append(str(Path(base) / file)) + for file in Path(args.input).rglob("*"): + if file.is_file(): + samples.append(file.as_posix()) def pmap(f, args, parallelism=multiprocessing.cpu_count()): """apply the given function f to the given args using subprocesses""" diff --git a/scripts/lint.py b/scripts/lint.py index b34d0ccd..dae18c2b 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -886,7 +886,7 @@ def lint(ctx: Context): def collect_samples(path: Path) -> Dict[str, Path]: """ - Recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename. + recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename. 
""" samples = {} for path in path.rglob("*"): diff --git a/scripts/setup-linter-dependencies.py b/scripts/setup-linter-dependencies.py index a59d32f0..a6af52e9 100644 --- a/scripts/setup-linter-dependencies.py +++ b/scripts/setup-linter-dependencies.py @@ -187,7 +187,7 @@ if __name__ == "__main__": "--output", "-o", type=str, - default=str(Path(__file__).resolve().parent / "linter-data.json"), + default=(Path(__file__).resolve().parent / "linter-data.json").as_posix(), help="Path to output file (lint.py will be looking for linter-data.json)", ) main(parser.parse_args(args=argv[1:])) diff --git a/setup.py b/setup.py index 2ea59d69..d9986945 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ requirements = [ # this sets __version__ # via: http://stackoverflow.com/a/7071358/87207 # and: http://stackoverflow.com/a/2073599/87207 -exec(Path("capa/version.py").read_text()) +exec((Path("capa") / "version.py").read_text()) # via: https://packaging.python.org/guides/making-a-pypi-friendly-readme/ diff --git a/tests/fixtures.py b/tests/fixtures.py index 5ff359bd..b3109005 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -231,7 +231,7 @@ def extract_instruction_features(extractor, fh, bbh, ih) -> Dict[Feature, Set[Ad return features -# note: too reduce the testing time it's recommended to reuse already existing test samples, if possible +# note: to reduce the testing time it's recommended to reuse already existing test samples, if possible def get_data_path_by_name(name): if name == "mimikatz": return (CD / "data" / "mimikatz.exe_").as_posix() diff --git a/tests/test_rule_cache.py b/tests/test_rule_cache.py index 4acea1b3..f5cca2b6 100644 --- a/tests/test_rule_cache.py +++ b/tests/test_rule_cache.py @@ -102,13 +102,13 @@ def test_ruleset_cache_invalid(): buf = path.read_bytes() - # Corrupt the magic header + # corrupt the magic header buf = b"x" + buf[1:] - # Write the modified contents back to the file + # write the modified contents back to the file 
path.write_bytes(buf) - # Check if the file still exists + # check if the file still exists assert path.exists() assert capa.rules.cache.load_cached_ruleset(cache_dir, content) is None # the invalid cache should be deleted