From 36822926af5d384761bbd7722a4963ee1fe735ed Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Thu, 29 Oct 2020 11:28:22 +0100 Subject: [PATCH 01/15] initial commit for backend-smda --- capa/features/extractors/smda/__init__.py | 49 +++ capa/features/extractors/smda/basicblock.py | 136 ++++++++ capa/features/extractors/smda/file.py | 139 ++++++++ capa/features/extractors/smda/function.py | 51 +++ capa/features/extractors/smda/insn.py | 343 ++++++++++++++++++++ capa/main.py | 43 ++- setup.py | 1 + tests/fixtures.py | 17 +- tests/test_main.py | 22 +- 9 files changed, 776 insertions(+), 25 deletions(-) create mode 100644 capa/features/extractors/smda/__init__.py create mode 100644 capa/features/extractors/smda/basicblock.py create mode 100644 capa/features/extractors/smda/file.py create mode 100644 capa/features/extractors/smda/function.py create mode 100644 capa/features/extractors/smda/insn.py diff --git a/capa/features/extractors/smda/__init__.py b/capa/features/extractors/smda/__init__.py new file mode 100644 index 00000000..43b6b508 --- /dev/null +++ b/capa/features/extractors/smda/__init__.py @@ -0,0 +1,49 @@ +import sys +import types + +from smda.common.SmdaReport import SmdaReport +from smda.common.SmdaInstruction import SmdaInstruction + +import capa.features.extractors.smda.file +import capa.features.extractors.smda.insn +import capa.features.extractors.smda.function +import capa.features.extractors.smda.basicblock +from capa.features.extractors import FeatureExtractor + + +class SmdaFeatureExtractor(FeatureExtractor): + def __init__(self, smda_report: SmdaReport, path): + super(SmdaFeatureExtractor, self).__init__() + self.smda_report = smda_report + self.path = path + + def get_base_address(self): + return self.smda_report.base_addr + + def extract_file_features(self): + for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path): + yield feature, va + + def get_functions(self): + for function in 
self.smda_report.getFunctions(): + yield function + + def extract_function_features(self, f): + for feature, va in capa.features.extractors.smda.function.extract_features(f): + yield feature, va + + def get_basic_blocks(self, f): + for bb in f.getBlocks(): + yield bb + + def extract_basic_block_features(self, f, bb): + for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb): + yield feature, va + + def get_instructions(self, f, bb): + for smda_ins in bb.getInstructions(): + yield smda_ins + + def extract_insn_features(self, f, bb, insn): + for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn): + yield feature, va diff --git a/capa/features/extractors/smda/basicblock.py b/capa/features/extractors/smda/basicblock.py new file mode 100644 index 00000000..432f5dcf --- /dev/null +++ b/capa/features/extractors/smda/basicblock.py @@ -0,0 +1,136 @@ +import sys +import string +import struct + +from capa.features import Characteristic +from capa.features.basicblock import BasicBlock +from capa.features.extractors.helpers import MIN_STACKSTRING_LEN + + +def _bb_has_tight_loop(f, bb): + """ + parse tight loops, true if last instruction in basic block branches to bb start + """ + return bb.offset in f.blockrefs[bb.offset] if bb.offset in f.blockrefs else False + + +def extract_bb_tight_loop(f, bb): + """ check basic block for tight loop indicators """ + if _bb_has_tight_loop(f, bb): + yield Characteristic("tight loop"), bb.offset + + +def _bb_has_stackstring(f, bb): + """ + extract potential stackstring creation, using the following heuristics: + - basic block contains enough moves of constant bytes to the stack + """ + count = 0 + for instr in bb.getInstructions(): + if is_mov_imm_to_stack(instr): + count += get_printable_len(instr.getDetailed()) + if count > MIN_STACKSTRING_LEN: + return True + return False + + +def get_operands(smda_ins): + return [o.strip() for o in smda_ins.operands.split(",")] + + +def 
extract_stackstring(f, bb): + """ check basic block for stackstring indicators """ + if _bb_has_stackstring(f, bb): + yield Characteristic("stack string"), bb.offset + + +def is_mov_imm_to_stack(smda_ins): + """ + Return if instruction moves immediate onto stack + """ + if not smda_ins.mnemonic.startswith("mov"): + return False + + try: + dst, src = get_operands(smda_ins) + except ValueError: + # not two operands + return False + + try: + int(src, 16) + except ValueError: + return False + + if not any(regname in dst for regname in ["ebp", "rbp", "esp", "rsp"]): + return False + + return True + + +def get_printable_len(instr): + """ + Return string length if all operand bytes are ascii or utf16-le printable + + Works on a capstone instruction + """ + # should have exactly two operands for mov immediate + if len(instr.operands) != 2: + return 0 + + op_value = instr.operands[1].value.imm + + if instr.imm_size == 1: + chars = struct.pack("= 3: + return all(c < 127 and chr(c) in string.printable for c in chars) + else: + return all(ord(c) < 127 and c in string.printable for c in chars) + + def is_printable_utf16le(chars): + if sys.version_info[0] >= 3: + if all(c == 0x00 for c in chars[1::2]): + return is_printable_ascii(chars[::2]) + else: + if all(c == "\x00" for c in chars[1::2]): + return is_printable_ascii(chars[::2]) + + if is_printable_ascii(chars): + return instr.imm_size + if is_printable_utf16le(chars): + return instr.imm_size / 2 + + return 0 + + +def extract_features(f, bb): + """ + extract features from the given basic block. + + args: + f (smda.common.SmdaFunction): the function from which to extract features + bb (smda.common.SmdaBasicBlock): the basic block to process. + + yields: + Feature, set[VA]: the features and their location found in this basic block. 
+ """ + yield BasicBlock(), bb.offset + for bb_handler in BASIC_BLOCK_HANDLERS: + for feature, va in bb_handler(f, bb): + yield feature, va + + +BASIC_BLOCK_HANDLERS = ( + extract_bb_tight_loop, + extract_stackstring, +) diff --git a/capa/features/extractors/smda/file.py b/capa/features/extractors/smda/file.py new file mode 100644 index 00000000..ba643ad4 --- /dev/null +++ b/capa/features/extractors/smda/file.py @@ -0,0 +1,139 @@ +import struct + +# if we have SMDA we definitely have lief +import lief + +import capa.features.extractors.helpers +import capa.features.extractors.strings +from capa.features import String, Characteristic +from capa.features.file import Export, Import, Section + + +def carve(pbytes, offset=0): + """ + Return a list of (offset, size, xor) tuples of embedded PEs + + Based on the version from vivisect: + https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19 + And its IDA adaptation: + capa/features/extractors/ida/file.py + """ + mz_xor = [ + ( + capa.features.extractors.helpers.xor_static(b"MZ", i), + capa.features.extractors.helpers.xor_static(b"PE", i), + i, + ) + for i in range(256) + ] + + pblen = len(pbytes) + todo = [(pbytes.find(mzx, offset), mzx, pex, i) for mzx, pex, i in mz_xor] + todo = [(off, mzx, pex, i) for (off, mzx, pex, i) in todo if off != -1] + + while len(todo): + + off, mzx, pex, i = todo.pop() + + # The MZ header has one field we will check + # e_lfanew is at 0x3c + e_lfanew = off + 0x3C + if pblen < (e_lfanew + 4): + continue + + newoff = struct.unpack(" buffer_end: + return smda_report.buffer[rva:] + else: + return smda_report.buffer[rva : rva + max_bytes] + + +def extract_insn_bytes_features(f, bb, insn): + """ + parse byte sequence features from the given instruction. 
+ example: + # push offset iid_004118d4_IShellLinkA ; riid + """ + for data_ref in insn.getDataRefs(): + bytes_read = read_bytes(f.smda_report, data_ref) + if bytes_read is None: + continue + if capa.features.extractors.helpers.all_zeros(bytes_read): + continue + yield Bytes(bytes_read), insn.offset + + +def detectAsciiLen(smda_report, offset): + if smda_report.buffer is None: + return 0 + ascii_len = 0 + rva = offset - smda_report.base_addr + char = smda_report.buffer[rva] + while char < 127 and chr(char) in string.printable: + ascii_len += 1 + rva += 1 + char = smda_report.buffer[rva] + if char == 0: + return ascii_len + return 0 + + +def detectUnicodeLen(smda_report, offset): + if smda_report.buffer is None: + return 0 + unicode_len = 0 + rva = offset - smda_report.base_addr + char = smda_report.buffer[rva] + second_char = smda_report.buffer[rva + 1] + while char < 127 and chr(char) in string.printable and second_char == 0: + unicode_len += 2 + rva += 2 + char = smda_report.buffer[rva] + second_char = smda_report.buffer[rva + 1] + if char == 0 and second_char == 0: + return unicode_len + return 0 + + +def read_string(smda_report, offset): + alen = detectAsciiLen(smda_report, offset) + if alen > 1: + return read_bytes(smda_report, offset, alen).decode("utf-8") + ulen = detectUnicodeLen(smda_report, offset) + if ulen > 2: + return read_bytes(smda_report, offset, ulen).decode("utf-16") + + +def extract_insn_string_features(f, bb, insn): + """parse string features from the given instruction.""" + # example: + # + # push offset aAcr ; "ACR > " + for data_ref in insn.getDataRefs(): + string_read = read_string(f.smda_report, data_ref) + if string_read: + yield String(string_read.rstrip("\x00")), insn.offset + + +def extract_insn_offset_features(f, bb, insn): + """parse structure offset features from the given instruction.""" + # examples: + # + # mov eax, [esi + 4] + # mov eax, [esi + ecx + 16384] + operands = [o.strip() for o in insn.operands.split(",")] + for operand 
in operands: + number = None + number_hex = re.search(r"[+\-] (?P<num>0x[a-fA-F0-9]+)", operand) + number_int = re.search(r"[+\-] (?P<num>[0-9])", operand) + if number_hex: + number = int(number_hex.group("num"), 16) + number = -1 * number if number_hex.group().startswith("-") else number + elif number_int: + number = int(number_int.group("num")) + number = -1 * number if number_int.group().startswith("-") else number + if not operand.startswith("0") and number is not None: + yield Offset(number), insn.offset + + +def is_security_cookie(f, bb, insn): + """ + check if an instruction is related to security cookie checks + """ + # security cookie check should use SP or BP + operands = [o.strip() for o in insn.operands.split(",")] + if operands[0] not in ["esp", "ebp", "rsp", "rbp"]: + return False + for index, block in enumerate(f.getBlocks()): + # expect security cookie init in first basic block within first bytes (instructions) + if index == 0 and insn.offset < (block[0].offset + SECURITY_COOKIE_BYTES_DELTA): + return True + # ... or within last bytes (instructions) before a return + if block[-1].mnemonic.startswith("ret") and insn.offset > (block[-1].offset - SECURITY_COOKIE_BYTES_DELTA): + return True + return False + + +def extract_insn_nzxor_characteristic_features(f, bb, insn): + """ + parse non-zeroing XOR instruction from the given instruction. + ignore expected non-zeroing XORs, e.g. security cookies. + """ + + if insn.mnemonic != "xor": + return + + operands = [o.strip() for o in insn.operands.split(",")] + if operands[0] == operands[1]: + return + + if is_security_cookie(f, bb, insn): + return + + yield Characteristic("nzxor"), insn.offset + + +def extract_insn_mnemonic_features(f, bb, insn): + """parse mnemonic features from the given instruction.""" + yield Mnemonic(insn.mnemonic), insn.offset + + +def extract_insn_peb_access_characteristic_features(f, bb, insn): + """ + parse peb access from the given function.
fs:[0x30] on x86, gs:[0x60] on x64 + """ + + if insn.mnemonic not in ["push", "mov"]: + return + + operands = [o.strip() for o in insn.operands.split(",")] + for operand in operands: + if "fs:" in operand and "0x30" in operand: + yield Characteristic("peb access"), insn.offset + elif "gs:" in operand and "0x60" in operand: + yield Characteristic("peb access"), insn.offset + + +def extract_insn_segment_access_features(f, bb, insn): + """ parse the instruction for access to fs or gs """ + operands = [o.strip() for o in insn.operands.split(",")] + for operand in operands: + if "fs:" in operand and "0x30" in operand: + yield Characteristic("fs access"), insn.offset + elif "gs:" in operand and "0x60" in operand: + yield Characteristic("gs access"), insn.offset + + +def get_section(vw, va): + for start, length, _, __ in vw.getMemoryMaps(): + if start <= va < start + length: + return start + + raise KeyError(va) + + +def extract_insn_cross_section_cflow(f, bb, insn): + """ + inspect the instruction for a CALL or JMP that crosses section boundaries. + """ + if insn.mnemonic in ["call", "jmp"]: + if insn.offset in f.apirefs: + return + + if insn.offset in f.outrefs: + for target in f.outrefs[insn.offset]: + if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target): + yield Characteristic("cross section flow"), insn.offset + elif insn.operands.startswith("0x"): + target = int(insn.operands, 16) + if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target): + yield Characteristic("cross section flow"), insn.offset + + +# this is a feature that's most relevant at the function scope, +# however, its most efficient to extract at the instruction scope. 
+def extract_function_calls_from(f, bb, insn): + if insn.mnemonic != "call": + return + + if insn.offset in f.outrefs: + for outref in f.outrefs[insn.offset]: + yield Characteristic("calls from"), outref + + if outref == f.offset: + # if we found a jump target and it's the function address + # mark as recursive + yield Characteristic("recursive call"), outref + + +# this is a feature that's most relevant at the function or basic block scope, +# however, its most efficient to extract at the instruction scope. +def extract_function_indirect_call_characteristic_features(f, bb, insn): + """ + extract indirect function call characteristic (e.g., call eax or call dword ptr [edx+4]) + does not include calls like => call ds:dword_ABD4974 + """ + if insn.mnemonic != "call": + return + if insn.operands.startswith("0x"): + return False + if "qword ptr" in insn.operands and "rip" in insn.operands: + return False + if insn.operands.startswith("dword ptr [0x"): + return False + # call edx + # call dword ptr [eax+50h] + # call qword ptr [rsp+78h] + yield Characteristic("indirect call"), insn.offset + + +def extract_features(f, bb, insn): + """ + extract features from the given insn. + + args: + f (smda.common.SmdaFunction): the function to process. + bb (smda.common.SmdaBasicBlock): the basic block to process. + insn (smda.common.SmdaInstruction): the instruction to process. + + yields: + Feature, set[VA]: the features and their location found in this insn. 
+ """ + for insn_handler in INSTRUCTION_HANDLERS: + for feature, va in insn_handler(f, bb, insn): + yield feature, va + + +INSTRUCTION_HANDLERS = ( + extract_insn_api_features, + extract_insn_number_features, + extract_insn_string_features, + extract_insn_bytes_features, + extract_insn_offset_features, + extract_insn_nzxor_characteristic_features, + extract_insn_mnemonic_features, + extract_insn_peb_access_characteristic_features, + extract_insn_cross_section_cflow, + extract_insn_segment_access_features, + extract_function_calls_from, + extract_function_indirect_call_characteristic_features, +) diff --git a/capa/main.py b/capa/main.py index f1ca7760..2be2802d 100644 --- a/capa/main.py +++ b/capa/main.py @@ -295,7 +295,19 @@ class UnsupportedRuntimeError(RuntimeError): def get_extractor_py3(path, format, disable_progress=False): - raise UnsupportedRuntimeError() + from smda.SmdaConfig import SmdaConfig + from smda.Disassembler import Disassembler + + import capa.features.extractors.smda + + smda_report = None + with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): + config = SmdaConfig() + config.STORE_BUFFER = True + smda_disasm = Disassembler(config) + smda_report = smda_disasm.disassembleFile(path) + + return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path) def get_extractor(path, format, disable_progress=False): @@ -446,14 +458,25 @@ def main(argv=None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument( - # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters - # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works - # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) - "sample", - type=lambda s: s.decode(sys.getfilesystemencoding()), - 
help="path to sample to analyze", - ) + # TODO: decode won't work for python3 + if sys.version_info >= (3, 0): + parser.add_argument( + # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters + # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works + # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) + "sample", + type=str, + help="path to sample to analyze", + ) + else: + parser.add_argument( + # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters + # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works + # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) + "sample", + type=lambda s: s.decode(sys.getfilesystemencoding()), + help="path to sample to analyze", + ) parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__)) parser.add_argument( "-r", @@ -550,7 +573,7 @@ def main(argv=None): # during the load of the RuleSet, we extract subscope statements into their own rules # that are subsequently `match`ed upon. this inflates the total rule count. # so, filter out the subscope rules when reporting total number of loaded rules. 
- len(filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())), + len([i for i in filter(lambda r: "capa/subscope-rule" not in r.meta, rules.rules.values())]), ) if args.tag: rules = rules.filter_rules_by_meta(args.tag) diff --git a/setup.py b/setup.py index 4c09dd04..4cf4aa8b 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ requirements = [ if sys.version_info >= (3, 0): # py3 requirements.append("networkx") + requirements.append("smda") else: # py2 requirements.append("enum34==1.1.6") # v1.1.6 is needed by halo 0.0.30 / spinners 0.0.24 diff --git a/tests/fixtures.py b/tests/fixtures.py index 3ff40f6c..4e6a907f 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -81,6 +81,21 @@ def get_viv_extractor(path): return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) +@lru_cache() +def get_smda_extractor(path): + from smda.SmdaConfig import SmdaConfig + from smda.Disassembler import Disassembler + + import capa.features.extractors.smda + + config = SmdaConfig() + config.STORE_BUFFER = True + disasm = Disassembler(config) + report = disasm.disassembleFile(path) + + return capa.features.extractors.smda.SmdaFeatureExtractor(report, path) + + @lru_cache() def extract_file_features(extractor): features = collections.defaultdict(set) @@ -473,7 +488,7 @@ def do_test_feature_count(get_extractor, sample, scope, feature, expected): def get_extractor(path): if sys.version_info >= (3, 0): - raise RuntimeError("no supported py3 backends yet") + extractor = get_smda_extractor(path) else: extractor = get_viv_extractor(path) diff --git a/tests/test_main.py b/tests/test_main.py index 6ceae34a..783fc95c 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -19,7 +19,6 @@ import capa.features from capa.engine import * -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main(z9324d_extractor): # tests rules can be loaded successfully and all output modes path = z9324d_extractor.path @@ -29,7 +28,6 
@@ def test_main(z9324d_extractor): assert capa.main.main([path]) == 0 -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_single_rule(z9324d_extractor, tmpdir): # tests a single rule can be loaded successfully RULE_CONTENT = textwrap.dedent( @@ -58,7 +56,6 @@ def test_main_single_rule(z9324d_extractor, tmpdir): ) -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys): # on py2.7, need to be careful about str (which can hold bytes) # vs unicode (which is only unicode characters). @@ -71,18 +68,22 @@ def test_main_non_ascii_filename(pingtaest_extractor, tmpdir, capsys): std = capsys.readouterr() # but here, we have to use a unicode instance, # because capsys has decoded the output for us. - assert pingtaest_extractor.path.decode("utf-8") in std.out + if sys.version_info >= (3, 0): + assert pingtaest_extractor.path in std.out + else: + assert pingtaest_extractor.path.decode("utf-8") in std.out -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_non_ascii_filename_nonexistent(tmpdir, caplog): NON_ASCII_FILENAME = "täst_not_there.exe" assert capa.main.main(["-q", NON_ASCII_FILENAME]) == -1 - assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text + if sys.version_info >= (3, 0): + assert NON_ASCII_FILENAME in caplog.text + else: + assert NON_ASCII_FILENAME.decode("utf-8") in caplog.text -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_main_shellcode(z499c2_extractor): path = z499c2_extractor.path assert capa.main.main([path, "-vv", "-f", "sc32"]) == 0 @@ -137,7 +138,6 @@ def test_ruleset(): assert len(rules.basic_block_rules) == 1 -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_match_across_scopes_file_function(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -201,7 +201,6 @@ def 
test_match_across_scopes_file_function(z9324d_extractor): assert ".text section and install service" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_match_across_scopes(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -264,7 +263,6 @@ def test_match_across_scopes(z9324d_extractor): assert "kill thread program" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_subscope_bb_rules(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -289,7 +287,6 @@ def test_subscope_bb_rules(z9324d_extractor): assert "test rule" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_byte_matching(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -312,7 +309,6 @@ def test_byte_matching(z9324d_extractor): assert "byte match test" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_count_bb(z9324d_extractor): rules = capa.rules.RuleSet( [ @@ -336,7 +332,6 @@ def test_count_bb(z9324d_extractor): assert "count bb" in capabilities -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_fix262(pma16_01_extractor, capsys): # tests rules can be loaded successfully and all output modes path = pma16_01_extractor.path @@ -347,7 +342,6 @@ def test_fix262(pma16_01_extractor, capsys): assert "www.practicalmalwareanalysis.com" not in std.out -@pytest.mark.xfail(sys.version_info >= (3, 0), reason="vivsect only works on py2") def test_not_render_rules_also_matched(z9324d_extractor, capsys): # rules that are also matched by other rules should not get rendered by default. # this cuts down on the amount of output while giving approx the same detail. 
From 60ddf0400e69f2e22c12c2a0a64913bbfe21e7ba Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Thu, 29 Oct 2020 17:47:10 +0100 Subject: [PATCH 02/15] addressing review --- capa/features/extractors/smda/basicblock.py | 30 ++++++++++--------- capa/features/extractors/smda/insn.py | 32 +++++++++------------ 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/capa/features/extractors/smda/basicblock.py b/capa/features/extractors/smda/basicblock.py index 432f5dcf..4d130505 100644 --- a/capa/features/extractors/smda/basicblock.py +++ b/capa/features/extractors/smda/basicblock.py @@ -68,6 +68,22 @@ def is_mov_imm_to_stack(smda_ins): return True +def is_printable_ascii(chars): + if sys.version_info[0] >= 3: + return all(c < 127 and chr(c) in string.printable for c in chars) + else: + return all(ord(c) < 127 and c in string.printable for c in chars) + + +def is_printable_utf16le(chars): + if sys.version_info[0] >= 3: + if all(c == 0x00 for c in chars[1::2]): + return is_printable_ascii(chars[::2]) + else: + if all(c == "\x00" for c in chars[1::2]): + return is_printable_ascii(chars[::2]) + + def get_printable_len(instr): """ Return string length if all operand bytes are ascii or utf16-le printable @@ -91,20 +107,6 @@ def get_printable_len(instr): else: raise ValueError("Unhandled operand data type 0x%x." 
% instr.imm_size) - def is_printable_ascii(chars): - if sys.version_info[0] >= 3: - return all(c < 127 and chr(c) in string.printable for c in chars) - else: - return all(ord(c) < 127 and c in string.printable for c in chars) - - def is_printable_utf16le(chars): - if sys.version_info[0] >= 3: - if all(c == 0x00 for c in chars[1::2]): - return is_printable_ascii(chars[::2]) - else: - if all(c == "\x00" for c in chars[1::2]): - return is_printable_ascii(chars[::2]) - if is_printable_ascii(chars): return instr.imm_size if is_printable_utf16le(chars): diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 0b2b4b3e..e33d86c4 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -18,9 +18,11 @@ from capa.features.insn import API, Number, Offset, Mnemonic # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features SECURITY_COOKIE_BYTES_DELTA = 0x40 +PATTERN_HEXNUM = re.compile(r"[+\-] (?P<num>0x[a-fA-F0-9]+)") +PATTERN_SINGLENUM = re.compile(r"[+\-] (?P<num>[0-9])") -def get_arch(smda_report: SmdaReport): +def get_arch(smda_report): if smda_report.architecture == "intel": if smda_report.bitness == 32: return ARCH_X32 @@ -52,8 +54,8 @@ def extract_insn_api_features(f, bb, insn): # reformat dll_name, api_name = api_entry.split("!") dll_name = dll_name.split(".")[0] - name = dll_name + "."
+ api_name - yield API(name), insn.offset + for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): + yield API(name), insn.offset def extract_insn_number_features(f, bb, insn): @@ -106,7 +108,7 @@ def extract_insn_bytes_features(f, bb, insn): yield Bytes(bytes_read), insn.offset -def detectAsciiLen(smda_report, offset): +def detect_ascii_len(smda_report, offset): if smda_report.buffer is None: return 0 ascii_len = 0 @@ -121,7 +123,7 @@ def detectAsciiLen(smda_report, offset): return 0 -def detectUnicodeLen(smda_report, offset): +def detect_unicode_len(smda_report, offset): if smda_report.buffer is None: return 0 unicode_len = 0 @@ -139,10 +141,10 @@ def detectUnicodeLen(smda_report, offset): def read_string(smda_report, offset): - alen = detectAsciiLen(smda_report, offset) + alen = detect_ascii_len(smda_report, offset) if alen > 1: return read_bytes(smda_report, offset, alen).decode("utf-8") - ulen = detectUnicodeLen(smda_report, offset) + ulen = detect_unicode_len(smda_report, offset) if ulen > 2: return read_bytes(smda_report, offset, ulen).decode("utf-16") @@ -167,8 +169,8 @@ def extract_insn_offset_features(f, bb, insn): operands = [o.strip() for o in insn.operands.split(",")] for operand in operands: number = None - number_hex = re.search(r"[+\-] (?P<num>0x[a-fA-F0-9]+)", operand) - number_int = re.search(r"[+\-] (?P<num>[0-9])", operand) + number_hex = re.search(PATTERN_HEXNUM, operand) + number_int = re.search(PATTERN_SINGLENUM, operand) if number_hex: number = int(number_hex.group("num"), 16) number = -1 * number if number_hex.group().startswith("-") else number @@ -241,20 +243,12 @@ def extract_insn_segment_access_features(f, bb, insn): """ parse the instruction for access to fs or gs """ operands = [o.strip() for o in insn.operands.split(",")] for operand in operands: - if "fs:" in operand and "0x30" in operand: + if "fs:" in operand: yield Characteristic("fs access"), insn.offset - elif "gs:" in operand and "0x60" in operand: + elif
"gs:" in operand: yield Characteristic("gs access"), insn.offset -def get_section(vw, va): - for start, length, _, __ in vw.getMemoryMaps(): - if start <= va < start + length: - return start - - raise KeyError(va) - - def extract_insn_cross_section_cflow(f, bb, insn): """ inspect the instruction for a CALL or JMP that crosses section boundaries. From b12d0b64249caf836e9aa3ae76494ae4a0c0f3bc Mon Sep 17 00:00:00 2001 From: Jon Crussell Date: Thu, 29 Oct 2020 09:56:28 -0700 Subject: [PATCH 03/15] tests: add smda backend test 40 failed, 73 passed. --- tests/test_smda_features.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 tests/test_smda_features.py diff --git a/tests/test_smda_features.py b/tests/test_smda_features.py new file mode 100644 index 00000000..ef586278 --- /dev/null +++ b/tests/test_smda_features.py @@ -0,0 +1,28 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+import sys + +from fixtures import * + + +@parametrize( + "sample,scope,feature,expected", + FEATURE_PRESENCE_TESTS, + indirect=["sample", "scope"], +) +def test_smda_features(sample, scope, feature, expected): + do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected) + + +@parametrize( + "sample,scope,feature,expected", + FEATURE_COUNT_TESTS, + indirect=["sample", "scope"], +) +def test_smda_feature_counts(sample, scope, feature, expected): + do_test_feature_count(get_smda_extractor, sample, scope, feature, expected) From 74b2c182961b8944738abe5a6ba7205380f24cd0 Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Thu, 29 Oct 2020 20:05:50 +0100 Subject: [PATCH 04/15] down to 14 failed --- capa/features/extractors/smda/file.py | 8 +++--- capa/features/extractors/smda/insn.py | 38 ++++++++++++++++++++------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/capa/features/extractors/smda/file.py b/capa/features/extractors/smda/file.py index ba643ad4..e10fc171 100644 --- a/capa/features/extractors/smda/file.py +++ b/capa/features/extractors/smda/file.py @@ -67,7 +67,7 @@ def extract_file_export_names(smda_report, file_path): lief_binary = lief.parse(file_path) if lief_binary is not None: for function in lief_binary.exported_functions: - yield function.name, function.address + yield Export(function.name), function.address def extract_file_import_names(smda_report, file_path): @@ -76,14 +76,16 @@ def extract_file_import_names(smda_report, file_path): if not isinstance(lief_binary, lief.PE.Binary): return for imported_library in lief_binary.imports: + library_name = imported_library.name.lower() + library_name = library_name[:-4] if library_name.endswith(".dll") else library_name for func in imported_library.entries: if func.name: va = func.iat_address + smda_report.base_addr - for name in capa.features.extractors.helpers.generate_symbols(imported_library.name, func.name): + for name in 
capa.features.extractors.helpers.generate_symbols(library_name, func.name): yield Import(name), va elif func.is_ordinal: for name in capa.features.extractors.helpers.generate_symbols( - imported_library.name, "#%s" % func.ordinal + library_name, "#%s" % func.ordinal ): yield Import(name), va diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index e33d86c4..89ce4320 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -54,8 +54,21 @@ def extract_insn_api_features(f, bb, insn): # reformat dll_name, api_name = api_entry.split("!") dll_name = dll_name.split(".")[0] + dll_name = dll_name.lower() for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): yield API(name), insn.offset + elif insn.offset in f.outrefs: + for target in f.outrefs[insn.offset]: + target_function = f.smda_report.getFunction(target) + if target_function is not None and target_function.isThunkCall(): + api_entry = target_function.apirefs[target] if target in target_function.apirefs else None + if api_entry: + # reformat + dll_name, api_name = api_entry.split("!") + dll_name = dll_name.split(".")[0] + dll_name = dll_name.lower() + for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): + yield API(name), insn.offset def extract_insn_number_features(f, bb, insn): @@ -64,17 +77,18 @@ def extract_insn_number_features(f, bb, insn): # # push 3136B0h ; dwControlCode operands = [o.strip() for o in insn.operands.split(",")] + if insn.mnemonic == "add" and operands[0] in ["esp", "rsp"]: + # skip things like: + # + # .text:00401140 call sub_407E2B + # .text:00401145 add esp, 0Ch + return for operand in operands: - if insn.mnemonic == "add" and operands[0] in ["esp", "rsp"]: - # skip things like: - # - # .text:00401140 call sub_407E2B - # .text:00401145 add esp, 0Ch - return try: yield Number(int(operand, 16)), insn.offset + yield Number(int(operand, 16), 
arch=get_arch(f.smda_report)), insn.offset except: - return + continue def read_bytes(smda_report, va, num_bytes=None): @@ -168,7 +182,11 @@ def extract_insn_offset_features(f, bb, insn): # mov eax, [esi + ecx + 16384] operands = [o.strip() for o in insn.operands.split(",")] for operand in operands: - number = None + if not "ptr" in operand: + continue + if "esp" in operand or "ebp" in operand or "rbp" in operand: + continue + number = 0 number_hex = re.search(PATTERN_HEXNUM, operand) number_int = re.search(PATTERN_SINGLENUM, operand) if number_hex: @@ -177,8 +195,8 @@ def extract_insn_offset_features(f, bb, insn): elif number_int: number = int(number_int.group("num")) number = -1 * number if number_int.group().startswith("-") else number - if not operand.startswith("0") and number is not None: - yield Offset(number), insn.offset + yield Offset(number), insn.offset + yield Offset(number, arch=get_arch(f.smda_report)), insn.offset def is_security_cookie(f, bb, insn): From 8f6a46e2d8ce9f02e017496f9f08f8b4f2e1bf5b Mon Sep 17 00:00:00 2001 From: Jon Crussell Date: Fri, 30 Oct 2020 07:01:07 -0700 Subject: [PATCH 05/15] add check for pointer to string Check if memory referenced is a pointer to a string. Fixes mimikatz string test. 
--- capa/features/extractors/smda/insn.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 89ce4320..b83d7562 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -1,5 +1,6 @@ import re import string +import struct from smda.common.SmdaReport import SmdaReport @@ -172,6 +173,18 @@ def extract_insn_string_features(f, bb, insn): string_read = read_string(f.smda_report, data_ref) if string_read: yield String(string_read.rstrip("\x00")), insn.offset + continue + + # test to see if we're referencing a pointer and that points to a string + bytes_ = read_bytes(insn.smda_function.smda_report, data_ref, num_bytes=4) + val = struct.unpack("I", bytes_)[0] + if val and insn.smda_function.smda_report.isAddrWithinMemoryImage(val): + # it is a pointer, check if it points to a string + string_read = read_string(f.smda_report, val) + if string_read: + yield String(string_read.rstrip("\x00")), insn.offset + continue + def extract_insn_offset_features(f, bb, insn): From 0c85e7604c15db8b31b4c72f67562f3c73121fab Mon Sep 17 00:00:00 2001 From: Jon Crussell Date: Fri, 30 Oct 2020 07:23:24 -0700 Subject: [PATCH 06/15] use magical derefs Found derefs in viv/insn.py, does exactly what we need! --- capa/features/extractors/smda/insn.py | 59 ++++++++++++++++++--------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index b83d7562..8ac57714 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -108,6 +108,36 @@ def read_bytes(smda_report, va, num_bytes=None): return smda_report.buffer[rva : rva + max_bytes] +def derefs(smda_report, p): + """ + recursively follow the given pointer, yielding the valid memory addresses along the way. + useful when you may have a pointer to string, or pointer to pointer to string, etc. 
+ + this is a "do what i mean" type of helper function. + + based on the implementation in viv/insn.py + """ + depth = 0 + while True: + if not smda_report.isAddrWithinMemoryImage(p): + return + yield p + + bytes_ = read_bytes(smda_report, p, num_bytes=4) + val = struct.unpack("I", bytes_)[0] + + # sanity: pointer points to self + if val == p: + return + + # sanity: avoid chains of pointers that are unreasonably deep + depth += 1 + if depth > 10: + return + + p = val + + def extract_insn_bytes_features(f, bb, insn): """ parse byte sequence features from the given instruction. @@ -115,12 +145,14 @@ def extract_insn_bytes_features(f, bb, insn): # push offset iid_004118d4_IShellLinkA ; riid """ for data_ref in insn.getDataRefs(): - bytes_read = read_bytes(f.smda_report, data_ref) - if bytes_read is None: - continue - if capa.features.extractors.helpers.all_zeros(bytes_read): - continue - yield Bytes(bytes_read), insn.offset + for v in derefs(f.smda_report, data_ref): + bytes_read = read_bytes(f.smda_report, v) + if bytes_read is None: + continue + if capa.features.extractors.helpers.all_zeros(bytes_read): + continue + + yield Bytes(bytes_read), insn.offset def detect_ascii_len(smda_report, offset): @@ -170,21 +202,10 @@ def extract_insn_string_features(f, bb, insn): # # push offset aAcr ; "ACR > " for data_ref in insn.getDataRefs(): - string_read = read_string(f.smda_report, data_ref) - if string_read: - yield String(string_read.rstrip("\x00")), insn.offset - continue - - # test to see if we're referencing a pointer and that points to a string - bytes_ = read_bytes(insn.smda_function.smda_report, data_ref, num_bytes=4) - val = struct.unpack("I", bytes_)[0] - if val and insn.smda_function.smda_report.isAddrWithinMemoryImage(val): - # it is a pointer, check if it points to a string - string_read = read_string(f.smda_report, val) + for v in derefs(f.smda_report, data_ref): + string_read = read_string(f.smda_report, v) if string_read: yield 
String(string_read.rstrip("\x00")), insn.offset - continue - def extract_insn_offset_features(f, bb, insn): From 4a0f1f22bae364a40831dd045f38247b9457ec12 Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Fri, 30 Oct 2020 15:25:42 +0100 Subject: [PATCH 07/15] test fixes --- capa/features/extractors/smda/insn.py | 47 ++++++++++++++++++--------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 89ce4320..07c57929 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -57,18 +57,29 @@ def extract_insn_api_features(f, bb, insn): dll_name = dll_name.lower() for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): yield API(name), insn.offset + # TODO SMDA: we want to check this recursively! elif insn.offset in f.outrefs: - for target in f.outrefs[insn.offset]: - target_function = f.smda_report.getFunction(target) - if target_function is not None and target_function.isThunkCall(): - api_entry = target_function.apirefs[target] if target in target_function.apirefs else None - if api_entry: - # reformat - dll_name, api_name = api_entry.split("!") - dll_name = dll_name.split(".")[0] - dll_name = dll_name.lower() - for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): - yield API(name), insn.offset + current_function = f + current_instruction = insn + for _ in range(THUNK_CHAIN_DEPTH_DELTA): + if len(current_function.outrefs[current_instruction.offset]) == 1: + target = current_function.outrefs[current_instruction.offset][0] + referenced_function = current_function.smda_report.getFunction(target) + if referenced_function: + if referenced_function.isThunkCall(): + api_entry = referenced_function.apirefs[target] if target in referenced_function.apirefs else None + if api_entry: + # reformat + dll_name, api_name = api_entry.split("!") + dll_name = dll_name.split(".")[0] + 
dll_name = dll_name.lower() + for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): + yield API(name), insn.offset + elif referenced_function.num_instructions == 1 and referenced_function.num_outrefs == 1: + current_function = referenced_function + current_instruction = [i for i in referenced_function.getInstructions()][0] + else: + return def extract_insn_number_features(f, bb, insn): @@ -205,14 +216,15 @@ def is_security_cookie(f, bb, insn): """ # security cookie check should use SP or BP operands = [o.strip() for o in insn.operands.split(",")] - if operands[0] not in ["esp", "ebp", "rsp", "rbp"]: + if operands[1] not in ["esp", "ebp", "rsp", "rbp"]: return False for index, block in enumerate(f.getBlocks()): # expect security cookie init in first basic block within first bytes (instructions) - if index == 0 and insn.offset < (block[0].offset + SECURITY_COOKIE_BYTES_DELTA): + block_instructions = [i for i in block.getInstructions()] + if index == 0 and insn.offset < (block_instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA): return True # ... 
or within last bytes (instructions) before a return - if block[-1].mnemonic.startswith("ret") and insn.offset > (block[-1].offset - SECURITY_COOKIE_BYTES_DELTA): + if block_instructions[-1].mnemonic.startswith("ret") and insn.offset > (block_instructions[-1].offset - SECURITY_COOKIE_BYTES_DELTA): return True return False @@ -275,13 +287,14 @@ def extract_insn_cross_section_cflow(f, bb, insn): if insn.offset in f.apirefs: return + smda_report = insn.smda_function.smda_report if insn.offset in f.outrefs: for target in f.outrefs[insn.offset]: - if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target): + if smda_report.getSection(insn.offset) != smda_report.getSection(target): yield Characteristic("cross section flow"), insn.offset elif insn.operands.startswith("0x"): target = int(insn.operands, 16) - if not insn.smda_function.smda_report.isAddrWithinMemoryImage(target): + if smda_report.getSection(insn.offset) != smda_report.getSection(target): yield Characteristic("cross section flow"), insn.offset @@ -299,6 +312,8 @@ def extract_function_calls_from(f, bb, insn): # if we found a jump target and it's the function address # mark as recursive yield Characteristic("recursive call"), outref + if insn.offset in f.apirefs: + yield Characteristic("calls from"), f.apirefs[insn.offset] # this is a feature that's most relevant at the function or basic block scope, From d276a07a7117038516951643377642bf11186441 Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Fri, 30 Oct 2020 15:29:38 +0100 Subject: [PATCH 08/15] comments on a test where disassembly differs among backends --- tests/fixtures.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/fixtures.py b/tests/fixtures.py index 4e6a907f..2a7051d2 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -393,6 +393,11 @@ FEATURE_PRESENCE_TESTS = [ ("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True), # insn/api: x64 nested thunk ("82bf6", 
"function=0x140059342", capa.features.insn.API("ElfClearEventLogFile"), True), + # TODO decide how to adjust the above test to make it compatible across disassemblers + # this is a test adjusted to the function entry point when disassembled by IDA/SMDA: + # ("82bf6", "function=14005E0C0", capa.features.insn.API("ElfClearEventLogFile"), True), + # this is another x64 nested thunk, but function is not recognized by vivisect: + # ("82bf6", "function=0x1400615c0", capa.features.insn.API("IsProcessorFeaturePresent"), True), # insn/api: call via jmp ("mimikatz", "function=0x40B3C6", capa.features.insn.API("LocalFree"), True), ("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True), From 6bcdf64f679e28afa98c95cb15a08b2a442f5556 Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Fri, 30 Oct 2020 15:34:02 +0100 Subject: [PATCH 09/15] formatting --- capa/features/extractors/smda/file.py | 4 +--- capa/features/extractors/smda/insn.py | 8 ++++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/capa/features/extractors/smda/file.py b/capa/features/extractors/smda/file.py index e10fc171..6f690dbe 100644 --- a/capa/features/extractors/smda/file.py +++ b/capa/features/extractors/smda/file.py @@ -84,9 +84,7 @@ def extract_file_import_names(smda_report, file_path): for name in capa.features.extractors.helpers.generate_symbols(library_name, func.name): yield Import(name), va elif func.is_ordinal: - for name in capa.features.extractors.helpers.generate_symbols( - library_name, "#%s" % func.ordinal - ): + for name in capa.features.extractors.helpers.generate_symbols(library_name, "#%s" % func.ordinal): yield Import(name), va diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index be0455c1..32845e29 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -68,7 +68,9 @@ def extract_insn_api_features(f, bb, insn): referenced_function = 
current_function.smda_report.getFunction(target) if referenced_function: if referenced_function.isThunkCall(): - api_entry = referenced_function.apirefs[target] if target in referenced_function.apirefs else None + api_entry = ( + referenced_function.apirefs[target] if target in referenced_function.apirefs else None + ) if api_entry: # reformat dll_name, api_name = api_entry.split("!") @@ -258,7 +260,9 @@ def is_security_cookie(f, bb, insn): if index == 0 and insn.offset < (block_instructions[0].offset + SECURITY_COOKIE_BYTES_DELTA): return True # ... or within last bytes (instructions) before a return - if block_instructions[-1].mnemonic.startswith("ret") and insn.offset > (block_instructions[-1].offset - SECURITY_COOKIE_BYTES_DELTA): + if block_instructions[-1].mnemonic.startswith("ret") and insn.offset > ( + block_instructions[-1].offset - SECURITY_COOKIE_BYTES_DELTA + ): return True return False From 3a43ffa641715f2ea3dad4225c882703c41b6d1e Mon Sep 17 00:00:00 2001 From: "pnx@pyrite" Date: Thu, 5 Nov 2020 12:58:07 +0100 Subject: [PATCH 10/15] adjusted identification of thunks via SMDA. --- capa/features/extractors/smda/insn.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 32845e29..af29bbbc 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -58,16 +58,16 @@ def extract_insn_api_features(f, bb, insn): dll_name = dll_name.lower() for name in capa.features.extractors.helpers.generate_symbols(dll_name, api_name): yield API(name), insn.offset - # TODO SMDA: we want to check this recursively! 
elif insn.offset in f.outrefs: current_function = f current_instruction = insn - for _ in range(THUNK_CHAIN_DEPTH_DELTA): - if len(current_function.outrefs[current_instruction.offset]) == 1: + for index in range(THUNK_CHAIN_DEPTH_DELTA): + if current_function and len(current_function.outrefs[current_instruction.offset]) == 1: target = current_function.outrefs[current_instruction.offset][0] referenced_function = current_function.smda_report.getFunction(target) if referenced_function: - if referenced_function.isThunkCall(): + # TODO SMDA: implement this function for both jmp and call, checking if function has 1 instruction which refs an API + if referenced_function.isApiThunk(): api_entry = ( referenced_function.apirefs[target] if target in referenced_function.apirefs else None ) From 1e25604b0b7ced8dcdb3a0bb80ed8324d550960f Mon Sep 17 00:00:00 2001 From: "pnx@pyrite" Date: Thu, 5 Nov 2020 16:31:47 +0100 Subject: [PATCH 11/15] replacement test for nested x64 thunks - still needs to be verified for vivisect --- tests/fixtures.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 2a7051d2..640bc222 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -144,6 +144,8 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "Practical Malware Analysis Lab 21-01.exe_") elif name == "al-khaser x86": return os.path.join(CD, "data", "al-khaser_x86.exe_") + elif name == "al-khaser x64": + return os.path.join(CD, "data", "al-khaser_x64.exe_") elif name.startswith("39c05"): return os.path.join(CD, "data", "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_") elif name.startswith("499c2"): @@ -392,12 +394,7 @@ FEATURE_PRESENCE_TESTS = [ ), ("kernel32-64", "function=0x1800202B0", capa.features.insn.API("RtlCaptureContext"), True), # insn/api: x64 nested thunk - ("82bf6", "function=0x140059342", capa.features.insn.API("ElfClearEventLogFile"), True), - # TODO decide how to adjust 
the above test to make it compatible across disassemblers - # this is a test adjusted to the function entry point when disassembled by IDA/SMDA: - # ("82bf6", "function=14005E0C0", capa.features.insn.API("ElfClearEventLogFile"), True), - # this is another x64 nested thunk, but function is not recognized by vivisect: - # ("82bf6", "function=0x1400615c0", capa.features.insn.API("IsProcessorFeaturePresent"), True), + ("al-khaser x64", "function=0x14004B4F0", capa.features.insn.API("__vcrt_GetModuleHandle"), True), # insn/api: call via jmp ("mimikatz", "function=0x40B3C6", capa.features.insn.API("LocalFree"), True), ("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True), From 7d4888bb777246cc43b6d8a92bc991e6d7d2adf5 Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Fri, 6 Nov 2020 10:09:06 +0100 Subject: [PATCH 12/15] addressing the comments in the PR discussion --- capa/features/extractors/smda/basicblock.py | 15 ++++----------- capa/features/extractors/smda/function.py | 15 +-------------- capa/features/extractors/smda/insn.py | 15 --------------- 3 files changed, 5 insertions(+), 40 deletions(-) diff --git a/capa/features/extractors/smda/basicblock.py b/capa/features/extractors/smda/basicblock.py index 4d130505..07477139 100644 --- a/capa/features/extractors/smda/basicblock.py +++ b/capa/features/extractors/smda/basicblock.py @@ -69,19 +69,12 @@ def is_mov_imm_to_stack(smda_ins): def is_printable_ascii(chars): - if sys.version_info[0] >= 3: - return all(c < 127 and chr(c) in string.printable for c in chars) - else: - return all(ord(c) < 127 and c in string.printable for c in chars) + return all(c < 127 and chr(c) in string.printable for c in chars) def is_printable_utf16le(chars): - if sys.version_info[0] >= 3: - if all(c == 0x00 for c in chars[1::2]): - return is_printable_ascii(chars[::2]) - else: - if all(c == "\x00" for c in chars[1::2]): - return is_printable_ascii(chars[::2]) + if all(c == 0x00 for c in chars[1::2]): 
+ return is_printable_ascii(chars[::2]) def get_printable_len(instr): @@ -110,7 +103,7 @@ def get_printable_len(instr): if is_printable_ascii(chars): return instr.imm_size if is_printable_utf16le(chars): - return instr.imm_size / 2 + return instr.imm_size // 2 return 0 diff --git a/capa/features/extractors/smda/function.py b/capa/features/extractors/smda/function.py index e8f3729c..e1765e67 100644 --- a/capa/features/extractors/smda/function.py +++ b/capa/features/extractors/smda/function.py @@ -2,19 +2,6 @@ from capa.features import Characteristic from capa.features.extractors import loops -def interface_extract_function_XXX(f): - """ - parse features from the given function. - - args: - f (viv_utils.Function): the function to process. - - yields: - (Feature, int): the feature and the address at which its found. - """ - yield NotImplementedError("feature"), NotImplementedError("virtual address") - - def extract_function_calls_to(f): for inref in f.inrefs: yield Characteristic("calls to"), inref @@ -38,7 +25,7 @@ def extract_features(f): extract features from the given function. args: - f (viv_utils.Function): the function from which to extract features + f (smda.common.SmdaFunction): the function from which to extract features yields: Feature, set[VA]: the features and their location found in this function. diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index af29bbbc..8fa4e221 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -33,21 +33,6 @@ def get_arch(smda_report): raise NotImplementedError -def interface_extract_instruction_XXX(f, bb, insn): - """ - parse features from the given instruction. - - args: - f (smda.common.SmdaFunction): the function to process. - bb (smda.common.SmdaBasicBlock): the basic block to process. - insn (smda.common.SmdaInstruction): the instruction to process. - - yields: - (Feature, int): the feature and the address at which its found. 
- """ - yield NotImplementedError("feature"), NotImplementedError("virtual address") - - def extract_insn_api_features(f, bb, insn): """parse API features from the given instruction.""" if insn.offset in f.apirefs: From 75defc13a0d711138afe8ca025d8c190ddd66835 Mon Sep 17 00:00:00 2001 From: Moritz Raabe Date: Mon, 9 Nov 2020 13:22:23 +0100 Subject: [PATCH 13/15] disable fail-fast for tests job --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 986c41e0..8eeb069a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -45,6 +45,7 @@ jobs: runs-on: ubuntu-latest needs: [code_style, rule_linter] strategy: + fail-fast: false matrix: include: - python: 2.7 From dfc805b89bc2bff81ddb2912e8a1ac91c801f23a Mon Sep 17 00:00:00 2001 From: Moritz Raabe Date: Mon, 9 Nov 2020 13:39:19 +0100 Subject: [PATCH 14/15] improvements for PR #355 --- capa/main.py | 6 ++---- tests/test_smda_features.py | 6 ++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/capa/main.py b/capa/main.py index 2be2802d..5b6e1a54 100644 --- a/capa/main.py +++ b/capa/main.py @@ -458,12 +458,10 @@ def main(argv=None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - # TODO: decode won't work for python3 + if sys.version_info >= (3, 0): parser.add_argument( - # in #328 we noticed that the sample path is not handled correctly if it contains non-ASCII characters - # https://stackoverflow.com/a/22947334/ offers a solution and decoding using getfilesystemencoding works - # in our testing, however other sources suggest `sys.stdin.encoding` (https://stackoverflow.com/q/4012571/) + # Python 3 str handles non-ASCII arguments correctly "sample", type=str, help="path to sample to analyze", diff --git a/tests/test_smda_features.py b/tests/test_smda_features.py index ef586278..68790f8d 100644 --- 
a/tests/test_smda_features.py +++ b/tests/test_smda_features.py @@ -16,7 +16,8 @@ from fixtures import * indirect=["sample", "scope"], ) def test_smda_features(sample, scope, feature, expected): - do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected) + with xfail(sys.version_info < (3, 0), reason="SMDA only works on py3"): + do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected) @parametrize( @@ -25,4 +26,5 @@ def test_smda_features(sample, scope, feature, expected): indirect=["sample", "scope"], ) def test_smda_feature_counts(sample, scope, feature, expected): - do_test_feature_count(get_smda_extractor, sample, scope, feature, expected) + with xfail(sys.version_info < (3, 0), reason="SMDA only works on py3"): + do_test_feature_count(get_smda_extractor, sample, scope, feature, expected) From f7492c7dc7be9fe526fb5c8d0dffc87a8be6af9c Mon Sep 17 00:00:00 2001 From: "Daniel Plohmann (jupiter)" Date: Mon, 9 Nov 2020 16:20:08 +0100 Subject: [PATCH 15/15] throw UnsupportedRuntimeError if SmdaFeatureExtractor is used with a Python version < 3.0 --- capa/features/extractors/smda/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/capa/features/extractors/smda/__init__.py b/capa/features/extractors/smda/__init__.py index 43b6b508..b7ea3ec1 100644 --- a/capa/features/extractors/smda/__init__.py +++ b/capa/features/extractors/smda/__init__.py @@ -8,12 +8,15 @@ import capa.features.extractors.smda.file import capa.features.extractors.smda.insn import capa.features.extractors.smda.function import capa.features.extractors.smda.basicblock +from capa.main import UnsupportedRuntimeError from capa.features.extractors import FeatureExtractor class SmdaFeatureExtractor(FeatureExtractor): def __init__(self, smda_report: SmdaReport, path): super(SmdaFeatureExtractor, self).__init__() + if sys.version_info < (3, 0): + raise UnsupportedRuntimeError("SMDA should only be used with Python 3.") self.smda_report = smda_report 
self.path = path