mirror of
https://github.com/mandiant/capa.git
synced 2025-12-08 05:40:36 -08:00
Compare commits
15 Commits
v6.0.0
...
backend-mi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3831f1c104 | ||
|
|
dc828e82b3 | ||
|
|
2e98ba990c | ||
|
|
d008fef23f | ||
|
|
fe458c387a | ||
|
|
3e52c7de23 | ||
|
|
2d1e7946e3 | ||
|
|
f2fe173ef3 | ||
|
|
b2fc52d390 | ||
|
|
5ba4629c3c | ||
|
|
4fc9c77791 | ||
|
|
31ba9ee1b3 | ||
|
|
b4a808ac76 | ||
|
|
0f030115d1 | ||
|
|
42573d8df2 |
@@ -8,6 +8,8 @@
|
||||
|
||||
import abc
|
||||
|
||||
from capa.helpers import oint
|
||||
|
||||
|
||||
class FeatureExtractor(object):
|
||||
"""
|
||||
@@ -35,6 +37,12 @@ class FeatureExtractor(object):
|
||||
#
|
||||
super(FeatureExtractor, self).__init__()
|
||||
|
||||
def block_offset(self, bb):
|
||||
return oint(bb)
|
||||
|
||||
def function_offset(self, f):
|
||||
return oint(f)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_base_address(self):
|
||||
"""
|
||||
|
||||
107
capa/features/extractors/miasm/__init__.py
Normal file
107
capa/features/extractors/miasm/__init__.py
Normal file
@@ -0,0 +1,107 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import miasm.analysis.binary
|
||||
import miasm.analysis.machine
|
||||
from miasm.core.locationdb import LocationDB
|
||||
|
||||
import capa.features.extractors.miasm.file
|
||||
import capa.features.extractors.miasm.insn
|
||||
import capa.features.extractors.miasm.function
|
||||
import capa.features.extractors.miasm.basicblock
|
||||
from capa.features.extractors import FeatureExtractor
|
||||
|
||||
|
||||
class MiasmFeatureExtractor(FeatureExtractor):
    """
    feature extractor that uses miasm to parse and disassemble the input.

    the constructor parses the buffer into a miasm container and eagerly builds a single CFG
    by disassembling from the entry point and from every exported address, following calls.
    functions are identified by the `LocKey` of their first block.
    """

    def __init__(self, buf):
        """
        args:
          buf (bytes): content of the file to analyze.
        """
        super(MiasmFeatureExtractor, self).__init__()
        self.buf = buf
        self.loc_db = LocationDB()
        self.container = miasm.analysis.binary.Container.from_string(buf, self.loc_db)
        # NOTE(review): the rest of this backend treats the executable as a PE - confirm other formats are rejected earlier.
        self.pe = self.container.executable
        self.machine = miasm.analysis.machine.Machine(self.container.arch)
        self.cfg = self._build_cfg()

    def get_base_address(self):
        # NOTE(review): this is the container's entry point, not an image base - confirm this is what callers expect.
        return self.container.entry_point

    def extract_file_features(self):
        for feature, va in capa.features.extractors.miasm.file.extract_file_features(self):
            yield feature, va

    # TODO: Improve this function (it just considers all loc_keys target of calls a function), port to miasm
    def get_functions(self):
        """
        yields each loc_key that is the direct argument of a call instruction, once.
        """
        seen = set()

        for block in self.cfg.blocks:
            for line in block.lines:
                if not (line.is_subcall() and line.args[0].is_loc()):
                    continue

                loc_key = line.args[0].loc_key
                if loc_key in seen:
                    continue

                seen.add(loc_key)
                yield loc_key

    def extract_function_features(self, loc_key):
        for feature, va in capa.features.extractors.miasm.function.extract_features(self, loc_key):
            yield feature, va

    def block_offset(self, bb):
        """
        offset of the first instruction of the given basic block.
        """
        return bb.lines[0].offset

    def function_offset(self, f):
        """
        offset of the first instruction of the block that the function's loc_key resolves to.
        """
        return self.cfg.loc_key_to_block(f).lines[0].offset

    def get_basic_blocks(self, loc_key):
        """
        get the basic blocks of the function represented by loc_key

        the function is disassembled again from its first block without following calls,
        so that only blocks reachable inside the function are returned.
        """
        block = self.cfg.loc_key_to_block(loc_key)
        disassembler = self.machine.dis_engine(self.container.bin_stream, loc_db=self.loc_db, follow_call=False)
        cfg = disassembler.dis_multiblock(self.block_offset(block))
        return cfg.blocks

    def extract_basic_block_features(self, _, bb):
        for feature, va in capa.features.extractors.miasm.basicblock.extract_features(bb):
            yield feature, va

    def get_instructions(self, _, bb):
        return bb.lines

    def extract_insn_features(self, f, bb, insn):
        for feature, va in capa.features.extractors.miasm.insn.extract_features(self, f, bb, insn):
            yield feature, va

    def _get_entry_points(self):
        """
        collect the addresses from which to start disassembling: the entry point and all exports.
        """
        # import explicitly rather than relying on miasm having loaded the submodule as a side effect
        import miasm.jitter.loader.pe

        entry_points = {self.container.entry_point}

        for _, va in miasm.jitter.loader.pe.get_export_name_addr_list(self.pe):
            entry_points.add(va)

        return entry_points

    # This is more efficient than using the `blocks` argument in `dis_multiblock`
    # See http://www.williballenthin.com/post/2020-01-12-miasm-part-2
    # TODO: port this efficiency improvement to miasm
    def _build_cfg(self):
        """
        disassemble from every entry point and merge the results into a single CFG.
        """
        # import explicitly rather than relying on miasm having loaded the submodule as a side effect
        import miasm.core.asmblock

        loc_db = self.container.loc_db
        disassembler = self.machine.dis_engine(self.container.bin_stream, follow_call=True, loc_db=loc_db)
        # shared across runs so that each run skips what an earlier one already disassembled
        job_done = set()

        complete_cfg = miasm.core.asmblock.AsmCFG(loc_db)
        for va in self._get_entry_points():
            complete_cfg.merge(disassembler.dis_multiblock(va, job_done=job_done))

        disassembler.apply_splitting(complete_cfg)
        return complete_cfg
