Compare commits

...

60 Commits

Author SHA1 Message Date
William Ballenthin
216e288ade unpack: add can_unpack routine 2020-08-29 21:38:26 -06:00
William Ballenthin
ec55a9c482 unpack: aspack: add doc 2020-08-26 15:15:06 -06:00
William Ballenthin
e5136683e6 unpack: aspack: add documentation 2020-08-26 14:26:36 -06:00
William Ballenthin
63561b73c1 unpack: aspack: split into its own module 2020-08-26 14:07:25 -06:00
William Ballenthin
b700ad1655 unpack: aspack: fixup PE imports after dumping 2020-08-26 13:49:08 -06:00
William Ballenthin
52eef09c8b unpack: aspack: split dump from fixup 2020-08-25 19:08:00 -06:00
William Ballenthin
de3f223bb5 unpack: aspack: fixup the PE header for OEP 2020-08-25 18:57:11 -06:00
William Ballenthin
6a2e5ad530 unpack: use speakeasy to find aspack OEP 2020-08-25 18:50:15 -06:00
William Ballenthin
5575166b7a unpack: flesh out tests 2020-08-25 12:53:43 -06:00
William Ballenthin
8a2ba9cd71 Merge branch 'backend-lancelot' into fix-46 2020-08-25 12:43:45 -06:00
William Ballenthin
e523c6cb50 init unpack module 2020-08-25 12:43:40 -06:00
William Ballenthin
854e3d7774 submodule: rules update 2020-08-20 15:15:14 -06:00
William Ballenthin
1b6ac29053 Merge branch 'master' into backend-lancelot 2020-08-17 00:04:57 -06:00
William Ballenthin
df4c75882d dos2unix 2020-08-16 04:58:35 -06:00
William Ballenthin
0cfbed05b4 ci: install pefile on py3 2020-08-16 01:41:25 -06:00
William Ballenthin
ca95512811 ci: test on both py2 and py3 2020-08-16 01:36:25 -06:00
William Ballenthin
bb9803fcc0 tests: limit tests to py2/py3 2020-08-16 01:10:10 -06:00
William Ballenthin
1fe945e3ed tests: main: xfail sc analysis on py3 2020-08-16 01:09:40 -06:00
William Ballenthin
6ba4798822 tests: fixtures: add ctxmgr for catching xfail 2020-08-16 01:09:06 -06:00
William Ballenthin
f424dd126f *: py3 compat 2020-08-16 00:05:26 -06:00
William Ballenthin
9fa128b27d tests: freeze: make py3 compatible 2020-08-16 00:05:04 -06:00
William Ballenthin
09bca1e5f7 setup: bump lancelot dep version 2020-08-16 00:04:39 -06:00
William Ballenthin
c623791a84 tests: lancelot: use common harness from fixtures 2020-08-15 23:32:50 -06:00
William Ballenthin
980a34adca setup: bump lancelot dep version 2020-08-15 23:32:35 -06:00
William Ballenthin
8721eb05eb tests: show found number of features when unexpected 2020-08-15 23:32:13 -06:00
William Ballenthin
512ea89662 Merge branch 'backend-lancelot' of github.com:fireeye/capa into backend-lancelot 2020-08-15 21:52:53 -06:00
William Ballenthin
15259d455c Merge branch 'master' into backend-lancelot 2020-08-15 21:52:28 -06:00
Willi Ballenthin
1e097ef759 setup: add pylancelot dep on py3 2020-08-15 11:13:06 -06:00
William Ballenthin
f193ceb91a merge 2020-08-14 10:26:52 -06:00
William Ballenthin
fb19841997 Merge branch 'master' into backend-lancelot 2020-08-13 17:24:28 -06:00
William Ballenthin
40d16c925f main: progress bar updates (+rules, and realize iterators) 2020-08-13 17:23:36 -06:00
William Ballenthin
d5f73b47a4 main: use lancelot on py3 2020-08-13 13:35:09 -06:00
William Ballenthin
caf738ee4e lancelot: implement indirect call resolution 2020-08-13 13:22:50 -06:00
William Ballenthin
c6f27200fe lancelot: pull get_operand_target into helpers 2020-08-13 13:22:29 -06:00
William Ballenthin
5c967cd6ef lancelot: insn: calls to/from 2020-08-13 12:39:32 -06:00
William Ballenthin
a59e1054fe tests: lancelot: feature counts 2020-08-10 19:04:29 -06:00
William Ballenthin
db45a06ba7 lancelot: insn: simplify operand target fetching 2020-08-10 19:04:09 -06:00
William Ballenthin
c739caee40 tests: lancelot: add tests for counts of features 2020-08-10 18:53:40 -06:00
William Ballenthin
bdf6b69be6 lancelot: insn: indirect call 2020-08-10 18:45:58 -06:00
William Ballenthin
fdd6f7434b lancelot: insn: xsection flow and recursive calls 2020-08-10 18:40:45 -06:00
William Ballenthin
21adb2b9d1 tests: lancelot: formatting 2020-08-10 18:16:14 -06:00
William Ballenthin
5929c0652c lancelot: insn: fs/gs 2020-08-10 18:15:10 -06:00
William Ballenthin
e7bf5bfceb lancelot: insn: nzxor 2020-08-10 18:05:26 -06:00
William Ballenthin
c2f55fad12 tests: lancelot: construct assert message 2020-08-10 18:05:08 -06:00
William Ballenthin
7ac4cf47f7 lancelot: insn: pass f, bb, insn throughout 2020-08-10 18:04:37 -06:00
William Ballenthin
3f49a224f5 lancelot: off-by-one instruction enumerator 2020-08-10 18:03:40 -06:00
William Ballenthin
695f1bf55a lancelot: insn: strings 2020-08-10 17:23:19 -06:00
William Ballenthin
10f5a54e1d lancelot: insn: bytes 2020-08-10 17:08:28 -06:00
William Ballenthin
042654ee97 lancelot: insn: mnemonic 2020-08-10 13:50:46 -06:00
William Ballenthin
1da450001c lancelot: insn: offset 2020-08-10 13:47:43 -06:00
William Ballenthin
7996e2efe7 tests: lancelot: remove old tests 2020-08-10 11:51:48 -06:00
William Ballenthin
5eded3c5cc lancelot: insn: implement API features 2020-08-10 11:49:37 -06:00
William Ballenthin
cdae840519 lancelot: file: fix import address 2020-08-10 11:49:11 -06:00
William Ballenthin
fcb8c4a293 tests: lancelot: override parametrize for better naming 2020-08-09 15:46:34 -06:00
William Ballenthin
4e6b475ff6 tests: lancelot: add number tests 2020-08-08 13:55:52 -06:00
William Ballenthin
02a8ad1ea4 tests: add more lancelot feature tests 2020-08-08 13:52:22 -06:00
William Ballenthin
393b332f9c feature: insn: better render negative offset 2020-08-08 13:52:01 -06:00
William Ballenthin
bf4a8dcd3e setup: add dep on backports.lru 2020-08-08 13:51:50 -06:00
William Ballenthin
9bde11fa6f extractor: lancelot: fix stackstring 2020-08-08 13:51:34 -06:00
William Ballenthin
43c6eec30b extractors: begin to implement lancelot backend 2020-08-08 12:48:56 -06:00
21 changed files with 1999 additions and 344 deletions

View File

@@ -62,7 +62,8 @@ jobs:
     with:
       python-version: ${{ matrix.python }}
     - name: Install capa
-      run: pip install -e .[dev]
+      # TODO: remove `pefile` when we bump lancelot >= 0.3.7
+      run: pip install -e .[dev] pefile
     - name: Run tests
       run: pytest tests/

View File

@@ -0,0 +1,92 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging

import lancelot

import capa.features.extractors
import capa.features.extractors.lancelot.file
import capa.features.extractors.lancelot.insn
import capa.features.extractors.lancelot.function
import capa.features.extractors.lancelot.basicblock

__all__ = ["file", "function", "basicblock", "insn"]

logger = logging.getLogger(__name__)


class BB(object):
    """extend the lancelot.BasicBlock with an __int__ method to access the address"""

    def __init__(self, ws, bb):
        super(BB, self).__init__()
        self.ws = ws
        self.address = bb.address
        self.length = bb.length
        self.predecessors = bb.predecessors
        self.successors = bb.successors

    def __int__(self):
        return self.address

    @property
    def instructions(self):
        va = self.address
        while va < self.address + self.length:
            try:
                insn = self.ws.read_insn(va)
            except ValueError:
                logger.warning("failed to read instruction at 0x%x", va)
                return
            yield insn
            va += insn.length


class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor):
    def __init__(self, buf):
        super(LancelotFeatureExtractor, self).__init__()
        self.buf = buf
        self.ws = lancelot.from_bytes(buf)
        self.ctx = {}

    def get_base_address(self):
        return self.ws.base_address

    def extract_file_features(self):
        for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf):
            yield feature, va

    def get_functions(self):
        for va in self.ws.get_functions():
            # this is just the address of the function
            yield va

    def extract_function_features(self, f):
        for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f):
            yield feature, va

    def get_basic_blocks(self, f):
        try:
            cfg = self.ws.build_cfg(f)
        except:
            logger.warning("failed to build CFG for 0x%x", f)
            return
        else:
            for bb in cfg.basic_blocks.values():
                yield BB(self.ws, bb)

    def extract_basic_block_features(self, f, bb):
        for feature, va in capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb):
            yield feature, va

    def get_instructions(self, f, bb):
        return bb.instructions

    def extract_insn_features(self, f, bb, insn):
        for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn):
            yield feature, va
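For orientation, a sketch of driving this extractor standalone; the sample path is hypothetical and error handling is omitted:

import capa.features.extractors.lancelot

with open("sample.exe", "rb") as f:
    extractor = capa.features.extractors.lancelot.LancelotFeatureExtractor(f.read())

print("base address: 0x%x" % extractor.get_base_address())
for feature, va in extractor.extract_file_features():
    print("0x%x: %s" % (va, feature))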

View File

@@ -0,0 +1,120 @@
import string
import struct
import logging

from lancelot import (
    FLOW_VA,
    OPERAND_SIZE,
    OPERAND_TYPE,
    MEMORY_OPERAND_BASE,
    OPERAND_TYPE_MEMORY,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
)

from capa.features import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN

logger = logging.getLogger(__name__)


def extract_bb_tight_loop(ws, bb):
    """check basic block for tight loop indicators"""
    if bb.address in map(lambda flow: flow[FLOW_VA], bb.successors):
        yield Characteristic("tight loop"), bb.address


def is_mov_imm_to_stack(insn):
    if not insn.mnemonic.startswith("mov"):
        return False

    try:
        dst, src = insn.operands
    except ValueError:
        # not two operands
        return False

    if src[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
        return False
    if src[IMMEDIATE_OPERAND_VALUE] < 0:
        return False
    if dst[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
        return False
    if dst[MEMORY_OPERAND_BASE] not in ("ebp", "rbp", "esp", "rsp"):
        return False
    return True


def is_printable_ascii(chars):
    return all(c < 127 and chr(c) in string.printable for c in chars)


def is_printable_utf16le(chars):
    # every other byte must be NULL for the bytes to decode as printable UTF-16LE ASCII.
    # note: iterating bytes yields ints, so compare against 0, not b"\x00".
    if all(c == 0 for c in chars[1::2]):
        return is_printable_ascii(chars[::2])
    return False


def get_printable_len(operand):
    """
    return string length if all operand bytes are ascii or utf16-le printable
    """
    operand_size = operand[OPERAND_SIZE]
    if operand_size == 8:
        chars = struct.pack("<B", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 16:
        chars = struct.pack("<H", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 32:
        chars = struct.pack("<I", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 64:
        chars = struct.pack("<Q", operand[IMMEDIATE_OPERAND_VALUE])
    else:
        raise ValueError("unexpected operand size: " + str(operand_size))

    if is_printable_ascii(chars):
        return operand_size // 8
    if is_printable_utf16le(chars):
        return operand_size // 16
    return 0


def _bb_has_stackstring(ws, bb):
    """
    extract potential stackstring creation, using the following heuristics:
      - basic block contains enough moves of constant bytes to the stack
    """
    count = 0
    for insn in bb.instructions:
        if is_mov_imm_to_stack(insn):
            # add number of printable bytes in the source operand
            src = insn.operands[1]
            count += get_printable_len(src)
        if count > MIN_STACKSTRING_LEN:
            return True
    return False


def extract_stackstring(ws, bb):
    """check basic block for stackstring indicators"""
    if _bb_has_stackstring(ws, bb):
        yield Characteristic("stack string"), bb.address


def extract_basic_block_features(ws, bb):
    yield BasicBlock(), bb.address
    for bb_handler in BASIC_BLOCK_HANDLERS:
        for feature, va in bb_handler(ws, bb):
            yield feature, va


BASIC_BLOCK_HANDLERS = (
    extract_bb_tight_loop,
    extract_stackstring,
)
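A quick check of the printable-length heuristic above, using made-up immediates; each qualifying mov-immediate-to-stack contributes its printable byte count toward MIN_STACKSTRING_LEN:

import struct
import string

def is_printable_ascii(chars):
    return all(c < 127 and chr(c) in string.printable for c in chars)

# "mov dword [ebp-0x8], 0x41424344" stores b"DCBA": 32 // 8 == 4 printable bytes
chars = struct.pack("<I", 0x41424344)
assert is_printable_ascii(chars)

# "mov dword [ebp-0xC], 0x00410042" stores b"B\x00A\x00", "BA" in UTF-16LE: 32 // 16 == 2 chars
chars = struct.pack("<I", 0x00410042)
assert all(c == 0 for c in chars[1::2])  # every other byte is NULL
assert is_printable_ascii(chars[::2])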

View File

@@ -0,0 +1,81 @@
import pefile

import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section


def extract_file_embedded_pe(buf, pe):
    buf = buf[2:]
    total_offset = 2
    while True:
        try:
            offset = buf.index(b"MZ")
        except ValueError:
            return
        else:
            rest = buf[offset:]
            total_offset += offset

            try:
                _ = pefile.PE(data=rest)
            except:
                pass
            else:
                yield Characteristic("embedded pe"), total_offset

            buf = rest[2:]
            total_offset += 2


def extract_file_export_names(buf, pe):
    if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        return

    base_address = pe.OPTIONAL_HEADER.ImageBase
    for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
        yield Export(exp.name.decode("ascii")), base_address + exp.address


def extract_file_import_names(buf, pe):
    base_address = pe.OPTIONAL_HEADER.ImageBase
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address
            else:
                impname = imp.name.decode("ascii")
                yield Import("%s.%s" % (libname, impname)), imp.address
                yield Import("%s" % (impname,)), imp.address


def extract_file_section_names(buf, pe):
    base_address = pe.OPTIONAL_HEADER.ImageBase
    for section in pe.sections:
        yield Section(section.Name.partition(b"\x00")[0].decode("ascii")), base_address + section.VirtualAddress


def extract_file_strings(buf, pe):
    for s in capa.features.extractors.strings.extract_ascii_strings(buf):
        yield String(s.s), s.offset

    for s in capa.features.extractors.strings.extract_unicode_strings(buf):
        yield String(s.s), s.offset


def extract_file_features(buf):
    pe = pefile.PE(data=buf)
    for file_handler in FILE_HANDLERS:
        for feature, va in file_handler(buf, pe):
            yield feature, va


FILE_HANDLERS = (
    extract_file_embedded_pe,
    extract_file_export_names,
    extract_file_import_names,
    extract_file_section_names,
    extract_file_strings,
)
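The embedded-PE scan above walks every "MZ" occurrence past the file's own header and keeps offsets that pefile can parse; an equivalent standalone sketch (find-based rather than index-based):

import pefile

def find_embedded_pe_offsets(buf):
    offsets = []
    offset = 2  # skip the file's own "MZ" header
    while True:
        offset = buf.find(b"MZ", offset)
        if offset == -1:
            return offsets
        try:
            pefile.PE(data=buf[offset:])
        except pefile.PEFormatError:
            pass
        else:
            offsets.append(offset)
        offset += 2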

View File

@@ -0,0 +1,64 @@
import logging

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

from lancelot import (
    FLOW_VA,
    FLOW_TYPE,
    FLOW_TYPE_CONDITIONAL_JUMP,
    FLOW_TYPE_CONDITIONAL_MOVE,
    FLOW_TYPE_UNCONDITIONAL_JUMP,
)

from capa.features import Characteristic
from capa.features.extractors import loops

logger = logging.getLogger(__name__)


# note: `lru_cache` must be called to produce a decorator,
# for compatibility with python < 3.8 and the backports package.
@lru_cache()
def get_call_graph(ws):
    return ws.build_call_graph()


def extract_function_calls_to(ws, f):
    cg = get_call_graph(ws)
    for caller in cg.calls_to.get(f, []):
        yield Characteristic("calls to"), caller


def extract_function_loop(ws, f):
    edges = []
    for bb in ws.build_cfg(f).basic_blocks.values():
        for flow in bb.successors:
            if flow[FLOW_TYPE] in (
                FLOW_TYPE_UNCONDITIONAL_JUMP,
                FLOW_TYPE_CONDITIONAL_JUMP,
                FLOW_TYPE_CONDITIONAL_MOVE,
            ):
                edges.append((bb.address, flow[FLOW_VA]))
                continue

    if edges and loops.has_loop(edges):
        yield Characteristic("loop"), f


FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)

_not_implemented = set([])


def extract_function_features(ws, f):
    for func_handler in FUNCTION_HANDLERS:
        try:
            for feature, va in func_handler(ws, f):
                yield feature, va
        except NotImplementedError:
            if func_handler.__name__ not in _not_implemented:
                logger.warning("not implemented: %s", func_handler.__name__)
                _not_implemented.add(func_handler.__name__)
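For intuition, a minimal stand-in for capa.features.extractors.loops.has_loop, assuming a networkx-style cycle check (networkx is added as a py3 dependency in setup.py later in this diff); single-block tight loops are handled separately by the basic block extractor:

import networkx

def has_loop(edges):
    g = networkx.DiGraph()
    g.add_edges_from(edges)
    return any(len(comp) > 1 for comp in networkx.strongly_connected_components(g))

# a two-block loop: 0x401000 -> 0x401010 -> 0x401000
assert has_loop([(0x401000, 0x401010), (0x401010, 0x401000)])
# straight-line flow: no loop
assert not has_loop([(0x401000, 0x401010), (0x401010, 0x401020)])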

View File

@@ -0,0 +1,33 @@
from lancelot import (
    OPERAND_TYPE,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)


def get_operand_target(insn, op):
    if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
        # call direct, x64
        # rip relative
        # kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
        if op[MEMORY_OPERAND_BASE] == "rip":
            return op[MEMORY_OPERAND_DISP] + insn.address + insn.length

        # call direct, x32
        # mimikatz:0x403BD3 call ds:CryptAcquireContextW
        elif op[MEMORY_OPERAND_BASE] is None:
            return op[MEMORY_OPERAND_DISP]

    # call via thunk
    # mimikatz:0x455A41 call LsaQueryInformationPolicy
    elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
        return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length

    elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
        return op[IMMEDIATE_OPERAND_VALUE]

    raise ValueError("operand has no target")
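A worked example of the rip-relative case, with made-up numbers; the relative-immediate (thunk) case uses the same next-instruction arithmetic:

# 0x140001000:  ff 15 0a 2f 00 00    call qword ptr [rip+0x2f0a]
insn_address = 0x140001000
insn_length = 6
disp = 0x2F0A

# rip points at the *next* instruction, so:
assert disp + insn_address + insn_length == 0x140003F10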

View File

@@ -0,0 +1,149 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import collections

from lancelot import (
    FLOW_VA,
    OPERAND_TYPE,
    PERMISSION_READ,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    MEMORY_OPERAND_INDEX,
    OPERAND_TYPE_REGISTER,
    MEMORY_OPERAND_SEGMENT,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    REGISTER_OPERAND_REGISTER,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)

from capa.features.extractors.lancelot.helpers import get_operand_target

DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")


class NotFoundError(Exception):
    pass


def read_instructions(ws, bb):
    va = bb.address
    while va < bb.address + bb.length:
        try:
            insn = ws.read_insn(va)
        except ValueError:
            return
        yield insn
        va += insn.length


def build_instruction_predecessors(ws, cfg):
    preds = collections.defaultdict(set)
    for bb in cfg.basic_blocks.values():
        insns = list(read_instructions(ws, bb))
        for i, insn in enumerate(insns):
            if i == 0:
                # the first instruction of a basic block is preceded by
                # the last instruction of each predecessor basic block.
                for pred in bb.predecessors:
                    pred_bb = cfg.basic_blocks[pred[FLOW_VA]]
                    preds[insn.address].add(list(read_instructions(ws, pred_bb))[-1].address)
            else:
                preds[insn.address].add(insns[i - 1].address)
    return preds


def find_definition(ws, f, insn):
    """
    scan backwards from the given address looking for assignments to the given register.
    if a constant, return that value.

    args:
      ws (lancelot.PE)
      f (int): the function start address
      insn (lancelot.Instruction): call instruction to resolve

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert insn.mnemonic == "call"
    op0 = insn.operands[0]
    assert op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
    reg = op0[REGISTER_OPERAND_REGISTER]

    cfg = ws.build_cfg(f)
    preds = build_instruction_predecessors(ws, cfg)

    q = collections.deque()
    seen = set([])
    q.extend(preds[insn.address])
    while q:
        cur = q.popleft()

        # skip if we've already processed this location
        if cur in seen:
            continue
        seen.add(cur)

        insn = ws.read_insn(cur)
        operands = insn.operands
        if len(operands) == 0:
            q.extend(preds[cur])
            continue

        op0 = operands[0]
        if not (
            op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
            and op0[REGISTER_OPERAND_REGISTER] == reg
            and insn.mnemonic in DESTRUCTIVE_MNEMONICS
        ):
            q.extend(preds[cur])
            continue

        # if we reach here, the instruction is destructive to our target register.
        # we currently only support extracting the constant from something like: `mov $reg, IAT`
        # so, any other pattern results in an unknown value, represented by None.
        # this is a good place to extend in the future, if we need more robust support.
        if insn.mnemonic != "mov":
            return (cur, None)
        else:
            op1 = operands[1]
            try:
                target = get_operand_target(insn, op1)
            except ValueError:
                return (cur, None)
            else:
                return (cur, target)

    raise NotFoundError()


def is_indirect_call(insn):
    return insn.mnemonic == "call" and insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER


def resolve_indirect_call(ws, f, insn):
    """
    inspect the given indirect call instruction and attempt to resolve the target address.

    args:
      ws (lancelot.PE): the analysis workspace
      f (int): the address of the function to analyze
      insn (lancelot.Instruction): the instruction at which to start analysis

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert is_indirect_call(insn)
    return find_definition(ws, f, insn)
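A sketch of applying this resolver across a whole workspace, using the helpers above; the sample path is hypothetical:

import lancelot

with open("sample.exe", "rb") as f:
    ws = lancelot.from_bytes(f.read())

for f_va in ws.get_functions():
    cfg = ws.build_cfg(f_va)
    for bb in cfg.basic_blocks.values():
        for insn in read_instructions(ws, bb):
            if not is_indirect_call(insn):
                continue
            try:
                va, target = resolve_indirect_call(ws, f_va, insn)
            except NotFoundError:
                continue
            if target is not None:
                print("0x%x: call resolves to 0x%x (definition at 0x%x)" % (insn.address, target, va))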

View File

@@ -0,0 +1,487 @@
import logging
import itertools

import pefile

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

from lancelot import (
    OPERAND_TYPE,
    PERMISSION_READ,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    MEMORY_OPERAND_INDEX,
    OPERAND_TYPE_REGISTER,
    MEMORY_OPERAND_SEGMENT,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    REGISTER_OPERAND_REGISTER,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)

import capa.features.extractors.helpers
import capa.features.extractors.strings
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic
from capa.features.extractors.lancelot.helpers import get_operand_target
from capa.features.extractors.lancelot.function import get_call_graph
from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call

logger = logging.getLogger(__name__)

# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40


def get_arch(ws):
    if ws.arch == "x32":
        return ARCH_X32
    elif ws.arch == "x64":
        return ARCH_X64
    else:
        raise ValueError("unexpected architecture")


# note: `lru_cache` must be called to produce a decorator,
# for compatibility with python < 3.8 and the backports package.
@lru_cache()
def get_pefile(xtor):
    return pefile.PE(data=xtor.buf)


@lru_cache()
def get_imports(xtor):
    pe = get_pefile(xtor)
    imports = {}
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                imports[imp.address] = "%s.#%s" % (libname, imp.ordinal)
            else:
                impname = imp.name.decode("ascii")
                imports[imp.address] = "%s.%s" % (libname, impname)
    return imports


@lru_cache()
def get_thunks(xtor):
    thunks = {}
    for va in xtor.ws.get_functions():
        try:
            insn = xtor.ws.read_insn(va)
        except ValueError:
            continue

        if insn.mnemonic != "jmp":
            continue

        op0 = insn.operands[0]
        try:
            target = get_operand_target(insn, op0)
        except ValueError:
            continue

        imports = get_imports(xtor)
        if target not in imports:
            continue

        thunks[va] = imports[target]
    return thunks


def extract_insn_api_features(xtor, f, bb, insn):
    """parse API features from the given instruction."""
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]

    if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        try:
            (_, target) = resolve_indirect_call(xtor.ws, f, insn)
        except NotFoundError:
            return

        if target is None:
            return
    else:
        try:
            target = get_operand_target(insn, op0)
        except ValueError:
            return

    imports = get_imports(xtor)
    if target in imports:
        for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
            yield feature, va
        return

    thunks = get_thunks(xtor)
    if target in thunks:
        for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
            yield feature, va


def extract_insn_mnemonic_features(xtor, f, bb, insn):
    """parse mnemonic features from the given instruction."""
    yield Mnemonic(insn.mnemonic), insn.address


def extract_insn_number_features(xtor, f, bb, insn):
    """parse number features from the given instruction."""
    operands = insn.operands
    for operand in operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
            continue

        v = operand[IMMEDIATE_OPERAND_VALUE]

        if xtor.ws.probe(v) & PERMISSION_READ:
            # v is a valid address,
            # therefore, assume it's not also a constant.
            continue

        if (
            insn.mnemonic == "add"
            and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
            and operands[0][REGISTER_OPERAND_REGISTER] == "esp"
        ):
            # skip things like:
            #
            #     .text:00401140    call    sub_407E2B
            #     .text:00401145    add     esp, 0Ch
            return

        yield Number(v), insn.address
        yield Number(v, arch=get_arch(xtor.ws)), insn.address


def extract_insn_offset_features(xtor, f, bb, insn):
    """parse structure offset features from the given instruction."""
    operands = insn.operands
    for operand in operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
            continue

        if operand[MEMORY_OPERAND_BASE] in ("esp", "ebp", "rbp"):
            continue

        # lancelot provides `None` when the displacement is not present.
        v = operand[MEMORY_OPERAND_DISP] or 0

        yield Offset(v), insn.address
        yield Offset(v, arch=get_arch(xtor.ws)), insn.address


def derefs(xtor, p):
    """
    recursively follow the given pointer, yielding the valid memory addresses along the way.
    useful when you may have a pointer to string, or pointer to pointer to string, etc.

    this is a "do what i mean" type of helper function.
    """
    depth = 0
    while True:
        if not xtor.ws.probe(p) & PERMISSION_READ:
            return
        yield p

        next = xtor.ws.read_pointer(p)

        # sanity: pointer points to self
        if next == p:
            return

        # sanity: avoid chains of pointers that are unreasonably deep
        depth += 1
        if depth > 10:
            return

        p = next


def read_bytes(xtor, va):
    """
    read up to MAX_BYTES_FEATURE_SIZE from the given address.

    raises:
      ValueError: if the given address is not valid.
    """
    start = va
    end = va + MAX_BYTES_FEATURE_SIZE

    pe = get_pefile(xtor)
    for section in pe.sections:
        section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
        if section_start <= start < section_end:
            # don't read beyond the end of the containing section
            end = min(end, section_end)
            return xtor.ws.read_bytes(start, end - start)

    raise ValueError("invalid address")


# these are mnemonics that may flow (jump) elsewhere
FLOW_MNEMONICS = set(
    [
        "call",
        "jb",
        "jbe",
        "jcxz",
        "jecxz",
        "jknzd",
        "jkzd",
        "jl",
        "jle",
        "jmp",
        "jnb",
        "jnbe",
        "jnl",
        "jnle",
        "jno",
        "jnp",
        "jns",
        "jnz",
        "jo",
        "jp",
        "jrcxz",
        "js",
        "jz",
    ]
)


def extract_insn_bytes_features(xtor, f, bb, insn):
    """
    parse byte sequence features from the given instruction.
    """
    if insn.mnemonic in FLOW_MNEMONICS:
        return

    for operand in insn.operands:
        try:
            target = get_operand_target(insn, operand)
        except ValueError:
            continue

        for ptr in derefs(xtor, target):
            try:
                buf = read_bytes(xtor, ptr)
            except ValueError:
                continue

            if capa.features.extractors.helpers.all_zeros(buf):
                continue

            yield Bytes(buf), insn.address


def first(s):
    """enumerate the first element in the sequence"""
    for i in s:
        yield i
        break


def extract_insn_string_features(xtor, f, bb, insn):
    """parse string features from the given instruction."""
    for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn):
        buf = bytez.value
        for s in itertools.chain(
            first(capa.features.extractors.strings.extract_ascii_strings(buf)),
            first(capa.features.extractors.strings.extract_unicode_strings(buf)),
        ):
            if s.offset == 0:
                yield String(s.s), va


def is_security_cookie(xtor, f, bb, insn):
    """
    check if an instruction is related to security cookie checks
    """
    # security cookie check should use SP or BP
    op1 = insn.operands[1]
    if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER and op1[REGISTER_OPERAND_REGISTER] not in (
        "esp",
        "ebp",
        "rbp",
        "rsp",
    ):
        return False

    # expect security cookie init in first basic block within first bytes (instructions)
    if f == bb.address and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA):
        return True

    # ... or within last bytes (instructions) before a return
    insns = list(xtor.get_instructions(f, bb))
    if insns[-1].mnemonic in ("ret", "retn") and insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA):
        return True

    return False


def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction.
    ignore expected non-zeroing XORs, e.g. security cookies.
    """
    if insn.mnemonic != "xor":
        return

    operands = insn.operands

    if operands[0] == operands[1]:
        return

    if is_security_cookie(xtor, f, bb, insn):
        return

    yield Characteristic("nzxor"), insn.address


def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn):
    """
    parse peb access from the given instruction. fs:[0x30] on x86, gs:[0x60] on x64
    """
    for operand in insn.operands:
        if (
            operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
            and operand[MEMORY_OPERAND_SEGMENT] == "gs"
            and operand[MEMORY_OPERAND_DISP] == 0x60
        ):
            yield Characteristic("peb access"), insn.address
        if (
            operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
            and operand[MEMORY_OPERAND_SEGMENT] == "fs"
            and operand[MEMORY_OPERAND_DISP] == 0x30
        ):
            yield Characteristic("peb access"), insn.address


def extract_insn_segment_access_features(xtor, f, bb, insn):
    """parse the instruction for access to fs or gs"""
    for operand in insn.operands:
        if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "gs":
            yield Characteristic("gs access"), insn.address
        if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "fs":
            yield Characteristic("fs access"), insn.address


def get_section(xtor, va):
    pe = get_pefile(xtor)
    for i, section in enumerate(pe.sections):
        section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize
        if section_start <= va < section_end:
            return i

    raise ValueError("invalid address")


def extract_insn_cross_section_cflow(xtor, f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries.
    """
    if insn.mnemonic not in FLOW_MNEMONICS:
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    if target in get_imports(xtor):
        return

    try:
        if get_section(xtor, insn.address) != get_section(xtor, target):
            yield Characteristic("cross section flow"), insn.address
    except ValueError:
        return


def extract_function_calls_from(xtor, f, bb, insn):
    cg = get_call_graph(xtor.ws)
    for callee in cg.calls_from.get(insn.address, []):
        yield Characteristic("calls from"), callee
        if callee == f:
            yield Characteristic("recursive call"), insn.address

    # lancelot doesn't count API calls when constructing the call graph,
    # so we still have to scan for calls to an import.
    if insn.mnemonic != "call":
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    imports = get_imports(xtor)
    if target in imports:
        yield Characteristic("calls from"), target


# this is a feature that's most relevant at the function or basic block scope;
# however, it's most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn):
    """
    extract indirect function call characteristic (e.g., `call eax` or `call dword ptr [edx+4]`).
    does not include calls like => call ds:dword_ABD4974
    """
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]
    if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        yield Characteristic("indirect call"), insn.address
    elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_BASE] is not None:
        yield Characteristic("indirect call"), insn.address
    elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_INDEX] is not None:
        yield Characteristic("indirect call"), insn.address


_not_implemented = set([])


def extract_insn_features(xtor, f, bb, insn):
    for insn_handler in INSTRUCTION_HANDLERS:
        try:
            for feature, va in insn_handler(xtor, f, bb, insn):
                yield feature, va
        except NotImplementedError:
            if insn_handler.__name__ not in _not_implemented:
                logger.warning("not implemented: %s", insn_handler.__name__)
                _not_implemented.add(insn_handler.__name__)


INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_string_features,
    extract_insn_bytes_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
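Putting the handlers together, a sketch of walking a sample and printing every instruction-scope feature via the extractor facade; the path is hypothetical:

import capa.features.extractors.lancelot
from capa.features.extractors.lancelot.insn import extract_insn_features

with open("sample.exe", "rb") as f:
    xtor = capa.features.extractors.lancelot.LancelotFeatureExtractor(f.read())

for f_va in xtor.get_functions():
    for bb in xtor.get_basic_blocks(f_va):
        for insn in xtor.get_instructions(f_va, bb):
            for feature, va in extract_insn_features(xtor, f_va, bb, insn):
                print("0x%x: %s" % (va, feature))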

View File

@@ -8,11 +8,7 @@
 import types
-import file
-import insn
-import function
 import viv_utils
-import basicblock
 import capa.features.extractors
 import capa.features.extractors.viv.file

View File

@@ -24,7 +24,10 @@ class Number(Feature):
         super(Number, self).__init__(value, arch=arch, description=description)

     def get_value_str(self):
-        return "0x%X" % self.value
+        if self.value < 0:
+            return "-0x%X" % (-self.value)
+        else:
+            return "0x%X" % self.value


 class Offset(Feature):
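The change renders negative values with a leading minus instead of the old "0x-4"-style output of "0x%X" % value; a standalone check:

def get_value_str(value):
    if value < 0:
        return "-0x%X" % (-value)
    else:
        return "0x%X" % value

assert get_value_str(0x10) == "0x10"
assert get_value_str(-4) == "-0x4"  # previously rendered as "0x-4"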

View File

@@ -32,7 +32,7 @@ import capa.features.extractors
 from capa.helpers import oint, get_file_taste

 RULES_PATH_DEFAULT_STRING = "(embedded rules)"
-SUPPORTED_FILE_MAGIC = set(["MZ"])
+SUPPORTED_FILE_MAGIC = set([b"MZ"])

 logger = logging.getLogger("capa")

@@ -290,7 +290,24 @@ class UnsupportedRuntimeError(RuntimeError):
 def get_extractor_py3(path, format, disable_progress=False):
-    raise UnsupportedRuntimeError()
+    try:
+        import lancelot
+
+        import capa.features.extractors.lancelot
+    except ImportError:
+        logger.warning("lancelot not installed")
+        raise UnsupportedRuntimeError()
+
+    if format not in ("pe", "auto"):
+        raise UnsupportedFormatError(format)
+
+    if not is_supported_file_type(path):
+        raise UnsupportedFormatError()
+
+    with open(path, "rb") as f:
+        buf = f.read()
+
+    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)


 def get_extractor(path, format, disable_progress=False):
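With this change, the py3 code path returns a lancelot-backed extractor instead of raising; a sketch (the sample path is hypothetical):

import capa.main

extractor = capa.main.get_extractor_py3("sample.exe", "auto")
print(type(extractor).__name__)  # LancelotFeatureExtractor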

47 capa/unpack/__init__.py Normal file
View File

@@ -0,0 +1,47 @@
import sys
import logging

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

logger = logging.getLogger(__name__)


class NotPackedError(ValueError):
    def __init__(self):
        super(NotPackedError, self).__init__("not packed")


def can_unpack():
    # the unpacking backend is based on Speakeasy, which supports python 3.6+
    return sys.version_info >= (3, 6)


@lru_cache()
def get_unpackers():
    # import here to break an import loop
    import capa.unpack.aspack

    return {p.name: p for p in [capa.unpack.aspack.AspackUnpacker]}


def detect_packer(buf):
    for unpacker in get_unpackers().values():
        if unpacker.is_packed(buf):
            return unpacker.name
    raise NotPackedError()


def is_packed(buf):
    try:
        detect_packer(buf)
        return True
    except NotPackedError:
        return False


def unpack_pe(packer, buf):
    return get_unpackers()[packer].unpack_pe(buf)
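A sketch of the intended top-level API, assuming a packed sample on disk (paths are hypothetical):

import capa.unpack

with open("packed.exe", "rb") as f:
    buf = f.read()

if capa.unpack.can_unpack() and capa.unpack.is_packed(buf):
    packer = capa.unpack.detect_packer(buf)  # e.g. "aspack"
    with open("unpacked.exe", "wb") as f:
        f.write(capa.unpack.unpack_pe(packer, buf))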

459 capa/unpack/aspack.py Normal file
View File

@@ -0,0 +1,459 @@
import io
import struct
import logging
import contextlib
import collections

import pefile
import speakeasy
import speakeasy.common as se_common
import speakeasy.profiler
import speakeasy.windows.objman

logger = logging.getLogger(__name__)


def pefile_get_section_by_name(pe, section_name):
    for section in pe.sections:
        try:
            if section.Name.partition(b"\x00")[0].decode("ascii") == section_name:
                return section
        except:
            continue
    raise ValueError("section not found")


def prepare_emu_context(se, module):
    """
    prepare a Speakeasy instance for emulating the given module, without running it.
    this is useful when planning to manually control the emulator,
    such as via `Speakeasy.emu.emu_eng.start(...)`.

    typically, Speakeasy expects to do "run based" analysis,
    which doesn't give us too much control.

    much of this was derived from win32::Win32Emulator::run_module.
    hopefully this can eventually be merged into Speakeasy.

    args:
      se (speakeasy.Speakeasy): the instance to prepare
      module (speakeasy.Module): the module that will be emulated
    """
    se._init_hooks()

    main_exe = None
    if not module.is_exe():
        container = se.emu.init_container_process()
        if container:
            se.emu.processes.append(container)
            se.emu.curr_process = container
    else:
        main_exe = module

    if main_exe:
        se.emu.user_modules = [main_exe] + se.emu.user_modules

    # create an empty process object for the module if none is supplied
    if len(se.emu.processes) == 0:
        p = speakeasy.windows.objman.Process(se.emu, path=module.get_emu_path(), base=module.base, pe=module)
        se.emu.curr_process = p

    t = speakeasy.windows.objman.Thread(se.emu, stack_base=se.emu.stack_base, stack_commit=module.stack_commit)
    se.emu.om.objects.update({t.address: t})
    se.emu.curr_process.threads.append(t)
    se.emu.curr_thread = t

    peb = se.emu.alloc_peb(se.emu.curr_process)
    se.emu.init_teb(t, peb)


INSN_PUSHA = 0x60
INSN_POPA = 0x61


class AspackUnpacker(speakeasy.Speakeasy):
    name = "aspack"

    def __init__(self, buf, debug=False):
        super(AspackUnpacker, self).__init__(debug=debug)
        self.module = self.load_module(data=buf)
        prepare_emu_context(self, self.module)

    @staticmethod
    def detect_aspack(buf):
        """
        return True if the given buffer contains an ASPack'd PE file.

        we detect aspack by looking at the section names for .aspack.
        the unpacking routine contains further validation and will raise an exception if necessary.

        args:
          buf (bytes): the contents of a PE file.

        returns: bool
        """
        try:
            pe = pefile.PE(data=buf, fast_load=True)
        except:
            return False

        try:
            pefile_get_section_by_name(pe, ".aspack")
        except ValueError:
            pass
        else:
            return True

        return False

    # expose the detection predicate under the name
    # that capa.unpack.detect_packer expects of unpackers.
    is_packed = detect_aspack

    @classmethod
    def unpack_pe(cls, buf):
        """
        unpack the given buffer that contains an ASPack'd PE file.
        return the contents of a reconstructed PE file.

        args:
          buf (bytes): the contents of an ASPack'd PE file.

        returns: bytes
        """
        unpacker = cls(buf)
        return unpacker.unpack()

    def stepi(self):
        self.emu.emu_eng.start(self.emu.get_pc(), count=1)

    def remove_hook(self, hook_type, hook_handle):
        # TODO: this should be part of speakeasy
        self.emu.hooks[hook_type].remove(hook_handle)
        self.emu.emu_eng.hook_remove(hook_handle.handle)

    def remove_mem_read_hook(self, hook_handle):
        # TODO: this should be part of speakeasy
        self.remove_hook(se_common.HOOK_MEM_READ, hook_handle)

    @contextlib.contextmanager
    def mem_read_hook(self, hook):
        """
        context manager for temporarily installing a hook on the emulator.

        example:

            with self.mem_read_hook(lambda emu, access, addr, size, ctx: emu.stop()):
                self.emu.emu_eng.start(0x401000)

        args:
          hook (speakeasy.common.MemReadHook): the hook to install
        """
        handle = self.add_mem_read_hook(hook)
        # if this fails, then there's still an unfixed bug in Speakeasy
        assert handle.handle != 0
        try:
            yield
        finally:
            self.remove_mem_read_hook(handle)

    def remove_code_hook(self, hook_handle):
        # TODO: this should be part of speakeasy
        self.remove_hook(se_common.HOOK_CODE, hook_handle)

    @contextlib.contextmanager
    def code_hook(self, hook):
        """
        context manager for temporarily installing a hook on the emulator.

        example:

            with self.code_hook(lambda emu, addr, size, ctx: emu.stop()):
                self.emu.emu_eng.start(0x401000)

        args:
          hook (speakeasy.common.CodeHook): the hook to install
        """
        handle = self.add_code_hook(hook)
        assert handle.handle != 0
        try:
            yield
        finally:
            self.remove_code_hook(handle)

    def read_ptr(self, va):
        endian = "little"
        val = self.mem_read(va, self.emu.ptr_size)
        return int.from_bytes(val, endian)

    def dump(self):
        """
        emulate the loaded module, pausing after an appropriate section hop.
        then, dump and return the module's memory and OEP.

        this routine is specific to aspack. it makes the following assumptions:
          - aspack starts with a PUSHA to save off the CPU context
          - aspack then runs its unpacking stub
          - aspack executes POPA to restore the CPU context
          - aspack section hops to the OEP

        we'll emulate in a few phases:
          1. single step over PUSHA at the entrypoint
          2. extract the address of the saved CPU context
          3. emulate until the saved CPU context is read
          4. assert this is a POPA instruction
          5. emulate until a section hop
          6. profit!

        return the module's memory segment and the OEP.

        returns: Tuple[bytes, int]
        """
        # prime the emulator.
        # this is derived from winemu::WindowsEmulator::start()
        self.emu.curr_run = speakeasy.profiler.Run()
        self.emu.curr_mod = self.module
        self.emu.set_hooks()
        self.emu._set_emu_hooks()

        # 0. sanity checking: assert entrypoint is a PUSHA instruction
        entrypoint = self.module.base + self.module.ep
        opcode = self.emu.mem_read(entrypoint, 1)[0]
        if opcode != INSN_PUSHA:
            raise ValueError("not packed with supported ASPack")

        # 1. single step over PUSHA
        self.emu.set_pc(entrypoint)
        self.stepi()

        # 2. extract address of saved CPU context
        saved_cpu_context = self.emu.get_stack_ptr()

        # 3. emulate until saved CPU context is accessed
        def until_read(target):
            """return a mem_read hook that stops the emulator when an address is read."""

            def inner(emu, _access, addr, _size, _value, _ctx):
                if addr == target:
                    emu.stop()
                return True

            return inner

        with self.mem_read_hook(until_read(saved_cpu_context)):
            self.emu.emu_eng.start(self.emu.get_pc())

        # 4. assert this is a POPA instruction
        opcode = self.emu.mem_read(self.emu.get_pc(), 1)[0]
        if opcode != INSN_POPA:
            raise ValueError("not packed with supported ASPack")
        logger.debug("POPA: 0x%x", self.emu.get_pc())

        # 5. emulate until a section hop
        aspack_section = self.module.get_section_by_name(".aspack")
        start = self.module.base + aspack_section.VirtualAddress
        end = start + aspack_section.Misc_VirtualSize

        def until_section_hop(start, end):
            def inner(emu, addr, _size, _ctx):
                if addr < start or addr >= end:
                    emu.stop()
                return True

            return inner

        with self.code_hook(until_section_hop(start, end)):
            self.emu.emu_eng.start(self.emu.get_pc())

        # 6. dump and return
        oep = self.emu.get_pc()
        logger.debug("OEP: 0x%x", oep)

        mm = self.get_address_map(self.module.base)
        buf = self.mem_read(mm.base, mm.size)

        return buf, oep

    def fixup(self, buf, oep):
        """
        fixup a PE image that's been dumped from memory after unpacking aspack.
        there are two big fixes that need to happen:

          1. update the section pointers and sizes
          2. rebuild the import table

        for (1) updating the section pointers, we'll just update the
        physical pointers to match the virtual pointers, since this is a loaded image.

        for (2) rebuilding the import table, we'll:
          (a) inspect the emulation results for resolved imports, which tells us dll/symbol names
          (b) scan the dumped image for the unpacked import thunks (Import Address Table/Thunk Table)
          (c) match the import thunks with resolved imports
          (d) build the import table structures
          (e) write the reconstructed table into the .aspack section

        since the .aspack section contains the unpacking stub, which is no longer used,
        we'll write the reconstructed IAT there. hopefully it's big enough.
        """
        pe = pefile.PE(data=buf)

        pe.OPTIONAL_HEADER.AddressOfEntryPoint = oep - self.module.base

        # 1. update section pointers and sizes.
        for section in pe.sections:
            section.PointerToRawData = section.VirtualAddress
            section.SizeOfRawData = section.Misc_VirtualSize

        # 2. rebuild the import table

        # place the reconstructed import table in the .aspack section (unpacking stub)
        reconstruction_target = pefile_get_section_by_name(pe, ".aspack").VirtualAddress

        # mapping from import pointer to (dll name, symbol name).
        # the import pointer is generated by speakeasy and is not mapped.
        # it often looks something like 0xfeedf008.
        # as we encounter pointers with values like this, we can resolve the symbol.
        imports = {}

        # 2a. find resolved imports
        for addr, (dll, sym) in self.module.import_table.items():
            # these are items in the original import table.
            logger.debug(f"found static import {dll}.{sym}")
            imports[addr] = (dll, sym)

        for (addr, dll, sym) in self.emu.dyn_imps:
            # these are imports that have been resolved at runtime by the unpacking stub.
            logger.debug(f"found dynamic import {dll}.{sym}")
            imports[addr] = (dll, sym)

        # 2b. find the existing thunk tables.
        # these are pointer-aligned tables of import pointers.
        # in my test sample, it's found at the start of the first section.

        # ordered list of tuples (VA, import pointer).
        # look up the symbol using the import pointer and the `imports` mapping.
        thunks = []

        # scan from the start of the first section
        # until we reach values that don't look like thunk tables.
        for va in range(pe.sections[0].VirtualAddress + self.module.base, 0xFFFFFFFFFFFFFFFF, self.emu.ptr_size):
            ptr = self.read_ptr(va)

            if ptr == 0:
                # probably padding/terminating entry
                continue

            if ptr in imports:
                thunks.append((va, ptr))
                logger.debug(f"found import thunk at {va:08x} to {ptr:08x} for {imports[ptr][0]}\t{imports[ptr][1]}")
                continue

            # otherwise, at the end of the thunk tables
            break

        # 2c. collect the thunk entries into contiguous tables, grouped by dll name.
        #
        # list of thunk tuples that are contiguous and have the same dll name:
        #   (VA, import pointer, dll name, symbol name)
        curr_idt_table = []
        # list of list of thunk tuples, like above
        idt_tables = []
        for thunk in thunks:
            va, imp = thunk
            dll, sym = imports[imp]
            if not curr_idt_table:
                curr_idt_table.append((va, imp, dll, sym))
            elif curr_idt_table[0][2] == dll:
                curr_idt_table.append((va, imp, dll, sym))
            else:
                idt_tables.append(curr_idt_table)
                curr_idt_table = [(va, imp, dll, sym)]
        idt_tables.append(curr_idt_table)

        # 2d. build the import table structures

        # mapping from the data identifier to its RVA (which will be found within the reconstruction blob)
        locations = {}

        # the raw bytes of the reconstructed import structures.
        # it will have the following layout:
        #   1. DLL name strings and Hint/Name table entries
        #   2. Import Lookup Tables (points into (1))
        #   3. Import Directory Tables (points into (1), (2), and original Thunk Tables)
        reconstruction = io.BytesIO()

        # list of dll names
        dlls = list(sorted(set(map(lambda pair: pair[0], imports.values()))))

        # mapping from dll name to list of symbols
        symbols = collections.defaultdict(set)
        for dll, sym in imports.values():
            symbols[dll].add(sym)

        # emit strings into the reconstruction blob
        for dll in dlls:
            locations[("dll", dll)] = reconstruction_target + reconstruction.tell()
            reconstruction.write(dll.encode("ascii") + b"\x00")
            if reconstruction.tell() % 2 == 1:
                # padding
                reconstruction.write(b"\x00")

            for sym in sorted(symbols[dll]):
                locations[("hint", dll, sym)] = reconstruction_target + reconstruction.tell()
                # export name pointer table hint == 0
                reconstruction.write(b"\x00\x00")
                # name
                reconstruction.write(sym.encode("ascii") + b"\x00")
                if reconstruction.tell() % 2 == 1:
                    # padding
                    reconstruction.write(b"\x00")

        # emit Import Lookup Tables for each recovered thunk table
        ptr_format = "<I" if self.emu.ptr_size == 4 else "<Q"
        for i, idt_entry in enumerate(idt_tables):
            locations[("import lookup table", i)] = reconstruction_target + reconstruction.tell()
            for (va, imp, dll, sym) in idt_entry:
                reconstruction.write(struct.pack(ptr_format, locations[("hint", dll, sym)]))
            reconstruction.write(b"\x00" * 8)

        # emit Import Descriptor Tables for each recovered thunk table
        IDT_ENTRY_SIZE = 0x20
        for i, idt_entry in enumerate(idt_tables):
            va, _, dll, _ = idt_entry[0]
            rva = va - self.module.base

            locations[("import descriptor table", i)] = reconstruction_target + reconstruction.tell()
            # import lookup table rva
            reconstruction.write(struct.pack("<I", locations[("import lookup table", i)]))
            # date stamp
            reconstruction.write(struct.pack("<I", 0x0))
            # forwarder chain
            reconstruction.write(struct.pack("<I", 0x0))
            # name rva
            reconstruction.write(struct.pack("<I", locations[("dll", dll)]))
            # import address table rva
            reconstruction.write(struct.pack("<I", rva))

        # empty last entry
        reconstruction.write(b"\x00" * IDT_ENTRY_SIZE)

        # if the reconstructed import structures are larger than the unpacking stub...
        # i'm not sure what we'll do. probably need to add a section.
        assert len(reconstruction.getvalue()) <= pefile_get_section_by_name(pe, ".aspack").Misc_VirtualSize

        pe.set_bytes_at_rva(reconstruction_target, reconstruction.getvalue())

        pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].VirtualAddress = locations[("import descriptor table", 0)]
        pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].Size = IDT_ENTRY_SIZE * len(idt_tables)

        return pe.write()

    def unpack(self):
        buf, oep = self.dump()
        buf = self.fixup(buf, oep)
        return buf


if __name__ == "__main__":
    import sys

    with open(sys.argv[1], "rb") as f:
        buf = f.read()

    with open(sys.argv[2], "wb") as f:
        f.write(AspackUnpacker.unpack_pe(buf))
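A sketch of programmatic use mirroring the __main__ block above; paths are hypothetical and a 32-bit ASPack'd input is assumed:

from capa.unpack.aspack import AspackUnpacker

with open("packed.exe", "rb") as f:
    buf = f.read()

if AspackUnpacker.detect_aspack(buf):
    with open("unpacked.exe", "wb") as f:
        f.write(AspackUnpacker.unpack_pe(buf))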

View File

@@ -1,112 +1,112 @@
"""
Binary Ninja plugin that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.

It will mark up functions with their capa matches, like:

    ; capa: print debug messages (host-interaction/log/debug/write-event)
    ; capa: delete service (host-interaction/service/delete)
    ; Attributes: bp-based frame

    public UninstallService
    UninstallService proc near
    ...

To use, invoke from the Binary Ninja Tools menu, or from the
command-palette.

Adapted for Binary Ninja by @psifertex

This script will verify that the report matches the workspace.
Check the log window for any errors, and/or the summary of changes.

Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py
"""
import os
import json

from binaryninja import *


def append_func_cmt(bv, va, cmt):
    """
    add the given comment to the given function,
    if it doesn't already exist.
    """
    func = bv.get_function_at(va)
    if not func:
        raise ValueError("not a function")

    if cmt in func.comment:
        return

    func.comment = func.comment + "\n" + cmt


def load_analysis(bv):
    shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
    dirname = os.path.dirname(bv.file.filename)
    log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
    if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
        path = os.path.join(dirname, shortname + ".js")
    elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
        path = os.path.join(dirname, shortname + ".json")
    else:
        path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
    if not path or not os.access(path, os.R_OK):
        log_error("Invalid filename.")
        return 0
    log_info("Using capa file %s" % path)

    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        log_error("doesn't appear to be a capa report")
        return -1

    a = doc["meta"]["sample"]["md5"].lower()
    md5 = Transform["MD5"]
    rawhex = Transform["RawHex"]
    b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8")
    if not a == b:
        log_error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        if rule["meta"].get("lib"):
            continue
        if rule["meta"].get("capa/subscope"):
            continue
        if rule["meta"]["scope"] != "function":
            continue

        name = rule["meta"]["name"]
        ns = rule["meta"].get("namespace", "")
        for va in rule["matches"].keys():
            va = int(va)
            rows.append((ns, name, va))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        log_info("0x%x: %s" % (va, cmt))
        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(bv, va, "capa: " + cmt)
        except ValueError:
            continue

    log_info("ok")


PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)

View File

@@ -1,117 +1,117 @@
""" """
IDA Pro script that imports a capa report, IDA Pro script that imports a capa report,
produced via `capa --json /path/to/sample`, produced via `capa --json /path/to/sample`,
into the current database. into the current database.
It will mark up functions with their capa matches, like: It will mark up functions with their capa matches, like:
; capa: print debug messages (host-interaction/log/debug/write-event) ; capa: print debug messages (host-interaction/log/debug/write-event)
; capa: delete service (host-interaction/service/delete) ; capa: delete service (host-interaction/service/delete)
; Attributes: bp-based frame ; Attributes: bp-based frame
public UninstallService public UninstallService
UninstallService proc near UninstallService proc near
... ...
To use, invoke from the IDA Pro scripting dialog, To use, invoke from the IDA Pro scripting dialog,
such as via Alt-F9, such as via Alt-F9,
and then select the existing capa report from the file system. and then select the existing capa report from the file system.
This script will verify that the report matches the workspace. This script will verify that the report matches the workspace.
Check the output window for any errors, and/or the summary of changes. Check the output window for any errors, and/or the summary of changes.
Copyright (C) 2020 FireEye, Inc. All Rights Reserved. Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License. See the License for the specific language governing permissions and limitations under the License.
""" """
import json import json
import logging import logging
import idc import idc
import idautils import idautils
import ida_idaapi import ida_idaapi
import ida_kernwin import ida_kernwin
logger = logging.getLogger("capa") logger = logging.getLogger("capa")
def append_func_cmt(va, cmt, repeatable=False): def append_func_cmt(va, cmt, repeatable=False):
""" """
add the given comment to the given function, add the given comment to the given function,
if it doesn't already exist. if it doesn't already exist.
""" """
func = ida_funcs.get_func(va) func = ida_funcs.get_func(va)
if not func: if not func:
raise ValueError("not a function") raise ValueError("not a function")
existing = ida_funcs.get_func_cmt(func, repeatable) or "" existing = ida_funcs.get_func_cmt(func, repeatable) or ""
if cmt in existing: if cmt in existing:
return return
new = existing + "\n" + cmt new = existing + "\n" + cmt
ida_funcs.set_func_cmt(func, new, repeatable) ida_funcs.set_func_cmt(func, new, repeatable)
def main():
    path = ida_kernwin.ask_file(False, "*", "capa report")
    if not path:
        return 0

    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        logger.error("doesn't appear to be a capa report")
        return -1

    # in IDA 7.4, the MD5 hash may be truncated, for example:
    # wanted: 84882c9d43e23d63b82004fae74ebb61
    # found: b'84882C9D43E23D63B82004FAE74EBB6\x00'
    #
    # see: https://github.com/idapython/bin/issues/11
    a = doc["meta"]["sample"]["md5"].lower()
    b = idautils.GetInputFileMD5().decode("ascii").lower().rstrip("\x00")
    if not a.startswith(b):
        logger.error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        if rule["meta"].get("lib"):
            continue
        if rule["meta"].get("capa/subscope"):
            continue
        if rule["meta"]["scope"] != "function":
            continue

        name = rule["meta"]["name"]
        ns = rule["meta"].get("namespace", "")
        for va in rule["matches"].keys():
            va = int(va)
            rows.append((ns, name, va))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        logger.info("0x%x: %s", va, cmt)
        try:
            # message will look something like:
            #
            # capa: delete service (host-interaction/service/delete)
            append_func_cmt(va, "capa: " + cmt, repeatable=False)
        except ValueError:
            continue

    logger.info("ok")


main()
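For reference, a trimmed sketch of the report shape that main() consumes (the md5 and the rule name/namespace are taken from the script above; the match address is illustrative, and a real report carries many more fields):

doc = {
    "meta": {"sample": {"md5": "84882c9d43e23d63b82004fae74ebb61"}},
    "rules": {
        "delete service": {
            "meta": {
                "name": "delete service",
                "namespace": "host-interaction/service/delete",
                "scope": "function",
            },
            # match keys are virtual addresses serialized as strings
            "matches": {"4199844": {}},
        },
    },
}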

View File

@@ -17,6 +17,7 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r
if sys.version_info >= (3, 0):
    # py3
    requirements.append("networkx")
+    requirements.append("pylancelot~=0.3.6")
else:
    # py2
    requirements.append("enum34")
@@ -54,7 +55,7 @@ setuptools.setup(
"pycodestyle", "pycodestyle",
"black ; python_version>'3.0'", "black ; python_version>'3.0'",
"isort", "isort",
] ],
}, },
zip_safe=False, zip_safe=False,
keywords="capa", keywords="capa",

View File

@@ -80,6 +80,16 @@ def get_viv_extractor(path):
    return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)


+@lru_cache
+def get_lancelot_extractor(path):
+    import capa.features.extractors.lancelot
+
+    with open(path, "rb") as f:
+        buf = f.read()
+
+    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
@lru_cache()
def extract_file_features(extractor):
    features = collections.defaultdict(set)
@@ -140,6 +150,8 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "bfb9b5391a13d0afd787e87ab90f14f5.dll_") return os.path.join(CD, "data", "bfb9b5391a13d0afd787e87ab90f14f5.dll_")
elif name.startswith("c9188"): elif name.startswith("c9188"):
return os.path.join(CD, "data", "c91887d861d9bd4a5872249b641bc9f9.exe_") return os.path.join(CD, "data", "c91887d861d9bd4a5872249b641bc9f9.exe_")
elif name == "aspack":
return os.path.join(CD, "data", "2055994ff75b4309eee3a49c5749d306")
else: else:
raise ValueError("unexpected sample fixture") raise ValueError("unexpected sample fixture")
@@ -427,7 +439,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
def get_extractor(path):
    if sys.version_info >= (3, 0):
-        raise RuntimeError("no supported py3 backends yet")
+        extractor = get_lancelot_extractor(path)
    else:
        extractor = get_viv_extractor(path)
@@ -494,3 +506,8 @@ def z499c2_extractor():
@pytest.fixture
def al_khaser_x86_extractor():
    return get_extractor(get_data_path_by_name("al-khaser x86"))


+@pytest.fixture
+def aspack_extractor():
+    return get_extractor(get_data_path_by_name("aspack"))

View File

@@ -1,104 +1,104 @@
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback

import pytest

try:
    sys.path.append(os.path.dirname(__file__))
    from fixtures import *
finally:
    sys.path.pop()

logger = logging.getLogger("test_ida_features")


def check_input_file(wanted):
    import idautils

    # some versions (7.4) of IDA return a truncated version of the MD5.
    # https://github.com/idapython/bin/issues/11
    try:
        found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest
        found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()

    if not wanted.startswith(found):
        raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))


def get_ida_extractor(_path):
    check_input_file("5f66b82558ca92e54e77f216ef4c066c")
    # have to import this inline so pytest doesn't bail outside of IDA
    import capa.features.extractors.ida

    return capa.features.extractors.ida.IdaFeatureExtractor()


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
    for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
        id = make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % (id))
            continue

        scope = resolve_scope(scope)
        sample = resolve_sample(sample)

        try:
            do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
        except Exception as e:
            print("FAIL %s" % (id))
            traceback.print_exc()
        else:
            print("OK %s" % (id))


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
    for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
        id = make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % (id))
            continue

        scope = resolve_scope(scope)
        sample = resolve_sample(sample)

        try:
            do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
        except Exception as e:
            print("FAIL %s" % (id))
            traceback.print_exc()
        else:
            print("OK %s" % (id))


if __name__ == "__main__":
    print("-" * 80)

    # invoke all functions in this module that start with `test_`
    for name in dir(sys.modules[__name__]):
        if not name.startswith("test_"):
            continue

        test = getattr(sys.modules[__name__], name)
        logger.debug("invoking test: %s", name)
        sys.stderr.flush()
        test()

    print("DONE")

View File

@@ -0,0 +1,26 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys

from fixtures import *


@parametrize(
    "sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_features(sample, scope, feature, expected):
    with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
        do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected)


@parametrize(
    "sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_feature_counts(sample, scope, feature, expected):
    with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
        do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected)

View File

@@ -47,7 +47,7 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0 assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") @pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces")
def test_main_shellcode(z499c2_extractor): def test_main_shellcode(z499c2_extractor):
path = z499c2_extractor.path path = z499c2_extractor.path
assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0 assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0

tests/test_unpack.py Normal file
View File

@@ -0,0 +1,62 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys

import pefile
import pytest
from fixtures import *

import capa.unpack


@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_is_packed(aspack_extractor):
    path = aspack_extractor.path
    with open(path, "rb") as f:
        buf = f.read()

    assert capa.unpack.is_packed(buf) is True


@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_detect(aspack_extractor):
    path = aspack_extractor.path
    with open(path, "rb") as f:
        buf = f.read()

    assert capa.unpack.detect_packer(buf) == "aspack"


@pytest.mark.xfail(sys.version_info <= (3, 5), reason="auto-unpack only works on py3.6+")
def test_aspack_unpack(aspack_extractor):
    with open(aspack_extractor.path, "rb") as f:
        buf = f.read()

    unpacked = capa.unpack.unpack_pe("aspack", buf)

    pe = pefile.PE(data=unpacked)
    assert pe.OPTIONAL_HEADER.ImageBase == 0x4AD00000
    assert pe.OPTIONAL_HEADER.AddressOfEntryPoint == 0x1A610

    assert b"This program cannot be run in DOS mode" in unpacked
    assert "(C) Copyright 1985-2000 Microsoft Corp.".encode("utf-16le") in unpacked
    assert "CMD.EXE has halted. %0".encode("utf-16le") in unpacked

    dlls = set([])
    syms = set([])
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dlls.add(entry.dll.decode("ascii").lower().partition(".")[0])
        for imp in entry.imports:
            syms.add(imp.name.decode("ascii"))

    assert dlls == {"advapi32", "kernel32", "msvcrt", "user32"}
    assert "RegQueryValueExW" in syms
    assert "WriteConsoleW" in syms
    assert "realloc" in syms
    assert "GetProcessWindowStation" in syms