diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 31742757..09c29338 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -62,7 +62,8 @@ jobs: with: python-version: ${{ matrix.python }} - name: Install capa - run: pip install -e .[dev] + # TODO: remove `pefile` when we bump lancelot >= 0.3.7 + run: pip install -e .[dev] pefile - name: Run tests run: pytest tests/ diff --git a/capa/features/extractors/lancelot/__init__.py b/capa/features/extractors/lancelot/__init__.py new file mode 100644 index 00000000..e22acade --- /dev/null +++ b/capa/features/extractors/lancelot/__init__.py @@ -0,0 +1,92 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+import logging + +import lancelot + +import capa.features.extractors +import capa.features.extractors.lancelot.file +import capa.features.extractors.lancelot.insn +import capa.features.extractors.lancelot.function +import capa.features.extractors.lancelot.basicblock + +__all__ = ["file", "function", "basicblock", "insn"] +logger = logging.getLogger(__name__) + + +class BB(object): + """extend the lancelot.BasicBlock with an __int__ method to access the address""" + + def __init__(self, ws, bb): + super(BB, self).__init__() + self.ws = ws + self.address = bb.address + self.length = bb.length + self.predecessors = bb.predecessors + self.successors = bb.successors + + def __int__(self): + return self.address + + @property + def instructions(self): + va = self.address + while va < self.address + self.length: + try: + insn = self.ws.read_insn(va) + except ValueError: + logger.warning("failed to read instruction at 0x%x", va) + return + + yield insn + va += insn.length + + +class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor): + def __init__(self, buf): + super(LancelotFeatureExtractor, self).__init__() + self.buf = buf + self.ws = lancelot.from_bytes(buf) + self.ctx = {} + + def get_base_address(self): + return self.ws.base_address + + def extract_file_features(self): + for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf): + yield feature, va + + def get_functions(self): + for va in self.ws.get_functions(): + # this is just the address of the function + yield va + + def extract_function_features(self, f): + for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f): + yield feature, va + + def get_basic_blocks(self, f): + try: + cfg = self.ws.build_cfg(f) + except: + logger.warning("failed to build CFG for 0x%x", f) + return + else: + for bb in cfg.basic_blocks.values(): + yield BB(self.ws, bb) + + def extract_basic_block_features(self, f, bb): + for feature, va in 
capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb): + yield feature, va + + def get_instructions(self, f, bb): + return bb.instructions + + def extract_insn_features(self, f, bb, insn): + for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn): + yield feature, va diff --git a/capa/features/extractors/lancelot/basicblock.py b/capa/features/extractors/lancelot/basicblock.py new file mode 100644 index 00000000..e1dc0e36 --- /dev/null +++ b/capa/features/extractors/lancelot/basicblock.py @@ -0,0 +1,120 @@ +import string +import struct +import logging + +from lancelot import ( + FLOW_VA, + OPERAND_SIZE, + OPERAND_TYPE, + MEMORY_OPERAND_BASE, + OPERAND_TYPE_MEMORY, + OPERAND_TYPE_IMMEDIATE, + IMMEDIATE_OPERAND_VALUE, +) + +from capa.features import Characteristic +from capa.features.basicblock import BasicBlock +from capa.features.extractors.helpers import MIN_STACKSTRING_LEN + +logger = logging.getLogger(__name__) + + +def extract_bb_tight_loop(ws, bb): + """ check basic block for tight loop indicators """ + if bb.address in map(lambda flow: flow[FLOW_VA], bb.successors): + yield Characteristic("tight loop"), bb.address + + +def is_mov_imm_to_stack(insn): + if not insn.mnemonic.startswith("mov"): + return False + + try: + dst, src = insn.operands + except ValueError: + # not two operands + return False + + if src[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE: + return False + + if src[IMMEDIATE_OPERAND_VALUE] < 0: + return False + + if dst[OPERAND_TYPE] != OPERAND_TYPE_MEMORY: + return False + + if dst[MEMORY_OPERAND_BASE] not in ("ebp", "rbp", "esp", "rsp"): + return False + + return True + + +def is_printable_ascii(chars): + return all(c < 127 and chr(c) in string.printable for c in chars) + + +def is_printable_utf16le(chars): + if all(c == b"\x00" for c in chars[1::2]): + return is_printable_ascii(chars[::2]) + + +def get_printable_len(operand): + """ + Return string length if all operand bytes 
are ascii or utf16-le printable + """ + operand_size = operand[OPERAND_SIZE] + if operand_size == 8: + chars = struct.pack(" MIN_STACKSTRING_LEN: + return True + + return False + + +def extract_stackstring(ws, bb): + """ check basic block for stackstring indicators """ + if _bb_has_stackstring(ws, bb): + yield Characteristic("stack string"), bb.address + + +def extract_basic_block_features(ws, bb): + yield BasicBlock(), bb.address + for bb_handler in BASIC_BLOCK_HANDLERS: + for feature, va in bb_handler(ws, bb): + yield feature, va + + +BASIC_BLOCK_HANDLERS = ( + extract_bb_tight_loop, + extract_stackstring, +) diff --git a/capa/features/extractors/lancelot/file.py b/capa/features/extractors/lancelot/file.py new file mode 100644 index 00000000..73bc8243 --- /dev/null +++ b/capa/features/extractors/lancelot/file.py @@ -0,0 +1,81 @@ +import pefile + +import capa.features.extractors.strings +from capa.features import String, Characteristic +from capa.features.file import Export, Import, Section + + +def extract_file_embedded_pe(buf, pe): + buf = buf[2:] + + total_offset = 2 + while True: + try: + offset = buf.index(b"MZ") + except ValueError: + return + else: + rest = buf[offset:] + total_offset += offset + + try: + _ = pefile.PE(data=rest) + except: + pass + else: + yield Characteristic("embedded pe"), total_offset + + buf = rest[2:] + total_offset += 2 + + +def extract_file_export_names(buf, pe): + if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"): + return + + base_address = pe.OPTIONAL_HEADER.ImageBase + for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols: + yield Export(exp.name.decode("ascii")), base_address + exp.address + + +def extract_file_import_names(buf, pe): + base_address = pe.OPTIONAL_HEADER.ImageBase + for entry in pe.DIRECTORY_ENTRY_IMPORT: + libname = entry.dll.decode("ascii").lower().partition(".")[0] + for imp in entry.imports: + if imp.ordinal: + yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address + else: + impname = imp.name.decode("ascii") + yield 
Import("%s.%s" % (libname, impname)), imp.address + yield Import("%s" % (impname)), imp.address + + +def extract_file_section_names(buf, pe): + base_address = pe.OPTIONAL_HEADER.ImageBase + for section in pe.sections: + yield Section(section.Name.partition(b"\x00")[0].decode("ascii")), base_address + section.VirtualAddress + + +def extract_file_strings(buf, pe): + for s in capa.features.extractors.strings.extract_ascii_strings(buf): + yield String(s.s), s.offset + + for s in capa.features.extractors.strings.extract_unicode_strings(buf): + yield String(s.s), s.offset + + +def extract_file_features(buf): + pe = pefile.PE(data=buf) + for file_handler in FILE_HANDLERS: + for feature, va in file_handler(buf, pe): + yield feature, va + + +FILE_HANDLERS = ( + extract_file_embedded_pe, + extract_file_export_names, + extract_file_import_names, + extract_file_section_names, + extract_file_strings, +) diff --git a/capa/features/extractors/lancelot/function.py b/capa/features/extractors/lancelot/function.py new file mode 100644 index 00000000..b885f392 --- /dev/null +++ b/capa/features/extractors/lancelot/function.py @@ -0,0 +1,64 @@ +import logging + +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache + +from lancelot import ( + FLOW_VA, + FLOW_TYPE, + FLOW_TYPE_CONDITIONAL_JUMP, + FLOW_TYPE_CONDITIONAL_MOVE, + FLOW_TYPE_UNCONDITIONAL_JUMP, +) + +from capa.features import Characteristic +from capa.features.extractors import loops + +logger = logging.getLogger(__name__) + + +@lru_cache +def get_call_graph(ws): + return ws.build_call_graph() + + +def extract_function_calls_to(ws, f): + cg = get_call_graph(ws) + + for caller in cg.calls_to.get(f, []): + yield Characteristic("calls to"), caller + + +def extract_function_loop(ws, f): + edges = [] + for bb in ws.build_cfg(f).basic_blocks.values(): + for flow in bb.successors: + if flow[FLOW_TYPE] in ( + FLOW_TYPE_UNCONDITIONAL_JUMP, + FLOW_TYPE_CONDITIONAL_JUMP, + 
FLOW_TYPE_CONDITIONAL_MOVE, + ): + edges.append((bb.address, flow[FLOW_VA])) + continue + + if edges and loops.has_loop(edges): + yield Characteristic("loop"), f + + +FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop) + + +_not_implemented = set([]) + + +def extract_function_features(ws, f): + for func_handler in FUNCTION_HANDLERS: + try: + for feature, va in func_handler(ws, f): + yield feature, va + except NotImplementedError: + if func_handler.__name__ not in _not_implemented: + logger.warning("not implemented: %s", func_handler.__name__) + _not_implemented.add(func_handler.__name__) diff --git a/capa/features/extractors/lancelot/helpers.py b/capa/features/extractors/lancelot/helpers.py new file mode 100644 index 00000000..fcf6c037 --- /dev/null +++ b/capa/features/extractors/lancelot/helpers.py @@ -0,0 +1,33 @@ +from lancelot import ( + OPERAND_TYPE, + MEMORY_OPERAND_BASE, + MEMORY_OPERAND_DISP, + OPERAND_TYPE_MEMORY, + OPERAND_TYPE_IMMEDIATE, + IMMEDIATE_OPERAND_VALUE, + IMMEDIATE_OPERAND_IS_RELATIVE, +) + + +def get_operand_target(insn, op): + if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY: + # call direct, x64 + # rip relative + # kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0 + if op[MEMORY_OPERAND_BASE] == "rip": + return op[MEMORY_OPERAND_DISP] + insn.address + insn.length + + # call direct, x32 + # mimikatz:0x403BD3 call ds:CryptAcquireContextW + elif op[MEMORY_OPERAND_BASE] == None: + return op[MEMORY_OPERAND_DISP] + + # call via thunk + # mimikatz:0x455A41 call LsaQueryInformationPolicy + elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]: + return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length + + elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE: + return op[IMMEDIATE_OPERAND_VALUE] + + raise ValueError("memory operand has no target") diff --git a/capa/features/extractors/lancelot/indirect_calls.py b/capa/features/extractors/lancelot/indirect_calls.py new file mode 100644 index 
00000000..f6f376b0 --- /dev/null +++ b/capa/features/extractors/lancelot/indirect_calls.py @@ -0,0 +1,149 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import collections + +from lancelot import ( + FLOW_VA, + OPERAND_TYPE, + PERMISSION_READ, + MEMORY_OPERAND_BASE, + MEMORY_OPERAND_DISP, + OPERAND_TYPE_MEMORY, + MEMORY_OPERAND_INDEX, + OPERAND_TYPE_REGISTER, + MEMORY_OPERAND_SEGMENT, + OPERAND_TYPE_IMMEDIATE, + IMMEDIATE_OPERAND_VALUE, + REGISTER_OPERAND_REGISTER, + IMMEDIATE_OPERAND_IS_RELATIVE, +) + +from capa.features.extractors.lancelot.helpers import get_operand_target + +DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor") + + +class NotFoundError(Exception): + pass + + +def read_instructions(ws, bb): + va = bb.address + while va < bb.address + bb.length: + try: + insn = ws.read_insn(va) + except ValueError: + return + + yield insn + va += insn.length + + +def build_instruction_predecessors(ws, cfg): + preds = collections.defaultdict(set) + + for bb in cfg.basic_blocks.values(): + insns = list(read_instructions(ws, bb)) + + for i, insn in enumerate(insns): + if i == 0: + for pred in bb.predecessors: + pred_bb = cfg.basic_blocks[pred[FLOW_VA]] + preds[insn.address].add(list(read_instructions(ws, pred_bb))[-1].address) + else: + preds[insn.address].add(insns[i - 1].address) + + return preds + + +def find_definition(ws, f, insn): + """ + scan backwards from the given address looking for assignments to the given register. 
+ if a constant, return that value. + args: + ws (lancelot.PE) + f (int): the function start address + insn (lancelot.Instruction): call instruction to resolve + returns: + (va: int, value?: int|None): the address of the assignment and the value, if a constant. + raises: + NotFoundError: when the definition cannot be found. + """ + assert insn.mnemonic == "call" + op0 = insn.operands[0] + assert op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER + reg = op0[REGISTER_OPERAND_REGISTER] + + cfg = ws.build_cfg(f) + preds = build_instruction_predecessors(ws, cfg) + + q = collections.deque() + seen = set([]) + q.extend(preds[insn.address]) + while q: + cur = q.popleft() + + # skip if we've already processed this location + if cur in seen: + continue + seen.add(cur) + + insn = ws.read_insn(cur) + operands = insn.operands + + if len(operands) == 0: + q.extend(preds[cur]) + continue + + op0 = operands[0] + if not ( + op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER + and op0[REGISTER_OPERAND_REGISTER] == reg + and insn.mnemonic in DESTRUCTIVE_MNEMONICS + ): + q.extend(preds[cur]) + continue + + # if we reach here, the instruction is destructive to our target register. + + # we currently only support extracting the constant from something like: `mov $reg, IAT` + # so, any other pattern results in an unknown value, represented by None. + # this is a good place to extend in the future, if we need more robust support. + if insn.mnemonic != "mov": + return (cur, None) + else: + op1 = operands[1] + try: + target = get_operand_target(insn, op1) + except ValueError: + return (cur, None) + else: + return (cur, target) + + raise NotFoundError() + + +def is_indirect_call(insn): + return insn.mnemonic == "call" and insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER + + +def resolve_indirect_call(ws, f, insn): + """ + inspect the given indirect call instruction and attempt to resolve the target address. 
+ args: + ws (lancelot.PE): the analysis workspace + f (int): the address of the function to analyze + insn (lancelot.Instruction): the instruction at which to start analysis + returns: + (va: int, value?: int|None): the address of the assignment and the value, if a constant. + raises: + NotFoundError: when the definition cannot be found. + """ + assert is_indirect_call(insn) + return find_definition(ws, f, insn) diff --git a/capa/features/extractors/lancelot/insn.py b/capa/features/extractors/lancelot/insn.py new file mode 100644 index 00000000..d658bf79 --- /dev/null +++ b/capa/features/extractors/lancelot/insn.py @@ -0,0 +1,487 @@ +import logging +import itertools + +import pefile + +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache + +from lancelot import ( + OPERAND_TYPE, + PERMISSION_READ, + MEMORY_OPERAND_BASE, + MEMORY_OPERAND_DISP, + OPERAND_TYPE_MEMORY, + MEMORY_OPERAND_INDEX, + OPERAND_TYPE_REGISTER, + MEMORY_OPERAND_SEGMENT, + OPERAND_TYPE_IMMEDIATE, + IMMEDIATE_OPERAND_VALUE, + REGISTER_OPERAND_REGISTER, + IMMEDIATE_OPERAND_IS_RELATIVE, +) + +import capa.features.extractors.helpers +from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic +from capa.features.insn import Number, Offset, Mnemonic +from capa.features.extractors.lancelot.helpers import get_operand_target +from capa.features.extractors.lancelot.function import get_call_graph +from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call + +logger = logging.getLogger(__name__) + + +# security cookie checks may perform non-zeroing XORs, these are expected within a certain +# byte range within the first and returning basic blocks, this helps to reduce FP features +SECURITY_COOKIE_BYTES_DELTA = 0x40 + + +def get_arch(ws): + if ws.arch == "x32": + return ARCH_X32 + elif ws.arch == "x64": + return ARCH_X64 + else: + raise ValueError("unexpected 
architecture") + + +@lru_cache +def get_pefile(xtor): + return pefile.PE(data=xtor.buf) + + +@lru_cache +def get_imports(xtor): + pe = get_pefile(xtor) + + imports = {} + for entry in pe.DIRECTORY_ENTRY_IMPORT: + libname = entry.dll.decode("ascii").lower().partition(".")[0] + for imp in entry.imports: + if imp.ordinal: + imports[imp.address] = "%s.#%s" % (libname, imp.ordinal) + else: + impname = imp.name.decode("ascii") + imports[imp.address] = "%s.%s" % (libname, impname) + return imports + + +@lru_cache +def get_thunks(xtor): + thunks = {} + for va in xtor.ws.get_functions(): + try: + insn = xtor.ws.read_insn(va) + except ValueError: + continue + + if insn.mnemonic != "jmp": + continue + + op0 = insn.operands[0] + + try: + target = get_operand_target(insn, op0) + except ValueError: + continue + + imports = get_imports(xtor) + if target not in imports: + continue + + thunks[va] = imports[target] + + return thunks + + +def extract_insn_api_features(xtor, f, bb, insn): + """parse API features from the given instruction.""" + + if insn.mnemonic != "call": + return + + op0 = insn.operands[0] + + if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER: + try: + (_, target) = resolve_indirect_call(xtor.ws, f, insn) + except NotFoundError: + return + if target is None: + return + else: + try: + target = get_operand_target(insn, op0) + except ValueError: + return + + imports = get_imports(xtor) + if target in imports: + for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address): + yield feature, va + return + + thunks = get_thunks(xtor) + if target in thunks: + for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address): + yield feature, va + + +def extract_insn_mnemonic_features(xtor, f, bb, insn): + """parse mnemonic features from the given instruction.""" + yield Mnemonic(insn.mnemonic), insn.address + + +def extract_insn_number_features(xtor, f, bb, insn): + """parse number features from 
the given instruction.""" + operands = insn.operands + + for operand in operands: + if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE: + continue + + v = operand[IMMEDIATE_OPERAND_VALUE] + + if xtor.ws.probe(v) & PERMISSION_READ: + # v is a valid address + # therefore, assume its not also a constant. + continue + + if ( + insn.mnemonic == "add" + and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER + and operands[0][REGISTER_OPERAND_REGISTER] == "esp" + ): + # skip things like: + # + # .text:00401140 call sub_407E2B + # .text:00401145 add esp, 0Ch + return + + yield Number(v), insn.address + yield Number(v, arch=get_arch(xtor.ws)), insn.address + + +def extract_insn_offset_features(xtor, f, bb, insn): + """parse structure offset features from the given instruction.""" + operands = insn.operands + + for operand in operands: + if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY: + continue + + if operand[MEMORY_OPERAND_BASE] in ("esp", "ebp", "rbp"): + continue + + # lancelot provides `None` when the displacement is not present. + v = operand[MEMORY_OPERAND_DISP] or 0 + + yield Offset(v), insn.address + yield Offset(v, arch=get_arch(xtor.ws)), insn.address + + +def derefs(xtor, p): + """ + recursively follow the given pointer, yielding the valid memory addresses along the way. + useful when you may have a pointer to string, or pointer to pointer to string, etc. + this is a "do what i mean" type of helper function. + """ + + depth = 0 + while True: + if not xtor.ws.probe(p) & PERMISSION_READ: + return + yield p + + next = xtor.ws.read_pointer(p) + + # sanity: pointer points to self + if next == p: + return + + # sanity: avoid chains of pointers that are unreasonably deep + depth += 1 + if depth > 10: + return + + p = next + + +def read_bytes(xtor, va): + """ + read up to MAX_BYTES_FEATURE_SIZE from the given address. + + raises: + ValueError: if the given address is not valid. 
+ """ + start = va + end = va + MAX_BYTES_FEATURE_SIZE + pe = get_pefile(xtor) + + for section in pe.sections: + section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize + + if section_start <= start < section_end: + end = min(end, section_end) + return xtor.ws.read_bytes(start, end - start) + + raise ValueError("invalid address") + + +# these are mnemonics that may flow (jump) elsewhere +FLOW_MNEMONICS = set( + [ + "call", + "jb", + "jbe", + "jcxz", + "jecxz", + "jknzd", + "jkzd", + "jl", + "jle", + "jmp", + "jnb", + "jnbe", + "jnl", + "jnle", + "jno", + "jnp", + "jns", + "jnz", + "jo", + "jp", + "jrcxz", + "js", + "jz", + ] +) + + +def extract_insn_bytes_features(xtor, f, bb, insn): + """ + parse byte sequence features from the given instruction. + """ + if insn.mnemonic in FLOW_MNEMONICS: + return + + for operand in insn.operands: + try: + target = get_operand_target(insn, operand) + except ValueError: + continue + + for ptr in derefs(xtor, target): + try: + buf = read_bytes(xtor, ptr) + except ValueError: + continue + + if capa.features.extractors.helpers.all_zeros(buf): + continue + + yield Bytes(buf), insn.address + + +def first(s): + """enumerate the first element in the sequence""" + for i in s: + yield i + break + + +def extract_insn_string_features(xtor, f, bb, insn): + """parse string features from the given instruction.""" + for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn): + buf = bytez.value + + for s in itertools.chain( + first(capa.features.extractors.strings.extract_ascii_strings(buf)), + first(capa.features.extractors.strings.extract_unicode_strings(buf)), + ): + if s.offset == 0: + yield String(s.s), va + + +def is_security_cookie(xtor, f, bb, insn): + """ + check if an instruction is related to security cookie checks + """ + op1 = insn.operands[1] + if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER and 
op1[REGISTER_OPERAND_REGISTER] not in ( + "esp", + "ebp", + "rbp", + "rsp", + ): + return False + + # expect security cookie init in first basic block within first bytes (instructions) + if f == bb.address and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA): + return True + + # ... or within last bytes (instructions) before a return + insns = list(xtor.get_instructions(f, bb)) + if insns[-1].mnemonic in ("ret", "retn") and insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA): + return True + + return False + + +def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn): + """ + parse non-zeroing XOR instruction from the given instruction. + ignore expected non-zeroing XORs, e.g. security cookies. + """ + if insn.mnemonic != "xor": + return + + operands = insn.operands + if operands[0] == operands[1]: + return + + if is_security_cookie(xtor, f, bb, insn): + return + + yield Characteristic("nzxor"), insn.address + + +def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn): + """ + parse peb access from the given function. 
fs:[0x30] on x86, gs:[0x60] on x64 + """ + for operand in insn.operands: + if ( + operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY + and operand[MEMORY_OPERAND_SEGMENT] == "gs" + and operand[MEMORY_OPERAND_DISP] == 0x60 + ): + yield Characteristic("peb access"), insn.address + + if ( + operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY + and operand[MEMORY_OPERAND_SEGMENT] == "fs" + and operand[MEMORY_OPERAND_DISP] == 0x30 + ): + yield Characteristic("peb access"), insn.address + + +def extract_insn_segment_access_features(xtor, f, bb, insn): + """ parse the instruction for access to fs or gs """ + for operand in insn.operands: + if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "gs": + yield Characteristic("gs access"), insn.address + + if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "fs": + yield Characteristic("fs access"), insn.address + + +def get_section(xtor, va): + pe = get_pefile(xtor) + + for i, section in enumerate(pe.sections): + section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize + + if section_start <= va < section_end: + return i + + raise ValueError("invalid address") + + +def extract_insn_cross_section_cflow(xtor, f, bb, insn): + """ + inspect the instruction for a CALL or JMP that crosses section boundaries. 
+ """ + if insn.mnemonic not in FLOW_MNEMONICS: + return + + try: + target = get_operand_target(insn, insn.operands[0]) + except ValueError: + return + + if target in get_imports(xtor): + return + + try: + if get_section(xtor, insn.address) != get_section(xtor, target): + yield Characteristic("cross section flow"), insn.address + except ValueError: + return + + +def extract_function_calls_from(xtor, f, bb, insn): + cg = get_call_graph(xtor.ws) + + for callee in cg.calls_from.get(insn.address, []): + yield Characteristic("calls from"), callee + + if callee == f: + yield Characteristic("recursive call"), insn.address + + # lancelot doesn't count API calls when constructing the call graph + # so we still have to scan for calls to an import + if insn.mnemonic != "call": + return + + try: + target = get_operand_target(insn, insn.operands[0]) + except ValueError: + return + + imports = get_imports(xtor) + if target in imports: + yield Characteristic("calls from"), target + + +# this is a feature that's most relevant at the function or basic block scope, +# however, its most efficient to extract at the instruction scope. 
+def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn): + """ + extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4]) + does not include calls like => call ds:dword_ABD4974 + """ + if insn.mnemonic != "call": + return + + op0 = insn.operands[0] + if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER: + yield Characteristic("indirect call"), insn.address + elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_BASE] is not None: + yield Characteristic("indirect call"), insn.address + elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_INDEX] is not None: + yield Characteristic("indirect call"), insn.address + + +_not_implemented = set([]) + + +def extract_insn_features(xtor, f, bb, insn): + for insn_handler in INSTRUCTION_HANDLERS: + try: + for feature, va in insn_handler(xtor, f, bb, insn): + yield feature, va + except NotImplementedError: + if insn_handler.__name__ not in _not_implemented: + logger.warning("not implemented: %s", insn_handler.__name__) + _not_implemented.add(insn_handler.__name__) + + +INSTRUCTION_HANDLERS = ( + extract_insn_api_features, + extract_insn_number_features, + extract_insn_string_features, + extract_insn_bytes_features, + extract_insn_offset_features, + extract_insn_nzxor_characteristic_features, + extract_insn_mnemonic_features, + extract_insn_peb_access_characteristic_features, + extract_insn_cross_section_cflow, + extract_insn_segment_access_features, + extract_function_calls_from, + extract_function_indirect_call_characteristic_features, +) diff --git a/capa/features/extractors/viv/__init__.py b/capa/features/extractors/viv/__init__.py index c6511215..76e2a2f2 100644 --- a/capa/features/extractors/viv/__init__.py +++ b/capa/features/extractors/viv/__init__.py @@ -8,11 +8,7 @@ import types -import file -import insn -import function import viv_utils -import basicblock import capa.features.extractors import capa.features.extractors.viv.file diff --git 
a/capa/features/insn.py b/capa/features/insn.py index bd8b4c94..fc8d01f6 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -24,7 +24,10 @@ class Number(Feature): super(Number, self).__init__(value, arch=arch, description=description) def get_value_str(self): - return "0x%X" % self.value + if self.value < 0: + return "-0x%X" % (-self.value) + else: + return "0x%X" % self.value class Offset(Feature): diff --git a/capa/main.py b/capa/main.py index a779c9a6..fb4a22d2 100644 --- a/capa/main.py +++ b/capa/main.py @@ -32,7 +32,7 @@ import capa.features.extractors from capa.helpers import oint, get_file_taste RULES_PATH_DEFAULT_STRING = "(embedded rules)" -SUPPORTED_FILE_MAGIC = set(["MZ"]) +SUPPORTED_FILE_MAGIC = set([b"MZ"]) logger = logging.getLogger("capa") @@ -290,7 +290,24 @@ class UnsupportedRuntimeError(RuntimeError): def get_extractor_py3(path, format, disable_progress=False): - raise UnsupportedRuntimeError() + try: + import lancelot + + import capa.features.extractors.lancelot + except ImportError: + logger.warning("lancelot not installed") + raise UnsupportedRuntimeError() + + if format not in ("pe", "auto"): + raise UnsupportedFormatError(format) + + if not is_supported_file_type(path): + raise UnsupportedFormatError() + + with open(path, "rb") as f: + buf = f.read() + + return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf) def get_extractor(path, format, disable_progress=False): diff --git a/scripts/import-to-bn.py b/scripts/import-to-bn.py index 69e050cc..20851b33 100644 --- a/scripts/import-to-bn.py +++ b/scripts/import-to-bn.py @@ -1,112 +1,112 @@ -""" -Binary Ninja plugin that imports a capa report, -produced via `capa --json /path/to/sample`, -into the current database. 
- -It will mark up functions with their capa matches, like: - - ; capa: print debug messages (host-interaction/log/debug/write-event) - ; capa: delete service (host-interaction/service/delete) - ; Attributes: bp-based frame - - public UninstallService - UninstallService proc near - ... - -To use, invoke from the Binary Ninja Tools menu, or from the -command-palette. - -Adapted for Binary Ninja by @psifertex - -This script will verify that the report matches the workspace. -Check the log window for any errors, and/or the summary of changes. - -Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py -""" -import os -import json - -from binaryninja import * - - -def append_func_cmt(bv, va, cmt): - """ - add the given comment to the given function, - if it doesn't already exist. - """ - func = bv.get_function_at(va) - if not func: - raise ValueError("not a function") - - if cmt in func.comment: - return - - func.comment = func.comment + "\n" + cmt - - -def load_analysis(bv): - shortname = os.path.splitext(os.path.basename(bv.file.filename))[0] - dirname = os.path.dirname(bv.file.filename) - log_info(f"dirname: {dirname}\nshortname: {shortname}\n") - if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK): - path = os.path.join(dirname, shortname + ".js") - elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK): - path = os.path.join(dirname, shortname + ".json") - else: - path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)") - if not path or not os.access(path, os.R_OK): - log_error("Invalid filename.") - return 0 - log_info("Using capa file %s" % path) - - with open(path, "rb") as f: - doc = json.loads(f.read().decode("utf-8")) - - if "meta" not in doc or "rules" not in doc: - log_error("doesn't appear to be a capa report") - return -1 - - a = doc["meta"]["sample"]["md5"].lower() - md5 = Transform["MD5"] - rawhex = Transform["RawHex"] - b = 
rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8") - if not a == b: - log_error("sample mismatch") - return -2 - - rows = [] - for rule in doc["rules"].values(): - if rule["meta"].get("lib"): - continue - if rule["meta"].get("capa/subscope"): - continue - if rule["meta"]["scope"] != "function": - continue - - name = rule["meta"]["name"] - ns = rule["meta"].get("namespace", "") - for va in rule["matches"].keys(): - va = int(va) - rows.append((ns, name, va)) - - # order by (namespace, name) so that like things show up together - rows = sorted(rows) - for ns, name, va in rows: - if ns: - cmt = "%s (%s)" % (name, ns) - else: - cmt = "%s" % (name,) - - log_info("0x%x: %s" % (va, cmt)) - try: - # message will look something like: - # - # capa: delete service (host-interaction/service/delete) - append_func_cmt(bv, va, "capa: " + cmt) - except ValueError: - continue - - log_info("ok") - - -PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis) +""" +Binary Ninja plugin that imports a capa report, +produced via `capa --json /path/to/sample`, +into the current database. + +It will mark up functions with their capa matches, like: + + ; capa: print debug messages (host-interaction/log/debug/write-event) + ; capa: delete service (host-interaction/service/delete) + ; Attributes: bp-based frame + + public UninstallService + UninstallService proc near + ... + +To use, invoke from the Binary Ninja Tools menu, or from the +command-palette. + +Adapted for Binary Ninja by @psifertex + +This script will verify that the report matches the workspace. +Check the log window for any errors, and/or the summary of changes. + +Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py +""" +import os +import json + +from binaryninja import * + + +def append_func_cmt(bv, va, cmt): + """ + add the given comment to the given function, + if it doesn't already exist. 
+ """ + func = bv.get_function_at(va) + if not func: + raise ValueError("not a function") + + if cmt in func.comment: + return + + func.comment = func.comment + "\n" + cmt + + +def load_analysis(bv): + shortname = os.path.splitext(os.path.basename(bv.file.filename))[0] + dirname = os.path.dirname(bv.file.filename) + log_info(f"dirname: {dirname}\nshortname: {shortname}\n") + if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK): + path = os.path.join(dirname, shortname + ".js") + elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK): + path = os.path.join(dirname, shortname + ".json") + else: + path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)") + if not path or not os.access(path, os.R_OK): + log_error("Invalid filename.") + return 0 + log_info("Using capa file %s" % path) + + with open(path, "rb") as f: + doc = json.loads(f.read().decode("utf-8")) + + if "meta" not in doc or "rules" not in doc: + log_error("doesn't appear to be a capa report") + return -1 + + a = doc["meta"]["sample"]["md5"].lower() + md5 = Transform["MD5"] + rawhex = Transform["RawHex"] + b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8") + if not a == b: + log_error("sample mismatch") + return -2 + + rows = [] + for rule in doc["rules"].values(): + if rule["meta"].get("lib"): + continue + if rule["meta"].get("capa/subscope"): + continue + if rule["meta"]["scope"] != "function": + continue + + name = rule["meta"]["name"] + ns = rule["meta"].get("namespace", "") + for va in rule["matches"].keys(): + va = int(va) + rows.append((ns, name, va)) + + # order by (namespace, name) so that like things show up together + rows = sorted(rows) + for ns, name, va in rows: + if ns: + cmt = "%s (%s)" % (name, ns) + else: + cmt = "%s" % (name,) + + log_info("0x%x: %s" % (va, cmt)) + try: + # message will look something like: + # + # capa: delete service (host-interaction/service/delete) + 
append_func_cmt(bv, va, "capa: " + cmt) + except ValueError: + continue + + log_info("ok") + + +PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis) diff --git a/scripts/import-to-ida.py b/scripts/import-to-ida.py index 9a5caff2..c52e2cd0 100644 --- a/scripts/import-to-ida.py +++ b/scripts/import-to-ida.py @@ -1,117 +1,117 @@ -""" -IDA Pro script that imports a capa report, -produced via `capa --json /path/to/sample`, -into the current database. - -It will mark up functions with their capa matches, like: - - ; capa: print debug messages (host-interaction/log/debug/write-event) - ; capa: delete service (host-interaction/service/delete) - ; Attributes: bp-based frame - - public UninstallService - UninstallService proc near - ... - -To use, invoke from the IDA Pro scripting dialog, -such as via Alt-F9, -and then select the existing capa report from the file system. - -This script will verify that the report matches the workspace. -Check the output window for any errors, and/or the summary of changes. - -Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. -You may obtain a copy of the License at: [package root]/LICENSE.txt -Unless required by applicable law or agreed to in writing, software distributed under the License - is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and limitations under the License. -""" -import json -import logging - -import idc -import idautils -import ida_idaapi -import ida_kernwin - -logger = logging.getLogger("capa") - - -def append_func_cmt(va, cmt, repeatable=False): - """ - add the given comment to the given function, - if it doesn't already exist. 
- """ - func = ida_funcs.get_func(va) - if not func: - raise ValueError("not a function") - - existing = ida_funcs.get_func_cmt(func, repeatable) or "" - if cmt in existing: - return - - new = existing + "\n" + cmt - ida_funcs.set_func_cmt(func, new, repeatable) - - -def main(): - path = ida_kernwin.ask_file(False, "*", "capa report") - if not path: - return 0 - - with open(path, "rb") as f: - doc = json.loads(f.read().decode("utf-8")) - - if "meta" not in doc or "rules" not in doc: - logger.error("doesn't appear to be a capa report") - return -1 - - # in IDA 7.4, the MD5 hash may be truncated, for example: - # wanted: 84882c9d43e23d63b82004fae74ebb61 - # found: b'84882C9D43E23D63B82004FAE74EBB6\x00' - # - # see: https://github.com/idapython/bin/issues/11 - a = doc["meta"]["sample"]["md5"].lower() - b = idautils.GetInputFileMD5().decode("ascii").lower().rstrip("\x00") - if not a.startswith(b): - logger.error("sample mismatch") - return -2 - - rows = [] - for rule in doc["rules"].values(): - if rule["meta"].get("lib"): - continue - if rule["meta"].get("capa/subscope"): - continue - if rule["meta"]["scope"] != "function": - continue - - name = rule["meta"]["name"] - ns = rule["meta"].get("namespace", "") - for va in rule["matches"].keys(): - va = int(va) - rows.append((ns, name, va)) - - # order by (namespace, name) so that like things show up together - rows = sorted(rows) - for ns, name, va in rows: - if ns: - cmt = "%s (%s)" % (name, ns) - else: - cmt = "%s" % (name,) - - logger.info("0x%x: %s", va, cmt) - try: - # message will look something like: - # - # capa: delete service (host-interaction/service/delete) - append_func_cmt(va, "capa: " + cmt, repeatable=False) - except ValueError: - continue - - logger.info("ok") - - -main() +""" +IDA Pro script that imports a capa report, +produced via `capa --json /path/to/sample`, +into the current database. 
+ +It will mark up functions with their capa matches, like: + + ; capa: print debug messages (host-interaction/log/debug/write-event) + ; capa: delete service (host-interaction/service/delete) + ; Attributes: bp-based frame + + public UninstallService + UninstallService proc near + ... + +To use, invoke from the IDA Pro scripting dialog, +such as via Alt-F9, +and then select the existing capa report from the file system. + +This script will verify that the report matches the workspace. +Check the output window for any errors, and/or the summary of changes. + +Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. +You may obtain a copy of the License at: [package root]/LICENSE.txt +Unless required by applicable law or agreed to in writing, software distributed under the License + is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +""" +import json +import logging + +import idc +import idautils +import ida_idaapi +import ida_kernwin + +logger = logging.getLogger("capa") + + +def append_func_cmt(va, cmt, repeatable=False): + """ + add the given comment to the given function, + if it doesn't already exist. 
+ """ + func = ida_funcs.get_func(va) + if not func: + raise ValueError("not a function") + + existing = ida_funcs.get_func_cmt(func, repeatable) or "" + if cmt in existing: + return + + new = existing + "\n" + cmt + ida_funcs.set_func_cmt(func, new, repeatable) + + +def main(): + path = ida_kernwin.ask_file(False, "*", "capa report") + if not path: + return 0 + + with open(path, "rb") as f: + doc = json.loads(f.read().decode("utf-8")) + + if "meta" not in doc or "rules" not in doc: + logger.error("doesn't appear to be a capa report") + return -1 + + # in IDA 7.4, the MD5 hash may be truncated, for example: + # wanted: 84882c9d43e23d63b82004fae74ebb61 + # found: b'84882C9D43E23D63B82004FAE74EBB6\x00' + # + # see: https://github.com/idapython/bin/issues/11 + a = doc["meta"]["sample"]["md5"].lower() + b = idautils.GetInputFileMD5().decode("ascii").lower().rstrip("\x00") + if not a.startswith(b): + logger.error("sample mismatch") + return -2 + + rows = [] + for rule in doc["rules"].values(): + if rule["meta"].get("lib"): + continue + if rule["meta"].get("capa/subscope"): + continue + if rule["meta"]["scope"] != "function": + continue + + name = rule["meta"]["name"] + ns = rule["meta"].get("namespace", "") + for va in rule["matches"].keys(): + va = int(va) + rows.append((ns, name, va)) + + # order by (namespace, name) so that like things show up together + rows = sorted(rows) + for ns, name, va in rows: + if ns: + cmt = "%s (%s)" % (name, ns) + else: + cmt = "%s" % (name,) + + logger.info("0x%x: %s", va, cmt) + try: + # message will look something like: + # + # capa: delete service (host-interaction/service/delete) + append_func_cmt(va, "capa: " + cmt, repeatable=False) + except ValueError: + continue + + logger.info("ok") + + +main() diff --git a/setup.py b/setup.py index 3840ad1c..76e5de7d 100644 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r if sys.version_info >= (3, 0): # py3 
requirements.append("networkx") + requirements.append("pylancelot~=0.3.6") else: # py2 requirements.append("enum34") @@ -54,7 +55,7 @@ setuptools.setup( "pycodestyle", "black ; python_version>'3.0'", "isort", - ] + ], }, zip_safe=False, keywords="capa", diff --git a/tests/fixtures.py b/tests/fixtures.py index f41603cb..fe3834a9 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -80,6 +80,16 @@ def get_viv_extractor(path): return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) +@lru_cache() +def get_lancelot_extractor(path): + import capa.features.extractors.lancelot + + with open(path, "rb") as f: + buf = f.read() + + return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf) + + @lru_cache() def extract_file_features(extractor): features = collections.defaultdict(set) @@ -429,7 +439,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected): def get_extractor(path): if sys.version_info >= (3, 0): - raise RuntimeError("no supported py3 backends yet") + extractor = get_lancelot_extractor(path) else: extractor = get_viv_extractor(path) diff --git a/tests/test_ida_features.py b/tests/test_ida_features.py index b227775e..86fd79b3 100644 --- a/tests/test_ida_features.py +++ b/tests/test_ida_features.py @@ -1,104 +1,104 @@ -# run this script from within IDA with ./tests/data/mimikatz.exe open -import sys -import logging -import os.path -import binascii -import traceback - -import pytest - -try: - sys.path.append(os.path.dirname(__file__)) - from fixtures import * -finally: - sys.path.pop() - - -logger = logging.getLogger("test_ida_features") - - -def check_input_file(wanted): - import idautils - - # some versions (7.4) of IDA return a truncated version of the MD5.
- # https://github.com/idapython/bin/issues/11 - try: - found = idautils.GetInputFileMD5()[:31].decode("ascii").lower() - except UnicodeDecodeError: - # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary - # rather than the hex digest - found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower() - - if not wanted.startswith(found): - raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted)) - - -def get_ida_extractor(_path): - check_input_file("5f66b82558ca92e54e77f216ef4c066c") - - # have to import import this inline so pytest doesn't bail outside of IDA - import capa.features.extractors.ida - - return capa.features.extractors.ida.IdaFeatureExtractor() - - -@pytest.mark.skip(reason="IDA Pro tests must be run within IDA") -def test_ida_features(): - for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS: - id = make_test_id((sample, scope, feature, expected)) - - try: - check_input_file(get_sample_md5_by_name(sample)) - except RuntimeError: - print("SKIP %s" % (id)) - continue - - scope = resolve_scope(scope) - sample = resolve_sample(sample) - - try: - do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) - except Exception as e: - print("FAIL %s" % (id)) - traceback.print_exc() - else: - print("OK %s" % (id)) - - -@pytest.mark.skip(reason="IDA Pro tests must be run within IDA") -def test_ida_feature_counts(): - for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS: - id = make_test_id((sample, scope, feature, expected)) - - try: - check_input_file(get_sample_md5_by_name(sample)) - except RuntimeError: - print("SKIP %s" % (id)) - continue - - scope = resolve_scope(scope) - sample = resolve_sample(sample) - - try: - do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) - except Exception as e: - print("FAIL %s" % (id)) - traceback.print_exc() - else: - print("OK %s" % (id)) - - -if __name__ == "__main__": - print("-" * 80) - - # invoke all 
functions in this module that start with `test_` - for name in dir(sys.modules[__name__]): - if not name.startswith("test_"): - continue - - test = getattr(sys.modules[__name__], name) - logger.debug("invoking test: %s", name) - sys.stderr.flush() - test() - - print("DONE") +# run this script from within IDA with ./tests/data/mimikatz.exe open +import sys +import logging +import os.path +import binascii +import traceback + +import pytest + +try: + sys.path.append(os.path.dirname(__file__)) + from fixtures import * +finally: + sys.path.pop() + + +logger = logging.getLogger("test_ida_features") + + +def check_input_file(wanted): + import idautils + + # some versions (7.4) of IDA return a truncated version of the MD5. + # https://github.com/idapython/bin/issues/11 + try: + found = idautils.GetInputFileMD5()[:31].decode("ascii").lower() + except UnicodeDecodeError: + # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary + # rather than the hex digest + found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower() + + if not wanted.startswith(found): + raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted)) + + +def get_ida_extractor(_path): + check_input_file("5f66b82558ca92e54e77f216ef4c066c") + + # have to import import this inline so pytest doesn't bail outside of IDA + import capa.features.extractors.ida + + return capa.features.extractors.ida.IdaFeatureExtractor() + + +@pytest.mark.skip(reason="IDA Pro tests must be run within IDA") +def test_ida_features(): + for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS: + id = make_test_id((sample, scope, feature, expected)) + + try: + check_input_file(get_sample_md5_by_name(sample)) + except RuntimeError: + print("SKIP %s" % (id)) + continue + + scope = resolve_scope(scope) + sample = resolve_sample(sample) + + try: + do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) + except Exception as e: + print("FAIL %s" % (id)) + 
traceback.print_exc() + else: + print("OK %s" % (id)) + + +@pytest.mark.skip(reason="IDA Pro tests must be run within IDA") +def test_ida_feature_counts(): + for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS: + id = make_test_id((sample, scope, feature, expected)) + + try: + check_input_file(get_sample_md5_by_name(sample)) + except RuntimeError: + print("SKIP %s" % (id)) + continue + + scope = resolve_scope(scope) + sample = resolve_sample(sample) + + try: + do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) + except Exception as e: + print("FAIL %s" % (id)) + traceback.print_exc() + else: + print("OK %s" % (id)) + + +if __name__ == "__main__": + print("-" * 80) + + # invoke all functions in this module that start with `test_` + for name in dir(sys.modules[__name__]): + if not name.startswith("test_"): + continue + + test = getattr(sys.modules[__name__], name) + logger.debug("invoking test: %s", name) + sys.stderr.flush() + test() + + print("DONE") diff --git a/tests/test_lancelot_features.py b/tests/test_lancelot_features.py new file mode 100644 index 00000000..e83b00af --- /dev/null +++ b/tests/test_lancelot_features.py @@ -0,0 +1,26 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+ + +from fixtures import * + + +@parametrize( + "sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], +) +def test_lancelot_features(sample, scope, feature, expected): + with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"): + do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected) + + +@parametrize( + "sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"], +) +def test_lancelot_feature_counts(sample, scope, feature, expected): + with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"): + do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected) diff --git a/tests/test_main.py b/tests/test_main.py index 2d9ac32d..05b121c8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -47,7 +47,7 @@ def test_main_single_rule(z9324d_extractor, tmpdir): assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0 -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") +@pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces") def test_main_shellcode(z499c2_extractor): path = z499c2_extractor.path assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0