|
||||
134
capa/features/extractors/miasm/basicblock.py
Normal file
134
capa/features/extractors/miasm/basicblock.py
Normal file
@@ -0,0 +1,134 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import sys
|
||||
import string
|
||||
import struct
|
||||
|
||||
from capa.features import Characteristic
|
||||
from capa.features.basicblock import BasicBlock
|
||||
from capa.features.extractors.helpers import MIN_STACKSTRING_LEN
|
||||
|
||||
|
||||
# TODO: Avoid this duplication (this code is in __init__ as well)
|
||||
def block_offset(bb):
    """ return the offset of the first instruction in the given basic block """
    first_insn = bb.lines[0]
    return first_insn.offset
|
||||
|
||||
|
||||
def extract_bb_tight_loop(bb):
    """ yield a "tight loop" characteristic when one of the block's outgoing constraints targets the block itself """
    for constraint in bb.bto:
        if constraint.loc_key == bb.loc_key:
            yield Characteristic("tight loop"), block_offset(bb)
            # a single self-reference is enough
            return
|
||||
|
||||
|
||||
def is_mov_imm_to_stack(instr):
    """
    Return True if the instruction moves an immediate into memory addressed via a stack register
    """
    if not instr.name.startswith("MOV"):
        return False

    try:
        destination, source = instr.args
    except ValueError:
        # anything other than exactly two operands
        return False

    if not (source.is_int() and destination.is_mem()):
        return False

    # the textual form of the operand is inspected so that things like `@8[ESP + 0x8]`
    # and `EBP` are detected without failing on other operand shapes
    rendered = str(destination)
    for register in ("EBP", "RBP", "ESP", "RSP"):
        if register in rendered:
            return True

    return False
|
||||
|
||||
|
||||
def is_printable_ascii(chars):
    """
    return True if every byte of `chars` is a printable ASCII character.

    args:
      chars (bytes): the bytes to inspect (`str` on Python 2).
    """
    if sys.version_info >= (3, 0):
        # iterating `bytes` yields ints on Python 3
        return all(c < 127 and chr(c) in string.printable for c in chars)
    else:
        return all(ord(c) < 127 and c in string.printable for c in chars)


