Update args.sample type to Path and reconcile str() vs. as_posix() path conversions

This commit is contained in:
Aayush Goel
2023-07-09 16:02:28 +05:30
parent e0ed8c6e04
commit 673af45c55
13 changed files with 124 additions and 131 deletions

View File

@@ -52,15 +52,15 @@ def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})" assert False, f"Unhandled value: {value} ({type(value).__name__})"
def get_format_from_extension(sample: str) -> str: def get_format_from_extension(sample: Path) -> str:
if sample.endswith(EXTENSIONS_SHELLCODE_32): if sample.name.endswith(EXTENSIONS_SHELLCODE_32):
return FORMAT_SC32 return FORMAT_SC32
elif sample.endswith(EXTENSIONS_SHELLCODE_64): elif sample.name.endswith(EXTENSIONS_SHELLCODE_64):
return FORMAT_SC64 return FORMAT_SC64
return FORMAT_UNKNOWN return FORMAT_UNKNOWN
def get_auto_format(path: str) -> str: def get_auto_format(path: Path) -> str:
format_ = get_format(path) format_ = get_format(path)
if format_ == FORMAT_UNKNOWN: if format_ == FORMAT_UNKNOWN:
format_ = get_format_from_extension(path) format_ = get_format_from_extension(path)
@@ -69,17 +69,16 @@ def get_auto_format(path: str) -> str:
return format_ return format_
def get_format(sample: str) -> str: def get_format(sample: Path) -> str:
# imported locally to avoid import cycle # imported locally to avoid import cycle
from capa.features.extractors.common import extract_format from capa.features.extractors.common import extract_format
from capa.features.extractors.dnfile_ import DnfileFeatureExtractor from capa.features.extractors.dnfile_ import DnfileFeatureExtractor
with open(sample, "rb") as f: buf = sample.read_bytes()
buf = f.read()
for feature, _ in extract_format(buf): for feature, _ in extract_format(buf):
if feature == Format(FORMAT_PE): if feature == Format(FORMAT_PE):
dnfile_extractor = DnfileFeatureExtractor(sample) dnfile_extractor = DnfileFeatureExtractor(str(sample))
if dnfile_extractor.is_dotnet_file(): if dnfile_extractor.is_dotnet_file():
feature = Format(FORMAT_DOTNET) feature = Format(FORMAT_DOTNET)

View File

