mirror of
https://github.com/mandiant/capa.git
synced 2025-12-22 07:10:29 -08:00
3
.github/mypy/mypy.ini
vendored
3
.github/mypy/mypy.ini
vendored
@@ -21,9 +21,6 @@ ignore_missing_imports = True
|
||||
[mypy-flirt.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-smda.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-lief.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
|
||||
@@ -1,133 +0,0 @@
|
||||
import string
|
||||
import struct
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import Address
|
||||
from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle
|
||||
|
||||
|
||||
def _bb_has_tight_loop(f, bb):
|
||||
"""
|
||||
parse tight loops, true if last instruction in basic block branches to bb start
|
||||
"""
|
||||
return bb.offset in f.blockrefs[bb.offset] if bb.offset in f.blockrefs else False
|
||||
|
||||
|
||||
def extract_bb_tight_loop(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""check basic block for tight loop indicators"""
|
||||
if _bb_has_tight_loop(f.inner, bb.inner):
|
||||
yield Characteristic("tight loop"), bb.address
|
||||
|
||||
|
||||
def _bb_has_stackstring(f, bb):
|
||||
"""
|
||||
extract potential stackstring creation, using the following heuristics:
|
||||
- basic block contains enough moves of constant bytes to the stack
|
||||
"""
|
||||
count = 0
|
||||
for instr in bb.getInstructions():
|
||||
if is_mov_imm_to_stack(instr):
|
||||
count += get_printable_len(instr.getDetailed())
|
||||
if count > MIN_STACKSTRING_LEN:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_operands(smda_ins):
|
||||
return [o.strip() for o in smda_ins.operands.split(",")]
|
||||
|
||||
|
||||
def extract_stackstring(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""check basic block for stackstring indicators"""
|
||||
if _bb_has_stackstring(f.inner, bb.inner):
|
||||
yield Characteristic("stack string"), bb.address
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(smda_ins):
|
||||
"""
|
||||
Return if instruction moves immediate onto stack
|
||||
"""
|
||||
if not smda_ins.mnemonic.startswith("mov"):
|
||||
return False
|
||||
|
||||
try:
|
||||
dst, src = get_operands(smda_ins)
|
||||
except ValueError:
|
||||
# not two operands
|
||||
return False
|
||||
|
||||
try:
|
||||
int(src, 16)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
if not any(regname in dst for regname in ["ebp", "rbp", "esp", "rsp"]):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
|
||||
return all(c < 127 and chr(c) in string.printable for c in chars)
|
||||
|
||||
|
||||
def is_printable_utf16le(chars):
|
||||
if all(c == 0x00 for c in chars[1::2]):
|
||||
return is_printable_ascii(chars[::2])
|
||||
|
||||
|
||||
def get_printable_len(instr):
|
||||
"""
|
||||
Return string length if all operand bytes are ascii or utf16-le printable
|
||||
|
||||
Works on a capstone instruction
|
||||
"""
|
||||
# should have exactly two operands for mov immediate
|
||||
if len(instr.operands) != 2:
|
||||
return 0
|
||||
|
||||
op_value = instr.operands[1].value.imm
|
||||
|
||||
if instr.imm_size == 1:
|
||||
chars = struct.pack("<B", op_value & 0xFF)
|
||||
elif instr.imm_size == 2:
|
||||
chars = struct.pack("<H", op_value & 0xFFFF)
|
||||
elif instr.imm_size == 4:
|
||||
chars = struct.pack("<I", op_value & 0xFFFFFFFF)
|
||||
elif instr.imm_size == 8:
|
||||
chars = struct.pack("<Q", op_value & 0xFFFFFFFFFFFFFFFF)
|
||||
else:
|
||||
raise ValueError("Unhandled operand data type 0x%x." % instr.imm_size)
|
||||
|
||||
if is_printable_ascii(chars):
|
||||
return instr.imm_size
|
||||
if is_printable_utf16le(chars):
|
||||
return instr.imm_size // 2
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def extract_features(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
extract features from the given basic block.
|
||||
|
||||
args:
|
||||
f: the function from which to extract features
|
||||
bb: the basic block to process.
|
||||
|
||||
yields:
|
||||
Tuple[Feature, Address]: the features and their location found in this basic block.
|
||||
"""
|
||||
yield BasicBlock(), bb.address
|
||||
for bb_handler in BASIC_BLOCK_HANDLERS:
|
||||
for feature, addr in bb_handler(f, bb):
|
||||
yield feature, addr
|
||||
|
||||
|
||||
BASIC_BLOCK_HANDLERS = (
|
||||
extract_bb_tight_loop,
|
||||
extract_stackstring,
|
||||
)
|
||||
@@ -1,57 +0,0 @@
|
||||
from typing import List, Tuple
|
||||
|
||||
from smda.common.SmdaReport import SmdaReport
|
||||
|
||||
import capa.features.extractors.common
|
||||
import capa.features.extractors.smda.file
|
||||
import capa.features.extractors.smda.insn
|
||||
import capa.features.extractors.smda.global_
|
||||
import capa.features.extractors.smda.function
|
||||
import capa.features.extractors.smda.basicblock
|
||||
from capa.features.common import Feature
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
|
||||
|
||||
|
||||
class SmdaFeatureExtractor(FeatureExtractor):
|
||||
def __init__(self, smda_report: SmdaReport, path):
|
||||
super(SmdaFeatureExtractor, self).__init__()
|
||||
self.smda_report = smda_report
|
||||
self.path = path
|
||||
with open(self.path, "rb") as f:
|
||||
self.buf = f.read()
|
||||
|
||||
# pre-compute these because we'll yield them at *every* scope.
|
||||
self.global_features: List[Tuple[Feature, Address]] = []
|
||||
self.global_features.extend(capa.features.extractors.common.extract_os(self.buf))
|
||||
self.global_features.extend(capa.features.extractors.smda.global_.extract_arch(self.smda_report))
|
||||
|
||||
def get_base_address(self):
|
||||
return AbsoluteVirtualAddress(self.smda_report.base_addr)
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
def extract_file_features(self):
|
||||
yield from capa.features.extractors.smda.file.extract_features(self.smda_report, self.buf)
|
||||
|
||||
def get_functions(self):
|
||||
for function in self.smda_report.getFunctions():
|
||||
yield FunctionHandle(address=AbsoluteVirtualAddress(function.offset), inner=function)
|
||||
|
||||
def extract_function_features(self, fh):
|
||||
yield from capa.features.extractors.smda.function.extract_features(fh)
|
||||
|
||||
def get_basic_blocks(self, fh):
|
||||
for bb in fh.inner.getBlocks():
|
||||
yield BBHandle(address=AbsoluteVirtualAddress(bb.offset), inner=bb)
|
||||
|
||||
def extract_basic_block_features(self, fh, bbh):
|
||||
yield from capa.features.extractors.smda.basicblock.extract_features(fh, bbh)
|
||||
|
||||
def get_instructions(self, fh, bbh):
|
||||
for smda_ins in bbh.inner.getInstructions():
|
||||
yield InsnHandle(address=AbsoluteVirtualAddress(smda_ins.offset), inner=smda_ins)
|
||||
|
||||
def extract_insn_features(self, fh, bbh, ih):
|
||||
yield from capa.features.extractors.smda.insn.extract_features(fh, bbh, ih)
|
||||
@@ -1,103 +0,0 @@
|
||||
# if we have SMDA we definitely have lief
|
||||
import lief
|
||||
|
||||
import capa.features.extractors.common
|
||||
import capa.features.extractors.helpers
|
||||
import capa.features.extractors.strings
|
||||
from capa.features.file import Export, Import, Section
|
||||
from capa.features.common import String, Characteristic
|
||||
from capa.features.address import FileOffsetAddress, AbsoluteVirtualAddress
|
||||
|
||||
|
||||
def extract_file_embedded_pe(buf, **kwargs):
|
||||
for offset, _ in capa.features.extractors.helpers.carve_pe(buf, 1):
|
||||
yield Characteristic("embedded pe"), FileOffsetAddress(offset)
|
||||
|
||||
|
||||
def extract_file_export_names(buf, **kwargs):
|
||||
lief_binary = lief.parse(buf)
|
||||
|
||||
if lief_binary is not None:
|
||||
for function in lief_binary.exported_functions:
|
||||
yield Export(function.name), AbsoluteVirtualAddress(function.address)
|
||||
|
||||
|
||||
def extract_file_import_names(smda_report, buf):
|
||||
# extract import table info via LIEF
|
||||
lief_binary = lief.parse(buf)
|
||||
if not isinstance(lief_binary, lief.PE.Binary):
|
||||
return
|
||||
for imported_library in lief_binary.imports:
|
||||
library_name = imported_library.name.lower()
|
||||
library_name = library_name[:-4] if library_name.endswith(".dll") else library_name
|
||||
for func in imported_library.entries:
|
||||
va = func.iat_address + smda_report.base_addr
|
||||
if func.name:
|
||||
for name in capa.features.extractors.helpers.generate_symbols(library_name, func.name):
|
||||
yield Import(name), AbsoluteVirtualAddress(va)
|
||||
elif func.is_ordinal:
|
||||
for name in capa.features.extractors.helpers.generate_symbols(library_name, "#%s" % func.ordinal):
|
||||
yield Import(name), AbsoluteVirtualAddress(va)
|
||||
|
||||
|
||||
def extract_file_section_names(buf, **kwargs):
|
||||
lief_binary = lief.parse(buf)
|
||||
if not isinstance(lief_binary, lief.PE.Binary):
|
||||
return
|
||||
if lief_binary and lief_binary.sections:
|
||||
base_address = lief_binary.optional_header.imagebase
|
||||
for section in lief_binary.sections:
|
||||
yield Section(section.name), AbsoluteVirtualAddress(base_address + section.virtual_address)
|
||||
|
||||
|
||||
def extract_file_strings(buf, **kwargs):
|
||||
"""
|
||||
extract ASCII and UTF-16 LE strings from file
|
||||
"""
|
||||
for s in capa.features.extractors.strings.extract_ascii_strings(buf):
|
||||
yield String(s.s), FileOffsetAddress(s.offset)
|
||||
|
||||
for s in capa.features.extractors.strings.extract_unicode_strings(buf):
|
||||
yield String(s.s), FileOffsetAddress(s.offset)
|
||||
|
||||
|
||||
def extract_file_function_names(smda_report, **kwargs):
|
||||
"""
|
||||
extract the names of statically-linked library functions.
|
||||
"""
|
||||
if False:
|
||||
# using a `yield` here to force this to be a generator, not function.
|
||||
yield NotImplementedError("SMDA doesn't have library matching")
|
||||
return
|
||||
|
||||
|
||||
def extract_file_format(buf, **kwargs):
|
||||
yield from capa.features.extractors.common.extract_format(buf)
|
||||
|
||||
|
||||
def extract_features(smda_report, buf):
|
||||
"""
|
||||
extract file features from given workspace
|
||||
|
||||
args:
|
||||
smda_report (smda.common.SmdaReport): a SmdaReport
|
||||
buf: the raw bytes of the sample
|
||||
|
||||
yields:
|
||||
Tuple[Feature, VA]: a feature and its location.
|
||||
"""
|
||||
|
||||
for file_handler in FILE_HANDLERS:
|
||||
for feature, addr in file_handler(smda_report=smda_report, buf=buf):
|
||||
yield feature, addr
|
||||
|
||||
|
||||
FILE_HANDLERS = (
|
||||
extract_file_embedded_pe,
|
||||
extract_file_export_names,
|
||||
extract_file_import_names,
|
||||
extract_file_section_names,
|
||||
extract_file_strings,
|
||||
extract_file_function_names,
|
||||
extract_file_format,
|
||||
)
|
||||
@@ -1,42 +0,0 @@
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors import loops
|
||||
from capa.features.extractors.base_extractor import FunctionHandle
|
||||
|
||||
|
||||
def extract_function_calls_to(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
for inref in f.inner.inrefs:
|
||||
yield Characteristic("calls to"), AbsoluteVirtualAddress(inref)
|
||||
|
||||
|
||||
def extract_function_loop(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
parse if a function has a loop
|
||||
"""
|
||||
edges = []
|
||||
for bb_from, bb_tos in f.inner.blockrefs.items():
|
||||
for bb_to in bb_tos:
|
||||
edges.append((bb_from, bb_to))
|
||||
|
||||
if edges and loops.has_loop(edges):
|
||||
yield Characteristic("loop"), f.address
|
||||
|
||||
|
||||
def extract_features(f: FunctionHandle):
|
||||
"""
|
||||
extract features from the given function.
|
||||
|
||||
args:
|
||||
f: the function from which to extract features
|
||||
|
||||
yields:
|
||||
Tuple[Feature, Address]: the features and their location found in this function.
|
||||
"""
|
||||
for func_handler in FUNCTION_HANDLERS:
|
||||
for feature, addr in func_handler(f):
|
||||
yield feature, addr
|
||||
|
||||
|
||||
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
@@ -1,21 +0,0 @@
|
||||
import logging
|
||||
|
||||
from capa.features.common import ARCH_I386, ARCH_AMD64, Arch
|
||||
from capa.features.address import NO_ADDRESS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_arch(smda_report):
|
||||
if smda_report.architecture == "intel":
|
||||
if smda_report.bitness == 32:
|
||||
yield Arch(ARCH_I386), NO_ADDRESS
|
||||
elif smda_report.bitness == 64:
|
||||
yield Arch(ARCH_AMD64), NO_ADDRESS
|
||||
else:
|
||||
# we likely end up here:
|
||||
# 1. handling a new architecture (e.g. aarch64)
|
||||
#
|
||||
# for (1), this logic will need to be updated as the format is implemented.
|
||||
logger.debug("unsupported architecture: %s", smda_report.architecture)
|
||||
return
|
||||
@@ -1,455 +0,0 @@
|
||||
import re
|
||||
import string
|
||||
import struct
|
||||
from typing import Tuple, Iterator
|
||||
|
||||
import smda
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset
|
||||
from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
|
||||
|
||||
# security cookie checks may perform non-zeroing XORs, these are expected within a certain
|
||||
# byte range within the first and returning basic blocks, this helps to reduce FP features
|
||||
SECURITY_COOKIE_BYTES_DELTA = 0x40
|
||||
PATTERN_HEXNUM = re.compile(r"[+\-] (?P<num>0x[a-fA-F0-9]+)")
|
||||
PATTERN_SINGLENUM = re.compile(r"[+\-] (?P<num>[0-9])")
|
||||
|
||||
|
||||
def extract_insn_api_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse API features from the given instruction."""
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if ih.address in f.apirefs:
|
||||
api_entry = f.apirefs[ih.address]
|
||||
# reformat
|
||||
dll_name, api_name = api_entry.split("!")
|
||||
dll_name = dll_name.split(".")[0]
|
||||
dll_name = dll_name.lower()
|
||||
for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name):
|
||||
yield API(name), ih.address
|
||||
elif ih.address in f.outrefs:
|
||||
current_function = f
|
||||
current_instruction = insn
|
||||
for index in range(THUNK_CHAIN_DEPTH_DELTA):
|
||||
if current_function and len(current_function.outrefs[current_instruction.offset]) == 1:
|
||||
target = current_function.outrefs[current_instruction.offset][0]
|
||||
referenced_function = current_function.smda_report.getFunction(target)
|
||||
if referenced_function:
|
||||
# TODO SMDA: implement this function for both jmp and call, checking if function has 1 instruction which refs an API
|
||||
if referenced_function.isApiThunk():
|
||||
api_entry = (
|
||||
referenced_function.apirefs[target] if target in referenced_function.apirefs else None
|
||||
)
|
||||
if api_entry:
|
||||
# reformat
|
||||
dll_name, api_name = api_entry.split("!")
|
||||
dll_name = dll_name.split(".")[0]
|
||||
dll_name = dll_name.lower()
|
||||
for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name):
|
||||
yield API(name), ih.address
|
||||
elif referenced_function.num_instructions == 1 and referenced_function.num_outrefs == 1:
|
||||
current_function = referenced_function
|
||||
current_instruction = [i for i in referenced_function.getInstructions()][0]
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
def extract_insn_number_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse number features from the given instruction."""
|
||||
# example:
|
||||
#
|
||||
# push 3136B0h ; dwControlCode
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
if insn.mnemonic == "add" and operands[0] in ["esp", "rsp"]:
|
||||
# skip things like:
|
||||
#
|
||||
# .text:00401140 call sub_407E2B
|
||||
# .text:00401145 add esp, 0Ch
|
||||
return
|
||||
for i, operand in enumerate(operands):
|
||||
try:
|
||||
# The result of bitwise operations is calculated as though carried out
|
||||
# in two’s complement with an infinite number of sign bits
|
||||
value = int(operand, 16) & ((1 << f.smda_report.bitness) - 1)
|
||||
except ValueError:
|
||||
continue
|
||||
else:
|
||||
yield Number(value), ih.address
|
||||
yield OperandNumber(i, value), ih.address
|
||||
|
||||
if insn.mnemonic == "add" and 0 < value < MAX_STRUCTURE_SIZE:
|
||||
# for pattern like:
|
||||
#
|
||||
# add eax, 0x10
|
||||
#
|
||||
# assume 0x10 is also an offset (imagine eax is a pointer).
|
||||
yield Offset(value), ih.address
|
||||
yield OperandOffset(i, value), ih.address
|
||||
|
||||
|
||||
def read_bytes(smda_report, va, num_bytes=None):
|
||||
"""
|
||||
read up to MAX_BYTES_FEATURE_SIZE from the given address.
|
||||
"""
|
||||
|
||||
rva = va - smda_report.base_addr
|
||||
if smda_report.buffer is None:
|
||||
raise ValueError("buffer is empty")
|
||||
buffer_end = len(smda_report.buffer)
|
||||
max_bytes = num_bytes if num_bytes is not None else MAX_BYTES_FEATURE_SIZE
|
||||
if rva + max_bytes > buffer_end:
|
||||
return smda_report.buffer[rva:]
|
||||
else:
|
||||
return smda_report.buffer[rva : rva + max_bytes]
|
||||
|
||||
|
||||
def derefs(smda_report, p):
|
||||
"""
|
||||
recursively follow the given pointer, yielding the valid memory addresses along the way.
|
||||
useful when you may have a pointer to string, or pointer to pointer to string, etc.
|
||||
|
||||
this is a "do what i mean" type of helper function.
|
||||
|
||||
based on the implementation in viv/insn.py
|
||||
"""
|
||||
depth = 0
|
||||
while True:
|
||||
if not smda_report.isAddrWithinMemoryImage(p):
|
||||
return
|
||||
yield p
|
||||
|
||||
bytes_ = read_bytes(smda_report, p, num_bytes=4)
|
||||
val = struct.unpack("I", bytes_)[0]
|
||||
|
||||
# sanity: pointer points to self
|
||||
if val == p:
|
||||
return
|
||||
|
||||
# sanity: avoid chains of pointers that are unreasonably deep
|
||||
depth += 1
|
||||
if depth > 10:
|
||||
return
|
||||
|
||||
p = val
|
||||
|
||||
|
||||
def extract_insn_bytes_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
parse byte sequence features from the given instruction.
|
||||
example:
|
||||
# push offset iid_004118d4_IShellLinkA ; riid
|
||||
"""
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
for data_ref in insn.getDataRefs():
|
||||
for v in derefs(f.smda_report, data_ref):
|
||||
bytes_read = read_bytes(f.smda_report, v)
|
||||
if bytes_read is None:
|
||||
continue
|
||||
if capa.features.extractors.helpers.all_zeros(bytes_read):
|
||||
continue
|
||||
|
||||
yield Bytes(bytes_read), ih.address
|
||||
|
||||
|
||||
def detect_ascii_len(smda_report, offset):
|
||||
if smda_report.buffer is None:
|
||||
return 0
|
||||
ascii_len = 0
|
||||
rva = offset - smda_report.base_addr
|
||||
char = smda_report.buffer[rva]
|
||||
while char < 127 and chr(char) in string.printable:
|
||||
ascii_len += 1
|
||||
rva += 1
|
||||
char = smda_report.buffer[rva]
|
||||
if char == 0:
|
||||
return ascii_len
|
||||
return 0
|
||||
|
||||
|
||||
def detect_unicode_len(smda_report, offset):
|
||||
if smda_report.buffer is None:
|
||||
return 0
|
||||
unicode_len = 0
|
||||
rva = offset - smda_report.base_addr
|
||||
char = smda_report.buffer[rva]
|
||||
second_char = smda_report.buffer[rva + 1]
|
||||
while char < 127 and chr(char) in string.printable and second_char == 0:
|
||||
unicode_len += 2
|
||||
rva += 2
|
||||
char = smda_report.buffer[rva]
|
||||
second_char = smda_report.buffer[rva + 1]
|
||||
if char == 0 and second_char == 0:
|
||||
return unicode_len
|
||||
return 0
|
||||
|
||||
|
||||
def read_string(smda_report, offset):
|
||||
alen = detect_ascii_len(smda_report, offset)
|
||||
if alen > 1:
|
||||
return read_bytes(smda_report, offset, alen).decode("utf-8")
|
||||
ulen = detect_unicode_len(smda_report, offset)
|
||||
if ulen > 2:
|
||||
return read_bytes(smda_report, offset, ulen).decode("utf-16")
|
||||
|
||||
|
||||
def extract_insn_string_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse string features from the given instruction."""
|
||||
# example:
|
||||
#
|
||||
# push offset aAcr ; "ACR > "
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
for data_ref in insn.getDataRefs():
|
||||
for v in derefs(f.smda_report, data_ref):
|
||||
string_read = read_string(f.smda_report, v)
|
||||
if string_read:
|
||||
yield String(string_read.rstrip("\x00")), ih.address
|
||||
|
||||
|
||||
def extract_insn_offset_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse structure offset features from the given instruction."""
|
||||
# examples:
|
||||
#
|
||||
# mov eax, [esi + 4]
|
||||
# mov eax, [esi + ecx + 16384]
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
for i, operand in enumerate(operands):
|
||||
if "esp" in operand or "ebp" in operand or "rbp" in operand:
|
||||
continue
|
||||
|
||||
number = 0
|
||||
number_hex = re.search(PATTERN_HEXNUM, operand)
|
||||
number_int = re.search(PATTERN_SINGLENUM, operand)
|
||||
if number_hex:
|
||||
number = int(number_hex.group("num"), 16)
|
||||
number = -1 * number if number_hex.group().startswith("-") else number
|
||||
elif number_int:
|
||||
number = int(number_int.group("num"))
|
||||
number = -1 * number if number_int.group().startswith("-") else number
|
||||
|
||||
if "ptr" not in operand:
|
||||
if (
|
||||
insn.mnemonic == "lea"
|
||||
and i == 1
|
||||
and (operand.count("+") + operand.count("-")) == 1
|
||||
and operand.count("*") == 0
|
||||
):
|
||||
# for pattern like:
|
||||
#
|
||||
# lea eax, [ebx + 1]
|
||||
#
|
||||
# assume 1 is also an offset (imagine ebx is a zero register).
|
||||
yield Number(number), ih.address
|
||||
yield OperandNumber(i, number), ih.address
|
||||
|
||||
continue
|
||||
|
||||
yield Offset(number), ih.address
|
||||
yield OperandOffset(i, number), ih.address
|
||||
|
||||
|
||||
def is_security_cookie(f, bb, insn):
|
||||
"""
|
||||
check if an instruction is related to security cookie checks
|
||||
"""
|
||||
# security cookie check should use SP or BP
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
if operands[1] not in ["esp", "ebp", "rsp", "rbp"]:
|
||||
return False
|
||||
for index, block in enumerate(f.getBlocks()):
|
||||
# expect security cookie init in first basic block within first bytes (instructions)
|
||||
block_instructions = [i for i in block.getInstructions()]
|
||||
if index == 0 and insn.offset < (block_instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA):
|
||||
return True
|
||||
# ... or within last bytes (instructions) before a return
|
||||
if block_instructions[-1].mnemonic.startswith("ret") and insn.offset > (
|
||||
block_instructions[-1].offset - SECURITY_COOKIE_BYTES_DELTA
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def extract_insn_nzxor_characteristic_features(
|
||||
fh: FunctionHandle, bh: BBHandle, ih: InsnHandle
|
||||
) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
parse non-zeroing XOR instruction from the given instruction.
|
||||
ignore expected non-zeroing XORs, e.g. security cookies.
|
||||
"""
|
||||
f: smda.Function = fh.inner
|
||||
bb: smda.BasicBlock = bh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic not in ("xor", "xorpd", "xorps", "pxor"):
|
||||
return
|
||||
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
if operands[0] == operands[1]:
|
||||
return
|
||||
|
||||
if is_security_cookie(f, bb, insn):
|
||||
return
|
||||
|
||||
yield Characteristic("nzxor"), ih.address
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse mnemonic features from the given instruction."""
|
||||
yield Mnemonic(ih.inner.mnemonic), ih.address
|
||||
|
||||
|
||||
def extract_insn_obfs_call_plus_5_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
parse call $+5 instruction from the given instruction.
|
||||
"""
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic != "call":
|
||||
return
|
||||
|
||||
if not insn.operands.startswith("0x"):
|
||||
return
|
||||
|
||||
if int(insn.operands, 16) == insn.offset + 5:
|
||||
yield Characteristic("call $+5"), ih.address
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
|
||||
"""
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic not in ["push", "mov"]:
|
||||
return
|
||||
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
for operand in operands:
|
||||
if "fs:" in operand and "0x30" in operand:
|
||||
yield Characteristic("peb access"), ih.address
|
||||
elif "gs:" in operand and "0x60" in operand:
|
||||
yield Characteristic("peb access"), ih.address
|
||||
|
||||
|
||||
def extract_insn_segment_access_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""parse the instruction for access to fs or gs"""
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
operands = [o.strip() for o in insn.operands.split(",")]
|
||||
for operand in operands:
|
||||
if "fs:" in operand:
|
||||
yield Characteristic("fs access"), ih.address
|
||||
elif "gs:" in operand:
|
||||
yield Characteristic("gs access"), ih.address
|
||||
|
||||
|
||||
def extract_insn_cross_section_cflow(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
inspect the instruction for a CALL or JMP that crosses section boundaries.
|
||||
"""
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic in ["call", "jmp"]:
|
||||
if ih.address in f.apirefs:
|
||||
return
|
||||
|
||||
smda_report = insn.smda_function.smda_report
|
||||
if ih.address in f.outrefs:
|
||||
for target in f.outrefs[ih.address]:
|
||||
if smda_report.getSection(ih.address) != smda_report.getSection(target):
|
||||
yield Characteristic("cross section flow"), ih.address
|
||||
elif insn.operands.startswith("0x"):
|
||||
target = int(insn.operands, 16)
|
||||
if smda_report.getSection(ih.address) != smda_report.getSection(target):
|
||||
yield Characteristic("cross section flow"), ih.address
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function scope,
|
||||
# however, its most efficient to extract at the instruction scope.
|
||||
def extract_function_calls_from(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
f: smda.Function = fh.inner
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic != "call":
|
||||
return
|
||||
|
||||
if ih.address in f.outrefs:
|
||||
for outref in f.outrefs[ih.address]:
|
||||
yield Characteristic("calls from"), AbsoluteVirtualAddress(outref)
|
||||
|
||||
if outref == f.offset:
|
||||
# if we found a jump target and it's the function address
|
||||
# mark as recursive
|
||||
yield Characteristic("recursive call"), AbsoluteVirtualAddress(outref)
|
||||
if ih.address in f.apirefs:
|
||||
yield Characteristic("calls from"), ih.address
|
||||
|
||||
|
||||
# this is a feature that's most relevant at the function or basic block scope,
|
||||
# however, its most efficient to extract at the instruction scope.
|
||||
def extract_function_indirect_call_characteristic_features(f, bb, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
|
||||
"""
|
||||
extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
|
||||
does not include calls like => call ds:dword_ABD4974
|
||||
"""
|
||||
insn: smda.Insn = ih.inner
|
||||
|
||||
if insn.mnemonic != "call":
|
||||
return
|
||||
if insn.operands.startswith("0x"):
|
||||
return False
|
||||
if "qword ptr" in insn.operands and "rip" in insn.operands:
|
||||
return False
|
||||
if insn.operands.startswith("dword ptr [0x"):
|
||||
return False
|
||||
# call edx
|
||||
# call dword ptr [eax+50h]
|
||||
# call qword ptr [rsp+78h]
|
||||
yield Characteristic("indirect call"), ih.address
|
||||
|
||||
|
||||
def extract_features(f, bb, insn):
|
||||
"""
|
||||
extract features from the given insn.
|
||||
|
||||
args:
|
||||
f: the function to process.
|
||||
bb: the basic block to process.
|
||||
insn: the instruction to process.
|
||||
|
||||
yields:
|
||||
Tuple[Feature, Address]: the features and their location found in this insn.
|
||||
"""
|
||||
for insn_handler in INSTRUCTION_HANDLERS:
|
||||
for feature, addr in insn_handler(f, bb, insn):
|
||||
yield feature, addr
|
||||
|
||||
|
||||
INSTRUCTION_HANDLERS = (
|
||||
extract_insn_api_features,
|
||||
extract_insn_number_features,
|
||||
extract_insn_string_features,
|
||||
extract_insn_bytes_features,
|
||||
extract_insn_offset_features,
|
||||
extract_insn_nzxor_characteristic_features,
|
||||
extract_insn_mnemonic_features,
|
||||
extract_insn_obfs_call_plus_5_characteristic_features,
|
||||
extract_insn_peb_access_characteristic_features,
|
||||
extract_insn_cross_section_cflow,
|
||||
extract_insn_segment_access_features,
|
||||
extract_function_calls_from,
|
||||
extract_function_indirect_call_characteristic_features,
|
||||
)
|
||||
24
capa/main.py
24
capa/main.py
@@ -17,7 +17,6 @@ import os.path
|
||||
import argparse
|
||||
import datetime
|
||||
import textwrap
|
||||
import warnings
|
||||
import itertools
|
||||
import contextlib
|
||||
import collections
|
||||
@@ -72,7 +71,6 @@ from capa.features.extractors.base_extractor import BBHandle, InsnHandle, Functi
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
|
||||
BACKEND_VIV = "vivisect"
|
||||
BACKEND_SMDA = "smda"
|
||||
BACKEND_DOTNET = "dotnet"
|
||||
|
||||
E_MISSING_RULES = -10
|
||||
@@ -513,23 +511,7 @@ def get_extractor(
|
||||
|
||||
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)
|
||||
|
||||
if backend == "smda":
|
||||
from smda.SmdaConfig import SmdaConfig
|
||||
from smda.Disassembler import Disassembler
|
||||
|
||||
import capa.features.extractors.smda.extractor
|
||||
|
||||
logger.warning("Deprecation warning: v4.0 will be the last capa version to support the SMDA backend.")
|
||||
warnings.warn("v4.0 will be the last capa version to support the SMDA backend.", DeprecationWarning)
|
||||
smda_report = None
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
config = SmdaConfig()
|
||||
config.STORE_BUFFER = True
|
||||
smda_disasm = Disassembler(config)
|
||||
smda_report = smda_disasm.disassembleFile(path)
|
||||
|
||||
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path)
|
||||
else:
|
||||
if backend == BACKEND_VIV:
|
||||
import capa.features.extractors.viv.extractor
|
||||
|
||||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
|
||||
@@ -547,6 +529,8 @@ def get_extractor(
|
||||
|
||||
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
|
||||
|
||||
raise ValueError("unexpected extractor specification: format=%s backend=%s", format_, backend)
|
||||
|
||||
|
||||
def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]:
|
||||
file_extractors: List[FeatureExtractor] = list()
|
||||
@@ -830,7 +814,7 @@ def install_common_args(parser, wanted=None):
|
||||
"--backend",
|
||||
type=str,
|
||||
help="select the backend to use",
|
||||
choices=(BACKEND_VIV, BACKEND_SMDA),
|
||||
choices=(BACKEND_VIV,),
|
||||
default=BACKEND_VIV,
|
||||
)
|
||||
|
||||
|
||||
1
setup.py
1
setup.py
@@ -23,7 +23,6 @@ requirements = [
|
||||
"networkx==2.5.1",
|
||||
"ruamel.yaml==0.17.21",
|
||||
"vivisect==1.0.8",
|
||||
"smda==1.8.4",
|
||||
"pefile==2022.5.30",
|
||||
"pyelftools==0.29",
|
||||
"dnfile==0.12.0",
|
||||
|
||||
@@ -120,21 +120,6 @@ def fixup_viv(path, extractor):
|
||||
extractor.vw.makeFunction(0x10006860)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_smda_extractor(path):
|
||||
from smda.SmdaConfig import SmdaConfig
|
||||
from smda.Disassembler import Disassembler
|
||||
|
||||
import capa.features.extractors.smda.extractor
|
||||
|
||||
config = SmdaConfig()
|
||||
config.STORE_BUFFER = True
|
||||
disasm = Disassembler(config)
|
||||
report = disasm.disassembleFile(path)
|
||||
|
||||
return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(report, path)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_pefile_extractor(path):
|
||||
import capa.features.extractors.pefile
|
||||
|
||||
@@ -434,12 +434,6 @@ def test_backend_option(capsys):
|
||||
assert std_json["meta"]["analysis"]["extractor"] == "VivisectFeatureExtractor"
|
||||
assert len(std_json["rules"]) > 0
|
||||
|
||||
assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_SMDA]) == 0
|
||||
std = capsys.readouterr()
|
||||
std_json = json.loads(std.out)
|
||||
assert std_json["meta"]["analysis"]["extractor"] == "SmdaFeatureExtractor"
|
||||
assert len(std_json["rules"]) > 0
|
||||
|
||||
|
||||
def test_json_meta(capsys):
|
||||
path = fixtures.get_data_path_by_name("pma01-01")
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import pytest
|
||||
import fixtures
|
||||
from fixtures import *
|
||||
from fixtures import parametrize
|
||||
|
||||
import capa.features.file
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
fixtures.FEATURE_PRESENCE_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_smda_features(sample, scope, feature, expected):
|
||||
if scope.__name__ == "file" and isinstance(feature, capa.features.file.FunctionName) and expected is True:
|
||||
pytest.xfail("SMDA has no function ID")
|
||||
|
||||
fixtures.do_test_feature_presence(fixtures.get_smda_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
|
||||
"sample,scope,feature,expected",
|
||||
fixtures.FEATURE_COUNT_TESTS,
|
||||
indirect=["sample", "scope"],
|
||||
)
|
||||
def test_smda_feature_counts(sample, scope, feature, expected):
|
||||
fixtures.do_test_feature_count(fixtures.get_smda_extractor, sample, scope, feature, expected)
|
||||
Reference in New Issue
Block a user