def is_printable_utf16le(chars):
    """
    return True if every second byte of `chars` is NUL and the remaining bytes are printable ASCII.

    args:
      chars (bytes): the bytes to inspect (`str` on Python 2).
    """
    # compare whole slices: on Python 3 the elements of `bytes` are ints,
    # so an element-wise comparison against b"\x00" could never succeed.
    high_bytes = chars[1::2]
    if high_bytes != b"\x00" * len(high_bytes):
        return False
    return is_printable_ascii(chars[::2])


# operand widths (in bytes) that an immediate is packed to, with the matching little-endian struct format
_IMMEDIATE_PACK_FORMATS = ((1, "<B"), (2, "<H"), (4, "<I"), (8, "<Q"))


def get_printable_len(insn):
    """
    Return string length if all operand bytes are ascii or utf16-le printable

    args:
      insn: a two-operand instruction whose source is an immediate and whose destination is memory.

    returns:
      int: number of characters the immediate contributes, or 0 when it is not printable.

    raises:
      ValueError: when the source is not an immediate or the destination is not memory.
    """
    dst, src = insn.args

    if not src.is_int():
        raise ValueError("unexpected operand type")

    if not dst.is_mem():
        raise ValueError("unexpected operand type")

    # the immediate is either a plain int or an object that wraps one in `.arg`
    if isinstance(src.arg, int):
        val = src.arg
    else:
        val = src.arg.arg

    size = (val.bit_length() + 7) // 8
    if size == 0:
        return 0

    # the significant byte count may be 3, 5, 6 or 7 (e.g. 0x00420041, UTF-16 "AB"),
    # so pack to the smallest supported width that holds the value.
    for width, fmt in _IMMEDIATE_PACK_FORMATS:
        if size <= width:
            chars = struct.pack(fmt, val)
            break
    else:
        # wider than any supported immediate
        return 0

    if is_printable_ascii(chars):
        return width

    if is_printable_utf16le(chars):
        # two bytes per character; integer division keeps the count an int on Python 3
        return width // 2

    return 0
|
||||
|
||||
|
||||
def extract_stackstring(bb):
    """ yield a "stack string" characteristic once enough printable immediates are moved onto the stack """
    printable_total = 0
    for insn in bb.lines:
        if is_mov_imm_to_stack(insn):
            printable_total += get_printable_len(insn)

        if printable_total > MIN_STACKSTRING_LEN:
            yield Characteristic("stack string"), block_offset(bb)
            # report the block at most once
            break
|
||||
|
||||
|
||||
def extract_features(bb):
    """
    extract features from the given basic block.

    args:
      bb (miasm.core.asmblock.AsmBlock): the basic block to process.

    yields:
      Tuple[Feature, VA]: each feature together with the location at which it was found.
    """
    # every basic block carries the generic `BasicBlock` feature
    yield BasicBlock(), block_offset(bb)

    for handler in BASIC_BLOCK_HANDLERS:
        for feature, location in handler(bb):
            yield feature, location


# handlers invoked, in order, for every basic block
BASIC_BLOCK_HANDLERS = (
    extract_bb_tight_loop,
    extract_stackstring,
)
|
||||
102
capa/features/extractors/miasm/file.py
Normal file
102
capa/features/extractors/miasm/file.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import re
|
||||
|
||||
import miasm.analysis.binary
|
||||
|
||||
import capa.features.extractors.strings
|
||||
from capa.features import String, Characteristic
|
||||
from capa.features.file import Export, Import, Section
|
||||
|
||||
|
||||
def extract_file_embedded_pe(extractor):
    """
    extract embedded PE features

    every "MZ" marker after the start of the file is treated as a candidate,
    and reported when miasm is able to parse a PE starting at that offset.

    args:
      extractor (MiasmFeatureExtractor): provides the raw buffer (`buf`) and `loc_db`.

    yields:
      Tuple[Feature, int]: the "embedded pe" characteristic and the file offset of the PE.
    """
    buf = extractor.buf
    for match in re.finditer(b"MZ", buf):
        offset = match.start()
        if offset == 0:
            # this is the file itself, not something embedded in it
            continue

        try:
            subcontainer = miasm.analysis.binary.ContainerPE.from_string(buf[offset:], loc_db=extractor.loc_db)
        except (
            miasm.analysis.binary.ContainerSignatureException,
            miasm.analysis.binary.ContainerParsingException,
        ):
            # "MZ" is a common byte pair: most candidates are not PE files
            continue

        if isinstance(subcontainer, miasm.analysis.binary.ContainerPE):
            yield Characteristic("embedded pe"), offset
