diff --git a/capa/features/extractors/viv/indirect_calls.py b/capa/features/extractors/viv/indirect_calls.py index 58d79cac..203aa7c3 100644 --- a/capa/features/extractors/viv/indirect_calls.py +++ b/capa/features/extractors/viv/indirect_calls.py @@ -1,149 +1,149 @@ -import collections - -import envi -import vivisect.const -import envi.archs.i386.disasm -import envi.archs.amd64.disasm - -# pull out consts for lookup performance -i386RegOper = envi.archs.i386.disasm.i386RegOper -i386ImmOper = envi.archs.i386.disasm.i386ImmOper -i386ImmMemOper = envi.archs.i386.disasm.i386ImmMemOper -Amd64RipRelOper = envi.archs.amd64.disasm.Amd64RipRelOper -LOC_OP = vivisect.const.LOC_OP -IF_NOFALL = envi.IF_NOFALL -REF_CODE = vivisect.const.REF_CODE -FAR_BRANCH_MASK = envi.BR_PROC | envi.BR_DEREF | envi.BR_ARCH - -DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor") - - -def get_previous_instructions(vw, va): - """ - collect the instructions that flow to the given address, local to the current function. - - args: - vw (vivisect.Workspace) - va (int): the virtual address to inspect - - returns: - List[int]: the prior instructions, which may fallthrough and/or jump here - """ - ret = [] - - # find the immediate prior instruction. - # ensure that it fallsthrough to this one. - loc = vw.getPrevLocation(va, adjacent=True) - if loc is not None: - # from vivisect.const: - # location: (L_VA, L_SIZE, L_LTYPE, L_TINFO) - (pva, _, ptype, pinfo) = vw.getPrevLocation(va, adjacent=True) - - if ptype == LOC_OP and not (pinfo & IF_NOFALL): - ret.append(pva) - - # find any code refs, e.g. jmp, to this location. - # ignore any calls. - # - # from vivisect.const: - # xref: (XR_FROM, XR_TO, XR_RTYPE, XR_RFLAG) - for (xfrom, _, _, xflag) in vw.getXrefsTo(va, REF_CODE): - if (xflag & FAR_BRANCH_MASK) != 0: - continue - ret.append(xfrom) - - return ret - - -class NotFoundError(Exception): - pass - - -def find_definition(vw, va, reg): - """ - scan backwards from the given address looking for assignments to the given register. - if a constant, return that value. - - args: - vw (vivisect.Workspace) - va (int): the virtual address at which to start analysis - reg (int): the vivisect register to study - - returns: - (va: int, value?: int|None): the address of the assignment and the value, if a constant. - - raises: - NotFoundError: when the definition cannot be found. - """ - q = collections.deque() - seen = set([]) - - q.extend(get_previous_instructions(vw, va)) - while q: - cur = q.popleft() - - # skip if we've already processed this location - if cur in seen: - continue - seen.add(cur) - - insn = vw.parseOpcode(cur) - - if len(insn.opers) == 0: - q.extend(get_previous_instructions(vw, cur)) - continue - - opnd0 = insn.opers[0] - if not (isinstance(opnd0, i386RegOper) and opnd0.reg == reg and insn.mnem in DESTRUCTIVE_MNEMONICS): - q.extend(get_previous_instructions(vw, cur)) - continue - - # if we reach here, the instruction is destructive to our target register. - - # we currently only support extracting the constant from something like: `mov $reg, IAT` - # so, any other pattern results in an unknown value, represented by None. - # this is a good place to extend in the future, if we need more robust support. 
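A minimal usage sketch for this module (not part of the patch): the workspace construction, sample name, and call address below are assumptions for illustration only.

import vivisect

import capa.features.extractors.viv.indirect_calls as indirect_calls

# build and analyze a vivisect workspace for a sample (hypothetical file name)
vw = vivisect.VivWorkspace()
vw.loadFromFile("sample.exe_")
vw.analyze()

call_va = 0x401234  # hypothetical address of an instruction like `call eax`
insn = vw.parseOpcode(call_va)

if indirect_calls.is_indirect_call(vw, call_va, insn=insn):
    try:
        # walk backwards for the nearest mov/lea/pop/xor that defines the call register
        def_va, target = indirect_calls.resolve_indirect_call(vw, call_va, insn=insn)
    except indirect_calls.NotFoundError:
        pass  # no defining instruction found within the current function
    else:
        if target is not None:
            # target is a constant, e.g. a pointer into the import address table
            print("call at 0x%x resolves via the definition at 0x%x to 0x%x" % (call_va, def_va, target))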
- if insn.mnem != "mov": - return (cur, None) - else: - opnd1 = insn.opers[1] - if isinstance(opnd1, i386ImmOper): - return (cur, opnd1.getOperValue(opnd1)) - elif isinstance(opnd1, i386ImmMemOper): - return (cur, opnd1.getOperAddr(opnd1)) - elif isinstance(opnd1, Amd64RipRelOper): - return (cur, opnd1.getOperAddr(insn)) - else: - # might be something like: `mov $reg, dword_401000[eax]` - return (cur, None) - - raise NotFoundError() - - -def is_indirect_call(vw, va, insn=None): - if insn is None: - insn = vw.parseOpcode(va) - - return insn.mnem == "call" and isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper) - - -def resolve_indirect_call(vw, va, insn=None): - """ - inspect the given indirect call instruction and attempt to resolve the target address. - - args: - vw (vivisect.Workspace) - va (int): the virtual address at which to start analysis - - returns: - (va: int, value?: int|None): the address of the assignment and the value, if a constant. - - raises: - NotFoundError: when the definition cannot be found. - """ - if insn is None: - insn = vw.parseOpcode(va) - - assert is_indirect_call(vw, va, insn=insn) - - return find_definition(vw, va, insn.opers[0].reg) +import collections + +import envi +import vivisect.const +import envi.archs.i386.disasm +import envi.archs.amd64.disasm + +# pull out consts for lookup performance +i386RegOper = envi.archs.i386.disasm.i386RegOper +i386ImmOper = envi.archs.i386.disasm.i386ImmOper +i386ImmMemOper = envi.archs.i386.disasm.i386ImmMemOper +Amd64RipRelOper = envi.archs.amd64.disasm.Amd64RipRelOper +LOC_OP = vivisect.const.LOC_OP +IF_NOFALL = envi.IF_NOFALL +REF_CODE = vivisect.const.REF_CODE +FAR_BRANCH_MASK = envi.BR_PROC | envi.BR_DEREF | envi.BR_ARCH + +DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor") + + +def get_previous_instructions(vw, va): + """ + collect the instructions that flow to the given address, local to the current function. + + args: + vw (vivisect.Workspace) + va (int): the virtual address to inspect + + returns: + List[int]: the prior instructions, which may fallthrough and/or jump here + """ + ret = [] + + # find the immediate prior instruction. + # ensure that it fallsthrough to this one. + loc = vw.getPrevLocation(va, adjacent=True) + if loc is not None: + # from vivisect.const: + # location: (L_VA, L_SIZE, L_LTYPE, L_TINFO) + (pva, _, ptype, pinfo) = vw.getPrevLocation(va, adjacent=True) + + if ptype == LOC_OP and not (pinfo & IF_NOFALL): + ret.append(pva) + + # find any code refs, e.g. jmp, to this location. + # ignore any calls. + # + # from vivisect.const: + # xref: (XR_FROM, XR_TO, XR_RTYPE, XR_RFLAG) + for (xfrom, _, _, xflag) in vw.getXrefsTo(va, REF_CODE): + if (xflag & FAR_BRANCH_MASK) != 0: + continue + ret.append(xfrom) + + return ret + + +class NotFoundError(Exception): + pass + + +def find_definition(vw, va, reg): + """ + scan backwards from the given address looking for assignments to the given register. + if a constant, return that value. + + args: + vw (vivisect.Workspace) + va (int): the virtual address at which to start analysis + reg (int): the vivisect register to study + + returns: + (va: int, value?: int|None): the address of the assignment and the value, if a constant. + + raises: + NotFoundError: when the definition cannot be found. 
+ """ + q = collections.deque() + seen = set([]) + + q.extend(get_previous_instructions(vw, va)) + while q: + cur = q.popleft() + + # skip if we've already processed this location + if cur in seen: + continue + seen.add(cur) + + insn = vw.parseOpcode(cur) + + if len(insn.opers) == 0: + q.extend(get_previous_instructions(vw, cur)) + continue + + opnd0 = insn.opers[0] + if not (isinstance(opnd0, i386RegOper) and opnd0.reg == reg and insn.mnem in DESTRUCTIVE_MNEMONICS): + q.extend(get_previous_instructions(vw, cur)) + continue + + # if we reach here, the instruction is destructive to our target register. + + # we currently only support extracting the constant from something like: `mov $reg, IAT` + # so, any other pattern results in an unknown value, represented by None. + # this is a good place to extend in the future, if we need more robust support. + if insn.mnem != "mov": + return (cur, None) + else: + opnd1 = insn.opers[1] + if isinstance(opnd1, i386ImmOper): + return (cur, opnd1.getOperValue(opnd1)) + elif isinstance(opnd1, i386ImmMemOper): + return (cur, opnd1.getOperAddr(opnd1)) + elif isinstance(opnd1, Amd64RipRelOper): + return (cur, opnd1.getOperAddr(insn)) + else: + # might be something like: `mov $reg, dword_401000[eax]` + return (cur, None) + + raise NotFoundError() + + +def is_indirect_call(vw, va, insn=None): + if insn is None: + insn = vw.parseOpcode(va) + + return insn.mnem == "call" and isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper) + + +def resolve_indirect_call(vw, va, insn=None): + """ + inspect the given indirect call instruction and attempt to resolve the target address. + + args: + vw (vivisect.Workspace) + va (int): the virtual address at which to start analysis + + returns: + (va: int, value?: int|None): the address of the assignment and the value, if a constant. + + raises: + NotFoundError: when the definition cannot be found. 
+ """ + if insn is None: + insn = vw.parseOpcode(va) + + assert is_indirect_call(vw, va, insn=insn) + + return find_definition(vw, va, insn.opers[0].reg) diff --git a/capa/render/__init__.py b/capa/render/__init__.py index 1cdc96e5..2dabb022 100644 --- a/capa/render/__init__.py +++ b/capa/render/__init__.py @@ -1,266 +1,266 @@ -import json - -import six - -import capa.rules -import capa.engine - - -def convert_statement_to_result_document(statement): - """ - "statement": { - "type": "or" - }, - - "statement": { - "max": 9223372036854775808, - "min": 2, - "type": "range" - }, - """ - if isinstance(statement, capa.engine.And): - return { - "type": "and", - } - elif isinstance(statement, capa.engine.Or): - return { - "type": "or", - } - elif isinstance(statement, capa.engine.Not): - return { - "type": "not", - } - elif isinstance(statement, capa.engine.Some) and statement.count == 0: - return {"type": "optional"} - elif isinstance(statement, capa.engine.Some) and statement.count > 0: - return { - "type": "some", - "count": statement.count, - } - elif isinstance(statement, capa.engine.Range): - return { - "type": "range", - "min": statement.min, - "max": statement.max, - "child": convert_feature_to_result_document(statement.child), - } - elif isinstance(statement, capa.engine.Regex): - return { - "type": "regex", - "pattern": statement.pattern, - # the string that was matched - "match": statement.match, - } - elif isinstance(statement, capa.engine.Subscope): - return { - "type": "subscope", - "subscope": statement.scope, - } - else: - raise RuntimeError("unexpected match statement type: " + str(statement)) - - -def convert_feature_to_result_document(feature): - """ - "feature": { - "number": 6, - "type": "number" - }, - - "feature": { - "api": "ws2_32.WSASocket", - "type": "api" - }, - - "feature": { - "match": "create TCP socket", - "type": "match" - }, - - "feature": { - "characteristic": [ - "loop", - true - ], - "type": "characteristic" - }, - """ - result = {"type": feature.name, feature.name: feature.get_args_str()} - if feature.description: - result["description"] = feature.description - - return result - - -def convert_node_to_result_document(node): - """ - "node": { - "type": "statement", - "statement": { ... } - }, - - "node": { - "type": "feature", - "feature": { ... } - }, - """ - - if isinstance(node, capa.engine.Statement): - return { - "type": "statement", - "statement": convert_statement_to_result_document(node), - } - elif isinstance(node, capa.features.Feature): - return { - "type": "feature", - "feature": convert_feature_to_result_document(node), - } - else: - raise RuntimeError("unexpected match node type") - - -def convert_match_to_result_document(rules, capabilities, result): - """ - convert the given Result instance into a common, Python-native data structure. - this will become part of the "result document" format that can be emitted to JSON. - """ - doc = { - "success": bool(result.success), - "node": convert_node_to_result_document(result.statement), - "children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children], - } - - # logic expression, like `and`, don't have locations - their children do. - # so only add `locations` to feature nodes. 
- if isinstance(result.statement, capa.features.Feature): - if bool(result.success): - doc["locations"] = result.locations - elif isinstance(result.statement, capa.rules.Range): - if bool(result.success): - doc["locations"] = result.locations - - # if we have a `match` statement, then we're referencing another rule. - # this could an external rule (written by a human), or - # rule generated to support a subscope (basic block, etc.) - # we still want to include the matching logic in this tree. - # - # so, we need to lookup the other rule results - # and then filter those down to the address used here. - # finally, splice that logic into this tree. - if ( - doc["node"]["type"] == "feature" - and doc["node"]["feature"]["type"] == "match" - # only add subtree on success, - # because there won't be results for the other rule on failure. - and doc["success"] - ): - - rule_name = doc["node"]["feature"]["match"] - rule = rules[rule_name] - rule_matches = {address: result for (address, result) in capabilities[rule_name]} - - if rule.meta.get("capa/subscope-rule"): - # for a subscope rule, fixup the node to be a scope node, rather than a match feature node. - # - # e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block` - scope = rule.meta["scope"] - doc["node"] = { - "type": "statement", - "statement": {"type": "subscope", "subscope": scope,}, - } - - for location in doc["locations"]: - doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location])) - - return doc - - -def convert_capabilities_to_result_document(rules, capabilities): - """ - convert the given rule set and capabilities result to a common, Python-native data structure. - this format can be directly emitted to JSON, or passed to the other `render_*` routines - to render as text. - - see examples of substructures in above routines. - - schema: - - ```json - { - $rule-name: { - "meta": {...copied from rule.meta...}, - "matches: { - $address: {...match details...}, - ... - } - }, - ... - } - ``` - - Args: - rules (RuleSet): - capabilities (Dict[str, List[Tuple[int, Result]]]): - """ - doc = {} - - for rule_name, matches in capabilities.items(): - rule = rules[rule_name] - - if rule.meta.get("capa/subscope-rule"): - continue - - doc[rule_name] = { - "meta": dict(rule.meta), - "source": rule.definition, - "matches": { - addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches - }, - } - - return doc - - -def render_vverbose(rules, capabilities): - # there's an import loop here - # if capa.render imports capa.render.vverbose - # and capa.render.vverbose import capa.render (implicitly, as a submodule) - # so, defer the import until routine is called, breaking the import loop. 
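Calling code uses this module roughly as follows; `rules` and `capabilities` are assumed to be produced by capa's rule loading and matching steps, which are outside this diff.

import capa.render

def report(rules, capabilities):
    # rules: capa.rules.RuleSet
    # capabilities: Dict[str, List[Tuple[int, Result]]], keyed by rule name
    print(capa.render.render_default(rules, capabilities))  # summary tables
    print(capa.render.render_verbose(rules, capabilities))  # per-rule match addresses
    return capa.render.render_json(rules, capabilities)     # machine-readable result document

Each renderer first builds the common result document, then hands it to its submodule via a function-local import to sidestep the circular import noted in the comment above.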
- import capa.render.vverbose - - doc = convert_capabilities_to_result_document(rules, capabilities) - return capa.render.vverbose.render_vverbose(doc) - - -def render_verbose(rules, capabilities): - # break import loop - import capa.render.verbose - - doc = convert_capabilities_to_result_document(rules, capabilities) - return capa.render.verbose.render_verbose(doc) - - -def render_default(rules, capabilities): - # break import loop - import capa.render.verbose - import capa.render.default - - doc = convert_capabilities_to_result_document(rules, capabilities) - return capa.render.default.render_default(doc) - - -class CapaJsonObjectEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, six.string_types): - return json.JSONEncoder.default(self, obj) - elif isinstance(obj, set): - return list(sorted(obj)) - else: - # probably will TypeError - return json.JSONEncoder.default(self, obj) - - -def render_json(rules, capabilities): - return json.dumps( - convert_capabilities_to_result_document(rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True, - ) +import json + +import six + +import capa.rules +import capa.engine + + +def convert_statement_to_result_document(statement): + """ + "statement": { + "type": "or" + }, + + "statement": { + "max": 9223372036854775808, + "min": 2, + "type": "range" + }, + """ + if isinstance(statement, capa.engine.And): + return { + "type": "and", + } + elif isinstance(statement, capa.engine.Or): + return { + "type": "or", + } + elif isinstance(statement, capa.engine.Not): + return { + "type": "not", + } + elif isinstance(statement, capa.engine.Some) and statement.count == 0: + return {"type": "optional"} + elif isinstance(statement, capa.engine.Some) and statement.count > 0: + return { + "type": "some", + "count": statement.count, + } + elif isinstance(statement, capa.engine.Range): + return { + "type": "range", + "min": statement.min, + "max": statement.max, + "child": convert_feature_to_result_document(statement.child), + } + elif isinstance(statement, capa.engine.Regex): + return { + "type": "regex", + "pattern": statement.pattern, + # the string that was matched + "match": statement.match, + } + elif isinstance(statement, capa.engine.Subscope): + return { + "type": "subscope", + "subscope": statement.scope, + } + else: + raise RuntimeError("unexpected match statement type: " + str(statement)) + + +def convert_feature_to_result_document(feature): + """ + "feature": { + "number": 6, + "type": "number" + }, + + "feature": { + "api": "ws2_32.WSASocket", + "type": "api" + }, + + "feature": { + "match": "create TCP socket", + "type": "match" + }, + + "feature": { + "characteristic": [ + "loop", + true + ], + "type": "characteristic" + }, + """ + result = {"type": feature.name, feature.name: feature.get_args_str()} + if feature.description: + result["description"] = feature.description + + return result + + +def convert_node_to_result_document(node): + """ + "node": { + "type": "statement", + "statement": { ... } + }, + + "node": { + "type": "feature", + "feature": { ... 
} + }, + """ + + if isinstance(node, capa.engine.Statement): + return { + "type": "statement", + "statement": convert_statement_to_result_document(node), + } + elif isinstance(node, capa.features.Feature): + return { + "type": "feature", + "feature": convert_feature_to_result_document(node), + } + else: + raise RuntimeError("unexpected match node type") + + +def convert_match_to_result_document(rules, capabilities, result): + """ + convert the given Result instance into a common, Python-native data structure. + this will become part of the "result document" format that can be emitted to JSON. + """ + doc = { + "success": bool(result.success), + "node": convert_node_to_result_document(result.statement), + "children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children], + } + + # logic expression, like `and`, don't have locations - their children do. + # so only add `locations` to feature nodes. + if isinstance(result.statement, capa.features.Feature): + if bool(result.success): + doc["locations"] = result.locations + elif isinstance(result.statement, capa.rules.Range): + if bool(result.success): + doc["locations"] = result.locations + + # if we have a `match` statement, then we're referencing another rule. + # this could an external rule (written by a human), or + # rule generated to support a subscope (basic block, etc.) + # we still want to include the matching logic in this tree. + # + # so, we need to lookup the other rule results + # and then filter those down to the address used here. + # finally, splice that logic into this tree. + if ( + doc["node"]["type"] == "feature" + and doc["node"]["feature"]["type"] == "match" + # only add subtree on success, + # because there won't be results for the other rule on failure. + and doc["success"] + ): + + rule_name = doc["node"]["feature"]["match"] + rule = rules[rule_name] + rule_matches = {address: result for (address, result) in capabilities[rule_name]} + + if rule.meta.get("capa/subscope-rule"): + # for a subscope rule, fixup the node to be a scope node, rather than a match feature node. + # + # e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block` + scope = rule.meta["scope"] + doc["node"] = { + "type": "statement", + "statement": {"type": "subscope", "subscope": scope,}, + } + + for location in doc["locations"]: + doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location])) + + return doc + + +def convert_capabilities_to_result_document(rules, capabilities): + """ + convert the given rule set and capabilities result to a common, Python-native data structure. + this format can be directly emitted to JSON, or passed to the other `render_*` routines + to render as text. + + see examples of substructures in above routines. + + schema: + + ```json + { + $rule-name: { + "meta": {...copied from rule.meta...}, + "matches: { + $address: {...match details...}, + ... + } + }, + ... 
+ } + ``` + + Args: + rules (RuleSet): + capabilities (Dict[str, List[Tuple[int, Result]]]): + """ + doc = {} + + for rule_name, matches in capabilities.items(): + rule = rules[rule_name] + + if rule.meta.get("capa/subscope-rule"): + continue + + doc[rule_name] = { + "meta": dict(rule.meta), + "source": rule.definition, + "matches": { + addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches + }, + } + + return doc + + +def render_vverbose(rules, capabilities): + # there's an import loop here + # if capa.render imports capa.render.vverbose + # and capa.render.vverbose import capa.render (implicitly, as a submodule) + # so, defer the import until routine is called, breaking the import loop. + import capa.render.vverbose + + doc = convert_capabilities_to_result_document(rules, capabilities) + return capa.render.vverbose.render_vverbose(doc) + + +def render_verbose(rules, capabilities): + # break import loop + import capa.render.verbose + + doc = convert_capabilities_to_result_document(rules, capabilities) + return capa.render.verbose.render_verbose(doc) + + +def render_default(rules, capabilities): + # break import loop + import capa.render.verbose + import capa.render.default + + doc = convert_capabilities_to_result_document(rules, capabilities) + return capa.render.default.render_default(doc) + + +class CapaJsonObjectEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, six.string_types): + return json.JSONEncoder.default(self, obj) + elif isinstance(obj, set): + return list(sorted(obj)) + else: + # probably will TypeError + return json.JSONEncoder.default(self, obj) + + +def render_json(rules, capabilities): + return json.dumps( + convert_capabilities_to_result_document(rules, capabilities), cls=CapaJsonObjectEncoder, sort_keys=True, + ) diff --git a/capa/render/default.py b/capa/render/default.py index fbb0fb12..a93bdda7 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -1,99 +1,99 @@ -import collections - -import six -import tabulate - -import capa.render.utils as rutils - - -def width(s, character_count): - """pad the given string to at least `character_count`""" - if len(s) < character_count: - return s + " " * (character_count - len(s)) - else: - return s - - -def render_capabilities(doc, ostream): - """ - example:: - - +-------------------------------------------------------+-------------------------------------------------+ - | CAPABILITY | NAMESPACE | - |-------------------------------------------------------+-------------------------------------------------| - | check for OutputDebugString error (2 matches) | anti-analysis/anti-debugging/debugger-detection | - | read and send data from client to server | c2/file-transfer | - | ... | ... 
| - +-------------------------------------------------------+-------------------------------------------------+ - """ - rows = [] - for rule in rutils.capability_rules(doc): - count = len(rule["matches"]) - if count == 1: - capability = rutils.bold(rule["meta"]["name"]) - else: - capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count) - rows.append((capability, rule["meta"]["namespace"])) - - ostream.write(tabulate.tabulate(rows, headers=[width("CAPABILITY", 40), width("NAMESPACE", 40)], tablefmt="psql")) - ostream.write("\n") - - -def render_attack(doc, ostream): - """ - example:: - - +------------------------+----------------------------------------------------------------------+ - | ATT&CK Tactic | ATT&CK Technique | - |------------------------+----------------------------------------------------------------------| - | DEFENSE EVASION | Obfuscated Files or Information [T1027] | - | DISCOVERY | Query Registry [T1012] | - | | System Information Discovery [T1082] | - | EXECUTION | Command and Scripting Interpreter::Windows Command Shell [T1059.003] | - | | Shared Modules [T1129] | - | EXFILTRATION | Exfiltration Over C2 Channel [T1041] | - | PERSISTENCE | Create or Modify System Process::Windows Service [T1543.003] | - +------------------------+----------------------------------------------------------------------+ - """ - tactics = collections.defaultdict(set) - for rule in rutils.capability_rules(doc): - if not rule["meta"].get("att&ck"): - continue - - for attack in rule["meta"]["att&ck"]: - tactic, _, rest = attack.partition("::") - if "::" in rest: - technique, _, rest = rest.partition("::") - subtechnique, _, id = rest.rpartition(" ") - tactics[tactic].add((technique, subtechnique, id)) - else: - technique, _, id = rest.rpartition(" ") - tactics[tactic].add((technique, id)) - - rows = [] - for tactic, techniques in sorted(tactics.items()): - inner_rows = [] - for spec in sorted(techniques): - if len(spec) == 2: - technique, id = spec - inner_rows.append("%s %s" % (rutils.bold(technique), id)) - elif len(spec) == 3: - technique, subtechnique, id = spec - inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id)) - else: - raise RuntimeError("unexpected ATT&CK spec format") - rows.append((rutils.bold(tactic.upper()), "\n".join(inner_rows),)) - ostream.write( - tabulate.tabulate(rows, headers=[width("ATT&CK Tactic", 20), width("ATT&CK Technique", 60)], tablefmt="psql") - ) - ostream.write("\n") - - -def render_default(doc): - ostream = six.StringIO() - - render_attack(doc, ostream) - ostream.write("\n") - render_capabilities(doc, ostream) - - return ostream.getvalue() +import collections + +import six +import tabulate + +import capa.render.utils as rutils + + +def width(s, character_count): + """pad the given string to at least `character_count`""" + if len(s) < character_count: + return s + " " * (character_count - len(s)) + else: + return s + + +def render_capabilities(doc, ostream): + """ + example:: + + +-------------------------------------------------------+-------------------------------------------------+ + | CAPABILITY | NAMESPACE | + |-------------------------------------------------------+-------------------------------------------------| + | check for OutputDebugString error (2 matches) | anti-analysis/anti-debugging/debugger-detection | + | read and send data from client to server | c2/file-transfer | + | ... | ... 
| + +-------------------------------------------------------+-------------------------------------------------+ + """ + rows = [] + for rule in rutils.capability_rules(doc): + count = len(rule["matches"]) + if count == 1: + capability = rutils.bold(rule["meta"]["name"]) + else: + capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count) + rows.append((capability, rule["meta"]["namespace"])) + + ostream.write(tabulate.tabulate(rows, headers=[width("CAPABILITY", 40), width("NAMESPACE", 40)], tablefmt="psql")) + ostream.write("\n") + + +def render_attack(doc, ostream): + """ + example:: + + +------------------------+----------------------------------------------------------------------+ + | ATT&CK Tactic | ATT&CK Technique | + |------------------------+----------------------------------------------------------------------| + | DEFENSE EVASION | Obfuscated Files or Information [T1027] | + | DISCOVERY | Query Registry [T1012] | + | | System Information Discovery [T1082] | + | EXECUTION | Command and Scripting Interpreter::Windows Command Shell [T1059.003] | + | | Shared Modules [T1129] | + | EXFILTRATION | Exfiltration Over C2 Channel [T1041] | + | PERSISTENCE | Create or Modify System Process::Windows Service [T1543.003] | + +------------------------+----------------------------------------------------------------------+ + """ + tactics = collections.defaultdict(set) + for rule in rutils.capability_rules(doc): + if not rule["meta"].get("att&ck"): + continue + + for attack in rule["meta"]["att&ck"]: + tactic, _, rest = attack.partition("::") + if "::" in rest: + technique, _, rest = rest.partition("::") + subtechnique, _, id = rest.rpartition(" ") + tactics[tactic].add((technique, subtechnique, id)) + else: + technique, _, id = rest.rpartition(" ") + tactics[tactic].add((technique, id)) + + rows = [] + for tactic, techniques in sorted(tactics.items()): + inner_rows = [] + for spec in sorted(techniques): + if len(spec) == 2: + technique, id = spec + inner_rows.append("%s %s" % (rutils.bold(technique), id)) + elif len(spec) == 3: + technique, subtechnique, id = spec + inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id)) + else: + raise RuntimeError("unexpected ATT&CK spec format") + rows.append((rutils.bold(tactic.upper()), "\n".join(inner_rows),)) + ostream.write( + tabulate.tabulate(rows, headers=[width("ATT&CK Tactic", 20), width("ATT&CK Technique", 60)], tablefmt="psql") + ) + ostream.write("\n") + + +def render_default(doc): + ostream = six.StringIO() + + render_attack(doc, ostream) + ostream.write("\n") + render_capabilities(doc, ostream) + + return ostream.getvalue() diff --git a/capa/render/utils.py b/capa/render/utils.py index 794f9904..670c1bcf 100644 --- a/capa/render/utils.py +++ b/capa/render/utils.py @@ -1,44 +1,44 @@ -import six -import termcolor - - -def bold(s): - """draw attention to the given string""" - return termcolor.colored(s, "blue") - - -def bold2(s): - """draw attention to the given string, within a `bold` section""" - return termcolor.colored(s, "green") - - -def hex(n): - """render the given number using upper case hex, like: 0x123ABC""" - return "0x%X" % n - - -def capability_rules(doc): - """enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc).""" - for (_, _, rule) in sorted( - map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc.values()) - ): - if rule["meta"].get("lib"): - continue - if rule["meta"].get("capa/subscope"): - continue - 
if rule["meta"].get("maec/analysis-conclusion"): - continue - if rule["meta"].get("maec/analysis-conclusion-ov"): - continue - if rule["meta"].get("maec/malware-category"): - continue - if rule["meta"].get("maec/malware-category-ov"): - continue - - yield rule - - -class StringIO(six.StringIO): - def writeln(self, s): - self.write(s) - self.write("\n") +import six +import termcolor + + +def bold(s): + """draw attention to the given string""" + return termcolor.colored(s, "blue") + + +def bold2(s): + """draw attention to the given string, within a `bold` section""" + return termcolor.colored(s, "green") + + +def hex(n): + """render the given number using upper case hex, like: 0x123ABC""" + return "0x%X" % n + + +def capability_rules(doc): + """enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc).""" + for (_, _, rule) in sorted( + map(lambda rule: (rule["meta"].get("namespace", ""), rule["meta"]["name"], rule), doc.values()) + ): + if rule["meta"].get("lib"): + continue + if rule["meta"].get("capa/subscope"): + continue + if rule["meta"].get("maec/analysis-conclusion"): + continue + if rule["meta"].get("maec/analysis-conclusion-ov"): + continue + if rule["meta"].get("maec/malware-category"): + continue + if rule["meta"].get("maec/malware-category-ov"): + continue + + yield rule + + +class StringIO(six.StringIO): + def writeln(self, s): + self.write(s) + self.write("\n") diff --git a/capa/render/verbose.py b/capa/render/verbose.py index ed3bc350..cf13402f 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -1,52 +1,52 @@ -""" -example:: - - send data - namespace communication - author william.ballenthin@fireeye.com - description all known techniques for sending data to a potential C2 server - scope function - examples BFB9B5391A13D0AFD787E87AB90F14F5:0x13145D60 - matches 0x10004363 - 0x100046c9 - 0x1000454e - 0x10003a13 - 0x10003415 - 0x10003797 -""" -import tabulate - -import capa.rules -import capa.render.utils as rutils - - -def render_verbose(doc): - ostream = rutils.StringIO() - - for rule in rutils.capability_rules(doc): - count = len(rule["matches"]) - if count == 1: - capability = rutils.bold(rule["meta"]["name"]) - else: - capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count) - - ostream.writeln(capability) - - rows = [] - for key in ("namespace", "description", "scope"): - if key == "name" or key not in rule["meta"]: - continue - - v = rule["meta"][key] - if isinstance(v, list) and len(v) == 1: - v = v[0] - rows.append((key, v)) - - if rule["meta"]["scope"] != capa.rules.FILE_SCOPE: - locations = doc[rule["meta"]["name"]]["matches"].keys() - rows.append(("matches", "\n".join(map(rutils.hex, locations)))) - - ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) - ostream.write("\n") - - return ostream.getvalue() +""" +example:: + + send data + namespace communication + author william.ballenthin@fireeye.com + description all known techniques for sending data to a potential C2 server + scope function + examples BFB9B5391A13D0AFD787E87AB90F14F5:0x13145D60 + matches 0x10004363 + 0x100046c9 + 0x1000454e + 0x10003a13 + 0x10003415 + 0x10003797 +""" +import tabulate + +import capa.rules +import capa.render.utils as rutils + + +def render_verbose(doc): + ostream = rutils.StringIO() + + for rule in rutils.capability_rules(doc): + count = len(rule["matches"]) + if count == 1: + capability = rutils.bold(rule["meta"]["name"]) + else: + capability = "%s (%d matches)" % 
(rutils.bold(rule["meta"]["name"]), count) + + ostream.writeln(capability) + + rows = [] + for key in ("namespace", "description", "scope"): + if key == "name" or key not in rule["meta"]: + continue + + v = rule["meta"][key] + if isinstance(v, list) and len(v) == 1: + v = v[0] + rows.append((key, v)) + + if rule["meta"]["scope"] != capa.rules.FILE_SCOPE: + locations = doc[rule["meta"]["name"]]["matches"].keys() + rows.append(("matches", "\n".join(map(rutils.hex, locations)))) + + ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) + ostream.write("\n") + + return ostream.getvalue() diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index db28baa0..a0182b11 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -1,185 +1,185 @@ -import tabulate - -import capa.rules -import capa.render.utils as rutils - - -def render_locations(ostream, match): - # its possible to have an empty locations array here, - # such as when we're in MODE_FAILURE and showing the logic - # under a `not` statement (which will have no matched locations). - locations = list(sorted(match.get("locations", []))) - if len(locations) == 1: - ostream.write(" @ ") - ostream.write(rutils.hex(locations[0])) - elif len(locations) > 1: - ostream.write(" @ ") - if len(locations) > 4: - # don't display too many locations, because it becomes very noisy. - # probably only the first handful of locations will be useful for inspection. - ostream.write(", ".join(map(rutils.hex, locations[0:4]))) - ostream.write(", and %d more..." % (len(locations) - 4)) - else: - ostream.write(", ".join(map(rutils.hex, locations))) - - -def render_statement(ostream, match, statement, indent=0): - ostream.write(" " * indent) - if statement["type"] in ("and", "or", "optional"): - ostream.write(statement["type"]) - ostream.writeln(":") - elif statement["type"] == "not": - # this statement is handled specially in `render_match` using the MODE_SUCCESS/MODE_FAILURE flags. - ostream.writeln("not:") - elif statement["type"] == "some": - ostream.write(statement["count"] + " or more") - ostream.writeln(":") - elif statement["type"] == "range": - # `range` is a weird node, its almost a hybrid of statement+feature. - # it is a specific feature repeated multiple times. - # there's no additional logic in the feature part, just the existence of a feature. - # so, we have to inline some of the feature rendering here. - - child = statement["child"] - value = rutils.bold2(child[child["type"]]) - - if child.get("description"): - ostream.write("count(%s(%s = %s)): " % (child["type"], value, child["description"])) - else: - ostream.write("count(%s(%s)): " % (child["type"], value)) - - if statement["max"] == statement["min"]: - ostream.write("%d" % (statement["min"])) - elif statement["min"] == 0: - ostream.write("%d or fewer" % (statement["max"])) - elif statement["max"] == (1 << 64 - 1): - ostream.write("%d or more" % (statement["min"])) - else: - ostream.write("between %d and %d" % (statement["min"], statement["max"])) - - render_locations(ostream, match) - ostream.write("\n") - elif statement["type"] == "subscope": - ostream.write(statement["subscope"]) - ostream.writeln(":") - elif statement["type"] == "regex": - # regex is a `Statement` not a `Feature` - # this is because it doesn't get extracted, but applies to all strings in scope. 
- # so we have to handle it here - ostream.writeln("string: %s" % (statement["match"])) - else: - raise RuntimeError("unexpected match statement type: " + str(statement)) - - -def render_feature(ostream, match, feature, indent=0): - ostream.write(" " * indent) - - ostream.write(feature["type"]) - ostream.write(": ") - ostream.write(rutils.bold2(feature[feature["type"]])) - - if "description" in feature: - ostream.write(" = ") - ostream.write(feature["description"]) - - render_locations(ostream, match) - ostream.write("\n") - - -def render_node(ostream, match, node, indent=0): - if node["type"] == "statement": - render_statement(ostream, match, node["statement"], indent=indent) - elif node["type"] == "feature": - render_feature(ostream, match, node["feature"], indent=indent) - else: - raise RuntimeError("unexpected node type: " + str(node)) - - -# display nodes that successfully evaluated against the sample. -MODE_SUCCESS = "success" - -# display nodes that did not evaluate to True against the sample. -# this is useful when rendering the logic tree under a `not` node. -MODE_FAILURE = "failure" - - -def render_match(ostream, match, indent=0, mode=MODE_SUCCESS): - child_mode = mode - if mode == MODE_SUCCESS: - # display only nodes that evaluated successfully. - if not match["success"]: - return - # optional statement with no successful children is empty - if match["node"].get("statement", {}).get("type") == "optional" and not any( - map(lambda m: m["success"], match["children"]) - ): - return - # not statement, so invert the child mode to show failed evaluations - if match["node"].get("statement", {}).get("type") == "not": - child_mode = MODE_FAILURE - elif mode == MODE_FAILURE: - # display only nodes that did not evaluate to True - if match["success"]: - return - # optional statement with successful children is not relevant - if match["node"].get("statement", {}).get("type") == "optional" and any( - map(lambda m: m["success"], match["children"]) - ): - return - # not statement, so invert the child mode to show successful evaluations - if match["node"].get("statement", {}).get("type") == "not": - child_mode = MODE_SUCCESS - else: - raise RuntimeError("unexpected mode: " + mode) - - render_node(ostream, match, match["node"], indent=indent) - - for child in match["children"]: - render_match(ostream, child, indent=indent + 1, mode=child_mode) - - -def render_vverbose(doc): - ostream = rutils.StringIO() - - for rule in rutils.capability_rules(doc): - count = len(rule["matches"]) - if count == 1: - capability = rutils.bold(rule["meta"]["name"]) - else: - capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count) - - ostream.writeln(capability) - - rows = [] - for key in capa.rules.META_KEYS: - if key == "name" or key not in rule["meta"]: - continue - - v = rule["meta"][key] - if isinstance(v, list) and len(v) == 1: - v = v[0] - elif isinstance(v, list) and len(v) > 1: - v = ", ".join(v) - rows.append((key, v)) - - ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) - - if rule["meta"]["scope"] == capa.rules.FILE_SCOPE: - matches = list(doc[rule["meta"]["name"]]["matches"].values()) - if len(matches) != 1: - # i think there should only ever be one match per file-scope rule, - # because we do the file-scope evaluation a single time. - # but i'm not 100% sure if this is/will always be true. - # so, lets be explicit about our assumptions and raise an exception if they fail. 
- raise RuntimeError("unexpected file scope match count: " + len(matches)) - render_match(ostream, matches[0], indent=0) - else: - for location, match in sorted(doc[rule["meta"]["name"]]["matches"].items()): - ostream.write(rule["meta"]["scope"]) - ostream.write(" @ ") - ostream.writeln(rutils.hex(location)) - render_match(ostream, match, indent=1) - - ostream.write("\n") - - return ostream.getvalue() +import tabulate + +import capa.rules +import capa.render.utils as rutils + + +def render_locations(ostream, match): + # its possible to have an empty locations array here, + # such as when we're in MODE_FAILURE and showing the logic + # under a `not` statement (which will have no matched locations). + locations = list(sorted(match.get("locations", []))) + if len(locations) == 1: + ostream.write(" @ ") + ostream.write(rutils.hex(locations[0])) + elif len(locations) > 1: + ostream.write(" @ ") + if len(locations) > 4: + # don't display too many locations, because it becomes very noisy. + # probably only the first handful of locations will be useful for inspection. + ostream.write(", ".join(map(rutils.hex, locations[0:4]))) + ostream.write(", and %d more..." % (len(locations) - 4)) + else: + ostream.write(", ".join(map(rutils.hex, locations))) + + +def render_statement(ostream, match, statement, indent=0): + ostream.write(" " * indent) + if statement["type"] in ("and", "or", "optional"): + ostream.write(statement["type"]) + ostream.writeln(":") + elif statement["type"] == "not": + # this statement is handled specially in `render_match` using the MODE_SUCCESS/MODE_FAILURE flags. + ostream.writeln("not:") + elif statement["type"] == "some": + ostream.write(statement["count"] + " or more") + ostream.writeln(":") + elif statement["type"] == "range": + # `range` is a weird node, its almost a hybrid of statement+feature. + # it is a specific feature repeated multiple times. + # there's no additional logic in the feature part, just the existence of a feature. + # so, we have to inline some of the feature rendering here. + + child = statement["child"] + value = rutils.bold2(child[child["type"]]) + + if child.get("description"): + ostream.write("count(%s(%s = %s)): " % (child["type"], value, child["description"])) + else: + ostream.write("count(%s(%s)): " % (child["type"], value)) + + if statement["max"] == statement["min"]: + ostream.write("%d" % (statement["min"])) + elif statement["min"] == 0: + ostream.write("%d or fewer" % (statement["max"])) + elif statement["max"] == (1 << 64 - 1): + ostream.write("%d or more" % (statement["min"])) + else: + ostream.write("between %d and %d" % (statement["min"], statement["max"])) + + render_locations(ostream, match) + ostream.write("\n") + elif statement["type"] == "subscope": + ostream.write(statement["subscope"]) + ostream.writeln(":") + elif statement["type"] == "regex": + # regex is a `Statement` not a `Feature` + # this is because it doesn't get extracted, but applies to all strings in scope. 
+ # so we have to handle it here + ostream.writeln("string: %s" % (statement["match"])) + else: + raise RuntimeError("unexpected match statement type: " + str(statement)) + + +def render_feature(ostream, match, feature, indent=0): + ostream.write(" " * indent) + + ostream.write(feature["type"]) + ostream.write(": ") + ostream.write(rutils.bold2(feature[feature["type"]])) + + if "description" in feature: + ostream.write(" = ") + ostream.write(feature["description"]) + + render_locations(ostream, match) + ostream.write("\n") + + +def render_node(ostream, match, node, indent=0): + if node["type"] == "statement": + render_statement(ostream, match, node["statement"], indent=indent) + elif node["type"] == "feature": + render_feature(ostream, match, node["feature"], indent=indent) + else: + raise RuntimeError("unexpected node type: " + str(node)) + + +# display nodes that successfully evaluated against the sample. +MODE_SUCCESS = "success" + +# display nodes that did not evaluate to True against the sample. +# this is useful when rendering the logic tree under a `not` node. +MODE_FAILURE = "failure" + + +def render_match(ostream, match, indent=0, mode=MODE_SUCCESS): + child_mode = mode + if mode == MODE_SUCCESS: + # display only nodes that evaluated successfully. + if not match["success"]: + return + # optional statement with no successful children is empty + if match["node"].get("statement", {}).get("type") == "optional" and not any( + map(lambda m: m["success"], match["children"]) + ): + return + # not statement, so invert the child mode to show failed evaluations + if match["node"].get("statement", {}).get("type") == "not": + child_mode = MODE_FAILURE + elif mode == MODE_FAILURE: + # display only nodes that did not evaluate to True + if match["success"]: + return + # optional statement with successful children is not relevant + if match["node"].get("statement", {}).get("type") == "optional" and any( + map(lambda m: m["success"], match["children"]) + ): + return + # not statement, so invert the child mode to show successful evaluations + if match["node"].get("statement", {}).get("type") == "not": + child_mode = MODE_SUCCESS + else: + raise RuntimeError("unexpected mode: " + mode) + + render_node(ostream, match, match["node"], indent=indent) + + for child in match["children"]: + render_match(ostream, child, indent=indent + 1, mode=child_mode) + + +def render_vverbose(doc): + ostream = rutils.StringIO() + + for rule in rutils.capability_rules(doc): + count = len(rule["matches"]) + if count == 1: + capability = rutils.bold(rule["meta"]["name"]) + else: + capability = "%s (%d matches)" % (rutils.bold(rule["meta"]["name"]), count) + + ostream.writeln(capability) + + rows = [] + for key in capa.rules.META_KEYS: + if key == "name" or key not in rule["meta"]: + continue + + v = rule["meta"][key] + if isinstance(v, list) and len(v) == 1: + v = v[0] + elif isinstance(v, list) and len(v) > 1: + v = ", ".join(v) + rows.append((key, v)) + + ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) + + if rule["meta"]["scope"] == capa.rules.FILE_SCOPE: + matches = list(doc[rule["meta"]["name"]]["matches"].values()) + if len(matches) != 1: + # i think there should only ever be one match per file-scope rule, + # because we do the file-scope evaluation a single time. + # but i'm not 100% sure if this is/will always be true. + # so, lets be explicit about our assumptions and raise an exception if they fail. 
+ raise RuntimeError("unexpected file scope match count: " + len(matches)) + render_match(ostream, matches[0], indent=0) + else: + for location, match in sorted(doc[rule["meta"]["name"]]["matches"].items()): + ostream.write(rule["meta"]["scope"]) + ostream.write(" @ ") + ostream.writeln(rutils.hex(location)) + render_match(ostream, match, indent=1) + + ostream.write("\n") + + return ostream.getvalue() diff --git a/ci/hooks/hook-vivisect.py b/ci/hooks/hook-vivisect.py index b24962be..ec599abf 100644 --- a/ci/hooks/hook-vivisect.py +++ b/ci/hooks/hook-vivisect.py @@ -1,13 +1,13 @@ -from PyInstaller.utils.hooks import copy_metadata - -# in order for viv-utils to use pkg_resources to fetch -# the installed version of vivisect, -# we need to instruct pyinstaller to embed this metadata. -# -# so we set the pyinstaller.spec/hookspath to reference -# the directory with this hook. -# -# this hook runs at analysis time and updates the embedded metadata. -# -# ref: https://github.com/pyinstaller/pyinstaller/issues/1713#issuecomment-162682084 -datas = copy_metadata("vivisect") +from PyInstaller.utils.hooks import copy_metadata + +# in order for viv-utils to use pkg_resources to fetch +# the installed version of vivisect, +# we need to instruct pyinstaller to embed this metadata. +# +# so we set the pyinstaller.spec/hookspath to reference +# the directory with this hook. +# +# this hook runs at analysis time and updates the embedded metadata. +# +# ref: https://github.com/pyinstaller/pyinstaller/issues/1713#issuecomment-162682084 +datas = copy_metadata("vivisect") diff --git a/scripts/migrate-rules.py b/scripts/migrate-rules.py index af4d6286..30b39eb2 100644 --- a/scripts/migrate-rules.py +++ b/scripts/migrate-rules.py @@ -1,160 +1,160 @@ -#!/usr/bin/env python -""" -migrate rules and their namespaces. 
- -example: - - $ python scripts/migrate-rules.py migration.csv ./rules ./new-rules -""" -import os -import csv -import sys -import logging -import os.path -import collections - -import argparse - -import capa.rules - -logger = logging.getLogger("migrate-rules") - - -def read_plan(plan_path): - with open(plan_path, "rb") as f: - return list( - csv.DictReader( - f, - restkey="other", - fieldnames=( - "existing path", - "existing name", - "existing rule-category", - "proposed name", - "proposed namespace", - "ATT&CK", - "MBC", - "comment1", - ), - ) - ) - - -def read_rules(rule_directory): - rules = {} - for root, dirs, files in os.walk(rule_directory): - for file in files: - path = os.path.join(root, file) - if not path.endswith(".yml"): - logger.info("skipping file: %s", path) - continue - - rule = capa.rules.Rule.from_yaml_file(path) - rules[rule.name] = rule - - if "nursery" in path: - rule.meta["capa/nursery"] = True - return rules - - -def main(argv=None): - if argv is None: - argv = sys.argv[1:] - - parser = argparse.ArgumentParser(description="migrate rules.") - parser.add_argument("plan", type=str, help="Path to CSV describing migration") - parser.add_argument("source", type=str, help="Source directory of rules") - parser.add_argument("destination", type=str, help="Destination directory of rules") - args = parser.parse_args(args=argv) - - logging.basicConfig(level=logging.INFO) - logging.getLogger().setLevel(logging.INFO) - - plan = read_plan(args.plan) - logger.info("read %d plan entries", len(plan)) - - rules = read_rules(args.source) - logger.info("read %d rules", len(rules)) - - planned_rules = set([row["existing name"] for row in plan]) - unplanned_rules = [rule for (name, rule) in rules.items() if name not in planned_rules] - - if unplanned_rules: - logger.error("plan does not account for %d rules:" % (len(unplanned_rules))) - for rule in unplanned_rules: - logger.error(" " + rule.name) - return -1 - - # pairs of strings (needle, replacement) - match_translations = [] - - for row in plan: - if not row["existing name"]: - continue - - rule = rules[row["existing name"]] - - if rule.meta["name"] != row["proposed name"]: - logger.info("renaming rule '%s' -> '%s'", rule.meta["name"], row["proposed name"]) - - # assume the yaml is formatted like `- match: $rule-name`. - # but since its been linted, this should be ok. 
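For reference, the migration plan CSV read by read_plan above has no header row and one entry per existing rule, with columns in the fieldnames order; a row might look like the following (the path, rule names, namespace, and technique are invented for illustration):

rules/nursery/create-com-object.yml,create COM object,runtime/com,create COM object,host-interaction/com,Execution::Shared Modules T1129,n/a,

Rows with an empty "existing name" are skipped, and ATT&CK/MBC cells that are empty or "n/a" are ignored.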
- match_translations.append(("- match: " + rule.meta["name"], "- match: " + row["proposed name"])) - - rule.meta["name"] = row["proposed name"] - rule.name = row["proposed name"] - - if "rule-category" in rule.meta: - logger.info("deleting rule category '%s'", rule.meta["rule-category"]) - del rule.meta["rule-category"] - - rule.meta["namespace"] = row["proposed namespace"] - - if row["ATT&CK"] != "n/a" and row["ATT&CK"] != "": - tag = row["ATT&CK"] - name, _, id = tag.rpartition(" ") - tag = "%s [%s]" % (name, id) - rule.meta["att&ck"] = [tag] - - if row["MBC"] != "n/a" and row["MBC"] != "": - tag = row["MBC"] - rule.meta["mbc"] = [tag] - - for rule in rules.values(): - filename = rule.name - filename = filename.lower() - filename = filename.replace(" ", "-") - filename = filename.replace("(", "") - filename = filename.replace(")", "") - filename = filename.replace("+", "") - filename = filename.replace("/", "") - filename = filename + ".yml" - - try: - if rule.meta.get("capa/nursery"): - directory = os.path.join(args.destination, "nursery") - elif rule.meta.get("lib"): - directory = os.path.join(args.destination, "lib") - else: - directory = os.path.join(args.destination, rule.meta.get("namespace")) - os.makedirs(directory) - except OSError: - pass - else: - logger.info("created namespace: %s", directory) - - path = os.path.join(directory, filename) - logger.info("writing rule %s", path) - - doc = rule.to_yaml().decode("utf-8") - for (needle, replacement) in match_translations: - doc = doc.replace(needle, replacement) - - with open(path, "wb") as f: - f.write(doc.encode("utf-8")) - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) +#!/usr/bin/env python +""" +migrate rules and their namespaces. + +example: + + $ python scripts/migrate-rules.py migration.csv ./rules ./new-rules +""" +import os +import csv +import sys +import logging +import os.path +import collections + +import argparse + +import capa.rules + +logger = logging.getLogger("migrate-rules") + + +def read_plan(plan_path): + with open(plan_path, "rb") as f: + return list( + csv.DictReader( + f, + restkey="other", + fieldnames=( + "existing path", + "existing name", + "existing rule-category", + "proposed name", + "proposed namespace", + "ATT&CK", + "MBC", + "comment1", + ), + ) + ) + + +def read_rules(rule_directory): + rules = {} + for root, dirs, files in os.walk(rule_directory): + for file in files: + path = os.path.join(root, file) + if not path.endswith(".yml"): + logger.info("skipping file: %s", path) + continue + + rule = capa.rules.Rule.from_yaml_file(path) + rules[rule.name] = rule + + if "nursery" in path: + rule.meta["capa/nursery"] = True + return rules + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + parser = argparse.ArgumentParser(description="migrate rules.") + parser.add_argument("plan", type=str, help="Path to CSV describing migration") + parser.add_argument("source", type=str, help="Source directory of rules") + parser.add_argument("destination", type=str, help="Destination directory of rules") + args = parser.parse_args(args=argv) + + logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) + + plan = read_plan(args.plan) + logger.info("read %d plan entries", len(plan)) + + rules = read_rules(args.source) + logger.info("read %d rules", len(rules)) + + planned_rules = set([row["existing name"] for row in plan]) + unplanned_rules = [rule for (name, rule) in rules.items() if name not in planned_rules] + + if unplanned_rules: + logger.error("plan does 
not account for %d rules:" % (len(unplanned_rules))) + for rule in unplanned_rules: + logger.error(" " + rule.name) + return -1 + + # pairs of strings (needle, replacement) + match_translations = [] + + for row in plan: + if not row["existing name"]: + continue + + rule = rules[row["existing name"]] + + if rule.meta["name"] != row["proposed name"]: + logger.info("renaming rule '%s' -> '%s'", rule.meta["name"], row["proposed name"]) + + # assume the yaml is formatted like `- match: $rule-name`. + # but since its been linted, this should be ok. + match_translations.append(("- match: " + rule.meta["name"], "- match: " + row["proposed name"])) + + rule.meta["name"] = row["proposed name"] + rule.name = row["proposed name"] + + if "rule-category" in rule.meta: + logger.info("deleting rule category '%s'", rule.meta["rule-category"]) + del rule.meta["rule-category"] + + rule.meta["namespace"] = row["proposed namespace"] + + if row["ATT&CK"] != "n/a" and row["ATT&CK"] != "": + tag = row["ATT&CK"] + name, _, id = tag.rpartition(" ") + tag = "%s [%s]" % (name, id) + rule.meta["att&ck"] = [tag] + + if row["MBC"] != "n/a" and row["MBC"] != "": + tag = row["MBC"] + rule.meta["mbc"] = [tag] + + for rule in rules.values(): + filename = rule.name + filename = filename.lower() + filename = filename.replace(" ", "-") + filename = filename.replace("(", "") + filename = filename.replace(")", "") + filename = filename.replace("+", "") + filename = filename.replace("/", "") + filename = filename + ".yml" + + try: + if rule.meta.get("capa/nursery"): + directory = os.path.join(args.destination, "nursery") + elif rule.meta.get("lib"): + directory = os.path.join(args.destination, "lib") + else: + directory = os.path.join(args.destination, rule.meta.get("namespace")) + os.makedirs(directory) + except OSError: + pass + else: + logger.info("created namespace: %s", directory) + + path = os.path.join(directory, filename) + logger.info("writing rule %s", path) + + doc = rule.to_yaml().decode("utf-8") + for (needle, replacement) in match_translations: + doc = doc.replace(needle, replacement) + + with open(path, "wb") as f: + f.write(doc.encode("utf-8")) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_fmt.py b/tests/test_fmt.py index 33fe077c..ff89d679 100644 --- a/tests/test_fmt.py +++ b/tests/test_fmt.py @@ -1,106 +1,106 @@ -import textwrap - -import capa.rules - -EXPECTED = textwrap.dedent( - """\ - rule: - meta: - name: test rule - author: user@domain.com - scope: function - examples: - - foo1234 - - bar5678 - features: - - and: - - number: 1 - - number: 2 - """ -) - - -def test_rule_reformat_top_level_elements(): - rule = textwrap.dedent( - """ - rule: - features: - - and: - - number: 1 - - number: 2 - meta: - name: test rule - author: user@domain.com - scope: function - examples: - - foo1234 - - bar5678 - """ - ) - - assert capa.rules.Rule.from_yaml(rule).to_yaml() == EXPECTED - - -def test_rule_reformat_indentation(): - rule = textwrap.dedent( - """ - rule: - meta: - name: test rule - author: user@domain.com - scope: function - examples: - - foo1234 - - bar5678 - features: - - and: - - number: 1 - - number: 2 - """ - ) - - assert capa.rules.Rule.from_yaml(rule).to_yaml() == EXPECTED - - -def test_rule_reformat_order(): - rule = textwrap.dedent( - """ - rule: - meta: - author: user@domain.com - examples: - - foo1234 - - bar5678 - scope: function - name: test rule - features: - - and: - - number: 1 - - number: 2 - """ - ) - - assert capa.rules.Rule.from_yaml(rule).to_yaml() == 
EXPECTED - - -def test_rule_reformat_meta_update(): - rule = textwrap.dedent( - """ - rule: - meta: - author: user@domain.com - examples: - - foo1234 - - bar5678 - scope: function - name: AAAA - features: - - and: - - number: 1 - - number: 2 - """ - ) - - rule = capa.rules.Rule.from_yaml(rule) - rule.name = "test rule" - assert rule.to_yaml() == EXPECTED +import textwrap + +import capa.rules + +EXPECTED = textwrap.dedent( + """\ + rule: + meta: + name: test rule + author: user@domain.com + scope: function + examples: + - foo1234 + - bar5678 + features: + - and: + - number: 1 + - number: 2 + """ +) + + +def test_rule_reformat_top_level_elements(): + rule = textwrap.dedent( + """ + rule: + features: + - and: + - number: 1 + - number: 2 + meta: + name: test rule + author: user@domain.com + scope: function + examples: + - foo1234 + - bar5678 + """ + ) + + assert capa.rules.Rule.from_yaml(rule).to_yaml() == EXPECTED + + +def test_rule_reformat_indentation(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + author: user@domain.com + scope: function + examples: + - foo1234 + - bar5678 + features: + - and: + - number: 1 + - number: 2 + """ + ) + + assert capa.rules.Rule.from_yaml(rule).to_yaml() == EXPECTED + + +def test_rule_reformat_order(): + rule = textwrap.dedent( + """ + rule: + meta: + author: user@domain.com + examples: + - foo1234 + - bar5678 + scope: function + name: test rule + features: + - and: + - number: 1 + - number: 2 + """ + ) + + assert capa.rules.Rule.from_yaml(rule).to_yaml() == EXPECTED + + +def test_rule_reformat_meta_update(): + rule = textwrap.dedent( + """ + rule: + meta: + author: user@domain.com + examples: + - foo1234 + - bar5678 + scope: function + name: AAAA + features: + - and: + - number: 1 + - number: 2 + """ + ) + + rule = capa.rules.Rule.from_yaml(rule) + rule.name = "test rule" + assert rule.to_yaml() == EXPECTED
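The round-trip these tests exercise is also how a rule file can be reformatted on disk; a small sketch, with a hypothetical path and the bytes/str handling hedged since to_yaml() is decoded from UTF-8 in scripts/migrate-rules.py above.

import capa.rules

path = "rules/example-rule.yml"  # hypothetical rule file

rule = capa.rules.Rule.from_yaml_file(path)
doc = rule.to_yaml()  # normalized key order and indentation, per the tests above

with open(path, "wb") as f:
    f.write(doc if isinstance(doc, bytes) else doc.encode("utf-8"))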