diff --git a/capa/features/freeze.py b/capa/features/freeze.py index bac23b8d..47fbcf3a 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -52,6 +52,7 @@ See the License for the specific language governing permissions and limitations import json import zlib import logging +import os.path import capa.features.file import capa.features.insn @@ -254,7 +255,10 @@ def main(argv=None): args = parser.parse_args(args=argv) capa.main.handle_common_args(args) - extractor = capa.main.get_extractor(args.sample, args.format, args.backend, args.signatures, False) + sigpaths = capa.main.get_signatures(args.signatures) + + extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths, False) + with open(args.output, "wb") as f: f.write(dump(extractor)) diff --git a/capa/main.py b/capa/main.py index 53addd5b..1f960a01 100644 --- a/capa/main.py +++ b/capa/main.py @@ -44,6 +44,7 @@ from capa.helpers import get_file_taste from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor RULES_PATH_DEFAULT_STRING = "(embedded rules)" +SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" SUPPORTED_FILE_MAGIC = set([b"MZ"]) BACKEND_VIV = "vivisect" BACKEND_SMDA = "smda" @@ -539,6 +540,25 @@ def get_rules(rule_path: str, disable_progress=False) -> List[Rule]: return rules +def get_signatures(sigs_path): + if not os.path.exists(sigs_path): + raise IOError("signatures path %s does not exist or cannot be accessed" % sigs_path) + + paths = [] + if os.path.isfile(sigs_path): + paths.append(sigs_path) + elif os.path.isdir(sigs_path): + logger.debug("reading signatures from directory %s", sigs_path) + for root, dirs, files in os.walk(sigs_path): + for file in files: + if file.endswith((".pat", ".pat.gz", ".sig")): + sig_path = os.path.join(root, file) + logger.debug("found signature: %s", sig_path) + paths.append(sig_path) + + return paths + + def collect_metadata(argv, sample_path, rules_path, format, extractor): md5 = hashlib.md5() sha1 = hashlib.sha1() @@ -670,12 +690,9 @@ def install_common_args(parser, wanted=None): if "signatures" in wanted: parser.add_argument( "--signature", - action="append", dest="signatures", type=str, - # with action=append, users can specify futher signatures but not override whats found in $capa/sigs/. - # seems reasonable for now. this is an easy way to register the default signature set. - default=get_default_signatures(), + default=SIGNATURES_PATH_DEFAULT_STRING, help="use the given signatures to identify library functions, file system paths to .sig/.pat files.", ) @@ -687,6 +704,9 @@ def handle_common_args(args): """ handle the global config specified by `install_common_args`, such as configuring logging/coloring/etc. + the following fields will be overwritten when present: + - rules: file system path to rule files. + - signatures: file system path to signature files. args: args (argparse.Namespace): parsed arguments that included at least `install_common_args` args. @@ -724,6 +744,47 @@ def handle_common_args(args): else: raise RuntimeError("unexpected --color value: " + args.color) + if hasattr(args, "rules"): + if args.rules == RULES_PATH_DEFAULT_STRING: + logger.debug("-" * 80) + logger.debug(" Using default embedded rules.") + logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.") + logger.debug(" You can see the current default rule set here:") + logger.debug(" https://github.com/fireeye/capa-rules") + logger.debug("-" * 80) + + rules_path = os.path.join(get_default_root(), "rules") + + if not os.path.exists(rules_path): + # when a users installs capa via pip, + # this pulls down just the source code - not the default rules. + # i'm not sure the default rules should even be written to the library directory, + # so in this case, we require the user to use -r to specify the rule directory. + logger.error("default embedded rules not found! (maybe you installed capa as a library?)") + logger.error("provide your own rule set via the `-r` option.") + return -1 + else: + rules_path = args.rules + logger.debug("using rules path: %s", rules_path) + + args.rules = rules_path + + if hasattr(args, "signatures"): + if args.signatures == SIGNATURES_PATH_DEFAULT_STRING: + logger.debug("-" * 80) + logger.debug(" Using default embedded signatures.") + logger.debug( + " To provide your own signatures, use the form `capa.exe --signature ./path/to/signatures/ /path/to/mal.exe`." + ) + logger.debug("-" * 80) + + sigs_path = os.path.join(get_default_root(), "sigs") + else: + sigs_path = args.signatures + logger.debug("using signatures path: %s", sigs_path) + + args.signatures = sigs_path + def main(argv=None): if sys.version_info < (3, 6): @@ -777,31 +838,8 @@ def main(argv=None): logger.error("%s", e.args[0]) return -1 - if args.rules == RULES_PATH_DEFAULT_STRING: - logger.debug("-" * 80) - logger.debug(" Using default embedded rules.") - logger.debug(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.") - logger.debug(" You can see the current default rule set here:") - logger.debug(" https://github.com/fireeye/capa-rules") - logger.debug("-" * 80) - - rules_path = os.path.join(get_default_root(), "rules") - logger.debug("rule path: %s", rules_path) - - if not os.path.exists(rules_path): - # when a users installs capa via pip, - # this pulls down just the source code - not the default rules. - # i'm not sure the default rules should even be written to the library directory, - # so in this case, we require the user to use -r to specify the rule directory. - logger.error("default embedded rules not found! (maybe you installed capa as a library?)") - logger.error("provide your own rule set via the `-r` option.") - return -1 - else: - rules_path = args.rules - logger.debug("using rules path: %s", rules_path) - try: - rules = get_rules(rules_path, disable_progress=args.quiet) + rules = get_rules(args.rules, disable_progress=args.quiet) rules = capa.rules.RuleSet(rules) logger.debug( "successfully loaded %s rules", @@ -837,6 +875,12 @@ def main(argv=None): logger.debug("file limitation short circuit, won't analyze fully.") return -1 + try: + sig_paths = get_signatures(args.signatures) + except (IOError) as e: + logger.error("%s", str(e)) + return -1 + if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): format = "freeze" with open(args.sample, "rb") as f: @@ -847,7 +891,7 @@ def main(argv=None): try: extractor = get_extractor( - args.sample, format, args.backend, args.signatures, should_save_workspace, disable_progress=args.quiet + args.sample, format, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet ) except UnsupportedFormatError: logger.error("-" * 80) diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 81575d9d..3e4d6c25 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -5,7 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from typing import Dict, List, Tuple +import copy import capa.rules import capa.engine @@ -206,6 +206,9 @@ def convert_match_to_result_document(rules, capabilities, result): def convert_meta_to_result_document(meta): + # make a copy so that we don't modify the given parameter + meta = copy.deepcopy(meta) + attacks = meta.get("att&ck", []) meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks] mbcs = meta.get("mbc", []) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index f120fe66..da80a477 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -78,6 +78,7 @@ def get_capa_results(args): args is a tuple, containing: rules (capa.rules.RuleSet): the rules to match + signatures (List[str]): list of file system paths to signature files format (str): the name of the sample file format path (str): the file system path to the sample to process @@ -94,12 +95,12 @@ def get_capa_results(args): meta (dict): the meta analysis results capabilities (dict): the matched capabilities and their result objects """ - rules, format, path = args + rules, sigpaths, format, path = args should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) logger.info("computing capa results for: %s", path) try: extractor = capa.main.get_extractor( - path, format, capa.main.BACKEND_VIV, args.signatures, should_save_workspace, disable_progress=True + path, format, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True ) except capa.main.UnsupportedFormatError: # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries. @@ -153,14 +154,6 @@ def main(argv=None): args = parser.parse_args(args=argv) capa.main.handle_common_args(args) - if args.rules == "(embedded rules)": - logger.info("using default embedded rules") - logger.debug("detected running from source") - args.rules = os.path.join(os.path.dirname(__file__), "..", "rules") - logger.debug("default rule path (source method): %s", args.rules) - else: - logger.info("using rules path: %s", args.rules) - try: rules = capa.main.get_rules(args.rules) rules = capa.rules.RuleSet(rules) @@ -169,6 +162,12 @@ def main(argv=None): logger.error("%s", str(e)) return -1 + try: + sig_paths = capa.main.get_signatures(args.signatures) + except (IOError) as e: + logger.error("%s", str(e)) + return -1 + samples = [] for (base, directories, files) in os.walk(args.input): for file in files: @@ -200,7 +199,7 @@ def main(argv=None): results = {} for result in mapper( - get_capa_results, [(rules, "pe", sample) for sample in samples], parallelism=args.parallelism + get_capa_results, [(rules, sig_paths, "pe", sample) for sample in samples], parallelism=args.parallelism ): if result["status"] == "error": logger.warning(result["error"]) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 5ec8d6f3..b95481df 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -121,20 +121,6 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if args.rules == "(embedded rules)": - logger.info("-" * 80) - logger.info(" Using default embedded rules.") - logger.info(" To provide your own rules, use the form `capa.exe -r ./path/to/rules/ /path/to/mal.exe`.") - logger.info(" You can see the current default rule set here:") - logger.info(" https://github.com/fireeye/capa-rules") - logger.info("-" * 80) - - logger.debug("detected running from source") - args.rules = os.path.join(os.path.dirname(__file__), "..", "rules") - logger.debug("default rule path (source method): %s", args.rules) - else: - logger.info("using rules path: %s", args.rules) - try: rules = capa.main.get_rules(args.rules) rules = capa.rules.RuleSet(rules) @@ -146,6 +132,12 @@ def main(argv=None): logger.error("%s", str(e)) return -1 + try: + sig_paths = capa.main.get_signatures(args.signatures) + except (IOError) as e: + logger.error("%s", str(e)) + return -1 + if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): format = "freeze" with open(args.sample, "rb") as f: @@ -156,7 +148,7 @@ def main(argv=None): try: extractor = capa.main.get_extractor( - args.sample, args.format, args.backend, args.signatures, should_save_workspace + args.sample, args.format, args.backend, sig_paths, should_save_workspace ) except capa.main.UnsupportedFormatError: logger.error("-" * 80) diff --git a/scripts/show-features.py b/scripts/show-features.py index 182ddb42..78d3b370 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -66,6 +66,7 @@ Example:: """ import sys import logging +import os.path import argparse import capa.main @@ -95,14 +96,18 @@ def main(argv=None): logger.error("%s", str(e)) return -1 + try: + sig_paths = capa.main.get_signatures(args.signatures) + except (IOError) as e: + logger.error("%s", str(e)) + return -1 + if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: try: - extractor = capa.main.get_extractor( - args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=args.signatures - ) + extractor = capa.main.get_extractor(args.sample, args.format, capa.main.BACKEND_VIV, sigpaths=sig_paths) except capa.main.UnsupportedFormatError: logger.error("-" * 80) logger.error(" Input file does not appear to be a PE file.")