|
||||
|
||||
|
||||
def extract_file_export_names(extractor):
    """
    extract the names of the file's exports and their addresses

    args:
      extractor (MiasmFeatureExtractor): provides the parsed executable (`pe`).
    """
    exports = miasm.jitter.loader.pe.get_export_name_addr_list(extractor.pe)
    for name_or_ordinal, address in exports:
        # exports that are not named by a string (ordinals) are not reported
        if not isinstance(name_or_ordinal, str):
            continue
        yield Export(name_or_ordinal), address
|
||||
|
||||
|
||||
def extract_file_import_names(extractor):
    """
    extract imported function names and their addresses
    1. imports by ordinal:
     - modulename.#ordinal
    2. imports by name, results in two features to support importname-only matching:
     - modulename.importname
     - importname

    args:
      extractor (MiasmFeatureExtractor): provides the parsed executable (`pe`).
    """
    for ((dll, symbol), va_set) in miasm.jitter.loader.pe.get_import_address_pe(extractor.pe).items():
        # drop the file extension, if there is one. slicing off a fixed four characters
        # would corrupt module names that carry no extension or one of a different length.
        stem, dot, _ = dll.rpartition(".")
        dll_name = stem if dot else dll

        for va in va_set:
            if isinstance(symbol, int):
                yield Import("%s.#%s" % (dll_name, symbol)), va
            else:
                yield Import("%s.%s" % (dll_name, symbol)), va
                yield Import(symbol), va
|
||||
|
||||
|
||||
def extract_file_section_names(extractor):
    """
    extract file sections and their addresses

    sections whose name is not ASCII are skipped rather than aborting the whole extraction.

    args:
      extractor (MiasmFeatureExtractor): provides the parsed executable (`pe`).
    """
    for section in extractor.pe.SHList.shlist:
        # the name field is NUL padded
        raw_name = section.name.partition(b"\x00")[0]
        try:
            name = raw_name.decode("ascii")
        except UnicodeDecodeError:
            continue

        # NOTE(review): confirm whether `addr` is a virtual address or an RVA that still needs the image base.
        va = section.addr
        yield Section(name), va
|
||||
|
||||
|
||||
def extract_file_strings(extractor):
    """
    extract ASCII and UTF-16 LE strings from the raw file content

    args:
      extractor (MiasmFeatureExtractor): provides the raw buffer (`buf`).
    """
    string_extractors = (
        capa.features.extractors.strings.extract_ascii_strings,
        capa.features.extractors.strings.extract_unicode_strings,
    )
    # all ASCII strings are yielded before any UTF-16 LE string
    for extract_strings in string_extractors:
        for found in extract_strings(extractor.buf):
            yield String(found.s), found.offset
|
||||
|
||||
|
||||
def extract_file_features(extractor):
    """
    extract file features by running every file handler against the extractor

    args:
      extractor (MiasmFeatureExtractor): passed unchanged to each handler.

    yields:
      Tuple[Feature, VA]: a feature and its location.
    """
    for handler in FILE_HANDLERS:
        for feature, location in handler(extractor):
            yield feature, location


# handlers invoked, in order, once per file
FILE_HANDLERS = (
    extract_file_embedded_pe,
    extract_file_export_names,
    extract_file_import_names,
    extract_file_section_names,
    extract_file_strings,
)
|
||||
50
capa/features/extractors/miasm/function.py
Normal file
50
capa/features/extractors/miasm/function.py
Normal file
@@ -0,0 +1,50 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from capa.features import Characteristic
|
||||
|
||||
|
||||
def extract_function_calls_to(extractor, loc_key):
    """
    yield a "calls to" characteristic for each predecessor block whose call instruction targets loc_key

    args:
      extractor (MiasmFeatureExtractor): provides the CFG (`cfg`).
      loc_key (LocKey): LocKey which represents the beginning of the function
    """
    for pred_key in extractor.cfg.predecessors(loc_key):
        call_insn = extractor.cfg.loc_key_to_block(pred_key).get_subcall_instr()
        if not call_insn or not call_insn.is_subcall():
            continue

        target = call_insn.args[0]
        if target.is_loc() and target.loc_key == loc_key:
            yield Characteristic("calls to"), call_insn.offset
