extractor: viv: use handles throughout

This commit is contained in:
Willi Ballenthin
2022-04-08 11:54:27 -06:00
parent fc1709ba6c
commit bfb01e3729
5 changed files with 125 additions and 79 deletions

View File

@@ -49,8 +49,8 @@ def _bb_has_tight_loop(f, bb):
def extract_bb_tight_loop(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""check basic block for tight loop indicators"""
if _bb_has_tight_loop(f, bb):
yield Characteristic("tight loop"), bb.va
if _bb_has_tight_loop(f, bb.inner):
yield Characteristic("tight loop"), bb.address
def _bb_has_stackstring(f, bb):
@@ -72,8 +72,8 @@ def _bb_has_stackstring(f, bb):
def extract_stackstring(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]:
"""check basic block for stackstring indicators"""
if _bb_has_stackstring(f, bb):
yield Characteristic("stack string"), bb.va
if _bb_has_stackstring(f, bb.inner):
yield Characteristic("stack string"), bb.address
def is_mov_imm_to_stack(instr: envi.archs.i386.disasm.i386Opcode) -> bool:
@@ -157,7 +157,7 @@ def extract_features(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature,
yields:
Tuple[Feature, int]: the features and their location found in this basic block.
"""
yield BasicBlock(), bb.va
yield BasicBlock(), AbsoluteVirtualAddress(bb.inner.va)
for bb_handler in BASIC_BLOCK_HANDLERS:
for feature, addr in bb_handler(f, bb):
yield feature, addr

View File

@@ -62,26 +62,29 @@ class VivisectFeatureExtractor(FeatureExtractor):
def get_functions(self):
for va in sorted(self.vw.getFunctions()):
yield viv_utils.Function(self.vw, va)
yield FunctionHandle(address=AbsoluteVirtualAddress(va), inner=viv_utils.Function(self.vw, va))
def extract_function_features(self, f):
yield from capa.features.extractors.viv.function.extract_features(f)
def get_basic_blocks(self, f):
return f.basic_blocks
def get_basic_blocks(self, fh: FunctionHandle):
f: viv_utils.Function = fh.inner
for bb in f.basic_blocks:
yield BBHandle(address=AbsoluteVirtualAddress(bb.va), inner=bb)
def extract_basic_block_features(self, f, bb):
yield from capa.features.extractors.viv.basicblock.extract_features(f, bb)
def get_instructions(self, f, bb):
def get_instructions(self, f, bbh: BBHandle):
bb: viv_utils.BasicBlock = bbh.inner
for insn in bb.instructions:
yield InstructionHandle(insn)
yield InsnHandle(address=AbsoluteVirtualAddress(insn.va), inner=insn)
def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.viv.insn.extract_features(f, bb, insn)
def is_library_function(self, va):
return viv_utils.flirt.is_library_function(self.vw, va)
def is_library_function(self, addr):
return viv_utils.flirt.is_library_function(self.vw, addr)
def get_function_name(self, va):
return viv_utils.get_function_name(self.vw, va)
def get_function_name(self, addr):
return viv_utils.get_function_name(self.vw, addr)

View File

@@ -31,14 +31,17 @@ def interface_extract_function_XXX(f: FunctionHandle) -> Iterator[Tuple[Feature,
def extract_function_calls_to(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
f: viv_utils.Function = fhandle.inner
for src, _, _, _ in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE):
yield Characteristic("calls to"), src
yield Characteristic("calls to"), fhandle.address
def extract_function_loop(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse if a function has a loop
"""
f: viv_utils.Function = fhandle.inner
edges = []
for bb in f.basic_blocks:
@@ -54,7 +57,7 @@ def extract_function_loop(fhandle: FunctionHandle) -> Iterator[Tuple[Feature, Ad
edges.append((bb.va, bva))
if edges and loops.has_loop(edges):
yield Characteristic("loop"), f.va
yield Characteristic("loop"), fhandle.address
def extract_features(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import List, Tuple, Callable, Iterator
import envi
import envi.exc
import viv_utils
@@ -63,8 +64,10 @@ def get_imports(vw):
return imports
def extract_insn_api_features(f, bb, insn):
def extract_insn_api_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse API features from the given instruction."""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
# example:
#
@@ -85,7 +88,7 @@ def extract_insn_api_features(f, bb, insn):
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), insn.va
yield API(name), ihandle.address
# call via thunk on x86,
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
@@ -107,20 +110,20 @@ def extract_insn_api_features(f, bb, insn):
if viv_utils.flirt.is_library_function(f.vw, target):
name = viv_utils.get_function_name(f.vw, target)
yield API(name), insn.va
yield API(name), ihandle.address
if name.startswith("_"):
# some linkers may prefix linked routines with a `_` to avoid name collisions.
# extract features for both the mangled and un-mangled representations.
# e.g. `_fwrite` -> `fwrite`
# see: https://stackoverflow.com/a/2628384/87207
yield API(name[1:]), insn.va
yield API(name[1:]), ihandle.address
return
for _ in range(THUNK_CHAIN_DEPTH_DELTA):
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), insn.va
yield API(name), ihandle.address
# if jump leads to an ENDBRANCH instruction, skip it
if f.vw.getByteDef(target)[1].startswith(b"\xf3\x0f\x1e"):
@@ -140,7 +143,7 @@ def extract_insn_api_features(f, bb, insn):
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), insn.va
yield API(name), ihandle.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper):
try:
@@ -157,7 +160,7 @@ def extract_insn_api_features(f, bb, insn):
if target in imports:
dll, symbol = imports[target]
for name in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name), insn.va
yield API(name), ihandle.address
def derefs(vw, p):
@@ -230,12 +233,15 @@ def read_bytes(vw, va: int) -> bytes:
raise
def extract_insn_bytes_features(f, bb, insn):
def extract_insn_bytes_features(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse byte sequence features from the given instruction.
example:
# push offset iid_004118d4_IShellLinkA ; riid
"""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
if insn.mnem == "call":
return
@@ -264,7 +270,7 @@ def extract_insn_bytes_features(f, bb, insn):
if capa.features.extractors.helpers.all_zeros(buf):
continue
yield Bytes(buf), insn.va
yield Bytes(buf), ihandle.address
def read_string(vw, offset: int) -> str:
@@ -326,11 +332,17 @@ def is_security_cookie(f, bb, insn) -> bool:
return False
def extract_insn_nzxor_characteristic_features(f, bb, insn):
def extract_insn_nzxor_characteristic_features(
fhandle: FunctionHandle, bbhandle: BBHandle, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse non-zeroing XOR instruction from the given instruction.
ignore expected non-zeroing XORs, e.g. security cookies.
"""
insn: envi.Opcode = ihandle.inner
bb: viv_utils.BasicBlock = bbhandle.inner
f: viv_utils.Function = fhandle.inner
if insn.mnem not in ("xor", "xorpd", "xorps", "pxor"):
return
@@ -340,37 +352,42 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn):
if is_security_cookie(f, bb, insn):
return
yield Characteristic("nzxor"), insn.va
yield Characteristic("nzxor"), ihandle.address
def extract_insn_mnemonic_features(f, bb, insn):
def extract_insn_mnemonic_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse mnemonic features from the given instruction."""
yield Mnemonic(insn.mnem), insn.va
yield Mnemonic(ihandle.inner.mnem), ihandle.address
def extract_insn_obfs_call_plus_5_characteristic_features(f, bb, insn):
def extract_insn_obfs_call_plus_5_characteristic_features(
f, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
parse call $+5 instruction from the given instruction.
"""
insn: envi.Opcode = ihandle.inner
if insn.mnem != "call":
return
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386PcRelOper):
if insn.va + 5 == insn.opers[0].getOperValue(insn):
yield Characteristic("call $+5"), insn.va
yield Characteristic("call $+5"), ihandle.address
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386ImmMemOper) or isinstance(
insn.opers[0], envi.archs.amd64.disasm.Amd64RipRelOper
):
if insn.va + 5 == insn.opers[0].getOperAddr(insn):
yield Characteristic("call $+5"), insn.va
yield Characteristic("call $+5"), ihandle.address
def extract_insn_peb_access_characteristic_features(f, bb, insn):
def extract_insn_peb_access_characteristic_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""
parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64
"""
# TODO handle where fs/gs are loaded into a register or onto the stack and used later
insn: envi.Opcode = ihandle.inner
if insn.mnem not in ["push", "mov"]:
return
@@ -389,7 +406,7 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
if (isinstance(oper, envi.archs.i386.disasm.i386RegMemOper) and oper.disp == 0x30) or (
isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper) and oper.imm == 0x30
):
yield Characteristic("peb access"), insn.va
yield Characteristic("peb access"), ihandle.address
elif "gs" in prefix:
for oper in insn.opers:
if (
@@ -397,20 +414,22 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn):
or (isinstance(oper, envi.archs.amd64.disasm.i386SibOper) and oper.imm == 0x60)
or (isinstance(oper, envi.archs.amd64.disasm.i386ImmMemOper) and oper.imm == 0x60)
):
yield Characteristic("peb access"), insn.va
yield Characteristic("peb access"), ihandle.address
else:
pass
def extract_insn_segment_access_features(f, bb, insn):
def extract_insn_segment_access_features(f, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
"""parse the instruction for access to fs or gs"""
insn: envi.Opcode = ihandle.inner
prefix = insn.getPrefixName()
if prefix == "fs":
yield Characteristic("fs access"), insn.va
yield Characteristic("fs access"), ihandle.address
if prefix == "gs":
yield Characteristic("gs access"), insn.va
yield Characteristic("gs access"), ihandle.address
def get_section(vw, va: int):
@@ -421,10 +440,15 @@ def get_section(vw, va: int):
raise KeyError(va)
def extract_insn_cross_section_cflow(f, bb, insn):
def extract_insn_cross_section_cflow(
fhandle: FunctionHandle, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
inspect the instruction for a CALL or JMP that crosses section boundaries.
"""
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
for va, flags in insn.getBranches():
if va is None:
# va may be none for dynamic branches that haven't been resolved, such as `jmp eax`.
@@ -451,7 +475,7 @@ def extract_insn_cross_section_cflow(f, bb, insn):
continue
if get_section(f.vw, insn.va) != get_section(f.vw, va):
yield Characteristic("cross section flow"), insn.va
yield Characteristic("cross section flow"), ihandle.address
except KeyError:
continue
@@ -459,7 +483,10 @@ def extract_insn_cross_section_cflow(f, bb, insn):
# this is a feature that's most relevant at the function scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_calls_from(f, bb, insn):
def extract_function_calls_from(fhandle: FunctionHandle, bb, ihandle: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
if insn.mnem != "call":
return
@@ -469,7 +496,7 @@ def extract_function_calls_from(f, bb, insn):
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386ImmMemOper):
oper = insn.opers[0]
target = oper.getOperAddr(insn)
yield Characteristic("calls from"), target
yield Characteristic("calls from"), AbsoluteVirtualAddress(target)
# call via thunk on x86,
# see 9324d1a8ae37a36ae560c37448c9705a at 0x407985
@@ -478,47 +505,55 @@ def extract_function_calls_from(f, bb, insn):
# see Lab21-01.exe_:0x140001178
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386PcRelOper):
target = insn.opers[0].getOperValue(insn)
yield Characteristic("calls from"), target
yield Characteristic("calls from"), AbsoluteVirtualAddress(target)
# call via IAT, x64
elif isinstance(insn.opers[0], envi.archs.amd64.disasm.Amd64RipRelOper):
op = insn.opers[0]
target = op.getOperAddr(insn)
yield Characteristic("calls from"), target
yield Characteristic("calls from"), AbsoluteVirtualAddress(target)
if target and target == f.va:
# if we found a jump target and it's the function address
# mark as recursive
yield Characteristic("recursive call"), target
yield Characteristic("recursive call"), AbsoluteVirtualAddress(target)
# this is a feature that's most relevant at the function or basic block scope,
# however, its most efficient to extract at the instruction scope.
def extract_function_indirect_call_characteristic_features(f, bb, insn):
def extract_function_indirect_call_characteristic_features(
f, bb, ihandle: InsnHandle
) -> Iterator[Tuple[Feature, Address]]:
"""
extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4])
does not include calls like => call ds:dword_ABD4974
"""
insn: envi.Opcode = ihandle.inner
if insn.mnem != "call":
return
# Checks below work for x86 and x64
if isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper):
# call edx
yield Characteristic("indirect call"), insn.va
yield Characteristic("indirect call"), ihandle.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegMemOper):
# call dword ptr [eax+50h]
yield Characteristic("indirect call"), insn.va
yield Characteristic("indirect call"), ihandle.address
elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386SibOper):
# call qword ptr [rsp+78h]
yield Characteristic("indirect call"), insn.va
yield Characteristic("indirect call"), ihandle.address
def extract_op_number_features(f, bb, insn, i, oper):
def extract_op_number_features(
fhandle: FunctionHandle, bb, ihandle: InsnHandle, i, oper: envi.Operand
) -> Iterator[Tuple[Feature, Address]]:
"""parse number features from the given operand."""
# example:
#
# push 3136B0h ; dwControlCode
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
# this is for both x32 and x64
if not isinstance(oper, (envi.archs.i386.disasm.i386ImmOper, envi.archs.i386.disasm.i386ImmMemOper)):
@@ -541,15 +576,16 @@ def extract_op_number_features(f, bb, insn, i, oper):
# .text:00401145 add esp, 0Ch
return
yield Number(v), insn.va
yield OperandNumber(i, v), insn.va
yield Number(v), ihandle.address
yield OperandNumber(i, v), ihandle.address
def extract_op_offset_features(f, bb, insn, i, oper):
def extract_op_offset_features(f, bb, ihandle: InsnHandle, i, oper: envi.Operand) -> Iterator[Tuple[Feature, Address]]:
"""parse structure offset features from the given operand."""
# example:
#
# .text:0040112F cmp [esi+4], ebx
insn: envi.Opcode = ihandle.inner
# this is for both x32 and x64
# like [esi + 4]
@@ -569,8 +605,8 @@ def extract_op_offset_features(f, bb, insn, i, oper):
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield OperandOffset(i, v), insn.va
yield Offset(v), ihandle.address
yield OperandOffset(i, v), ihandle.address
# like: [esi + ecx + 16384]
# reg ^ ^
@@ -580,15 +616,19 @@ def extract_op_offset_features(f, bb, insn, i, oper):
# viv already decodes offsets as signed
v = oper.disp
yield Offset(v), insn.va
yield OperandOffset(i, v), insn.va
yield Offset(v), ihandle.address
yield OperandOffset(i, v), ihandle.address
def extract_op_string_features(f, bb, insn, i, oper):
def extract_op_string_features(
fhandle: FunctionHandle, bb, ihandle: InsnHandle, i, oper: envi.Operand
) -> Iterator[Tuple[Feature, Address]]:
"""parse string features from the given operand."""
# example:
#
# push offset aAcr ; "ACR > "
insn: envi.Opcode = ihandle.inner
f: viv_utils.Function = fhandle.inner
if isinstance(oper, envi.archs.i386.disasm.i386ImmOper):
v = oper.getOperValue(oper)
@@ -609,21 +649,23 @@ def extract_op_string_features(f, bb, insn, i, oper):
except ValueError:
continue
else:
yield String(s.rstrip("\x00")), insn.va
yield String(s.rstrip("\x00")), ihandle.address
def extract_operand_features(f, bb, insn):
for i, oper in enumerate(insn.opers):
def extract_operand_features(f: FunctionHandle, bb, insn: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
for i, oper in enumerate(insn.inner.opers):
for op_handler in OPERAND_HANDLERS:
for feature, addr in op_handler(f, bb, insn, i, oper):
yield feature, addr
OPERAND_HANDLERS = (
OPERAND_HANDLERS: List[
Callable[[FunctionHandle, BBHandle, InsnHandle, int, envi.Operand], Iterator[Tuple[Feature, Address]]]
] = [
extract_op_number_features,
extract_op_offset_features,
extract_op_string_features,
)
]
def extract_features(f, bb, insn) -> Iterator[Tuple[Feature, Address]]:
@@ -643,7 +685,7 @@ def extract_features(f, bb, insn) -> Iterator[Tuple[Feature, Address]]:
yield feature, addr
INSTRUCTION_HANDLERS = (
INSTRUCTION_HANDLERS: List[Callable[[FunctionHandle, BBHandle, InsnHandle], Iterator[Tuple[Feature, Address]]]] = [
extract_insn_api_features,
extract_insn_bytes_features,
extract_insn_nzxor_characteristic_features,
@@ -655,4 +697,4 @@ INSTRUCTION_HANDLERS = (
extract_function_calls_from,
extract_function_indirect_call_characteristic_features,
extract_operand_features,
)
]

View File

@@ -120,7 +120,7 @@ def find_instruction_capabilities(
features[feature].add(addr)
# matches found at this instruction.
_, matches = ruleset.match(Scope.INSTRUCTION, features, int(insn))
_, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address)
for rule_name, res in matches.items():
rule = ruleset[rule_name]
@@ -160,7 +160,7 @@ def find_basic_block_capabilities(
features[feature].add(va)
# matches found within this basic block.
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, int(bb))
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address)
for rule_name, res in matches.items():
rule = ruleset[rule_name]
@@ -204,7 +204,7 @@ def find_code_capabilities(
for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
function_features[feature].add(va)
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, int(f))
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, f.address)
return function_matches, bb_matches, insn_matches, len(function_features)
@@ -253,12 +253,10 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions")
for f in pb:
function_address = int(f)
if extractor.is_library_function(function_address):
function_name = extractor.get_function_name(function_address)
logger.debug("skipping library function 0x%x (%s)", function_address, function_name)
meta["library_functions"][function_address] = function_name
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function 0x%x (%s)", f.address, function_name)
meta["library_functions"][f.address] = function_name
n_libs = len(meta["library_functions"])
percentage = 100 * (n_libs / n_funcs)
if isinstance(pb, tqdm.tqdm):
@@ -266,8 +264,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
continue
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][function_address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)
meta["feature_counts"]["functions"][f.address] = feature_count
logger.debug("analyzed function 0x%x and extracted %d features", f.address, feature_count)
for rule_name, res in function_matches.items():
all_function_matches[rule_name].extend(res)
@@ -710,10 +708,10 @@ def compute_layout(rules, extractor, capabilities):
functions_by_bb = {}
bbs_by_function = {}
for f in extractor.get_functions():
bbs_by_function[int(f)] = []
bbs_by_function[f.address] = []
for bb in extractor.get_basic_blocks(f):
functions_by_bb[int(bb)] = int(f)
bbs_by_function[int(f)].append(int(bb))
functions_by_bb[bb.address] = f.address
bbs_by_function[f.address].append(bb.address)
matched_bbs = set()
for rule_name, matches in capabilities.items():