@@ -157,7 +157,7 @@ def collect_metadata(rules: List[Path]):
arch=arch, arch=arch,
os=os, os=os,
extractor="ida", extractor="ida",
rules=tuple(str(r.resolve().absolute()) for r in rules), rules=tuple(r.resolve().absolute().as_posix() for r in rules),
base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()),
layout=rdoc.Layout( layout=rdoc.Layout(
functions=tuple() functions=tuple()

View File

@@ -364,26 +364,23 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon
return False return False
def is_supported_format(sample: str) -> bool: def is_supported_format(sample: Path) -> bool:
""" """
Return if this is a supported file based on magic header values Return if this is a supported file based on magic header values
""" """
with open(sample, "rb") as f: taste = sample.open("rb").read(0x100)
taste = f.read(0x100)
return len(list(capa.features.extractors.common.extract_format(taste))) == 1 return len(list(capa.features.extractors.common.extract_format(taste))) == 1
def is_supported_arch(sample: str) -> bool: def is_supported_arch(sample: Path) -> bool:
with open(sample, "rb") as f: buf = sample.read_bytes()
buf = f.read()
return len(list(capa.features.extractors.common.extract_arch(buf))) == 1 return len(list(capa.features.extractors.common.extract_arch(buf))) == 1
def get_arch(sample: str) -> str: def get_arch(sample: Path) -> str:
with open(sample, "rb") as f: buf = sample.read_bytes()
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_arch(buf): for feature, _ in capa.features.extractors.common.extract_arch(buf):
assert isinstance(feature.value, str) assert isinstance(feature.value, str)
@@ -392,16 +389,14 @@ def get_arch(sample: str) -> str:
return "unknown" return "unknown"
def is_supported_os(sample: str) -> bool: def is_supported_os(sample: Path) -> bool:
with open(sample, "rb") as f: buf = sample.read_bytes()
buf = f.read()
return len(list(capa.features.extractors.common.extract_os(buf))) == 1 return len(list(capa.features.extractors.common.extract_os(buf))) == 1
def get_os(sample: str) -> str: def get_os(sample: Path) -> str:
with open(sample, "rb") as f: buf = sample.read_bytes()
buf = f.read()
for feature, _ in capa.features.extractors.common.extract_os(buf): for feature, _ in capa.features.extractors.common.extract_os(buf):
assert isinstance(feature.value, str) assert isinstance(feature.value, str)
@@ -463,7 +458,7 @@ def get_default_signatures() -> List[Path]:
return ret return ret
def get_workspace(path, format_, sigpaths: List[Path]): def get_workspace(path: Path, format_: str, sigpaths: List[Path]):
""" """
load the program at the given path into a vivisect workspace using the given format. load the program at the given path into a vivisect workspace using the given format.
also apply the given FLIRT signatures. also apply the given FLIRT signatures.
@@ -490,18 +485,18 @@ def get_workspace(path, format_, sigpaths: List[Path]):
raise UnsupportedFormatError() raise UnsupportedFormatError()
# don't analyze, so that we can add our Flirt function analyzer first. # don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False) vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif format_ in {FORMAT_PE, FORMAT_ELF}: elif format_ in {FORMAT_PE, FORMAT_ELF}:
vw = viv_utils.getWorkspace(path, analyze=False, should_save=False) vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif format_ == FORMAT_SC32: elif format_ == FORMAT_SC32:
# these are not analyzed nor saved. # these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="i386", analyze=False) vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False)
elif format_ == FORMAT_SC64: elif format_ == FORMAT_SC64:
vw = viv_utils.getShellcodeWorkspaceFromFile(path, arch="amd64", analyze=False) vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False)
else: else:
raise ValueError("unexpected format: " + format_) raise ValueError("unexpected format: " + format_)
viv_utils.flirt.register_flirt_signature_analyzers(vw, [s.as_posix() for s in sigpaths]) viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])
vw.analyze() vw.analyze()
@@ -511,7 +506,7 @@ def get_workspace(path, format_, sigpaths: List[Path]):
# TODO get_extractors -> List[FeatureExtractor]? # TODO get_extractors -> List[FeatureExtractor]?
def get_extractor( def get_extractor(
path: str, path: Path,
format_: str, format_: str,
os_: str, os_: str,
backend: str, backend: str,
@@ -538,7 +533,7 @@ def get_extractor(
if format_ == FORMAT_DOTNET: if format_ == FORMAT_DOTNET:
import capa.features.extractors.dnfile.extractor import capa.features.extractors.dnfile.extractor
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(str(path))
elif backend == BACKEND_BINJA: elif backend == BACKEND_BINJA:
from capa.features.extractors.binja.find_binja_api import find_binja_path from capa.features.extractors.binja.find_binja_api import find_binja_path
@@ -561,7 +556,7 @@ def get_extractor(
import capa.features.extractors.binja.extractor import capa.features.extractors.binja.extractor
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = BinaryViewType.get_view_of_file(path) bv: BinaryView = BinaryViewType.get_view_of_file(str(path))
if bv is None: if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}") raise RuntimeError(f"Binary Ninja cannot open file {path}")
@@ -587,18 +582,18 @@ def get_extractor(
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)
def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]:
file_extractors: List[FeatureExtractor] = list() file_extractors: List[FeatureExtractor] = list()
if format_ == FORMAT_PE: if format_ == FORMAT_PE:
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample)) file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(str(sample)))
elif format_ == FORMAT_DOTNET: elif format_ == FORMAT_DOTNET:
file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(sample)) file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(str(sample)))
file_extractors.append(capa.features.extractors.dnfile_.DnfileFeatureExtractor(sample)) file_extractors.append(capa.features.extractors.dnfile_.DnfileFeatureExtractor(str(sample)))
elif format_ == capa.features.extractors.common.FORMAT_ELF: elif format_ == capa.features.extractors.common.FORMAT_ELF:
file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(str(sample)))
return file_extractors return file_extractors
@@ -733,7 +728,7 @@ def get_signatures(sigs_path: Path) -> List[Path]:
def collect_metadata( def collect_metadata(
argv: List[str], argv: List[str],
sample_path: str, sample_path: Path,
format_: str, format_: str,
os_: str, os_: str,
rules_path: List[Path], rules_path: List[Path],
@@ -743,7 +738,7 @@ def collect_metadata(
sha1 = hashlib.sha1() sha1 = hashlib.sha1()
sha256 = hashlib.sha256() sha256 = hashlib.sha256()
buf = Path(sample_path).read_bytes() buf = sample_path.read_bytes()
md5.update(buf) md5.update(buf)
sha1.update(buf) sha1.update(buf)
@@ -762,7 +757,7 @@ def collect_metadata(
md5=md5.hexdigest(), md5=md5.hexdigest(),
sha1=sha1.hexdigest(), sha1=sha1.hexdigest(),
sha256=sha256.hexdigest(), sha256=sha256.hexdigest(),
path=Path(sample_path).resolve().as_posix(), path=sample_path.resolve().absolute().as_posix(),
), ),
analysis=rdoc.Analysis( analysis=rdoc.Analysis(
format=format_, format=format_,
@@ -1008,6 +1003,9 @@ def handle_common_args(args):
else: else:
raise RuntimeError("unexpected --color value: " + args.color) raise RuntimeError("unexpected --color value: " + args.color)
if hasattr(args, "sample"):
args.sample = Path(args.sample)
if hasattr(args, "rules"): if hasattr(args, "rules"):
rules_paths: List[Path] = [] rules_paths: List[Path] = []
@@ -1116,7 +1114,7 @@ def main(argv=None):
return ret return ret
try: try:
_ = get_file_taste(Path(args.sample)) _ = get_file_taste(args.sample)
except IOError as e: except IOError as e:
# per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we # per our research there's not a programmatic way to render the IOError with non-ASCII filename unless we
# handle the IOError separately and reach into the args # handle the IOError separately and reach into the args

View File

@@ -24,15 +24,10 @@ $ protoc.exe --python_out=. --mypy_out=. <path_to_proto> (e.g. capa/render/proto
Alternatively, --pyi_out=. can be used to generate a Python Interface file that supports development Alternatively, --pyi_out=. can be used to generate a Python Interface file that supports development
""" """
import sys
import json
import argparse
import datetime import datetime
from typing import Any, Dict, Union from typing import Any, Dict, Union
from pathlib import Path
import google.protobuf.json_format import google.protobuf.json_format
from google.protobuf.json_format import MessageToJson
import capa.rules import capa.rules
import capa.features.freeze as frz import capa.features.freeze as frz

View File

@@ -170,7 +170,7 @@ def main(argv=None):
samples = [] samples = []
for file in Path(args.input).rglob("*"): for file in Path(args.input).rglob("*"):
samples.append(file.as_posix()) samples.append(file)
def pmap(f, args, parallelism=multiprocessing.cpu_count()): def pmap(f, args, parallelism=multiprocessing.cpu_count()):
"""apply the given function f to the given args using subprocesses""" """apply the given function f to the given args using subprocesses"""
@@ -205,7 +205,7 @@ def main(argv=None):
if result["status"] == "error": if result["status"] == "error":
logger.warning(result["error"]) logger.warning(result["error"])
elif result["status"] == "ok": elif result["status"] == "ok":
results[result["path"]] = rd.ResultDocument.parse_obj(result["ok"]).json(exclude_none=True) results[result["path"].as_posix()] = rd.ResultDocument.parse_obj(result["ok"]).json(exclude_none=True)
else: else:
raise ValueError(f"unexpected status: {result['status']}") raise ValueError(f"unexpected status: {result['status']}")

View File

@@ -161,7 +161,7 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:
# ==== render dictionary helpers # ==== render dictionary helpers
def capa_details(rules_path, file_path, output_format="dictionary"): def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"):
# load rules from disk # load rules from disk
rules = capa.main.get_rules([rules_path]) rules = capa.main.get_rules([rules_path])
@@ -210,5 +210,5 @@ if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
if args.rules != RULES_PATH: if args.rules != RULES_PATH:
args.rules = Path(args.rules) args.rules = Path(args.rules)
print(capa_details(args.rules, args.file, args.output)) print(capa_details(args.rules, Path(args.file), args.output))
sys.exit(0) sys.exit(0)

View File

@@ -51,13 +51,13 @@ def load_analysis(bv):
binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n") binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
js_path = path = dirname / (shortname + ".js") js_path = path = dirname / (shortname + ".js")
json_path = dirname / (shortname + ".json") json_path = dirname / (shortname + ".json")
if os.access(js_path.as_posix(), os.R_OK): if os.access(str(js_path), os.R_OK):
path = js_path path = js_path
elif os.access(json_path.as_posix(), os.R_OK): elif os.access(str(json_path), os.R_OK):
path = json_path path = json_path
else: else:
path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)") path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
if not path or not os.access(path.as_posix(), os.R_OK): if not path or not os.access(str(path), os.R_OK):
binaryninja.log_error("Invalid filename.") binaryninja.log_error("Invalid filename.")
return 0 return 0
binaryninja.log_info(f"Using capa file {path}") binaryninja.log_info(f"Using capa file {path}")

View File

@@ -296,14 +296,14 @@ DEFAULT_SIGNATURES = capa.main.get_default_signatures()
def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
nice_path = path.resolve().absolute().as_posix() nice_path = path.resolve().absolute()
if path in ctx.capabilities_by_sample: if path in ctx.capabilities_by_sample:
logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path])) logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path]))
return ctx.capabilities_by_sample[path] return ctx.capabilities_by_sample[path]
if nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32): if nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32):
format_ = "sc32" format_ = "sc32"
elif nice_path.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64): elif nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64):
format_ = "sc64" format_ = "sc64"
else: else:
format_ = capa.main.get_auto_format(nice_path) format_ = capa.main.get_auto_format(nice_path)
@@ -356,7 +356,7 @@ class DoesntMatchExample(Lint):
try: try:
capabilities = get_sample_capabilities(ctx, path) capabilities = get_sample_capabilities(ctx, path)
except Exception as e: except Exception as e:
logger.error("failed to extract capabilities: %s %s %s", rule.name, str(path), e, exc_info=True) logger.error("failed to extract capabilities: %s %s %s", rule.name, path, e, exc_info=True)
return True return True
if rule.name not in capabilities: if rule.name not in capabilities:
@@ -917,12 +917,12 @@ def main(argv=None):
if argv is None: if argv is None:
argv = sys.argv[1:] argv = sys.argv[1:]
samples_path = (Path(__file__).resolve().parent.parent / "tests" / "data").as_posix() default_samples_path = str(Path(__file__).resolve().parent.parent / "tests" / "data")
parser = argparse.ArgumentParser(description="Lint capa rules.") parser = argparse.ArgumentParser(description="Lint capa rules.")
capa.main.install_common_args(parser, wanted={"tag"}) capa.main.install_common_args(parser, wanted={"tag"})
parser.add_argument("rules", type=str, action="append", help="Path to rules") parser.add_argument("rules", type=str, action="append", help="Path to rules")
parser.add_argument("--samples", type=str, default=samples_path, help="Path to samples") parser.add_argument("--samples", type=str, default=default_samples_path, help="Path to samples")
parser.add_argument( parser.add_argument(
"--thorough", "--thorough",
action="store_true", action="store_true",
@@ -953,12 +953,12 @@ def main(argv=None):
return -1 return -1
logger.info("collecting potentially referenced samples") logger.info("collecting potentially referenced samples")
samplePath = Path(args.samples) samples_path = Path(args.samples)
if not samplePath.exists(): if not samples_path.exists():
logger.error("samples path %s does not exist", samplePath) logger.error("samples path %s does not exist", Path(samples_path))
return -1 return -1
samples = collect_samples(samplePath) samples = collect_samples(Path(samples_path))
ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough) ctx = Context(samples=samples, rules=rules, is_thorough=args.thorough)

View File

@@ -187,7 +187,7 @@ if __name__ == "__main__":
"--output", "--output",
"-o", "-o",
type=str, type=str,
default=(Path(__file__).resolve().parent / "linter-data.json").as_posix(), default=str(Path(__file__).resolve().parent / "linter-data.json"),
help="Path to output file (lint.py will be looking for linter-data.json)", help="Path to output file (lint.py will be looking for linter-data.json)",
) )
main(parser.parse_args(args=argv[1:])) main(parser.parse_args(args=argv[1:]))