|
||||
|
||||
|
||||
def extract_function_loop(extractor, loc_key):
    """
    yield a "loop" characteristic when the function's own CFG contains a loop

    the function is disassembled again from its first block without following calls.
    """
    first_block = extractor.cfg.loc_key_to_block(loc_key)
    start = extractor.block_offset(first_block)

    engine = extractor.machine.dis_engine(
        extractor.container.bin_stream, loc_db=extractor.loc_db, follow_call=False
    )
    function_cfg = engine.dis_multiblock(start)

    if function_cfg.has_loop():
        yield Characteristic("loop"), start
|
||||
|
||||
|
||||
def extract_features(extractor, loc_key):
    """
    extract features from the given function.

    args:
      extractor (MiasmFeatureExtractor): passed unchanged to each handler.
      loc_key (LocKey): LocKey which represents the beginning of the function

    yields:
      Tuple[Feature, VA]: each feature together with the location at which it was found.
    """
    for handler in FUNCTION_HANDLERS:
        for feature, location in handler(extractor, loc_key):
            yield feature, location


# handlers invoked, in order, for every function
FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop)
|
||||
126
capa/features/extractors/miasm/insn.py
Normal file
126
capa/features/extractors/miasm/insn.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import miasm.expression.expression
|
||||
|
||||
import capa.features.extractors.helpers
|
||||
from capa.features.insn import Mnemonic
|
||||
|
||||
|
||||
# TODO: remove duplication (similar code in file.py)
|
||||
# TODO: this function should be cached
|
||||
def get_imports(pe):
    """
    map the address of each import to its name.

    args:
      pe: the parsed executable, as accepted by miasm's `get_import_address_pe`.

    returns:
      Dict[int, str]: address -> "modulename.importname", or "modulename.#ordinal" for imports by ordinal.
    """
    imports = {}
    for ((dll, symbol), va_set) in miasm.jitter.loader.pe.get_import_address_pe(pe).items():
        # drop the file extension, if there is one. slicing off a fixed four characters
        # would corrupt module names that carry no extension or one of a different length.
        stem, dot, _ = dll.rpartition(".")
        dll_name = stem if dot else dll

        for va in va_set:
            if isinstance(symbol, int):
                imports[va] = "%s.#%s" % (dll_name, symbol)
            else:
                imports[va] = "%s.%s" % (dll_name, symbol)
    return imports
|
||||
|
||||
|
||||
def extract_insn_api_features(extractor, _f, _bb, insn):
    """parse API features from a call through a memory operand whose pointer is a constant address."""
    if not insn.is_subcall():
        return

    operand = insn.args[0]
    if not isinstance(operand, miasm.expression.expression.ExprMem):
        return
    if not isinstance(operand.ptr, miasm.expression.expression.ExprInt):
        return

    target = int(operand.ptr)
    imports = get_imports(extractor.pe)
    if target not in imports:
        return

    # split "modulename.importname" at the last dot
    dll, _, symbol = imports[target].rpartition(".")
    for feature in capa.features.extractors.helpers.generate_symbols(dll, symbol):
        yield feature, insn.offset
|
||||
|
||||
|
||||
def extract_insn_number_features(extractor, f, bb, insn):
    """parse number features from the given instruction (not implemented for this backend yet)."""
    raise NotImplementedError()


def extract_insn_string_features(extractor, f, bb, insn):
    """parse string features from the given instruction (not implemented for this backend yet)."""
    raise NotImplementedError()


def extract_insn_offset_features(extractor, f, bb, insn):
    """parse structure offset features from the given instruction (not implemented for this backend yet)."""
    raise NotImplementedError()


def extract_insn_nzxor_characteristic_features(extractor, f, bb, insn):
    """
    parse non-zeroing XOR instruction from the given instruction (not implemented for this backend yet).
    expected non-zeroing XORs, e.g. security cookies, are meant to be ignored.
    """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_insn_mnemonic_features(extractor, f, bb, insn):
    """yield the instruction's name as a mnemonic feature, located at the instruction's offset."""
    mnemonic = Mnemonic(insn.name)
    yield mnemonic, insn.offset
