diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b294eaf4..acd7d807 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,7 +15,7 @@ jobs: fail-fast: true matrix: include: - - os: ubuntu-18.04 + - os: ubuntu-20.04 # use old linux so that the shared library versioning is more portable artifact_name: capa asset_name: linux @@ -36,7 +36,7 @@ jobs: uses: actions/setup-python@d27e3f3d7c64b4bbf8e4abfb9b63b83e846e0435 # v4.5.0 with: python-version: 3.8 - - if: matrix.os == 'ubuntu-18.04' + - if: matrix.os == 'ubuntu-20.04' run: sudo apt-get install -y libyaml-dev - name: Upgrade pip, setuptools run: python -m pip install --upgrade pip setuptools @@ -65,10 +65,7 @@ jobs: matrix: include: # OSs not already tested above - - os: ubuntu-18.04 - artifact_name: capa - asset_name: linux - - os: ubuntu-20.04 + - os: ubuntu-22.04 artifact_name: capa asset_name: linux - os: windows-2022 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6d270aaa..a675ed58 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -76,6 +76,8 @@ jobs: python-version: "3.8" - os: ubuntu-20.04 python-version: "3.9" + - os: ubuntu-20.04 + python-version: "3.10" steps: - name: Checkout capa with submodules uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3.3.0 @@ -96,6 +98,7 @@ jobs: binja-tests: name: Binary Ninja tests for ${{ matrix.python-version }} on ${{ matrix.os }} runs-on: ubuntu-20.04 + needs: [code_style, rule_linter] strategy: fail-fast: false matrix: diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a92a5a9..31254ffb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,14 @@ ### New Features - add protobuf format for result documents #1219 @williballenthin @mr-tz - - extractor: add Binary Ninja feature extractor @xusheng6 - new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42 +- Change colour/highlight to "cyan" instead of "blue" for 
easy noticing. #1384 @ggold7046 +- add new format to parse output json back to capa #1396 @ooprathamm ### Breaking Changes -### New Rules (22) +### New Rules (26) - persistence/scheduled-tasks/schedule-task-via-at joren485 - data-manipulation/prng/generate-random-numbers-via-rtlgenrandom william.ballenthin@mandiant.com @@ -34,9 +35,14 @@ - nursery/set-web-proxy-in-dotnet michael.hunhoff@mandiant.com - nursery/check-for-windows-sandbox-via-subdirectory echernofsky@google.com - nursery/enumerate-pe-sections-in-dotnet @mr-tz +- nursery/destroy-software-breakpoint-capability echernofsky@google.com +- nursery/send-data-to-internet michael.hunhoff@mandiant.com +- nursery/compiled-with-cx_freeze @mr-tz +- nursery/contain-a-thread-local-storage-tls-section-in-dotnet michael.hunhoff@mandiant.com - ### Bug Fixes +- extractor: removed '.dynsym' as the library name for ELF imports #1318 @stevemk14ebr - extractor: fix vivisect loop detection corner case #1310 @mr-tz - match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff - extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6 diff --git a/README.md b/README.md index 996467b2..0778cc2b 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/flare-capa)](https://pypi.org/project/flare-capa) [![Last release](https://img.shields.io/github/v/release/mandiant/capa)](https://github.com/mandiant/capa/releases) -[![Number of rules](https://img.shields.io/badge/rules-789-blue.svg)](https://github.com/mandiant/capa-rules) +[![Number of rules](https://img.shields.io/badge/rules-794-blue.svg)](https://github.com/mandiant/capa-rules) [![CI status](https://github.com/mandiant/capa/workflows/CI/badge.svg)](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster) 
[![Downloads](https://img.shields.io/github/downloads/mandiant/capa/total)](https://github.com/mandiant/capa/releases) [![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt) diff --git a/capa/features/common.py b/capa/features/common.py index 062c27fa..5060ebaa 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -450,6 +450,7 @@ FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_FREEZE = "freeze" +FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/features/extractors/binja/file.py b/capa/features/extractors/binja/file.py index 8469a808..9700a8c1 100644 --- a/capa/features/extractors/binja/file.py +++ b/capa/features/extractors/binja/file.py @@ -103,7 +103,7 @@ def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address ordinal = sym.ordinal if ordinal != 0 and (lib_name != ""): - ordinal_name = "#%d" % (ordinal) + ordinal_name = f"#{ordinal}" for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name): yield Import(name), addr @@ -147,7 +147,7 @@ def extract_file_format(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]: # no file type to return when processing a binary file, but we want to continue processing return else: - raise NotImplementedError("unexpected file format: %d" % view_type) + raise NotImplementedError(f"unexpected file format: {view_type}") def extract_features(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/binja/find_binja_api.py b/capa/features/extractors/binja/find_binja_api.py index 7cb78d7a..6356c074 100644 --- a/capa/features/extractors/binja/find_binja_api.py +++ b/capa/features/extractors/binja/find_binja_api.py @@ -26,7 +26,7 @@ if spec is not None: def find_binja_path() -> str: - raw_output = subprocess.check_output(["python", "-c", "%s" % code]).decode("ascii").strip() + raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip() 
return bytes.fromhex(raw_output).decode("utf8") diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 3c1145ce..6beaa72d 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -12,11 +12,14 @@ import capa.features.extractors.pefile import capa.features.extractors.strings from capa.features.common import ( OS, + OS_ANY, OS_AUTO, + ARCH_ANY, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, + FORMAT_RESULT, Arch, Format, String, @@ -27,6 +30,11 @@ from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress logger = logging.getLogger(__name__) +# match strings for formats +MATCH_PE = b"MZ" +MATCH_ELF = b"\x7fELF" +MATCH_RESULT = b'{"meta":' + def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]: """ @@ -40,12 +48,14 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]: def extract_format(buf) -> Iterator[Tuple[Feature, Address]]: - if buf.startswith(b"MZ"): + if buf.startswith(MATCH_PE): yield Format(FORMAT_PE), NO_ADDRESS - elif buf.startswith(b"\x7fELF"): + elif buf.startswith(MATCH_ELF): yield Format(FORMAT_ELF), NO_ADDRESS elif is_freeze(buf): yield Format(FORMAT_FREEZE), NO_ADDRESS + elif buf.startswith(MATCH_RESULT): + yield Format(FORMAT_RESULT), NO_ADDRESS else: # we likely end up here: # 1. handling a file format (e.g. 
macho) @@ -56,10 +66,13 @@ def extract_format(buf) -> Iterator[Tuple[Feature, Address]]: def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]: - if buf.startswith(b"MZ"): + if buf.startswith(MATCH_PE): yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf)) - elif buf.startswith(b"\x7fELF"): + elif buf.startswith(MATCH_RESULT): + yield Arch(ARCH_ANY), NO_ADDRESS + + elif buf.startswith(MATCH_ELF): with contextlib.closing(io.BytesIO(buf)) as f: arch = capa.features.extractors.elf.detect_elf_arch(f) @@ -88,9 +101,11 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]: if os != OS_AUTO: yield OS(os), NO_ADDRESS - if buf.startswith(b"MZ"): + if buf.startswith(MATCH_PE): yield OS(OS_WINDOWS), NO_ADDRESS - elif buf.startswith(b"\x7fELF"): + elif buf.startswith(MATCH_RESULT): + yield OS(OS_ANY), NO_ADDRESS + elif buf.startswith(MATCH_ELF): with contextlib.closing(io.BytesIO(buf)) as f: os = capa.features.extractors.elf.detect_elf_os(f) diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index ab79cc74..e0dc596c 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -88,6 +88,7 @@ class Shdr: offset: int size: int link: int + entsize: int buf: bytes @@ -320,12 +321,12 @@ class ELF: shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize] if self.bitness == 32: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from( - self.endian + "IIIIIII", shent, 0x0 + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( + self.endian + "IIIIIIIIII", shent, 0x0 ) elif self.bitness == 64: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from( - self.endian + "IIQQQQI", shent, 0x0 + sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( + self.endian + "IIQQQQIIQQ", shent, 0x0 ) else: raise 
NotImplementedError() @@ -337,7 +338,7 @@ class ELF: if len(buf) != sh_size: raise ValueError("failed to read section header content") - return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, buf) + return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, sh_entsize, buf) @property def section_headers(self): @@ -502,6 +503,23 @@ class ELF: yield read_cstr(strtab, d_val) + @property + def symtab(self) -> Optional[Tuple[Shdr, Shdr]]: + """ + fetch the Shdr for the symtab and the associated strtab. + """ + SHT_SYMTAB = 0x2 + for shdr in self.section_headers: + if shdr.type != SHT_SYMTAB: + continue + + # the linked section contains strings referenced by the symtab structures. + strtab_shdr = self.parse_section_header(shdr.link) + + return shdr, strtab_shdr + + return None + @dataclass class ABITag: @@ -603,11 +621,76 @@ class SHNote: return ABITag(os, kmajor, kminor, kpatch) -def guess_os_from_osabi(elf) -> Optional[OS]: +@dataclass +class Symbol: + name_offset: int + value: int + size: int + info: int + other: int + shndx: int + + +class SymTab: + def __init__( + self, + endian: str, + bitness: int, + symtab: Shdr, + strtab: Shdr, + ) -> None: + self.symbols: List[Symbol] = [] + + self.symtab = symtab + self.strtab = strtab + + self._parse(endian, bitness, symtab.buf) + + def _parse(self, endian: str, bitness: int, symtab_buf: bytes) -> None: + """ + return the symbol's information in + the order specified by sys/elf32.h + """ + for i in range(int(len(self.symtab.buf) / self.symtab.entsize)): + if bitness == 32: + name_offset, value, size, info, other, shndx = struct.unpack_from( + endian + "IIIBBH", symtab_buf, i * self.symtab.entsize + ) + elif bitness == 64: + name_offset, info, other, shndx, value, size = struct.unpack_from( + endian + "IBBBQQ", symtab_buf, i * self.symtab.entsize + ) + + self.symbols.append(Symbol(name_offset, value, size, info, other, shndx)) + + def get_name(self, symbol: Symbol) -> str: + """ + 
fetch a symbol's name from symtab's + associated strings' section (SHT_STRTAB) + """ + if not self.strtab: + raise ValueError("no strings found") + + for i in range(symbol.name_offset, self.strtab.size): + if self.strtab.buf[i] == 0: + return self.strtab.buf[symbol.name_offset : i].decode("utf-8") + + raise ValueError("symbol name not found") + + def get_symbols(self) -> Iterator[Symbol]: + """ + return a tuple: (name, value, size, info, other, shndx) + for each symbol contained in the symbol table + """ + for symbol in self.symbols: + yield symbol + + +def guess_os_from_osabi(elf: ELF) -> Optional[OS]: return elf.ei_osabi -def guess_os_from_ph_notes(elf) -> Optional[OS]: +def guess_os_from_ph_notes(elf: ELF) -> Optional[OS]: # search for PT_NOTE sections that specify an OS # for example, on Linux there is a GNU section with minimum kernel version PT_NOTE = 0x4 @@ -646,7 +729,7 @@ def guess_os_from_ph_notes(elf) -> Optional[OS]: return None -def guess_os_from_sh_notes(elf) -> Optional[OS]: +def guess_os_from_sh_notes(elf: ELF) -> Optional[OS]: # search for notes stored in sections that aren't visible in program headers. # e.g. .note.Linux in Linux kernel modules. SHT_NOTE = 0x7 @@ -679,7 +762,7 @@ def guess_os_from_sh_notes(elf) -> Optional[OS]: return None -def guess_os_from_linker(elf) -> Optional[OS]: +def guess_os_from_linker(elf: ELF) -> Optional[OS]: # search for recognizable dynamic linkers (interpreters) # for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2 linker = elf.linker @@ -689,7 +772,7 @@ def guess_os_from_linker(elf) -> Optional[OS]: return None -def guess_os_from_abi_versions_needed(elf) -> Optional[OS]: +def guess_os_from_abi_versions_needed(elf: ELF) -> Optional[OS]: # then lets look for GLIBC symbol versioning requirements. # this will let us guess about linux/hurd in some cases. 
@@ -720,7 +803,7 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]: return None -def guess_os_from_needed_dependencies(elf) -> Optional[OS]: +def guess_os_from_needed_dependencies(elf: ELF) -> Optional[OS]: for needed in elf.needed: if needed.startswith("libmachuser.so"): return OS.HURD @@ -730,6 +813,33 @@ def guess_os_from_needed_dependencies(elf) -> Optional[OS]: return None +def guess_os_from_symtab(elf: ELF) -> Optional[OS]: + shdrs = elf.symtab + if not shdrs: + # executable does not contain a symbol table + # or the symbol's names are stripped + return None + + symtab_shdr, strtab_shdr = shdrs + symtab = SymTab(elf.endian, elf.bitness, symtab_shdr, strtab_shdr) + + keywords = { + OS.LINUX: [ + "linux", + "/linux/", + ], + } + + for symbol in symtab.get_symbols(): + sym_name = symtab.get_name(symbol) + + for os, hints in keywords.items(): + if any(map(lambda x: x in sym_name, hints)): + return os + + return None + + def detect_elf_os(f) -> str: """ f: type Union[BinaryIO, IDAIO] @@ -754,6 +864,9 @@ def detect_elf_os(f) -> str: needed_dependencies_guess = guess_os_from_needed_dependencies(elf) logger.debug("guess: needed dependencies: %s", needed_dependencies_guess) + symtab_guess = guess_os_from_symtab(elf) + logger.debug("guess: pertinent symbol name: %s", symtab_guess) + ret = None if osabi_guess: @@ -774,6 +887,9 @@ def detect_elf_os(f) -> str: elif needed_dependencies_guess: ret = needed_dependencies_guess + elif symtab_guess: + ret = symtab_guess + return ret.value if ret is not None else "unknown" diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index dbb9bd1d..0279f6fd 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -90,8 +90,11 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]: if not library: continue - # IDA uses section names for the library of ELF imports, like ".dynsym" - library = library.lstrip(".") + # IDA uses section 
names for the library of ELF imports, like ".dynsym". + # These are not useful to us, we may need to expand this list over time + # TODO: exhaust this list, see #1419 + if library == ".dynsym": + library = "" def inspect_import(ea, function, ordinal): if function and function.startswith("__imp_"): diff --git a/capa/features/insn.py b/capa/features/insn.py index 24b7c7b5..f4be23c8 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -70,7 +70,7 @@ class Number(Feature): elif isinstance(self.value, float): return str(self.value) else: - raise ValueError("invalid value type %s" % (type(self.value))) + raise ValueError(f"invalid value type {type(self.value)}") # max recognized structure size (and therefore, offset size) diff --git a/capa/main.py b/capa/main.py index 172f6c70..fee4a270 100644 --- a/capa/main.py +++ b/capa/main.py @@ -69,6 +69,7 @@ from capa.features.common import ( FORMAT_SC64, FORMAT_DOTNET, FORMAT_FREEZE, + FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor @@ -498,7 +499,7 @@ def get_workspace(path, format_, sigpaths): def get_extractor( path: str, format_: str, - os: str, + os_: str, backend: str, sigpaths: List[str], should_save_workspace=False, @@ -517,7 +518,7 @@ def get_extractor( if not is_supported_arch(path): raise UnsupportedArchError() - if os == OS_AUTO and not is_supported_os(path): + if os_ == OS_AUTO and not is_supported_os(path): raise UnsupportedOSError() if format_ == FORMAT_DOTNET: @@ -548,7 +549,7 @@ def get_extractor( with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): bv: BinaryView = BinaryViewType.get_view_of_file(path) if bv is None: - raise RuntimeError("Binary Ninja cannot open file %s" % (path)) + raise RuntimeError(f"Binary Ninja cannot open file {path}") return 
capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv) @@ -569,7 +570,7 @@ def get_extractor( else: logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os) + return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: @@ -913,12 +914,12 @@ def install_common_args(parser, wanted=None): (OS_MACOS,), (OS_WINDOWS,), ] - os_help = ", ".join(["%s (%s)" % (o[0], o[1]) if len(o) == 2 else o[0] for o in oses]) + os_help = ", ".join([f"{o[0]} ({o[1]})" if len(o) == 2 else o[0] for o in oses]) parser.add_argument( "--os", choices=[o[0] for o in oses], default=OS_AUTO, - help="select sample OS: %s" % os_help, + help=f"select sample OS: {os_help}", ) if "rules" in wanted: @@ -1182,53 +1183,72 @@ def main(argv=None): logger.debug("file limitation short circuit, won't analyze fully.") return E_FILE_LIMITATION - if format_ == FORMAT_FREEZE: - with open(args.sample, "rb") as f: - extractor = capa.features.freeze.load(f.read()) + # TODO: #1411 use a real type, not a dict here. 
+ meta: Dict[str, Any] + capabilities: MatchResults + counts: Dict[str, Any] + + if format_ == FORMAT_RESULT: + # result document directly parses into meta, capabilities + result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample) + meta, capabilities = result_doc.to_capa() + else: - try: - if format_ == FORMAT_PE: - sig_paths = get_signatures(args.signatures) - else: - sig_paths = [] - logger.debug("skipping library code matching: only have native PE signatures") - except IOError as e: - logger.error("%s", str(e)) - return E_INVALID_SIG + # all other formats we must create an extractor + # and use that to extract meta and capabilities - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) + if format_ == FORMAT_FREEZE: + # freeze format deserializes directly into an extractor + with open(args.sample, "rb") as f: + extractor = capa.features.freeze.load(f.read()) + else: + # all other formats we must create an extractor, + # such as viv, binary ninja, etc. workspaces + # and use those for extracting. 
- try: - extractor = get_extractor( - args.sample, - format_, - args.os, - args.backend, - sig_paths, - should_save_workspace, - disable_progress=args.quiet, - ) - except UnsupportedFormatError: - log_unsupported_format_error() - return E_INVALID_FILE_TYPE - except UnsupportedArchError: - log_unsupported_arch_error() - return E_INVALID_FILE_ARCH - except UnsupportedOSError: - log_unsupported_os_error() - return E_INVALID_FILE_OS + try: + if format_ == FORMAT_PE: + sig_paths = get_signatures(args.signatures) + else: + sig_paths = [] + logger.debug("skipping library code matching: only have native PE signatures") + except IOError as e: + logger.error("%s", str(e)) + return E_INVALID_SIG - meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor) + should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) - capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta["analysis"].update(counts) - meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities) + try: + extractor = get_extractor( + args.sample, + format_, + args.os, + args.backend, + sig_paths, + should_save_workspace, + disable_progress=args.quiet, + ) + except UnsupportedFormatError: + log_unsupported_format_error() + return E_INVALID_FILE_TYPE + except UnsupportedArchError: + log_unsupported_arch_error() + return E_INVALID_FILE_ARCH + except UnsupportedOSError: + log_unsupported_os_error() + return E_INVALID_FILE_OS - if has_file_limitation(rules, capabilities): - # bail if capa encountered file limitation e.g. a packed binary - # do show the output in verbose mode, though. 
- if not (args.verbose or args.vverbose or args.json): - return E_FILE_LIMITATION + meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor) + + capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) + meta["analysis"].update(counts) + meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities) + + if has_file_limitation(rules, capabilities): + # bail if capa encountered file limitation e.g. a packed binary + # do show the output in verbose mode, though. + if not (args.verbose or args.vverbose or args.json): + return E_FILE_LIMITATION if args.json: print(capa.render.json.render(meta, rules, capabilities)) diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index 58486f43..66a06c82 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -502,27 +502,36 @@ def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata: rules=tuple(meta.analysis.rules), base_address=addr_from_pb2(meta.analysis.base_address), layout=rd.Layout( - functions=[ - rd.FunctionLayout( - address=addr_from_pb2(f.address), - matched_basic_blocks=[ - rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) for bb in f.matched_basic_blocks - ], - ) - for f in meta.analysis.layout.functions - ] + functions=tuple( + [ + rd.FunctionLayout( + address=addr_from_pb2(f.address), + matched_basic_blocks=tuple( + [ + rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) + for bb in f.matched_basic_blocks + ] + ), + ) + for f in meta.analysis.layout.functions + ] + ) ), feature_counts=rd.FeatureCounts( file=meta.analysis.feature_counts.file, - functions=[ - rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count) - for f in meta.analysis.feature_counts.functions - ], + functions=tuple( + [ + rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count) + for f in meta.analysis.feature_counts.functions + ] + ), + ), + library_functions=tuple( + [ + 
rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name) + for lf in meta.analysis.library_functions + ] ), - library_functions=[ - rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name) - for lf in meta.analysis.library_functions - ], ), ) @@ -585,13 +594,14 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature: return frzf.ExportFeature(export=ff.export, description=ff.description or None) elif type_ == "import_": ff = f.import_ - return frzf.ImportFeature(import_=ff.import_, description=ff.description or None) + return frzf.ImportFeature(import_=ff.import_, description=ff.description or None) # type: ignore + # Mypy is unable to recognize `import_` as an argument elif type_ == "section": ff = f.section return frzf.SectionFeature(section=ff.section, description=ff.description or None) elif type_ == "function_name": ff = f.function_name - return frzf.FunctionNameFeature(function_name=ff.function_name, description=ff.description or None) + return frzf.FunctionNameFeature(function_name=ff.function_name, description=ff.description or None) # type: ignore elif type_ == "substring": ff = f.substring return frzf.SubstringFeature(substring=ff.substring, description=ff.description or None) @@ -603,7 +613,8 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature: return frzf.StringFeature(string=ff.string, description=ff.description or None) elif type_ == "class_": ff = f.class_ - return frzf.ClassFeature(class_=ff.class_, description=ff.description or None) + return frzf.ClassFeature(class_=ff.class_, description=ff.description or None) # type: ignore + # Mypy is unable to recognize `class_` as an argument due to aliasing elif type_ == "namespace": ff = f.namespace return frzf.NamespaceFeature(namespace=ff.namespace, description=ff.description or None) @@ -629,12 +640,13 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature: ff = f.operand_number return frzf.OperandNumberFeature( index=ff.index, 
operand_number=number_from_pb2(ff.operand_number), description=ff.description or None - ) + ) # type: ignore elif type_ == "operand_offset": ff = f.operand_offset return frzf.OperandOffsetFeature( index=ff.index, operand_offset=int_from_pb2(ff.operand_offset), description=ff.description or None - ) + ) # type: ignore + # Mypy is unable to recognize `operand_offset` as an argument due to aliasing elif type_ == "basic_block": ff = f.basic_block return frzf.BasicBlockFeature(description=ff.description or None) @@ -651,16 +663,16 @@ def match_from_pb2(match: capa_pb2.Match) -> rd.Match: return rd.Match( success=match.success, node=rd.StatementNode(statement=statement_from_pb2(match.statement)), - children=children, - locations=locations, + children=tuple(children), + locations=tuple(locations), captures={}, ) elif node_type == "feature": return rd.Match( success=match.success, node=rd.FeatureNode(feature=feature_from_pb2(match.feature)), - children=children, - locations=locations, + children=tuple(children), + locations=tuple(locations), captures={capture: tuple(map(addr_from_pb2, locs.address)) for capture, locs in match.captures.items()}, ) else: @@ -694,7 +706,8 @@ def maec_from_pb2(pb: capa_pb2.MaecMetadata) -> rd.MaecMetadata: malware_family=pb.malware_family or None, malware_category=pb.malware_category or None, malware_category_ov=pb.malware_category_ov or None, - ) + ) # type: ignore + # Mypy is unable to recognise arguments due to alias def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata: @@ -711,7 +724,8 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata: lib=pb.lib, is_subscope_rule=pb.is_subscope_rule, maec=maec_from_pb2(pb.maec), - ) + ) # type: ignore + # Mypy is unable to recognise `attack` and `is_subscope_rule` as arguments due to alias def doc_from_pb2(doc: capa_pb2.ResultDocument) -> rd.ResultDocument: diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 960635f0..cef49d12 
100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -6,7 +6,8 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import datetime -from typing import Any, Dict, Tuple, Union, Optional +import collections +from typing import Any, Dict, List, Tuple, Union, Optional from pydantic import Field, BaseModel @@ -125,6 +126,41 @@ class Metadata(FrozenModel): ), ) + def to_capa(self) -> Dict[str, Any]: + capa_meta = { + "timestamp": self.timestamp.isoformat(), + "version": self.version, + "sample": { + "md5": self.sample.md5, + "sha1": self.sample.sha1, + "sha256": self.sample.sha256, + "path": self.sample.path, + }, + "analysis": { + "format": self.analysis.format, + "arch": self.analysis.arch, + "os": self.analysis.os, + "extractor": self.analysis.extractor, + "rules": self.analysis.rules, + "base_address": self.analysis.base_address.to_capa(), + "layout": { + "functions": { + f.address.to_capa(): { + "matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks] + } + for f in self.analysis.layout.functions + } + }, + "feature_counts": { + "file": self.analysis.feature_counts.file, + "functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions}, + }, + "library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions}, + }, + } + + return capa_meta + class CompoundStatementType: AND = "and" @@ -227,6 +263,54 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N assert_never(node) +def node_to_capa( + node: Node, children: List[Union[capa.engine.Statement, capa.engine.Feature]] +) -> Union[capa.engine.Statement, capa.engine.Feature]: + if isinstance(node, StatementNode): + if isinstance(node.statement, CompoundStatement): + if node.statement.type == CompoundStatementType.AND: 
+ return capa.engine.And(description=node.statement.description, children=children) + + elif node.statement.type == CompoundStatementType.OR: + return capa.engine.Or(description=node.statement.description, children=children) + + elif node.statement.type == CompoundStatementType.NOT: + return capa.engine.Not(description=node.statement.description, child=children[0]) + + elif node.statement.type == CompoundStatementType.OPTIONAL: + return capa.engine.Some(description=node.statement.description, count=0, children=children) + + else: + assert_never(node.statement.type) + + elif isinstance(node.statement, SomeStatement): + return capa.engine.Some( + description=node.statement.description, count=node.statement.count, children=children + ) + + elif isinstance(node.statement, RangeStatement): + return capa.engine.Range( + description=node.statement.description, + min=node.statement.min, + max=node.statement.max, + child=node.statement.child.to_capa(), + ) + + elif isinstance(node.statement, SubscopeStatement): + return capa.engine.Subscope( + description=node.statement.description, scope=node.statement.scope, child=children[0] + ) + + else: + assert_never(node.statement) + + elif isinstance(node, FeatureNode): + return node.feature.to_capa() + + else: + assert_never(node) + + class Match(FrozenModel): """ args: @@ -359,6 +443,39 @@ class Match(FrozenModel): captures={capture: tuple(captures[capture]) for capture in captures}, ) + def to_capa(self, rules_by_name: Dict[str, capa.rules.Rule]) -> capa.engine.Result: + children = [child.to_capa(rules_by_name) for child in self.children] + statement = node_to_capa(self.node, [child.statement for child in children]) + + if isinstance(self.node, FeatureNode): + feature = self.node.feature + + if isinstance(feature, (frzf.SubstringFeature, frzf.RegexFeature)): + matches = {capture: {loc.to_capa() for loc in locs} for capture, locs in self.captures.items()} + + if isinstance(feature, frzf.SubstringFeature): + assert 
isinstance(statement, capa.features.common.Substring) + statement = capa.features.common._MatchedSubstring(statement, matches) + elif isinstance(feature, frzf.RegexFeature): + assert isinstance(statement, capa.features.common.Regex) + statement = capa.features.common._MatchedRegex(statement, matches) + else: + assert_never(feature) + + # apparently we don't have to fixup match and subscope entries here. + # at least, default, verbose, and vverbose renderers seem to work well without any special handling here. + # + # children contains a single tree of results, corresponding to the logic of the matched rule. + # self.node.feature.match contains the name of the rule that was matched. + # so its all available to reconstruct, if necessary. + + return capa.features.common.Result( + success=self.success, + statement=statement, + locations={loc.to_capa() for loc in self.locations}, + children=children, + ) + def parse_parts_id(s: str): id_ = "" @@ -543,3 +660,22 @@ class ResultDocument(FrozenModel): ) return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches) + + def to_capa(self) -> Tuple[Dict, Dict]: + meta = self.meta.to_capa() + capabilities: Dict[ + str, List[Tuple[capa.features.address.Address, capa.features.common.Result]] + ] = collections.defaultdict(list) + + # this doesn't quite work because we don't have the rule source for rules that aren't matched. 
+ rules_by_name = { + rule_name: capa.rules.Rule.from_yaml(rule_match.source) for rule_name, rule_match in self.rules.items() + } + + for rule_name, rule_match in self.rules.items(): + for addr, match in rule_match.matches: + result: capa.engine.Result = match.to_capa(rules_by_name) + + capabilities[rule_name].append((addr.to_capa(), result)) + + return meta, capabilities diff --git a/capa/render/utils.py b/capa/render/utils.py index c65b705b..20a817d7 100644 --- a/capa/render/utils.py +++ b/capa/render/utils.py @@ -16,7 +16,7 @@ import capa.render.result_document as rd def bold(s: str) -> str: """draw attention to the given string""" - return termcolor.colored(s, "blue") + return termcolor.colored(s, "cyan") def bold2(s: str) -> str: diff --git a/rules b/rules index 4ca80233..a10ccf3f 160000 --- a/rules +++ b/rules @@ -1 +1 @@ -Subproject commit 4ca802336320e9b2f1f524d5aa95ef102ad3c1e4 +Subproject commit a10ccf3fd8d2d3b83a36a1f7b3cd08be4b7434f6 diff --git a/setup.py b/setup.py index dd680893..c7688027 100644 --- a/setup.py +++ b/setup.py @@ -27,8 +27,8 @@ requirements = [ "pyelftools==0.29", "dnfile==0.13.0", "dncil==1.0.2", - "pydantic==1.10.6", - "protobuf==4.21.12", + "pydantic==1.10.7", + "protobuf==4.22.1", ] # this sets __version__ @@ -71,7 +71,7 @@ setuptools.setup( "dev": [ "pytest==7.1.3", "pytest-sugar==0.9.4", - "pytest-instafail==0.4.2", + "pytest-instafail==0.5.0", "pytest-cov==4.0.0", "pycodestyle==2.10.0", "ruff==0.0.259", @@ -90,7 +90,7 @@ setuptools.setup( "types-termcolor==1.1.4", "types-psutil==5.8.23", "types_requests==2.28.1", - "types-protobuf==4.21.0.5", + "types-protobuf==4.22.0.1", ], "build": [ "pyinstaller==5.9.0", diff --git a/tests/data b/tests/data index 3cbd7768..9b302d4b 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit 3cbd7768c27fbcc77dc46d8f7bddd16834e352f1 +Subproject commit 9b302d4bcfea9c87704df51311f6e170b87d13c9 diff --git a/tests/fixtures.py b/tests/fixtures.py index 51e1c0af..04c9c53b 100644 --- 
a/tests/fixtures.py +++ b/tests/fixtures.py @@ -241,6 +241,8 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "kernel32-64.dll_") elif name == "pma01-01": return os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_") + elif name == "pma01-01-rd": + return os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json") elif name == "pma12-04": return os.path.join(CD, "data", "Practical Malware Analysis Lab 12-04.exe_") elif name == "pma16-01": @@ -307,6 +309,8 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_") elif name.startswith("294b8d"): return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_") + elif name.startswith("2bf18d"): + return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_") else: raise ValueError(f"unexpected sample fixture: {name}") @@ -365,6 +369,8 @@ def get_sample_md5_by_name(name): elif name.startswith("294b8d"): # file name is SHA256 hash return "3db3e55b16a7b1b1afb970d5e77c5d98" + elif name.startswith("2bf18d"): + return "2bf18d0403677378adad9001b1243211" else: raise ValueError(f"unexpected sample fixture: {name}") diff --git a/tests/test_main.py b/tests/test_main.py index d515c104..d17e6e64 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -467,3 +467,12 @@ def test_main_dotnet4(_039a6_dotnetfile_extractor): # tests successful execution and one rendering path = _039a6_dotnetfile_extractor.path assert capa.main.main([path, "-vv"]) == 0 + + +def test_main_rd(): + path = fixtures.get_data_path_by_name("pma01-01-rd") + assert capa.main.main([path, "-vv"]) == 0 + assert capa.main.main([path, "-v"]) == 0 + assert capa.main.main([path, "-j"]) == 0 + assert capa.main.main([path, "-q"]) == 0 + assert capa.main.main([path]) == 0 diff --git a/tests/test_os_detection.py b/tests/test_os_detection.py index bdc89686..600213b2 100644 --- 
a/tests/test_os_detection.py +++ b/tests/test_os_detection.py @@ -20,6 +20,7 @@ def test_elf_sh_notes(): # guess: sh notes: OS.LINUX # guess: linker: None # guess: ABI versions needed: None + # guess: symtab: None # guess: needed dependencies: None path = get_data_path_by_name("2f7f5f") with open(path, "rb") as f: @@ -32,6 +33,7 @@ def test_elf_pt_notes(): # guess: sh notes: OS.LINUX # guess: linker: OS.LINUX # guess: ABI versions needed: OS.LINUX + # guess: symtab: None # guess: needed dependencies: None path = get_data_path_by_name("7351f.elf") with open(path, "rb") as f: @@ -44,6 +46,7 @@ def test_elf_so_needed(): # guess: sh notes: OS.HURD # guess: linker: None # guess: ABI versions needed: OS.HURD + # guess: symtab: None # guess: needed dependencies: OS.HURD path = get_data_path_by_name("b5f052") with open(path, "rb") as f: @@ -56,7 +59,21 @@ def test_elf_abi_version_hurd(): # guess: sh notes: OS.HURD # guess: linker: None # guess: ABI versions needed: OS.HURD + # guess: symtab: None # guess: needed dependencies: None path = get_data_path_by_name("bf7a9c") with open(path, "rb") as f: assert capa.features.extractors.elf.detect_elf_os(f) == "hurd" + + +def test_elf_symbol_table(): + # guess: osabi: None + # guess: ph notes: None + # guess: sh notes: None + # guess: linker: None + # guess: ABI versions needed: None + # guess: symtab: OS.LINUX + # guess: needed dependencies: None + path = get_data_path_by_name("2bf18d") + with open(path, "rb") as f: + assert capa.features.extractors.elf.detect_elf_os(f) == "linux" diff --git a/tests/test_result_document.py b/tests/test_result_document.py index 87e62911..bd074c6b 100644 --- a/tests/test_result_document.py +++ b/tests/test_result_document.py @@ -8,6 +8,7 @@ import copy import pytest +import fixtures from fixtures import * import capa @@ -268,3 +269,18 @@ def assert_round_trip(rd: rdoc.ResultDocument): def test_round_trip(request, rd_file): rd: rdoc.ResultDocument = request.getfixturevalue(rd_file) 
assert_round_trip(rd) + + +def test_json_to_rdoc(): + path = fixtures.get_data_path_by_name("pma01-01-rd") + assert isinstance(rdoc.ResultDocument.parse_file(path), rdoc.ResultDocument) + + +def test_rdoc_to_capa(): + path = fixtures.get_data_path_by_name("pma01-01-rd") + + rd = rdoc.ResultDocument.parse_file(path) + + meta, capabilites = rd.to_capa() + assert isinstance(meta, dict) + assert isinstance(capabilites, dict) diff --git a/tests/test_scripts.py b/tests/test_scripts.py index fb1ce734..3b299a36 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -81,4 +81,4 @@ def test_proto_conversion(tmpdir): p = run_program(get_script_path("proto-to-results.py"), [pb]) assert p.returncode == 0 - assert p.stdout.startswith(b'{\n "meta": ') + assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')