Merge remote-tracking branch 'parentrepo/dynamic-feature-extraction' into find-dynamic-capabilities

This commit is contained in:
Yacine Elhamer
2023-07-20 20:02:49 +01:00
141 changed files with 1054 additions and 756 deletions

View File

@@ -1,4 +1,11 @@
#!/usr/bin/env python
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
bulk-process
@@ -47,7 +54,7 @@ usage:
parallelism factor
--no-mp disable subprocesses
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -59,10 +66,10 @@ import os
import sys
import json
import logging
import os.path
import argparse
import multiprocessing
import multiprocessing.pool
from pathlib import Path
import capa
import capa.main
@@ -167,9 +174,8 @@ def main(argv=None):
return -1
samples = []
for base, _, files in os.walk(args.input):
for file in files:
samples.append(os.path.join(base, file))
for file in Path(args.input).rglob("*"):
samples.append(file)
cpu_count = multiprocessing.cpu_count()
@@ -206,7 +212,7 @@ def main(argv=None):
if result["status"] == "error":
logger.warning(result["error"])
elif result["status"] == "ok":
results[result["path"]] = rd.ResultDocument.parse_obj(result["ok"]).json(exclude_none=True)
results[result["path"].as_posix()] = rd.ResultDocument.parse_obj(result["ok"]).json(exclude_none=True)
else:
raise ValueError(f"unexpected status: {result['status']}")

View File

@@ -15,10 +15,10 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import logging
import argparse
from pathlib import Path
import capa.main
import capa.rules
@@ -47,8 +47,9 @@ def main(argv=None):
logging.getLogger("capa").setLevel(logging.ERROR)
try:
os.makedirs(args.cache, exist_ok=True)
rules = capa.main.get_rules(args.rules, cache_dir=args.cache)
cache_dir = Path(args.cache)
cache_dir.mkdir(parents=True, exist_ok=True)
rules = capa.main.get_rules(args.rules, cache_dir)
logger.info("successfully loaded %s rules", len(rules))
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
logger.error("%s", str(e))
@@ -56,9 +57,9 @@ def main(argv=None):
content = capa.rules.cache.get_ruleset_content(rules)
id = capa.rules.cache.compute_cache_identifier(content)
path = capa.rules.cache.get_cache_path(args.cache, id)
path = capa.rules.cache.get_cache_path(cache_dir, id)
assert os.path.exists(path)
assert path.exists()
logger.info("cached to: %s", path)

View File

@@ -1,3 +1,10 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Convert capa rules to YARA rules (where this is possible)
@@ -37,6 +44,7 @@ import logging
import argparse
import datetime
import itertools
from pathlib import Path
import capa.main
import capa.rules
@@ -71,8 +79,8 @@ default_tags = "CAPA "
# minimum number of rounds needed to be able to convert rules which depend on referenced rules across several levels of depth
min_rounds = 5
unsupported_capa_rules = open("unsupported_capa_rules.yml", "wb")
unsupported_capa_rules_names = open("unsupported_capa_rules.txt", "wb")
unsupported_capa_rules = Path("unsupported_capa_rules.yml").open("wb")
unsupported_capa_rules_names = Path("unsupported_capa_rules.txt").open("wb")
unsupported_capa_rules_list = []
condition_header = """
@@ -396,7 +404,7 @@ def convert_rule(rule, rulename, cround, depth):
# this is "x or more". could be coded for strings TODO
return "BREAK", "Some aka x or more (TODO)", rule_comment, incomplete
if s_type == "And" or s_type == "Or" or s_type == "Not" and not kid.name == "Some":
if s_type == "And" or s_type == "Or" or s_type == "Not" and kid.name != "Some":
logger.info("doing bool with recursion: %r", kid)
logger.info("kid coming: %r", kid.name)
# logger.info("grandchildren: " + repr(kid.children))
@@ -714,7 +722,7 @@ def main(argv=None):
logging.getLogger("capa2yara").setLevel(level)
try:
rules = capa.main.get_rules([args.rules])
rules = capa.main.get_rules([Path(args.rules)])
namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules))
if args.tag:

View File

@@ -1,8 +1,16 @@
#!/usr/bin/env python3
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import collections
from typing import Any, Set, Dict
from pathlib import Path
import capa.main
import capa.rules
@@ -159,7 +167,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers
def capa_details(rules_path, file_path, output_format="dictionary"):
def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"):
# load rules from disk
rules = capa.main.get_rules([rules_path])
@@ -170,7 +178,7 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
# collect metadata (used only to make rendering more complete)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor, counts)
meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
capa_output: Any = False
@@ -192,18 +200,18 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
if __name__ == "__main__":
import sys
import os.path
import argparse
RULES_PATH = os.path.join(os.path.dirname(__file__), "..", "rules")
RULES_PATH = capa.main.get_default_root() / "rules"
parser = argparse.ArgumentParser(description="Extract capabilities from a file")
parser.add_argument("file", help="file to extract capabilities from")
parser.add_argument("--rules", help="path to rules directory", default=os.path.abspath(RULES_PATH))
parser.add_argument("--rules", help="path to rules directory", default=RULES_PATH)
parser.add_argument(
"--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
)
args = parser.parse_args()
print(capa_details(args.rules, args.file, args.output))
if args.rules != RULES_PATH:
args.rules = Path(args.rules)
print(capa_details(args.rules, Path(args.file), args.output))
sys.exit(0)

View File

@@ -6,7 +6,7 @@ Usage:
$ python capafmt.py -i foo.yml
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -17,6 +17,7 @@ See the License for the specific language governing permissions and limitations
import sys
import logging
import argparse
from pathlib import Path
import capa.rules
@@ -70,8 +71,7 @@ def main(argv=None):
return 1
if args.in_place:
with open(args.path, "wb") as f:
f.write(reformatted_rule.encode("utf-8"))
Path(args.path).write_bytes(reformatted_rule.encode("utf-8"))
else:
print(reformatted_rule)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python2
"""
Copyright (C) 2021 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -17,6 +17,7 @@ import logging
import argparse
import contextlib
from typing import BinaryIO
from pathlib import Path
import capa.helpers
import capa.features.extractors.elf
@@ -56,7 +57,7 @@ def main(argv=None):
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
f = open(args.sample, "rb")
f = Path(args.sample).open("rb")
with contextlib.closing(f):
try:

View File

@@ -1,6 +1,14 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import argparse
from pathlib import Path
import capa.main
import capa.rules
@@ -42,7 +50,7 @@ def get_features(rule_path: str) -> list:
list: A list of all feature statements contained within the rule file.
"""
feature_list = []
with open(rule_path, "r", encoding="utf-8") as f:
with Path(rule_path).open("r", encoding="utf-8") as f:
try:
new_rule = capa.rules.Rule.from_yaml(f.read())
feature_list = get_child_features(new_rule.statement)
@@ -89,7 +97,7 @@ def main():
args = parser.parse_args()
new_rule_path = args.new_rule
rules_path = args.rules
rules_path = [Path(rule) for rule in args.rules]
result = find_overlapping_rules(new_rule_path, rules_path)

View File

@@ -1,3 +1,10 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Binary Ninja plugin that imports a capa report,
produced via `capa --json /path/to/sample`,
@@ -24,6 +31,7 @@ Derived from: https://github.com/mandiant/capa/blob/master/scripts/import-to-ida
"""
import os
import json
from pathlib import Path
import binaryninja
import binaryninja.interaction
@@ -45,22 +53,23 @@ def append_func_cmt(bv, va, cmt):
def load_analysis(bv):
shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
dirname = os.path.dirname(bv.file.filename)
shortname = Path(bv.file.filename).resolve().stem
dirname = Path(bv.file.filename).resolve().parent
binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
path = os.path.join(dirname, shortname + ".js")
elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
path = os.path.join(dirname, shortname + ".json")
js_path = path = dirname / (shortname + ".js")
json_path = dirname / (shortname + ".json")
if os.access(str(js_path), os.R_OK):
path = js_path
elif os.access(str(json_path), os.R_OK):
path = json_path
else:
path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
if not path or not os.access(path, os.R_OK):
if not path or not os.access(str(path), os.R_OK):
binaryninja.log_error("Invalid filename.")
return 0
binaryninja.log_info(f"Using capa file {path}")
with open(path, "rb") as f:
doc = json.loads(f.read().decode("utf-8"))
doc = json.loads(path.read_bytes().decode("utf-8"))
if "meta" not in doc or "rules" not in doc:
binaryninja.log_error("doesn't appear to be a capa report")
@@ -70,7 +79,7 @@ def load_analysis(bv):
md5 = binaryninja.Transform["MD5"]
rawhex = binaryninja.Transform["RawHex"]
b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8")
if not a == b:
if a != b:
binaryninja.log_error("sample mismatch")
return -2

View File

@@ -20,7 +20,7 @@ and then select the existing capa report from the file system.
This script will verify that the report matches the workspace.
Check the output window for any errors, and/or the summary of changes.
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt

View File