|
||||
|
||||
|
||||
def extract_insn_peb_access_characteristic_features(extractor, f, bb, insn):
    """
    parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
    (not implemented for this backend yet)
    """
    raise NotImplementedError()


def extract_insn_segment_access_features(extractor, f, bb, insn):
    """ parse the instruction for access to fs or gs (not implemented for this backend yet) """
    raise NotImplementedError()


def extract_insn_cross_section_cflow(extractor, f, bb, insn):
    """
    inspect the instruction for a CALL or JMP that crosses section boundaries
    (not implemented for this backend yet).
    """
    raise NotImplementedError()


# this is a feature that's most relevant at the function scope,
# however, it's most efficient to extract at the instruction scope.
# NOTE(review): unlike the other handlers this one takes no `extractor` argument - confirm before enabling it.
def extract_function_calls_from(f, bb, insn):
    """ not implemented for this backend yet """
    raise NotImplementedError()
|
||||
|
||||
|
||||
def extract_features(extractor, f, bb, insn):
    """
    extract features from the given insn.

    args:
      extractor (MiasmFeatureExtractor)
      f (miasm.expression.expression.LocKey): the function from which to extract features
      bb (miasm.core.asmblock.AsmBlock): the basic block to process.
      insn (Instruction): the instruction to process.

    yields:
      Tuple[Feature, VA]: each feature together with the location at which it was found.
    """
    for handler in INSTRUCTION_HANDLERS:
        for feature, location in handler(extractor, f, bb, insn):
            yield feature, location


# handlers invoked, in order, for every instruction.
# the entries that are commented out are not enabled for this backend.
INSTRUCTION_HANDLERS = (
    extract_insn_api_features,
    # extract_insn_number_features,
    # extract_insn_string_features,
    # extract_insn_bytes_features,
    # extract_insn_offset_features,
    # extract_insn_nzxor_characteristic_features,
    extract_insn_mnemonic_features,
    # extract_insn_peb_access_characteristic_features,
    # extract_insn_cross_section_cflow,
    # extract_insn_segment_access_features,
    # extract_function_calls_from,
    # extract_function_indirect_call_characteristic_features,
)
|
||||
@@ -37,4 +37,4 @@ class Offset(Feature):
|
||||
|
||||
class Mnemonic(Feature):
    """ instruction mnemonic feature; the value is stored in lower case """

    def __init__(self, value, description=None):
        # normalize the case so that the same mnemonic compares equal however it was spelled
        super(Mnemonic, self).__init__(value.lower(), description=description)
|
||||
|
||||
38
capa/main.py
38
capa/main.py
@@ -29,7 +29,7 @@ import capa.version
|
||||
import capa.features
|
||||
import capa.features.freeze
|
||||
import capa.features.extractors
|
||||
from capa.helpers import oint, get_file_taste
|
||||
from capa.helpers import get_file_taste
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SUPPORTED_FILE_MAGIC = set(["MZ"])
|
||||
@@ -72,14 +72,14 @@ def find_function_capabilities(ruleset, extractor, f):
|
||||
bb_features[feature].add(va)
|
||||
function_features[feature].add(va)
|
||||
|
||||
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, oint(bb))
|
||||
_, matches = capa.engine.match(ruleset.basic_block_rules, bb_features, extractor.block_offset(bb))
|
||||
|
||||
for rule_name, res in matches.items():
|
||||
bb_matches[rule_name].extend(res)
|
||||
for va, _ in res:
|
||||
function_features[capa.features.MatchedRule(rule_name)].add(va)
|
||||
|
||||
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, oint(f))
|
||||
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, extractor.function_offset(f))
|
||||
return function_matches, bb_matches, len(function_features)
|
||||
|
||||
|
||||
@@ -123,8 +123,8 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
|
||||
|
||||
for f in pbar(list(extractor.get_functions()), desc="matching", unit=" functions"):
|
||||
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
||||
meta["feature_counts"]["functions"][f.__int__()] = feature_count
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", f.__int__(), feature_count)
|
||||
meta["feature_counts"]["functions"][extractor.function_offset(f)] = feature_count
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", extractor.function_offset(f), feature_count)
|
||||
|
||||
for rule_name, res in function_matches.items():
|
||||
all_function_matches[rule_name].extend(res)
|
||||
@@ -304,19 +304,27 @@ class UnsupportedRuntimeError(RuntimeError):
|
||||
|
||||
|
||||
def get_extractor_py3(path, format, disable_progress=False):
    """
    build the feature extractor used on Python 3.

    args:
      path (str): path of the file to analyze.
      format: not used by this function.
      disable_progress (bool): when True, the progress spinner of the SMDA branch is disabled.

    returns:
      the miasm based extractor; the SMDA branch is currently disabled.
    """
    if False:  # TODO: How to decide which backend to use?
        from smda.SmdaConfig import SmdaConfig
        from smda.Disassembler import Disassembler

        import capa.features.extractors.smda

        smda_report = None
        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            config = SmdaConfig()
            config.STORE_BUFFER = True
            smda_disasm = Disassembler(config)
            smda_report = smda_disasm.disassembleFile(path)

        return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path)
    else:
        import capa.features.extractors.miasm

        with open(path, "rb") as f:
            buf = f.read()

        return capa.features.extractors.miasm.MiasmFeatureExtractor(buf)
