Mirror of https://github.com/mandiant/capa.git, synced 2025-12-07 05:10:36 -08:00

Compare commits (60 commits): `ci/add-gem` ... `feature/un`
| Author | SHA1 | Date |
|---|---|---|
| | 216e288ade | |
| | ec55a9c482 | |
| | e5136683e6 | |
| | 63561b73c1 | |
| | b700ad1655 | |
| | 52eef09c8b | |
| | de3f223bb5 | |
| | 6a2e5ad530 | |
| | 5575166b7a | |
| | 8a2ba9cd71 | |
| | e523c6cb50 | |
| | 854e3d7774 | |
| | 1b6ac29053 | |
| | df4c75882d | |
| | 0cfbed05b4 | |
| | ca95512811 | |
| | bb9803fcc0 | |
| | 1fe945e3ed | |
| | 6ba4798822 | |
| | f424dd126f | |
| | 9fa128b27d | |
| | 09bca1e5f7 | |
| | c623791a84 | |
| | 980a34adca | |
| | 8721eb05eb | |
| | 512ea89662 | |
| | 15259d455c | |
| | 1e097ef759 | |
| | f193ceb91a | |
| | fb19841997 | |
| | 40d16c925f | |
| | d5f73b47a4 | |
| | caf738ee4e | |
| | c6f27200fe | |
| | 5c967cd6ef | |
| | a59e1054fe | |
| | db45a06ba7 | |
| | c739caee40 | |
| | bdf6b69be6 | |
| | fdd6f7434b | |
| | 21adb2b9d1 | |
| | 5929c0652c | |
| | e7bf5bfceb | |
| | c2f55fad12 | |
| | 7ac4cf47f7 | |
| | 3f49a224f5 | |
| | 695f1bf55a | |
| | 10f5a54e1d | |
| | 042654ee97 | |
| | 1da450001c | |
| | 7996e2efe7 | |
| | 5eded3c5cc | |
| | cdae840519 | |
| | fcb8c4a293 | |
| | 4e6b475ff6 | |
| | 02a8ad1ea4 | |
| | 393b332f9c | |
| | bf4a8dcd3e | |
| | 9bde11fa6f | |
| | 43c6eec30b | |
`.github/workflows/tests.yml` (vendored, 3 changes)

```diff
@@ -62,7 +62,8 @@ jobs:
       with:
         python-version: ${{ matrix.python }}
     - name: Install capa
-      run: pip install -e .[dev]
+      # TODO: remove `pefile` when we bump lancelot >= 0.3.7
+      run: pip install -e .[dev] pefile
     - name: Run tests
       run: pytest tests/

```
`capa/features/extractors/lancelot/__init__.py` (new file, 92 lines)

```python
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging

import lancelot

import capa.features.extractors
import capa.features.extractors.lancelot.file
import capa.features.extractors.lancelot.insn
import capa.features.extractors.lancelot.function
import capa.features.extractors.lancelot.basicblock

__all__ = ["file", "function", "basicblock", "insn"]
logger = logging.getLogger(__name__)


class BB(object):
    """extend the lancelot.BasicBlock with an __int__ method to access the address"""

    def __init__(self, ws, bb):
        super(BB, self).__init__()
        self.ws = ws
        self.address = bb.address
        self.length = bb.length
        self.predecessors = bb.predecessors
        self.successors = bb.successors

    def __int__(self):
        return self.address

    @property
    def instructions(self):
        va = self.address
        while va < self.address + self.length:
            try:
                insn = self.ws.read_insn(va)
            except ValueError:
                logger.warning("failed to read instruction at 0x%x", va)
                return

            yield insn
            va += insn.length


class LancelotFeatureExtractor(capa.features.extractors.FeatureExtractor):
    def __init__(self, buf):
        super(LancelotFeatureExtractor, self).__init__()
        self.buf = buf
        self.ws = lancelot.from_bytes(buf)
        self.ctx = {}

    def get_base_address(self):
        return self.ws.base_address

    def extract_file_features(self):
        for feature, va in capa.features.extractors.lancelot.file.extract_file_features(self.buf):
            yield feature, va

    def get_functions(self):
        for va in self.ws.get_functions():
            # this is just the address of the function
            yield va

    def extract_function_features(self, f):
        for feature, va in capa.features.extractors.lancelot.function.extract_function_features(self.ws, f):
            yield feature, va

    def get_basic_blocks(self, f):
        try:
            cfg = self.ws.build_cfg(f)
        except:
            logger.warning("failed to build CFG for 0x%x", f)
            return
        else:
            for bb in cfg.basic_blocks.values():
                yield BB(self.ws, bb)

    def extract_basic_block_features(self, f, bb):
        for feature, va in capa.features.extractors.lancelot.basicblock.extract_basic_block_features(self.ws, bb):
            yield feature, va

    def get_instructions(self, f, bb):
        return bb.instructions

    def extract_insn_features(self, f, bb, insn):
        for feature, va in capa.features.extractors.lancelot.insn.extract_insn_features(self, f, bb, insn):
            yield feature, va
```
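For orientation, here is a minimal sketch of driving the new extractor end to end, following the `FeatureExtractor` iteration pattern capa already uses for its vivisect backend; the sample path is a placeholder assumption, not shipped code.

```python
import capa.features.extractors.lancelot

with open("sample.exe", "rb") as fh:  # hypothetical sample path
    buf = fh.read()

xtor = capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)
print(hex(xtor.get_base_address()))

# walk functions -> basic blocks -> instructions, printing each feature
for f in xtor.get_functions():
    for bb in xtor.get_basic_blocks(f):
        for insn in xtor.get_instructions(f, bb):
            for feature, va in xtor.extract_insn_features(f, bb, insn):
                print(hex(va), feature)
```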
`capa/features/extractors/lancelot/basicblock.py` (new file, 120 lines)

```python
import string
import struct
import logging

from lancelot import (
    FLOW_VA,
    OPERAND_SIZE,
    OPERAND_TYPE,
    MEMORY_OPERAND_BASE,
    OPERAND_TYPE_MEMORY,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
)

from capa.features import Characteristic
from capa.features.basicblock import BasicBlock
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN

logger = logging.getLogger(__name__)


def extract_bb_tight_loop(ws, bb):
    """check basic block for tight loop indicators"""
    if bb.address in map(lambda flow: flow[FLOW_VA], bb.successors):
        yield Characteristic("tight loop"), bb.address


def is_mov_imm_to_stack(insn):
    if not insn.mnemonic.startswith("mov"):
        return False

    try:
        dst, src = insn.operands
    except ValueError:
        # not two operands
        return False

    if src[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
        return False

    if src[IMMEDIATE_OPERAND_VALUE] < 0:
        return False

    if dst[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
        return False

    if dst[MEMORY_OPERAND_BASE] not in ("ebp", "rbp", "esp", "rsp"):
        return False

    return True


def is_printable_ascii(chars):
    return all(c < 127 and chr(c) in string.printable for c in chars)


def is_printable_utf16le(chars):
    if all(c == b"\x00" for c in chars[1::2]):
        return is_printable_ascii(chars[::2])


def get_printable_len(operand):
    """
    Return string length if all operand bytes are ascii or utf16-le printable
    """
    operand_size = operand[OPERAND_SIZE]
    if operand_size == 8:
        chars = struct.pack("<B", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 16:
        chars = struct.pack("<H", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 32:
        chars = struct.pack("<I", operand[IMMEDIATE_OPERAND_VALUE])
    elif operand_size == 64:
        chars = struct.pack("<Q", operand[IMMEDIATE_OPERAND_VALUE])
    else:
        raise ValueError("unexpected operand size: " + str(operand_size))

    if is_printable_ascii(chars):
        return operand_size / 8
    if is_printable_utf16le(chars):
        return operand_size / 16

    return 0


def _bb_has_stackstring(ws, bb):
    """
    extract potential stackstring creation, using the following heuristics:
      - basic block contains enough moves of constant bytes to the stack
    """
    count = 0
    for insn in bb.instructions:
        if is_mov_imm_to_stack(insn):
            # add number of operand bytes
            src = insn.operands[1]
            count += get_printable_len(src)

        if count > MIN_STACKSTRING_LEN:
            return True

    return False


def extract_stackstring(ws, bb):
    """check basic block for stackstring indicators"""
    if _bb_has_stackstring(ws, bb):
        yield Characteristic("stack string"), bb.address


def extract_basic_block_features(ws, bb):
    yield BasicBlock(), bb.address
    for bb_handler in BASIC_BLOCK_HANDLERS:
        for feature, va in bb_handler(ws, bb):
            yield feature, va


BASIC_BLOCK_HANDLERS = (
    extract_bb_tight_loop,
    extract_stackstring,
)
```
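A quick standalone check of the printable-length logic above, assuming the same little-endian packing: `mov dword ptr [ebp-8], 0x6F6C6C65` writes `"ello"`, so one 32-bit immediate contributes 4 printable bytes toward `MIN_STACKSTRING_LEN`.

```python
import string
import struct

def printable_len(value, size_bits):
    # pack the immediate exactly as get_printable_len does
    chars = struct.pack({8: "<B", 16: "<H", 32: "<I", 64: "<Q"}[size_bits], value)
    if all(c < 127 and chr(c) in string.printable for c in chars):
        return size_bits / 8
    return 0

assert printable_len(0x6F6C6C65, 32) == 4  # b"ello" is printable ascii
assert printable_len(0x00000000, 32) == 0  # NUL bytes are not printable
```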
`capa/features/extractors/lancelot/file.py` (new file, 81 lines)

```python
import pefile

import capa.features.extractors.strings
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section


def extract_file_embedded_pe(buf, pe):
    buf = buf[2:]

    total_offset = 2
    while True:
        try:
            offset = buf.index(b"MZ")
        except ValueError:
            return
        else:
            rest = buf[offset:]
            total_offset += offset

            try:
                _ = pefile.PE(data=rest)
            except:
                pass
            else:
                yield Characteristic("embedded pe"), total_offset

            buf = rest[2:]
            total_offset += 2


def extract_file_export_names(buf, pe):
    if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        return

    base_address = pe.OPTIONAL_HEADER.ImageBase
    for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
        yield Export(exp.name.decode("ascii")), base_address + exp.address


def extract_file_import_names(buf, pe):
    base_address = pe.OPTIONAL_HEADER.ImageBase
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                yield Import("%s.#%s" % (libname, imp.ordinal)), imp.address
            else:
                impname = imp.name.decode("ascii")
                yield Import("%s.%s" % (libname, impname)), imp.address
                yield Import("%s" % (impname)), imp.address


def extract_file_section_names(buf, pe):
    base_address = pe.OPTIONAL_HEADER.ImageBase
    for section in pe.sections:
        yield Section(section.Name.partition(b"\x00")[0].decode("ascii")), base_address + section.VirtualAddress


def extract_file_strings(buf, pe):
    for s in capa.features.extractors.strings.extract_ascii_strings(buf):
        yield String(s.s), s.offset

    for s in capa.features.extractors.strings.extract_unicode_strings(buf):
        yield String(s.s), s.offset


def extract_file_features(buf):
    pe = pefile.PE(data=buf)
    for file_handler in FILE_HANDLERS:
        for feature, va in file_handler(buf, pe):
            yield feature, va


FILE_HANDLERS = (
    extract_file_embedded_pe,
    extract_file_export_names,
    extract_file_import_names,
    extract_file_section_names,
    extract_file_strings,
)
```
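The embedded-PE scan above keeps two cursors: it starts at offset 2 so the file's own `"MZ"` header is skipped, and `total_offset` tracks the absolute file offset of each candidate. This toy version exercises just that bookkeeping, without the `pefile` validation step.

```python
def find_mz_candidates(buf):
    # skip the outer file's own MZ header
    buf = buf[2:]
    total_offset = 2
    while True:
        try:
            offset = buf.index(b"MZ")
        except ValueError:
            return
        rest = buf[offset:]
        total_offset += offset
        yield total_offset  # absolute offset of the embedded "MZ"
        buf = rest[2:]
        total_offset += 2

assert list(find_mz_candidates(b"MZ....MZ..")) == [6]
```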
`capa/features/extractors/lancelot/function.py` (new file, 64 lines)

```python
import logging

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

from lancelot import (
    FLOW_VA,
    FLOW_TYPE,
    FLOW_TYPE_CONDITIONAL_JUMP,
    FLOW_TYPE_CONDITIONAL_MOVE,
    FLOW_TYPE_UNCONDITIONAL_JUMP,
)

from capa.features import Characteristic
from capa.features.extractors import loops

logger = logging.getLogger(__name__)


@lru_cache
def get_call_graph(ws):
    return ws.build_call_graph()


def extract_function_calls_to(ws, f):
    cg = get_call_graph(ws)

    for caller in cg.calls_to.get(f, []):
        yield Characteristic("calls to"), caller


def extract_function_loop(ws, f):
    edges = []
    for bb in ws.build_cfg(f).basic_blocks.values():
        for flow in bb.successors:
            if flow[FLOW_TYPE] in (
                FLOW_TYPE_UNCONDITIONAL_JUMP,
                FLOW_TYPE_CONDITIONAL_JUMP,
                FLOW_TYPE_CONDITIONAL_MOVE,
            ):
                edges.append((bb.address, flow[FLOW_VA]))
                continue

    if edges and loops.has_loop(edges):
        yield Characteristic("loop"), f


FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)


_not_implemented = set([])


def extract_function_features(ws, f):
    for func_handler in FUNCTION_HANDLERS:
        try:
            for feature, va in func_handler(ws, f):
                yield feature, va
        except NotImplementedError:
            if func_handler.__name__ not in _not_implemented:
                logger.warning("not implemented: %s", func_handler.__name__)
                _not_implemented.add(func_handler.__name__)
```
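`extract_function_loop` feeds `loops.has_loop` a list of `(source basic block VA, destination VA)` edges; a back edge such as `0x401010 -> 0x401000` forms a cycle, so the function yields the `"loop"` characteristic. The tiny DFS below is a stand-in sketch for capa's helper, not its actual implementation.

```python
def has_cycle(edges):
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)

    def visit(node, path):
        if node in path:
            return True
        return any(visit(n, path | {node}) for n in graph.get(node, ()))

    return any(visit(src, frozenset()) for src in graph)

assert has_cycle([(0x401000, 0x401010), (0x401010, 0x401000)])       # back edge
assert not has_cycle([(0x401000, 0x401010), (0x401010, 0x401020)])   # straight line
```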
`capa/features/extractors/lancelot/helpers.py` (new file, 33 lines)

```python
from lancelot import (
    OPERAND_TYPE,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)


def get_operand_target(insn, op):
    if op[OPERAND_TYPE] == OPERAND_TYPE_MEMORY:
        # call direct, x64
        # rip relative
        # kernel32-64:180001041 call cs:__imp_RtlVirtualUnwind_0
        if op[MEMORY_OPERAND_BASE] == "rip":
            return op[MEMORY_OPERAND_DISP] + insn.address + insn.length

        # call direct, x32
        # mimikatz:0x403BD3 call ds:CryptAcquireContextW
        elif op[MEMORY_OPERAND_BASE] is None:
            return op[MEMORY_OPERAND_DISP]

    # call via thunk
    # mimikatz:0x455A41 call LsaQueryInformationPolicy
    elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE and op[IMMEDIATE_OPERAND_IS_RELATIVE]:
        return op[IMMEDIATE_OPERAND_VALUE] + insn.address + insn.length

    elif op[OPERAND_TYPE] == OPERAND_TYPE_IMMEDIATE:
        return op[IMMEDIATE_OPERAND_VALUE]

    raise ValueError("memory operand has no target")
```
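A worked example of the rip-relative branch above: x64 rip-relative operands encode a displacement from the *end* of the instruction, so the target is `disp + insn.address + insn.length`. The concrete numbers here are illustrative, not taken from the sample.

```python
insn_address = 0x180001041  # address from the kernel32-64 comment above
insn_length = 6             # a typical FF 15 xx xx xx xx indirect call
disp = 0x1F2B1              # illustrative displacement

# rip-relative: displacement is applied to the address of the *next* instruction
target = disp + insn_address + insn_length
assert target == 0x1800202F8
```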
`capa/features/extractors/lancelot/indirect_calls.py` (new file, 149 lines)

```python
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

import collections

from lancelot import (
    FLOW_VA,
    OPERAND_TYPE,
    PERMISSION_READ,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    MEMORY_OPERAND_INDEX,
    OPERAND_TYPE_REGISTER,
    MEMORY_OPERAND_SEGMENT,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    REGISTER_OPERAND_REGISTER,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)

from capa.features.extractors.lancelot.helpers import get_operand_target

DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")


class NotFoundError(Exception):
    pass


def read_instructions(ws, bb):
    va = bb.address
    while va < bb.address + bb.length:
        try:
            insn = ws.read_insn(va)
        except ValueError:
            return

        yield insn
        va += insn.length


def build_instruction_predecessors(ws, cfg):
    preds = collections.defaultdict(set)

    for bb in cfg.basic_blocks.values():
        insns = list(read_instructions(ws, bb))

        for i, insn in enumerate(insns):
            if i == 0:
                for pred in bb.predecessors:
                    pred_bb = cfg.basic_blocks[pred[FLOW_VA]]
                    preds[insn.address].add(list(read_instructions(ws, pred_bb))[-1].address)
            else:
                preds[insn.address].add(insns[i - 1].address)

    return preds


def find_definition(ws, f, insn):
    """
    scan backwards from the given address looking for assignments to the given register.
    if a constant, return that value.

    args:
      ws (lancelot.PE)
      f (int): the function start address
      insn (lancelot.Instruction): call instruction to resolve

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert insn.mnemonic == "call"
    op0 = insn.operands[0]
    assert op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
    reg = op0[REGISTER_OPERAND_REGISTER]

    cfg = ws.build_cfg(f)
    preds = build_instruction_predecessors(ws, cfg)

    q = collections.deque()
    seen = set([])
    q.extend(preds[insn.address])
    while q:
        cur = q.popleft()

        # skip if we've already processed this location
        if cur in seen:
            continue
        seen.add(cur)

        insn = ws.read_insn(cur)
        operands = insn.operands

        if len(operands) == 0:
            q.extend(preds[cur])
            continue

        op0 = operands[0]
        if not (
            op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER
            and op0[REGISTER_OPERAND_REGISTER] == reg
            and insn.mnemonic in DESTRUCTIVE_MNEMONICS
        ):
            q.extend(preds[cur])
            continue

        # if we reach here, the instruction is destructive to our target register.

        # we currently only support extracting the constant from something like: `mov $reg, IAT`
        # so, any other pattern results in an unknown value, represented by None.
        # this is a good place to extend in the future, if we need more robust support.
        if insn.mnemonic != "mov":
            return (cur, None)
        else:
            op1 = operands[1]
            try:
                target = get_operand_target(insn, op1)
            except ValueError:
                return (cur, None)
            else:
                return (cur, target)

    raise NotFoundError()


def is_indirect_call(insn):
    return insn.mnemonic == "call" and insn.operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER


def resolve_indirect_call(ws, f, insn):
    """
    inspect the given indirect call instruction and attempt to resolve the target address.

    args:
      ws (lancelot.PE): the analysis workspace
      f (int): the address of the function to analyze
      insn (lancelot.Instruction): the instruction at which to start analysis

    returns:
      (va: int, value?: int|None): the address of the assignment and the value, if a constant.

    raises:
      NotFoundError: when the definition cannot be found.
    """
    assert is_indirect_call(insn)
    return find_definition(ws, f, insn)
```
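A hedged usage sketch: scanning one workspace for `call $reg` sites and attempting to resolve each. The workspace construction mirrors the extractor above; the sample path is an assumption.

```python
import lancelot
from capa.features.extractors.lancelot.indirect_calls import (
    NotFoundError,
    is_indirect_call,
    resolve_indirect_call,
)

with open("sample.exe", "rb") as fh:  # hypothetical sample path
    buf = fh.read()

ws = lancelot.from_bytes(buf)
for f in ws.get_functions():
    for bb in ws.build_cfg(f).basic_blocks.values():
        va = bb.address
        while va < bb.address + bb.length:
            try:
                insn = ws.read_insn(va)
            except ValueError:
                break
            if is_indirect_call(insn):
                try:
                    # (definition address, resolved target or None)
                    print(hex(insn.address), resolve_indirect_call(ws, f, insn))
                except NotFoundError:
                    pass
            va += insn.length
```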
`capa/features/extractors/lancelot/insn.py` (new file, 487 lines)

```python
import logging
import itertools

import pefile

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

from lancelot import (
    OPERAND_TYPE,
    PERMISSION_READ,
    MEMORY_OPERAND_BASE,
    MEMORY_OPERAND_DISP,
    OPERAND_TYPE_MEMORY,
    MEMORY_OPERAND_INDEX,
    OPERAND_TYPE_REGISTER,
    MEMORY_OPERAND_SEGMENT,
    OPERAND_TYPE_IMMEDIATE,
    IMMEDIATE_OPERAND_VALUE,
    REGISTER_OPERAND_REGISTER,
    IMMEDIATE_OPERAND_IS_RELATIVE,
)

import capa.features.extractors.helpers
from capa.features import ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, Bytes, String, Characteristic
from capa.features.insn import Number, Offset, Mnemonic
from capa.features.extractors.lancelot.helpers import get_operand_target
from capa.features.extractors.lancelot.function import get_call_graph
from capa.features.extractors.lancelot.indirect_calls import NotFoundError, resolve_indirect_call

logger = logging.getLogger(__name__)


# security cookie checks may perform non-zeroing XORs, these are expected within a certain
# byte range within the first and returning basic blocks, this helps to reduce FP features
SECURITY_COOKIE_BYTES_DELTA = 0x40


def get_arch(ws):
    if ws.arch == "x32":
        return ARCH_X32
    elif ws.arch == "x64":
        return ARCH_X64
    else:
        raise ValueError("unexpected architecture")


@lru_cache
def get_pefile(xtor):
    return pefile.PE(data=xtor.buf)


@lru_cache
def get_imports(xtor):
    pe = get_pefile(xtor)

    imports = {}
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        libname = entry.dll.decode("ascii").lower().partition(".")[0]
        for imp in entry.imports:
            if imp.ordinal:
                imports[imp.address] = "%s.#%s" % (libname, imp.ordinal)
            else:
                impname = imp.name.decode("ascii")
                imports[imp.address] = "%s.%s" % (libname, impname)
    return imports


@lru_cache
def get_thunks(xtor):
    thunks = {}
    for va in xtor.ws.get_functions():
        try:
            insn = xtor.ws.read_insn(va)
        except ValueError:
            continue

        if insn.mnemonic != "jmp":
            continue

        op0 = insn.operands[0]

        try:
            target = get_operand_target(insn, op0)
        except ValueError:
            continue

        imports = get_imports(xtor)
        if target not in imports:
            continue

        thunks[va] = imports[target]

    return thunks


def extract_insn_api_features(xtor, f, bb, insn):
    """parse API features from the given instruction."""

    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]

    if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        try:
            (_, target) = resolve_indirect_call(xtor.ws, f, insn)
        except NotFoundError:
            return
        if target is None:
            return
    else:
        try:
            target = get_operand_target(insn, op0)
        except ValueError:
            return

    imports = get_imports(xtor)
    if target in imports:
        for feature, va in capa.features.extractors.helpers.generate_api_features(imports[target], insn.address):
            yield feature, va
        return

    thunks = get_thunks(xtor)
    if target in thunks:
        for feature, va in capa.features.extractors.helpers.generate_api_features(thunks[target], insn.address):
            yield feature, va


def extract_insn_mnemonic_features(xtor, f, bb, insn):
    """parse mnemonic features from the given instruction."""
    yield Mnemonic(insn.mnemonic), insn.address


def extract_insn_number_features(xtor, f, bb, insn):
    """parse number features from the given instruction."""
    operands = insn.operands

    for operand in operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_IMMEDIATE:
            continue

        v = operand[IMMEDIATE_OPERAND_VALUE]

        if xtor.ws.probe(v) & PERMISSION_READ:
            # v is a valid address
            # therefore, assume its not also a constant.
            continue

        if (
            insn.mnemonic == "add"
            and operands[0][OPERAND_TYPE] == OPERAND_TYPE_REGISTER
            and operands[0][REGISTER_OPERAND_REGISTER] == "esp"
        ):
            # skip things like:
            #
            #    .text:00401140  call  sub_407E2B
            #    .text:00401145  add   esp, 0Ch
            return

        yield Number(v), insn.address
        yield Number(v, arch=get_arch(xtor.ws)), insn.address


def extract_insn_offset_features(xtor, f, bb, insn):
    """parse structure offset features from the given instruction."""
    operands = insn.operands

    for operand in operands:
        if operand[OPERAND_TYPE] != OPERAND_TYPE_MEMORY:
            continue

        if operand[MEMORY_OPERAND_BASE] in ("esp", "ebp", "rbp"):
            continue

        # lancelot provides `None` when the displacement is not present.
        v = operand[MEMORY_OPERAND_DISP] or 0

        yield Offset(v), insn.address
        yield Offset(v, arch=get_arch(xtor.ws)), insn.address


def derefs(xtor, p):
    """
    recursively follow the given pointer, yielding the valid memory addresses along the way.
    useful when you may have a pointer to string, or pointer to pointer to string, etc.
    this is a "do what i mean" type of helper function.
    """

    depth = 0
    while True:
        if not xtor.ws.probe(p) & PERMISSION_READ:
            return
        yield p

        next = xtor.ws.read_pointer(p)

        # sanity: pointer points to self
        if next == p:
            return

        # sanity: avoid chains of pointers that are unreasonably deep
        depth += 1
        if depth > 10:
            return

        p = next


def read_bytes(xtor, va):
    """
    read up to MAX_BYTES_FEATURE_SIZE from the given address.

    raises:
      ValueError: if the given address is not valid.
    """
    start = va
    end = va + MAX_BYTES_FEATURE_SIZE
    pe = get_pefile(xtor)

    for section in pe.sections:
        section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize

        if section_start <= start < section_end:
            end = min(end, section_end)
            return xtor.ws.read_bytes(start, end - start)

    raise ValueError("invalid address")


# these are mnemonics that may flow (jump) elsewhere
FLOW_MNEMONICS = set(
    [
        "call",
        "jb",
        "jbe",
        "jcxz",
        "jecxz",
        "jknzd",
        "jkzd",
        "jl",
        "jle",
        "jmp",
        "jnb",
        "jnbe",
        "jnl",
        "jnle",
        "jno",
        "jnp",
        "jns",
        "jnz",
        "jo",
        "jp",
        "jrcxz",
        "js",
        "jz",
    ]
)


def extract_insn_bytes_features(xtor, f, bb, insn):
    """
    parse byte sequence features from the given instruction.
    """
    if insn.mnemonic in FLOW_MNEMONICS:
        return

    for operand in insn.operands:
        try:
            target = get_operand_target(insn, operand)
        except ValueError:
            continue

        for ptr in derefs(xtor, target):
            try:
                buf = read_bytes(xtor, ptr)
            except ValueError:
                continue

            if capa.features.extractors.helpers.all_zeros(buf):
                continue

            yield Bytes(buf), insn.address


def first(s):
    """enumerate the first element in the sequence"""
    for i in s:
        yield i
        break


def extract_insn_string_features(xtor, f, bb, insn):
    """parse string features from the given instruction."""
    for bytez, va in extract_insn_bytes_features(xtor, f, bb, insn):
        buf = bytez.value

        for s in itertools.chain(
            first(capa.features.extractors.strings.extract_ascii_strings(buf)),
            first(capa.features.extractors.strings.extract_unicode_strings(buf)),
        ):
            if s.offset == 0:
                yield String(s.s), va


def is_security_cookie(xtor, f, bb, insn):
    """
    check if an instruction is related to security cookie checks
    """
    op1 = insn.operands[1]
    if op1[OPERAND_TYPE] == OPERAND_TYPE_REGISTER and op1[REGISTER_OPERAND_REGISTER] not in (
        "esp",
        "ebp",
        "rbp",
        "rsp",
    ):
        return False

    # expect security cookie init in first basic block within first bytes (instructions)
    if f == bb.address and insn.address < (bb.address + SECURITY_COOKIE_BYTES_DELTA):
        return True

    # ... or within last bytes (instructions) before a return
    insns = list(xtor.get_instructions(f, bb))
    if insns[-1].mnemonic in ("ret", "retn") and insn.address > (bb.address + bb.length - SECURITY_COOKIE_BYTES_DELTA):
        return True

    return False


def extract_insn_nzxor_characteristic_features(xtor, f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction.
    ignore expected non-zeroing XORs, e.g. security cookies.
    """
    if insn.mnemonic != "xor":
        return

    operands = insn.operands
    if operands[0] == operands[1]:
        return

    if is_security_cookie(xtor, f, bb, insn):
        return

    yield Characteristic("nzxor"), insn.address


def extract_insn_peb_access_characteristic_features(xtor, f, bb, insn):
    """
    parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
    """
    for operand in insn.operands:
        if (
            operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
            and operand[MEMORY_OPERAND_SEGMENT] == "gs"
            and operand[MEMORY_OPERAND_DISP] == 0x60
        ):
            yield Characteristic("peb access"), insn.address

        if (
            operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY
            and operand[MEMORY_OPERAND_SEGMENT] == "fs"
            and operand[MEMORY_OPERAND_DISP] == 0x30
        ):
            yield Characteristic("peb access"), insn.address


def extract_insn_segment_access_features(xtor, f, bb, insn):
    """parse the instruction for access to fs or gs"""
    for operand in insn.operands:
        if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "gs":
            yield Characteristic("gs access"), insn.address

        if operand[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and operand[MEMORY_OPERAND_SEGMENT] == "fs":
            yield Characteristic("fs access"), insn.address


def get_section(xtor, va):
    pe = get_pefile(xtor)

    for i, section in enumerate(pe.sections):
        section_start = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress
        section_end = pe.OPTIONAL_HEADER.ImageBase + section.VirtualAddress + section.Misc_VirtualSize

        if section_start <= va < section_end:
            return i

    raise ValueError("invalid address")


def extract_insn_cross_section_cflow(xtor, f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries.
    """
    if insn.mnemonic not in FLOW_MNEMONICS:
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    if target in get_imports(xtor):
        return

    try:
        if get_section(xtor, insn.address) != get_section(xtor, target):
            yield Characteristic("cross section flow"), insn.address
    except ValueError:
        return


def extract_function_calls_from(xtor, f, bb, insn):
    cg = get_call_graph(xtor.ws)

    for callee in cg.calls_from.get(insn.address, []):
        yield Characteristic("calls from"), callee

        if callee == f:
            yield Characteristic("recursive call"), insn.address

    # lancelot doesn't count API calls when constructing the call graph
    # so we still have to scan for calls to an import
    if insn.mnemonic != "call":
        return

    try:
        target = get_operand_target(insn, insn.operands[0])
    except ValueError:
        return

    imports = get_imports(xtor)
    if target in imports:
        yield Characteristic("calls from"), target


# this is a feature that's most relevant at the function or basic block scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(xtor, f, bb, insn):
    """
    extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
    does not include calls like => call ds:dword_ABD4974
    """
    if insn.mnemonic != "call":
        return

    op0 = insn.operands[0]
    if op0[OPERAND_TYPE] == OPERAND_TYPE_REGISTER:
        yield Characteristic("indirect call"), insn.address
    elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_BASE] is not None:
        yield Characteristic("indirect call"), insn.address
    elif op0[OPERAND_TYPE] == OPERAND_TYPE_MEMORY and op0[MEMORY_OPERAND_INDEX] is not None:
        yield Characteristic("indirect call"), insn.address


_not_implemented = set([])


def extract_insn_features(xtor, f, bb, insn):
    for insn_handler in INSTRUCTION_HANDLERS:
        try:
            for feature, va in insn_handler(xtor, f, bb, insn):
                yield feature, va
        except NotImplementedError:
            if insn_handler.__name__ not in _not_implemented:
                logger.warning("not implemented: %s", insn_handler.__name__)
                _not_implemented.add(insn_handler.__name__)


INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    extract_insn_number_features,
    extract_insn_string_features,
    extract_insn_bytes_features,
    extract_insn_offset_features,
    extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    extract_insn_peb_access_characteristic_features,
    extract_insn_cross_section_cflow,
    extract_insn_segment_access_features,
    extract_function_calls_from,
    extract_function_indirect_call_characteristic_features,
)
```
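The `derefs`/`read_bytes` pair above is what lets one instruction operand yield both a pointer and the string it ultimately references. A standalone model of the same bounded pointer-chasing, over a dict standing in for readable memory (an assumption in place of the lancelot workspace):

```python
def chase(memory, p, max_depth=10):
    depth = 0
    while p in memory:  # "readable" in our toy model
        yield p
        nxt = memory[p]
        if nxt == p:           # sanity: pointer to self
            return
        depth += 1
        if depth > max_depth:  # sanity: unreasonably deep chain
            return
        p = nxt

mem = {0x401000: 0x402000, 0x402000: 0x403000}
# 0x403000 is not "readable" here, so only the first two addresses are yielded
assert list(chase(mem, 0x401000)) == [0x401000, 0x402000]
```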
```diff
@@ -8,11 +8,7 @@
 import types
-import file
-import insn
-import function
 import viv_utils
-import basicblock

 import capa.features.extractors
 import capa.features.extractors.viv.file
```
```diff
@@ -24,7 +24,10 @@ class Number(Feature):
         super(Number, self).__init__(value, arch=arch, description=description)

     def get_value_str(self):
-        return "0x%X" % self.value
+        if self.value < 0:
+            return "-0x%X" % (-self.value)
+        else:
+            return "0x%X" % self.value


 class Offset(Feature):
```
`capa/main.py` (21 changes)

```diff
@@ -32,7 +32,7 @@ import capa.features.extractors
 from capa.helpers import oint, get_file_taste

 RULES_PATH_DEFAULT_STRING = "(embedded rules)"
-SUPPORTED_FILE_MAGIC = set(["MZ"])
+SUPPORTED_FILE_MAGIC = set([b"MZ"])


 logger = logging.getLogger("capa")

@@ -290,7 +290,24 @@ class UnsupportedRuntimeError(RuntimeError):


 def get_extractor_py3(path, format, disable_progress=False):
-    raise UnsupportedRuntimeError()
+    try:
+        import lancelot
+
+        import capa.features.extractors.lancelot
+    except ImportError:
+        logger.warning("lancelot not installed")
+        raise UnsupportedRuntimeError()
+
+    if format not in ("pe", "auto"):
+        raise UnsupportedFormatError(format)
+
+    if not is_supported_file_type(path):
+        raise UnsupportedFormatError()
+
+    with open(path, "rb") as f:
+        buf = f.read()
+
+    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)


 def get_extractor(path, format, disable_progress=False):
```
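With this change the Python 3 code path is expected to work end to end, where it previously always raised `UnsupportedRuntimeError`. A hedged sketch of the call, assuming lancelot is installed and a PE sample exists on disk:

```python
import capa.main

# "sample.exe" is a placeholder path
xtor = capa.main.get_extractor_py3("sample.exe", "auto")
for f in xtor.get_functions():
    print(hex(f))
```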
`capa/unpack/__init__.py` (new file, 47 lines)

```python
import sys
import logging

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

logger = logging.getLogger(__name__)


class NotPackedError(ValueError):
    def __init__(self):
        super(NotPackedError, self).__init__("not packed")


def can_unpack():
    # the unpacking backend is based on Speakeasy, which supports python 3.6+
    return sys.version_info >= (3, 6)


@lru_cache
def get_unpackers():
    # break import loop
    import capa.unpack.aspack

    return {p.name: p for p in [capa.unpack.aspack.AspackUnpacker]}


def detect_packer(buf):
    for unpacker in get_unpackers().values():
        if unpacker.is_packed(buf):
            return unpacker.name

    raise NotPackedError()


def is_packed(buf):
    try:
        detect_packer(buf)
        return True
    except NotPackedError:
        return False


def unpack_pe(packer, buf):
    return get_unpackers()[packer].unpack_pe(buf)
```
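A hedged sketch of the intended flow through these helpers; the sample path is an assumption. Note that `detect_packer` calls `unpacker.is_packed(buf)`, while the `AspackUnpacker` below exposes its check as `detect_aspack`, so this sketch assumes the intended per-unpacker interface.

```python
import capa.unpack

with open("packed.exe", "rb") as fh:  # hypothetical ASPack'd sample
    buf = fh.read()

if capa.unpack.can_unpack() and capa.unpack.is_packed(buf):
    packer = capa.unpack.detect_packer(buf)
    unpacked = capa.unpack.unpack_pe(packer, buf)
    with open("unpacked.exe", "wb") as out:
        out.write(unpacked)
```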
459
capa/unpack/aspack.py
Normal file
459
capa/unpack/aspack.py
Normal file
@@ -0,0 +1,459 @@
|
|||||||
|
import io
|
||||||
|
import struct
|
||||||
|
import logging
|
||||||
|
import contextlib
|
||||||
|
import collections
|
||||||
|
|
||||||
|
import pefile
|
||||||
|
import speakeasy
|
||||||
|
import speakeasy.common as se_common
|
||||||
|
import speakeasy.profiler
|
||||||
|
import speakeasy.windows.objman
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def pefile_get_section_by_name(pe, section_name):
|
||||||
|
for section in pe.sections:
|
||||||
|
try:
|
||||||
|
if section.Name.partition(b"\x00")[0].decode("ascii") == section_name:
|
||||||
|
return section
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
raise ValueError("section not found")
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_emu_context(se, module):
|
||||||
|
"""
|
||||||
|
prepare an Speakeasy instance for emulating the given module, without running it.
|
||||||
|
|
||||||
|
this is useful when planning to manually control the emulator,
|
||||||
|
such as via `Speakeasy.emu.emu_eng.start(...)`.
|
||||||
|
typically, Speakeasy expects to do "Run based" analysis,
|
||||||
|
which doesn't give us too much control.
|
||||||
|
|
||||||
|
much of this was derived from win32::Win32Emulator::run_module.
|
||||||
|
hopefully this can eventually be merged into Speakeasy.
|
||||||
|
|
||||||
|
args:
|
||||||
|
se (speakeasy.Speakeasy): the instance to prepare
|
||||||
|
module (speakeasy.Module): the module that will be emulated
|
||||||
|
"""
|
||||||
|
se._init_hooks()
|
||||||
|
|
||||||
|
main_exe = None
|
||||||
|
if not module.is_exe():
|
||||||
|
container = se.emu.init_container_process()
|
||||||
|
if container:
|
||||||
|
se.emu.processes.append(container)
|
||||||
|
se.emu.curr_process = container
|
||||||
|
else:
|
||||||
|
main_exe = module
|
||||||
|
|
||||||
|
if main_exe:
|
||||||
|
se.emu.user_modules = [main_exe] + se.emu.user_modules
|
||||||
|
|
||||||
|
# Create an empty process object for the module if none is supplied
|
||||||
|
if len(se.emu.processes) == 0:
|
||||||
|
p = speakeasy.windows.objman.Process(se.emu, path=module.get_emu_path(), base=module.base, pe=module)
|
||||||
|
se.emu.curr_process = p
|
||||||
|
|
||||||
|
t = speakeasy.windows.objman.Thread(se.emu, stack_base=se.emu.stack_base, stack_commit=module.stack_commit)
|
||||||
|
|
||||||
|
se.emu.om.objects.update({t.address: t})
|
||||||
|
se.emu.curr_process.threads.append(t)
|
||||||
|
se.emu.curr_thread = t
|
||||||
|
|
||||||
|
peb = se.emu.alloc_peb(se.emu.curr_process)
|
||||||
|
se.emu.init_teb(t, peb)
|
||||||
|
|
||||||
|
|
||||||
|
INSN_PUSHA = 0x60
|
||||||
|
INSN_POPA = 0x61
|
||||||
|
|
||||||
|
|
||||||
|
class AspackUnpacker(speakeasy.Speakeasy):
|
||||||
|
name = "aspack"
|
||||||
|
|
||||||
|
def __init__(self, buf, debug=False):
|
||||||
|
super(AspackUnpacker, self).__init__(debug=debug)
|
||||||
|
self.module = self.load_module(data=buf)
|
||||||
|
prepare_emu_context(self, self.module)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def detect_aspack(buf):
|
||||||
|
"""
|
||||||
|
return True if the given buffer contains an ASPack'd PE file.
|
||||||
|
we detect aspack by looking at the section names for .aspack.
|
||||||
|
the unpacking routine contains further validation and will raise an exception if necessary.
|
||||||
|
|
||||||
|
args:
|
||||||
|
buf (bytes): the contents of a PE file.
|
||||||
|
|
||||||
|
returns: bool
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
pe = pefile.PE(data=buf, fast_load=True)
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
pefile_get_section_by_name(pe, ".aspack")
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack_pe(cls, buf):
|
||||||
|
"""
|
||||||
|
unpack the given buffer that contains an ASPack'd PE file.
|
||||||
|
return the contents of a reconstructed PE file.
|
||||||
|
|
||||||
|
args:
|
||||||
|
buf (bytes): the contents of an ASPack'd PE file.
|
||||||
|
|
||||||
|
returns: bytes
|
||||||
|
"""
|
||||||
|
unpacker = cls(buf)
|
||||||
|
return unpacker.unpack()
|
||||||
|
|
||||||
|
def stepi(self):
|
||||||
|
self.emu.emu_eng.start(self.emu.get_pc(), count=1)
|
||||||
|
|
||||||
|
def remove_hook(self, hook_type, hook_handle):
|
||||||
|
# TODO: this should be part of speakeasy
|
||||||
|
self.emu.hooks[hook_type].remove(hook_handle)
|
||||||
|
self.emu.emu_eng.hook_remove(hook_handle.handle)
|
||||||
|
|
||||||
|
def remove_mem_read_hook(self, hook_handle):
|
||||||
|
# TODO: this should be part of speakeasy
|
||||||
|
self.remove_hook(se_common.HOOK_MEM_READ, hook_handle)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def mem_read_hook(self, hook):
|
||||||
|
"""
|
||||||
|
context manager for temporarily installing a hook on the emulator.
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
with self.mem_read_hook(lambda emu, access, addr, size, ctx: emu.stop()):
|
||||||
|
self.emu.emu_eng.start(0x401000)
|
||||||
|
|
||||||
|
args:
|
||||||
|
hook (speakeasy.common.MemReadHook): the hook to install
|
||||||
|
"""
|
||||||
|
handle = self.add_mem_read_hook(hook)
|
||||||
|
# if this fails, then there's still an unfixed bug in Speakeasy
|
||||||
|
assert handle.handle != 0
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
self.remove_mem_read_hook(handle)
|
||||||
|
|
||||||
|
def remove_code_hook(self, hook_handle):
|
||||||
|
# TODO: this should be part of speakeasy
|
||||||
|
self.remove_hook(se_common.HOOK_CODE, hook_handle)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def code_hook(self, hook):
|
||||||
|
"""
|
||||||
|
context manager for temporarily installing a hook on the emulator.
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
with self.code_hook(lambda emu, addr, size, ctx: emu.stop()):
|
||||||
|
self.emu.emu_eng.start(0x401000)
|
||||||
|
|
||||||
|
args:
|
||||||
|
hook (speakeasy.common.CodeHook): the hook to install
|
||||||
|
"""
|
||||||
|
handle = self.add_code_hook(hook)
|
||||||
|
assert handle.handle != 0
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
self.remove_code_hook(handle)
|
||||||
|
|
||||||
|
def read_ptr(self, va):
|
||||||
|
endian = "little"
|
||||||
|
val = self.mem_read(va, self.emu.ptr_size)
|
||||||
|
return int.from_bytes(val, endian)
|
||||||
|
|
||||||
|
def dump(self):
|
||||||
|
"""
|
||||||
|
emulate the loaded module, pausing after an appropriate section hop.
|
||||||
|
then, dump and return the module's memory and OEP.
|
||||||
|
|
||||||
|
this routine is specific to aspack. it makes the following assumptions:
|
||||||
|
- aspack starts with a PUSHA to save off the CPU context
|
||||||
|
- aspeck then runs its unpacking stub
|
||||||
|
- aspeck executes POPA to restore the CPU context
|
||||||
|
- aspack section hops to the OEP
|
||||||
|
|
||||||
|
we'll emulate in a few phases:
|
||||||
|
1. single step over PUSHA at the entrypoint
|
||||||
|
2. extract the address of the saved CPU context
|
||||||
|
3. emulate until the saved CPU context is read
|
||||||
|
4. assert this is a POPA instruction
|
||||||
|
5. emulate until a section hop
|
||||||
|
6. profit!
|
||||||
|
|
||||||
|
return the module's memory segment and the OEP.
|
||||||
|
|
||||||
|
returns: Tuple[byte, int]
|
||||||
|
"""
|
||||||
|
|
||||||
|
# prime the emulator.
|
||||||
|
# this is derived from winemu::WindowsEmulator::start()
|
||||||
|
self.emu.curr_run = speakeasy.profiler.Run()
|
||||||
|
self.emu.curr_mod = self.module
|
||||||
|
self.emu.set_hooks()
|
||||||
|
self.emu._set_emu_hooks()
|
||||||
|
|
||||||
|
# 0. sanity checking: assert entrypoint is a PUSHA instruction
|
||||||
|
entrypoint = self.module.base + self.module.ep
|
||||||
|
opcode = self.emu.mem_read(entrypoint, 1)[0]
|
||||||
|
if opcode != INSN_PUSHA:
|
||||||
|
raise ValueError("not packed with supported ASPack")
|
||||||
|
|
||||||
|
# 1. single step over PUSHA
|
||||||
|
self.emu.set_pc(entrypoint)
|
||||||
|
self.stepi()
|
||||||
|
|
||||||
|
# 2. extract address of saved CPU context
|
||||||
|
saved_cpu_context = self.emu.get_stack_ptr()
|
||||||
|
|
||||||
|
# 3. emulate until saved CPU context is accessed
|
||||||
|
def until_read(target):
|
||||||
|
"""return a mem_read hook that stops the emulator when an address is read."""
|
||||||
|
|
||||||
|
def inner(emu, _access, addr, _size, _value, _ctx):
|
||||||
|
if addr == target:
|
||||||
|
emu.stop()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
with self.mem_read_hook(until_read(saved_cpu_context)):
|
||||||
|
self.emu.emu_eng.start(self.emu.get_pc())
|
||||||
|
|
||||||
|
# 4. assert this is a POPA instruction
|
||||||
|
opcode = self.emu.mem_read(self.emu.get_pc(), 1)[0]
|
||||||
|
if opcode != INSN_POPA:
|
||||||
|
raise ValueError("not packed with supported ASPack")
|
||||||
|
logger.debug("POPA: 0x%x", self.emu.get_pc())
|
||||||
|
|
||||||
|
# 5. emulate until a section hop
|
||||||
|
aspack_section = self.module.get_section_by_name(".aspack")
|
||||||
|
start = self.module.base + aspack_section.VirtualAddress
|
||||||
|
end = start + aspack_section.Misc_VirtualSize
|
||||||
|
|
||||||
|
def until_section_hop(start, end):
|
||||||
|
def inner(emu, addr, _size, _ctx):
|
||||||
|
if addr < start or addr >= end:
|
||||||
|
emu.stop()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
with self.code_hook(until_section_hop(start, end)):
|
||||||
|
self.emu.emu_eng.start(self.emu.get_pc())
|
||||||
|
|
||||||
|
# 6. dump and return
|
||||||
|
oep = self.emu.get_pc()
|
||||||
|
logger.debug("OEP: 0x%x", oep)
|
||||||
|
|
||||||
|
mm = self.get_address_map(self.module.base)
|
||||||
|
buf = self.mem_read(mm.base, mm.size)
|
||||||
|
|
||||||
|
return buf, oep
|
||||||
|
|
||||||
|
def fixup(self, buf, oep):
|
||||||
|
"""
|
||||||
|
fixup a PE image that's been dumped from memory after unpacking aspack.
|
||||||
|
|
||||||
|
there are two big fixes that need to happen:
|
||||||
|
1. update the section pointers and sizes
|
||||||
|
2. rebuild the import table
|
||||||
|
|
||||||
|
for (1) updating the section pointers, we'll just update the
|
||||||
|
physical pointers to match the virtual pointers, since this is a loaded image.
|
||||||
|
|
||||||
|
for (2) rebuilding the import table, we'll:
|
||||||
|
(a) inspect the emulation results for resolved imports, which tells us dll/symbol names
|
||||||
|
(b) scan the dumped image for the unpacked import thunks (Import Address Table/Thunk Table)
|
||||||
|
(c) match the import thunks with resolved imports
|
||||||
|
(d) build the import table structures
|
||||||
|
(e) write the reconstructed table into the .aspack section
|
||||||
|
|
||||||
|
since the .aspack section contains the unpacking stub, which is no longer used,
|
||||||
|
then we'll write the reconstructed IAT there. hopefully its big enough.
|
||||||
|
"""
|
||||||
|
pe = pefile.PE(data=buf)
|
||||||
|
|
||||||
|
pe.OPTIONAL_HEADER.AddressOfEntryPoint = oep - self.module.base
|
||||||
|
|
||||||
|
# 1. update section pointers and sizes.
|
||||||
|
for section in pe.sections:
|
||||||
|
section.PointerToRawData = section.VirtualAddress
|
||||||
|
section.SizeOfRawData = section.Misc_VirtualSize
|
||||||
|
|
||||||
|
# 2. rebuild the import table
|
||||||
|
|
||||||
|
# place the reconstructed import table in the .aspack section (unpacking stub)
|
||||||
|
reconstruction_target = pefile_get_section_by_name(pe, ".aspack").VirtualAddress
|
||||||
|
|
||||||
|
# mapping from import pointer to (dll name, symbol name).
|
||||||
|
# the import pointer is generated by speakeasy and is not mapped.
|
||||||
|
# it often looks something like 0xfeedf008.
|
||||||
|
# as we encounter pointers with values like this, we can resolve the symbol.
|
||||||
|
imports = {}
|
||||||
|
|
||||||
|
# 2a. find resolved imports
|
||||||
|
for addr, (dll, sym) in self.module.import_table.items():
|
||||||
|
# these are items in the original import table.
|
||||||
|
logger.debug(f"found static import {dll}.{sym}")
|
||||||
|
imports[addr] = (dll, sym)
|
||||||
|
for (addr, dll, sym) in self.emu.dyn_imps:
|
||||||
|
# these are imports that have been resolved at runtime by the unpacking stub.
|
||||||
|
logger.debug(f"found dynamic import {dll}.{sym}")
|
||||||
|
imports[addr] = (dll, sym)
|
||||||
|
|
||||||
|
# 2b. find the existing thunk tables
|
||||||
|
# these are pointer-aligned tables of import pointers.
|
||||||
|
# in my test sample, its found at the start of the first section.
|
||||||
|
|
||||||
|
# ordered list of tuples (VA, import pointer)
|
||||||
|
# look up the symbol using the import pointer and the `imports` mapping.
|
||||||
|
thunks = []
|
||||||
|
|
||||||
|
# scan from the start of the first section
|
||||||
|
# until we reach values that don't look like thunk tables.
|
||||||
|
for va in range(pe.sections[0].VirtualAddress + self.module.base, 0xFFFFFFFFFFFFFFFF, self.emu.ptr_size):
|
||||||
|
ptr = self.read_ptr(va)
|
||||||
|
if ptr == 0:
|
||||||
|
# probably padding/terminating entry
|
||||||
|
continue
|
||||||
|
|
||||||
|
if ptr in imports:
|
||||||
|
thunks.append((va, ptr,))
|
||||||
|
logger.debug(f"found import thunk at {va:08x} to {ptr:08x} for {imports[ptr][0]}\t{imports[ptr][1]}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# otherwise, at the end of the thunk tables
|
||||||
|
break

        # 2c. group the thunk entries into contiguous tables by dll name.
        #
        # list of thunk tuples that are contiguous and have the same dll name:
        #   (VA, import pointer, dll name, symbol name)
        curr_idt_table = []
        # list of lists of thunk tuples, like above
        idt_tables = []
        for thunk in thunks:
            va, imp = thunk
            dll, sym = imports[imp]

            if not curr_idt_table:
                curr_idt_table.append((va, imp, dll, sym))
            elif curr_idt_table[0][2] == dll:
                curr_idt_table.append((va, imp, dll, sym))
            else:
                idt_tables.append(curr_idt_table)
                curr_idt_table = [(va, imp, dll, sym)]
        idt_tables.append(curr_idt_table)
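        # e.g. consecutive thunks resolving to kernel32 then user32 become two
        # tables (addresses here are hypothetical):
        #   [[(0x4ad01000, 0xfeedf000, "kernel32", "CloseHandle"),
        #     (0x4ad01004, 0xfeedf004, "kernel32", "ReadFile")],
        #    [(0x4ad01008, 0xfeedf008, "user32", "MessageBoxW")]]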

        # 2d. build the import table structures

        # mapping from a data identifier to its RVA (which will be found within the reconstruction blob)
        locations = {}
        # the raw bytes of the reconstructed import structures.
        # it will have the following layout:
        #   1. DLL name strings and Hint/Name table entries
        #   2. Import Lookup Tables (points into (1))
        #   3. Import Directory Tables (points into (1), (2), and the original Thunk Tables)
        reconstruction = io.BytesIO()

        # sorted list of dll names
        dlls = sorted({dll for dll, _ in imports.values()})
        # mapping from dll name to set of symbols
        symbols = collections.defaultdict(set)
        for dll, sym in imports.values():
            symbols[dll].add(sym)

        # emit strings into the reconstruction blob
        for dll in dlls:
            locations[("dll", dll)] = reconstruction_target + reconstruction.tell()
            reconstruction.write(dll.encode("ascii") + b"\x00")
            if reconstruction.tell() % 2 == 1:
                # padding, to keep entries word-aligned
                reconstruction.write(b"\x00")

            for sym in sorted(symbols[dll]):
                locations[("hint", dll, sym)] = reconstruction_target + reconstruction.tell()
                # hint (index into the export name pointer table): zero, so the loader falls back to the name
                reconstruction.write(b"\x00\x00")
                # name
                reconstruction.write(sym.encode("ascii") + b"\x00")
                if reconstruction.tell() % 2 == 1:
                    # padding, to keep entries word-aligned
                    reconstruction.write(b"\x00")
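        # e.g. the Hint/Name entry for a symbol like "ReadFile" is laid out as:
        #   00 00 52 65 61 64 46 69 6c 65 00 00
        #   hint  R  e  a  d  F  i  l  e  \0 pad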

        # emit an Import Lookup Table for each recovered thunk table
        ptr_format = "<I" if self.emu.ptr_size == 4 else "<Q"
        for i, idt_entry in enumerate(idt_tables):
            locations[("import lookup table", i)] = reconstruction_target + reconstruction.tell()
            for (va, imp, dll, sym) in idt_entry:
                reconstruction.write(struct.pack(ptr_format, locations[("hint", dll, sym)]))
            # null terminator entry (8 bytes of zeroes covers both 32- and 64-bit entries)
            reconstruction.write(b"\x00" * 8)
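        # each ILT entry written above is the RVA of a Hint/Name entry with the
        # high bit clear, i.e. an import by name rather than by ordinal.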

        # emit an Import Directory Table entry (IMAGE_IMPORT_DESCRIPTOR) for each recovered thunk table
        IDT_ENTRY_SIZE = 0x14  # sizeof(IMAGE_IMPORT_DESCRIPTOR): five dwords
        for i, idt_entry in enumerate(idt_tables):
            va, _, dll, _ = idt_entry[0]
            rva = va - self.module.base
            locations[("import descriptor table", i)] = reconstruction_target + reconstruction.tell()

            # import lookup table rva
            reconstruction.write(struct.pack("<I", locations[("import lookup table", i)]))
            # date stamp
            reconstruction.write(struct.pack("<I", 0x0))
            # forwarder chain
            reconstruction.write(struct.pack("<I", 0x0))
            # name rva
            reconstruction.write(struct.pack("<I", locations[("dll", dll)]))
            # import address table rva: points at the original (now unpacked) thunk table
            reconstruction.write(struct.pack("<I", rva))
        # empty last entry terminates the descriptor list
        reconstruction.write(b"\x00" * IDT_ENTRY_SIZE)

        # if the reconstructed import structures are larger than the unpacking stub...
        # i'm not sure what we'll do. probably need to add a section.
        assert len(reconstruction.getvalue()) <= pefile_get_section_by_name(pe, ".aspack").Misc_VirtualSize

        pe.set_bytes_at_rva(reconstruction_target, reconstruction.getvalue())
        # data directory entry 1 is IMAGE_DIRECTORY_ENTRY_IMPORT
        pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].VirtualAddress = locations[("import descriptor table", 0)]
        pe.OPTIONAL_HEADER.DATA_DIRECTORY[1].Size = IDT_ENTRY_SIZE * (len(idt_tables) + 1)

        return pe.write()

    def unpack(self):
        buf, oep = self.dump()
        buf = self.fixup(buf, oep)
        return buf


if __name__ == "__main__":
    import sys

    input_path = sys.argv[1]
    output_path = sys.argv[2]

    with open(input_path, "rb") as f:
        buf = f.read()

    with open(output_path, "wb") as f:
        f.write(AspackUnpacker.unpack_pe(buf))
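
    # quick sanity check (a sketch, not part of the unpacker proper): parse the
    # rebuilt import table with pefile and dump the recovered imports.
    pe = pefile.PE(output_path)
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        print(entry.dll.decode("ascii"))
        for imp in entry.imports:
            print("  %08x %s" % (imp.address, imp.name.decode("ascii")))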

@@ -1,112 +1,112 @@
"""
|
"""
|
||||||
Binary Ninja plugin that imports a capa report,
|
Binary Ninja plugin that imports a capa report,
|
||||||
produced via `capa --json /path/to/sample`,
|
produced via `capa --json /path/to/sample`,
|
||||||
into the current database.
|
into the current database.
|
||||||
|
|
||||||
It will mark up functions with their capa matches, like:
|
It will mark up functions with their capa matches, like:
|
||||||
|
|
||||||
; capa: print debug messages (host-interaction/log/debug/write-event)
|
; capa: print debug messages (host-interaction/log/debug/write-event)
|
||||||
; capa: delete service (host-interaction/service/delete)
|
; capa: delete service (host-interaction/service/delete)
|
||||||
; Attributes: bp-based frame
|
; Attributes: bp-based frame
|
||||||
|
|
||||||
public UninstallService
|
public UninstallService
|
||||||
UninstallService proc near
|
UninstallService proc near
|
||||||
...
|
...
|
||||||
|
|
||||||
To use, invoke from the Binary Ninja Tools menu, or from the
|
To use, invoke from the Binary Ninja Tools menu, or from the
|
||||||
command-palette.
|
command-palette.
|
||||||
|
|
||||||
Adapted for Binary Ninja by @psifertex
|
Adapted for Binary Ninja by @psifertex
|
||||||
|
|
||||||
This script will verify that the report matches the workspace.
|
This script will verify that the report matches the workspace.
|
||||||
Check the log window for any errors, and/or the summary of changes.
|
Check the log window for any errors, and/or the summary of changes.
|
||||||
|
|
||||||
Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py
|
Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.py
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from binaryninja import *
|
from binaryninja import *
|
||||||
|
|
||||||
|
|
||||||
def append_func_cmt(bv, va, cmt):
|
def append_func_cmt(bv, va, cmt):
|
||||||
"""
|
"""
|
||||||
add the given comment to the given function,
|
add the given comment to the given function,
|
||||||
if it doesn't already exist.
|
if it doesn't already exist.
|
||||||
"""
|
"""
|
||||||
func = bv.get_function_at(va)
|
func = bv.get_function_at(va)
|
||||||
if not func:
|
if not func:
|
||||||
raise ValueError("not a function")
|
raise ValueError("not a function")
|
||||||
|
|
||||||
if cmt in func.comment:
|
if cmt in func.comment:
|
||||||
return
|
return
|
||||||
|
|
||||||
func.comment = func.comment + "\n" + cmt
|
func.comment = func.comment + "\n" + cmt
|
||||||
|
|
||||||
|
|
||||||
def load_analysis(bv):
|
def load_analysis(bv):
|
||||||
shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
|
shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
|
||||||
dirname = os.path.dirname(bv.file.filename)
|
dirname = os.path.dirname(bv.file.filename)
|
||||||
log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
|
log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
|
||||||
if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
|
if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
|
||||||
path = os.path.join(dirname, shortname + ".js")
|
path = os.path.join(dirname, shortname + ".js")
|
||||||
elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
|
elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
|
||||||
path = os.path.join(dirname, shortname + ".json")
|
path = os.path.join(dirname, shortname + ".json")
|
||||||
else:
|
else:
|
||||||
path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
|
path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
|
||||||
if not path or not os.access(path, os.R_OK):
|
if not path or not os.access(path, os.R_OK):
|
||||||
log_error("Invalid filename.")
|
log_error("Invalid filename.")
|
||||||
return 0
|
return 0
|
||||||
log_info("Using capa file %s" % path)
|
log_info("Using capa file %s" % path)
|
||||||
|
|
||||||
with open(path, "rb") as f:
|
with open(path, "rb") as f:
|
||||||
doc = json.loads(f.read().decode("utf-8"))
|
doc = json.loads(f.read().decode("utf-8"))
|
||||||
|
|
||||||
if "meta" not in doc or "rules" not in doc:
|
if "meta" not in doc or "rules" not in doc:
|
||||||
log_error("doesn't appear to be a capa report")
|
log_error("doesn't appear to be a capa report")
|
||||||
return -1
|
return -1
|
||||||
|
|
||||||
a = doc["meta"]["sample"]["md5"].lower()
|
a = doc["meta"]["sample"]["md5"].lower()
|
||||||
md5 = Transform["MD5"]
|
md5 = Transform["MD5"]
|
||||||
rawhex = Transform["RawHex"]
|
rawhex = Transform["RawHex"]
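    # Transform["MD5"] then Transform["RawHex"] yields the hex digest of the
    # raw file backing this view, for comparison against the report's md5.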
    b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8")
    if a != b:
        log_error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        if rule["meta"].get("lib"):
            continue
        if rule["meta"].get("capa/subscope"):
            continue
        if rule["meta"]["scope"] != "function":
            continue

        name = rule["meta"]["name"]
        ns = rule["meta"].get("namespace", "")
        for va in rule["matches"].keys():
            va = int(va)
            rows.append((ns, name, va))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        log_info("0x%x: %s" % (va, cmt))
        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(bv, va, "capa: " + cmt)
        except ValueError:
            continue

    log_info("ok")


PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)

@@ -1,117 +1,117 @@
"""
IDA Pro script that imports a capa report,
produced via `capa --json /path/to/sample`,
into the current database.

It will mark up functions with their capa matches, like:

    ; capa: print debug messages (host-interaction/log/debug/write-event)
    ; capa: delete service (host-interaction/service/delete)
    ; Attributes: bp-based frame

    public UninstallService
    UninstallService proc near
    ...

To use, invoke from the IDA Pro scripting dialog,
such as via Alt-F9,
and then select the existing capa report from the file system.

This script will verify that the report matches the workspace.
Check the output window for any errors, and/or the summary of changes.

Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at: [package root]/LICENSE.txt
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import json
import logging

import idc
import idautils
import ida_funcs
import ida_idaapi
import ida_kernwin

logger = logging.getLogger("capa")


def append_func_cmt(va, cmt, repeatable=False):
    """
    add the given comment to the given function,
    if it doesn't already exist.
    """
    func = ida_funcs.get_func(va)
    if not func:
        raise ValueError("not a function")

    existing = ida_funcs.get_func_cmt(func, repeatable) or ""
    if cmt in existing:
        return

    new = existing + "\n" + cmt
    ida_funcs.set_func_cmt(func, new, repeatable)


def main():
    path = ida_kernwin.ask_file(False, "*", "capa report")
    if not path:
        return 0

    with open(path, "rb") as f:
        doc = json.loads(f.read().decode("utf-8"))

    if "meta" not in doc or "rules" not in doc:
        logger.error("doesn't appear to be a capa report")
        return -1

    # in IDA 7.4, the MD5 hash may be truncated, for example:
    # wanted: 84882c9d43e23d63b82004fae74ebb61
    # found: b'84882C9D43E23D63B82004FAE74EBB6\x00'
    #
    # see: https://github.com/idapython/bin/issues/11
    a = doc["meta"]["sample"]["md5"].lower()
    b = idautils.GetInputFileMD5().decode("ascii").lower().rstrip("\x00")
    if not a.startswith(b):
        logger.error("sample mismatch")
        return -2

    rows = []
    for rule in doc["rules"].values():
        if rule["meta"].get("lib"):
            continue
        if rule["meta"].get("capa/subscope"):
            continue
        if rule["meta"]["scope"] != "function":
            continue

        name = rule["meta"]["name"]
        ns = rule["meta"].get("namespace", "")
        for va in rule["matches"].keys():
            va = int(va)
            rows.append((ns, name, va))

    # order by (namespace, name) so that like things show up together
    rows = sorted(rows)
    for ns, name, va in rows:
        if ns:
            cmt = "%s (%s)" % (name, ns)
        else:
            cmt = "%s" % (name,)

        logger.info("0x%x: %s", va, cmt)
        try:
            # message will look something like:
            #
            #     capa: delete service (host-interaction/service/delete)
            append_func_cmt(va, "capa: " + cmt, repeatable=False)
        except ValueError:
            continue

    logger.info("ok")


main()

3
setup.py
@@ -17,6 +17,7 @@ requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "r
if sys.version_info >= (3, 0):
    # py3
    requirements.append("networkx")
    requirements.append("pylancelot~=0.3.6")
else:
    # py2
    requirements.append("enum34")
@@ -54,7 +55,7 @@ setuptools.setup(
            "pycodestyle",
            "black ; python_version>'3.0'",
            "isort",
        ]
        ],
    },
    zip_safe=False,
    keywords="capa",
@@ -80,6 +80,16 @@ def get_viv_extractor(path):
    return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path)


@lru_cache()
def get_lancelot_extractor(path):
    import capa.features.extractors.lancelot

    with open(path, "rb") as f:
        buf = f.read()

    return capa.features.extractors.lancelot.LancelotFeatureExtractor(buf)


@lru_cache()
def extract_file_features(extractor):
    features = collections.defaultdict(set)
@@ -140,6 +150,8 @@ def get_data_path_by_name(name):
        return os.path.join(CD, "data", "bfb9b5391a13d0afd787e87ab90f14f5.dll_")
    elif name.startswith("c9188"):
        return os.path.join(CD, "data", "c91887d861d9bd4a5872249b641bc9f9.exe_")
    elif name == "aspack":
        return os.path.join(CD, "data", "2055994ff75b4309eee3a49c5749d306")
    else:
        raise ValueError("unexpected sample fixture")
@@ -427,7 +439,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
def get_extractor(path):
    if sys.version_info >= (3, 0):
        raise RuntimeError("no supported py3 backends yet")
        extractor = get_lancelot_extractor(path)
    else:
        extractor = get_viv_extractor(path)
@@ -494,3 +506,8 @@ def z499c2_extractor():
@pytest.fixture
def al_khaser_x86_extractor():
    return get_extractor(get_data_path_by_name("al-khaser x86"))


@pytest.fixture
def aspack_extractor():
    return get_extractor(get_data_path_by_name("aspack"))

@@ -1,104 +1,104 @@
# run this script from within IDA with ./tests/data/mimikatz.exe open
import sys
import logging
import os.path
import binascii
import traceback

import pytest

try:
    sys.path.append(os.path.dirname(__file__))
    from fixtures import *
finally:
    sys.path.pop()


logger = logging.getLogger("test_ida_features")


def check_input_file(wanted):
    import idautils

    # some versions (7.4) of IDA return a truncated version of the MD5.
    # https://github.com/idapython/bin/issues/11
    try:
        found = idautils.GetInputFileMD5()[:31].decode("ascii").lower()
    except UnicodeDecodeError:
        # in IDA 7.5 or so, GetInputFileMD5 started returning raw binary
        # rather than the hex digest
        found = binascii.hexlify(idautils.GetInputFileMD5()[:15]).decode("ascii").lower()

    if not wanted.startswith(found):
        raise RuntimeError("please run the tests against sample with MD5: `%s`" % (wanted))


def get_ida_extractor(_path):
    check_input_file("5f66b82558ca92e54e77f216ef4c066c")

    # have to import this inline so pytest doesn't bail outside of IDA
    import capa.features.extractors.ida

    return capa.features.extractors.ida.IdaFeatureExtractor()


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_features():
    for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS:
        id = make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % (id))
            continue

        scope = resolve_scope(scope)
        sample = resolve_sample(sample)

        try:
            do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected)
        except Exception:
            print("FAIL %s" % (id))
            traceback.print_exc()
        else:
            print("OK %s" % (id))


@pytest.mark.skip(reason="IDA Pro tests must be run within IDA")
def test_ida_feature_counts():
    for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS:
        id = make_test_id((sample, scope, feature, expected))

        try:
            check_input_file(get_sample_md5_by_name(sample))
        except RuntimeError:
            print("SKIP %s" % (id))
            continue

        scope = resolve_scope(scope)
        sample = resolve_sample(sample)

        try:
            do_test_feature_count(get_ida_extractor, sample, scope, feature, expected)
        except Exception:
            print("FAIL %s" % (id))
            traceback.print_exc()
        else:
            print("OK %s" % (id))


if __name__ == "__main__":
    print("-" * 80)

    # invoke all functions in this module that start with `test_`
    for name in dir(sys.modules[__name__]):
        if not name.startswith("test_"):
            continue

        test = getattr(sys.modules[__name__], name)
        logger.debug("invoking test: %s", name)
        sys.stderr.flush()
        test()

    print("DONE")

26
tests/test_lancelot_features.py
Normal file
@@ -0,0 +1,26 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys

from fixtures import *


@parametrize(
    "sample,scope,feature,expected", FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_features(sample, scope, feature, expected):
    with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
        do_test_feature_presence(get_lancelot_extractor, sample, scope, feature, expected)


@parametrize(
    "sample,scope,feature,expected", FEATURE_COUNT_TESTS, indirect=["sample", "scope"],
)
def test_lancelot_feature_counts(sample, scope, feature, expected):
    with xfail(sys.version_info < (3, 0), reason="lancelot only works on py3"):
        do_test_feature_count(get_lancelot_extractor, sample, scope, feature, expected)
@@ -47,7 +47,7 @@ def test_main_single_rule(z9324d_extractor, tmpdir):
    assert capa.main.main([path, "-v", "-r", rule_file.strpath,]) == 0


@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2")
@pytest.mark.xfail(sys.version_info >= (3, 0), reason="lancelot doesn't support shellcode workspaces")
def test_main_shellcode(z499c2_extractor):
    path = z499c2_extractor.path
    assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0

62
tests/test_unpack.py
Normal file
@@ -0,0 +1,62 @@
# Copyright (C) 2020 FireEye, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys

import pefile
import pytest
from fixtures import *

import capa.unpack


@pytest.mark.xfail(sys.version_info < (3, 6), reason="auto-unpack only works on py3.6+")
def test_aspack_is_packed(aspack_extractor):
    path = aspack_extractor.path

    with open(path, "rb") as f:
        buf = f.read()

    assert capa.unpack.is_packed(buf) is True


@pytest.mark.xfail(sys.version_info < (3, 6), reason="auto-unpack only works on py3.6+")
def test_aspack_detect(aspack_extractor):
    path = aspack_extractor.path

    with open(path, "rb") as f:
        buf = f.read()

    assert capa.unpack.detect_packer(buf) == "aspack"


@pytest.mark.xfail(sys.version_info < (3, 6), reason="auto-unpack only works on py3.6+")
def test_aspack_unpack(aspack_extractor):
    with open(aspack_extractor.path, "rb") as f:
        buf = f.read()

    unpacked = capa.unpack.unpack_pe("aspack", buf)

    pe = pefile.PE(data=unpacked)
    assert pe.OPTIONAL_HEADER.ImageBase == 0x4AD00000
    assert pe.OPTIONAL_HEADER.AddressOfEntryPoint == 0x1A610
    assert b"This program cannot be run in DOS mode" in unpacked
    assert "(C) Copyright 1985-2000 Microsoft Corp.".encode("utf-16le") in unpacked
    assert "CMD.EXE has halted. %0".encode("utf-16le") in unpacked

    dlls = set()
    syms = set()
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        dlls.add(entry.dll.decode("ascii").lower().partition(".")[0])
        for imp in entry.imports:
            syms.add(imp.name.decode("ascii"))

    assert dlls == {"advapi32", "kernel32", "msvcrt", "user32"}
    assert "RegQueryValueExW" in syms
    assert "WriteConsoleW" in syms
    assert "realloc" in syms
    assert "GetProcessWindowStation" in syms