diff --git a/capa/features/__init__.py b/capa/features/__init__.py index 04ddc1b7..d4ee5ed5 100644 --- a/capa/features/__init__.py +++ b/capa/features/__init__.py @@ -32,13 +32,13 @@ class Feature(object): # Used to overwrite the rendering of the feature args in `__str__` and the # json output def get_args_str(self): - return ','.join(self.args) + return ",".join(self.args) def __str__(self): if self.description: - return '%s(%s = %s)' % (self.name, self.get_args_str(), self.description) + return "%s(%s = %s)" % (self.name, self.get_args_str(), self.description) else: - return '%s(%s)' % (self.name, self.get_args_str()) + return "%s(%s)" % (self.name, self.get_args_str()) def __repr__(self): return str(self) @@ -60,7 +60,7 @@ class Feature(object): class MatchedRule(Feature): def __init__(self, rule_name, description=None): super(MatchedRule, self).__init__([rule_name], description) - self.name = 'match' + self.name = "match" self.rule_name = rule_name diff --git a/capa/features/extractors/ida/basicblock.py b/capa/features/extractors/ida/basicblock.py index 82d77f90..11c2f626 100644 --- a/capa/features/extractors/ida/basicblock.py +++ b/capa/features/extractors/ida/basicblock.py @@ -103,7 +103,7 @@ def extract_bb_stackstring(f, bb): bb (IDA BasicBlock) """ if _ida_bb_contains_stackstring(f, bb): - yield Characteristic('stack string'), bb.start_ea + yield Characteristic("stack string"), bb.start_ea def _ida_bb_contains_tight_loop(f, bb): @@ -133,7 +133,7 @@ def extract_bb_tight_loop(f, bb): bb (IDA BasicBlock) """ if _ida_bb_contains_tight_loop(f, bb): - yield Characteristic('tight loop'), bb.start_ea + yield Characteristic("tight loop"), bb.start_ea def extract_features(f, bb): diff --git a/capa/features/extractors/ida/file.py b/capa/features/extractors/ida/file.py index 15564e04..2b15d6ed 100644 --- a/capa/features/extractors/ida/file.py +++ b/capa/features/extractors/ida/file.py @@ -75,7 +75,7 @@ def extract_file_embedded_pe(): continue for ea, _ in _ida_check_segment_for_pe(seg): - yield Characteristic('embedded pe'), ea + yield Characteristic("embedded pe"), ea def extract_file_export_names(): diff --git a/capa/features/extractors/ida/function.py b/capa/features/extractors/ida/function.py index 06dfefde..564a8440 100644 --- a/capa/features/extractors/ida/function.py +++ b/capa/features/extractors/ida/function.py @@ -29,7 +29,7 @@ def extract_function_switch(f): f (IDA func_t) """ if _ida_function_contains_switch(f): - yield Characteristic('switch'), f.start_ea + yield Characteristic("switch"), f.start_ea def extract_function_calls_to(f): @@ -39,7 +39,7 @@ def extract_function_calls_to(f): f (IDA func_t) """ for ea in idautils.CodeRefsTo(f.start_ea, True): - yield Characteristic('calls to'), ea + yield Characteristic("calls to"), ea def extract_function_loop(f): @@ -53,7 +53,7 @@ def extract_function_loop(f): map(lambda s: edges.append((bb.start_ea, s.start_ea)), bb.succs()) if edges and loops.has_loop(edges): - yield Characteristic('loop'), f.start_ea + yield Characteristic("loop"), f.start_ea def extract_recursive_call(f): @@ -64,7 +64,7 @@ def extract_recursive_call(f): """ for ref in idautils.CodeRefsTo(f.start_ea, True): if f.contains(ref): - yield Characteristic('recursive call'), f.start_ea + yield Characteristic("recursive call"), f.start_ea break diff --git a/capa/features/extractors/ida/insn.py b/capa/features/extractors/ida/insn.py index ed2f7548..f40cbed3 100644 --- a/capa/features/extractors/ida/insn.py +++ b/capa/features/extractors/ida/insn.py @@ -259,7 +259,7 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn): if _is_nzxor_stack_cookie(f, bb, insn): return - yield Characteristic('nzxor'), insn.ea + yield Characteristic("nzxor"), insn.ea def extract_insn_mnemonic_features(f, bb, insn): @@ -292,7 +292,7 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn): if " fs:30h" in disasm or " gs:60h" in disasm: # TODO: replace above with proper IDA - yield Characteristic('peb access'), insn.ea + yield Characteristic("peb access"), insn.ea def extract_insn_segment_access_features(f, bb, insn): @@ -309,11 +309,11 @@ def extract_insn_segment_access_features(f, bb, insn): if " fs:" in disasm: # TODO: replace above with proper IDA - yield Characteristic('fs access'), insn.ea + yield Characteristic("fs access"), insn.ea if " gs:" in disasm: # TODO: replace above with proper IDA - yield Characteristic('gs access'), insn.ea + yield Characteristic("gs access"), insn.ea def extract_insn_cross_section_cflow(f, bb, insn): @@ -336,7 +336,7 @@ def extract_insn_cross_section_cflow(f, bb, insn): if idaapi.getseg(ref) == idaapi.getseg(insn.ea): continue - yield Characteristic('cross section flow'), insn.ea + yield Characteristic("cross section flow"), insn.ea def extract_function_calls_from(f, bb, insn): @@ -354,7 +354,7 @@ def extract_function_calls_from(f, bb, insn): return for ref in idautils.CodeRefsFrom(insn.ea, False): - yield Characteristic('calls from'), ref + yield Characteristic("calls from"), ref def extract_function_indirect_call_characteristic_features(f, bb, insn): @@ -373,7 +373,7 @@ def extract_function_indirect_call_characteristic_features(f, bb, insn): return if idc.get_operand_type(insn.ea, 0) in (idc.o_reg, idc.o_phrase, idc.o_displ): - yield Characteristic('indirect call'), insn.ea + yield Characteristic("indirect call"), insn.ea def extract_features(f, bb, insn): diff --git a/capa/features/extractors/viv/basicblock.py b/capa/features/extractors/viv/basicblock.py index 2501fa9f..8d9a5232 100644 --- a/capa/features/extractors/viv/basicblock.py +++ b/capa/features/extractors/viv/basicblock.py @@ -39,7 +39,7 @@ def _bb_has_tight_loop(f, bb): def extract_bb_tight_loop(f, bb): """ check basic block for tight loop indicators """ if _bb_has_tight_loop(f, bb): - yield Characteristic('tight loop'), bb.va + yield Characteristic("tight loop"), bb.va def _bb_has_stackstring(f, bb): @@ -62,7 +62,7 @@ def _bb_has_stackstring(f, bb): def extract_stackstring(f, bb): """ check basic block for stackstring indicators """ if _bb_has_stackstring(f, bb): - yield Characteristic('stack string'), bb.va + yield Characteristic("stack string"), bb.va def is_mov_imm_to_stack(instr): diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index 545bf613..c88f328b 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -13,7 +13,7 @@ def extract_file_embedded_pe(vw, file_path): fbytes = f.read() for offset, i in pe_carve.carve(fbytes, 1): - yield Characteristic('embedded pe'), offset + yield Characteristic("embedded pe"), offset def extract_file_export_names(vw, file_path): diff --git a/capa/features/extractors/viv/function.py b/capa/features/extractors/viv/function.py index ed37516d..723a9b39 100644 --- a/capa/features/extractors/viv/function.py +++ b/capa/features/extractors/viv/function.py @@ -53,12 +53,12 @@ def extract_function_switch(f): method can be optimized """ if f.va in get_functions_with_switch(f.vw): - yield Characteristic('switch'), f.va + yield Characteristic("switch"), f.va def extract_function_calls_to(f): for src, _, _, _ in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE): - yield Characteristic('calls to'), src + yield Characteristic("calls to"), src def extract_function_loop(f): @@ -74,7 +74,7 @@ def extract_function_loop(f): edges.append((bb.va, bva)) if edges and loops.has_loop(edges): - yield Characteristic('loop'), f.va + yield Characteristic("loop"), f.va def extract_features(f): diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index 257240a8..fcfbb068 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -287,7 +287,7 @@ def extract_insn_nzxor_characteristic_features(f, bb, insn): if is_security_cookie(f, bb, insn): return - yield Characteristic('nzxor'), insn.va + yield Characteristic("nzxor"), insn.va def extract_insn_mnemonic_features(f, bb, insn): @@ -313,14 +313,16 @@ def extract_insn_peb_access_characteristic_features(f, bb, insn): # IDA: push large dword ptr fs:30h # viv: fs: push dword [0x00000030] # fs: push dword [eax + 0x30] ; i386RegMemOper, with eax = 0 - if (isinstance(oper, envi.archs.i386.disasm.i386RegMemOper) and oper.disp == 0x30) or \ - (isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper) and oper.imm == 0x30): - yield Characteristic('peb access'), insn.va - elif 'gs' in insn.getPrefixName(): + if (isinstance(oper, envi.archs.i386.disasm.i386RegMemOper) and oper.disp == 0x30) or ( + isinstance(oper, envi.archs.i386.disasm.i386ImmMemOper) and oper.imm == 0x30 + ): + yield Characteristic("peb access"), insn.va + elif "gs" in insn.getPrefixName(): for oper in insn.opers: - if (isinstance(oper, envi.archs.amd64.disasm.i386RegMemOper) and oper.disp == 0x60) or \ - (isinstance(oper, envi.archs.amd64.disasm.i386ImmMemOper) and oper.imm == 0x60): - yield Characteristic('peb access'), insn.va + if (isinstance(oper, envi.archs.amd64.disasm.i386RegMemOper) and oper.disp == 0x60) or ( + isinstance(oper, envi.archs.amd64.disasm.i386ImmMemOper) and oper.imm == 0x60 + ): + yield Characteristic("peb access"), insn.va else: pass @@ -329,11 +331,11 @@ def extract_insn_segment_access_features(f, bb, insn): """ parse the instruction for access to fs or gs """ prefix = insn.getPrefixName() - if prefix == 'fs': - yield Characteristic('fs access'), insn.va + if prefix == "fs": + yield Characteristic("fs access"), insn.va - if prefix == 'gs': - yield Characteristic('gs access'), insn.va + if prefix == "gs": + yield Characteristic("gs access"), insn.va def get_section(vw, va): @@ -370,7 +372,7 @@ def extract_insn_cross_section_cflow(f, bb, insn): continue if get_section(f.vw, insn.va) != get_section(f.vw, va): - yield Characteristic('cross section flow'), insn.va + yield Characteristic("cross section flow"), insn.va except KeyError: continue @@ -388,7 +390,7 @@ def extract_function_calls_from(f, bb, insn): if isinstance(insn.opers[0], envi.archs.i386.disasm.i386ImmMemOper): oper = insn.opers[0] target = oper.getOperAddr(insn) - yield Characteristic('calls from'), target + yield Characteristic("calls from"), target # call via thunk on x86, # see 9324d1a8ae37a36ae560c37448c9705a at 0x407985 @@ -397,18 +399,18 @@ def extract_function_calls_from(f, bb, insn): # see Lab21-01.exe_:0x140001178 elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386PcRelOper): target = insn.opers[0].getOperValue(insn) - yield Characteristic('calls from'), target + yield Characteristic("calls from"), target # call via IAT, x64 elif isinstance(insn.opers[0], envi.archs.amd64.disasm.Amd64RipRelOper): op = insn.opers[0] target = op.getOperAddr(insn) - yield Characteristic('calls from'), target + yield Characteristic("calls from"), target if target and target == f.va: # if we found a jump target and it's the function address # mark as recursive - yield Characteristic('recursive call'), target + yield Characteristic("recursive call"), target # this is a feature that's most relevant at the function or basic block scope, @@ -424,13 +426,13 @@ def extract_function_indirect_call_characteristic_features(f, bb, insn): # Checks below work for x86 and x64 if isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper): # call edx - yield Characteristic('indirect call'), insn.va + yield Characteristic("indirect call"), insn.va elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegMemOper): # call dword ptr [eax+50h] - yield Characteristic('indirect call'), insn.va + yield Characteristic("indirect call"), insn.va elif isinstance(insn.opers[0], envi.archs.i386.disasm.i386SibOper): # call qword ptr [rsp+78h] - yield Characteristic('indirect call'), insn.va + yield Characteristic("indirect call"), insn.va def extract_features(f, bb, insn): diff --git a/capa/features/insn.py b/capa/features/insn.py index 937211f8..5b6b16f2 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -17,7 +17,7 @@ class Number(Feature): self.value = value def get_args_str(self): - return '0x%X' % self.value + return "0x%X" % self.value class Offset(Feature): @@ -26,7 +26,7 @@ class Offset(Feature): self.value = value def get_args_str(self): - return '0x%X' % self.value + return "0x%X" % self.value class Mnemonic(Feature): diff --git a/capa/ida/explorer/item.py b/capa/ida/explorer/item.py index 8bc9dca0..56862eee 100644 --- a/capa/ida/explorer/item.py +++ b/capa/ida/explorer/item.py @@ -192,10 +192,10 @@ class CapaExplorerFunctionItem(CapaExplorerDataItem): class CapaExplorerSubscopeItem(CapaExplorerDataItem): - fmt = 'subscope(%s)' + fmt = "subscope(%s)" def __init__(self, parent, scope): - super(CapaExplorerSubscopeItem, self).__init__(parent, [self.fmt % scope, '', '']) + super(CapaExplorerSubscopeItem, self).__init__(parent, [self.fmt % scope, "", ""]) class CapaExplorerBlockItem(CapaExplorerDataItem): diff --git a/capa/ida/explorer/model.py b/capa/ida/explorer/model.py index f06d95fa..aa838a56 100644 --- a/capa/ida/explorer/model.py +++ b/capa/ida/explorer/model.py @@ -17,7 +17,7 @@ from capa.ida.explorer.item import ( CapaExplorerBlockItem, CapaExplorerRuleMatchItem, CapaExplorerFeatureItem, - CapaExplorerSubscopeItem + CapaExplorerSubscopeItem, ) import capa.ida.helpers @@ -109,10 +109,21 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): font.setBold(True) return font - if role == QtCore.Qt.FontRole and isinstance(item, (CapaExplorerRuleItem, CapaExplorerRuleMatchItem, - CapaExplorerBlockItem, CapaExplorerFunctionItem, - CapaExplorerFeatureItem, CapaExplorerSubscopeItem)) and \ - column == CapaExplorerDataModel.COLUMN_INDEX_RULE_INFORMATION: + if ( + role == QtCore.Qt.FontRole + and isinstance( + item, + ( + CapaExplorerRuleItem, + CapaExplorerRuleMatchItem, + CapaExplorerBlockItem, + CapaExplorerFunctionItem, + CapaExplorerFeatureItem, + CapaExplorerSubscopeItem, + ), + ) + and column == CapaExplorerDataModel.COLUMN_INDEX_RULE_INFORMATION + ): # set bold font for top-level rules font = QtGui.QFont() font.setBold(True) @@ -352,12 +363,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): for location in locations: # for each location render child node for range statement - self.render_capa_doc_feature(parent2, statement['child'], location, doc) + self.render_capa_doc_feature(parent2, statement["child"], location, doc) return parent2 - elif statement['type'] == 'subscope': - return CapaExplorerSubscopeItem(parent, statement['subscope']) - elif statement['type'] == 'regex': + elif statement["type"] == "subscope": + return CapaExplorerSubscopeItem(parent, statement["subscope"]) + elif statement["type"] == "regex": # regex is a `Statement` not a `Feature` # this is because it doesn't get extracted, but applies to all strings in scope. # so we have to handle it here @@ -399,11 +410,12 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): ): return - if match['node']['type'] == 'statement': - parent2 = self.render_capa_doc_statement_node(parent, match['node']['statement'], - match.get('locations', []), doc) - elif match['node']['type'] == 'feature': - parent2 = self.render_capa_doc_feature_node(parent, match['node']['feature'], match['locations'], doc) + if match["node"]["type"] == "statement": + parent2 = self.render_capa_doc_statement_node( + parent, match["node"]["statement"], match.get("locations", []), doc + ) + elif match["node"]["type"] == "feature": + parent2 = self.render_capa_doc_feature_node(parent, match["node"]["feature"], match["locations"], doc) else: raise RuntimeError("unexpected node type: " + str(match["node"]["type"])) diff --git a/capa/render/__init__.py b/capa/render/__init__.py index b9880fb8..5132757e 100644 --- a/capa/render/__init__.py +++ b/capa/render/__init__.py @@ -84,9 +84,9 @@ def convert_feature_to_result_document(feature): "type": "characteristic" }, """ - result = {'type': feature.name, feature.name: feature.get_args_str()} + result = {"type": feature.name, feature.name: feature.get_args_str()} if feature.description: - result['description'] = feature.description + result["description"] = feature.description return result @@ -133,10 +133,10 @@ def convert_match_to_result_document(rules, capabilities, result): # so only add `locations` to feature nodes. if isinstance(result.statement, capa.features.Feature): if bool(result.success): - doc['locations'] = result.locations + doc["locations"] = result.locations elif isinstance(result.statement, capa.rules.Range): if bool(result.success): - doc['locations'] = result.locations + doc["locations"] = result.locations # if we have a `match` statement, then we're referencing another rule. # this could an external rule (written by a human), or diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index 9b02fbe5..c0edc234 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -8,27 +8,27 @@ def render_locations(ostream, match): # its possible to have an empty locations array here, # such as when we're in MODE_FAILURE and showing the logic # under a `not` statement (which will have no matched locations). - locations = list(sorted(match.get('locations', []))) + locations = list(sorted(match.get("locations", []))) if len(locations) == 1: - ostream.write(' @ ') + ostream.write(" @ ") ostream.write(rutils.hex(locations[0])) elif len(locations) > 1: - ostream.write(' @ ') + ostream.write(" @ ") if len(locations) > 4: # don't display too many locations, because it becomes very noisy. # probably only the first handful of locations will be useful for inspection. - ostream.write(', '.join(map(rutils.hex, locations[0:4]))) - ostream.write(', and %d more...' % (len(locations) - 4)) + ostream.write(", ".join(map(rutils.hex, locations[0:4]))) + ostream.write(", and %d more..." % (len(locations) - 4)) else: - ostream.write(', '.join(map(rutils.hex, locations))) + ostream.write(", ".join(map(rutils.hex, locations))) def render_statement(ostream, match, statement, indent=0): - ostream.write(' ' * indent) - if statement['type'] in ('and', 'or', 'optional'): - ostream.write(statement['type']) - ostream.writeln(':') - elif statement['type'] == 'not': + ostream.write(" " * indent) + if statement["type"] in ("and", "or", "optional"): + ostream.write(statement["type"]) + ostream.writeln(":") + elif statement["type"] == "not": # this statement is handled specially in `render_match` using the MODE_SUCCESS/MODE_FAILURE flags. ostream.writeln("not:") elif statement["type"] == "some": @@ -40,36 +40,46 @@ def render_statement(ostream, match, statement, indent=0): # there's no additional logic in the feature part, just the existence of a feature. # so, we have to inline some of the feature rendering here. - child = statement['child'] - if child['type'] in ('string', 'api', 'mnemonic', 'basic block', 'export', 'import', 'section', 'match', 'characteristic'): - value = rutils.bold2(child[child['type']]) - elif child['type'] in ('number', 'offset'): - value = rutils.bold2(rutils.hex(child[child['type']])) - elif child['type'] == 'bytes': - value = rutils.bold2(rutils.hex_string(child[child['type']])) + child = statement["child"] + if child["type"] in ( + "string", + "api", + "mnemonic", + "basic block", + "export", + "import", + "section", + "match", + "characteristic", + ): + value = rutils.bold2(child[child["type"]]) + elif child["type"] in ("number", "offset"): + value = rutils.bold2(rutils.hex(child[child["type"]])) + elif child["type"] == "bytes": + value = rutils.bold2(rutils.hex_string(child[child["type"]])) else: raise RuntimeError("unexpected feature type: " + str(child)) - if child['description']: - ostream.write('count(%s(%s = %s)): ' % (child['type'], value, child['description'])) + if child["description"]: + ostream.write("count(%s(%s = %s)): " % (child["type"], value, child["description"])) else: - ostream.write('count(%s(%s)): ' % (child['type'], value)) + ostream.write("count(%s(%s)): " % (child["type"], value)) - if statement['max'] == statement['min']: - ostream.write('%d' % (statement['min'])) - elif statement['min'] == 0: - ostream.write('%d or fewer' % (statement['max'])) - elif statement['max'] == (1 << 64 - 1): - ostream.write('%d or more' % (statement['min'])) + if statement["max"] == statement["min"]: + ostream.write("%d" % (statement["min"])) + elif statement["min"] == 0: + ostream.write("%d or fewer" % (statement["max"])) + elif statement["max"] == (1 << 64 - 1): + ostream.write("%d or more" % (statement["min"])) else: - ostream.write('between %d and %d' % (statement['min'], statement['max'])) + ostream.write("between %d and %d" % (statement["min"], statement["max"])) render_locations(ostream, match) - ostream.write('\n') - elif statement['type'] == 'subscope': - ostream.write(statement['subscope']) - ostream.writeln(':') - elif statement['type'] == 'regex': + ostream.write("\n") + elif statement["type"] == "subscope": + ostream.write(statement["subscope"]) + ostream.writeln(":") + elif statement["type"] == "regex": # regex is a `Statement` not a `Feature` # this is because it doesn't get extracted, but applies to all strings in scope. # so we have to handle it here @@ -79,38 +89,48 @@ def render_statement(ostream, match, statement, indent=0): def render_feature(ostream, match, feature, indent=0): - ostream.write(' ' * indent) + ostream.write(" " * indent) - if feature['type'] in ('string', 'api', 'mnemonic', 'basic block', 'export', 'import', 'section', 'match', 'characteristic'): - ostream.write(feature['type']) - ostream.write(': ') - ostream.write(rutils.bold2(feature[feature['type']])) - elif feature['type'] in ('number', 'offset'): - ostream.write(feature['type']) - ostream.write(': ') - ostream.write(rutils.bold2(rutils.hex(feature[feature['type']]))) - elif feature['type'] == 'bytes': - ostream.write('bytes: ') + if feature["type"] in ( + "string", + "api", + "mnemonic", + "basic block", + "export", + "import", + "section", + "match", + "characteristic", + ): + ostream.write(feature["type"]) + ostream.write(": ") + ostream.write(rutils.bold2(feature[feature["type"]])) + elif feature["type"] in ("number", "offset"): + ostream.write(feature["type"]) + ostream.write(": ") + ostream.write(rutils.bold2(rutils.hex(feature[feature["type"]]))) + elif feature["type"] == "bytes": + ostream.write("bytes: ") # bytes is the uppercase, hex-encoded string. # it should always be an even number of characters (its hex). - ostream.write(rutils.bold2(rutils.hex_string(feature[feature['type']]))) + ostream.write(rutils.bold2(rutils.hex_string(feature[feature["type"]]))) # note that regex is found in `render_statement` else: raise RuntimeError("unexpected feature type: " + str(feature)) - if 'description' in feature: - ostream.write(' = ') - ostream.write(feature['description']) + if "description" in feature: + ostream.write(" = ") + ostream.write(feature["description"]) render_locations(ostream, match) - ostream.write('\n') + ostream.write("\n") def render_node(ostream, match, node, indent=0): - if node['type'] == 'statement': - render_statement(ostream, match, node['statement'], indent=indent) - elif node['type'] == 'feature': - render_feature(ostream, match, node['feature'], indent=indent) + if node["type"] == "statement": + render_statement(ostream, match, node["statement"], indent=indent) + elif node["type"] == "feature": + render_feature(ostream, match, node["feature"], indent=indent) else: raise RuntimeError("unexpected node type: " + str(node)) diff --git a/capa/rules.py b/capa/rules.py index 5f3da602..1dbf6d27 100644 --- a/capa/rules.py +++ b/capa/rules.py @@ -139,7 +139,7 @@ class InvalidRuleSet(ValueError): def ensure_feature_valid_for_scope(scope, feature): if isinstance(feature, capa.features.Characteristic): if capa.features.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]: - raise InvalidRule('feature %s not support for scope %s' % (feature, scope)) + raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) elif not isinstance(feature, tuple(filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope]))): raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) @@ -205,9 +205,9 @@ def parse_feature(key): return capa.features.insn.Mnemonic elif key == "basic blocks": return capa.features.basicblock.BasicBlock - elif key == 'characteristic': + elif key == "characteristic": return capa.features.Characteristic - elif key == 'export': + elif key == "export": return capa.features.file.Export elif key == "import": return capa.features.file.Import @@ -220,14 +220,16 @@ def parse_feature(key): def parse_description(s, value_type, description=None): - ''' + """ s can be an int or a string - ''' - if value_type != 'string' and isinstance(s, str) and ' = ' in s: + """ + if value_type != "string" and isinstance(s, str) and " = " in s: if description: - raise InvalidRule('unexpected value: "%s", only one description allowed (inline description with ` = `)' % s) - value, description = s.split(' = ', 1) - if description == '': + raise InvalidRule( + 'unexpected value: "%s", only one description allowed (inline description with ` = `)' % s + ) + value, description = s.split(" = ", 1) + if description == "": raise InvalidRule('unexpected value: "%s", description cannot be empty' % s) else: value = s @@ -241,9 +243,10 @@ def parse_description(s, value_type, description=None): raise InvalidRule('unexpected bytes value: "%s", must be a valid hex sequence' % value) if len(value) > MAX_BYTES_FEATURE_SIZE: - raise InvalidRule('unexpected bytes value: byte sequences must be no larger than %s bytes' % - MAX_BYTES_FEATURE_SIZE) - elif value_type in {'number', 'offset'}: + raise InvalidRule( + "unexpected bytes value: byte sequences must be no larger than %s bytes" % MAX_BYTES_FEATURE_SIZE + ) + elif value_type in {"number", "offset"}: try: value = parse_int(value) except ValueError: @@ -254,7 +257,7 @@ def parse_description(s, value_type, description=None): def build_statements(d, scope): if len(d.keys()) > 2: - raise InvalidRule('too many statements') + raise InvalidRule("too many statements") key = list(d.keys())[0] if key == "and": @@ -305,18 +308,18 @@ def build_statements(d, scope): # - mnemonic: mov # # but here we deal with the form: `mnemonic(mov)`. - term, _, arg = term.partition('(') + term, _, arg = term.partition("(") Feature = parse_feature(term) if arg: - arg = arg[:-len(')')] + arg = arg[: -len(")")] # can't rely on yaml parsing ints embedded within strings # like: # # count(offset(0xC)) # count(number(0x11223344)) # count(number(0x100 = description)) - if term != 'string': + if term != "string": value, description = parse_description(arg, term) feature = Feature(value, description) else: @@ -356,7 +359,7 @@ def build_statements(d, scope): ) else: Feature = parse_feature(key) - value, description = parse_description(d[key], key, d.get('description')) + value, description = parse_description(d[key], key, d.get("description")) feature = Feature(value, description) ensure_feature_valid_for_scope(scope, feature) return feature diff --git a/ci/hooks/hook-vivisect.py b/ci/hooks/hook-vivisect.py index 3714cfb0..b24962be 100644 --- a/ci/hooks/hook-vivisect.py +++ b/ci/hooks/hook-vivisect.py @@ -10,4 +10,4 @@ from PyInstaller.utils.hooks import copy_metadata # this hook runs at analysis time and updates the embedded metadata. # # ref: https://github.com/pyinstaller/pyinstaller/issues/1713#issuecomment-162682084 -datas = copy_metadata('vivisect') +datas = copy_metadata("vivisect") diff --git a/scripts/lint.py b/scripts/lint.py index e79627c6..a42b72c8 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -324,7 +324,7 @@ def get_features(ctx, rule): # get features from rule and all dependencies including subscopes and matched rules features = [] namespaces = capa.rules.index_rules_by_namespace([rule]) - deps = [ctx['rules'].rules[dep] for dep in rule.get_dependencies(namespaces)] + deps = [ctx["rules"].rules[dep] for dep in rule.get_dependencies(namespaces)] for r in [rule] + deps: features.extend(get_rule_features(r)) return features diff --git a/scripts/migrate-rules.py b/scripts/migrate-rules.py index 1697c41a..21e3def4 100644 --- a/scripts/migrate-rules.py +++ b/scripts/migrate-rules.py @@ -1,11 +1,11 @@ #!/usr/bin/env python -''' +""" migrate rules and their namespaces. example: $ python scripts/migrate-rules.py migration.csv ./rules ./new-rules -''' +""" import os import os.path import sys @@ -18,22 +18,27 @@ import argparse import capa.rules -logger = logging.getLogger('migrate-rules') - +logger = logging.getLogger("migrate-rules") def read_plan(plan_path): - with open(plan_path, 'rb') as f: - return list(csv.DictReader(f, restkey='other', fieldnames=( - 'existing path', - 'existing name', - 'existing rule-category', - 'proposed name', - 'proposed namespace', - 'ATT&CK', - 'MBC', - 'comment1', - ))) + with open(plan_path, "rb") as f: + return list( + csv.DictReader( + f, + restkey="other", + fieldnames=( + "existing path", + "existing name", + "existing rule-category", + "proposed name", + "proposed namespace", + "ATT&CK", + "MBC", + "comment1", + ), + ) + ) def read_rules(rule_directory): @@ -41,15 +46,15 @@ def read_rules(rule_directory): for root, dirs, files in os.walk(rule_directory): for file in files: path = os.path.join(root, file) - if not path.endswith('.yml'): - logger.info('skipping file: %s', path) + if not path.endswith(".yml"): + logger.info("skipping file: %s", path) continue rule = capa.rules.Rule.from_yaml_file(path) rules[rule.name] = rule - if 'nursery' in path: - rule.meta['capa/nursery'] = True + if "nursery" in path: + rule.meta["capa/nursery"] = True return rules @@ -57,105 +62,100 @@ def main(argv=None): if argv is None: argv = sys.argv[1:] - parser = argparse.ArgumentParser(description='migrate rules.') - parser.add_argument('plan', type=str, - help='Path to CSV describing migration') - parser.add_argument('source', type=str, - help='Source directory of rules') - parser.add_argument('destination', type=str, - help='Destination directory of rules') + parser = argparse.ArgumentParser(description="migrate rules.") + parser.add_argument("plan", type=str, help="Path to CSV describing migration") + parser.add_argument("source", type=str, help="Source directory of rules") + parser.add_argument("destination", type=str, help="Destination directory of rules") args = parser.parse_args(args=argv) logging.basicConfig(level=logging.INFO) logging.getLogger().setLevel(logging.INFO) plan = read_plan(args.plan) - logger.info('read %d plan entries', len(plan)) + logger.info("read %d plan entries", len(plan)) rules = read_rules(args.source) - logger.info('read %d rules', len(rules)) + logger.info("read %d rules", len(rules)) - planned_rules = set([row['existing name'] for row in plan]) + planned_rules = set([row["existing name"] for row in plan]) unplanned_rules = [rule for (name, rule) in rules.items() if name not in planned_rules] if unplanned_rules: - logger.error('plan does not account for %d rules:' % (len(unplanned_rules))) + logger.error("plan does not account for %d rules:" % (len(unplanned_rules))) for rule in unplanned_rules: - logger.error(' ' + rule.name) + logger.error(" " + rule.name) return -1 # pairs of strings (needle, replacement) match_translations = [] for row in plan: - if not row['existing name']: + if not row["existing name"]: continue - rule = rules[row['existing name']] + rule = rules[row["existing name"]] - if rule.meta['name'] != row['proposed name']: - logger.info("renaming rule '%s' -> '%s'", rule.meta['name'], row['proposed name']) + if rule.meta["name"] != row["proposed name"]: + logger.info("renaming rule '%s' -> '%s'", rule.meta["name"], row["proposed name"]) # assume the yaml is formatted like `- match: $rule-name`. # but since its been linted, this should be ok. - match_translations.append( - ('- match: ' + rule.meta['name'], - '- match: ' + row['proposed name'])) + match_translations.append(("- match: " + rule.meta["name"], "- match: " + row["proposed name"])) - rule.meta['name'] = row['proposed name'] - rule.name = row['proposed name'] + rule.meta["name"] = row["proposed name"] + rule.name = row["proposed name"] - if 'rule-category' in rule.meta: - logger.info("deleting rule category '%s'", rule.meta['rule-category']) - del rule.meta['rule-category'] + if "rule-category" in rule.meta: + logger.info("deleting rule category '%s'", rule.meta["rule-category"]) + del rule.meta["rule-category"] - rule.meta['namespace'] = row['proposed namespace'] + rule.meta["namespace"] = row["proposed namespace"] - if row['ATT&CK'] != 'n/a' and row['ATT&CK'] != '': - tag = row['ATT&CK'] - name, _, id = tag.rpartition(' ') - tag = '%s [%s]' % (name, id) - rule.meta['att&ck'] = [tag] + if row["ATT&CK"] != "n/a" and row["ATT&CK"] != "": + tag = row["ATT&CK"] + name, _, id = tag.rpartition(" ") + tag = "%s [%s]" % (name, id) + rule.meta["att&ck"] = [tag] - if row['MBC'] != 'n/a' and row['MBC'] != '': - tag = row['MBC'] - rule.meta['mbc'] = [tag] + if row["MBC"] != "n/a" and row["MBC"] != "": + tag = row["MBC"] + rule.meta["mbc"] = [tag] for rule in rules.values(): filename = rule.name filename = filename.lower() - filename = filename.replace(' ', '-') - filename = filename.replace('(', '') - filename = filename.replace(')', '') - filename = filename.replace('+', '') - filename = filename.replace('/', '') - filename = filename + '.yml' + filename = filename.replace(" ", "-") + filename = filename.replace("(", "") + filename = filename.replace(")", "") + filename = filename.replace("+", "") + filename = filename.replace("/", "") + filename = filename + ".yml" try: - if rule.meta.get('capa/nursery'): - directory = os.path.join(args.destination, 'nursery') - elif rule.meta.get('lib'): - directory = os.path.join(args.destination, 'lib') + if rule.meta.get("capa/nursery"): + directory = os.path.join(args.destination, "nursery") + elif rule.meta.get("lib"): + directory = os.path.join(args.destination, "lib") else: - directory = os.path.join(args.destination, rule.meta.get('namespace')) + directory = os.path.join(args.destination, rule.meta.get("namespace")) os.makedirs(directory) except OSError: pass else: - logger.info('created namespace: %s', directory) + logger.info("created namespace: %s", directory) path = os.path.join(directory, filename) - logger.info('writing rule %s', path) + logger.info("writing rule %s", path) - doc = rule.to_yaml().decode('utf-8') + doc = rule.to_yaml().decode("utf-8") for (needle, replacement) in match_translations: doc = doc.replace(needle, replacement) - with open(path, 'wb') as f: - f.write(doc.encode('utf-8')) + with open(path, "wb") as f: + f.write(doc.encode("utf-8")) return 0 -if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/testbed/__init__.py b/scripts/testbed/__init__.py index 92da17cf..c77448ea 100644 --- a/scripts/testbed/__init__.py +++ b/scripts/testbed/__init__.py @@ -1,2 +1,2 @@ -FNAMES_EXTENSION = '.fnames' -FREEZE_EXTENSION = '.frz' +FNAMES_EXTENSION = ".fnames" +FREEZE_EXTENSION = ".frz" diff --git a/scripts/testbed/_dump_fnames.py b/scripts/testbed/_dump_fnames.py index 7210f2dc..e4856104 100644 --- a/scripts/testbed/_dump_fnames.py +++ b/scripts/testbed/_dump_fnames.py @@ -1,11 +1,11 @@ -''' +""" IDAPython script to dump JSON file of functions names { fva: fname }. Meant to be run on benign files with PDB information. IDA should apply function names from the PDB files automatically. Can also be run on annotated IDA database files. Example usage (via IDA autonomous mode): ida.exe -A -S_dump_fnames.py "" -''' +""" import json @@ -26,7 +26,7 @@ def main(): fnames = {} for f in idautils.Functions(): fname = idc.get_name(f) - if fname.startswith('sub_'): + if fname.startswith("sub_"): continue name_demangled = idc.demangle_name(fname, INF_SHORT_DN_ATTR) @@ -35,12 +35,12 @@ def main(): fnames[f] = fname - with open(idc.ARGV[1], 'w') as f: + with open(idc.ARGV[1], "w") as f: json.dump(fnames, f) # exit IDA idc.qexit(0) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/testbed/_export_fimages.py b/scripts/testbed/_export_fimages.py index 8c4a7d4e..4b9131c1 100644 --- a/scripts/testbed/_export_fimages.py +++ b/scripts/testbed/_export_fimages.py @@ -1,9 +1,9 @@ -''' +""" IDAPython script to export DOT files of function graphs. Example usage (via IDA autonomous mode): ida.exe -A -S_export_fimages.py "" [ ...] -''' +""" import os @@ -26,13 +26,19 @@ def main(): for fva in fvas: fstart = idc.get_func_attr(fva, idc.FUNCATTR_START) - name = '%s_0x%x' % (idb_name.replace('.', '_'), fstart) + name = "%s_0x%x" % (idb_name.replace(".", "_"), fstart) out_path = os.path.join(out_dir, name) fname = idc.get_name(fstart) - if not ida_gdl.gen_flow_graph(out_path, '%s (0x%x)' % (fname, fstart), idaapi.get_func(fstart), 0, 0, - ida_gdl.CHART_GEN_DOT | ida_gdl.CHART_PRINT_NAMES): - print 'IDA error generating flow graph' + if not ida_gdl.gen_flow_graph( + out_path, + "%s (0x%x)" % (fname, fstart), + idaapi.get_func(fstart), + 0, + 0, + ida_gdl.CHART_GEN_DOT | ida_gdl.CHART_PRINT_NAMES, + ): + print "IDA error generating flow graph" # TODO add label to DOT file, see https://stackoverflow.com/a/6452088/10548020 # TODO highlight where rule matched @@ -40,5 +46,5 @@ def main(): idc.qexit(0) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/testbed/freeze_features.py b/scripts/testbed/freeze_features.py index 8c160559..3d913f03 100644 --- a/scripts/testbed/freeze_features.py +++ b/scripts/testbed/freeze_features.py @@ -1,10 +1,10 @@ -''' +""" Freeze capa features. Example usage: freeze_features.py freeze_features.py samples\benign -''' +""" import os import sys @@ -18,63 +18,54 @@ from capa.features.freeze import main as freeze_features # only process files with these extensions -TARGET_EXTENSIONS = [ - '.mal_', - '.exe_', - '.dll_', - '.sys_' -] +TARGET_EXTENSIONS = [".mal_", ".exe_", ".dll_", ".sys_"] -logger = logging.getLogger('check_rule') +logger = logging.getLogger("check_rule") def freeze(input_path, reprocess): if not os.path.exists(input_path): - raise IOError('%s does not exist or cannot be accessed' % input_path) + raise IOError("%s does not exist or cannot be accessed" % input_path) if os.path.isfile(input_path): - outfile = '%s%s' % (input_path, FREEZE_EXTENSION) + outfile = "%s%s" % (input_path, FREEZE_EXTENSION) freeze_file(input_path, outfile, reprocess) elif os.path.isdir(input_path): - logger.info('freezing features of %s files in %s', '|'.join(TARGET_EXTENSIONS), input_path) + logger.info("freezing features of %s files in %s", "|".join(TARGET_EXTENSIONS), input_path) for root, dirs, files in os.walk(input_path): for file in files: if not os.path.splitext(file)[1] in TARGET_EXTENSIONS: - logger.debug('skipping non-target file: %s', file) + logger.debug("skipping non-target file: %s", file) continue path = os.path.join(root, file) - outfile = '%s%s' % (path, FREEZE_EXTENSION) + outfile = "%s%s" % (path, FREEZE_EXTENSION) freeze_file(path, outfile, reprocess) def freeze_file(path, output, reprocess=False): - logger.info('freezing features of %s', path) + logger.info("freezing features of %s", path) if os.path.exists(output) and not reprocess: - logger.info('%s already exists, provide -r argument to reprocess', output) + logger.info("%s already exists, provide -r argument to reprocess", output) return try: freeze_features([path, output]) # args: sample, output except Exception as e: - logger.error('could not freeze features for %s: %s', path, str(e)) + logger.error("could not freeze features for %s: %s", path, str(e)) def main(argv=None): if argv is None: argv = sys.argv[1:] - parser = argparse.ArgumentParser(description='Freeze capa features of a file or of files in a directory') - parser.add_argument('file_path', type=str, - help='Path to file or directory to analyze') - parser.add_argument('-r', '--reprocess', action='store_true', default=False, - help='Overwrite existing analysis') - parser.add_argument('-v', '--verbose', action='store_true', - help='Enable verbose output') - parser.add_argument('-q', '--quiet', action='store_true', - help='Disable all output but errors') + parser = argparse.ArgumentParser(description="Freeze capa features of a file or of files in a directory") + parser.add_argument("file_path", type=str, help="Path to file or directory to analyze") + parser.add_argument("-r", "--reprocess", action="store_true", default=False, help="Overwrite existing analysis") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") + parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") args = parser.parse_args(args=argv) if args.quiet: @@ -91,12 +82,12 @@ def main(argv=None): try: freeze(args.file_path, args.reprocess) except IOError as e: - logger.error('%s', str(e)) + logger.error("%s", str(e)) return -1 - logger.info('freezing features took %d seconds', time.time() - time0) + logger.info("freezing features took %d seconds", time.time() - time0) return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/scripts/testbed/run_rule_on_testbed.py b/scripts/testbed/run_rule_on_testbed.py index e8be9c2a..9f1bac7b 100644 --- a/scripts/testbed/run_rule_on_testbed.py +++ b/scripts/testbed/run_rule_on_testbed.py @@ -1,10 +1,10 @@ -''' +""" Run a capa rule file against the testbed (frozen features in a directory). Example usage: run_rule_on_testbed.py run_rule_on_testbed.py ..\\rules "create pipe" samples -''' +""" import os import sys @@ -37,8 +37,8 @@ function_names = set([]) CATEGORY = { - 'malicious': 'MAL', - 'benign': 'BEN', + "malicious": "MAL", + "benign": "BEN", } @@ -48,7 +48,7 @@ def check_rule(path, rules, rule_name, only_matching, save_image, verbose): try: capabilities = get_capabilities(path, rules) except (ValueError, KeyError) as e: - logger.error('cannot load %s due to %s: %s', path, type(e).__name__, str(e)) + logger.error("cannot load %s due to %s: %s", path, type(e).__name__, str(e)) errors += 1 return @@ -58,12 +58,12 @@ def check_rule(path, rules, rule_name, only_matching, save_image, verbose): if not only_matching: render_no_hit(path) else: - print('[x] rule matches %d function(s) in %s (%s)' % (hits, path, get_category(path))) + print ("[x] rule matches %d function(s) in %s (%s)" % (hits, path, get_category(path))) file_hits += 1 function_hits += hits - if get_category(path) == 'MAL': + if get_category(path) == "MAL": mal_hits += 1 else: other_hits += 1 @@ -72,29 +72,29 @@ def check_rule(path, rules, rule_name, only_matching, save_image, verbose): render_hit_verbose(capabilities, path, verbose > 1) if save_image: - fvas = ['0x%x' % fva for fva in get_hit_fvas(capabilities)] + fvas = ["0x%x" % fva for fva in get_hit_fvas(capabilities)] file_path = get_idb_or_sample_path(path) if file_path: if not export_fimages(file_path, save_image, fvas): - logger.warning('exporting images failed') + logger.warning("exporting images failed") else: - logger.warning('could not get IDB or sample path') + logger.warning("could not get IDB or sample path") def get_idb_or_sample_path(path): - exts = ['.idb', '.i64', '.exe_', '.dll_', '.mal_'] + exts = [".idb", ".i64", ".exe_", ".dll_", ".mal_"] roots = [os.path.splitext(path)[0], path] for e in exts: for r in roots: - p = '%s%s' % (r, e) + p = "%s%s" % (r, e) if os.path.exists(p): return p return None def get_capabilities(path, rules): - logger.debug('matching rules in %s', path) - with open(path, 'rb') as f: + logger.debug("matching rules in %s", path) + with open(path, "rb") as f: extractor = capa.features.freeze.load(f.read()) return capa.main.find_capabilities(rules, extractor, disable_progress=True) @@ -107,18 +107,18 @@ def get_category(path): for c in CATEGORY: if c in path: return CATEGORY[c] - return 'UNK' + return "UNK" def render_no_hit(path): - print('[ ] no match in %s (%s)' % (path, get_category(path))) + print ("[ ] no match in %s (%s)" % (path, get_category(path))) def render_hit_verbose(capabilities, path, vverbose): try: fnames = load_fnames(path) except IOError as e: - logger.error('%s', str(e)) + logger.error("%s", str(e)) fnames = None for rule, ress in capabilities.items(): @@ -127,11 +127,11 @@ def render_hit_verbose(capabilities, path, vverbose): fname = fnames[fva] function_names.add(fname) else: - fname = '' - print(' - function 0x%x (%s)' % (fva, fname)) + fname = "" + print (" - function 0x%x (%s)" % (fva, fname)) if vverbose: - capa.main.render_result(res, indent=' ') + capa.main.render_result(res, indent=" ") def get_hit_fvas(capabilities): @@ -145,39 +145,39 @@ def get_hit_fvas(capabilities): def load_fnames(path): fnames_path = path.replace(FREEZE_EXTENSION, FNAMES_EXTENSION) if not os.path.exists(fnames_path): - raise IOError('%s does not exist' % fnames_path) + raise IOError("%s does not exist" % fnames_path) - logger.debug('fnames path: %s', fnames_path) + logger.debug("fnames path: %s", fnames_path) try: # json file with format { fva: fname } fnames = load_json(fnames_path) - logger.debug('loaded JSON file') + logger.debug("loaded JSON file") except TypeError: # csv file with format idbmd5;md5;fva;fname fnames = load_csv(fnames_path) - logger.debug('loaded CSV file') + logger.debug("loaded CSV file") fnames = convert_keys_to_int(fnames) - logger.debug('read %d function names' % len(fnames)) + logger.debug("read %d function names" % len(fnames)) return fnames def load_json(path): - with open(path, 'r') as f: + with open(path, "r") as f: try: funcs = json.load(f) except ValueError as e: - logger.debug('not a JSON file, %s', str(e)) + logger.debug("not a JSON file, %s", str(e)) raise TypeError return funcs def load_csv(path): funcs = defaultdict(str) - with open(path, 'r') as f: + with open(path, "r") as f: data = f.read().splitlines() for line in data: try: - idbmd5, md5, fva, name = line.split(':', 3) + idbmd5, md5, fva, name = line.split(":", 3) except ValueError as e: logger.warning('%s: "%s"', str(e), line) funcs[fva] = name @@ -198,42 +198,38 @@ def convert_keys_to_int(funcs_in): def print_summary(verbose, start_time): global file_count, file_hits, function_hits, errors - print('\n[SUMMARY]') + print ("\n[SUMMARY]") m, s = divmod(time.time() - start_time, 60) - logger.info('ran for %d:%02d minutes', m, s) - ratio = ' (%d%%)' % ((float(file_hits) / file_count) * 100) if file_count else '' - print('matched %d function(s) in %d/%d%s sample(s), encountered %d error(s)' % ( - function_hits, file_hits, file_count, ratio, errors)) - print('%d hits on (MAL) files; %d hits on other files' % (mal_hits, other_hits)) + logger.info("ran for %d:%02d minutes", m, s) + ratio = " (%d%%)" % ((float(file_hits) / file_count) * 100) if file_count else "" + print ( + "matched %d function(s) in %d/%d%s sample(s), encountered %d error(s)" + % (function_hits, file_hits, file_count, ratio, errors) + ) + print ("%d hits on (MAL) files; %d hits on other files" % (mal_hits, other_hits)) if verbose: if len(function_names) > 0: - print('matched function names (unique):') + print ("matched function names (unique):") for fname in function_names: - print ' - %s' % fname + print " - %s" % fname def main(argv=None): if argv is None: argv = sys.argv[1:] - parser = argparse.ArgumentParser(description='Run capa rule file against frozen features in a directory') - parser.add_argument('rules', type=str, - help='Path to directory containing rules') - parser.add_argument('rule_name', type=str, - help='Name of rule to test') - parser.add_argument('frozen_path', type=str, - help='Path to frozen feature file or directory') - parser.add_argument('-f', '--fast', action='store_true', - help='Don't test slow files') - parser.add_argument('-o', '--only_matching', action='store_true', - help='Print only if rule matches') - parser.add_argument('-s', '--save_image', action='store', - help='Directory to save exported images of function graphs') - parser.add_argument('-v', '--verbose', action='count', default=0, - help='Increase output verbosity') - parser.add_argument('-q', '--quiet', action='store_true', - help='Disable all output but errors') + parser = argparse.ArgumentParser(description="Run capa rule file against frozen features in a directory") + parser.add_argument("rules", type=str, help="Path to directory containing rules") + parser.add_argument("rule_name", type=str, help="Name of rule to test") + parser.add_argument("frozen_path", type=str, help="Path to frozen feature file or directory") + parser.add_argument("-f", "--fast", action="store_true", help="Don't test slow files") + parser.add_argument("-o", "--only_matching", action="store_true", help="Print only if rule matches") + parser.add_argument( + "-s", "--save_image", action="store", help="Directory to save exported images of function graphs" + ) + parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase output verbosity") + parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors") args = parser.parse_args(args=argv) if args.quiet: @@ -247,7 +243,7 @@ def main(argv=None): logging.getLogger().setLevel(logging.INFO) if not os.path.isdir(args.rules): - logger.error('%s is not a directory', args.rules) + logger.error("%s is not a directory", args.rules) return -1 # load rule @@ -256,15 +252,15 @@ def main(argv=None): rules = list(capa.rules.get_rules_and_dependencies(rules, args.rule_name)) rules = capa.rules.RuleSet(rules) except IOError as e: - logger.error('%s', str(e)) + logger.error("%s", str(e)) return -1 except capa.rules.InvalidRule as e: - logger.error('%s', str(e)) + logger.error("%s", str(e)) return -1 time0 = time.time() - print('[RULE %s]' % args.rule_name) + print ("[RULE %s]" % args.rule_name) if os.path.isfile(args.frozen_path): check_rule(args.frozen_path, rules, args.rule_name, args.only_matching, args.save_image, args.verbose) @@ -277,8 +273,8 @@ def main(argv=None): continue path = os.path.join(root, file) - if args.fast and 'slow' in path: - logger.debug('fast mode skipping %s', path) + if args.fast and "slow" in path: + logger.debug("fast mode skipping %s", path) continue freeze_files.append(path) @@ -286,12 +282,12 @@ def main(argv=None): for path in sorted(freeze_files): sample_time0 = time.time() check_rule(path, rules, args.rule_name, args.only_matching, args.save_image, args.verbose) - logger.debug('rule check took %d seconds', time.time() - sample_time0) + logger.debug("rule check took %d seconds", time.time() - sample_time0) except KeyboardInterrupt: - logger.info('Received keyboard interrupt, terminating') + logger.info("Received keyboard interrupt, terminating") print_summary(args.verbose, time0) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/scripts/testbed/start_ida_dump_fnames.py b/scripts/testbed/start_ida_dump_fnames.py index b812105c..b701d509 100644 --- a/scripts/testbed/start_ida_dump_fnames.py +++ b/scripts/testbed/start_ida_dump_fnames.py @@ -1,4 +1,4 @@ -''' +""" Start IDA Pro in autonomous mode to dump JSON file of function names { fva: fname }. Processes a single file or a directory. Only runs on files with supported file extensions. @@ -6,7 +6,7 @@ Only runs on files with supported file extensions. Example usage: start_ida_dump_fnames.py start_ida_dump_fnames.py samples\benign -''' +""" import os import sys @@ -19,18 +19,18 @@ import argparse from scripts.testbed import FNAMES_EXTENSION -IDA32_PATH = 'C:\\Program Files\\IDA Pro 7.3\\ida.exe' -IDA64_PATH = 'C:\\Program Files\\IDA Pro 7.3\\ida64.exe' +IDA32_PATH = "C:\\Program Files\\IDA Pro 7.3\\ida.exe" +IDA64_PATH = "C:\\Program Files\\IDA Pro 7.3\\ida64.exe" # expected in same directory as this file -DUMP_SCRIPT_PATH = os.path.abspath('_dump_fnames.py') +DUMP_SCRIPT_PATH = os.path.abspath("_dump_fnames.py") SUPPORTED_EXTENSIONS = [ - '.exe_', - '.dll_', - '.sys_', - '.idb', - '.i64', + ".exe_", + ".dll_", + ".sys_", + ".idb", + ".i64", ] @@ -38,40 +38,43 @@ logger = logging.getLogger(__name__) def call_ida_dump_script(sample_path, reprocess): - ''' call IDA in autonomous mode and return True if success, False on failure ''' - logger.info('processing %s (MD5: %s)', sample_path, get_md5_hexdigest(sample_path)) + """ call IDA in autonomous mode and return True if success, False on failure """ + logger.info("processing %s (MD5: %s)", sample_path, get_md5_hexdigest(sample_path)) # TODO detect 64-bit binaries - if os.path.splitext(sample_path)[-1] == '.i64': + if os.path.splitext(sample_path)[-1] == ".i64": IDA_PATH = IDA64_PATH else: IDA_PATH = IDA32_PATH - if sample_path.endswith('.idb') or sample_path.endswith('.i64'): + if sample_path.endswith(".idb") or sample_path.endswith(".i64"): sample_path = sample_path[:-4] - fnames = '%s%s' % (sample_path, FNAMES_EXTENSION) + fnames = "%s%s" % (sample_path, FNAMES_EXTENSION) if os.path.exists(fnames) and not reprocess: - logger.info('%s already exists and contains %d function names, provide -r argument to reprocess', - fnames, len(get_function_names(fnames))) + logger.info( + "%s already exists and contains %d function names, provide -r argument to reprocess", + fnames, + len(get_function_names(fnames)), + ) return True out_path = os.path.split(fnames)[-1] # relative to IDA database file - args = [IDA_PATH, '-A', '-S%s "%s"' % (DUMP_SCRIPT_PATH, out_path), sample_path] - logger.debug('calling "%s"' % ' '.join(args)) + args = [IDA_PATH, "-A", '-S%s "%s"' % (DUMP_SCRIPT_PATH, out_path), sample_path] + logger.debug('calling "%s"' % " ".join(args)) subprocess.call(args) if not os.path.exists(fnames): - logger.warning('%s was not created', fnames) + logger.warning("%s was not created", fnames) return False - logger.debug('extracted %d function names to %s', len(get_function_names(fnames)), fnames) + logger.debug("extracted %d function names to %s", len(get_function_names(fnames)), fnames) return True def get_md5_hexdigest(sample_path): m = hashlib.md5() - with open(sample_path, 'rb') as f: + with open(sample_path, "rb") as f: m.update(f.read()) return m.hexdigest() @@ -79,19 +82,17 @@ def get_md5_hexdigest(sample_path): def get_function_names(fnames_file): if not os.path.exists(fnames_file): return None - with open(fnames_file, 'r') as f: + with open(fnames_file, "r") as f: return json.load(f) def main(): parser = argparse.ArgumentParser( - description='Launch IDA Pro in autonomous mode to dump function names of a file or of files in a directory') - parser.add_argument('file_path', type=str, - help='File or directory path to analyze') - parser.add_argument('-r', '--reprocess', action='store_true', default=False, - help='Overwrite existing analysis') - parser.add_argument('-v', '--verbose', action='store_true', - help='Enable verbose output') + description="Launch IDA Pro in autonomous mode to dump function names of a file or of files in a directory" + ) + parser.add_argument("file_path", type=str, help="File or directory path to analyze") + parser.add_argument("-r", "--reprocess", action="store_true", default=False, help="Overwrite existing analysis") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") args = parser.parse_args(args=sys.argv[1:]) if args.verbose: @@ -102,7 +103,7 @@ def main(): logging.getLogger().setLevel(logging.INFO) if not os.path.exists(args.file_path): - logger.warning('%s does not exist', args.file_path) + logger.warning("%s does not exist", args.file_path) return -1 if os.path.isfile(args.file_path): @@ -111,21 +112,21 @@ def main(): errors = 0 - logger.info('processing files in %s with file extension %s', args.file_path, '|'.join(SUPPORTED_EXTENSIONS)) + logger.info("processing files in %s with file extension %s", args.file_path, "|".join(SUPPORTED_EXTENSIONS)) for root, dirs, files in os.walk(args.file_path): for file in files: if not os.path.splitext(file)[1] in SUPPORTED_EXTENSIONS: - logger.debug('%s does not have supported file extension', file) + logger.debug("%s does not have supported file extension", file) continue path = os.path.join(root, file) if not call_ida_dump_script(path, args.reprocess): errors += 1 if errors: - logger.warning('encountered %d errors', errors) + logger.warning("encountered %d errors", errors) return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/scripts/testbed/start_ida_export_fimages.py b/scripts/testbed/start_ida_export_fimages.py index 18b08bf6..1d7fac47 100644 --- a/scripts/testbed/start_ida_export_fimages.py +++ b/scripts/testbed/start_ida_export_fimages.py @@ -1,10 +1,10 @@ -''' +""" Start IDA Pro in autonomous mode to export images of function graphs. Example usage: start_ida_export_fimages.py -f start_ida_export_fimages.py test.exe imgs -f 0x401000,0x402F90 -''' +""" import os import imp @@ -16,34 +16,35 @@ import subprocess import argparse try: - imp.find_module('graphviz') + imp.find_module("graphviz") from graphviz import Source + graphviz_found = True except ImportError: graphviz_found = False -IDA32_PATH = 'C:\\Program Files\\IDA Pro 7.3\\ida.exe' -IDA64_PATH = 'C:\\Program Files\\IDA Pro 7.3\\ida64.exe' +IDA32_PATH = "C:\\Program Files\\IDA Pro 7.3\\ida.exe" +IDA64_PATH = "C:\\Program Files\\IDA Pro 7.3\\ida64.exe" # expected in same directory as this file -EXPORT_SCRIPT_PATH = os.path.abspath('_export_fimages.py') +EXPORT_SCRIPT_PATH = os.path.abspath("_export_fimages.py") logger = logging.getLogger(__name__) def export_fimages(file_path, out_dir, functions, manual=False): - ''' + """ Export images of function graphs. :param file_path: file to analyze :param out_dir: output directory :param functions: list of strings of hex formatted fvas :param manual: non-autonomous mode :return: True on success, False otherwise - ''' + """ if not graphviz_found: - logger.warning('please install graphviz to export images') + logger.warning("please install graphviz to export images") return False if not os.path.exists(out_dir): @@ -55,37 +56,37 @@ def export_fimages(file_path, out_dir, functions, manual=False): img_count = 0 for root, dirs, files in os.walk(out_dir): for file in files: - if not file.endswith('.dot'): + if not file.endswith(".dot"): continue try: s = Source.from_file(file, directory=out_dir) - s.render(file, directory=out_dir, format='png', cleanup=True) + s.render(file, directory=out_dir, format="png", cleanup=True) img_count += 1 except BaseException: - logger.warning('graphviz error rendering file') + logger.warning("graphviz error rendering file") if img_count > 0: logger.info('exported %d function graph images to "%s"', img_count, os.path.abspath(out_dir)) return True else: - logger.warning('failed to export function graph images') + logger.warning("failed to export function graph images") return False def call_ida_script(script_path, script_args, sample_path, manual): - logger.info('processing %s (MD5: %s)', sample_path, get_md5_hexdigest(sample_path)) + logger.info("processing %s (MD5: %s)", sample_path, get_md5_hexdigest(sample_path)) # TODO detect 64-bit binaries - if os.path.splitext(sample_path)[-1] == '.i64': + if os.path.splitext(sample_path)[-1] == ".i64": IDA_PATH = IDA64_PATH else: IDA_PATH = IDA32_PATH - args = [IDA_PATH, '-A', '-S%s %s' % (script_path, ' '.join(script_args)), sample_path] + args = [IDA_PATH, "-A", "-S%s %s" % (script_path, " ".join(script_args)), sample_path] if manual: - args.remove('-A') + args.remove("-A") - logger.debug('calling "%s"' % ' '.join(args)) + logger.debug('calling "%s"' % " ".join(args)) if subprocess.call(args) == 0: return True else: @@ -94,24 +95,20 @@ def call_ida_script(script_path, script_args, sample_path, manual): def get_md5_hexdigest(sample_path): m = hashlib.md5() - with open(sample_path, 'rb') as f: + with open(sample_path, "rb") as f: m.update(f.read()) return m.hexdigest() def main(): parser = argparse.ArgumentParser( - description='Launch IDA Pro in autonomous mode to export images of function graphs') - parser.add_argument('file_path', type=str, - help='File to export from') - parser.add_argument('out_dir', type=str, - help='Export target directory') - parser.add_argument('-f', '--functions', action='store', - help='Comma separated list of functions to export') - parser.add_argument('-m', '--manual', action='store_true', - help='Manual mode: show IDA dialog boxes') - parser.add_argument('-v', '--verbose', action='store_true', - help='Enable verbose output') + description="Launch IDA Pro in autonomous mode to export images of function graphs" + ) + parser.add_argument("file_path", type=str, help="File to export from") + parser.add_argument("out_dir", type=str, help="Export target directory") + parser.add_argument("-f", "--functions", action="store", help="Comma separated list of functions to export") + parser.add_argument("-m", "--manual", action="store_true", help="Manual mode: show IDA dialog boxes") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") args = parser.parse_args(args=sys.argv[1:]) if args.verbose: @@ -122,14 +119,14 @@ def main(): logging.getLogger().setLevel(logging.INFO) if not os.path.isfile(args.file_path): - logger.warning('%s is not a file', args.file_path) + logger.warning("%s is not a file", args.file_path) return -1 - functions = args.functions.split(',') + functions = args.functions.split(",") export_fimages(args.file_path, args.out_dir, functions, args.manual) return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/setup.py b/setup.py index 29caa174..dd1d90df 100644 --- a/setup.py +++ b/setup.py @@ -4,69 +4,51 @@ import sys import setuptools -requirements = [ - "six", - "tqdm", - "pyyaml", - "tabulate", - "colorama", - "termcolor", - "ruamel.yaml" -] +requirements = ["six", "tqdm", "pyyaml", "tabulate", "colorama", "termcolor", "ruamel.yaml"] if sys.version_info >= (3, 0): # py3 - requirements.append('networkx') + requirements.append("networkx") else: # py2 - requirements.append('enum34') - requirements.append('vivisect') - requirements.append('viv-utils') - requirements.append('networkx==2.2') # v2.2 is last version supported by Python 2.7 + requirements.append("enum34") + requirements.append("vivisect") + requirements.append("viv-utils") + requirements.append("networkx==2.2") # v2.2 is last version supported by Python 2.7 # this sets __version__ # via: http://stackoverflow.com/a/7071358/87207 # and: http://stackoverflow.com/a/2073599/87207 -with open(os.path.join('capa', 'version.py'), 'rb') as f: +with open(os.path.join("capa", "version.py"), "rb") as f: exec(f.read()) def get_rule_paths(): - return [os.path.join('..', x[0], '*.yml') for x in os.walk('rules')] + return [os.path.join("..", x[0], "*.yml") for x in os.walk("rules")] setuptools.setup( - name='capa', + name="capa", version=__version__, - description='', - long_description='', - author='Willi Ballenthin, Moritz Raabe', - author_email='william.ballenthin@mandiant.com, moritz.raabe@mandiant.com', - url='https://www.github.com/fireeye/capa', - packages=setuptools.find_packages(exclude=['tests', 'testbed']), - package_dir={'capa': 'capa'}, - package_data={'capa': get_rule_paths()}, - entry_points={ - 'console_scripts': [ - 'capa=capa.main:main', - ] - }, + description="", + long_description="", + author="Willi Ballenthin, Moritz Raabe", + author_email="william.ballenthin@mandiant.com, moritz.raabe@mandiant.com", + url="https://www.github.com/fireeye/capa", + packages=setuptools.find_packages(exclude=["tests", "testbed"]), + package_dir={"capa": "capa"}, + package_data={"capa": get_rule_paths()}, + entry_points={"console_scripts": ["capa=capa.main:main",]}, include_package_data=True, install_requires=requirements, - extras_require={ - 'dev': [ - 'pytest', - 'pytest-sugar', - 'pycodestyle', - ] - }, + extras_require={"dev": ["pytest", "pytest-sugar", "pycodestyle",]}, zip_safe=False, - keywords='capa', + keywords="capa", classifiers=[ - 'Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'Natural Language :: English', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 3', + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Natural Language :: English", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3", ], ) diff --git a/tests/fixtures.py b/tests/fixtures.py index 64862331..92c30022 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -68,11 +68,7 @@ def sample_c91887d861d9bd4a5872249b641bc9f9(): @pytest.fixture def sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41(): - path = os.path.join( - CD, - "data", - "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_", - ) + path = os.path.join(CD, "data", "39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.dll_",) return Sample(viv_utils.getWorkspace(path), path) diff --git a/tests/test_engine.py b/tests/test_engine.py index 5034d18f..9a5cecf2 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -41,18 +41,8 @@ def test_some(): assert Some(1, Number(1)).evaluate({Number(0): {1}}) == False assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}}) == False - assert ( - Some(2, Number(1), Number(2), Number(3)).evaluate( - {Number(0): {1}, Number(1): {1}} - ) - == False - ) - assert ( - Some(2, Number(1), Number(2), Number(3)).evaluate( - {Number(0): {1}, Number(1): {1}, Number(2): {1}} - ) - == True - ) + assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}}) == False + assert Some(2, Number(1), Number(2), Number(3)).evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}}) == True assert ( Some(2, Number(1), Number(2), Number(3)).evaluate( {Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}} @@ -61,27 +51,20 @@ def test_some(): ) assert ( Some(2, Number(1), Number(2), Number(3)).evaluate( - { - Number(0): {1}, - Number(1): {1}, - Number(2): {1}, - Number(3): {1}, - Number(4): {1}, - } + {Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}, Number(4): {1},} ) == True ) def test_complex(): - assert True == Or( - And(Number(1), Number(2)), - Or(Number(3), Some(2, Number(4), Number(5), Number(6))), - ).evaluate({Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}}) + assert True == Or(And(Number(1), Number(2)), Or(Number(3), Some(2, Number(4), Number(5), Number(6))),).evaluate( + {Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}} + ) - assert False == Or( - And(Number(1), Number(2)), Or(Number(3), Some(2, Number(4), Number(5))) - ).evaluate({Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}}) + assert False == Or(And(Number(1), Number(2)), Or(Number(3), Some(2, Number(4), Number(5)))).evaluate( + {Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}} + ) def test_range(): @@ -119,100 +102,108 @@ def test_range(): def test_range_exact(): - rule = textwrap.dedent(''' + rule = textwrap.dedent( + """ rule: meta: name: test rule features: - count(number(100)): 2 - ''') + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # not enough matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches # too many matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2, 3}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches def test_range_range(): - rule = textwrap.dedent(''' + rule = textwrap.dedent( + """ rule: meta: name: test rule features: - count(number(100)): (2, 3) - ''') + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # enough matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2, 3}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # not enough matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches # too many matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2, 3, 4}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches def test_range_exact_zero(): - rule = textwrap.dedent(''' + rule = textwrap.dedent( + """ rule: meta: name: test rule features: - count(number(100)): 0 - ''') + """ + ) r = capa.rules.Rule.from_yaml(rule) # feature isn't indexed - good. features, matches = capa.engine.match([r], {}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # feature is indexed, but no matches. # i don't think we should ever really have this case, but good to check anyways. features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # too many matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches def test_range_with_zero(): - rule = textwrap.dedent(''' + rule = textwrap.dedent( + """ rule: meta: name: test rule features: - count(number(100)): (0, 1) - ''') + """ + ) r = capa.rules.Rule.from_yaml(rule) # ok features, matches = capa.engine.match([r], {}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert 'test rule' in matches + assert "test rule" in matches # too many matches features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) - assert 'test rule' not in matches + assert "test rule" not in matches def test_match_adds_matched_rule_feature(): @@ -227,9 +218,7 @@ def test_match_adds_matched_rule_feature(): """ ) r = capa.rules.Rule.from_yaml(rule) - features, matches = capa.engine.match( - [r], {capa.features.insn.Number(100): {1}}, 0x0 - ) + features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) assert capa.features.MatchedRule("test rule") in features @@ -261,9 +250,7 @@ def test_match_matched_rules(): ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.insn.Number(100): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule1") in features assert capa.features.MatchedRule("test rule2") in features @@ -271,9 +258,7 @@ def test_match_matched_rules(): # the ordering of the rules must not matter, # the engine should match rules in an appropriate order. features, matches = capa.engine.match( - capa.engine.topologically_order_rules(reversed(rules)), - {capa.features.insn.Number(100): {1}}, - 0x0, + capa.engine.topologically_order_rules(reversed(rules)), {capa.features.insn.Number(100): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule1") in features assert capa.features.MatchedRule("test rule2") in features @@ -319,30 +304,22 @@ def test_regex(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.insn.Number(100): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aaaa"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.String("aaaa"): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("abbbba"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.String("abbbba"): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule") in features assert capa.features.MatchedRule("rule with implied wildcards") in features @@ -365,9 +342,7 @@ def test_regex_ignorecase(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.String("aBBBBa"): {1}}, 0x0, ) assert capa.features.MatchedRule("test rule") in features @@ -446,9 +421,7 @@ def test_match_namespace(): ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.insn.API("CreateFile"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.insn.API("CreateFile"): {1}}, 0x0, ) assert "CreateFile API" in matches assert "file-create" in matches @@ -458,9 +431,7 @@ def test_match_namespace(): assert capa.features.MatchedRule("file/create/CreateFile") in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.insn.API("WriteFile"): {1}}, - 0x0, + capa.engine.topologically_order_rules(rules), {capa.features.insn.API("WriteFile"): {1}}, 0x0, ) assert "WriteFile API" in matches assert "file-create" not in matches diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 42562179..aecfd7db 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -10,26 +10,22 @@ import capa.features.freeze from fixtures import * -EXTRACTOR = capa.features.extractors.NullFeatureExtractor({ - 'file features': [ - (0x402345, capa.features.Characteristic('embedded pe')), - ], - 'functions': { - 0x401000: { - 'features': [ - (0x401000, capa.features.Characteristic('switch')), - ], - 'basic blocks': { - 0x401000: { - 'features': [ - (0x401000, capa.features.Characteristic('tight loop')), - ], - 'instructions': { - 0x401000: { - 'features': [ - (0x401000, capa.features.insn.Mnemonic('xor')), - (0x401000, capa.features.Characteristic('nzxor')), - ], +EXTRACTOR = capa.features.extractors.NullFeatureExtractor( + { + "file features": [(0x402345, capa.features.Characteristic("embedded pe")),], + "functions": { + 0x401000: { + "features": [(0x401000, capa.features.Characteristic("switch")),], + "basic blocks": { + 0x401000: { + "features": [(0x401000, capa.features.Characteristic("tight loop")),], + "instructions": { + 0x401000: { + "features": [ + (0x401000, capa.features.insn.Mnemonic("xor")), + (0x401000, capa.features.Characteristic("nzxor")), + ], + }, }, }, }, @@ -44,19 +40,25 @@ def test_null_feature_extractor(): assert list(EXTRACTOR.get_basic_blocks(0x401000)) == [0x401000] assert list(EXTRACTOR.get_instructions(0x401000, 0x0401000)) == [0x401000, 0x401002] - rules = capa.rules.RuleSet([ - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: xor loop - scope: basic block - features: - - and: - - characteristic: tight loop - - mnemonic: xor - - characteristic: nzxor - ''')), - ]) + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: xor loop + scope: basic block + features: + - and: + - characteristic: tight loop + - mnemonic: xor + - characteristic: nzxor + """ + ) + ), + ] + ) capabilities = capa.main.find_capabilities(rules, EXTRACTOR) assert "xor loop" in capabilities @@ -74,20 +76,14 @@ def compare_extractors(a, b): assert list(a.get_functions()) == list(b.get_functions()) for f in a.get_functions(): assert list(a.get_basic_blocks(f)) == list(b.get_basic_blocks(f)) - assert list(a.extract_function_features(f)) == list( - b.extract_function_features(f) - ) + assert list(a.extract_function_features(f)) == list(b.extract_function_features(f)) for bb in a.get_basic_blocks(f): assert list(a.get_instructions(f, bb)) == list(b.get_instructions(f, bb)) - assert list(a.extract_basic_block_features(f, bb)) == list( - b.extract_basic_block_features(f, bb) - ) + assert list(a.extract_basic_block_features(f, bb)) == list(b.extract_basic_block_features(f, bb)) for insn in a.get_instructions(f, bb): - assert list(a.extract_insn_features(f, bb, insn)) == list( - b.extract_insn_features(f, bb, insn) - ) + assert list(a.extract_insn_features(f, bb, insn)) == list(b.extract_insn_features(f, bb, insn)) def compare_extractors_viv_null(viv_ext, null_ext): @@ -102,17 +98,11 @@ def compare_extractors_viv_null(viv_ext, null_ext): # TODO: ordering of these things probably doesn't work yet - assert list(viv_ext.extract_file_features()) == list( - null_ext.extract_file_features() - ) + assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features()) assert to_int(list(viv_ext.get_functions())) == list(null_ext.get_functions()) for f in viv_ext.get_functions(): - assert to_int(list(viv_ext.get_basic_blocks(f))) == list( - null_ext.get_basic_blocks(to_int(f)) - ) - assert list(viv_ext.extract_function_features(f)) == list( - null_ext.extract_function_features(to_int(f)) - ) + assert to_int(list(viv_ext.get_basic_blocks(f))) == list(null_ext.get_basic_blocks(to_int(f))) + assert list(viv_ext.extract_function_features(f)) == list(null_ext.extract_function_features(to_int(f))) for bb in viv_ext.get_basic_blocks(f): assert to_int(list(viv_ext.get_instructions(f, bb))) == list( @@ -161,9 +151,9 @@ def test_serialize_features(): roundtrip_feature(capa.features.String("SCardControl")) roundtrip_feature(capa.features.insn.Number(0xFF)) roundtrip_feature(capa.features.insn.Offset(0x0)) - roundtrip_feature(capa.features.insn.Mnemonic('push')) - roundtrip_feature(capa.features.file.Section('.rsrc')) - roundtrip_feature(capa.features.Characteristic('tight loop')) + roundtrip_feature(capa.features.insn.Mnemonic("push")) + roundtrip_feature(capa.features.file.Section(".rsrc")) + roundtrip_feature(capa.features.Characteristic("tight loop")) roundtrip_feature(capa.features.basicblock.BasicBlock()) roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk")) roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process")) @@ -173,19 +163,13 @@ def test_serialize_features(): def test_freeze_sample(tmpdir, sample_9324d1a8ae37a36ae560c37448c9705a): # tmpdir fixture handles cleanup o = tmpdir.mkdir("capa").join("test.frz").strpath - assert ( - capa.features.freeze.main( - [sample_9324d1a8ae37a36ae560c37448c9705a.path, o, "-v"] - ) - == 0 - ) + assert capa.features.freeze.main([sample_9324d1a8ae37a36ae560c37448c9705a.path, o, "-v"]) == 0 def test_freeze_load_sample(tmpdir, sample_9324d1a8ae37a36ae560c37448c9705a): o = tmpdir.mkdir("capa").join("test.frz") viv_extractor = capa.features.extractors.viv.VivisectFeatureExtractor( - sample_9324d1a8ae37a36ae560c37448c9705a.vw, - sample_9324d1a8ae37a36ae560c37448c9705a.path, + sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path, ) with open(o.strpath, "wb") as f: f.write(capa.features.freeze.dump(viv_extractor)) diff --git a/tests/test_main.py b/tests/test_main.py index 4592a092..11de1356 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -29,56 +29,54 @@ def test_main_single_rule(sample_9324d1a8ae37a36ae560c37448c9705a, tmpdir): ) rule_file = tmpdir.mkdir("capa").join("rule.yml") rule_file.write(RULE_CONTENT) - assert ( - capa.main.main( - [ - sample_9324d1a8ae37a36ae560c37448c9705a.path, - "-v", - "-r", - rule_file.strpath, - ] - ) - == 0 - ) + assert capa.main.main([sample_9324d1a8ae37a36ae560c37448c9705a.path, "-v", "-r", rule_file.strpath,]) == 0 def test_main_shellcode(sample_499c2a85f6e8142c3f48d4251c9c7cd6_raw32): - assert ( - capa.main.main( - [sample_499c2a85f6e8142c3f48d4251c9c7cd6_raw32.path, "-v", "-f", "sc32"] - ) - == 0 - ) + assert capa.main.main([sample_499c2a85f6e8142c3f48d4251c9c7cd6_raw32.path, "-v", "-f", "sc32"]) == 0 def test_ruleset(): - rules = capa.rules.RuleSet([ - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: file rule - scope: file - features: - - characteristic: embedded pe - ''')), - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: function rule - scope: function - features: - - characteristic: switch - ''')), - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: basic block rule - scope: basic block - features: - - characteristic: nzxor - ''')), - - ]) + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: file rule + scope: file + features: + - characteristic: embedded pe + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: function rule + scope: function + features: + - characteristic: switch + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: basic block rule + scope: basic block + features: + - characteristic: nzxor + """ + ) + ), + ] + ) assert len(rules.file_rules) == 1 assert len(rules.function_rules) == 1 assert len(rules.basic_block_rules) == 1 @@ -142,8 +140,7 @@ def test_match_across_scopes_file_function(sample_9324d1a8ae37a36ae560c37448c970 ] ) extractor = capa.features.extractors.viv.VivisectFeatureExtractor( - sample_9324d1a8ae37a36ae560c37448c9705a.vw, - sample_9324d1a8ae37a36ae560c37448c9705a.path, + sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path, ) capabilities = capa.main.find_capabilities(rules, extractor) assert "install service" in capabilities @@ -152,48 +149,64 @@ def test_match_across_scopes_file_function(sample_9324d1a8ae37a36ae560c37448c970 def test_match_across_scopes(sample_9324d1a8ae37a36ae560c37448c9705a): - rules = capa.rules.RuleSet([ - # this rule should match on a basic block (including at least 0x403685) - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: tight loop - scope: basic block - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403685 - features: - - characteristic: tight loop - ''')), - # this rule should match on a function (0x403660) - # based on API, as well as prior basic block rule match - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: kill thread loop - scope: function - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403660 - features: - - and: - - api: kernel32.TerminateThread - - api: kernel32.CloseHandle - - match: tight loop - ''')), - # this rule should match on a file feature and a prior function rule match - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: kill thread program - scope: file - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - section: .text - - match: kill thread loop - ''')), - ]) - extractor = capa.features.extractors.viv.VivisectFeatureExtractor(sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path) + rules = capa.rules.RuleSet( + [ + # this rule should match on a basic block (including at least 0x403685) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: tight loop + scope: basic block + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403685 + features: + - characteristic: tight loop + """ + ) + ), + # this rule should match on a function (0x403660) + # based on API, as well as prior basic block rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread loop + scope: function + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403660 + features: + - and: + - api: kernel32.TerminateThread + - api: kernel32.CloseHandle + - match: tight loop + """ + ) + ), + # this rule should match on a file feature and a prior function rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread program + scope: file + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - section: .text + - match: kill thread loop + """ + ) + ), + ] + ) + extractor = capa.features.extractors.viv.VivisectFeatureExtractor( + sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path + ) capabilities = capa.main.find_capabilities(rules, extractor) assert "tight loop" in capabilities assert "kill thread loop" in capabilities @@ -201,22 +214,27 @@ def test_match_across_scopes(sample_9324d1a8ae37a36ae560c37448c9705a): def test_subscope_bb_rules(sample_9324d1a8ae37a36ae560c37448c9705a): - rules = capa.rules.RuleSet([ - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: test rule - scope: function - features: - - and: - - basic block: - - characteristic: tight loop - ''')) - ]) + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scope: function + features: + - and: + - basic block: + - characteristic: tight loop + """ + ) + ) + ] + ) # tight loop at 0x403685 extractor = capa.features.extractors.viv.VivisectFeatureExtractor( - sample_9324d1a8ae37a36ae560c37448c9705a.vw, - sample_9324d1a8ae37a36ae560c37448c9705a.path, + sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path, ) capabilities = capa.main.find_capabilities(rules, extractor) assert "test rule" in capabilities @@ -242,8 +260,7 @@ def test_byte_matching(sample_9324d1a8ae37a36ae560c37448c9705a): ) extractor = capa.features.extractors.viv.VivisectFeatureExtractor( - sample_9324d1a8ae37a36ae560c37448c9705a.vw, - sample_9324d1a8ae37a36ae560c37448c9705a.path, + sample_9324d1a8ae37a36ae560c37448c9705a.vw, sample_9324d1a8ae37a36ae560c37448c9705a.path, ) capabilities = capa.main.find_capabilities(rules, extractor) assert "byte match test" in capabilities diff --git a/tests/test_rules.py b/tests/test_rules.py index 039edb64..6e5f0421 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -34,10 +34,7 @@ def test_rule_yaml(): assert r.evaluate({Number(0): {1}}) == False assert r.evaluate({Number(0): {1}, Number(1): {1}}) == False assert r.evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}}) == True - assert ( - r.evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}}) - == True - ) + assert r.evaluate({Number(0): {1}, Number(1): {1}, Number(2): {1}, Number(3): {1}}) == True def test_rule_yaml_complex(): @@ -60,15 +57,13 @@ def test_rule_yaml_complex(): """ ) r = capa.rules.Rule.from_yaml(rule) - assert ( - r.evaluate({Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}}) - == True - ) + assert r.evaluate({Number(5): {1}, Number(6): {1}, Number(7): {1}, Number(8): {1}}) == True assert r.evaluate({Number(6): {1}, Number(7): {1}, Number(8): {1}}) == False def test_rule_yaml_descriptions(): - rule = textwrap.dedent(''' + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -78,9 +73,12 @@ def test_rule_yaml_descriptions(): - string: This program cannot be run in DOS mode. description: MS-DOS stub message - count(number(2 = AF_INET/SOCK_DGRAM)): 2 - ''') + """ + ) r = capa.rules.Rule.from_yaml(rule) - assert r.evaluate({Number(1): {1}, Number(2): {2, 3}, String('This program cannot be run in DOS mode.'): {4}}) == True + assert ( + r.evaluate({Number(1): {1}, Number(2): {2, 3}, String("This program cannot be run in DOS mode."): {4}}) == True + ) def test_rule_yaml_not(): @@ -149,53 +147,43 @@ def test_invalid_rule_feature(): ) with pytest.raises(capa.rules.InvalidRule): - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: test rule - scope: file - features: - - characteristic: nzxor - ''')) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scope: file + features: + - characteristic: nzxor + """ + ) + ) with pytest.raises(capa.rules.InvalidRule): - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: test rule - scope: function - features: - - characteristic: embedded pe - ''')) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scope: function + features: + - characteristic: embedded pe + """ + ) + ) with pytest.raises(capa.rules.InvalidRule): - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: test rule - scope: basic block - features: - - characteristic: embedded pe - ''')) - - -def test_lib_rules(): - rules = capa.rules.RuleSet([ - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: a lib rule - lib: true - features: - - api: CreateFileA - ''')), - capa.rules.Rule.from_yaml(textwrap.dedent(''' + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule scope: basic block features: - - characteristic(embedded pe): true + - characteristic: embedded pe """ ) ) @@ -234,21 +222,27 @@ def test_lib_rules(): def test_subscope_rules(): - rules = capa.rules.RuleSet([ - capa.rules.Rule.from_yaml(textwrap.dedent(''' - rule: - meta: - name: test rule - scope: file - features: - - and: - - characteristic: embedded pe - - function: + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scope: file + features: - and: - - characteristic: nzxor - - characteristic: switch - ''')) - ]) + - characteristic: embedded pe + - function: + - and: + - characteristic: nzxor + - characteristic: switch + """ + ) + ) + ] + ) # the file rule scope will have one rules: # - `test rule` assert len(rules.file_rules) == 1 @@ -317,7 +311,9 @@ def test_invalid_rules(): name: test rule features: - characteristic: number(1) - ''')) + """ + ) + ) with pytest.raises(capa.rules.InvalidRule): r = capa.rules.Rule.from_yaml( @@ -328,7 +324,9 @@ def test_invalid_rules(): name: test rule features: - characteristic: count(number(100)) - ''')) + """ + ) + ) def test_number_symbol(): @@ -654,16 +652,12 @@ def test_rules_namespace_dependencies(): ), ] - r3 = set( - map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 3")) - ) + r3 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 3"))) assert "rule 1" in r3 assert "rule 2" not in r3 assert "rule 4" not in r3 - r4 = set( - map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 4")) - ) + r4 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 4"))) assert "rule 1" in r4 assert "rule 2" in r4 assert "rule 3" not in r4 diff --git a/tests/test_viv_features.py b/tests/test_viv_features.py index 2d57eada..749c6a26 100644 --- a/tests/test_viv_features.py +++ b/tests/test_viv_features.py @@ -24,13 +24,9 @@ def extract_function_features(f): features = collections.defaultdict(set) for bb in f.basic_blocks: for insn in bb.instructions: - for feature, va in capa.features.extractors.viv.insn.extract_features( - f, bb, insn - ): + for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn): features[feature].add(va) - for feature, va in capa.features.extractors.viv.basicblock.extract_features( - f, bb - ): + for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb): features[feature].add(va) for feature, va in capa.features.extractors.viv.function.extract_features(f): features[feature].add(va) @@ -40,9 +36,7 @@ def extract_function_features(f): def extract_basic_block_features(f, bb): features = set({}) for insn in bb.instructions: - for feature, _ in capa.features.extractors.viv.insn.extract_features( - f, bb, insn - ): + for feature, _ in capa.features.extractors.viv.insn.extract_features(f, bb, insn): features.add(feature) for feature, _ in capa.features.extractors.viv.basicblock.extract_features(f, bb): features.add(feature) @@ -64,18 +58,14 @@ def test_api_features(mimikatz): def test_api_features_64_bit(sample_a198216798ca38f280dc413f8c57f2c2): - features = extract_function_features( - viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x4011B0) - ) + features = extract_function_features(viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x4011B0)) assert capa.features.insn.API("kernel32.GetStringTypeA") in features assert capa.features.insn.API("kernel32.GetStringTypeW") not in features assert capa.features.insn.API("kernel32.GetStringType") in features assert capa.features.insn.API("GetStringTypeA") in features assert capa.features.insn.API("GetStringType") in features # call via thunk in IDA Pro - features = extract_function_features( - viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x401CB0) - ) + features = extract_function_features(viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x401CB0)) assert capa.features.insn.API("msvcrt.vfprintf") in features assert capa.features.insn.API("vfprintf") in features @@ -90,20 +80,14 @@ def test_string_features(mimikatz): def test_byte_features(sample_9324d1a8ae37a36ae560c37448c9705a): - features = extract_function_features( - viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60) - ) - wanted = capa.features.Bytes( - b"\xED\x24\x9E\xF4\x52\xA9\x07\x47\x55\x8E\xE1\xAB\x30\x8E\x23\x61" - ) + features = extract_function_features(viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60)) + wanted = capa.features.Bytes(b"\xED\x24\x9E\xF4\x52\xA9\x07\x47\x55\x8E\xE1\xAB\x30\x8E\x23\x61") # use `==` rather than `is` because the result is not `True` but a truthy value. assert wanted.evaluate(features) == True def test_byte_features64(sample_lab21_01): - features = extract_function_features( - viv_utils.Function(sample_lab21_01.vw, 0x1400010C0) - ) + features = extract_function_features(viv_utils.Function(sample_lab21_01.vw, 0x1400010C0)) wanted = capa.features.Bytes(b"\x32\xA2\xDF\x2D\x99\x2B\x00\x00") # use `==` rather than `is` because the result is not `True` but a truthy value. assert wanted.evaluate(features) == True @@ -130,7 +114,7 @@ def test_offset_features(mimikatz): def test_nzxor_features(mimikatz): features = extract_function_features(viv_utils.Function(mimikatz.vw, 0x410DFC)) - assert capa.features.Characteristic('nzxor') in features # 0x0410F0B + assert capa.features.Characteristic("nzxor") in features # 0x0410F0B def get_bb_insn(f, va): @@ -147,9 +131,7 @@ def test_is_security_cookie(mimikatz): f = viv_utils.Function(mimikatz.vw, 0x410DFC) for va in [0x0410F0B]: bb, insn = get_bb_insn(f, va) - assert ( - capa.features.extractors.viv.insn.is_security_cookie(f, bb, insn) == False - ) + assert capa.features.extractors.viv.insn.is_security_cookie(f, bb, insn) == False # security cookie initial set and final check f = viv_utils.Function(mimikatz.vw, 0x46C54A) @@ -170,7 +152,7 @@ def test_mnemonic_features(mimikatz): def test_peb_access_features(sample_a933a1a402775cfa94b6bee0963f4b46): features = extract_function_features(viv_utils.Function(sample_a933a1a402775cfa94b6bee0963f4b46.vw, 0xABA6FEC)) - assert capa.features.Characteristic('peb access') in features + assert capa.features.Characteristic("peb access") in features def test_file_section_name_features(mimikatz): @@ -186,7 +168,7 @@ def test_tight_loop_features(mimikatz): if bb.va != 0x402F8E: continue features = extract_basic_block_features(f, bb) - assert capa.features.Characteristic('tight loop') in features + assert capa.features.Characteristic("tight loop") in features assert capa.features.basicblock.BasicBlock() in features @@ -196,7 +178,7 @@ def test_tight_loop_bb_features(mimikatz): if bb.va != 0x402F8E: continue features = extract_basic_block_features(f, bb) - assert capa.features.Characteristic('tight loop') in features + assert capa.features.Characteristic("tight loop") in features assert capa.features.basicblock.BasicBlock() in features @@ -218,65 +200,70 @@ def test_file_import_name_features(mimikatz): def test_cross_section_flow_features(sample_a198216798ca38f280dc413f8c57f2c2): features = extract_function_features(viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x4014D0)) - assert capa.features.Characteristic('cross section flow') in features + assert capa.features.Characteristic("cross section flow") in features # this function has calls to some imports, # which should not trigger cross-section flow characteristic features = extract_function_features(viv_utils.Function(sample_a198216798ca38f280dc413f8c57f2c2.vw, 0x401563)) - assert capa.features.Characteristic('cross section flow') not in features + assert capa.features.Characteristic("cross section flow") not in features def test_segment_access_features(sample_a933a1a402775cfa94b6bee0963f4b46): features = extract_function_features(viv_utils.Function(sample_a933a1a402775cfa94b6bee0963f4b46.vw, 0xABA6FEC)) - assert capa.features.Characteristic('fs access') in features + assert capa.features.Characteristic("fs access") in features def test_thunk_features(sample_9324d1a8ae37a36ae560c37448c9705a): - features = extract_function_features( - viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x407970) - ) + features = extract_function_features(viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x407970)) assert capa.features.insn.API("kernel32.CreateToolhelp32Snapshot") in features assert capa.features.insn.API("CreateToolhelp32Snapshot") in features def test_file_embedded_pe(pma_lab_12_04): features = extract_file_features(pma_lab_12_04.vw, pma_lab_12_04.path) - assert capa.features.Characteristic('embedded pe') in features + assert capa.features.Characteristic("embedded pe") in features def test_stackstring_features(mimikatz): features = extract_function_features(viv_utils.Function(mimikatz.vw, 0x4556E5)) - assert capa.features.Characteristic('stack string') in features + assert capa.features.Characteristic("stack string") in features def test_switch_features(mimikatz): features = extract_function_features(viv_utils.Function(mimikatz.vw, 0x409411)) - assert capa.features.Characteristic('switch') in features + assert capa.features.Characteristic("switch") in features features = extract_function_features(viv_utils.Function(mimikatz.vw, 0x409393)) - assert capa.features.Characteristic('switch') not in features + assert capa.features.Characteristic("switch") not in features def test_recursive_call_feature(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41): - features = extract_function_features(viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10003100)) - assert capa.features.Characteristic('recursive call') in features + features = extract_function_features( + viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10003100) + ) + assert capa.features.Characteristic("recursive call") in features - features = extract_function_features(viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10007B00)) - assert capa.features.Characteristic('recursive call') not in features + features = extract_function_features( + viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10007B00) + ) + assert capa.features.Characteristic("recursive call") not in features def test_loop_feature(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41): - features = extract_function_features(viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10003D30)) - assert capa.features.Characteristic('loop') in features + features = extract_function_features( + viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10003D30) + ) + assert capa.features.Characteristic("loop") in features - features = extract_function_features(viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10007250)) - assert capa.features.Characteristic('loop') not in features + features = extract_function_features( + viv_utils.Function(sample_39c05b15e9834ac93f206bc114d0a00c357c888db567ba8f5345da0529cbed41.vw, 0x10007250) + ) + assert capa.features.Characteristic("loop") not in features def test_file_string_features(sample_bfb9b5391a13d0afd787e87ab90f14f5): features = extract_file_features( - sample_bfb9b5391a13d0afd787e87ab90f14f5.vw, - sample_bfb9b5391a13d0afd787e87ab90f14f5.path, + sample_bfb9b5391a13d0afd787e87ab90f14f5.vw, sample_bfb9b5391a13d0afd787e87ab90f14f5.path, ) assert capa.features.String("WarStop") in features # ASCII, offset 0x40EC assert capa.features.String("cimage/png") in features # UTF-16 LE, offset 0x350E @@ -284,39 +271,35 @@ def test_file_string_features(sample_bfb9b5391a13d0afd787e87ab90f14f5): def test_function_calls_to(sample_9324d1a8ae37a36ae560c37448c9705a): features = extract_function_features(viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60)) - assert capa.features.Characteristic('calls to') in features - assert len(features[capa.features.Characteristic('calls to')]) == 1 + assert capa.features.Characteristic("calls to") in features + assert len(features[capa.features.Characteristic("calls to")]) == 1 def test_function_calls_to64(sample_lab21_01): features = extract_function_features(viv_utils.Function(sample_lab21_01.vw, 0x1400052D0)) # memcpy - assert capa.features.Characteristic('calls to') in features - assert len(features[capa.features.Characteristic('calls to')]) == 8 + assert capa.features.Characteristic("calls to") in features + assert len(features[capa.features.Characteristic("calls to")]) == 8 def test_function_calls_from(sample_9324d1a8ae37a36ae560c37448c9705a): features = extract_function_features(viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60)) - assert capa.features.Characteristic('calls from') in features - assert len(features[capa.features.Characteristic('calls from')]) == 23 + assert capa.features.Characteristic("calls from") in features + assert len(features[capa.features.Characteristic("calls from")]) == 23 def test_basic_block_count(sample_9324d1a8ae37a36ae560c37448c9705a): - features = extract_function_features( - viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60) - ) + features = extract_function_features(viv_utils.Function(sample_9324d1a8ae37a36ae560c37448c9705a.vw, 0x406F60)) assert len(features[capa.features.basicblock.BasicBlock()]) == 26 def test_indirect_call_features(sample_a933a1a402775cfa94b6bee0963f4b46): features = extract_function_features(viv_utils.Function(sample_a933a1a402775cfa94b6bee0963f4b46.vw, 0xABA68A0)) - assert capa.features.Characteristic('indirect call') in features - assert len(features[capa.features.Characteristic('indirect call')]) == 3 + assert capa.features.Characteristic("indirect call") in features + assert len(features[capa.features.Characteristic("indirect call")]) == 3 def test_indirect_calls_resolved(sample_c91887d861d9bd4a5872249b641bc9f9): - features = extract_function_features( - viv_utils.Function(sample_c91887d861d9bd4a5872249b641bc9f9.vw, 0x401A77) - ) + features = extract_function_features(viv_utils.Function(sample_c91887d861d9bd4a5872249b641bc9f9.vw, 0x401A77)) assert capa.features.insn.API("kernel32.CreatePipe") in features assert capa.features.insn.API("kernel32.SetHandleInformation") in features assert capa.features.insn.API("kernel32.CloseHandle") in features