|
||||
|
||||
|
||||
def get_extractor(path, format, disable_progress=False):
|
||||
|
||||
@@ -108,6 +108,17 @@ def get_smda_extractor(path):
|
||||
return capa.features.extractors.smda.SmdaFeatureExtractor(report, path)
|
||||
|
||||
|
||||
@lru_cache()
def get_miasm_extractor(path):
    """
    read the file at `path` and build a miasm feature extractor from its content.
    results are cached per path.
    """
    import capa.features.extractors.miasm

    with open(path, "rb") as f:
        buf = f.read()

    return capa.features.extractors.miasm.MiasmFeatureExtractor(buf)
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def extract_file_features(extractor):
|
||||
features = collections.defaultdict(set)
|
||||
@@ -242,14 +253,14 @@ def sample(request):
|
||||
|
||||
def get_function(extractor, fva):
    """
    return the function whose offset, as reported by the extractor, equals `fva`.

    raises:
      ValueError: when the extractor has no such function.
    """
    for f in extractor.get_functions():
        # ask the extractor for the offset: not every backend's function object converts to an int
        if extractor.function_offset(f) == fva:
            return f
    raise ValueError("function not found")
|
||||
|
||||
|
||||
def get_basic_block(extractor, f, va):
    """
    return the basic block of function `f` whose offset, as reported by the extractor, equals `va`.

    raises:
      ValueError: when the function has no such basic block.
    """
    for bb in extractor.get_basic_blocks(f):
        # ask the extractor for the offset: not every backend's block object converts to an int
        if extractor.block_offset(bb) == va:
            return bb
    raise ValueError("basic block not found")
|
||||
|
||||
@@ -521,7 +532,10 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected):
|
||||
|
||||
def get_extractor(path):
|
||||
if sys.version_info >= (3, 0):
|
||||
extractor = get_smda_extractor(path)
|
||||
if False: # TODO: How to decide which backend to use?
|
||||
extractor = get_smda_extractor(path)
|
||||
else:
|
||||
extractor = get_miasm_extractor(path)
|
||||
else:
|
||||
extractor = get_viv_extractor(path)
|
||||
|
||||
|
||||
29
tests/test_miasm_features.py
Normal file
29
tests/test_miasm_features.py
Normal file
@@ -0,0 +1,29 @@
|
||||
# Copyright (C) 2020 FireEye, Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: https://github.com/fireeye/capa/blob/master/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from fixtures import *
|
||||
|
||||
|
||||
@parametrize(
    "sample,scope,feature,expected",
    FEATURE_PRESENCE_TESTS,
    indirect=["sample", "scope"],
)
def test_miasm_features(sample, scope, feature, expected):
    """ run the shared feature presence expectations against the miasm extractor """
    do_test_feature_presence(get_miasm_extractor, sample, scope, feature, expected)
|
||||
|
||||
|
||||
@parametrize(
    "sample,scope,feature,expected",
    FEATURE_COUNT_TESTS,
    indirect=["sample", "scope"],
)
def test_miasm_feature_counts(sample, scope, feature, expected):
    """ run the shared feature count expectations against the miasm extractor """
    do_test_feature_count(get_miasm_extractor, sample, scope, feature, expected)
|
||||
Reference in New Issue
Block a user