From edeb458b337246c22945bb9484dde211b2d6846f Mon Sep 17 00:00:00 2001 From: Aayush Goel <81844215+Aayush-Goel-04@users.noreply.github.com> Date: Thu, 6 Jul 2023 23:59:01 +0530 Subject: [PATCH] some more changes --- capa/ida/helpers.py | 2 +- capa/ida/plugin/form.py | 10 ++--- capa/main.py | 17 +++++--- capa/render/proto/__init__.py | 4 +- capa/render/result_document.py | 2 +- capa/render/verbose.py | 2 +- scripts/bulk-process.py | 4 +- scripts/cache-ruleset.py | 2 +- scripts/capa_as_library.py | 4 +- scripts/import-to-bn.py | 20 +++++---- scripts/lint.py | 64 ++++++++++++---------------- scripts/setup-linter-dependencies.py | 4 +- tests/test_proto.py | 2 +- 13 files changed, 67 insertions(+), 70 deletions(-) diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index 520e81c2..64dcb84d 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -157,7 +157,7 @@ def collect_metadata(rules: List[Path]): arch=arch, os=os, extractor="ida", - rules=tuple(rules), + rules=tuple(str(r.resolve().absolute()) for r in rules), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), layout=rdoc.Layout( functions=tuple() diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 1b8bd235..d36647a6 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -577,10 +577,10 @@ class CapaExplorerForm(idaapi.PluginForm): def ensure_capa_settings_rule_path(self): try: - path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") + path: Path = Path(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")) # resolve rules directory - check self and settings first, then ask user - if not os.path.exists(path): + if not path.exists(): # configure rules selection messagebox rules_message = QtWidgets.QMessageBox() rules_message.setIcon(QtWidgets.QMessageBox.Information) @@ -598,15 +598,15 @@ class CapaExplorerForm(idaapi.PluginForm): if pressed == QtWidgets.QMessageBox.Cancel: raise UserCancelledError() - path = self.ask_user_directory() + path = Path(self.ask_user_directory()) if not path: raise UserCancelledError() - if not os.path.exists(path): + if not path.exists(): logger.error("rule path %s does not exist or cannot be accessed" % path) return False - settings.user[CAPA_SETTINGS_RULE_PATH] = path + settings.user[CAPA_SETTINGS_RULE_PATH] = str(path) except UserCancelledError as e: capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules") logger.warning( diff --git a/capa/main.py b/capa/main.py index 61a11b80..46590636 100644 --- a/capa/main.py +++ b/capa/main.py @@ -13,7 +13,6 @@ import sys import time import hashlib import logging -import os.path import argparse import datetime import textwrap @@ -544,7 +543,7 @@ def get_extractor( # We need to fist find the binja API installation path and add it into sys.path if is_running_standalone(): bn_api = find_binja_path() - if os.path.exists(bn_api): + if Path(bn_api).exists(): sys.path.append(bn_api) try: @@ -628,11 +627,17 @@ def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]: logger.debug("reading rules from directory %s", rule_path) for root, _, files in os.walk(rule_path): if ".git" in root: - # Skip certain directories within the walk + # the .github directory contains CI config in capa-rules + # this includes some .yml files + # these are not rules + # additionally, .git has files that are not .yml and generate the warning + # skip those too continue for file in files: if not file.endswith(".yml"): if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): + # expect to see .git* files, readme.md, format.md, and maybe a .git directory + # other things maybe are rules, but are mis-named. logger.warning("skipping non-.yml file: %s", file) continue rule_file_paths.append(Path(root) / file) @@ -740,7 +745,7 @@ def collect_metadata( sha1.update(buf) sha256.update(buf) - rules_path = [r.resolve().absolute() for r in rules_path] + rules = tuple(str(r.resolve().absolute()) for r in rules_path) format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ arch = get_arch(sample_path) os_ = get_os(sample_path) if os_ == OS_AUTO else os_ @@ -753,14 +758,14 @@ def collect_metadata( md5=md5.hexdigest(), sha1=sha1.hexdigest(), sha256=sha256.hexdigest(), - path=os.path.normpath(sample_path), + path=str(Path(sample_path).resolve()), ), analysis=rdoc.Analysis( format=format_, arch=arch, os=os_, extractor=extractor.__class__.__name__, - rules=tuple(rules_path), + rules=rules, base_address=frz.Address.from_capa(extractor.get_base_address()), layout=rdoc.Layout( functions=tuple(), diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index 9aae03ca..10df3b1a 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -138,7 +138,7 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata: os=meta.analysis.os, extractor=meta.analysis.extractor, # TODO convert analysis.rule type to Path in capa_pb2.Metadata - rules=list(str(rule) for rule in meta.analysis.rules), + rules=list(meta.analysis.rules), base_address=addr_to_pb2(meta.analysis.base_address), layout=capa_pb2.Layout( functions=[ @@ -501,7 +501,7 @@ def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata: arch=meta.analysis.arch, os=meta.analysis.os, extractor=meta.analysis.extractor, - rules=tuple(Path(r) for r in meta.analysis.rules), + rules=tuple(meta.analysis.rules), base_address=addr_from_pb2(meta.analysis.base_address), layout=rd.Layout( functions=tuple( diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 79ea63b6..654cb000 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -74,7 +74,7 @@ class Analysis(Model): arch: str os: str extractor: str - rules: Tuple[Path, ...] + rules: Tuple[str, ...] base_address: frz.Address layout: Layout feature_counts: FeatureCounts diff --git a/capa/render/verbose.py b/capa/render/verbose.py index fea63d07..536e7242 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -91,7 +91,7 @@ def render_meta(ostream, doc: rd.ResultDocument): ("arch", doc.meta.analysis.arch), ("extractor", doc.meta.analysis.extractor), ("base address", format_address(doc.meta.analysis.base_address)), - ("rules", "\n".join(tuple(str(rule) for rule in doc.meta.analysis.rules))), + ("rules", "\n".join(doc.meta.analysis.rules)), ("function count", len(doc.meta.analysis.feature_counts.functions)), ("library function count", len(doc.meta.analysis.library_functions)), ( diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 5e3ed0a1..44058153 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -59,10 +59,10 @@ import os import sys import json import logging -import os.path import argparse import multiprocessing import multiprocessing.pool +from pathlib import Path import capa import capa.main @@ -171,7 +171,7 @@ def main(argv=None): samples = [] for base, directories, files in os.walk(args.input): for file in files: - samples.append(os.path.join(base, file)) + samples.append(str(Path(base) / file)) def pmap(f, args, parallelism=multiprocessing.cpu_count()): """apply the given function f to the given args using subprocesses""" diff --git a/scripts/cache-ruleset.py b/scripts/cache-ruleset.py index 76dd3fd8..94ef3646 100644 --- a/scripts/cache-ruleset.py +++ b/scripts/cache-ruleset.py @@ -61,7 +61,7 @@ def main(argv=None): id = capa.rules.cache.compute_cache_identifier(content) path = capa.rules.cache.get_cache_path(args.cache, id) - assert os.path.exists(path) + assert path.exists() logger.info("cached to: %s", path) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 452fdc53..57e32b2f 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -197,7 +197,6 @@ def capa_details(rules_path, file_path, output_format="dictionary"): if __name__ == "__main__": import sys - import os.path import argparse RULES_PATH = capa.main.get_default_root() / "rules" @@ -209,6 +208,7 @@ if __name__ == "__main__": "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary" ) args = parser.parse_args() - + if args.rules != RULES_PATH: + args.rules = Path(args.rules) print(capa_details(args.rules, args.file, args.output)) sys.exit(0) diff --git a/scripts/import-to-bn.py b/scripts/import-to-bn.py index 1e5b4ca0..04662324 100644 --- a/scripts/import-to-bn.py +++ b/scripts/import-to-bn.py @@ -24,6 +24,7 @@ Derived from: https://github.com/mandiant/capa/blob/master/scripts/import-to-ida """ import os import json +from pathlib import Path import binaryninja import binaryninja.interaction @@ -45,22 +46,23 @@ def append_func_cmt(bv, va, cmt): def load_analysis(bv): - shortname = os.path.splitext(os.path.basename(bv.file.filename))[0] - dirname = os.path.dirname(bv.file.filename) + shortname = Path(bv.file.filename).resolve().stem + dirname = Path(bv.file.filename).resolve().parent binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n") - if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK): - path = os.path.join(dirname, shortname + ".js") - elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK): - path = os.path.join(dirname, shortname + ".json") + js_path = path = dirname / (shortname + ".js") + json_path = dirname / (shortname + ".json") + if os.access(str(js_path), os.R_OK): + path = js_path + elif os.access(str(json_path), os.R_OK): + path = json_path else: path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)") - if not path or not os.access(path, os.R_OK): + if not path or not os.access(str(path), os.R_OK): binaryninja.log_error("Invalid filename.") return 0 binaryninja.log_info(f"Using capa file {path}") - with open(path, "rb") as f: - doc = json.loads(f.read().decode("utf-8")) + doc = json.loads(path.read_bytes().decode("utf-8")) if "meta" not in doc or "rules" not in doc: binaryninja.log_error("doesn't appear to be a capa report") diff --git a/scripts/lint.py b/scripts/lint.py index a80d3e12..359f282b 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -114,7 +114,7 @@ class FilenameDoesntMatchRuleName(Lint): expected = expected.replace(".", "") expected = expected + ".yml" - found = os.path.basename(rule.meta["capa/path"]) + found = Path(rule.meta["capa/path"]).name self.recommendation = self.recommendation_template.format(expected, found) @@ -249,7 +249,8 @@ class InvalidAttckOrMbcTechnique(Lint): super().__init__() try: - with open(f"{os.path.dirname(__file__)}/linter-data.json", "rb") as fd: + data_path = Path(__file__).resolve().parent / "linter-data.json" + with data_path.open("rb") as fd: self.data = json.load(fd) self.enabled_frameworks = self.data.keys() except BaseException: @@ -295,7 +296,7 @@ DEFAULT_SIGNATURES = capa.main.get_default_signatures() def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: - nice_path = os.path.abspath(str(path)) + nice_path = path.resolve().absolute().as_posix() if path in ctx.capabilities_by_sample: logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path])) return ctx.capabilities_by_sample[path] @@ -883,43 +884,31 @@ def lint(ctx: Context): return ret -def collect_samples(path) -> Dict[str, Path]: +def collect_samples(path: Path) -> Dict[str, Path]: """ - recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename. + Recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename. """ samples = {} - for root, dirs, files in os.walk(path): - for name in files: - if name.endswith(".viv"): - continue - if name.endswith(".idb"): - continue - if name.endswith(".i64"): - continue - if name.endswith(".frz"): - continue - if name.endswith(".fnames"): - continue + for path in path.rglob("*"): + if path.suffix in [".viv", ".idb", ".i64", ".frz", ".fnames"]: + continue - path = pathlib.Path(os.path.join(root, name)) + try: + buf = path.read_bytes() + except IOError: + continue - try: - with path.open("rb") as f: - buf = f.read() - except IOError: - continue + sha256 = hashlib.sha256() + sha256.update(buf) - sha256 = hashlib.sha256() - sha256.update(buf) + md5 = hashlib.md5() + md5.update(buf) - md5 = hashlib.md5() - md5.update(buf) - - samples[sha256.hexdigest().lower()] = path - samples[sha256.hexdigest().upper()] = path - samples[md5.hexdigest().lower()] = path - samples[md5.hexdigest().upper()] = path - samples[name] = path + samples[sha256.hexdigest().lower()] = path + samples[sha256.hexdigest().upper()] = path + samples[md5.hexdigest().lower()] = path + samples[md5.hexdigest().upper()] = path + samples[path.name] = path return samples @@ -928,7 +917,7 @@ def main(argv=None): if argv is None: argv = sys.argv[1:] - samples_path = os.path.join(os.path.dirname(__file__), "..", "tests", "data") + samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data") parser = argparse.ArgumentParser(description="Lint capa rules.") capa.main.install_common_args(parser, wanted={"tag"}) @@ -964,11 +953,12 @@ def main(argv=None): return -1 logger.info("collecting potentially referenced samples") - if not os.path.exists(args.samples): - logger.error("samples path %s does not exist", args.samples) + samplePath = Path(args.samples) + if not samplePath.exists(): + logger.error("samples path %s does not exist", samplePath) return -1 - samples = collect_samples(args.samples) + samples = collect_samples(samplePath) ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough) diff --git a/scripts/setup-linter-dependencies.py b/scripts/setup-linter-dependencies.py index ff271ab6..a59d32f0 100644 --- a/scripts/setup-linter-dependencies.py +++ b/scripts/setup-linter-dependencies.py @@ -37,7 +37,7 @@ import logging import argparse from sys import argv from typing import Dict, List -from os.path import dirname +from pathlib import Path import requests from stix2 import Filter, MemoryStore, AttackPattern # type: ignore @@ -187,7 +187,7 @@ if __name__ == "__main__": "--output", "-o", type=str, - default=f"{dirname(__file__)}/linter-data.json", + default=str(Path(__file__).resolve().parent / "linter-data.json"), help="Path to output file (lint.py will be looking for linter-data.json)", ) main(parser.parse_args(args=argv[1:])) diff --git a/tests/test_proto.py b/tests/test_proto.py index da7cc686..64c950bc 100644 --- a/tests/test_proto.py +++ b/tests/test_proto.py @@ -144,7 +144,7 @@ def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata): assert meta.analysis.arch == dst.analysis.arch assert meta.analysis.os == dst.analysis.os assert meta.analysis.extractor == dst.analysis.extractor - assert list(str(r) for r in meta.analysis.rules) == dst.analysis.rules + assert list(meta.analysis.rules) == dst.analysis.rules assert capa.render.proto.addr_to_pb2(meta.analysis.base_address) == dst.analysis.base_address assert len(meta.analysis.layout.functions) == len(dst.analysis.layout.functions)