@@ -5,7 +5,7 @@ Usage:
$ python scripts/lint.py rules/
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -23,7 +23,6 @@ import string
import difflib
import hashlib
import logging
import pathlib
import argparse
import itertools
import posixpath
@@ -114,7 +113,7 @@ class FilenameDoesntMatchRuleName(Lint):
expected = expected.replace(".", "")
expected = expected + ".yml"
found = os.path.basename(rule.meta["capa/path"])
found = Path(rule.meta["capa/path"]).name
self.recommendation = self.recommendation_template.format(expected, found)
@@ -249,7 +248,8 @@ class InvalidAttckOrMbcTechnique(Lint):
super().__init__()
try:
with open(f"{os.path.dirname(__file__)}/linter-data.json", "rb") as fd:
data_path = Path(__file__).resolve().parent / "linter-data.json"
with data_path.open("rb") as fd:
self.data = json.load(fd)
self.enabled_frameworks = self.data.keys()
except BaseException:
@@ -279,7 +279,7 @@ class InvalidAttckOrMbcTechnique(Lint):
def check_rule(self, ctx: Context, rule: Rule):
for framework in self.enabled_frameworks:
if framework in rule.meta.keys():
if framework in rule.meta:
for r in rule.meta[framework]:
m = self.reg.match(r)
if m is None:
@@ -295,14 +295,14 @@ DEFAULT_SIGNATURES = capa.main.get_default_signatures()
def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
nice_path = os.path.abspath(str(path))
nice_path = path.resolve().absolute()
if path in ctx.capabilities_by_sample:
logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path]))
return ctx.capabilities_by_sample[path]
if nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32):
if nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32):
format_ = "sc32"
elif nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64):
elif nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64):
format_ = "sc64"
else:
format_ = capa.main.get_auto_format(nice_path)
@@ -355,7 +355,7 @@ class DoesntMatchExample(Lint):
try:
capabilities = get_sample_capabilities(ctx, path)
except Exception as e:
logger.exception("failed to extract capabilities: %s %s %s", rule.name, str(path), e)
logger.exception("failed to extract capabilities: %s %s %s", rule.name, path, e)
return True
if rule.name not in capabilities:
@@ -543,47 +543,45 @@ class FeatureNtdllNtoskrnlApi(Lint):
assert isinstance(feature.value, str)
modname, _, impname = feature.value.rpartition(".")
if modname == "ntdll":
if impname in (
"LdrGetProcedureAddress",
"LdrLoadDll",
"NtCreateThread",
"NtCreatUserProcess",
"NtLoadDriver",
"NtQueryDirectoryObject",
"NtResumeThread",
"NtSuspendThread",
"NtTerminateProcess",
"NtWriteVirtualMemory",
"RtlGetNativeSystemInformation",
"NtCreateThreadEx",
"NtCreateUserProcess",
"NtOpenDirectoryObject",
"NtQueueApcThread",
"ZwResumeThread",
"ZwSuspendThread",
"ZwWriteVirtualMemory",
"NtCreateProcess",
"ZwCreateThread",
"NtCreateProcessEx",
"ZwCreateThreadEx",
"ZwCreateProcess",
"ZwCreateUserProcess",
"RtlCreateUserProcess",
):
# ntoskrnl.exe does not export these routines
continue
if modname == "ntdll" and impname in (
"LdrGetProcedureAddress",
"LdrLoadDll",
"NtCreateThread",
"NtCreatUserProcess",
"NtLoadDriver",
"NtQueryDirectoryObject",
"NtResumeThread",
"NtSuspendThread",
"NtTerminateProcess",
"NtWriteVirtualMemory",
"RtlGetNativeSystemInformation",
"NtCreateThreadEx",
"NtCreateUserProcess",
"NtOpenDirectoryObject",
"NtQueueApcThread",
"ZwResumeThread",
"ZwSuspendThread",
"ZwWriteVirtualMemory",
"NtCreateProcess",
"ZwCreateThread",
"NtCreateProcessEx",
"ZwCreateThreadEx",
"ZwCreateProcess",
"ZwCreateUserProcess",
"RtlCreateUserProcess",
):
# ntoskrnl.exe does not export these routines
continue
if modname == "ntoskrnl":
if impname in (
"PsGetVersion",
"PsLookupProcessByProcessId",
"KeStackAttachProcess",
"ObfDereferenceObject",
"KeUnstackDetachProcess",
):
# ntdll.dll does not export these routines
continue
if modname == "ntoskrnl" and impname in (
"PsGetVersion",
"PsLookupProcessByProcessId",
"KeStackAttachProcess",
"ObfDereferenceObject",
"KeUnstackDetachProcess",
):
# ntdll.dll does not export these routines
continue
if modname in ("ntdll", "ntoskrnl"):
self.recommendation = self.recommendation_template.format(impname, modname)
@@ -883,43 +881,31 @@ def lint(ctx: Context):
return ret
def collect_samples(path) -> Dict[str, Path]:
def collect_samples(samples_path: Path) -> Dict[str, Path]:
"""
recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename.
"""
samples = {}
for root, _, files in os.walk(path):
for name in files:
if name.endswith(".viv"):
continue
if name.endswith(".idb"):
continue
if name.endswith(".i64"):
continue
if name.endswith(".frz"):
continue
if name.endswith(".fnames"):
continue
for path in samples_path.rglob("*"):
if path.suffix in [".viv", ".idb", ".i64", ".frz", ".fnames"]:
continue
path = pathlib.Path(os.path.join(root, name))
try:
buf = path.read_bytes()
except IOError:
continue
try:
with path.open("rb") as f:
buf = f.read()
except IOError:
continue
sha256 = hashlib.sha256()
sha256.update(buf)
sha256 = hashlib.sha256()
sha256.update(buf)
md5 = hashlib.md5()
md5.update(buf)
md5 = hashlib.md5()
md5.update(buf)
samples[sha256.hexdigest().lower()] = path
samples[sha256.hexdigest().upper()] = path
samples[md5.hexdigest().lower()] = path
samples[md5.hexdigest().upper()] = path
samples[name] = path
samples[sha256.hexdigest().lower()] = path
samples[sha256.hexdigest().upper()] = path
samples[md5.hexdigest().lower()] = path
samples[md5.hexdigest().upper()] = path
samples[path.name] = path
return samples
@@ -928,16 +914,16 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]
default_samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data")
# TODO(yelhamer): remove once support for the legacy scope field has been added
# https://github.com/mandiant/capa/pull/1580
return 0
samples_path = os.path.join(os.path.dirname(__file__), "..", "tests", "data")
parser = argparse.ArgumentParser(description="Lint capa rules.")
capa.main.install_common_args(parser, wanted={"tag"})
parser.add_argument("rules", type=str, action="append", help="Path to rules")
parser.add_argument("--samples", type=str, default=samples_path, help="Path to samples")
parser.add_argument("--samples", type=str, default=default_samples_path, help="Path to samples")
parser.add_argument(
"--thorough",
action="store_true",
@@ -968,11 +954,12 @@ def main(argv=None):
return -1
logger.info("collecting potentially referenced samples")
if not os.path.exists(args.samples):
logger.error("samples path %s does not exist", args.samples)
samples_path = Path(args.samples)
if not samples_path.exists():
logger.error("samples path %s does not exist", Path(samples_path))
return -1
samples = collect_samples(args.samples)
samples = collect_samples(Path(samples_path))
ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
Copyright (C) 2021 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt

View File

@@ -1,3 +1,10 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import gc
import linecache
import tracemalloc
@@ -50,13 +57,12 @@ def main():
for i in range(count):
print(f"iteration {i+1}/{count}...")
with contextlib.redirect_stdout(io.StringIO()):
with contextlib.redirect_stderr(io.StringIO()):
t0 = time.time()
capa.main.main()
t1 = time.time()
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
t0 = time.time()
capa.main.main()
t1 = time.time()
gc.collect()
gc.collect()
process = psutil.Process(os.getpid())
print(f" duration: {(t1-t0):.2f}")

View File

@@ -1,3 +1,10 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Invoke capa multiple times and record profiling information.
Use the --number and --repeat options to change the number of iterations.
@@ -34,6 +41,7 @@ import timeit
import logging
import argparse
import subprocess
from pathlib import Path
import tqdm
import tabulate
@@ -51,22 +59,16 @@ from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeat
logger = logging.getLogger("capa.profile")
def subshell(cmd):
    """
    run `cmd` through the system shell and return whatever it wrote to stdout,
    decoded as text, with leading and trailing whitespace removed.

    stderr is captured as well but discarded, and the exit status is not inspected.
    """
    completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    captured = completed.stdout
    return captured.strip()
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
label = subprocess.run(
"git show --pretty=oneline --abbrev-commit | head -n 1", shell=True, capture_output=True, text=True
).stdout.strip()
is_dirty = (
subprocess.run(
"git status | grep 'modified: ' | grep -v 'rules' | grep -v 'tests/data'",
shell=True,
capture_output=True,
text=True,
).stdout
!= ""
)
label = subshell("git show --pretty=oneline --abbrev-commit | head -n 1").strip()
is_dirty = subshell("git status | grep 'modified: ' | grep -v 'rules' | grep -v 'tests/data'") != ""
if is_dirty:
label += " (dirty)"
@@ -82,7 +84,7 @@ def main(argv=None):
capa.main.handle_common_args(args)
try:
taste = capa.helpers.get_file_taste(args.sample)
taste = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
@@ -103,9 +105,8 @@ def main(argv=None):
if (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
with open(args.sample, "rb") as f:
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
assert isinstance(extractor, StaticFeatureExtractor)
extractor: FeatureExtractor = capa.features.freeze.load(Path(args.sample).read_bytes())
assert isinstance(extractor, StaticFeatureExtractor)
else:
extractor = capa.main.get_extractor(
args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False

View File

@@ -34,6 +34,7 @@ Example:
import sys
import logging
import argparse
from pathlib import Path
import capa.render.json
import capa.render.proto
@@ -71,8 +72,7 @@ def main(argv=None):
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
with open(args.pb, "rb") as f:
pb = f.read()
pb = Path(args.pb).read_bytes()
rdpb = capa.render.proto.capa_pb2.ResultDocument()
rdpb.ParseFromString(pb)

View File

@@ -1,3 +1,10 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
Generate capa linter-data.json, used to validate Att&ck/MBC IDs and names.
@@ -37,7 +44,7 @@ import logging
import argparse
from sys import argv
from typing import Dict, List
from os.path import dirname
from pathlib import Path
import requests
from stix2 import Filter, MemoryStore, AttackPattern # type: ignore
@@ -172,7 +179,7 @@ def main(args: argparse.Namespace) -> None:
logging.info("Writing results to %s", args.output)
try:
with open(args.output, "w", encoding="utf-8") as jf:
with Path(args.output).open("w", encoding="utf-8") as jf:
json.dump(data, jf, indent=2)
except BaseException as e:
logging.error("Exception encountered when writing results: %s", e)
@@ -187,7 +194,7 @@ if __name__ == "__main__":
"--output",
"-o",
type=str,
default=f"{dirname(__file__)}/linter-data.json",
default=str(Path(__file__).resolve().parent / "linter-data.json"),
help="Path to output file (lint.py will be looking for linter-data.json)",
)
main(parser.parse_args(args=argv[1:]))

View File

@@ -1,4 +1,11 @@
#!/usr/bin/env python2
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
show-capabilities-by-function
@@ -40,7 +47,7 @@ Example::
- connect TCP socket
...
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -54,6 +61,7 @@ import logging
import argparse
import collections
from typing import Dict
from pathlib import Path
import colorama
@@ -138,7 +146,7 @@ def main(argv=None):
capa.main.handle_common_args(args)
try:
taste = get_file_taste(args.sample)
taste = get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
@@ -161,8 +169,7 @@ def main(argv=None):
if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)):
format_ = "freeze"
with open(args.sample, "rb") as f:
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
extractor: FeatureExtractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
format_ = args.format
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python2
"""
Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
@@ -67,8 +67,9 @@ Example::
import os
import sys
import logging
import os.path
import argparse
from typing import Tuple
from pathlib import Path
import capa.main
import capa.rules
@@ -79,9 +80,10 @@ import capa.exceptions
import capa.render.verbose as v
import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
from capa.helpers import get_auto_format, log_unsupported_runtime_error
from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, is_global_feature
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor
logger = logging.getLogger("capa.show-features")
@@ -102,8 +104,12 @@ def main(argv=None):
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
if args.function and args.backend == "pefile":
print("pefile backend does not support extracting function features")
return -1
try:
_ = capa.helpers.get_file_taste(args.sample)
_ = capa.helpers.get_file_taste(Path(args.sample))
except IOError as e:
logger.error("%s", str(e))
return -1
@@ -118,8 +124,7 @@ def main(argv=None):
if format_ == FORMAT_FREEZE:
# this should be moved above the previous if clause after implementing
# feature freeze for the dynamic analysis flavor
with open(args.sample, "rb") as f:
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
else:
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try:
@@ -151,7 +156,12 @@ def print_static_analysis(extractor: StaticFeatureExtractor, args):
for feature, addr in extractor.extract_file_features():
print(f"file: {format_address(addr)}: {feature}")
function_handles = tuple(extractor.get_functions())
function_handles: Tuple[FunctionHandle, ...]
if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
# pefile extractor doesn't extract function features
function_handles = ()
else:
function_handles = tuple(extractor.get_functions())
if args.function:
if args.format == "freeze":