resolve merge conflict

This commit is contained in:
Pratham Chauhan
2023-04-04 18:56:26 +05:30
23 changed files with 475 additions and 116 deletions

View File

@@ -15,7 +15,7 @@ jobs:
fail-fast: true
matrix:
include:
- os: ubuntu-18.04
- os: ubuntu-20.04
# use old linux so that the shared library versioning is more portable
artifact_name: capa
asset_name: linux
@@ -36,7 +36,7 @@ jobs:
uses: actions/setup-python@d27e3f3d7c64b4bbf8e4abfb9b63b83e846e0435 # v4.5.0
with:
python-version: 3.8
- if: matrix.os == 'ubuntu-18.04'
- if: matrix.os == 'ubuntu-20.04'
run: sudo apt-get install -y libyaml-dev
- name: Upgrade pip, setuptools
run: python -m pip install --upgrade pip setuptools
@@ -65,10 +65,7 @@ jobs:
matrix:
include:
# OSs not already tested above
- os: ubuntu-18.04
artifact_name: capa
asset_name: linux
- os: ubuntu-20.04
- os: ubuntu-22.04
artifact_name: capa
asset_name: linux
- os: windows-2022

View File

@@ -76,6 +76,8 @@ jobs:
python-version: "3.8"
- os: ubuntu-20.04
python-version: "3.9"
- os: ubuntu-20.04
python-version: "3.10"
steps:
- name: Checkout capa with submodules
uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3.3.0
@@ -96,6 +98,7 @@ jobs:
binja-tests:
name: Binary Ninja tests for ${{ matrix.python-version }} on ${{ matrix.os }}
runs-on: ubuntu-20.04
needs: [code_style, rule_linter]
strategy:
fail-fast: false
matrix:

View File

@@ -4,13 +4,14 @@
### New Features
- add protobuf format for result documents #1219 @williballenthin @mr-tz
- extractor: add Binary Ninja feature extractor @xusheng6
- new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42
- Change colour/highlight to "cyan" instead of "blue" for easier noticing. #1384 @ggold7046
- add new format to parse output json back to capa #1396 @ooprathamm
### Breaking Changes
### New Rules (22)
### New Rules (26)
- persistence/scheduled-tasks/schedule-task-via-at joren485
- data-manipulation/prng/generate-random-numbers-via-rtlgenrandom william.ballenthin@mandiant.com
@@ -34,9 +35,14 @@
- nursery/set-web-proxy-in-dotnet michael.hunhoff@mandiant.com
- nursery/check-for-windows-sandbox-via-subdirectory echernofsky@google.com
- nursery/enumerate-pe-sections-in-dotnet @mr-tz
- nursery/destroy-software-breakpoint-capability echernofsky@google.com
- nursery/send-data-to-internet michael.hunhoff@mandiant.com
- nursery/compiled-with-cx_freeze @mr-tz
- nursery/contain-a-thread-local-storage-tls-section-in-dotnet michael.hunhoff@mandiant.com
-
### Bug Fixes
- extractor: removed '.dynsym' as the library name for ELF imports #1318 @stevemk14ebr
- extractor: fix vivisect loop detection corner case #1310 @mr-tz
- match: extend OS characteristic to match OS_ANY to all supported OSes #1324 @mike-hunhoff
- extractor: fix IDA and vivisect string and bytes features overlap and tests #1327 #1336 @xusheng6

View File

