diff --git a/capa/engine.py b/capa/engine.py index 5582982a..2356d0f5 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -9,7 +9,8 @@ import copy import collections -import capa.features +import capa.rules +import capa.features.common class Statement(object): @@ -254,12 +255,12 @@ def match(rules, features, va): res = rule.evaluate(features) if res: results[rule.name].append((va, res)) - features[capa.features.MatchedRule(rule.name)].add(va) + features[capa.features.common.MatchedRule(rule.name)].add(va) namespace = rule.meta.get("namespace") if namespace: while namespace: - features[capa.features.MatchedRule(namespace)].add(va) + features[capa.features.common.MatchedRule(namespace)].add(va) namespace, _, _ = namespace.rpartition("/") return (features, results) diff --git a/capa/features/__init__.py b/capa/features/__init__.py index 151b5744..e69de29b 100644 --- a/capa/features/__init__.py +++ b/capa/features/__init__.py @@ -1,254 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - -import re -import codecs -import logging -import collections - -import capa.engine -import capa.features - -logger = logging.getLogger(__name__) -MAX_BYTES_FEATURE_SIZE = 0x100 - -# thunks may be chained so we specify a delta to control the depth to which these chains are explored -THUNK_CHAIN_DEPTH_DELTA = 5 - -# identifiers for supported architectures names that tweak a feature -# for example, offset/x32 -ARCH_X32 = "x32" -ARCH_X64 = "x64" -VALID_ARCH = (ARCH_X32, ARCH_X64) - - -def bytes_to_str(b): - return str(codecs.encode(b, "hex").decode("utf-8")) - - -def hex_string(h): - """render hex string e.g. "0a40b1" as "0A 40 B1" """ - return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper() - - -def escape_string(s): - """escape special characters""" - s = repr(s) - if not s.startswith(('"', "'")): - # u'hello\r\nworld' -> hello\\r\\nworld - s = s[2:-1] - else: - # 'hello\r\nworld' -> hello\\r\\nworld - s = s[1:-1] - s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove - s = s.replace('"', '\\"') # repr() does not escape '"', add - return s - - -class Feature(object): - def __init__(self, value, arch=None, description=None): - """ - Args: - value (any): the value of the feature, such as the number or string. - arch (str): one of the VALID_ARCH values, or None. - When None, then the feature applies to any architecture. - Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`. - description (str): a human-readable description that explains the feature value. - """ - super(Feature, self).__init__() - - if arch is not None: - if arch not in VALID_ARCH: - raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH)) - self.name = self.__class__.__name__.lower() + "/" + arch - else: - self.name = self.__class__.__name__.lower() - - self.value = value - self.arch = arch - self.description = description - - def __hash__(self): - return hash((self.name, self.value, self.arch)) - - def __eq__(self, other): - return self.name == other.name and self.value == other.value and self.arch == other.arch - - def get_value_str(self): - """ - render the value of this feature, for use by `__str__` and friends. - subclasses should override to customize the rendering. - - Returns: any - """ - return self.value - - def __str__(self): - if self.value is not None: - if self.description: - return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description) - else: - return "%s(%s)" % (self.name, self.get_value_str()) - else: - return "%s" % self.name - - def __repr__(self): - return str(self) - - def evaluate(self, ctx): - return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, [])) - - def freeze_serialize(self): - if self.arch is not None: - return (self.__class__.__name__, [self.value, {"arch": self.arch}]) - else: - return (self.__class__.__name__, [self.value]) - - @classmethod - def freeze_deserialize(cls, args): - # as you can see below in code, - # if the last argument is a dictionary, - # consider it to be kwargs passed to the feature constructor. - if len(args) == 1: - return cls(*args) - elif isinstance(args[-1], dict): - kwargs = args[-1] - args = args[:-1] - return cls(*args, **kwargs) - - -class MatchedRule(Feature): - def __init__(self, value, description=None): - super(MatchedRule, self).__init__(value, description=description) - self.name = "match" - - -class Characteristic(Feature): - def __init__(self, value, description=None): - super(Characteristic, self).__init__(value, description=description) - - -class String(Feature): - def __init__(self, value, description=None): - super(String, self).__init__(value, description=description) - - -class Regex(String): - def __init__(self, value, description=None): - super(Regex, self).__init__(value, description=description) - pat = self.value[len("/") : -len("/")] - flags = re.DOTALL - if value.endswith("/i"): - pat = self.value[len("/") : -len("/i")] - flags |= re.IGNORECASE - try: - self.re = re.compile(pat, flags) - except re.error: - if value.endswith("/i"): - value = value[: -len("i")] - raise ValueError( - "invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value - ) - - def evaluate(self, ctx): - # mapping from string value to list of locations. - # will unique the locations later on. - matches = collections.defaultdict(list) - - for feature, locations in ctx.items(): - if not isinstance(feature, (capa.features.String,)): - continue - - # `re.search` finds a match anywhere in the given string - # which implies leading and/or trailing whitespace. - # using this mode cleans is more convenient for rule authors, - # so that they don't have to prefix/suffix their terms like: /.*foo.*/. - if self.re.search(feature.value): - matches[feature.value].extend(locations) - - if matches: - # finalize: defaultdict -> dict - # which makes json serialization easier - matches = dict(matches) - - # collect all locations - locations = set() - for s in matches.keys(): - matches[s] = list(set(matches[s])) - locations.update(matches[s]) - - # unlike other features, we cannot return put a reference to `self` directly in a `Result`. - # this is because `self` may match on many strings, so we can't stuff the matched value into it. - # instead, return a new instance that has a reference to both the regex and the matched values. - # see #262. - return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations) - else: - return capa.engine.Result(False, _MatchedRegex(self, None), []) - - def __str__(self): - return "regex(string =~ %s)" % self.value - - -class _MatchedRegex(Regex): - """ - this represents specific match instances of a regular expression feature. - treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched. - - note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API. - """ - - def __init__(self, regex, matches): - """ - args: - regex (Regex): the regex feature that matches. - match (Dict[string, List[int]]|None): mapping from matching string to its locations. - """ - super(_MatchedRegex, self).__init__(regex.value, description=regex.description) - # we want this to collide with the name of `Regex` above, - # so that it works nicely with the renderers. - self.name = "regex" - # this may be None if the regex doesn't match - self.matches = matches - - def __str__(self): - return "regex(string =~ %s, matches = %s)" % ( - self.value, - ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())), - ) - - -class StringFactory(object): - def __new__(cls, value, description=None): - if value.startswith("/") and (value.endswith("/") or value.endswith("/i")): - return Regex(value, description=description) - return String(value, description=description) - - -class Bytes(Feature): - def __init__(self, value, description=None): - super(Bytes, self).__init__(value, description=description) - - def evaluate(self, ctx): - for feature, locations in ctx.items(): - if not isinstance(feature, (capa.features.Bytes,)): - continue - - if feature.value.startswith(self.value): - return capa.engine.Result(True, self, [], locations=locations) - - return capa.engine.Result(False, self, []) - - def get_value_str(self): - return hex_string(bytes_to_str(self.value)) - - def freeze_serialize(self): - return (self.__class__.__name__, [bytes_to_str(self.value).upper()]) - - @classmethod - def freeze_deserialize(cls, args): - return cls(*[codecs.decode(x, "hex") for x in args]) diff --git a/capa/features/basicblock.py b/capa/features/basicblock.py index e17c95ea..fb6a433c 100644 --- a/capa/features/basicblock.py +++ b/capa/features/basicblock.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from capa.features import Feature +from capa.features.common import Feature class BasicBlock(Feature): diff --git a/capa/features/common.py b/capa/features/common.py new file mode 100644 index 00000000..b32caa7c --- /dev/null +++ b/capa/features/common.py @@ -0,0 +1,254 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import re +import codecs +import logging +import collections + +import capa.engine +import capa.features.common + +logger = logging.getLogger(__name__) +MAX_BYTES_FEATURE_SIZE = 0x100 + +# thunks may be chained so we specify a delta to control the depth to which these chains are explored +THUNK_CHAIN_DEPTH_DELTA = 5 + +# identifiers for supported architectures names that tweak a feature +# for example, offset/x32 +ARCH_X32 = "x32" +ARCH_X64 = "x64" +VALID_ARCH = (ARCH_X32, ARCH_X64) + + +def bytes_to_str(b): + return str(codecs.encode(b, "hex").decode("utf-8")) + + +def hex_string(h): + """render hex string e.g. "0a40b1" as "0A 40 B1" """ + return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper() + + +def escape_string(s): + """escape special characters""" + s = repr(s) + if not s.startswith(('"', "'")): + # u'hello\r\nworld' -> hello\\r\\nworld + s = s[2:-1] + else: + # 'hello\r\nworld' -> hello\\r\\nworld + s = s[1:-1] + s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove + s = s.replace('"', '\\"') # repr() does not escape '"', add + return s + + +class Feature(object): + def __init__(self, value, arch=None, description=None): + """ + Args: + value (any): the value of the feature, such as the number or string. + arch (str): one of the VALID_ARCH values, or None. + When None, then the feature applies to any architecture. + Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`. + description (str): a human-readable description that explains the feature value. + """ + super(Feature, self).__init__() + + if arch is not None: + if arch not in VALID_ARCH: + raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH)) + self.name = self.__class__.__name__.lower() + "/" + arch + else: + self.name = self.__class__.__name__.lower() + + self.value = value + self.arch = arch + self.description = description + + def __hash__(self): + return hash((self.name, self.value, self.arch)) + + def __eq__(self, other): + return self.name == other.name and self.value == other.value and self.arch == other.arch + + def get_value_str(self): + """ + render the value of this feature, for use by `__str__` and friends. + subclasses should override to customize the rendering. + + Returns: any + """ + return self.value + + def __str__(self): + if self.value is not None: + if self.description: + return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description) + else: + return "%s(%s)" % (self.name, self.get_value_str()) + else: + return "%s" % self.name + + def __repr__(self): + return str(self) + + def evaluate(self, ctx): + return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, [])) + + def freeze_serialize(self): + if self.arch is not None: + return (self.__class__.__name__, [self.value, {"arch": self.arch}]) + else: + return (self.__class__.__name__, [self.value]) + + @classmethod + def freeze_deserialize(cls, args): + # as you can see below in code, + # if the last argument is a dictionary, + # consider it to be kwargs passed to the feature constructor. + if len(args) == 1: + return cls(*args) + elif isinstance(args[-1], dict): + kwargs = args[-1] + args = args[:-1] + return cls(*args, **kwargs) + + +class MatchedRule(Feature): + def __init__(self, value, description=None): + super(MatchedRule, self).__init__(value, description=description) + self.name = "match" + + +class Characteristic(Feature): + def __init__(self, value, description=None): + super(Characteristic, self).__init__(value, description=description) + + +class String(Feature): + def __init__(self, value, description=None): + super(String, self).__init__(value, description=description) + + +class Regex(String): + def __init__(self, value, description=None): + super(Regex, self).__init__(value, description=description) + pat = self.value[len("/") : -len("/")] + flags = re.DOTALL + if value.endswith("/i"): + pat = self.value[len("/") : -len("/i")] + flags |= re.IGNORECASE + try: + self.re = re.compile(pat, flags) + except re.error: + if value.endswith("/i"): + value = value[: -len("i")] + raise ValueError( + "invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value + ) + + def evaluate(self, ctx): + # mapping from string value to list of locations. + # will unique the locations later on. + matches = collections.defaultdict(list) + + for feature, locations in ctx.items(): + if not isinstance(feature, (capa.features.common.String,)): + continue + + # `re.search` finds a match anywhere in the given string + # which implies leading and/or trailing whitespace. + # using this mode cleans is more convenient for rule authors, + # so that they don't have to prefix/suffix their terms like: /.*foo.*/. + if self.re.search(feature.value): + matches[feature.value].extend(locations) + + if matches: + # finalize: defaultdict -> dict + # which makes json serialization easier + matches = dict(matches) + + # collect all locations + locations = set() + for s in matches.keys(): + matches[s] = list(set(matches[s])) + locations.update(matches[s]) + + # unlike other features, we cannot return put a reference to `self` directly in a `Result`. + # this is because `self` may match on many strings, so we can't stuff the matched value into it. + # instead, return a new instance that has a reference to both the regex and the matched values. + # see #262. + return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations) + else: + return capa.engine.Result(False, _MatchedRegex(self, None), []) + + def __str__(self): + return "regex(string =~ %s)" % self.value + + +class _MatchedRegex(Regex): + """ + this represents specific match instances of a regular expression feature. + treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched. + + note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API. + """ + + def __init__(self, regex, matches): + """ + args: + regex (Regex): the regex feature that matches. + match (Dict[string, List[int]]|None): mapping from matching string to its locations. + """ + super(_MatchedRegex, self).__init__(regex.value, description=regex.description) + # we want this to collide with the name of `Regex` above, + # so that it works nicely with the renderers. + self.name = "regex" + # this may be None if the regex doesn't match + self.matches = matches + + def __str__(self): + return "regex(string =~ %s, matches = %s)" % ( + self.value, + ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())), + ) + + +class StringFactory(object): + def __new__(cls, value, description=None): + if value.startswith("/") and (value.endswith("/") or value.endswith("/i")): + return Regex(value, description=description) + return String(value, description=description) + + +class Bytes(Feature): + def __init__(self, value, description=None): + super(Bytes, self).__init__(value, description=description) + + def evaluate(self, ctx): + for feature, locations in ctx.items(): + if not isinstance(feature, (capa.features.common.Bytes,)): + continue + + if feature.value.startswith(self.value): + return capa.engine.Result(True, self, [], locations=locations) + + return capa.engine.Result(False, self, []) + + def get_value_str(self): + return hex_string(bytes_to_str(self.value)) + + def freeze_serialize(self): + return (self.__class__.__name__, [bytes_to_str(self.value).upper()]) + + @classmethod + def freeze_deserialize(cls, args): + return cls(*[codecs.decode(x, "hex") for x in args]) diff --git a/capa/features/extractors/ida/basicblock.py b/capa/features/extractors/ida/basicblock.py index 0a0e08f0..2e871751 100644 --- a/capa/features/extractors/ida/basicblock.py +++ b/capa/features/extractors/ida/basicblock.py @@ -6,14 +6,13 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys import string import struct import idaapi import capa.features.extractors.ida.helpers -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.ida import helpers from capa.features.extractors.helpers import MIN_STACKSTRING_LEN diff --git a/capa/features/extractors/ida/function.py b/capa/features/extractors/ida/function.py index 670c7b73..9bbdba51 100644 --- a/capa/features/extractors/ida/function.py +++ b/capa/features/extractors/ida/function.py @@ -10,7 +10,7 @@ import idaapi import idautils import capa.features.extractors.ida.helpers -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.extractors import loops diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index 4b7dcdeb..7a07e1a2 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -6,9 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys -import string - import idc import idaapi import idautils diff --git a/capa/features/extractors/ida/insn.py b/capa/features/extractors/ida/insn.py index ec2f8070..bf8ad847 100644 --- a/capa/features/extractors/ida/insn.py +++ b/capa/features/extractors/ida/insn.py @@ -12,7 +12,7 @@ import idautils import capa.features.extractors.helpers import capa.features.extractors.ida.helpers -from capa.features import ( +from capa.features.common import ( ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, diff --git a/capa/features/extractors/pefile.py b/capa/features/extractors/pefile.py index 099daa7d..6d9e6ae7 100644 --- a/capa/features/extractors/pefile.py +++ b/capa/features/extractors/pefile.py @@ -12,8 +12,8 @@ import pefile import capa.features.extractors import capa.features.extractors.helpers import capa.features.extractors.strings -from capa.features import String, Characteristic from capa.features.file import Export, Import, Section +from capa.features.common import String, Characteristic from capa.features.extractors.base_extractor import FeatureExtractor logger = logging.getLogger(__name__) diff --git a/capa/features/extractors/smda/basicblock.py b/capa/features/extractors/smda/basicblock.py index 98b91ec3..b68d02a0 100644 --- a/capa/features/extractors/smda/basicblock.py +++ b/capa/features/extractors/smda/basicblock.py @@ -1,8 +1,7 @@ -import sys import string import struct -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN diff --git a/capa/features/extractors/smda/file.py b/capa/features/extractors/smda/file.py index 68fee066..cc9d5859 100644 --- a/capa/features/extractors/smda/file.py +++ b/capa/features/extractors/smda/file.py @@ -1,11 +1,9 @@ -import struct - # if we have SMDA we definitely have lief import lief import capa.features.extractors.helpers import capa.features.extractors.strings -from capa.features import String, Characteristic +from capa.features.common import String, Characteristic from capa.features.file import Export, Import, Section diff --git a/capa/features/extractors/smda/function.py b/capa/features/extractors/smda/function.py index e1765e67..33c6c5d1 100644 --- a/capa/features/extractors/smda/function.py +++ b/capa/features/extractors/smda/function.py @@ -1,4 +1,4 @@ -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.extractors import loops diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 48ef4c1c..22c9c66e 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -5,7 +5,7 @@ import struct from smda.common.SmdaReport import SmdaReport import capa.features.extractors.helpers -from capa.features import ( +from capa.features.common import ( ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, @@ -97,7 +97,7 @@ def read_bytes(smda_report, va, num_bytes=None): rva = va - smda_report.base_addr if smda_report.buffer is None: - return + raise ValueError("buffer is empty") buffer_end = len(smda_report.buffer) max_bytes = num_bytes if num_bytes is not None else MAX_BYTES_FEATURE_SIZE if rva + max_bytes > buffer_end: diff --git a/capa/features/extractors/viv/basicblock.py b/capa/features/extractors/viv/basicblock.py index a76aa179..510513f8 100644 --- a/capa/features/extractors/viv/basicblock.py +++ b/capa/features/extractors/viv/basicblock.py @@ -10,9 +10,9 @@ import string import struct import envi -import vivisect.const +import envi.archs.i386.disasm -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN @@ -37,7 +37,7 @@ def _bb_has_tight_loop(f, bb): """ if len(bb.instructions) > 0: for bva, bflags in bb.instructions[-1].getBranches(): - if bflags & vivisect.envi.BR_COND: + if bflags & envi.BR_COND: if bva == bb.va: return True @@ -117,11 +117,15 @@ def get_printable_len(oper): chars = struct.pack(" 0: @@ -306,7 +311,7 @@ def read_string(vw, offset): try: ulen = vw.detectUnicode(offset) - except envi.SegmentationViolation: + except envi.exc.SegmentationViolation: pass except IndexError: # potential vivisect bug detecting Unicode at segment end @@ -367,14 +372,14 @@ def extract_insn_offset_features(f, bb, insn): # reg ^ # disp if isinstance(oper, envi.archs.i386.disasm.i386RegMemOper): - if oper.reg == envi.archs.i386.disasm.REG_ESP: + if oper.reg == envi.archs.i386.regs.REG_ESP: continue - if oper.reg == envi.archs.i386.disasm.REG_EBP: + if oper.reg == envi.archs.i386.regs.REG_EBP: continue # TODO: do x64 support for real. - if oper.reg == envi.archs.amd64.disasm.REG_RBP: + if oper.reg == envi.archs.amd64.regs.REG_RBP: continue # viv already decodes offsets as signed @@ -402,11 +407,11 @@ def is_security_cookie(f, bb, insn): # security cookie check should use SP or BP oper = insn.opers[1] if oper.isReg() and oper.reg not in [ - envi.archs.i386.disasm.REG_ESP, - envi.archs.i386.disasm.REG_EBP, + envi.archs.i386.regs.REG_ESP, + envi.archs.i386.regs.REG_EBP, # TODO: do x64 support for real. - envi.archs.amd64.disasm.REG_RBP, - envi.archs.amd64.disasm.REG_RSP, + envi.archs.amd64.regs.REG_RBP, + envi.archs.amd64.regs.REG_RSP, ]: return False diff --git a/capa/features/file.py b/capa/features/file.py index 2cc4d577..4af93cde 100644 --- a/capa/features/file.py +++ b/capa/features/file.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from capa.features import Feature +from capa.features.common import Feature class Export(Feature): diff --git a/capa/features/freeze.py b/capa/features/freeze.py index 48e01c12..c6d4d7f5 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -53,11 +53,11 @@ import json import zlib import logging -import capa.features import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -import capa.features.extractors +import capa.features.extractors.base_extractor from capa.helpers import hex logger = logging.getLogger(__name__) @@ -67,7 +67,7 @@ def serialize_feature(feature): return feature.freeze_serialize() -KNOWN_FEATURES = {F.__name__: F for F in capa.features.Feature.__subclasses__()} +KNOWN_FEATURES = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()} def deserialize_feature(doc): @@ -217,7 +217,7 @@ def loads(s): feature = deserialize_feature(feature[:2]) features["functions"][loc[0]]["basic blocks"][loc[1]]["instructions"][loc[2]]["features"].append((va, feature)) - return capa.features.extractors.NullFeatureExtractor(features) + return capa.features.extractors.base_extractor.NullFeatureExtractor(features) MAGIC = "capa0000".encode("ascii") diff --git a/capa/features/insn.py b/capa/features/insn.py index 82e0ad7a..beadb815 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -7,7 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import capa.render.utils -from capa.features import Feature +from capa.features.common import Feature class API(Feature): diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index 41e4b463..b2e8f0fc 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -14,6 +14,8 @@ import idaapi import idautils import capa +import capa.version +import capa.features.common logger = logging.getLogger("capa") @@ -86,7 +88,7 @@ def get_file_md5(): """ """ md5 = idautils.GetInputFileMD5() if not isinstance(md5, str): - md5 = capa.features.bytes_to_str(md5) + md5 = capa.features.common.bytes_to_str(md5) return md5 @@ -94,7 +96,7 @@ def get_file_sha256(): """ """ sha256 = idaapi.retrieve_input_file_sha256() if not isinstance(sha256, str): - sha256 = capa.features.bytes_to_str(sha256) + sha256 = capa.features.common.bytes_to_str(sha256) return sha256 diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index af292b53..ee73bbc2 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -20,8 +20,12 @@ from PyQt5 import QtGui, QtCore, QtWidgets import capa.main import capa.rules +import capa.engine import capa.ida.helpers +import capa.features.common import capa.features.extractors.ida.extractor +import capa.render.json +import capa.render.result_document from capa.ida.plugin.icon import QICON from capa.ida.plugin.view import ( CapaExplorerQtreeView, @@ -96,7 +100,7 @@ def find_func_matches(f, ruleset, func_features, bb_features): for (name, res) in matches.items(): bb_matches[name].extend(res) for (ea, _) in res: - func_features[capa.features.MatchedRule(name)].add(ea) + func_features[capa.features.common.MatchedRule(name)].add(ea) # find rule matches for function, function features include rule matches for basic blocks _, matches = capa.engine.match(ruleset.function_rules, func_features, int(f)) @@ -769,7 +773,7 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("rendering results") try: - self.doc = capa.render.convert_capabilities_to_result_document(meta, self.ruleset_cache, capabilities) + self.doc = capa.render.result_document.convert_capabilities_to_result_document(meta, self.ruleset_cache, capabilities) except Exception as e: logger.error("Failed to render results (error: %s)", e) return False @@ -864,7 +868,7 @@ class CapaExplorerForm(idaapi.PluginForm): if rule.meta.get("capa/subscope-rule"): continue for (ea, _) in res: - func_features[capa.features.MatchedRule(name)].add(ea) + func_features[capa.features.common.MatchedRule(name)].add(ea) except Exception as e: logger.error("Failed to match function/basic block rule scope (error: %s)" % e) return False @@ -898,7 +902,7 @@ class CapaExplorerForm(idaapi.PluginForm): if rule.meta.get("capa/subscope-rule"): continue for (ea, _) in res: - file_features[capa.features.MatchedRule(name)].add(ea) + file_features[capa.features.common.MatchedRule(name)].add(ea) except Exception as e: logger.error("Failed to match file scope rules (error: %s)" % e) return False @@ -1122,7 +1126,7 @@ class CapaExplorerForm(idaapi.PluginForm): idaapi.info("No program analysis to save.") return - s = json.dumps(self.doc, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder).encode("utf-8") + s = json.dumps(self.doc, sort_keys=True, cls=capa.render.json.CapaJsonObjectEncoder).encode("utf-8") path = idaapi.ask_file(True, "*.json", "Choose file to save capa program analysis JSON") if not path: diff --git a/capa/ida/plugin/item.py b/capa/ida/plugin/item.py index 77db0c32..8263fff9 100644 --- a/capa/ida/plugin/item.py +++ b/capa/ida/plugin/item.py @@ -6,7 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys import codecs import idc diff --git a/capa/ida/plugin/model.py b/capa/ida/plugin/model.py index fb8c9ceb..78f785c7 100644 --- a/capa/ida/plugin/model.py +++ b/capa/ida/plugin/model.py @@ -13,8 +13,8 @@ import idaapi from PyQt5 import QtGui, QtCore import capa.rules -import capa.features import capa.ida.helpers +import capa.features.common import capa.render.utils as rutils from capa.ida.plugin.item import ( CapaExplorerDataItem, @@ -494,7 +494,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): value = feature[feature["type"]] if value: if key == "string": - value = '"%s"' % capa.features.escape_string(value) + value = '"%s"' % capa.features.common.escape_string(value) if feature.get("description", ""): return "%s(%s = %s)" % (key, value, feature["description"]) else: @@ -560,7 +560,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): for s, locations in feature["matches"].items(): if location in locations: return CapaExplorerStringViewItem( - parent, display, location, '"' + capa.features.escape_string(s) + '"' + parent, display, location, '"' + capa.features.common.escape_string(s) + '"' ) # programming error: the given location should always be found in the regex matches @@ -590,7 +590,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): if feature["type"] in ("string",): # display string preview return CapaExplorerStringViewItem( - parent, display, location, '"%s"' % capa.features.escape_string(feature[feature["type"]]) + parent, display, location, '"%s"' % capa.features.common.escape_string(feature[feature["type"]]) ) if feature["type"] in ("import", "export", "function-name"): diff --git a/capa/ida/plugin/view.py b/capa/ida/plugin/view.py index 52a29916..2f2d2a5f 100644 --- a/capa/ida/plugin/view.py +++ b/capa/ida/plugin/view.py @@ -14,6 +14,7 @@ from PyQt5 import QtGui, QtCore, QtWidgets import capa.rules import capa.engine import capa.ida.helpers +import capa.features.common import capa.features.basicblock from capa.ida.plugin.item import CapaExplorerFunctionItem from capa.ida.plugin.model import CapaExplorerDataModel @@ -624,8 +625,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget): # single features for (k, v) in filter(lambda t: t[1] == 1, counted): - if isinstance(k, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(k.get_value_str()) + if isinstance(k, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(k.get_value_str()) else: value = k.get_value_str() self.new_feature_node(self.root, ("- %s: %s" % (k.name.lower(), value), "")) @@ -633,8 +634,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget): # n > 1 features for (k, v) in filter(lambda t: t[1] > 1, counted): if k.value: - if isinstance(k, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(k.get_value_str()) + if isinstance(k, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(k.get_value_str()) else: value = k.get_value_str() display = "- count(%s(%s)): %d" % (k.name.lower(), value, v) @@ -898,8 +899,8 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget): """ """ name = feature.name.lower() value = feature.get_value_str() - if isinstance(feature, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(value) + if isinstance(feature, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(value) return "%s(%s)" % (name, value) for (feature, eas) in sorted(features.items(), key=lambda k: sorted(k[1])): diff --git a/capa/main.py b/capa/main.py index 0f1899d0..f12aeb29 100644 --- a/capa/main.py +++ b/capa/main.py @@ -29,10 +29,10 @@ import colorama import capa.rules import capa.engine import capa.version -import capa.features import capa.render.json import capa.render.default import capa.render.verbose +import capa.features.common import capa.features.freeze import capa.render.vverbose import capa.features.extractors @@ -97,7 +97,7 @@ def find_function_capabilities(ruleset, extractor, f): for rule_name, res in matches.items(): bb_matches[rule_name].extend(res) for va, _ in res: - function_features[capa.features.MatchedRule(rule_name)].add(va) + function_features[capa.features.common.MatchedRule(rule_name)].add(va) _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f)) return function_matches, bb_matches, len(function_features) @@ -172,7 +172,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None): # mapping from feature (matched rule) to set of addresses at which it matched. # schema: Dict[MatchedRule: Set[int] function_and_lower_features = { - capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results)) + capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results)) for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()) } @@ -853,7 +853,9 @@ def main(argv=None): def ida_main(): + import capa.rules import capa.ida.helpers + import capa.render.default import capa.features.extractors.ida.extractor logging.basicConfig(level=logging.INFO) diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 747b0177..8c3703cd 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -8,6 +8,7 @@ import capa.rules import capa.engine import capa.render.utils +import capa.features.common def convert_statement_to_result_document(statement): @@ -92,7 +93,7 @@ def convert_node_to_result_document(node): "type": "statement", "statement": convert_statement_to_result_document(node), } - elif isinstance(node, capa.features.Feature): + elif isinstance(node, capa.features.common.Feature): return { "type": "feature", "feature": convert_feature_to_result_document(node), @@ -114,10 +115,10 @@ def convert_match_to_result_document(rules, capabilities, result): # logic expression, like `and`, don't have locations - their children do. # so only add `locations` to feature nodes. - if isinstance(result.statement, capa.features.Feature): + if isinstance(result.statement, capa.features.common.Feature): if bool(result.success): doc["locations"] = result.locations - elif isinstance(result.statement, capa.rules.Range): + elif isinstance(result.statement, capa.engine.Range): if bool(result.success): doc["locations"] = result.locations diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index 66873e2a..aa04495d 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -9,10 +9,10 @@ import tabulate import capa.rules -import capa.features import capa.render.utils as rutils import capa.render.verbose import capa.render.result_document +import capa.features.common def render_locations(ostream, match): @@ -57,7 +57,7 @@ def render_statement(ostream, match, statement, indent=0): if child[child["type"]]: if child["type"] == "string": - value = '"%s"' % capa.features.escape_string(child[child["type"]]) + value = '"%s"' % capa.features.common.escape_string(child[child["type"]]) else: value = child[child["type"]] value = rutils.bold2(value) @@ -86,7 +86,7 @@ def render_statement(ostream, match, statement, indent=0): def render_string_value(s): - return '"%s"' % capa.features.escape_string(s) + return '"%s"' % capa.features.common.escape_string(s) def render_feature(ostream, match, feature, indent=0): diff --git a/capa/rules.py b/capa/rules.py index bf407439..ab6dca93 100644 --- a/capa/rules.py +++ b/capa/rules.py @@ -12,6 +12,7 @@ import codecs import logging import binascii import functools +import collections try: from functools import lru_cache @@ -28,9 +29,10 @@ import capa.engine import capa.features import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -from capa.engine import * -from capa.features import MAX_BYTES_FEATURE_SIZE +import capa.engine as ceng +from capa.features.common import MAX_BYTES_FEATURE_SIZE logger = logging.getLogger(__name__) @@ -67,38 +69,38 @@ BASIC_BLOCK_SCOPE = "basic block" SUPPORTED_FEATURES = { FILE_SCOPE: { - capa.features.MatchedRule, + capa.features.common.MatchedRule, capa.features.file.Export, capa.features.file.Import, capa.features.file.Section, capa.features.file.FunctionName, - capa.features.Characteristic("embedded pe"), - capa.features.String, + capa.features.common.Characteristic("embedded pe"), + capa.features.common.String, }, FUNCTION_SCOPE: { # plus basic block scope features, see below capa.features.basicblock.BasicBlock, - capa.features.Characteristic("calls from"), - capa.features.Characteristic("calls to"), - capa.features.Characteristic("loop"), - capa.features.Characteristic("recursive call"), + capa.features.common.Characteristic("calls from"), + capa.features.common.Characteristic("calls to"), + capa.features.common.Characteristic("loop"), + capa.features.common.Characteristic("recursive call"), }, BASIC_BLOCK_SCOPE: { - capa.features.MatchedRule, + capa.features.common.MatchedRule, capa.features.insn.API, capa.features.insn.Number, - capa.features.String, - capa.features.Bytes, + capa.features.common.String, + capa.features.common.Bytes, capa.features.insn.Offset, capa.features.insn.Mnemonic, - capa.features.Characteristic("nzxor"), - capa.features.Characteristic("peb access"), - capa.features.Characteristic("fs access"), - capa.features.Characteristic("gs access"), - capa.features.Characteristic("cross section flow"), - capa.features.Characteristic("tight loop"), - capa.features.Characteristic("stack string"), - capa.features.Characteristic("indirect call"), + capa.features.common.Characteristic("nzxor"), + capa.features.common.Characteristic("peb access"), + capa.features.common.Characteristic("fs access"), + capa.features.common.Characteristic("gs access"), + capa.features.common.Characteristic("cross section flow"), + capa.features.common.Characteristic("tight loop"), + capa.features.common.Characteristic("stack string"), + capa.features.common.Characteristic("indirect call"), }, } @@ -142,8 +144,8 @@ class InvalidRuleSet(ValueError): def ensure_feature_valid_for_scope(scope, feature): - if isinstance(feature, capa.features.Characteristic): - if capa.features.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]: + if isinstance(feature, capa.features.common.Characteristic): + if capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]: raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) elif not isinstance(feature, tuple(filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope]))): raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) @@ -199,9 +201,9 @@ def parse_feature(key): if key == "api": return capa.features.insn.API elif key == "string": - return capa.features.StringFactory + return capa.features.common.StringFactory elif key == "bytes": - return capa.features.Bytes + return capa.features.common.Bytes elif key == "number": return capa.features.insn.Number elif key.startswith("number/"): @@ -223,7 +225,7 @@ def parse_feature(key): elif key == "basic blocks": return capa.features.basicblock.BasicBlock elif key == "characteristic": - return capa.features.Characteristic + return capa.features.common.Characteristic elif key == "export": return capa.features.file.Export elif key == "import": @@ -231,7 +233,7 @@ def parse_feature(key): elif key == "section": return capa.features.file.Section elif key == "match": - return capa.features.MatchedRule + return capa.features.common.MatchedRule elif key == "function-name": return capa.features.file.FunctionName else: @@ -264,7 +266,7 @@ def parse_description(s, value_type, description=None): if isinstance(value, str): if value_type == "bytes": try: - value = codecs.decode(value.replace(" ", ""), "hex") + value = codecs.decode(value.replace(" ", "").encode("ascii"), "hex") except binascii.Error: raise InvalidRule('unexpected bytes value: "%s", must be a valid hex sequence' % value) @@ -323,21 +325,21 @@ def build_statements(d, scope): key = list(d.keys())[0] description = pop_statement_description_entry(d[key]) if key == "and": - return And([build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.And([build_statements(dd, scope) for dd in d[key]], description=description) elif key == "or": - return Or([build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Or([build_statements(dd, scope) for dd in d[key]], description=description) elif key == "not": if len(d[key]) != 1: raise InvalidRule("not statement must have exactly one child statement") - return Not(build_statements(d[key][0], scope), description=description) + return ceng.Not(build_statements(d[key][0], scope), description=description) elif key.endswith(" or more"): count = int(key[: -len("or more")]) - return Some(count, [build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Some(count, [build_statements(dd, scope) for dd in d[key]], description=description) elif key == "optional": # `optional` is an alias for `0 or more` # which is useful for documenting behaviors, # like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`. - return Some(0, [build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Some(0, [build_statements(dd, scope) for dd in d[key]], description=description) elif key == "function": if scope != FILE_SCOPE: @@ -346,7 +348,7 @@ def build_statements(d, scope): if len(d[key]) != 1: raise InvalidRule("subscope must have exactly one child statement") - return Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE)) + return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE)) elif key == "basic block": if scope != FUNCTION_SCOPE: @@ -355,7 +357,7 @@ def build_statements(d, scope): if len(d[key]) != 1: raise InvalidRule("subscope must have exactly one child statement") - return Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE)) + return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE)) elif key.startswith("count(") and key.endswith(")"): # e.g.: @@ -396,18 +398,18 @@ def build_statements(d, scope): count = d[key] if isinstance(count, int): - return Range(feature, min=count, max=count, description=description) + return ceng.Range(feature, min=count, max=count, description=description) elif count.endswith(" or more"): min = parse_int(count[: -len(" or more")]) max = None - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) elif count.endswith(" or fewer"): min = None max = parse_int(count[: -len(" or fewer")]) - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) elif count.startswith("("): min, max = parse_range(count) - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) else: raise InvalidRule("unexpected range: %s" % (count)) elif key == "string" and not isinstance(d[key], str): @@ -462,7 +464,7 @@ class Rule(object): deps = set([]) def rec(statement): - if isinstance(statement, capa.features.MatchedRule): + if isinstance(statement, capa.features.common.MatchedRule): # we're not sure at this point if the `statement.value` is # really a rule name or a namespace name (we use `MatchedRule` for both cases). # we'll give precedence to namespaces, and then assume if that does work, @@ -478,7 +480,7 @@ class Rule(object): # not a namespace, assume its a rule name. deps.add(statement.value) - elif isinstance(statement, Statement): + elif isinstance(statement, ceng.Statement): for child in statement.get_children(): rec(child) @@ -489,7 +491,7 @@ class Rule(object): return deps def _extract_subscope_rules_rec(self, statement): - if isinstance(statement, Statement): + if isinstance(statement, ceng.Statement): # for each child that is a subscope, for subscope in filter( lambda statement: isinstance(statement, capa.engine.Subscope), statement.get_children() @@ -518,7 +520,7 @@ class Rule(object): ) # update the existing statement to `match` the new rule - new_node = capa.features.MatchedRule(name) + new_node = capa.features.common.MatchedRule(name) statement.replace_child(subscope, new_node) # and yield the new rule to our caller diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 04b4a191..5939744f 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -55,6 +55,7 @@ Unless required by applicable law or agreed to in writing, software distributed is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import os import sys import json import logging diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 50619ddb..169b8cc4 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -31,7 +31,6 @@ See the License for the specific language governing permissions and limitations """ import re -import pdb import sys import string import logging @@ -44,8 +43,7 @@ import capa.rules import capa.engine import capa.features import capa.features.insn -from capa.features import ARCH_X32, ARCH_X64, String -from capa.features.insn import Number, Offset +from capa.features.common import ARCH_X32, ARCH_X64, String logger = logging.getLogger("capa2yara") @@ -348,9 +346,9 @@ def convert_rule(rule, rulename, cround, depth): kid = rule.child kids = [kid] num_kids = 1 - logger.info("kid: " + kids) + logger.info("kid: %s", kids) except: - logger.info("no kid in rule: " + rule.name) + logger.info("no kid in rule: %s", rule.name) # just a single statement without 'and' or 'or' before it in this rule if "kids" not in locals().keys(): @@ -366,7 +364,7 @@ def convert_rule(rule, rulename, cround, depth): else: x = 0 - logger.info("doing kids: " + repr(kids) + " - len: " + str(num_kids)) + logger.info("doing kids: %r - len: %s", kids, num_kids) for kid in kids: s_type = kid.name logger.info("doing type: " + s_type + " kidnum: " + str(x)) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index a7f2cff8..8cc4dee8 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -10,8 +10,8 @@ import capa.features import capa.render.json import capa.render.utils as rutils import capa.render.default +import capa.render.result_document from capa.engine import * -from capa.render import convert_capabilities_to_result_document # edit this to set the path for file to analyze and rule directory RULES_PATH = "/tmp/capa/rules/" @@ -203,7 +203,7 @@ def capa_details(file_path, output_format="dictionary"): capa_output = False if output_format == "dictionary": # ...as python dictionary, simplified as textable but in dictionary - doc = convert_capabilities_to_result_document(meta, rules, capabilities) + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) capa_output = render_dictionary(doc) elif output_format == "json": # render results diff --git a/scripts/lint.py b/scripts/lint.py index 9bd27c2e..19eb93eb 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -31,8 +31,8 @@ import ruamel.yaml import capa.main import capa.rules import capa.engine -import capa.features import capa.features.insn +import capa.features.common logger = logging.getLogger("lint") @@ -326,7 +326,7 @@ class FeatureStringTooShort(Lint): def check_features(self, ctx, features): for feature in features: - if isinstance(feature, capa.features.String): + if isinstance(feature, capa.features.common.String): if len(feature.value) < 4: self.recommendation = self.recommendation.format(feature.value) return True diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index f3d73710..60b328b0 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -59,9 +59,9 @@ import colorama import capa.main import capa.rules import capa.engine -import capa.render import capa.features import capa.render.utils as rutils +import capa.render.result_document import capa.features.freeze from capa.helpers import get_file_taste @@ -191,7 +191,7 @@ def main(argv=None): # - when not an interactive session, and disable coloring # renderers should use coloring and assume it will be stripped out if necessary. colorama.init() - doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) print(render_matches_by_function(doc)) colorama.deinit() diff --git a/tests/fixtures.py b/tests/fixtures.py index c0166794..0ff34ffa 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -19,8 +19,9 @@ import pytest import capa.main import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -from capa.features import ARCH_X32, ARCH_X64 +from capa.features.common import ARCH_X32, ARCH_X64 CD = os.path.dirname(__file__) @@ -66,6 +67,7 @@ def xfail(condition, reason=None): # need to limit cache size so GitHub Actions doesn't run out of memory, see #545 @lru_cache(maxsize=1) def get_viv_extractor(path): + import capa.main import capa.features.extractors.viv.extractor sigpaths = [ @@ -268,34 +270,34 @@ def get_basic_block(extractor, f, va): def resolve_scope(scope): if scope == "file": - def inner(extractor): + def inner_file(extractor): return extract_file_features(extractor) - inner.__name__ = scope - return inner + inner_file.__name__ = scope + return inner_file elif "bb=" in scope: # like `function=0x401000,bb=0x40100A` fspec, _, bbspec = scope.partition(",") fva = int(fspec.partition("=")[2], 0x10) bbva = int(bbspec.partition("=")[2], 0x10) - def inner(extractor): + def inner_bb(extractor): f = get_function(extractor, fva) bb = get_basic_block(extractor, f, bbva) return extract_basic_block_features(extractor, f, bb) - inner.__name__ = scope - return inner + inner_bb.__name__ = scope + return inner_bb elif scope.startswith("function"): # like `function=0x401000` va = int(scope.partition("=")[2], 0x10) - def inner(extractor): + def inner_function(extractor): f = get_function(extractor, va) return extract_function_features(extractor, f) - inner.__name__ = scope - return inner + inner_function.__name__ = scope + return inner_function else: raise ValueError("unexpected scope fixture") @@ -324,12 +326,12 @@ def parametrize(params, values, **kwargs): FEATURE_PRESENCE_TESTS = sorted( [ # file/characteristic("embedded pe") - ("pma12-04", "file", capa.features.Characteristic("embedded pe"), True), + ("pma12-04", "file", capa.features.common.Characteristic("embedded pe"), True), # file/string - ("mimikatz", "file", capa.features.String("SCardControl"), True), - ("mimikatz", "file", capa.features.String("SCardTransmit"), True), - ("mimikatz", "file", capa.features.String("ACR > "), True), - ("mimikatz", "file", capa.features.String("nope"), False), + ("mimikatz", "file", capa.features.common.String("SCardControl"), True), + ("mimikatz", "file", capa.features.common.String("SCardTransmit"), True), + ("mimikatz", "file", capa.features.common.String("ACR > "), True), + ("mimikatz", "file", capa.features.common.String("nope"), False), # file/sections ("mimikatz", "file", capa.features.file.Section(".text"), True), ("mimikatz", "file", capa.features.file.Section(".nope"), False), @@ -353,17 +355,17 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "file", capa.features.file.Import("CryptAcquireContextW"), True), ("mimikatz", "file", capa.features.file.Import("CryptAcquireContext"), True), # function/characteristic(loop) - ("mimikatz", "function=0x401517", capa.features.Characteristic("loop"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("loop"), False), + ("mimikatz", "function=0x401517", capa.features.common.Characteristic("loop"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("loop"), False), # bb/characteristic(tight loop) - ("mimikatz", "function=0x402EC4", capa.features.Characteristic("tight loop"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("tight loop"), False), + ("mimikatz", "function=0x402EC4", capa.features.common.Characteristic("tight loop"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("tight loop"), False), # bb/characteristic(stack string) - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("stack string"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("stack string"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("stack string"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("stack string"), False), # bb/characteristic(tight loop) - ("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.Characteristic("tight loop"), True), - ("mimikatz", "function=0x401000,bb=0x401000", capa.features.Characteristic("tight loop"), False), + ("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.common.Characteristic("tight loop"), True), + ("mimikatz", "function=0x401000,bb=0x401000", capa.features.common.Characteristic("tight loop"), False), # insn/mnemonic ("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("push"), True), ("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("movzx"), True), @@ -440,60 +442,60 @@ FEATURE_PRESENCE_TESTS = sorted( ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), True), ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), True), # insn/string - ("mimikatz", "function=0x40105D", capa.features.String("SCardControl"), True), - ("mimikatz", "function=0x40105D", capa.features.String("SCardTransmit"), True), - ("mimikatz", "function=0x40105D", capa.features.String("ACR > "), True), - ("mimikatz", "function=0x40105D", capa.features.String("nope"), False), - ("773290...", "function=0x140001140", capa.features.String(r"%s:\\OfficePackagesForWDAG"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("SCardControl"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("SCardTransmit"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("ACR > "), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("nope"), False), + ("773290...", "function=0x140001140", capa.features.common.String(r"%s:\\OfficePackagesForWDAG"), True), # insn/regex, issue #262 - ("pma16-01", "function=0x4021B0", capa.features.Regex("HTTP/1.0"), True), - ("pma16-01", "function=0x4021B0", capa.features.Regex("www.practicalmalwareanalysis.com"), False), + ("pma16-01", "function=0x4021B0", capa.features.common.Regex("HTTP/1.0"), True), + ("pma16-01", "function=0x4021B0", capa.features.common.Regex("www.practicalmalwareanalysis.com"), False), # insn/string, pointer to string - ("mimikatz", "function=0x44EDEF", capa.features.String("INPUTEVENT"), True), + ("mimikatz", "function=0x44EDEF", capa.features.common.String("INPUTEVENT"), True), # insn/string, direct memory reference - ("mimikatz", "function=0x46D6CE", capa.features.String("(null)"), True), + ("mimikatz", "function=0x46D6CE", capa.features.common.String("(null)"), True), # insn/bytes - ("mimikatz", "function=0x40105D", capa.features.Bytes("SCardControl".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("SCardTransmit".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("ACR > ".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("nope".encode("ascii")), False), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardControl".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardTransmit".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("ACR > ".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("nope".encode("ascii")), False), # IDA features included byte sequences read from invalid memory, fixed in #409 - ("mimikatz", "function=0x44570F", capa.features.Bytes(binascii.unhexlify("FF" * 256)), False), + ("mimikatz", "function=0x44570F", capa.features.common.Bytes(binascii.unhexlify("FF" * 256)), False), # insn/bytes, pointer to bytes - ("mimikatz", "function=0x44EDEF", capa.features.Bytes("INPUTEVENT".encode("utf-16le")), True), + ("mimikatz", "function=0x44EDEF", capa.features.common.Bytes("INPUTEVENT".encode("utf-16le")), True), # insn/characteristic(nzxor) - ("mimikatz", "function=0x410DFC", capa.features.Characteristic("nzxor"), True), - ("mimikatz", "function=0x40105D", capa.features.Characteristic("nzxor"), False), + ("mimikatz", "function=0x410DFC", capa.features.common.Characteristic("nzxor"), True), + ("mimikatz", "function=0x40105D", capa.features.common.Characteristic("nzxor"), False), # insn/characteristic(nzxor): no security cookies - ("mimikatz", "function=0x46D534", capa.features.Characteristic("nzxor"), False), + ("mimikatz", "function=0x46D534", capa.features.common.Characteristic("nzxor"), False), # insn/characteristic(nzxor): xorps # viv needs fixup to recognize function, see above - ("3b13b...", "function=0x10006860", capa.features.Characteristic("nzxor"), True), + ("3b13b...", "function=0x10006860", capa.features.common.Characteristic("nzxor"), True), # insn/characteristic(peb access) - ("kernel32-64", "function=0x1800017D0", capa.features.Characteristic("peb access"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("peb access"), False), + ("kernel32-64", "function=0x1800017D0", capa.features.common.Characteristic("peb access"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("peb access"), False), # insn/characteristic(gs access) - ("kernel32-64", "function=0x180001068", capa.features.Characteristic("gs access"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("gs access"), False), + ("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("gs access"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("gs access"), False), # insn/characteristic(cross section flow) - ("a1982...", "function=0x4014D0", capa.features.Characteristic("cross section flow"), True), + ("a1982...", "function=0x4014D0", capa.features.common.Characteristic("cross section flow"), True), # insn/characteristic(cross section flow): imports don't count - ("kernel32-64", "function=0x180001068", capa.features.Characteristic("cross section flow"), False), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("cross section flow"), False), + ("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("cross section flow"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("cross section flow"), False), # insn/characteristic(recursive call) - ("mimikatz", "function=0x40640e", capa.features.Characteristic("recursive call"), True), + ("mimikatz", "function=0x40640e", capa.features.common.Characteristic("recursive call"), True), # before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386 - ("mimikatz", "function=0x4175FF", capa.features.Characteristic("recursive call"), False), + ("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("recursive call"), False), # insn/characteristic(indirect call) - ("mimikatz", "function=0x4175FF", capa.features.Characteristic("indirect call"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("indirect call"), False), + ("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("indirect call"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("indirect call"), False), # insn/characteristic(calls from) - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls from"), True), - ("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls from"), True), + ("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), False), # function/characteristic(calls to) - ("mimikatz", "function=0x40105D", capa.features.Characteristic("calls to"), True), + ("mimikatz", "function=0x40105D", capa.features.common.Characteristic("calls to"), True), # before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386 - ("mimikatz", "function=0x456BB9", capa.features.Characteristic("calls to"), False), + ("mimikatz", "function=0x456BB9", capa.features.common.Characteristic("calls to"), False), # file/function-name ("pma16-01", "file", capa.features.file.FunctionName("__aulldiv"), True), ], @@ -510,10 +512,10 @@ FEATURE_PRESENCE_TESTS_IDA = [ FEATURE_COUNT_TESTS = [ ("mimikatz", "function=0x40E5C2", capa.features.basicblock.BasicBlock(), 7), - ("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), 0), - ("mimikatz", "function=0x40E5C2", capa.features.Characteristic("calls from"), 3), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls to"), 0), - ("mimikatz", "function=0x40B1F1", capa.features.Characteristic("calls to"), 3), + ("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0), + ("mimikatz", "function=0x40E5C2", capa.features.common.Characteristic("calls from"), 3), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls to"), 0), + ("mimikatz", "function=0x40B1F1", capa.features.common.Characteristic("calls to"), 3), ] diff --git a/tests/test_engine.py b/tests/test_engine.py index a04ddad3..5f0d7cce 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -10,6 +10,8 @@ import textwrap import capa.rules import capa.engine +import capa.features.insn +import capa.features.common from capa.engine import * from capa.features import * from capa.features.insn import * @@ -233,7 +235,7 @@ def test_match_adds_matched_rule_feature(): ) r = capa.rules.Rule.from_yaml(rule) features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_match_matched_rules(): @@ -268,8 +270,8 @@ def test_match_matched_rules(): {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule1") in features - assert capa.features.MatchedRule("test rule2") in features + assert capa.features.common.MatchedRule("test rule1") in features + assert capa.features.common.MatchedRule("test rule2") in features # the ordering of the rules must not matter, # the engine should match rules in an appropriate order. @@ -278,8 +280,8 @@ def test_match_matched_rules(): {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule1") in features - assert capa.features.MatchedRule("test rule2") in features + assert capa.features.common.MatchedRule("test rule1") in features + assert capa.features.common.MatchedRule("test rule2") in features def test_regex(): @@ -326,30 +328,30 @@ def test_regex(): {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("aaaa"): {1}}, + {capa.features.common.String("aaaa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, + {capa.features.common.String("aBBBBa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("abbbba"): {1}}, + {capa.features.common.String("abbbba"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features - assert capa.features.MatchedRule("rule with implied wildcards") in features - assert capa.features.MatchedRule("rule with anchor") not in features + assert capa.features.common.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("rule with implied wildcards") in features + assert capa.features.common.MatchedRule("rule with anchor") not in features def test_regex_ignorecase(): @@ -369,10 +371,10 @@ def test_regex_ignorecase(): ] features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, + {capa.features.common.String("aBBBBa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_regex_complex(): @@ -392,10 +394,10 @@ def test_regex_complex(): ] features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String(r"Hardware\Key\key with spaces\some value"): {1}}, + {capa.features.common.String(r"Hardware\Key\key with spaces\some value"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_match_namespace(): @@ -456,9 +458,9 @@ def test_match_namespace(): assert "CreateFile API" in matches assert "file-create" in matches assert "filesystem-any" in matches - assert capa.features.MatchedRule("file") in features - assert capa.features.MatchedRule("file/create") in features - assert capa.features.MatchedRule("file/create/CreateFile") in features + assert capa.features.common.MatchedRule("file") in features + assert capa.features.common.MatchedRule("file/create") in features + assert capa.features.common.MatchedRule("file/create/CreateFile") in features features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), @@ -472,11 +474,11 @@ def test_match_namespace(): def test_render_number(): assert str(capa.features.insn.Number(1)) == "number(0x1)" - assert str(capa.features.insn.Number(1, arch=ARCH_X32)) == "number/x32(0x1)" - assert str(capa.features.insn.Number(1, arch=ARCH_X64)) == "number/x64(0x1)" + assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X32)) == "number/x32(0x1)" + assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X64)) == "number/x64(0x1)" def test_render_offset(): assert str(capa.features.insn.Offset(1)) == "offset(0x1)" - assert str(capa.features.insn.Offset(1, arch=ARCH_X32)) == "offset/x32(0x1)" - assert str(capa.features.insn.Offset(1, arch=ARCH_X64)) == "offset/x64(0x1)" + assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X32)) == "offset/x32(0x1)" + assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X64)) == "offset/x64(0x1)" diff --git a/tests/test_freeze.py b/tests/test_freeze.py index fd5fb363..f65b35da 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -10,33 +10,36 @@ import textwrap from fixtures import * import capa.main +import capa.rules import capa.helpers -import capa.features import capa.features.insn +import capa.features.file import capa.features.freeze +import capa.features.common +import capa.features.basicblock import capa.features.extractors.base_extractor EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor( { "base address": 0x401000, "file features": [ - (0x402345, capa.features.Characteristic("embedded pe")), + (0x402345, capa.features.common.Characteristic("embedded pe")), ], "functions": { 0x401000: { "features": [ - (0x401000, capa.features.Characteristic("indirect call")), + (0x401000, capa.features.common.Characteristic("indirect call")), ], "basic blocks": { 0x401000: { "features": [ - (0x401000, capa.features.Characteristic("tight loop")), + (0x401000, capa.features.common.Characteristic("tight loop")), ], "instructions": { 0x401000: { "features": [ (0x401000, capa.features.insn.Mnemonic("xor")), - (0x401000, capa.features.Characteristic("nzxor")), + (0x401000, capa.features.common.Characteristic("nzxor")), ], }, 0x401002: { @@ -153,12 +156,12 @@ def roundtrip_feature(feature): def test_serialize_features(): roundtrip_feature(capa.features.insn.API("advapi32.CryptAcquireContextW")) - roundtrip_feature(capa.features.String("SCardControl")) + roundtrip_feature(capa.features.common.String("SCardControl")) roundtrip_feature(capa.features.insn.Number(0xFF)) roundtrip_feature(capa.features.insn.Offset(0x0)) roundtrip_feature(capa.features.insn.Mnemonic("push")) roundtrip_feature(capa.features.file.Section(".rsrc")) - roundtrip_feature(capa.features.Characteristic("tight loop")) + roundtrip_feature(capa.features.common.Characteristic("tight loop")) roundtrip_feature(capa.features.basicblock.BasicBlock()) roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk")) roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process")) diff --git a/tests/test_function_id.py b/tests/test_function_id.py index c6ad2be9..9adbe8da 100644 --- a/tests/test_function_id.py +++ b/tests/test_function_id.py @@ -1,4 +1,4 @@ -from fixtures import get_function, pma16_01_extractor, extract_function_features +from fixtures import get_function, extract_function_features import capa.features.insn diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 9267164e..eab1efa4 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -13,9 +13,9 @@ from capa.features.extractors import helpers def test_all_zeros(): a = b"\x00\x00\x00\x00" - b = codecs.decode("00000000", "hex") + b = codecs.decode(b"00000000", "hex") c = b"\x01\x00\x00\x00" - d = codecs.decode("01000000", "hex") + d = codecs.decode(b"01000000", "hex") assert helpers.all_zeros(a) is True assert helpers.all_zeros(b) is True assert helpers.all_zeros(c) is False diff --git a/tests/test_ida_features.py b/tests/test_ida_features.py index 0189adc2..a568606a 100644 --- a/tests/test_ida_features.py +++ b/tests/test_ida_features.py @@ -9,6 +9,7 @@ import pytest try: sys.path.append(os.path.dirname(__file__)) + import fixtures from fixtures import * finally: sys.path.pop() @@ -44,20 +45,20 @@ def get_ida_extractor(_path): @pytest.mark.skip(reason="IDA Pro tests must be run within IDA") def test_ida_features(): - for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS + FEATURE_PRESENCE_TESTS_IDA: - id = make_test_id((sample, scope, feature, expected)) + for (sample, scope, feature, expected) in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA: + id = fixtures.make_test_id((sample, scope, feature, expected)) try: - check_input_file(get_sample_md5_by_name(sample)) + check_input_file(fixtures.get_sample_md5_by_name(sample)) except RuntimeError: print("SKIP %s" % (id)) continue - scope = resolve_scope(scope) - sample = resolve_sample(sample) + scope = fixtures.resolve_scope(scope) + sample = fixtures.resolve_sample(sample) try: - do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) except Exception as e: print("FAIL %s" % (id)) traceback.print_exc() @@ -67,20 +68,20 @@ def test_ida_features(): @pytest.mark.skip(reason="IDA Pro tests must be run within IDA") def test_ida_feature_counts(): - for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS: - id = make_test_id((sample, scope, feature, expected)) + for (sample, scope, feature, expected) in fixtures.FEATURE_COUNT_TESTS: + id = fixtures.make_test_id((sample, scope, feature, expected)) try: - check_input_file(get_sample_md5_by_name(sample)) + check_input_file(fixtures.get_sample_md5_by_name(sample)) except RuntimeError: print("SKIP %s" % (id)) continue - scope = resolve_scope(scope) - sample = resolve_sample(sample) + scope = fixtures.resolve_scope(scope) + sample = fixtures.resolve_sample(sample) try: - do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) except Exception as e: print("FAIL %s" % (id)) traceback.print_exc() diff --git a/tests/test_main.py b/tests/test_main.py index d38ead5b..aa74bed3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,6 +9,7 @@ import json import textwrap +import fixtures from fixtures import * import capa.main @@ -362,7 +363,7 @@ def test_not_render_rules_also_matched(z9324d_extractor, capsys): def test_backend_option(capsys): # tests that main works with different backends - path = get_data_path_by_name("pma16-01") + path = fixtures.get_data_path_by_name("pma16-01") assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_VIV]) == 0 std = capsys.readouterr() std_json = json.loads(std.out) diff --git a/tests/test_pefile_features.py b/tests/test_pefile_features.py index 5c24edab..7d8e4a90 100644 --- a/tests/test_pefile_features.py +++ b/tests/test_pefile_features.py @@ -5,18 +5,18 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - import pytest -from fixtures import * -from fixtures import parametrize import capa.features.file +import fixtures +from fixtures import * +from fixtures import parametrize + @parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_pefile_features(sample, scope, feature, expected): @@ -26,4 +26,4 @@ def test_pefile_features(sample, scope, feature, expected): if isinstance(feature, capa.features.file.FunctionName): pytest.xfail("pefile only doesn't extract function names") - do_test_feature_presence(get_pefile_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_pefile_extractor, sample, scope, feature, expected) diff --git a/tests/test_render.py b/tests/test_render.py index 10fda1e9..da89e4d6 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -1,8 +1,8 @@ import textwrap import capa.rules -from capa.render import convert_meta_to_result_document -from capa.render.utils import format_parts_id +import capa.render.utils +import capa.render.result_document def test_render_meta_attack(): @@ -27,7 +27,7 @@ def test_render_meta_attack(): ) ) r = capa.rules.Rule.from_yaml(rule) - rule_meta = convert_meta_to_result_document(r.meta) + rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta) attack = rule_meta["att&ck"][0] assert attack["id"] == id @@ -35,7 +35,7 @@ def test_render_meta_attack(): assert attack["technique"] == technique assert attack["subtechnique"] == subtechnique - assert format_parts_id(attack) == canonical + assert capa.render.utils.format_parts_id(attack) == canonical def test_render_meta_mbc(): @@ -60,7 +60,7 @@ def test_render_meta_mbc(): ) ) r = capa.rules.Rule.from_yaml(rule) - rule_meta = convert_meta_to_result_document(r.meta) + rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta) attack = rule_meta["mbc"][0] assert attack["id"] == id @@ -68,4 +68,4 @@ def test_render_meta_mbc(): assert attack["behavior"] == behavior assert attack["method"] == method - assert format_parts_id(attack) == canonical + assert capa.render.utils.format_parts_id(attack) == canonical diff --git a/tests/test_rules.py b/tests/test_rules.py index 33ed2d2a..7dc1b666 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -12,10 +12,10 @@ import pytest import capa.rules import capa.engine -import capa.features -from capa.features import ARCH_X32, ARCH_X64, String +import capa.features.common from capa.features.file import FunctionName from capa.features.insn import Number, Offset +from capa.features.common import ARCH_X32, ARCH_X64, String def test_rule_ctor(): @@ -752,17 +752,17 @@ def test_regex_values_always_string(): ] features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("123"): {1}}, + {capa.features.common.String("123"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.comm.MatchedRule("test rule") in features features, matches = capa.engine.match( capa.engine.topologically_order_rules(rules), - {capa.features.String("0x123"): {1}}, + {capa.features.common.String("0x123"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_filter_rules(): diff --git a/tests/test_smda_features.py b/tests/test_smda_features.py index 9435f736..24440f06 100644 --- a/tests/test_smda_features.py +++ b/tests/test_smda_features.py @@ -5,31 +5,31 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - import pytest -from fixtures import * -from fixtures import parametrize import capa.features.file +import fixtures +from fixtures import * +from fixtures import parametrize + @parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_smda_features(sample, scope, feature, expected): if scope.__name__ == "file" and isinstance(feature, capa.features.file.FunctionName) and expected is True: pytest.xfail("SMDA has no function ID") - do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_smda_extractor, sample, scope, feature, expected) @parametrize( "sample,scope,feature,expected", - FEATURE_COUNT_TESTS, + fixtures.FEATURE_COUNT_TESTS, indirect=["sample", "scope"], ) def test_smda_feature_counts(sample, scope, feature, expected): - do_test_feature_count(get_smda_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(fixtures.get_smda_extractor, sample, scope, feature, expected) diff --git a/tests/test_viv_features.py b/tests/test_viv_features.py index 3206e8cf..fa8bfda3 100644 --- a/tests/test_viv_features.py +++ b/tests/test_viv_features.py @@ -5,24 +5,23 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - +import fixtures from fixtures import * -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_viv_features(sample, scope, feature, expected): - do_test_feature_presence(get_viv_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_viv_extractor, sample, scope, feature, expected) -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", - FEATURE_COUNT_TESTS, + fixtures.FEATURE_COUNT_TESTS, indirect=["sample", "scope"], ) def test_viv_feature_counts(sample, scope, feature, expected): - do_test_feature_count(get_viv_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(fixtures.get_viv_extractor, sample, scope, feature, expected)