View File

@@ -89,7 +89,7 @@ def xfail(condition, reason=None):
# need to limit cache size so GitHub Actions doesn't run out of memory, see #545 # need to limit cache size so GitHub Actions doesn't run out of memory, see #545
@lru_cache(maxsize=1) @lru_cache(maxsize=1)
def get_viv_extractor(path): def get_viv_extractor(path: Path):
import capa.main import capa.main
import capa.features.extractors.viv.extractor import capa.features.extractors.viv.extractor
@@ -101,9 +101,9 @@ def get_viv_extractor(path):
CD.parent / "sigs" / "3_flare_common_libs.sig", CD.parent / "sigs" / "3_flare_common_libs.sig",
] ]
if "raw32" in path: if "raw32" in path.name:
vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths) vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths)
elif "raw64" in path: elif "raw64" in path.name:
vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths) vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths)
else: else:
vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
@@ -113,14 +113,14 @@ def get_viv_extractor(path):
return extractor return extractor
def fixup_viv(path, extractor): def fixup_viv(path: Path, extractor):
""" """
vivisect fixups to overcome differences between backends vivisect fixups to overcome differences between backends
""" """
if "3b13b" in path: if "3b13b" in path.name:
# vivisect only recognizes calling thunk function at 0x10001573 # vivisect only recognizes calling thunk function at 0x10001573
extractor.vw.makeFunction(0x10006860) extractor.vw.makeFunction(0x10006860)
if "294b8d" in path: if "294b8d" in path.name:
# see vivisect/#561 # see vivisect/#561
extractor.vw.makeFunction(0x404970) extractor.vw.makeFunction(0x404970)
@@ -129,10 +129,10 @@ def fixup_viv(path, extractor):
def get_pefile_extractor(path): def get_pefile_extractor(path):
import capa.features.extractors.pefile import capa.features.extractors.pefile
extractor = capa.features.extractors.pefile.PefileFeatureExtractor(path) extractor = capa.features.extractors.pefile.PefileFeatureExtractor(str(path))
# overload the extractor so that the fixture exposes `extractor.path` # overload the extractor so that the fixture exposes `extractor.path`
setattr(extractor, "path", path) setattr(extractor, "path", str(path))
return extractor return extractor
@@ -140,10 +140,10 @@ def get_pefile_extractor(path):
def get_dotnetfile_extractor(path): def get_dotnetfile_extractor(path):
import capa.features.extractors.dotnetfile import capa.features.extractors.dotnetfile
extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(path) extractor = capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(str(path))
# overload the extractor so that the fixture exposes `extractor.path` # overload the extractor so that the fixture exposes `extractor.path`
setattr(extractor, "path", path) setattr(extractor, "path", str(path))
return extractor return extractor
@@ -152,10 +152,10 @@ def get_dotnetfile_extractor(path):
def get_dnfile_extractor(path): def get_dnfile_extractor(path):
import capa.features.extractors.dnfile.extractor import capa.features.extractors.dnfile.extractor
extractor = capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) extractor = capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(str(path))
# overload the extractor so that the fixture exposes `extractor.path` # overload the extractor so that the fixture exposes `extractor.path`
setattr(extractor, "path", path) setattr(extractor, "path", str(path))
return extractor return extractor
@@ -234,81 +234,81 @@ def extract_instruction_features(extractor, fh, bbh, ih) -> Dict[Feature, Set[Ad
# note: to reduce the testing time it's recommended to reuse already existing test samples, if possible # note: to reduce the testing time it's recommended to reuse already existing test samples, if possible
def get_data_path_by_name(name): def get_data_path_by_name(name):
if name == "mimikatz": if name == "mimikatz":
return (CD / "data" / "mimikatz.exe_").as_posix() return CD / "data" / "mimikatz.exe_"
elif name == "kernel32": elif name == "kernel32":
return (CD / "data" / "kernel32.dll_").as_posix() return CD / "data" / "kernel32.dll_"
elif name == "kernel32-64": elif name == "kernel32-64":
return (CD / "data" / "kernel32-64.dll_").as_posix() return CD / "data" / "kernel32-64.dll_"
elif name == "pma01-01": elif name == "pma01-01":
return (CD / "data" / "Practical Malware Analysis Lab 01-01.dll_").as_posix() return CD / "data" / "Practical Malware Analysis Lab 01-01.dll_"
elif name == "pma01-01-rd": elif name == "pma01-01-rd":
return (CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json").as_posix() return CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json"
elif name == "pma12-04": elif name == "pma12-04":
return (CD / "data" / "Practical Malware Analysis Lab 12-04.exe_").as_posix() return CD / "data" / "Practical Malware Analysis Lab 12-04.exe_"
elif name == "pma16-01": elif name == "pma16-01":
return (CD / "data" / "Practical Malware Analysis Lab 16-01.exe_").as_posix() return CD / "data" / "Practical Malware Analysis Lab 16-01.exe_"
elif name == "pma21-01": elif name == "pma21-01":
return (CD / "data" / "Practical Malware Analysis Lab 21-01.exe_").as_posix() return CD / "data" / "Practical Malware Analysis Lab 21-01.exe_"
elif name == "al-khaser x86": elif name == "al-khaser x86":
return (CD / "data" / "al-khaser_x86.exe_").as_posix() return CD / "data" / "al-khaser_x86.exe_"
elif name == "al-khaser x64": elif name == "al-khaser x64":
return (CD / "data" / "al-khaser_x64.exe_").as_posix() return CD / "data" / "al-khaser_x64.exe_"
elif name.startswith("39c05"): elif name.startswith("39c05"):
return (CD / "data" / "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_").as_posix() return CD / "data" / "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_"
elif name.startswith("499c2"): elif name.startswith("499c2"):
return (CD / "data" / "499c2a85f6e8142c3f48d4251c9c7cd6.raw32").as_posix() return CD / "data" / "499c2a85f6e8142c3f48d4251c9c7cd6.raw32"
elif name.startswith("9324d"): elif name.startswith("9324d"):
return (CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_").as_posix() return CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_"
elif name.startswith("a1982"): elif name.startswith("a1982"):
return (CD / "data" / "a198216798ca38f280dc413f8c57f2c2.exe_").as_posix() return CD / "data" / "a198216798ca38f280dc413f8c57f2c2.exe_"
elif name.startswith("a933a"): elif name.startswith("a933a"):
return (CD / "data" / "a933a1a402775cfa94b6bee0963f4b46.dll_").as_posix() return CD / "data" / "a933a1a402775cfa94b6bee0963f4b46.dll_"
elif name.startswith("bfb9b"): elif name.startswith("bfb9b"):
return (CD / "data" / "bfb9b5391a13d0afd787e87ab90f14f5.dll_").as_posix() return CD / "data" / "bfb9b5391a13d0afd787e87ab90f14f5.dll_"
elif name.startswith("c9188"): elif name.startswith("c9188"):
return (CD / "data" / "c91887d861d9bd4a5872249b641bc9f9.exe_").as_posix() return CD / "data" / "c91887d861d9bd4a5872249b641bc9f9.exe_"
elif name.startswith("64d9f"): elif name.startswith("64d9f"):
return (CD / "data" / "64d9f7d96b99467f36e22fada623c3bb.dll_").as_posix() return CD / "data" / "64d9f7d96b99467f36e22fada623c3bb.dll_"
elif name.startswith("82bf6"): elif name.startswith("82bf6"):
return (CD / "data" / "82BF6347ACF15E5D883715DC289D8A2B.exe_").as_posix() return CD / "data" / "82BF6347ACF15E5D883715DC289D8A2B.exe_"
elif name.startswith("pingtaest"): elif name.startswith("pingtaest"):
return (CD / "data" / "ping_täst.exe_").as_posix() return CD / "data" / "ping_täst.exe_"
elif name.startswith("77329"): elif name.startswith("77329"):
return (CD / "data" / "773290480d5445f11d3dc1b800728966.exe_").as_posix() return CD / "data" / "773290480d5445f11d3dc1b800728966.exe_"
elif name.startswith("3b13b"): elif name.startswith("3b13b"):
return (CD / "data" / "3b13b6f1d7cd14dc4a097a12e2e505c0a4cff495262261e2bfc991df238b9b04.dll_").as_posix() return CD / "data" / "3b13b6f1d7cd14dc4a097a12e2e505c0a4cff495262261e2bfc991df238b9b04.dll_"
elif name == "7351f.elf": elif name == "7351f.elf":
return (CD / "data" / "7351f8a40c5450557b24622417fc478d.elf_").as_posix() return CD / "data" / "7351f8a40c5450557b24622417fc478d.elf_"
elif name.startswith("79abd"): elif name.startswith("79abd"):
return (CD / "data" / "79abd17391adc6251ecdc58d13d76baf.dll_").as_posix() return CD / "data" / "79abd17391adc6251ecdc58d13d76baf.dll_"
elif name.startswith("946a9"): elif name.startswith("946a9"):
return (CD / "data" / "946a99f36a46d335dec080d9a4371940.dll_").as_posix() return CD / "data" / "946a99f36a46d335dec080d9a4371940.dll_"
elif name.startswith("2f7f5f"): elif name.startswith("2f7f5f"):
return (CD / "data" / "2f7f5fb5de175e770d7eae87666f9831.elf_").as_posix() return CD / "data" / "2f7f5fb5de175e770d7eae87666f9831.elf_"
elif name.startswith("b9f5b"): elif name.startswith("b9f5b"):
return (CD / "data" / "b9f5bd514485fb06da39beff051b9fdc.exe_").as_posix() return CD / "data" / "b9f5bd514485fb06da39beff051b9fdc.exe_"
elif name.startswith("mixed-mode-64"): elif name.startswith("mixed-mode-64"):
return (DNFILE_TESTFILES / "mixed-mode" / "ModuleCode" / "bin" / "ModuleCode_amd64.exe").as_posix() return DNFILE_TESTFILES / "mixed-mode" / "ModuleCode" / "bin" / "ModuleCode_amd64.exe"
elif name.startswith("hello-world"): elif name.startswith("hello-world"):
return (DNFILE_TESTFILES / "hello-world" / "hello-world.exe").as_posix() return DNFILE_TESTFILES / "hello-world" / "hello-world.exe"
elif name.startswith("_1c444"): elif name.startswith("_1c444"):
return (DOTNET_DIR / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_").as_posix() return DOTNET_DIR / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_"
elif name.startswith("_387f15"): elif name.startswith("_387f15"):
return (DOTNET_DIR / "387f15043f0198fd3a637b0758c2b6dde9ead795c3ed70803426fc355731b173.dll_").as_posix() return DOTNET_DIR / "387f15043f0198fd3a637b0758c2b6dde9ead795c3ed70803426fc355731b173.dll_"
elif name.startswith("_692f"): elif name.startswith("_692f"):
return (DOTNET_DIR / "692f7fd6d198e804d6af98eb9e390d61.exe_").as_posix() return DOTNET_DIR / "692f7fd6d198e804d6af98eb9e390d61.exe_"
elif name.startswith("_0953c"): elif name.startswith("_0953c"):
return (CD / "data" / "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_").as_posix() return CD / "data" / "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_"
elif name.startswith("_039a6"): elif name.startswith("_039a6"):
return (CD / "data" / "039a6336d0802a2255669e6867a5679c7eb83313dbc61fb1c7232147379bd304.exe_").as_posix() return CD / "data" / "039a6336d0802a2255669e6867a5679c7eb83313dbc61fb1c7232147379bd304.exe_"
elif name.startswith("b5f052"): elif name.startswith("b5f052"):
return (CD / "data" / "b5f0524e69b3a3cf636c7ac366ca57bf5e3a8fdc8a9f01caf196c611a7918a87.elf_").as_posix() return CD / "data" / "b5f0524e69b3a3cf636c7ac366ca57bf5e3a8fdc8a9f01caf196c611a7918a87.elf_"
elif name.startswith("bf7a9c"): elif name.startswith("bf7a9c"):
return (CD / "data" / "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_").as_posix() return CD / "data" / "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_"
elif name.startswith("294b8d"): elif name.startswith("294b8d"):
return (CD / "data" / "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_").as_posix() return CD / "data" / "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_"
elif name.startswith("2bf18d"): elif name.startswith("2bf18d"):
return (CD / "data" / "2bf18d0403677378adad9001b1243211.elf_").as_posix() return CD / "data" / "2bf18d0403677378adad9001b1243211.elf_"
else: else:
raise ValueError(f"unexpected sample fixture: {name}") raise ValueError(f"unexpected sample fixture: {name}")
@@ -1048,6 +1048,7 @@ FEATURE_COUNT_TESTS_DOTNET = [
def do_test_feature_presence(get_extractor, sample, scope, feature, expected): def do_test_feature_presence(get_extractor, sample, scope, feature, expected):
print(sample)
extractor = get_extractor(sample) extractor = get_extractor(sample)
features = scope(extractor) features = scope(extractor)
if expected: if expected:
@@ -1064,10 +1065,10 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
assert len(features[feature]) == expected, msg assert len(features[feature]) == expected, msg
def get_extractor(path): def get_extractor(path: Path):
extractor = get_viv_extractor(path) extractor = get_viv_extractor(path)
# overload the extractor so that the fixture exposes `extractor.path` # overload the extractor so that the fixture exposes `extractor.path`
setattr(extractor, "path", path) setattr(extractor, "path", str(path))
return extractor return extractor
@@ -1182,29 +1183,29 @@ def get_result_doc(path):
@pytest.fixture @pytest.fixture
def pma0101_rd(): def pma0101_rd():
return get_result_doc((CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json")
@pytest.fixture @pytest.fixture
def dotnet_1c444e_rd(): def dotnet_1c444e_rd():
return get_result_doc((CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json")
@pytest.fixture @pytest.fixture
def a3f3bbc_rd(): def a3f3bbc_rd():
return get_result_doc((CD / "data" / "rd" / "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "3f3bbcf8fd90bdcdcdc5494314ed4225.exe_.json")
@pytest.fixture @pytest.fixture
def al_khaserx86_rd(): def al_khaserx86_rd():
return get_result_doc((CD / "data" / "rd" / "al-khaser_x86.exe_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "al-khaser_x86.exe_.json")
@pytest.fixture @pytest.fixture
def al_khaserx64_rd(): def al_khaserx64_rd():
return get_result_doc((CD / "data" / "rd" / "al-khaser_x64.exe_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "al-khaser_x64.exe_.json")
@pytest.fixture @pytest.fixture
def a076114_rd(): def a076114_rd():
return get_result_doc((CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json").as_posix()) return get_result_doc(CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json")

View File

@@ -426,7 +426,7 @@ def test_not_render_rules_also_matched(z9324d_extractor, capsys):
def test_json_meta(capsys): def test_json_meta(capsys):
path = fixtures.get_data_path_by_name("pma01-01") path = str(fixtures.get_data_path_by_name("pma01-01"))
assert capa.main.main([path, "-j"]) == 0 assert capa.main.main([path, "-j"]) == 0
std = capsys.readouterr() std = capsys.readouterr()
std_json = json.loads(std.out) std_json = json.loads(std.out)
@@ -470,7 +470,7 @@ def test_main_dotnet4(_039a6_dotnetfile_extractor):
def test_main_rd(): def test_main_rd():
path = fixtures.get_data_path_by_name("pma01-01-rd") path = str(fixtures.get_data_path_by_name("pma01-01-rd"))
assert capa.main.main([path, "-vv"]) == 0 assert capa.main.main([path, "-vv"]) == 0
assert capa.main.main([path, "-v"]) == 0 assert capa.main.main([path, "-v"]) == 0
assert capa.main.main([path, "-j"]) == 0 assert capa.main.main([path, "-j"]) == 0

View File

@@ -25,7 +25,7 @@ def test_pefile_features(sample, scope, feature, expected):
if isinstance(feature, capa.features.file.FunctionName): if isinstance(feature, capa.features.file.FunctionName):
pytest.xfail("pefile doesn't extract function names") pytest.xfail("pefile doesn't extract function names")
if ".elf" in sample: if ".elf" in sample.name:
pytest.xfail("pefile doesn't handle ELF files") pytest.xfail("pefile doesn't handle ELF files")
fixtures.do_test_feature_presence(fixtures.get_pefile_extractor, sample, scope, feature, expected) fixtures.do_test_feature_presence(fixtures.get_pefile_extractor, sample, scope, feature, expected)

View File

@@ -19,19 +19,19 @@ CD = Path(__file__).resolve().parent
def get_script_path(s: str): def get_script_path(s: str):
return (CD / ".." / "scripts" / s).as_posix() return str(CD / ".." / "scripts" / s)
def get_file_path(): def get_file_path():
return (CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_").as_posix() return str(CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_")
def get_rules_path(): def get_rules_path():
return (CD / ".." / "rules").as_posix() return str(CD / ".." / "rules")
def get_rule_path(): def get_rule_path():
return (Path(get_rules_path()) / "lib" / "allocate-memory.yml").as_posix() return str(Path(get_rules_path()) / "lib" / "allocate-memory.yml")
@pytest.mark.parametrize( @pytest.mark.parametrize(
@@ -63,7 +63,7 @@ def test_bulk_process(tmp_path):
dest_file.write_bytes(source_file.read_bytes()) dest_file.write_bytes(source_file.read_bytes())
p = run_program(get_script_path("bulk-process.py"), [t.parent.as_posix()]) p = run_program(get_script_path("bulk-process.py"), [str(t.parent)])
assert p.returncode == 0 assert p.returncode == 0