@@ -2,7 +2,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/flare-capa)](https://pypi.org/project/flare-capa)
[![Last release](https://img.shields.io/github/v/release/mandiant/capa)](https://github.com/mandiant/capa/releases)
[![Number of rules](https://img.shields.io/badge/rules-789-blue.svg)](https://github.com/mandiant/capa-rules)
[![Number of rules](https://img.shields.io/badge/rules-794-blue.svg)](https://github.com/mandiant/capa-rules)
[![CI status](https://github.com/mandiant/capa/workflows/CI/badge.svg)](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
[![Downloads](https://img.shields.io/github/downloads/mandiant/capa/total)](https://github.com/mandiant/capa/releases)
[![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt)

View File

@@ -450,6 +450,7 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
FORMAT_UNKNOWN = "unknown"

View File

@@ -103,7 +103,7 @@ def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address
ordinal = sym.ordinal
if ordinal != 0 and (lib_name != ""):
ordinal_name = "#%d" % (ordinal)
ordinal_name = f"#{ordinal}"
for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name):
yield Import(name), addr
@@ -147,7 +147,7 @@ def extract_file_format(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
# no file type to return when processing a binary file, but we want to continue processing
return
else:
raise NotImplementedError("unexpected file format: %d" % view_type)
raise NotImplementedError(f"unexpected file format: {view_type}")
def extract_features(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -26,7 +26,7 @@ if spec is not None:
def find_binja_path() -> str:
raw_output = subprocess.check_output(["python", "-c", "%s" % code]).decode("ascii").strip()
raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip()
return bytes.fromhex(raw_output).decode("utf8")

View File

@@ -12,11 +12,14 @@ import capa.features.extractors.pefile
import capa.features.extractors.strings
from capa.features.common import (
OS,
OS_ANY,
OS_AUTO,
ARCH_ANY,
FORMAT_PE,
FORMAT_ELF,
OS_WINDOWS,
FORMAT_FREEZE,
FORMAT_RESULT,
Arch,
Format,
String,
@@ -27,6 +30,11 @@ from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress
logger = logging.getLogger(__name__)
# match strings for formats
MATCH_PE = b"MZ"
MATCH_ELF = b"\x7fELF"
MATCH_RESULT = b'{"meta":'
def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
"""
@@ -40,12 +48,14 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(b"MZ"):
if buf.startswith(MATCH_PE):
yield Format(FORMAT_PE), NO_ADDRESS
elif buf.startswith(b"\x7fELF"):
elif buf.startswith(MATCH_ELF):
yield Format(FORMAT_ELF), NO_ADDRESS
elif is_freeze(buf):
yield Format(FORMAT_FREEZE), NO_ADDRESS
elif buf.startswith(MATCH_RESULT):
yield Format(FORMAT_RESULT), NO_ADDRESS
else:
# we likely end up here:
# 1. handling a file format (e.g. macho)
@@ -56,10 +66,13 @@ def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]:
if buf.startswith(b"MZ"):
if buf.startswith(MATCH_PE):
yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf))
elif buf.startswith(b"\x7fELF"):
elif buf.startswith(MATCH_RESULT):
yield Arch(ARCH_ANY), NO_ADDRESS
elif buf.startswith(MATCH_ELF):
with contextlib.closing(io.BytesIO(buf)) as f:
arch = capa.features.extractors.elf.detect_elf_arch(f)
@@ -88,9 +101,11 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]:
if os != OS_AUTO:
yield OS(os), NO_ADDRESS
if buf.startswith(b"MZ"):
if buf.startswith(MATCH_PE):
yield OS(OS_WINDOWS), NO_ADDRESS
elif buf.startswith(b"\x7fELF"):
elif buf.startswith(MATCH_RESULT):
yield OS(OS_ANY), NO_ADDRESS
elif buf.startswith(MATCH_ELF):
with contextlib.closing(io.BytesIO(buf)) as f:
os = capa.features.extractors.elf.detect_elf_os(f)

View File

@@ -88,6 +88,7 @@ class Shdr:
offset: int
size: int
link: int
entsize: int
buf: bytes
@@ -320,12 +321,12 @@ class ELF:
shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize]
if self.bitness == 32:
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(
self.endian + "IIIIIII", shent, 0x0
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from(
self.endian + "IIIIIIIIII", shent, 0x0
)
elif self.bitness == 64:
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link = struct.unpack_from(
self.endian + "IIQQQQI", shent, 0x0
sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from(
self.endian + "IIQQQQIIQQ", shent, 0x0
)
else:
raise NotImplementedError()
@@ -337,7 +338,7 @@ class ELF:
if len(buf) != sh_size:
raise ValueError("failed to read section header content")
return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, buf)
return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, sh_entsize, buf)
@property
def section_headers(self):
@@ -502,6 +503,23 @@ class ELF:
yield read_cstr(strtab, d_val)
@property
def symtab(self) -> Optional[Tuple[Shdr, Shdr]]:
"""
fetch the Shdr for the symtab and the associated strtab.
"""
SHT_SYMTAB = 0x2
for shdr in self.section_headers:
if shdr.type != SHT_SYMTAB:
continue
# the linked section contains strings referenced by the symtab structures.
strtab_shdr = self.parse_section_header(shdr.link)
return shdr, strtab_shdr
return None
@dataclass
class ABITag:
@@ -603,11 +621,76 @@ class SHNote:
return ABITag(os, kmajor, kminor, kpatch)
def guess_os_from_osabi(elf) -> Optional[OS]:
@dataclass
class Symbol:
name_offset: int
value: int
size: int
info: int
other: int
shndx: int
class SymTab:
def __init__(
self,
endian: str,
bitness: int,
symtab: Shdr,
strtab: Shdr,
) -> None:
self.symbols: List[Symbol] = []
self.symtab = symtab
self.strtab = strtab
self._parse(endian, bitness, symtab.buf)
def _parse(self, endian: str, bitness: int, symtab_buf: bytes) -> None:
"""
return the symbol's information in
the order specified by sys/elf32.h
"""
for i in range(int(len(self.symtab.buf) / self.symtab.entsize)):
if bitness == 32:
name_offset, value, size, info, other, shndx = struct.unpack_from(
endian + "IIIBBH", symtab_buf, i * self.symtab.entsize
)
elif bitness == 64:
name_offset, info, other, shndx, value, size = struct.unpack_from(
endian + "IBBBQQ", symtab_buf, i * self.symtab.entsize
)
self.symbols.append(Symbol(name_offset, value, size, info, other, shndx))
def get_name(self, symbol: Symbol) -> str:
"""
fetch a symbol's name from symtab's
associated strings' section (SHT_STRTAB)
"""
if not self.strtab:
raise ValueError("no strings found")
for i in range(symbol.name_offset, self.strtab.size):
if self.strtab.buf[i] == 0:
return self.strtab.buf[symbol.name_offset : i].decode("utf-8")
raise ValueError("symbol name not found")
def get_symbols(self) -> Iterator[Symbol]:
"""
return a tuple: (name, value, size, info, other, shndx)
for each symbol contained in the symbol table
"""
for symbol in self.symbols:
yield symbol
def guess_os_from_osabi(elf: ELF) -> Optional[OS]:
return elf.ei_osabi
def guess_os_from_ph_notes(elf) -> Optional[OS]:
def guess_os_from_ph_notes(elf: ELF) -> Optional[OS]:
# search for PT_NOTE sections that specify an OS
# for example, on Linux there is a GNU section with minimum kernel version
PT_NOTE = 0x4
@@ -646,7 +729,7 @@ def guess_os_from_ph_notes(elf) -> Optional[OS]:
return None
def guess_os_from_sh_notes(elf) -> Optional[OS]:
def guess_os_from_sh_notes(elf: ELF) -> Optional[OS]:
# search for notes stored in sections that aren't visible in program headers.
# e.g. .note.Linux in Linux kernel modules.
SHT_NOTE = 0x7
@@ -679,7 +762,7 @@ def guess_os_from_sh_notes(elf) -> Optional[OS]:
return None
def guess_os_from_linker(elf) -> Optional[OS]:
def guess_os_from_linker(elf: ELF) -> Optional[OS]:
# search for recognizable dynamic linkers (interpreters)
# for example, on linux, we see file paths like: /lib64/ld-linux-x86-64.so.2
linker = elf.linker
@@ -689,7 +772,7 @@ def guess_os_from_linker(elf) -> Optional[OS]:
return None
def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
def guess_os_from_abi_versions_needed(elf: ELF) -> Optional[OS]:
# then lets look for GLIBC symbol versioning requirements.
# this will let us guess about linux/hurd in some cases.
@@ -720,7 +803,7 @@ def guess_os_from_abi_versions_needed(elf) -> Optional[OS]:
return None
def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
def guess_os_from_needed_dependencies(elf: ELF) -> Optional[OS]:
for needed in elf.needed:
if needed.startswith("libmachuser.so"):
return OS.HURD
@@ -730,6 +813,33 @@ def guess_os_from_needed_dependencies(elf) -> Optional[OS]:
return None
def guess_os_from_symtab(elf: ELF) -> Optional[OS]:
shdrs = elf.symtab
if not shdrs:
# executable does not contain a symbol table
# or the symbol's names are stripped
return None
symtab_shdr, strtab_shdr = shdrs
symtab = SymTab(elf.endian, elf.bitness, symtab_shdr, strtab_shdr)
keywords = {
OS.LINUX: [
"linux",
"/linux/",
],
}
for symbol in symtab.get_symbols():
sym_name = symtab.get_name(symbol)
for os, hints in keywords.items():
if any(map(lambda x: x in sym_name, hints)):
return os
return None
def detect_elf_os(f) -> str:
"""
f: type Union[BinaryIO, IDAIO]
@@ -754,6 +864,9 @@ def detect_elf_os(f) -> str:
needed_dependencies_guess = guess_os_from_needed_dependencies(elf)
logger.debug("guess: needed dependencies: %s", needed_dependencies_guess)
symtab_guess = guess_os_from_symtab(elf)
logger.debug("guess: pertinent symbol name: %s", symtab_guess)
ret = None
if osabi_guess:
@@ -774,6 +887,9 @@ def detect_elf_os(f) -> str:
elif needed_dependencies_guess:
ret = needed_dependencies_guess
elif symtab_guess:
ret = symtab_guess
return ret.value if ret is not None else "unknown"

View File

@@ -90,8 +90,11 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]:
if not library:
continue
# IDA uses section names for the library of ELF imports, like ".dynsym"
library = library.lstrip(".")
# IDA uses section names for the library of ELF imports, like ".dynsym".
# These are not useful to us, we may need to expand this list over time
# TODO: exhaust this list, see #1419
if library == ".dynsym":
library = ""
def inspect_import(ea, function, ordinal):
if function and function.startswith("__imp_"):

View File

@@ -70,7 +70,7 @@ class Number(Feature):
elif isinstance(self.value, float):
return str(self.value)
else:
raise ValueError("invalid value type %s" % (type(self.value)))
raise ValueError(f"invalid value type {type(self.value)}")
# max recognized structure size (and therefore, offset size)

View File

@@ -69,6 +69,7 @@ from capa.features.common import (
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_RESULT,
)
from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
@@ -498,7 +499,7 @@ def get_workspace(path, format_, sigpaths):
def get_extractor(
path: str,
format_: str,
os: str,
os_: str,
backend: str,
sigpaths: List[str],
should_save_workspace=False,
@@ -517,7 +518,7 @@ def get_extractor(
if not is_supported_arch(path):
raise UnsupportedArchError()
if os == OS_AUTO and not is_supported_os(path):
if os_ == OS_AUTO and not is_supported_os(path):
raise UnsupportedOSError()
if format_ == FORMAT_DOTNET:
@@ -548,7 +549,7 @@ def get_extractor(
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = BinaryViewType.get_view_of_file(path)
if bv is None:
raise RuntimeError("Binary Ninja cannot open file %s" % (path))
raise RuntimeError(f"Binary Ninja cannot open file {path}")
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
@@ -569,7 +570,7 @@ def get_extractor(
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os)
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)
def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
@@ -913,12 +914,12 @@ def install_common_args(parser, wanted=None):
(OS_MACOS,),
(OS_WINDOWS,),
]
os_help = ", ".join(["%s (%s)" % (o[0], o[1]) if len(o) == 2 else o[0] for o in oses])
os_help = ", ".join([f"{o[0]} ({o[1]})" if len(o) == 2 else o[0] for o in oses])
parser.add_argument(
"--os",
choices=[o[0] for o in oses],
default=OS_AUTO,
help="select sample OS: %s" % os_help,
help=f"select sample OS: {os_help}",
)
if "rules" in wanted:
@@ -1182,10 +1183,29 @@ def main(argv=None):
logger.debug("file limitation short circuit, won't analyze fully.")
return E_FILE_LIMITATION
# TODO: #1411 use a real type, not a dict here.
meta: Dict[str, Any]
capabilities: MatchResults
counts: Dict[str, Any]
if format_ == FORMAT_RESULT:
# result document directly parses into meta, capabilities
result_doc = capa.render.result_document.ResultDocument.parse_file(args.sample)
meta, capabilities = result_doc.to_capa()
else:
# all other formats we must create an extractor
# and use that to extract meta and capabilities
if format_ == FORMAT_FREEZE:
# freeze format deserializes directly into an extractor
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
# all other formats we must create an extractor,
# such as viv, binary ninja, etc. workspaces
# and use those for extracting.
try:
if format_ == FORMAT_PE:
sig_paths = get_signatures(args.signatures)

View File

@@ -502,27 +502,36 @@ def metadata_from_pb2(meta: capa_pb2.Metadata) -> rd.Metadata:
rules=tuple(meta.analysis.rules),
base_address=addr_from_pb2(meta.analysis.base_address),
layout=rd.Layout(
functions=[
functions=tuple(
[
rd.FunctionLayout(
address=addr_from_pb2(f.address),
matched_basic_blocks=[
rd.BasicBlockLayout(address=addr_from_pb2(bb.address)) for bb in f.matched_basic_blocks
],
matched_basic_blocks=tuple(
[
rd.BasicBlockLayout(address=addr_from_pb2(bb.address))
for bb in f.matched_basic_blocks
]
),
)
for f in meta.analysis.layout.functions
]
)
),
feature_counts=rd.FeatureCounts(
file=meta.analysis.feature_counts.file,
functions=[
functions=tuple(
[
rd.FunctionFeatureCount(address=addr_from_pb2(f.address), count=f.count)
for f in meta.analysis.feature_counts.functions
],
]
),
library_functions=[
),
library_functions=tuple(
[
rd.LibraryFunction(address=addr_from_pb2(lf.address), name=lf.name)
for lf in meta.analysis.library_functions
],
]
),
),
)
@@ -585,13 +594,14 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature:
return frzf.ExportFeature(export=ff.export, description=ff.description or None)
elif type_ == "import_":
ff = f.import_
return frzf.ImportFeature(import_=ff.import_, description=ff.description or None)
return frzf.ImportFeature(import_=ff.import_, description=ff.description or None) # type: ignore
# Mypy is unable to recognize `import_` as an argument
elif type_ == "section":
ff = f.section
return frzf.SectionFeature(section=ff.section, description=ff.description or None)
elif type_ == "function_name":
ff = f.function_name
return frzf.FunctionNameFeature(function_name=ff.function_name, description=ff.description or None)
return frzf.FunctionNameFeature(function_name=ff.function_name, description=ff.description or None) # type: ignore
elif type_ == "substring":
ff = f.substring
return frzf.SubstringFeature(substring=ff.substring, description=ff.description or None)
@@ -603,7 +613,8 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature:
return frzf.StringFeature(string=ff.string, description=ff.description or None)
elif type_ == "class_":
ff = f.class_
return frzf.ClassFeature(class_=ff.class_, description=ff.description or None)
return frzf.ClassFeature(class_=ff.class_, description=ff.description or None) # type: ignore
# Mypy is unable to recognize `class_` as an argument due to aliasing
elif type_ == "namespace":
ff = f.namespace
return frzf.NamespaceFeature(namespace=ff.namespace, description=ff.description or None)
@@ -629,12 +640,13 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature:
ff = f.operand_number
return frzf.OperandNumberFeature(
index=ff.index, operand_number=number_from_pb2(ff.operand_number), description=ff.description or None
)
) # type: ignore
elif type_ == "operand_offset":
ff = f.operand_offset
return frzf.OperandOffsetFeature(
index=ff.index, operand_offset=int_from_pb2(ff.operand_offset), description=ff.description or None
)
) # type: ignore
# Mypy is unable to recognize `operand_offset` as an argument due to aliasing
elif type_ == "basic_block":
ff = f.basic_block
return frzf.BasicBlockFeature(description=ff.description or None)
@@ -651,16 +663,16 @@ def match_from_pb2(match: capa_pb2.Match) -> rd.Match:
return rd.Match(
success=match.success,
node=rd.StatementNode(statement=statement_from_pb2(match.statement)),
children=children,
locations=locations,
children=tuple(children),
locations=tuple(locations),
captures={},
)
elif node_type == "feature":
return rd.Match(
success=match.success,
node=rd.FeatureNode(feature=feature_from_pb2(match.feature)),
children=children,
locations=locations,
children=tuple(children),
locations=tuple(locations),
captures={capture: tuple(map(addr_from_pb2, locs.address)) for capture, locs in match.captures.items()},
)
else:
@@ -694,7 +706,8 @@ def maec_from_pb2(pb: capa_pb2.MaecMetadata) -> rd.MaecMetadata:
malware_family=pb.malware_family or None,
malware_category=pb.malware_category or None,
malware_category_ov=pb.malware_category_ov or None,
)
) # type: ignore
# Mypy is unable to recognise arguments due to alias
def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
@@ -711,7 +724,8 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
lib=pb.lib,
is_subscope_rule=pb.is_subscope_rule,
maec=maec_from_pb2(pb.maec),
)
) # type: ignore
# Mypy is unable to recognise `attack` and `is_subscope_rule` as arguments due to alias
def doc_from_pb2(doc: capa_pb2.ResultDocument) -> rd.ResultDocument:

View File

@@ -6,7 +6,8 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import datetime
from typing import Any, Dict, Tuple, Union, Optional
import collections
from typing import Any, Dict, List, Tuple, Union, Optional
from pydantic import Field, BaseModel
@@ -125,6 +126,41 @@ class Metadata(FrozenModel):
),
)
def to_capa(self) -> Dict[str, Any]:
capa_meta = {
"timestamp": self.timestamp.isoformat(),
"version": self.version,
"sample": {
"md5": self.sample.md5,
"sha1": self.sample.sha1,
"sha256": self.sample.sha256,
"path": self.sample.path,
},
"analysis": {
"format": self.analysis.format,
"arch": self.analysis.arch,
"os": self.analysis.os,
"extractor": self.analysis.extractor,
"rules": self.analysis.rules,
"base_address": self.analysis.base_address.to_capa(),
"layout": {
"functions": {
f.address.to_capa(): {
"matched_basic_blocks": [bb.address.to_capa() for bb in f.matched_basic_blocks]
}
for f in self.analysis.layout.functions
}
},
"feature_counts": {
"file": self.analysis.feature_counts.file,
"functions": {fc.address.to_capa(): fc.count for fc in self.analysis.feature_counts.functions},
},
"library_functions": {lf.address.to_capa(): lf.name for lf in self.analysis.library_functions},
},
}
return capa_meta
class CompoundStatementType:
AND = "and"
@@ -227,6 +263,54 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N
assert_never(node)
def node_to_capa(
node: Node, children: List[Union[capa.engine.Statement, capa.engine.Feature]]
) -> Union[capa.engine.Statement, capa.engine.Feature]:
if isinstance(node, StatementNode):
if isinstance(node.statement, CompoundStatement):
if node.statement.type == CompoundStatementType.AND:
return capa.engine.And(description=node.statement.description, children=children)
elif node.statement.type == CompoundStatementType.OR:
return capa.engine.Or(description=node.statement.description, children=children)
elif node.statement.type == CompoundStatementType.NOT:
return capa.engine.Not(description=node.statement.description, child=children[0])
elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children)
else:
assert_never(node.statement.type)
elif isinstance(node.statement, SomeStatement):
return capa.engine.Some(
description=node.statement.description, count=node.statement.count, children=children
)
elif isinstance(node.statement, RangeStatement):
return capa.engine.Range(
description=node.statement.description,
min=node.statement.min,
max=node.statement.max,
child=node.statement.child.to_capa(),
)
elif isinstance(node.statement, SubscopeStatement):
return capa.engine.Subscope(
description=node.statement.description, scope=node.statement.scope, child=children[0]
)
else:
assert_never(node.statement)
elif isinstance(node, FeatureNode):
return node.feature.to_capa()
else:
assert_never(node)
class Match(FrozenModel):
"""
args:
@@ -359,6 +443,39 @@ class Match(FrozenModel):
captures={capture: tuple(captures[capture]) for capture in captures},
)
def to_capa(self, rules_by_name: Dict[str, capa.rules.Rule]) -> capa.engine.Result:
children = [child.to_capa(rules_by_name) for child in self.children]
statement = node_to_capa(self.node, [child.statement for child in children])
if isinstance(self.node, FeatureNode):
feature = self.node.feature
if isinstance(feature, (frzf.SubstringFeature, frzf.RegexFeature)):
matches = {capture: {loc.to_capa() for loc in locs} for capture, locs in self.captures.items()}
if isinstance(feature, frzf.SubstringFeature):
assert isinstance(statement, capa.features.common.Substring)
statement = capa.features.common._MatchedSubstring(statement, matches)
elif isinstance(feature, frzf.RegexFeature):
assert isinstance(statement, capa.features.common.Regex)
statement = capa.features.common._MatchedRegex(statement, matches)
else:
assert_never(feature)
# apparently we don't have to fixup match and subscope entries here.
# at least, default, verbose, and vverbose renderers seem to work well without any special handling here.
#
# children contains a single tree of results, corresponding to the logic of the matched rule.
# self.node.feature.match contains the name of the rule that was matched.
# so its all available to reconstruct, if necessary.
return capa.features.common.Result(
success=self.success,
statement=statement,
locations={loc.to_capa() for loc in self.locations},
children=children,
)
def parse_parts_id(s: str):
id_ = ""
@@ -543,3 +660,22 @@ class ResultDocument(FrozenModel):
)
return ResultDocument(meta=Metadata.from_capa(meta), rules=rule_matches)
def to_capa(self) -> Tuple[Dict, Dict]:
meta = self.meta.to_capa()
capabilities: Dict[
str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
] = collections.defaultdict(list)
# this doesn't quite work because we don't have the rule source for rules that aren't matched.
rules_by_name = {
rule_name: capa.rules.Rule.from_yaml(rule_match.source) for rule_name, rule_match in self.rules.items()
}
for rule_name, rule_match in self.rules.items():
for addr, match in rule_match.matches:
result: capa.engine.Result = match.to_capa(rules_by_name)
capabilities[rule_name].append((addr.to_capa(), result))
return meta, capabilities

View File

@@ -16,7 +16,7 @@ import capa.render.result_document as rd
def bold(s: str) -> str:
"""draw attention to the given string"""
return termcolor.colored(s, "blue")
return termcolor.colored(s, "cyan")
def bold2(s: str) -> str:

2
rules

Submodule rules updated: 4ca8023363...a10ccf3fd8

View File

@@ -27,8 +27,8 @@ requirements = [
"pyelftools==0.29",
"dnfile==0.13.0",
"dncil==1.0.2",
"pydantic==1.10.6",
"protobuf==4.21.12",
"pydantic==1.10.7",
"protobuf==4.22.1",
]
# this sets __version__
@@ -71,7 +71,7 @@ setuptools.setup(
"dev": [
"pytest==7.1.3",
"pytest-sugar==0.9.4",
"pytest-instafail==0.4.2",
"pytest-instafail==0.5.0",
"pytest-cov==4.0.0",
"pycodestyle==2.10.0",
"ruff==0.0.259",
@@ -90,7 +90,7 @@ setuptools.setup(
"types-termcolor==1.1.4",
"types-psutil==5.8.23",
"types_requests==2.28.1",
"types-protobuf==4.21.0.5",
"types-protobuf==4.22.0.1",
],
"build": [
"pyinstaller==5.9.0",

View File

@@ -241,6 +241,8 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "kernel32-64.dll_")
elif name == "pma01-01":
return os.path.join(CD, "data", "Practical Malware Analysis Lab 01-01.dll_")
elif name == "pma01-01-rd":
return os.path.join(CD, "data", "rd", "Practical Malware Analysis Lab 01-01.dll_.json")
elif name == "pma12-04":
return os.path.join(CD, "data", "Practical Malware Analysis Lab 12-04.exe_")
elif name == "pma16-01":
@@ -307,6 +309,8 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "bf7a9c8bdfa6d47e01ad2b056264acc3fd90cf43fe0ed8deec93ab46b47d76cb.elf_")
elif name.startswith("294b8d"):
return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_")
elif name.startswith("2bf18d"):
return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_")
else:
raise ValueError(f"unexpected sample fixture: {name}")
@@ -365,6 +369,8 @@ def get_sample_md5_by_name(name):
elif name.startswith("294b8d"):
# file name is SHA256 hash
return "3db3e55b16a7b1b1afb970d5e77c5d98"
elif name.startswith("2bf18d"):
return "2bf18d0403677378adad9001b1243211"
else:
raise ValueError(f"unexpected sample fixture: {name}")

View File

@@ -467,3 +467,12 @@ def test_main_dotnet4(_039a6_dotnetfile_extractor):
# tests successful execution and one rendering
path = _039a6_dotnetfile_extractor.path
assert capa.main.main([path, "-vv"]) == 0
def test_main_rd():
path = fixtures.get_data_path_by_name("pma01-01-rd")
assert capa.main.main([path, "-vv"]) == 0
assert capa.main.main([path, "-v"]) == 0
assert capa.main.main([path, "-j"]) == 0
assert capa.main.main([path, "-q"]) == 0
assert capa.main.main([path]) == 0

View File

@@ -20,6 +20,7 @@ def test_elf_sh_notes():
# guess: sh notes: OS.LINUX
# guess: linker: None
# guess: ABI versions needed: None
# guess: symtab: None
# guess: needed dependencies: None
path = get_data_path_by_name("2f7f5f")
with open(path, "rb") as f:
@@ -32,6 +33,7 @@ def test_elf_pt_notes():
# guess: sh notes: OS.LINUX
# guess: linker: OS.LINUX
# guess: ABI versions needed: OS.LINUX
# guess: symtab: None
# guess: needed dependencies: None
path = get_data_path_by_name("7351f.elf")
with open(path, "rb") as f:
@@ -44,6 +46,7 @@ def test_elf_so_needed():
# guess: sh notes: OS.HURD
# guess: linker: None
# guess: ABI versions needed: OS.HURD
# guess: symtab: None
# guess: needed dependencies: OS.HURD
path = get_data_path_by_name("b5f052")
with open(path, "rb") as f:
@@ -56,7 +59,21 @@ def test_elf_abi_version_hurd():
# guess: sh notes: OS.HURD
# guess: linker: None
# guess: ABI versions needed: OS.HURD
# guess: symtab: None
# guess: needed dependencies: None
path = get_data_path_by_name("bf7a9c")
with open(path, "rb") as f:
assert capa.features.extractors.elf.detect_elf_os(f) == "hurd"
def test_elf_symbol_table():
# guess: osabi: None
# guess: ph notes: None
# guess: sh notes: None
# guess: linker: None
# guess: ABI versions needed: None
# guess: symtab: OS.LINUX
# guess: needed dependencies: None
path = get_data_path_by_name("2bf18d")
with open(path, "rb") as f:
assert capa.features.extractors.elf.detect_elf_os(f) == "linux"

View File

@@ -8,6 +8,7 @@
import copy
import pytest
import fixtures
from fixtures import *
import capa
@@ -268,3 +269,18 @@ def assert_round_trip(rd: rdoc.ResultDocument):
def test_round_trip(request, rd_file):
rd: rdoc.ResultDocument = request.getfixturevalue(rd_file)
assert_round_trip(rd)
def test_json_to_rdoc():
path = fixtures.get_data_path_by_name("pma01-01-rd")
assert isinstance(rdoc.ResultDocument.parse_file(path), rdoc.ResultDocument)
def test_rdoc_to_capa():
path = fixtures.get_data_path_by_name("pma01-01-rd")
rd = rdoc.ResultDocument.parse_file(path)
meta, capabilites = rd.to_capa()
assert isinstance(meta, dict)
assert isinstance(capabilites, dict)

View File

@@ -81,4 +81,4 @@ def test_proto_conversion(tmpdir):
p = run_program(get_script_path("proto-to-results.py"), [pb])
assert p.returncode == 0
assert p.stdout.startswith(b'{\n "meta": ')
assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')