diff --git a/CHANGELOG.md b/CHANGELOG.md index c0d41255..fc5598bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -128,6 +128,7 @@ It includes many new rules, including all new techniques introduced in MITRE ATT - json: breaking change: record all matching strings for regex #159 @williballenthin - main: implement file limitations via rules not code #390 @williballenthin - json: breaking change: correctly render negative offsets #619 @williballenthin +- library: breaking change: remove logic from `__init__.py` throughout #622 @williballenthin ### Development diff --git a/capa/engine.py b/capa/engine.py index 5582982a..f496a146 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -9,7 +9,7 @@ import copy import collections -import capa.features +import capa.features.common class Statement(object): @@ -199,37 +199,6 @@ class Subscope(Statement): raise ValueError("cannot evaluate a subscope directly!") -def topologically_order_rules(rules): - """ - order the given rules such that dependencies show up before dependents. - this means that as we match rules, we can add features for the matches, and these - will be matched by subsequent rules if they follow this order. - - assumes that the rule dependency graph is a DAG. - """ - # we evaluate `rules` multiple times, so if its a generator, realize it into a list. - rules = list(rules) - namespaces = capa.rules.index_rules_by_namespace(rules) - rules = {rule.name: rule for rule in rules} - seen = set([]) - ret = [] - - def rec(rule): - if rule.name in seen: - return - - for dep in rule.get_dependencies(namespaces): - rec(rules[dep]) - - ret.append(rule) - seen.add(rule.name) - - for rule in rules.values(): - rec(rule) - - return ret - - def match(rules, features, va): """ Args: @@ -254,12 +223,12 @@ def match(rules, features, va): res = rule.evaluate(features) if res: results[rule.name].append((va, res)) - features[capa.features.MatchedRule(rule.name)].add(va) + features[capa.features.common.MatchedRule(rule.name)].add(va) namespace = rule.meta.get("namespace") if namespace: while namespace: - features[capa.features.MatchedRule(namespace)].add(va) + features[capa.features.common.MatchedRule(namespace)].add(va) namespace, _, _ = namespace.rpartition("/") return (features, results) diff --git a/capa/features/__init__.py b/capa/features/__init__.py index 151b5744..e69de29b 100644 --- a/capa/features/__init__.py +++ b/capa/features/__init__.py @@ -1,254 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - -import re -import codecs -import logging -import collections - -import capa.engine -import capa.features - -logger = logging.getLogger(__name__) -MAX_BYTES_FEATURE_SIZE = 0x100 - -# thunks may be chained so we specify a delta to control the depth to which these chains are explored -THUNK_CHAIN_DEPTH_DELTA = 5 - -# identifiers for supported architectures names that tweak a feature -# for example, offset/x32 -ARCH_X32 = "x32" -ARCH_X64 = "x64" -VALID_ARCH = (ARCH_X32, ARCH_X64) - - -def bytes_to_str(b): - return str(codecs.encode(b, "hex").decode("utf-8")) - - -def hex_string(h): - """render hex string e.g. "0a40b1" as "0A 40 B1" """ - return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper() - - -def escape_string(s): - """escape special characters""" - s = repr(s) - if not s.startswith(('"', "'")): - # u'hello\r\nworld' -> hello\\r\\nworld - s = s[2:-1] - else: - # 'hello\r\nworld' -> hello\\r\\nworld - s = s[1:-1] - s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove - s = s.replace('"', '\\"') # repr() does not escape '"', add - return s - - -class Feature(object): - def __init__(self, value, arch=None, description=None): - """ - Args: - value (any): the value of the feature, such as the number or string. - arch (str): one of the VALID_ARCH values, or None. - When None, then the feature applies to any architecture. - Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`. - description (str): a human-readable description that explains the feature value. - """ - super(Feature, self).__init__() - - if arch is not None: - if arch not in VALID_ARCH: - raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH)) - self.name = self.__class__.__name__.lower() + "/" + arch - else: - self.name = self.__class__.__name__.lower() - - self.value = value - self.arch = arch - self.description = description - - def __hash__(self): - return hash((self.name, self.value, self.arch)) - - def __eq__(self, other): - return self.name == other.name and self.value == other.value and self.arch == other.arch - - def get_value_str(self): - """ - render the value of this feature, for use by `__str__` and friends. - subclasses should override to customize the rendering. - - Returns: any - """ - return self.value - - def __str__(self): - if self.value is not None: - if self.description: - return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description) - else: - return "%s(%s)" % (self.name, self.get_value_str()) - else: - return "%s" % self.name - - def __repr__(self): - return str(self) - - def evaluate(self, ctx): - return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, [])) - - def freeze_serialize(self): - if self.arch is not None: - return (self.__class__.__name__, [self.value, {"arch": self.arch}]) - else: - return (self.__class__.__name__, [self.value]) - - @classmethod - def freeze_deserialize(cls, args): - # as you can see below in code, - # if the last argument is a dictionary, - # consider it to be kwargs passed to the feature constructor. - if len(args) == 1: - return cls(*args) - elif isinstance(args[-1], dict): - kwargs = args[-1] - args = args[:-1] - return cls(*args, **kwargs) - - -class MatchedRule(Feature): - def __init__(self, value, description=None): - super(MatchedRule, self).__init__(value, description=description) - self.name = "match" - - -class Characteristic(Feature): - def __init__(self, value, description=None): - super(Characteristic, self).__init__(value, description=description) - - -class String(Feature): - def __init__(self, value, description=None): - super(String, self).__init__(value, description=description) - - -class Regex(String): - def __init__(self, value, description=None): - super(Regex, self).__init__(value, description=description) - pat = self.value[len("/") : -len("/")] - flags = re.DOTALL - if value.endswith("/i"): - pat = self.value[len("/") : -len("/i")] - flags |= re.IGNORECASE - try: - self.re = re.compile(pat, flags) - except re.error: - if value.endswith("/i"): - value = value[: -len("i")] - raise ValueError( - "invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value - ) - - def evaluate(self, ctx): - # mapping from string value to list of locations. - # will unique the locations later on. - matches = collections.defaultdict(list) - - for feature, locations in ctx.items(): - if not isinstance(feature, (capa.features.String,)): - continue - - # `re.search` finds a match anywhere in the given string - # which implies leading and/or trailing whitespace. - # using this mode cleans is more convenient for rule authors, - # so that they don't have to prefix/suffix their terms like: /.*foo.*/. - if self.re.search(feature.value): - matches[feature.value].extend(locations) - - if matches: - # finalize: defaultdict -> dict - # which makes json serialization easier - matches = dict(matches) - - # collect all locations - locations = set() - for s in matches.keys(): - matches[s] = list(set(matches[s])) - locations.update(matches[s]) - - # unlike other features, we cannot return put a reference to `self` directly in a `Result`. - # this is because `self` may match on many strings, so we can't stuff the matched value into it. - # instead, return a new instance that has a reference to both the regex and the matched values. - # see #262. - return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations) - else: - return capa.engine.Result(False, _MatchedRegex(self, None), []) - - def __str__(self): - return "regex(string =~ %s)" % self.value - - -class _MatchedRegex(Regex): - """ - this represents specific match instances of a regular expression feature. - treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched. - - note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API. - """ - - def __init__(self, regex, matches): - """ - args: - regex (Regex): the regex feature that matches. - match (Dict[string, List[int]]|None): mapping from matching string to its locations. - """ - super(_MatchedRegex, self).__init__(regex.value, description=regex.description) - # we want this to collide with the name of `Regex` above, - # so that it works nicely with the renderers. - self.name = "regex" - # this may be None if the regex doesn't match - self.matches = matches - - def __str__(self): - return "regex(string =~ %s, matches = %s)" % ( - self.value, - ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())), - ) - - -class StringFactory(object): - def __new__(cls, value, description=None): - if value.startswith("/") and (value.endswith("/") or value.endswith("/i")): - return Regex(value, description=description) - return String(value, description=description) - - -class Bytes(Feature): - def __init__(self, value, description=None): - super(Bytes, self).__init__(value, description=description) - - def evaluate(self, ctx): - for feature, locations in ctx.items(): - if not isinstance(feature, (capa.features.Bytes,)): - continue - - if feature.value.startswith(self.value): - return capa.engine.Result(True, self, [], locations=locations) - - return capa.engine.Result(False, self, []) - - def get_value_str(self): - return hex_string(bytes_to_str(self.value)) - - def freeze_serialize(self): - return (self.__class__.__name__, [bytes_to_str(self.value).upper()]) - - @classmethod - def freeze_deserialize(cls, args): - return cls(*[codecs.decode(x, "hex") for x in args]) diff --git a/capa/features/basicblock.py b/capa/features/basicblock.py index e17c95ea..fb6a433c 100644 --- a/capa/features/basicblock.py +++ b/capa/features/basicblock.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from capa.features import Feature +from capa.features.common import Feature class BasicBlock(Feature): diff --git a/capa/features/common.py b/capa/features/common.py new file mode 100644 index 00000000..b32caa7c --- /dev/null +++ b/capa/features/common.py @@ -0,0 +1,254 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import re +import codecs +import logging +import collections + +import capa.engine +import capa.features.common + +logger = logging.getLogger(__name__) +MAX_BYTES_FEATURE_SIZE = 0x100 + +# thunks may be chained so we specify a delta to control the depth to which these chains are explored +THUNK_CHAIN_DEPTH_DELTA = 5 + +# identifiers for supported architectures names that tweak a feature +# for example, offset/x32 +ARCH_X32 = "x32" +ARCH_X64 = "x64" +VALID_ARCH = (ARCH_X32, ARCH_X64) + + +def bytes_to_str(b): + return str(codecs.encode(b, "hex").decode("utf-8")) + + +def hex_string(h): + """render hex string e.g. "0a40b1" as "0A 40 B1" """ + return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper() + + +def escape_string(s): + """escape special characters""" + s = repr(s) + if not s.startswith(('"', "'")): + # u'hello\r\nworld' -> hello\\r\\nworld + s = s[2:-1] + else: + # 'hello\r\nworld' -> hello\\r\\nworld + s = s[1:-1] + s = s.replace("\\'", "'") # repr() may escape "'" in some edge cases, remove + s = s.replace('"', '\\"') # repr() does not escape '"', add + return s + + +class Feature(object): + def __init__(self, value, arch=None, description=None): + """ + Args: + value (any): the value of the feature, such as the number or string. + arch (str): one of the VALID_ARCH values, or None. + When None, then the feature applies to any architecture. + Modifies the feature name from `feature` to `feature/arch`, like `offset/x32`. + description (str): a human-readable description that explains the feature value. + """ + super(Feature, self).__init__() + + if arch is not None: + if arch not in VALID_ARCH: + raise ValueError("arch '%s' must be one of %s" % (arch, VALID_ARCH)) + self.name = self.__class__.__name__.lower() + "/" + arch + else: + self.name = self.__class__.__name__.lower() + + self.value = value + self.arch = arch + self.description = description + + def __hash__(self): + return hash((self.name, self.value, self.arch)) + + def __eq__(self, other): + return self.name == other.name and self.value == other.value and self.arch == other.arch + + def get_value_str(self): + """ + render the value of this feature, for use by `__str__` and friends. + subclasses should override to customize the rendering. + + Returns: any + """ + return self.value + + def __str__(self): + if self.value is not None: + if self.description: + return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description) + else: + return "%s(%s)" % (self.name, self.get_value_str()) + else: + return "%s" % self.name + + def __repr__(self): + return str(self) + + def evaluate(self, ctx): + return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, [])) + + def freeze_serialize(self): + if self.arch is not None: + return (self.__class__.__name__, [self.value, {"arch": self.arch}]) + else: + return (self.__class__.__name__, [self.value]) + + @classmethod + def freeze_deserialize(cls, args): + # as you can see below in code, + # if the last argument is a dictionary, + # consider it to be kwargs passed to the feature constructor. + if len(args) == 1: + return cls(*args) + elif isinstance(args[-1], dict): + kwargs = args[-1] + args = args[:-1] + return cls(*args, **kwargs) + + +class MatchedRule(Feature): + def __init__(self, value, description=None): + super(MatchedRule, self).__init__(value, description=description) + self.name = "match" + + +class Characteristic(Feature): + def __init__(self, value, description=None): + super(Characteristic, self).__init__(value, description=description) + + +class String(Feature): + def __init__(self, value, description=None): + super(String, self).__init__(value, description=description) + + +class Regex(String): + def __init__(self, value, description=None): + super(Regex, self).__init__(value, description=description) + pat = self.value[len("/") : -len("/")] + flags = re.DOTALL + if value.endswith("/i"): + pat = self.value[len("/") : -len("/i")] + flags |= re.IGNORECASE + try: + self.re = re.compile(pat, flags) + except re.error: + if value.endswith("/i"): + value = value[: -len("i")] + raise ValueError( + "invalid regular expression: %s it should use Python syntax, try it at https://pythex.org" % value + ) + + def evaluate(self, ctx): + # mapping from string value to list of locations. + # will unique the locations later on. + matches = collections.defaultdict(list) + + for feature, locations in ctx.items(): + if not isinstance(feature, (capa.features.common.String,)): + continue + + # `re.search` finds a match anywhere in the given string + # which implies leading and/or trailing whitespace. + # using this mode cleans is more convenient for rule authors, + # so that they don't have to prefix/suffix their terms like: /.*foo.*/. + if self.re.search(feature.value): + matches[feature.value].extend(locations) + + if matches: + # finalize: defaultdict -> dict + # which makes json serialization easier + matches = dict(matches) + + # collect all locations + locations = set() + for s in matches.keys(): + matches[s] = list(set(matches[s])) + locations.update(matches[s]) + + # unlike other features, we cannot return put a reference to `self` directly in a `Result`. + # this is because `self` may match on many strings, so we can't stuff the matched value into it. + # instead, return a new instance that has a reference to both the regex and the matched values. + # see #262. + return capa.engine.Result(True, _MatchedRegex(self, matches), [], locations=locations) + else: + return capa.engine.Result(False, _MatchedRegex(self, None), []) + + def __str__(self): + return "regex(string =~ %s)" % self.value + + +class _MatchedRegex(Regex): + """ + this represents specific match instances of a regular expression feature. + treat it the same as a `Regex` except it has the `matches` field that contains the complete strings that matched. + + note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API. + """ + + def __init__(self, regex, matches): + """ + args: + regex (Regex): the regex feature that matches. + match (Dict[string, List[int]]|None): mapping from matching string to its locations. + """ + super(_MatchedRegex, self).__init__(regex.value, description=regex.description) + # we want this to collide with the name of `Regex` above, + # so that it works nicely with the renderers. + self.name = "regex" + # this may be None if the regex doesn't match + self.matches = matches + + def __str__(self): + return "regex(string =~ %s, matches = %s)" % ( + self.value, + ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())), + ) + + +class StringFactory(object): + def __new__(cls, value, description=None): + if value.startswith("/") and (value.endswith("/") or value.endswith("/i")): + return Regex(value, description=description) + return String(value, description=description) + + +class Bytes(Feature): + def __init__(self, value, description=None): + super(Bytes, self).__init__(value, description=description) + + def evaluate(self, ctx): + for feature, locations in ctx.items(): + if not isinstance(feature, (capa.features.common.Bytes,)): + continue + + if feature.value.startswith(self.value): + return capa.engine.Result(True, self, [], locations=locations) + + return capa.engine.Result(False, self, []) + + def get_value_str(self): + return hex_string(bytes_to_str(self.value)) + + def freeze_serialize(self): + return (self.__class__.__name__, [bytes_to_str(self.value).upper()]) + + @classmethod + def freeze_deserialize(cls, args): + return cls(*[codecs.decode(x, "hex") for x in args]) diff --git a/capa/features/extractors/__init__.py b/capa/features/extractors/__init__.py index 1faea42d..e69de29b 100644 --- a/capa/features/extractors/__init__.py +++ b/capa/features/extractors/__init__.py @@ -1,321 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - -import abc - - -class FeatureExtractor(object): - """ - FeatureExtractor defines the interface for fetching features from a sample. - - There may be multiple backends that support fetching features for capa. - For example, we use vivisect by default, but also want to support saving - and restoring features from a JSON file. - When we restore the features, we'd like to use exactly the same matching logic - to find matching rules. - Therefore, we can define a FeatureExtractor that provides features from the - serialized JSON file and do matching without a binary analysis pass. - Also, this provides a way to hook in an IDA backend. - - This class is not instantiated directly; it is the base class for other implementations. - """ - - __metaclass__ = abc.ABCMeta - - def __init__(self): - # - # note: a subclass should define ctor parameters for its own use. - # for example, the Vivisect feature extract might require the vw and/or path. - # this base class doesn't know what to do with that info, though. - # - super(FeatureExtractor, self).__init__() - - @abc.abstractmethod - def get_base_address(self): - """ - fetch the preferred load address at which the sample was analyzed. - - returns: int - """ - raise NotImplemented - - @abc.abstractmethod - def extract_file_features(self): - """ - extract file-scope features. - - example:: - - extractor = VivisectFeatureExtractor(vw, path) - for feature, va in extractor.get_file_features(): - print('0x%x: %s', va, feature) - - yields: - Tuple[capa.features.Feature, int]: feature and its location - """ - raise NotImplemented - - @abc.abstractmethod - def get_functions(self): - """ - enumerate the functions and provide opaque values that will - subsequently be provided to `.extract_function_features()`, etc. - - by "opaque value", we mean that this can be any object, as long as it - provides enough context to `.extract_function_features()`. - - the opaque value should support casting to int (`__int__`) for the function start address. - - yields: - any: the opaque function value. - """ - raise NotImplemented - - def is_library_function(self, va): - """ - is the given address a library function? - the backend may implement its own function matching algorithm, or none at all. - we accept a VA here, rather than function object, to handle addresses identified in instructions. - - this information is used to: - - filter out matches in library functions (by default), and - - recognize when to fetch symbol names for called (non-API) functions - - args: - va (int): the virtual address of a function. - - returns: - bool: True if the given address is the start of a library function. - """ - return False - - def get_function_name(self, va): - """ - fetch any recognized name for the given address. - this is only guaranteed to return a value when the given function is a recognized library function. - we accept a VA here, rather than function object, to handle addresses identified in instructions. - - args: - va (int): the virtual address of a function. - - returns: - str: the function name - - raises: - KeyError: when the given function does not have a name. - """ - raise KeyError(va) - - @abc.abstractmethod - def extract_function_features(self, f): - """ - extract function-scope features. - the arguments are opaque values previously provided by `.get_functions()`, etc. - - example:: - - extractor = VivisectFeatureExtractor(vw, path) - for function in extractor.get_functions(): - for feature, va in extractor.extract_function_features(function): - print('0x%x: %s', va, feature) - - args: - f [any]: an opaque value previously fetched from `.get_functions()`. - - yields: - Tuple[capa.features.Feature, int]: feature and its location - """ - raise NotImplemented - - @abc.abstractmethod - def get_basic_blocks(self, f): - """ - enumerate the basic blocks in the given function and provide opaque values that will - subsequently be provided to `.extract_basic_block_features()`, etc. - - by "opaque value", we mean that this can be any object, as long as it - provides enough context to `.extract_basic_block_features()`. - - the opaque value should support casting to int (`__int__`) for the basic block start address. - - yields: - any: the opaque basic block value. - """ - raise NotImplemented - - @abc.abstractmethod - def extract_basic_block_features(self, f, bb): - """ - extract basic block-scope features. - the arguments are opaque values previously provided by `.get_functions()`, etc. - - example:: - - extractor = VivisectFeatureExtractor(vw, path) - for function in extractor.get_functions(): - for bb in extractor.get_basic_blocks(function): - for feature, va in extractor.extract_basic_block_features(function, bb): - print('0x%x: %s', va, feature) - - args: - f [any]: an opaque value previously fetched from `.get_functions()`. - bb [any]: an opaque value previously fetched from `.get_basic_blocks()`. - - yields: - Tuple[capa.features.Feature, int]: feature and its location - """ - raise NotImplemented - - @abc.abstractmethod - def get_instructions(self, f, bb): - """ - enumerate the instructions in the given basic block and provide opaque values that will - subsequently be provided to `.extract_insn_features()`, etc. - - by "opaque value", we mean that this can be any object, as long as it - provides enough context to `.extract_insn_features()`. - - the opaque value should support casting to int (`__int__`) for the instruction address. - - yields: - any: the opaque function value. - """ - raise NotImplemented - - @abc.abstractmethod - def extract_insn_features(self, f, bb, insn): - """ - extract instruction-scope features. - the arguments are opaque values previously provided by `.get_functions()`, etc. - - example:: - - extractor = VivisectFeatureExtractor(vw, path) - for function in extractor.get_functions(): - for bb in extractor.get_basic_blocks(function): - for insn in extractor.get_instructions(function, bb): - for feature, va in extractor.extract_insn_features(function, bb, insn): - print('0x%x: %s', va, feature) - - args: - f [any]: an opaque value previously fetched from `.get_functions()`. - bb [any]: an opaque value previously fetched from `.get_basic_blocks()`. - insn [any]: an opaque value previously fetched from `.get_instructions()`. - - yields: - Tuple[capa.features.Feature, int]: feature and its location - """ - raise NotImplemented - - -class NullFeatureExtractor(FeatureExtractor): - """ - An extractor that extracts some user-provided features. - The structure of the single parameter is demonstrated in the example below. - - This is useful for testing, as we can provide expected values and see if matching works. - Also, this is how we represent features deserialized from a freeze file. - - example:: - - extractor = NullFeatureExtractor({ - 'base address: 0x401000, - 'file features': [ - (0x402345, capa.features.Characteristic('embedded pe')), - ], - 'functions': { - 0x401000: { - 'features': [ - (0x401000, capa.features.Characteristic('nzxor')), - ], - 'basic blocks': { - 0x401000: { - 'features': [ - (0x401000, capa.features.Characteristic('tight-loop')), - ], - 'instructions': { - 0x401000: { - 'features': [ - (0x401000, capa.features.Characteristic('nzxor')), - ], - }, - 0x401002: ... - } - }, - 0x401005: ... - } - }, - 0x40200: ... - } - ) - """ - - def __init__(self, features): - super(NullFeatureExtractor, self).__init__() - self.features = features - - def get_base_address(self): - return self.features["base address"] - - def extract_file_features(self): - for p in self.features.get("file features", []): - va, feature = p - yield feature, va - - def get_functions(self): - for va in sorted(self.features["functions"].keys()): - yield va - - def extract_function_features(self, f): - for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented - va, feature = p - yield feature, va - - def get_basic_blocks(self, f): - for va in sorted( - self.features.get("functions", {}) # noqa: E127 line over-indented - .get(f, {}) - .get("basic blocks", {}) - .keys() - ): - yield va - - def extract_basic_block_features(self, f, bb): - for p in ( - self.features.get("functions", {}) # noqa: E127 line over-indented - .get(f, {}) - .get("basic blocks", {}) - .get(bb, {}) - .get("features", []) - ): - va, feature = p - yield feature, va - - def get_instructions(self, f, bb): - for va in sorted( - self.features.get("functions", {}) # noqa: E127 line over-indented - .get(f, {}) - .get("basic blocks", {}) - .get(bb, {}) - .get("instructions", {}) - .keys() - ): - yield va - - def extract_insn_features(self, f, bb, insn): - for p in ( - self.features.get("functions", {}) # noqa: E127 line over-indented - .get(f, {}) - .get("basic blocks", {}) - .get(bb, {}) - .get("instructions", {}) - .get(insn, {}) - .get("features", []) - ): - va, feature = p - yield feature, va diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py new file mode 100644 index 00000000..1faea42d --- /dev/null +++ b/capa/features/extractors/base_extractor.py @@ -0,0 +1,321 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import abc + + +class FeatureExtractor(object): + """ + FeatureExtractor defines the interface for fetching features from a sample. + + There may be multiple backends that support fetching features for capa. + For example, we use vivisect by default, but also want to support saving + and restoring features from a JSON file. + When we restore the features, we'd like to use exactly the same matching logic + to find matching rules. + Therefore, we can define a FeatureExtractor that provides features from the + serialized JSON file and do matching without a binary analysis pass. + Also, this provides a way to hook in an IDA backend. + + This class is not instantiated directly; it is the base class for other implementations. + """ + + __metaclass__ = abc.ABCMeta + + def __init__(self): + # + # note: a subclass should define ctor parameters for its own use. + # for example, the Vivisect feature extract might require the vw and/or path. + # this base class doesn't know what to do with that info, though. + # + super(FeatureExtractor, self).__init__() + + @abc.abstractmethod + def get_base_address(self): + """ + fetch the preferred load address at which the sample was analyzed. + + returns: int + """ + raise NotImplemented + + @abc.abstractmethod + def extract_file_features(self): + """ + extract file-scope features. + + example:: + + extractor = VivisectFeatureExtractor(vw, path) + for feature, va in extractor.get_file_features(): + print('0x%x: %s', va, feature) + + yields: + Tuple[capa.features.Feature, int]: feature and its location + """ + raise NotImplemented + + @abc.abstractmethod + def get_functions(self): + """ + enumerate the functions and provide opaque values that will + subsequently be provided to `.extract_function_features()`, etc. + + by "opaque value", we mean that this can be any object, as long as it + provides enough context to `.extract_function_features()`. + + the opaque value should support casting to int (`__int__`) for the function start address. + + yields: + any: the opaque function value. + """ + raise NotImplemented + + def is_library_function(self, va): + """ + is the given address a library function? + the backend may implement its own function matching algorithm, or none at all. + we accept a VA here, rather than function object, to handle addresses identified in instructions. + + this information is used to: + - filter out matches in library functions (by default), and + - recognize when to fetch symbol names for called (non-API) functions + + args: + va (int): the virtual address of a function. + + returns: + bool: True if the given address is the start of a library function. + """ + return False + + def get_function_name(self, va): + """ + fetch any recognized name for the given address. + this is only guaranteed to return a value when the given function is a recognized library function. + we accept a VA here, rather than function object, to handle addresses identified in instructions. + + args: + va (int): the virtual address of a function. + + returns: + str: the function name + + raises: + KeyError: when the given function does not have a name. + """ + raise KeyError(va) + + @abc.abstractmethod + def extract_function_features(self, f): + """ + extract function-scope features. + the arguments are opaque values previously provided by `.get_functions()`, etc. + + example:: + + extractor = VivisectFeatureExtractor(vw, path) + for function in extractor.get_functions(): + for feature, va in extractor.extract_function_features(function): + print('0x%x: %s', va, feature) + + args: + f [any]: an opaque value previously fetched from `.get_functions()`. + + yields: + Tuple[capa.features.Feature, int]: feature and its location + """ + raise NotImplemented + + @abc.abstractmethod + def get_basic_blocks(self, f): + """ + enumerate the basic blocks in the given function and provide opaque values that will + subsequently be provided to `.extract_basic_block_features()`, etc. + + by "opaque value", we mean that this can be any object, as long as it + provides enough context to `.extract_basic_block_features()`. + + the opaque value should support casting to int (`__int__`) for the basic block start address. + + yields: + any: the opaque basic block value. + """ + raise NotImplemented + + @abc.abstractmethod + def extract_basic_block_features(self, f, bb): + """ + extract basic block-scope features. + the arguments are opaque values previously provided by `.get_functions()`, etc. + + example:: + + extractor = VivisectFeatureExtractor(vw, path) + for function in extractor.get_functions(): + for bb in extractor.get_basic_blocks(function): + for feature, va in extractor.extract_basic_block_features(function, bb): + print('0x%x: %s', va, feature) + + args: + f [any]: an opaque value previously fetched from `.get_functions()`. + bb [any]: an opaque value previously fetched from `.get_basic_blocks()`. + + yields: + Tuple[capa.features.Feature, int]: feature and its location + """ + raise NotImplemented + + @abc.abstractmethod + def get_instructions(self, f, bb): + """ + enumerate the instructions in the given basic block and provide opaque values that will + subsequently be provided to `.extract_insn_features()`, etc. + + by "opaque value", we mean that this can be any object, as long as it + provides enough context to `.extract_insn_features()`. + + the opaque value should support casting to int (`__int__`) for the instruction address. + + yields: + any: the opaque function value. + """ + raise NotImplemented + + @abc.abstractmethod + def extract_insn_features(self, f, bb, insn): + """ + extract instruction-scope features. + the arguments are opaque values previously provided by `.get_functions()`, etc. + + example:: + + extractor = VivisectFeatureExtractor(vw, path) + for function in extractor.get_functions(): + for bb in extractor.get_basic_blocks(function): + for insn in extractor.get_instructions(function, bb): + for feature, va in extractor.extract_insn_features(function, bb, insn): + print('0x%x: %s', va, feature) + + args: + f [any]: an opaque value previously fetched from `.get_functions()`. + bb [any]: an opaque value previously fetched from `.get_basic_blocks()`. + insn [any]: an opaque value previously fetched from `.get_instructions()`. + + yields: + Tuple[capa.features.Feature, int]: feature and its location + """ + raise NotImplemented + + +class NullFeatureExtractor(FeatureExtractor): + """ + An extractor that extracts some user-provided features. + The structure of the single parameter is demonstrated in the example below. + + This is useful for testing, as we can provide expected values and see if matching works. + Also, this is how we represent features deserialized from a freeze file. + + example:: + + extractor = NullFeatureExtractor({ + 'base address: 0x401000, + 'file features': [ + (0x402345, capa.features.Characteristic('embedded pe')), + ], + 'functions': { + 0x401000: { + 'features': [ + (0x401000, capa.features.Characteristic('nzxor')), + ], + 'basic blocks': { + 0x401000: { + 'features': [ + (0x401000, capa.features.Characteristic('tight-loop')), + ], + 'instructions': { + 0x401000: { + 'features': [ + (0x401000, capa.features.Characteristic('nzxor')), + ], + }, + 0x401002: ... + } + }, + 0x401005: ... + } + }, + 0x40200: ... + } + ) + """ + + def __init__(self, features): + super(NullFeatureExtractor, self).__init__() + self.features = features + + def get_base_address(self): + return self.features["base address"] + + def extract_file_features(self): + for p in self.features.get("file features", []): + va, feature = p + yield feature, va + + def get_functions(self): + for va in sorted(self.features["functions"].keys()): + yield va + + def extract_function_features(self, f): + for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented + va, feature = p + yield feature, va + + def get_basic_blocks(self, f): + for va in sorted( + self.features.get("functions", {}) # noqa: E127 line over-indented + .get(f, {}) + .get("basic blocks", {}) + .keys() + ): + yield va + + def extract_basic_block_features(self, f, bb): + for p in ( + self.features.get("functions", {}) # noqa: E127 line over-indented + .get(f, {}) + .get("basic blocks", {}) + .get(bb, {}) + .get("features", []) + ): + va, feature = p + yield feature, va + + def get_instructions(self, f, bb): + for va in sorted( + self.features.get("functions", {}) # noqa: E127 line over-indented + .get(f, {}) + .get("basic blocks", {}) + .get(bb, {}) + .get("instructions", {}) + .keys() + ): + yield va + + def extract_insn_features(self, f, bb, insn): + for p in ( + self.features.get("functions", {}) # noqa: E127 line over-indented + .get(f, {}) + .get("basic blocks", {}) + .get(bb, {}) + .get("instructions", {}) + .get(insn, {}) + .get("features", []) + ): + va, feature = p + yield feature, va diff --git a/capa/features/extractors/ida/__init__.py b/capa/features/extractors/ida/__init__.py index a089c9ae..e69de29b 100644 --- a/capa/features/extractors/ida/__init__.py +++ b/capa/features/extractors/ida/__init__.py @@ -1,108 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - -import idaapi - -import capa.features.extractors.ida.file -import capa.features.extractors.ida.insn -import capa.features.extractors.ida.function -import capa.features.extractors.ida.basicblock -from capa.features.extractors import FeatureExtractor - - -class FunctionHandle: - """this acts like an idaapi.func_t but with __int__()""" - - def __init__(self, inner): - self._inner = inner - - def __int__(self): - return self.start_ea - - def __getattr__(self, name): - return getattr(self._inner, name) - - -class BasicBlockHandle: - """this acts like an idaapi.BasicBlock but with __int__()""" - - def __init__(self, inner): - self._inner = inner - - def __int__(self): - return self.start_ea - - def __getattr__(self, name): - return getattr(self._inner, name) - - -class InstructionHandle: - """this acts like an idaapi.insn_t but with __int__()""" - - def __init__(self, inner): - self._inner = inner - - def __int__(self): - return self.ea - - def __getattr__(self, name): - return getattr(self._inner, name) - - -class IdaFeatureExtractor(FeatureExtractor): - def __init__(self): - super(IdaFeatureExtractor, self).__init__() - - def get_base_address(self): - return idaapi.get_imagebase() - - def extract_file_features(self): - for (feature, ea) in capa.features.extractors.ida.file.extract_features(): - yield feature, ea - - def get_functions(self): - import capa.features.extractors.ida.helpers as ida_helpers - - # data structure shared across functions yielded here. - # useful for caching analysis relevant across a single workspace. - ctx = {} - - # ignore library functions and thunk functions as identified by IDA - for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True): - setattr(f, "ctx", ctx) - yield FunctionHandle(f) - - @staticmethod - def get_function(ea): - f = idaapi.get_func(ea) - setattr(f, "ctx", {}) - return FunctionHandle(f) - - def extract_function_features(self, f): - for (feature, ea) in capa.features.extractors.ida.function.extract_features(f): - yield feature, ea - - def get_basic_blocks(self, f): - import capa.features.extractors.ida.helpers as ida_helpers - - for bb in ida_helpers.get_function_blocks(f): - yield BasicBlockHandle(bb) - - def extract_basic_block_features(self, f, bb): - for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb): - yield feature, ea - - def get_instructions(self, f, bb): - import capa.features.extractors.ida.helpers as ida_helpers - - for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea): - yield InstructionHandle(insn) - - def extract_insn_features(self, f, bb, insn): - for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn): - yield feature, ea diff --git a/capa/features/extractors/ida/basicblock.py b/capa/features/extractors/ida/basicblock.py index 0a0e08f0..2e871751 100644 --- a/capa/features/extractors/ida/basicblock.py +++ b/capa/features/extractors/ida/basicblock.py @@ -6,14 +6,13 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys import string import struct import idaapi import capa.features.extractors.ida.helpers -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.ida import helpers from capa.features.extractors.helpers import MIN_STACKSTRING_LEN diff --git a/capa/features/extractors/ida/extractor.py b/capa/features/extractors/ida/extractor.py new file mode 100644 index 00000000..dc260adc --- /dev/null +++ b/capa/features/extractors/ida/extractor.py @@ -0,0 +1,108 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import idaapi + +import capa.features.extractors.ida.file +import capa.features.extractors.ida.insn +import capa.features.extractors.ida.function +import capa.features.extractors.ida.basicblock +from capa.features.extractors.base_extractor import FeatureExtractor + + +class FunctionHandle: + """this acts like an idaapi.func_t but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): + return self.start_ea + + def __getattr__(self, name): + return getattr(self._inner, name) + + +class BasicBlockHandle: + """this acts like an idaapi.BasicBlock but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): + return self.start_ea + + def __getattr__(self, name): + return getattr(self._inner, name) + + +class InstructionHandle: + """this acts like an idaapi.insn_t but with __int__()""" + + def __init__(self, inner): + self._inner = inner + + def __int__(self): + return self.ea + + def __getattr__(self, name): + return getattr(self._inner, name) + + +class IdaFeatureExtractor(FeatureExtractor): + def __init__(self): + super(IdaFeatureExtractor, self).__init__() + + def get_base_address(self): + return idaapi.get_imagebase() + + def extract_file_features(self): + for (feature, ea) in capa.features.extractors.ida.file.extract_features(): + yield feature, ea + + def get_functions(self): + import capa.features.extractors.ida.helpers as ida_helpers + + # data structure shared across functions yielded here. + # useful for caching analysis relevant across a single workspace. + ctx = {} + + # ignore library functions and thunk functions as identified by IDA + for f in ida_helpers.get_functions(skip_thunks=True, skip_libs=True): + setattr(f, "ctx", ctx) + yield FunctionHandle(f) + + @staticmethod + def get_function(ea): + f = idaapi.get_func(ea) + setattr(f, "ctx", {}) + return FunctionHandle(f) + + def extract_function_features(self, f): + for (feature, ea) in capa.features.extractors.ida.function.extract_features(f): + yield feature, ea + + def get_basic_blocks(self, f): + import capa.features.extractors.ida.helpers as ida_helpers + + for bb in ida_helpers.get_function_blocks(f): + yield BasicBlockHandle(bb) + + def extract_basic_block_features(self, f, bb): + for (feature, ea) in capa.features.extractors.ida.basicblock.extract_features(f, bb): + yield feature, ea + + def get_instructions(self, f, bb): + import capa.features.extractors.ida.helpers as ida_helpers + + for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea): + yield InstructionHandle(insn) + + def extract_insn_features(self, f, bb, insn): + for (feature, ea) in capa.features.extractors.ida.insn.extract_features(f, bb, insn): + yield feature, ea diff --git a/capa/features/extractors/ida/function.py b/capa/features/extractors/ida/function.py index 670c7b73..9bbdba51 100644 --- a/capa/features/extractors/ida/function.py +++ b/capa/features/extractors/ida/function.py @@ -10,7 +10,7 @@ import idaapi import idautils import capa.features.extractors.ida.helpers -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.extractors import loops diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index 4b7dcdeb..7a07e1a2 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -6,9 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys -import string - import idc import idaapi import idautils diff --git a/capa/features/extractors/ida/insn.py b/capa/features/extractors/ida/insn.py index ec2f8070..044e5923 100644 --- a/capa/features/extractors/ida/insn.py +++ b/capa/features/extractors/ida/insn.py @@ -12,7 +12,8 @@ import idautils import capa.features.extractors.helpers import capa.features.extractors.ida.helpers -from capa.features import ( +from capa.features.insn import API, Number, Offset, Mnemonic +from capa.features.common import ( ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, @@ -21,7 +22,6 @@ from capa.features import ( String, Characteristic, ) -from capa.features.insn import API, Number, Offset, Mnemonic # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features diff --git a/capa/features/extractors/pefile/__init__.py b/capa/features/extractors/pefile.py similarity index 97% rename from capa/features/extractors/pefile/__init__.py rename to capa/features/extractors/pefile.py index def4476f..6d9e6ae7 100644 --- a/capa/features/extractors/pefile/__init__.py +++ b/capa/features/extractors/pefile.py @@ -12,11 +12,10 @@ import pefile import capa.features.extractors import capa.features.extractors.helpers import capa.features.extractors.strings -from capa.features import String, Characteristic from capa.features.file import Export, Import, Section -from capa.features.extractors import FeatureExtractor +from capa.features.common import String, Characteristic +from capa.features.extractors.base_extractor import FeatureExtractor -__all__ = ["file"] logger = logging.getLogger(__name__) diff --git a/capa/features/extractors/smda/__init__.py b/capa/features/extractors/smda/__init__.py index 9d58a7de..e69de29b 100644 --- a/capa/features/extractors/smda/__init__.py +++ b/capa/features/extractors/smda/__init__.py @@ -1,50 +0,0 @@ -import sys -import types - -from smda.common.SmdaReport import SmdaReport -from smda.common.SmdaInstruction import SmdaInstruction - -import capa.features.extractors.smda.file -import capa.features.extractors.smda.insn -import capa.features.extractors.smda.function -import capa.features.extractors.smda.basicblock -from capa.main import UnsupportedRuntimeError -from capa.features.extractors import FeatureExtractor - - -class SmdaFeatureExtractor(FeatureExtractor): - def __init__(self, smda_report: SmdaReport, path): - super(SmdaFeatureExtractor, self).__init__() - self.smda_report = smda_report - self.path = path - - def get_base_address(self): - return self.smda_report.base_addr - - def extract_file_features(self): - for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path): - yield feature, va - - def get_functions(self): - for function in self.smda_report.getFunctions(): - yield function - - def extract_function_features(self, f): - for feature, va in capa.features.extractors.smda.function.extract_features(f): - yield feature, va - - def get_basic_blocks(self, f): - for bb in f.getBlocks(): - yield bb - - def extract_basic_block_features(self, f, bb): - for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb): - yield feature, va - - def get_instructions(self, f, bb): - for smda_ins in bb.getInstructions(): - yield smda_ins - - def extract_insn_features(self, f, bb, insn): - for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn): - yield feature, va diff --git a/capa/features/extractors/smda/basicblock.py b/capa/features/extractors/smda/basicblock.py index 98b91ec3..b68d02a0 100644 --- a/capa/features/extractors/smda/basicblock.py +++ b/capa/features/extractors/smda/basicblock.py @@ -1,8 +1,7 @@ -import sys import string import struct -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN diff --git a/capa/features/extractors/smda/extractor.py b/capa/features/extractors/smda/extractor.py new file mode 100644 index 00000000..b4355d8f --- /dev/null +++ b/capa/features/extractors/smda/extractor.py @@ -0,0 +1,45 @@ +from smda.common.SmdaReport import SmdaReport + +import capa.features.extractors.smda.file +import capa.features.extractors.smda.insn +import capa.features.extractors.smda.function +import capa.features.extractors.smda.basicblock +from capa.features.extractors.base_extractor import FeatureExtractor + + +class SmdaFeatureExtractor(FeatureExtractor): + def __init__(self, smda_report: SmdaReport, path): + super(SmdaFeatureExtractor, self).__init__() + self.smda_report = smda_report + self.path = path + + def get_base_address(self): + return self.smda_report.base_addr + + def extract_file_features(self): + for feature, va in capa.features.extractors.smda.file.extract_features(self.smda_report, self.path): + yield feature, va + + def get_functions(self): + for function in self.smda_report.getFunctions(): + yield function + + def extract_function_features(self, f): + for feature, va in capa.features.extractors.smda.function.extract_features(f): + yield feature, va + + def get_basic_blocks(self, f): + for bb in f.getBlocks(): + yield bb + + def extract_basic_block_features(self, f, bb): + for feature, va in capa.features.extractors.smda.basicblock.extract_features(f, bb): + yield feature, va + + def get_instructions(self, f, bb): + for smda_ins in bb.getInstructions(): + yield smda_ins + + def extract_insn_features(self, f, bb, insn): + for feature, va in capa.features.extractors.smda.insn.extract_features(f, bb, insn): + yield feature, va diff --git a/capa/features/extractors/smda/file.py b/capa/features/extractors/smda/file.py index 68fee066..5250e26b 100644 --- a/capa/features/extractors/smda/file.py +++ b/capa/features/extractors/smda/file.py @@ -1,12 +1,10 @@ -import struct - # if we have SMDA we definitely have lief import lief import capa.features.extractors.helpers import capa.features.extractors.strings -from capa.features import String, Characteristic from capa.features.file import Export, Import, Section +from capa.features.common import String, Characteristic def extract_file_embedded_pe(smda_report, file_path): diff --git a/capa/features/extractors/smda/function.py b/capa/features/extractors/smda/function.py index e1765e67..33c6c5d1 100644 --- a/capa/features/extractors/smda/function.py +++ b/capa/features/extractors/smda/function.py @@ -1,4 +1,4 @@ -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.extractors import loops diff --git a/capa/features/extractors/smda/insn.py b/capa/features/extractors/smda/insn.py index 48ef4c1c..eddc859d 100644 --- a/capa/features/extractors/smda/insn.py +++ b/capa/features/extractors/smda/insn.py @@ -5,7 +5,8 @@ import struct from smda.common.SmdaReport import SmdaReport import capa.features.extractors.helpers -from capa.features import ( +from capa.features.insn import API, Number, Offset, Mnemonic +from capa.features.common import ( ARCH_X32, ARCH_X64, MAX_BYTES_FEATURE_SIZE, @@ -14,7 +15,6 @@ from capa.features import ( String, Characteristic, ) -from capa.features.insn import API, Number, Offset, Mnemonic # security cookie checks may perform non-zeroing XORs, these are expected within a certain # byte range within the first and returning basic blocks, this helps to reduce FP features @@ -97,7 +97,7 @@ def read_bytes(smda_report, va, num_bytes=None): rva = va - smda_report.base_addr if smda_report.buffer is None: - return + raise ValueError("buffer is empty") buffer_end = len(smda_report.buffer) max_bytes = num_bytes if num_bytes is not None else MAX_BYTES_FEATURE_SIZE if rva + max_bytes > buffer_end: diff --git a/capa/features/extractors/viv/__init__.py b/capa/features/extractors/viv/__init__.py index c54ee0be..e69de29b 100644 --- a/capa/features/extractors/viv/__init__.py +++ b/capa/features/extractors/viv/__init__.py @@ -1,78 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. -import logging - -import viv_utils -import viv_utils.flirt - -import capa.features.extractors -import capa.features.extractors.viv.file -import capa.features.extractors.viv.insn -import capa.features.extractors.viv.function -import capa.features.extractors.viv.basicblock -from capa.features.extractors import FeatureExtractor - -__all__ = ["file", "function", "basicblock", "insn"] -logger = logging.getLogger(__name__) - - -class InstructionHandle: - """this acts like a vivisect.Opcode but with an __int__() method""" - - def __init__(self, inner): - self._inner = inner - - def __int__(self): - return self.va - - def __getattr__(self, name): - return getattr(self._inner, name) - - -class VivisectFeatureExtractor(FeatureExtractor): - def __init__(self, vw, path): - super(VivisectFeatureExtractor, self).__init__() - self.vw = vw - self.path = path - - def get_base_address(self): - # assume there is only one file loaded into the vw - return list(self.vw.filemeta.values())[0]["imagebase"] - - def extract_file_features(self): - for feature, va in capa.features.extractors.viv.file.extract_features(self.vw, self.path): - yield feature, va - - def get_functions(self): - for va in sorted(self.vw.getFunctions()): - yield viv_utils.Function(self.vw, va) - - def extract_function_features(self, f): - for feature, va in capa.features.extractors.viv.function.extract_features(f): - yield feature, va - - def get_basic_blocks(self, f): - return f.basic_blocks - - def extract_basic_block_features(self, f, bb): - for feature, va in capa.features.extractors.viv.basicblock.extract_features(f, bb): - yield feature, va - - def get_instructions(self, f, bb): - for insn in bb.instructions: - yield InstructionHandle(insn) - - def extract_insn_features(self, f, bb, insn): - for feature, va in capa.features.extractors.viv.insn.extract_features(f, bb, insn): - yield feature, va - - def is_library_function(self, va): - return viv_utils.flirt.is_library_function(self.vw, va) - - def get_function_name(self, va): - return viv_utils.get_function_name(self.vw, va) diff --git a/capa/features/extractors/viv/basicblock.py b/capa/features/extractors/viv/basicblock.py index a76aa179..510513f8 100644 --- a/capa/features/extractors/viv/basicblock.py +++ b/capa/features/extractors/viv/basicblock.py @@ -10,9 +10,9 @@ import string import struct import envi -import vivisect.const +import envi.archs.i386.disasm -from capa.features import Characteristic +from capa.features.common import Characteristic from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN @@ -37,7 +37,7 @@ def _bb_has_tight_loop(f, bb): """ if len(bb.instructions) > 0: for bva, bflags in bb.instructions[-1].getBranches(): - if bflags & vivisect.envi.BR_COND: + if bflags & envi.BR_COND: if bva == bb.va: return True @@ -117,11 +117,15 @@ def get_printable_len(oper): chars = struct.pack(" 0: @@ -306,7 +311,7 @@ def read_string(vw, offset): try: ulen = vw.detectUnicode(offset) - except envi.SegmentationViolation: + except envi.exc.SegmentationViolation: pass except IndexError: # potential vivisect bug detecting Unicode at segment end @@ -367,14 +372,14 @@ def extract_insn_offset_features(f, bb, insn): # reg ^ # disp if isinstance(oper, envi.archs.i386.disasm.i386RegMemOper): - if oper.reg == envi.archs.i386.disasm.REG_ESP: + if oper.reg == envi.archs.i386.regs.REG_ESP: continue - if oper.reg == envi.archs.i386.disasm.REG_EBP: + if oper.reg == envi.archs.i386.regs.REG_EBP: continue # TODO: do x64 support for real. - if oper.reg == envi.archs.amd64.disasm.REG_RBP: + if oper.reg == envi.archs.amd64.regs.REG_RBP: continue # viv already decodes offsets as signed @@ -402,11 +407,11 @@ def is_security_cookie(f, bb, insn): # security cookie check should use SP or BP oper = insn.opers[1] if oper.isReg() and oper.reg not in [ - envi.archs.i386.disasm.REG_ESP, - envi.archs.i386.disasm.REG_EBP, + envi.archs.i386.regs.REG_ESP, + envi.archs.i386.regs.REG_EBP, # TODO: do x64 support for real. - envi.archs.amd64.disasm.REG_RBP, - envi.archs.amd64.disasm.REG_RSP, + envi.archs.amd64.regs.REG_RBP, + envi.archs.amd64.regs.REG_RSP, ]: return False diff --git a/capa/features/file.py b/capa/features/file.py index 2cc4d577..4af93cde 100644 --- a/capa/features/file.py +++ b/capa/features/file.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from capa.features import Feature +from capa.features.common import Feature class Export(Feature): diff --git a/capa/features/freeze.py b/capa/features/freeze.py index 68e93563..c6d4d7f5 100644 --- a/capa/features/freeze.py +++ b/capa/features/freeze.py @@ -53,11 +53,11 @@ import json import zlib import logging -import capa.features import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -import capa.features.extractors +import capa.features.extractors.base_extractor from capa.helpers import hex logger = logging.getLogger(__name__) @@ -67,7 +67,7 @@ def serialize_feature(feature): return feature.freeze_serialize() -KNOWN_FEATURES = {F.__name__: F for F in capa.features.Feature.__subclasses__()} +KNOWN_FEATURES = {F.__name__: F for F in capa.features.common.Feature.__subclasses__()} def deserialize_feature(doc): @@ -80,7 +80,7 @@ def dumps(extractor): serialize the given extractor to a string args: - extractor: capa.features.extractor.FeatureExtractor: + extractor: capa.features.extractors.base_extractor.FeatureExtractor: returns: str: the serialized features. @@ -217,7 +217,7 @@ def loads(s): feature = deserialize_feature(feature[:2]) features["functions"][loc[0]]["basic blocks"][loc[1]]["instructions"][loc[2]]["features"].append((va, feature)) - return capa.features.extractors.NullFeatureExtractor(features) + return capa.features.extractors.base_extractor.NullFeatureExtractor(features) MAGIC = "capa0000".encode("ascii") diff --git a/capa/features/insn.py b/capa/features/insn.py index 82e0ad7a..beadb815 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -7,7 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import capa.render.utils -from capa.features import Feature +from capa.features.common import Feature class API(Feature): diff --git a/capa/ida/helpers/__init__.py b/capa/ida/helpers.py similarity index 95% rename from capa/ida/helpers/__init__.py rename to capa/ida/helpers.py index 41e4b463..b2e8f0fc 100644 --- a/capa/ida/helpers/__init__.py +++ b/capa/ida/helpers.py @@ -14,6 +14,8 @@ import idaapi import idautils import capa +import capa.version +import capa.features.common logger = logging.getLogger("capa") @@ -86,7 +88,7 @@ def get_file_md5(): """ """ md5 = idautils.GetInputFileMD5() if not isinstance(md5, str): - md5 = capa.features.bytes_to_str(md5) + md5 = capa.features.common.bytes_to_str(md5) return md5 @@ -94,7 +96,7 @@ def get_file_sha256(): """ """ sha256 = idaapi.retrieve_input_file_sha256() if not isinstance(sha256, str): - sha256 = capa.features.bytes_to_str(sha256) + sha256 = capa.features.common.bytes_to_str(sha256) return sha256 diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 3c71716e..215ec2d7 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -20,9 +20,12 @@ from PyQt5 import QtGui, QtCore, QtWidgets import capa.main import capa.rules +import capa.engine import capa.ida.helpers -import capa.render.utils as rutils -import capa.features.extractors.ida +import capa.render.json +import capa.features.common +import capa.render.result_document +import capa.features.extractors.ida.extractor from capa.ida.plugin.icon import QICON from capa.ida.plugin.view import ( CapaExplorerQtreeView, @@ -97,7 +100,7 @@ def find_func_matches(f, ruleset, func_features, bb_features): for (name, res) in matches.items(): bb_matches[name].extend(res) for (ea, _) in res: - func_features[capa.features.MatchedRule(name)].add(ea) + func_features[capa.features.common.MatchedRule(name)].add(ea) # find rule matches for function, function features include rule matches for basic blocks _, matches = capa.engine.match(ruleset.function_rules, func_features, int(f)) @@ -155,7 +158,7 @@ class CapaExplorerProgressIndicator(QtCore.QObject): self.progress.emit("extracting features from %s" % text) -class CapaExplorerFeatureExtractor(capa.features.extractors.ida.IdaFeatureExtractor): +class CapaExplorerFeatureExtractor(capa.features.extractors.ida.extractor.IdaFeatureExtractor): """subclass the IdaFeatureExtractor track progress during feature extraction, also allow user to cancel feature extraction @@ -770,7 +773,9 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("rendering results") try: - self.doc = capa.render.convert_capabilities_to_result_document(meta, self.ruleset_cache, capabilities) + self.doc = capa.render.result_document.convert_capabilities_to_result_document( + meta, self.ruleset_cache, capabilities + ) except Exception as e: logger.error("Failed to render results (error: %s)", e) return False @@ -865,7 +870,7 @@ class CapaExplorerForm(idaapi.PluginForm): if rule.meta.get("capa/subscope-rule"): continue for (ea, _) in res: - func_features[capa.features.MatchedRule(name)].add(ea) + func_features[capa.features.common.MatchedRule(name)].add(ea) except Exception as e: logger.error("Failed to match function/basic block rule scope (error: %s)" % e) return False @@ -899,7 +904,7 @@ class CapaExplorerForm(idaapi.PluginForm): if rule.meta.get("capa/subscope-rule"): continue for (ea, _) in res: - file_features[capa.features.MatchedRule(name)].add(ea) + file_features[capa.features.common.MatchedRule(name)].add(ea) except Exception as e: logger.error("Failed to match file scope rules (error: %s)" % e) return False @@ -1123,7 +1128,7 @@ class CapaExplorerForm(idaapi.PluginForm): idaapi.info("No program analysis to save.") return - s = json.dumps(self.doc, sort_keys=True, cls=capa.render.CapaJsonObjectEncoder).encode("utf-8") + s = json.dumps(self.doc, sort_keys=True, cls=capa.render.json.CapaJsonObjectEncoder).encode("utf-8") path = idaapi.ask_file(True, "*.json", "Choose file to save capa program analysis JSON") if not path: diff --git a/capa/ida/plugin/item.py b/capa/ida/plugin/item.py index 77db0c32..8263fff9 100644 --- a/capa/ida/plugin/item.py +++ b/capa/ida/plugin/item.py @@ -6,7 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys import codecs import idc diff --git a/capa/ida/plugin/model.py b/capa/ida/plugin/model.py index fb8c9ceb..bd9b8430 100644 --- a/capa/ida/plugin/model.py +++ b/capa/ida/plugin/model.py @@ -13,9 +13,9 @@ import idaapi from PyQt5 import QtGui, QtCore import capa.rules -import capa.features import capa.ida.helpers import capa.render.utils as rutils +import capa.features.common from capa.ida.plugin.item import ( CapaExplorerDataItem, CapaExplorerRuleItem, @@ -494,7 +494,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): value = feature[feature["type"]] if value: if key == "string": - value = '"%s"' % capa.features.escape_string(value) + value = '"%s"' % capa.features.common.escape_string(value) if feature.get("description", ""): return "%s(%s = %s)" % (key, value, feature["description"]) else: @@ -560,7 +560,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): for s, locations in feature["matches"].items(): if location in locations: return CapaExplorerStringViewItem( - parent, display, location, '"' + capa.features.escape_string(s) + '"' + parent, display, location, '"' + capa.features.common.escape_string(s) + '"' ) # programming error: the given location should always be found in the regex matches @@ -590,7 +590,7 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): if feature["type"] in ("string",): # display string preview return CapaExplorerStringViewItem( - parent, display, location, '"%s"' % capa.features.escape_string(feature[feature["type"]]) + parent, display, location, '"%s"' % capa.features.common.escape_string(feature[feature["type"]]) ) if feature["type"] in ("import", "export", "function-name"): diff --git a/capa/ida/plugin/view.py b/capa/ida/plugin/view.py index 52a29916..2f2d2a5f 100644 --- a/capa/ida/plugin/view.py +++ b/capa/ida/plugin/view.py @@ -14,6 +14,7 @@ from PyQt5 import QtGui, QtCore, QtWidgets import capa.rules import capa.engine import capa.ida.helpers +import capa.features.common import capa.features.basicblock from capa.ida.plugin.item import CapaExplorerFunctionItem from capa.ida.plugin.model import CapaExplorerDataModel @@ -624,8 +625,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget): # single features for (k, v) in filter(lambda t: t[1] == 1, counted): - if isinstance(k, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(k.get_value_str()) + if isinstance(k, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(k.get_value_str()) else: value = k.get_value_str() self.new_feature_node(self.root, ("- %s: %s" % (k.name.lower(), value), "")) @@ -633,8 +634,8 @@ class CapaExplorerRulgenEditor(QtWidgets.QTreeWidget): # n > 1 features for (k, v) in filter(lambda t: t[1] > 1, counted): if k.value: - if isinstance(k, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(k.get_value_str()) + if isinstance(k, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(k.get_value_str()) else: value = k.get_value_str() display = "- count(%s(%s)): %d" % (k.name.lower(), value, v) @@ -898,8 +899,8 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget): """ """ name = feature.name.lower() value = feature.get_value_str() - if isinstance(feature, (capa.features.String,)): - value = '"%s"' % capa.features.escape_string(value) + if isinstance(feature, (capa.features.common.String,)): + value = '"%s"' % capa.features.common.escape_string(value) return "%s(%s)" % (name, value) for (feature, eas) in sorted(features.items(), key=lambda k: sorted(k[1])): diff --git a/capa/main.py b/capa/main.py index 00e6679e..f12aeb29 100644 --- a/capa/main.py +++ b/capa/main.py @@ -28,10 +28,13 @@ import colorama import capa.rules import capa.engine -import capa.render import capa.version -import capa.features +import capa.render.json +import capa.render.default +import capa.render.verbose +import capa.features.common import capa.features.freeze +import capa.render.vverbose import capa.features.extractors import capa.features.extractors.pefile from capa.helpers import get_file_taste @@ -94,7 +97,7 @@ def find_function_capabilities(ruleset, extractor, f): for rule_name, res in matches.items(): bb_matches[rule_name].extend(res) for va, _ in res: - function_features[capa.features.MatchedRule(rule_name)].add(va) + function_features[capa.features.common.MatchedRule(rule_name)].add(va) _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f)) return function_matches, bb_matches, len(function_features) @@ -169,7 +172,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None): # mapping from feature (matched rule) to set of addresses at which it matched. # schema: Dict[MatchedRule: Set[int] function_and_lower_features = { - capa.features.MatchedRule(rule_name): set(map(lambda p: p[0], results)) + capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results)) for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()) } @@ -407,7 +410,7 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False): from smda.SmdaConfig import SmdaConfig from smda.Disassembler import Disassembler - import capa.features.extractors.smda + import capa.features.extractors.smda.extractor smda_report = None with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): @@ -416,9 +419,9 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False): smda_disasm = Disassembler(config) smda_report = smda_disasm.disassembleFile(path) - return capa.features.extractors.smda.SmdaFeatureExtractor(smda_report, path) + return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(smda_report, path) else: - import capa.features.extractors.viv + import capa.features.extractors.viv.extractor with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): if format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_32): @@ -433,7 +436,7 @@ def get_extractor(path, format, backend, sigpaths, disable_progress=False): # see #168 for discussion around how to handle non-writable directories logger.info("source directory is not writable, won't save intermediate workspace") - return capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) + return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path) def is_nursery_rule_path(path): @@ -835,13 +838,13 @@ def main(argv=None): return -1 if args.json: - print(capa.render.render_json(meta, rules, capabilities)) + print(capa.render.json.render(meta, rules, capabilities)) elif args.vverbose: - print(capa.render.render_vverbose(meta, rules, capabilities)) + print(capa.render.vverbose.render(meta, rules, capabilities)) elif args.verbose: - print(capa.render.render_verbose(meta, rules, capabilities)) + print(capa.render.verbose.render(meta, rules, capabilities)) else: - print(capa.render.render_default(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities)) colorama.deinit() logger.debug("done.") @@ -850,8 +853,10 @@ def main(argv=None): def ida_main(): + import capa.rules import capa.ida.helpers - import capa.features.extractors.ida + import capa.render.default + import capa.features.extractors.ida.extractor logging.basicConfig(level=logging.INFO) logging.getLogger().setLevel(logging.INFO) @@ -883,14 +888,14 @@ def ida_main(): meta = capa.ida.helpers.collect_metadata() - capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.IdaFeatureExtractor()) + capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) meta["analysis"].update(counts) if has_file_limitation(rules, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") colorama.init(strip=True) - print(capa.render.render_default(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities)) def is_runtime_ida(): diff --git a/capa/render/__init__.py b/capa/render/__init__.py index 76b840ca..e69de29b 100644 --- a/capa/render/__init__.py +++ b/capa/render/__init__.py @@ -1,363 +0,0 @@ -# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - -import json - -import capa.rules -import capa.engine -import capa.render.utils - - -def convert_statement_to_result_document(statement): - """ - "statement": { - "type": "or" - }, - - "statement": { - "max": 9223372036854775808, - "min": 2, - "type": "range" - }, - """ - statement_type = statement.name.lower() - result = {"type": statement_type} - if statement.description: - result["description"] = statement.description - - if statement_type == "some" and statement.count == 0: - result["type"] = "optional" - elif statement_type == "some": - result["count"] = statement.count - elif statement_type == "range": - result["min"] = statement.min - result["max"] = statement.max - result["child"] = convert_feature_to_result_document(statement.child) - elif statement_type == "subscope": - result["subscope"] = statement.scope - - return result - - -def convert_feature_to_result_document(feature): - """ - "feature": { - "number": 6, - "type": "number" - }, - - "feature": { - "api": "ws2_32.WSASocket", - "type": "api" - }, - - "feature": { - "match": "create TCP socket", - "type": "match" - }, - - "feature": { - "characteristic": [ - "loop", - true - ], - "type": "characteristic" - }, - """ - result = {"type": feature.name, feature.name: feature.get_value_str()} - if feature.description: - result["description"] = feature.description - if feature.name == "regex": - result["matches"] = feature.matches - return result - - -def convert_node_to_result_document(node): - """ - "node": { - "type": "statement", - "statement": { ... } - }, - - "node": { - "type": "feature", - "feature": { ... } - }, - """ - - if isinstance(node, capa.engine.Statement): - return { - "type": "statement", - "statement": convert_statement_to_result_document(node), - } - elif isinstance(node, capa.features.Feature): - return { - "type": "feature", - "feature": convert_feature_to_result_document(node), - } - else: - raise RuntimeError("unexpected match node type") - - -def convert_match_to_result_document(rules, capabilities, result): - """ - convert the given Result instance into a common, Python-native data structure. - this will become part of the "result document" format that can be emitted to JSON. - """ - doc = { - "success": bool(result.success), - "node": convert_node_to_result_document(result.statement), - "children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children], - } - - # logic expression, like `and`, don't have locations - their children do. - # so only add `locations` to feature nodes. - if isinstance(result.statement, capa.features.Feature): - if bool(result.success): - doc["locations"] = result.locations - elif isinstance(result.statement, capa.rules.Range): - if bool(result.success): - doc["locations"] = result.locations - - # if we have a `match` statement, then we're referencing another rule or namespace. - # this could an external rule (written by a human), or - # rule generated to support a subscope (basic block, etc.) - # we still want to include the matching logic in this tree. - # - # so, we need to lookup the other rule results - # and then filter those down to the address used here. - # finally, splice that logic into this tree. - if ( - doc["node"]["type"] == "feature" - and doc["node"]["feature"]["type"] == "match" - # only add subtree on success, - # because there won't be results for the other rule on failure. - and doc["success"] - ): - - name = doc["node"]["feature"]["match"] - - if name in rules: - # this is a rule that we're matching - # - # pull matches from the referenced rule into our tree here. - rule_name = doc["node"]["feature"]["match"] - rule = rules[rule_name] - rule_matches = {address: result for (address, result) in capabilities[rule_name]} - - if rule.meta.get("capa/subscope-rule"): - # for a subscope rule, fixup the node to be a scope node, rather than a match feature node. - # - # e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block` - scope = rule.meta["scope"] - doc["node"] = { - "type": "statement", - "statement": { - "type": "subscope", - "subscope": scope, - }, - } - - for location in doc["locations"]: - doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location])) - else: - # this is a namespace that we're matching - # - # check for all rules in the namespace, - # seeing if they matched. - # if so, pull their matches into our match tree here. - ns_name = doc["node"]["feature"]["match"] - ns_rules = rules.rules_by_namespace[ns_name] - - for rule in ns_rules: - if rule.name in capabilities: - # the rule matched, so splice results into our tree here. - # - # note, there's a shortcoming in our result document schema here: - # we lose the name of the rule that matched in a namespace. - # for example, if we have a statement: `match: runtime/dotnet` - # and we get matches, we can say the following: - # - # match: runtime/dotnet @ 0x0 - # or: - # import: mscoree._CorExeMain @ 0x402000 - # - # however, we lose the fact that it was rule - # "compiled to the .NET platform" - # that contained this logic and did the match. - # - # we could introduce an intermediate node here. - # this would be a breaking change and require updates to the renderers. - # in the meantime, the above might be sufficient. - rule_matches = {address: result for (address, result) in capabilities[rule.name]} - for location in doc["locations"]: - doc["children"].append( - convert_match_to_result_document(rules, capabilities, rule_matches[location]) - ) - - return doc - - -def convert_meta_to_result_document(meta): - attacks = meta.get("att&ck", []) - meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks] - mbcs = meta.get("mbc", []) - meta["mbc"] = [parse_canonical_mbc(mbc) for mbc in mbcs] - return meta - - -def parse_canonical_attack(attack): - """ - parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]` - """ - tactic = "" - technique = "" - subtechnique = "" - parts, id = capa.render.utils.parse_parts_id(attack) - if len(parts) > 0: - tactic = parts[0] - if len(parts) > 1: - technique = parts[1] - if len(parts) > 2: - subtechnique = parts[2] - - return { - "parts": parts, - "id": id, - "tactic": tactic, - "technique": technique, - "subtechnique": subtechnique, - } - - -def parse_canonical_mbc(mbc): - """ - parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]` - """ - objective = "" - behavior = "" - method = "" - parts, id = capa.render.utils.parse_parts_id(mbc) - if len(parts) > 0: - objective = parts[0] - if len(parts) > 1: - behavior = parts[1] - if len(parts) > 2: - method = parts[2] - - return { - "parts": parts, - "id": id, - "objective": objective, - "behavior": behavior, - "method": method, - } - - -def convert_capabilities_to_result_document(meta, rules, capabilities): - """ - convert the given rule set and capabilities result to a common, Python-native data structure. - this format can be directly emitted to JSON, or passed to the other `render_*` routines - to render as text. - - see examples of substructures in above routines. - - schema: - - ```json - { - "meta": {...}, - "rules: { - $rule-name: { - "meta": {...copied from rule.meta...}, - "matches: { - $address: {...match details...}, - ... - } - }, - ... - } - } - ``` - - Args: - meta (Dict[str, Any]): - rules (RuleSet): - capabilities (Dict[str, List[Tuple[int, Result]]]): - """ - doc = { - "meta": meta, - "rules": {}, - } - - for rule_name, matches in capabilities.items(): - rule = rules[rule_name] - - if rule.meta.get("capa/subscope-rule"): - continue - - rule_meta = convert_meta_to_result_document(rule.meta) - - doc["rules"][rule_name] = { - "meta": rule_meta, - "source": rule.definition, - "matches": { - addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches - }, - } - - return doc - - -def render_vverbose(meta, rules, capabilities): - # there's an import loop here - # if capa.render imports capa.render.vverbose - # and capa.render.vverbose import capa.render (implicitly, as a submodule) - # so, defer the import until routine is called, breaking the import loop. - import capa.render.vverbose - - doc = convert_capabilities_to_result_document(meta, rules, capabilities) - return capa.render.vverbose.render_vverbose(doc) - - -def render_verbose(meta, rules, capabilities): - # break import loop - import capa.render.verbose - - doc = convert_capabilities_to_result_document(meta, rules, capabilities) - return capa.render.verbose.render_verbose(doc) - - -def render_default(meta, rules, capabilities): - # break import loop - import capa.render.default - import capa.render.verbose - - doc = convert_capabilities_to_result_document(meta, rules, capabilities) - return capa.render.default.render_default(doc) - - -class CapaJsonObjectEncoder(json.JSONEncoder): - """JSON encoder that emits Python sets as sorted lists""" - - def default(self, obj): - if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, str): - return json.JSONEncoder.default(self, obj) - elif isinstance(obj, set): - return list(sorted(obj)) - else: - # probably will TypeError - return json.JSONEncoder.default(self, obj) - - -def render_json(meta, rules, capabilities): - return json.dumps( - convert_capabilities_to_result_document(meta, rules, capabilities), - cls=CapaJsonObjectEncoder, - sort_keys=True, - ) diff --git a/capa/render/default.py b/capa/render/default.py index 82f432d4..beed901b 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -11,6 +11,7 @@ import collections import tabulate import capa.render.utils as rutils +import capa.render.result_document tabulate.PRESERVE_WHITESPACE = True @@ -219,3 +220,8 @@ def render_default(doc): render_capabilities(doc, ostream) return ostream.getvalue() + + +def render(meta, rules, capabilities): + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + return render_default(doc) diff --git a/capa/render/json.py b/capa/render/json.py new file mode 100644 index 00000000..0e875051 --- /dev/null +++ b/capa/render/json.py @@ -0,0 +1,31 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import json + +import capa.render.result_document + + +class CapaJsonObjectEncoder(json.JSONEncoder): + """JSON encoder that emits Python sets as sorted lists""" + + def default(self, obj): + if isinstance(obj, (list, dict, int, float, bool, type(None))) or isinstance(obj, str): + return json.JSONEncoder.default(self, obj) + elif isinstance(obj, set): + return list(sorted(obj)) + else: + # probably will TypeError + return json.JSONEncoder.default(self, obj) + + +def render(meta, rules, capabilities): + return json.dumps( + capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities), + cls=CapaJsonObjectEncoder, + sort_keys=True, + ) diff --git a/capa/render/result_document.py b/capa/render/result_document.py new file mode 100644 index 00000000..8c3703cd --- /dev/null +++ b/capa/render/result_document.py @@ -0,0 +1,312 @@ +# Copyright (C) 2020 FireEye, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import capa.rules +import capa.engine +import capa.render.utils +import capa.features.common + + +def convert_statement_to_result_document(statement): + """ + "statement": { + "type": "or" + }, + + "statement": { + "max": 9223372036854775808, + "min": 2, + "type": "range" + }, + """ + statement_type = statement.name.lower() + result = {"type": statement_type} + if statement.description: + result["description"] = statement.description + + if statement_type == "some" and statement.count == 0: + result["type"] = "optional" + elif statement_type == "some": + result["count"] = statement.count + elif statement_type == "range": + result["min"] = statement.min + result["max"] = statement.max + result["child"] = convert_feature_to_result_document(statement.child) + elif statement_type == "subscope": + result["subscope"] = statement.scope + + return result + + +def convert_feature_to_result_document(feature): + """ + "feature": { + "number": 6, + "type": "number" + }, + + "feature": { + "api": "ws2_32.WSASocket", + "type": "api" + }, + + "feature": { + "match": "create TCP socket", + "type": "match" + }, + + "feature": { + "characteristic": [ + "loop", + true + ], + "type": "characteristic" + }, + """ + result = {"type": feature.name, feature.name: feature.get_value_str()} + if feature.description: + result["description"] = feature.description + if feature.name == "regex": + result["matches"] = feature.matches + return result + + +def convert_node_to_result_document(node): + """ + "node": { + "type": "statement", + "statement": { ... } + }, + + "node": { + "type": "feature", + "feature": { ... } + }, + """ + + if isinstance(node, capa.engine.Statement): + return { + "type": "statement", + "statement": convert_statement_to_result_document(node), + } + elif isinstance(node, capa.features.common.Feature): + return { + "type": "feature", + "feature": convert_feature_to_result_document(node), + } + else: + raise RuntimeError("unexpected match node type") + + +def convert_match_to_result_document(rules, capabilities, result): + """ + convert the given Result instance into a common, Python-native data structure. + this will become part of the "result document" format that can be emitted to JSON. + """ + doc = { + "success": bool(result.success), + "node": convert_node_to_result_document(result.statement), + "children": [convert_match_to_result_document(rules, capabilities, child) for child in result.children], + } + + # logic expression, like `and`, don't have locations - their children do. + # so only add `locations` to feature nodes. + if isinstance(result.statement, capa.features.common.Feature): + if bool(result.success): + doc["locations"] = result.locations + elif isinstance(result.statement, capa.engine.Range): + if bool(result.success): + doc["locations"] = result.locations + + # if we have a `match` statement, then we're referencing another rule or namespace. + # this could an external rule (written by a human), or + # rule generated to support a subscope (basic block, etc.) + # we still want to include the matching logic in this tree. + # + # so, we need to lookup the other rule results + # and then filter those down to the address used here. + # finally, splice that logic into this tree. + if ( + doc["node"]["type"] == "feature" + and doc["node"]["feature"]["type"] == "match" + # only add subtree on success, + # because there won't be results for the other rule on failure. + and doc["success"] + ): + + name = doc["node"]["feature"]["match"] + + if name in rules: + # this is a rule that we're matching + # + # pull matches from the referenced rule into our tree here. + rule_name = doc["node"]["feature"]["match"] + rule = rules[rule_name] + rule_matches = {address: result for (address, result) in capabilities[rule_name]} + + if rule.meta.get("capa/subscope-rule"): + # for a subscope rule, fixup the node to be a scope node, rather than a match feature node. + # + # e.g. `contain loop/30c4c78e29bf4d54894fc74f664c62e8` -> `basic block` + scope = rule.meta["scope"] + doc["node"] = { + "type": "statement", + "statement": { + "type": "subscope", + "subscope": scope, + }, + } + + for location in doc["locations"]: + doc["children"].append(convert_match_to_result_document(rules, capabilities, rule_matches[location])) + else: + # this is a namespace that we're matching + # + # check for all rules in the namespace, + # seeing if they matched. + # if so, pull their matches into our match tree here. + ns_name = doc["node"]["feature"]["match"] + ns_rules = rules.rules_by_namespace[ns_name] + + for rule in ns_rules: + if rule.name in capabilities: + # the rule matched, so splice results into our tree here. + # + # note, there's a shortcoming in our result document schema here: + # we lose the name of the rule that matched in a namespace. + # for example, if we have a statement: `match: runtime/dotnet` + # and we get matches, we can say the following: + # + # match: runtime/dotnet @ 0x0 + # or: + # import: mscoree._CorExeMain @ 0x402000 + # + # however, we lose the fact that it was rule + # "compiled to the .NET platform" + # that contained this logic and did the match. + # + # we could introduce an intermediate node here. + # this would be a breaking change and require updates to the renderers. + # in the meantime, the above might be sufficient. + rule_matches = {address: result for (address, result) in capabilities[rule.name]} + for location in doc["locations"]: + doc["children"].append( + convert_match_to_result_document(rules, capabilities, rule_matches[location]) + ) + + return doc + + +def convert_meta_to_result_document(meta): + attacks = meta.get("att&ck", []) + meta["att&ck"] = [parse_canonical_attack(attack) for attack in attacks] + mbcs = meta.get("mbc", []) + meta["mbc"] = [parse_canonical_mbc(mbc) for mbc in mbcs] + return meta + + +def parse_canonical_attack(attack): + """ + parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]` + """ + tactic = "" + technique = "" + subtechnique = "" + parts, id = capa.render.utils.parse_parts_id(attack) + if len(parts) > 0: + tactic = parts[0] + if len(parts) > 1: + technique = parts[1] + if len(parts) > 2: + subtechnique = parts[2] + + return { + "parts": parts, + "id": id, + "tactic": tactic, + "technique": technique, + "subtechnique": subtechnique, + } + + +def parse_canonical_mbc(mbc): + """ + parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]` + """ + objective = "" + behavior = "" + method = "" + parts, id = capa.render.utils.parse_parts_id(mbc) + if len(parts) > 0: + objective = parts[0] + if len(parts) > 1: + behavior = parts[1] + if len(parts) > 2: + method = parts[2] + + return { + "parts": parts, + "id": id, + "objective": objective, + "behavior": behavior, + "method": method, + } + + +def convert_capabilities_to_result_document(meta, rules, capabilities): + """ + convert the given rule set and capabilities result to a common, Python-native data structure. + this format can be directly emitted to JSON, or passed to the other `capa.render.*.render()` routines + to render as text. + + see examples of substructures in above routines. + + schema: + + ```json + { + "meta": {...}, + "rules: { + $rule-name: { + "meta": {...copied from rule.meta...}, + "matches: { + $address: {...match details...}, + ... + } + }, + ... + } + } + ``` + + Args: + meta (Dict[str, Any]): + rules (RuleSet): + capabilities (Dict[str, List[Tuple[int, Result]]]): + """ + doc = { + "meta": meta, + "rules": {}, + } + + for rule_name, matches in capabilities.items(): + rule = rules[rule_name] + + if rule.meta.get("capa/subscope-rule"): + continue + + rule_meta = convert_meta_to_result_document(rule.meta) + + doc["rules"][rule_name] = { + "meta": rule_meta, + "source": rule.definition, + "matches": { + addr: convert_match_to_result_document(rules, capabilities, match) for (addr, match) in matches + }, + } + + return doc diff --git a/capa/render/verbose.py b/capa/render/verbose.py index d07845b2..c1bb5ff1 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -26,6 +26,7 @@ import tabulate import capa.rules import capa.render.utils as rutils +import capa.render.result_document def render_meta(ostream, doc): @@ -120,3 +121,8 @@ def render_verbose(doc): ostream.write("\n") return ostream.getvalue() + + +def render(meta, rules, capabilities): + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + return render_verbose(doc) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index f14ed36c..683945b2 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -9,9 +9,10 @@ import tabulate import capa.rules -import capa.features import capa.render.utils as rutils import capa.render.verbose +import capa.features.common +import capa.render.result_document def render_locations(ostream, match): @@ -56,7 +57,7 @@ def render_statement(ostream, match, statement, indent=0): if child[child["type"]]: if child["type"] == "string": - value = '"%s"' % capa.features.escape_string(child[child["type"]]) + value = '"%s"' % capa.features.common.escape_string(child[child["type"]]) else: value = child[child["type"]] value = rutils.bold2(value) @@ -85,7 +86,7 @@ def render_statement(ostream, match, statement, indent=0): def render_string_value(s): - return '"%s"' % capa.features.escape_string(s) + return '"%s"' % capa.features.common.escape_string(s) def render_feature(ostream, match, feature, indent=0): @@ -261,3 +262,8 @@ def render_vverbose(doc): ostream.write("\n") return ostream.getvalue() + + +def render(meta, rules, capabilities): + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) + return render_vverbose(doc) diff --git a/capa/rules.py b/capa/rules.py index bf407439..b1c1fde3 100644 --- a/capa/rules.py +++ b/capa/rules.py @@ -12,6 +12,7 @@ import codecs import logging import binascii import functools +import collections try: from functools import lru_cache @@ -25,12 +26,13 @@ import ruamel.yaml import capa.rules import capa.engine +import capa.engine as ceng import capa.features import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -from capa.engine import * -from capa.features import MAX_BYTES_FEATURE_SIZE +from capa.features.common import MAX_BYTES_FEATURE_SIZE logger = logging.getLogger(__name__) @@ -67,38 +69,38 @@ BASIC_BLOCK_SCOPE = "basic block" SUPPORTED_FEATURES = { FILE_SCOPE: { - capa.features.MatchedRule, + capa.features.common.MatchedRule, capa.features.file.Export, capa.features.file.Import, capa.features.file.Section, capa.features.file.FunctionName, - capa.features.Characteristic("embedded pe"), - capa.features.String, + capa.features.common.Characteristic("embedded pe"), + capa.features.common.String, }, FUNCTION_SCOPE: { # plus basic block scope features, see below capa.features.basicblock.BasicBlock, - capa.features.Characteristic("calls from"), - capa.features.Characteristic("calls to"), - capa.features.Characteristic("loop"), - capa.features.Characteristic("recursive call"), + capa.features.common.Characteristic("calls from"), + capa.features.common.Characteristic("calls to"), + capa.features.common.Characteristic("loop"), + capa.features.common.Characteristic("recursive call"), }, BASIC_BLOCK_SCOPE: { - capa.features.MatchedRule, + capa.features.common.MatchedRule, capa.features.insn.API, capa.features.insn.Number, - capa.features.String, - capa.features.Bytes, + capa.features.common.String, + capa.features.common.Bytes, capa.features.insn.Offset, capa.features.insn.Mnemonic, - capa.features.Characteristic("nzxor"), - capa.features.Characteristic("peb access"), - capa.features.Characteristic("fs access"), - capa.features.Characteristic("gs access"), - capa.features.Characteristic("cross section flow"), - capa.features.Characteristic("tight loop"), - capa.features.Characteristic("stack string"), - capa.features.Characteristic("indirect call"), + capa.features.common.Characteristic("nzxor"), + capa.features.common.Characteristic("peb access"), + capa.features.common.Characteristic("fs access"), + capa.features.common.Characteristic("gs access"), + capa.features.common.Characteristic("cross section flow"), + capa.features.common.Characteristic("tight loop"), + capa.features.common.Characteristic("stack string"), + capa.features.common.Characteristic("indirect call"), }, } @@ -142,8 +144,8 @@ class InvalidRuleSet(ValueError): def ensure_feature_valid_for_scope(scope, feature): - if isinstance(feature, capa.features.Characteristic): - if capa.features.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]: + if isinstance(feature, capa.features.common.Characteristic): + if capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]: raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) elif not isinstance(feature, tuple(filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope]))): raise InvalidRule("feature %s not support for scope %s" % (feature, scope)) @@ -199,9 +201,9 @@ def parse_feature(key): if key == "api": return capa.features.insn.API elif key == "string": - return capa.features.StringFactory + return capa.features.common.StringFactory elif key == "bytes": - return capa.features.Bytes + return capa.features.common.Bytes elif key == "number": return capa.features.insn.Number elif key.startswith("number/"): @@ -223,7 +225,7 @@ def parse_feature(key): elif key == "basic blocks": return capa.features.basicblock.BasicBlock elif key == "characteristic": - return capa.features.Characteristic + return capa.features.common.Characteristic elif key == "export": return capa.features.file.Export elif key == "import": @@ -231,7 +233,7 @@ def parse_feature(key): elif key == "section": return capa.features.file.Section elif key == "match": - return capa.features.MatchedRule + return capa.features.common.MatchedRule elif key == "function-name": return capa.features.file.FunctionName else: @@ -264,7 +266,7 @@ def parse_description(s, value_type, description=None): if isinstance(value, str): if value_type == "bytes": try: - value = codecs.decode(value.replace(" ", ""), "hex") + value = codecs.decode(value.replace(" ", "").encode("ascii"), "hex") except binascii.Error: raise InvalidRule('unexpected bytes value: "%s", must be a valid hex sequence' % value) @@ -323,21 +325,21 @@ def build_statements(d, scope): key = list(d.keys())[0] description = pop_statement_description_entry(d[key]) if key == "and": - return And([build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.And([build_statements(dd, scope) for dd in d[key]], description=description) elif key == "or": - return Or([build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Or([build_statements(dd, scope) for dd in d[key]], description=description) elif key == "not": if len(d[key]) != 1: raise InvalidRule("not statement must have exactly one child statement") - return Not(build_statements(d[key][0], scope), description=description) + return ceng.Not(build_statements(d[key][0], scope), description=description) elif key.endswith(" or more"): count = int(key[: -len("or more")]) - return Some(count, [build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Some(count, [build_statements(dd, scope) for dd in d[key]], description=description) elif key == "optional": # `optional` is an alias for `0 or more` # which is useful for documenting behaviors, # like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`. - return Some(0, [build_statements(dd, scope) for dd in d[key]], description=description) + return ceng.Some(0, [build_statements(dd, scope) for dd in d[key]], description=description) elif key == "function": if scope != FILE_SCOPE: @@ -346,7 +348,7 @@ def build_statements(d, scope): if len(d[key]) != 1: raise InvalidRule("subscope must have exactly one child statement") - return Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE)) + return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE)) elif key == "basic block": if scope != FUNCTION_SCOPE: @@ -355,7 +357,7 @@ def build_statements(d, scope): if len(d[key]) != 1: raise InvalidRule("subscope must have exactly one child statement") - return Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE)) + return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE)) elif key.startswith("count(") and key.endswith(")"): # e.g.: @@ -396,18 +398,18 @@ def build_statements(d, scope): count = d[key] if isinstance(count, int): - return Range(feature, min=count, max=count, description=description) + return ceng.Range(feature, min=count, max=count, description=description) elif count.endswith(" or more"): min = parse_int(count[: -len(" or more")]) max = None - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) elif count.endswith(" or fewer"): min = None max = parse_int(count[: -len(" or fewer")]) - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) elif count.startswith("("): min, max = parse_range(count) - return Range(feature, min=min, max=max, description=description) + return ceng.Range(feature, min=min, max=max, description=description) else: raise InvalidRule("unexpected range: %s" % (count)) elif key == "string" and not isinstance(d[key], str): @@ -462,7 +464,7 @@ class Rule(object): deps = set([]) def rec(statement): - if isinstance(statement, capa.features.MatchedRule): + if isinstance(statement, capa.features.common.MatchedRule): # we're not sure at this point if the `statement.value` is # really a rule name or a namespace name (we use `MatchedRule` for both cases). # we'll give precedence to namespaces, and then assume if that does work, @@ -478,7 +480,7 @@ class Rule(object): # not a namespace, assume its a rule name. deps.add(statement.value) - elif isinstance(statement, Statement): + elif isinstance(statement, ceng.Statement): for child in statement.get_children(): rec(child) @@ -489,7 +491,7 @@ class Rule(object): return deps def _extract_subscope_rules_rec(self, statement): - if isinstance(statement, Statement): + if isinstance(statement, ceng.Statement): # for each child that is a subscope, for subscope in filter( lambda statement: isinstance(statement, capa.engine.Subscope), statement.get_children() @@ -518,7 +520,7 @@ class Rule(object): ) # update the existing statement to `match` the new rule - new_node = capa.features.MatchedRule(name) + new_node = capa.features.common.MatchedRule(name) statement.replace_child(subscope, new_node) # and yield the new rule to our caller @@ -854,6 +856,37 @@ def index_rules_by_namespace(rules): return dict(namespaces) +def topologically_order_rules(rules): + """ + order the given rules such that dependencies show up before dependents. + this means that as we match rules, we can add features for the matches, and these + will be matched by subsequent rules if they follow this order. + + assumes that the rule dependency graph is a DAG. + """ + # we evaluate `rules` multiple times, so if its a generator, realize it into a list. + rules = list(rules) + namespaces = index_rules_by_namespace(rules) + rules = {rule.name: rule for rule in rules} + seen = set([]) + ret = [] + + def rec(rule): + if rule.name in seen: + return + + for dep in rule.get_dependencies(namespaces): + rec(rules[dep]) + + ret.append(rule) + seen.add(rule.name) + + for rule in rules.values(): + rec(rule) + + return ret + + class RuleSet(object): """ a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes. @@ -916,7 +949,7 @@ class RuleSet(object): continue scope_rules.update(get_rules_and_dependencies(rules, rule.name)) - return get_rules_with_scope(capa.engine.topologically_order_rules(scope_rules), scope) + return get_rules_with_scope(topologically_order_rules(scope_rules), scope) @staticmethod def _extract_subscope_rules(rules): diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 14f1d8ac..5939744f 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -55,6 +55,7 @@ Unless required by applicable law or agreed to in writing, software distributed is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import os import sys import json import logging @@ -66,7 +67,7 @@ import multiprocessing.pool import capa import capa.main import capa.rules -import capa.render +import capa.render.json logger = logging.getLogger("capa") @@ -205,7 +206,7 @@ def main(argv=None): capabilities = result["ok"]["capabilities"] # our renderer expects to emit a json document for a single sample # so we deserialize the json document, store it in a larger dict, and we'll subsequently re-encode. - results[result["path"]] = json.loads(capa.render.render_json(meta, rules, capabilities)) + results[result["path"]] = json.loads(capa.render.json.render(meta, rules, capabilities)) else: raise ValueError("unexpected status: %s" % (result["status"])) diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 50619ddb..169b8cc4 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -31,7 +31,6 @@ See the License for the specific language governing permissions and limitations """ import re -import pdb import sys import string import logging @@ -44,8 +43,7 @@ import capa.rules import capa.engine import capa.features import capa.features.insn -from capa.features import ARCH_X32, ARCH_X64, String -from capa.features.insn import Number, Offset +from capa.features.common import ARCH_X32, ARCH_X64, String logger = logging.getLogger("capa2yara") @@ -348,9 +346,9 @@ def convert_rule(rule, rulename, cround, depth): kid = rule.child kids = [kid] num_kids = 1 - logger.info("kid: " + kids) + logger.info("kid: %s", kids) except: - logger.info("no kid in rule: " + rule.name) + logger.info("no kid in rule: %s", rule.name) # just a single statement without 'and' or 'or' before it in this rule if "kids" not in locals().keys(): @@ -366,7 +364,7 @@ def convert_rule(rule, rulename, cround, depth): else: x = 0 - logger.info("doing kids: " + repr(kids) + " - len: " + str(num_kids)) + logger.info("doing kids: %r - len: %s", kids, num_kids) for kid in kids: s_type = kid.name logger.info("doing type: " + s_type + " kidnum: " + str(x)) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 2d1b8ddf..8cc4dee8 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -6,11 +6,12 @@ import collections import capa.main import capa.rules import capa.engine -import capa.render import capa.features +import capa.render.json import capa.render.utils as rutils +import capa.render.default +import capa.render.result_document from capa.engine import * -from capa.render import convert_capabilities_to_result_document # edit this to set the path for file to analyze and rule directory RULES_PATH = "/tmp/capa/rules/" @@ -202,14 +203,14 @@ def capa_details(file_path, output_format="dictionary"): capa_output = False if output_format == "dictionary": # ...as python dictionary, simplified as textable but in dictionary - doc = convert_capabilities_to_result_document(meta, rules, capabilities) + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) capa_output = render_dictionary(doc) elif output_format == "json": # render results # ...as json - capa_output = json.loads(capa.render.render_json(meta, rules, capabilities)) + capa_output = json.loads(capa.render.json.render(meta, rules, capabilities)) elif output_format == "texttable": # ...as human readable text table - capa_output = capa.render.render_default(meta, rules, capabilities) + capa_output = capa.render.default.render(meta, rules, capabilities) return capa_output diff --git a/scripts/lint.py b/scripts/lint.py index 9bd27c2e..19eb93eb 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -31,8 +31,8 @@ import ruamel.yaml import capa.main import capa.rules import capa.engine -import capa.features import capa.features.insn +import capa.features.common logger = logging.getLogger("lint") @@ -326,7 +326,7 @@ class FeatureStringTooShort(Lint): def check_features(self, ctx, features): for feature in features: - if isinstance(feature, capa.features.String): + if isinstance(feature, capa.features.common.String): if len(feature.value) < 4: self.recommendation = self.recommendation.format(feature.value) return True diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index f3d73710..ad0af057 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -59,10 +59,10 @@ import colorama import capa.main import capa.rules import capa.engine -import capa.render import capa.features import capa.render.utils as rutils import capa.features.freeze +import capa.render.result_document from capa.helpers import get_file_taste logger = logging.getLogger("capa.show-capabilities-by-function") @@ -191,7 +191,7 @@ def main(argv=None): # - when not an interactive session, and disable coloring # renderers should use coloring and assume it will be stripped out if necessary. colorama.init() - doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) + doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities) print(render_matches_by_function(doc)) colorama.deinit() diff --git a/scripts/show-features.py b/scripts/show-features.py index 5f0e5df7..182ddb42 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -153,10 +153,14 @@ def main(argv=None): def ida_main(): + import idc + + import capa.features.extractors.ida.extractor + function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) print("getting features for current function 0x%X" % function) - extractor = capa.features.extractors.ida.IdaFeatureExtractor() + extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() if not function: for feature, va in extractor.extract_file_features(): diff --git a/tests/fixtures.py b/tests/fixtures.py index 10b6e36a..0ff34ffa 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -19,8 +19,9 @@ import pytest import capa.main import capa.features.file import capa.features.insn +import capa.features.common import capa.features.basicblock -from capa.features import ARCH_X32, ARCH_X64 +from capa.features.common import ARCH_X32, ARCH_X64 CD = os.path.dirname(__file__) @@ -66,7 +67,8 @@ def xfail(condition, reason=None): # need to limit cache size so GitHub Actions doesn't run out of memory, see #545 @lru_cache(maxsize=1) def get_viv_extractor(path): - import capa.features.extractors.viv + import capa.main + import capa.features.extractors.viv.extractor sigpaths = [ os.path.join(CD, "..", "sigs", "test_aulldiv.pat"), @@ -82,7 +84,7 @@ def get_viv_extractor(path): vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths) else: vw = capa.main.get_workspace(path, "auto", sigpaths=sigpaths) - extractor = capa.features.extractors.viv.VivisectFeatureExtractor(vw, path) + extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path) fixup_viv(path, extractor) return extractor @@ -101,14 +103,14 @@ def get_smda_extractor(path): from smda.SmdaConfig import SmdaConfig from smda.Disassembler import Disassembler - import capa.features.extractors.smda + import capa.features.extractors.smda.extractor config = SmdaConfig() config.STORE_BUFFER = True disasm = Disassembler(config) report = disasm.disassembleFile(path) - return capa.features.extractors.smda.SmdaFeatureExtractor(report, path) + return capa.features.extractors.smda.extractor.SmdaFeatureExtractor(report, path) @lru_cache(maxsize=1) @@ -268,34 +270,34 @@ def get_basic_block(extractor, f, va): def resolve_scope(scope): if scope == "file": - def inner(extractor): + def inner_file(extractor): return extract_file_features(extractor) - inner.__name__ = scope - return inner + inner_file.__name__ = scope + return inner_file elif "bb=" in scope: # like `function=0x401000,bb=0x40100A` fspec, _, bbspec = scope.partition(",") fva = int(fspec.partition("=")[2], 0x10) bbva = int(bbspec.partition("=")[2], 0x10) - def inner(extractor): + def inner_bb(extractor): f = get_function(extractor, fva) bb = get_basic_block(extractor, f, bbva) return extract_basic_block_features(extractor, f, bb) - inner.__name__ = scope - return inner + inner_bb.__name__ = scope + return inner_bb elif scope.startswith("function"): # like `function=0x401000` va = int(scope.partition("=")[2], 0x10) - def inner(extractor): + def inner_function(extractor): f = get_function(extractor, va) return extract_function_features(extractor, f) - inner.__name__ = scope - return inner + inner_function.__name__ = scope + return inner_function else: raise ValueError("unexpected scope fixture") @@ -324,12 +326,12 @@ def parametrize(params, values, **kwargs): FEATURE_PRESENCE_TESTS = sorted( [ # file/characteristic("embedded pe") - ("pma12-04", "file", capa.features.Characteristic("embedded pe"), True), + ("pma12-04", "file", capa.features.common.Characteristic("embedded pe"), True), # file/string - ("mimikatz", "file", capa.features.String("SCardControl"), True), - ("mimikatz", "file", capa.features.String("SCardTransmit"), True), - ("mimikatz", "file", capa.features.String("ACR > "), True), - ("mimikatz", "file", capa.features.String("nope"), False), + ("mimikatz", "file", capa.features.common.String("SCardControl"), True), + ("mimikatz", "file", capa.features.common.String("SCardTransmit"), True), + ("mimikatz", "file", capa.features.common.String("ACR > "), True), + ("mimikatz", "file", capa.features.common.String("nope"), False), # file/sections ("mimikatz", "file", capa.features.file.Section(".text"), True), ("mimikatz", "file", capa.features.file.Section(".nope"), False), @@ -353,17 +355,17 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "file", capa.features.file.Import("CryptAcquireContextW"), True), ("mimikatz", "file", capa.features.file.Import("CryptAcquireContext"), True), # function/characteristic(loop) - ("mimikatz", "function=0x401517", capa.features.Characteristic("loop"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("loop"), False), + ("mimikatz", "function=0x401517", capa.features.common.Characteristic("loop"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("loop"), False), # bb/characteristic(tight loop) - ("mimikatz", "function=0x402EC4", capa.features.Characteristic("tight loop"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("tight loop"), False), + ("mimikatz", "function=0x402EC4", capa.features.common.Characteristic("tight loop"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("tight loop"), False), # bb/characteristic(stack string) - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("stack string"), True), - ("mimikatz", "function=0x401000", capa.features.Characteristic("stack string"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("stack string"), True), + ("mimikatz", "function=0x401000", capa.features.common.Characteristic("stack string"), False), # bb/characteristic(tight loop) - ("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.Characteristic("tight loop"), True), - ("mimikatz", "function=0x401000,bb=0x401000", capa.features.Characteristic("tight loop"), False), + ("mimikatz", "function=0x402EC4,bb=0x402F8E", capa.features.common.Characteristic("tight loop"), True), + ("mimikatz", "function=0x401000,bb=0x401000", capa.features.common.Characteristic("tight loop"), False), # insn/mnemonic ("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("push"), True), ("mimikatz", "function=0x40105D", capa.features.insn.Mnemonic("movzx"), True), @@ -440,60 +442,60 @@ FEATURE_PRESENCE_TESTS = sorted( ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), True), ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), True), # insn/string - ("mimikatz", "function=0x40105D", capa.features.String("SCardControl"), True), - ("mimikatz", "function=0x40105D", capa.features.String("SCardTransmit"), True), - ("mimikatz", "function=0x40105D", capa.features.String("ACR > "), True), - ("mimikatz", "function=0x40105D", capa.features.String("nope"), False), - ("773290...", "function=0x140001140", capa.features.String(r"%s:\\OfficePackagesForWDAG"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("SCardControl"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("SCardTransmit"), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("ACR > "), True), + ("mimikatz", "function=0x40105D", capa.features.common.String("nope"), False), + ("773290...", "function=0x140001140", capa.features.common.String(r"%s:\\OfficePackagesForWDAG"), True), # insn/regex, issue #262 - ("pma16-01", "function=0x4021B0", capa.features.Regex("HTTP/1.0"), True), - ("pma16-01", "function=0x4021B0", capa.features.Regex("www.practicalmalwareanalysis.com"), False), + ("pma16-01", "function=0x4021B0", capa.features.common.Regex("HTTP/1.0"), True), + ("pma16-01", "function=0x4021B0", capa.features.common.Regex("www.practicalmalwareanalysis.com"), False), # insn/string, pointer to string - ("mimikatz", "function=0x44EDEF", capa.features.String("INPUTEVENT"), True), + ("mimikatz", "function=0x44EDEF", capa.features.common.String("INPUTEVENT"), True), # insn/string, direct memory reference - ("mimikatz", "function=0x46D6CE", capa.features.String("(null)"), True), + ("mimikatz", "function=0x46D6CE", capa.features.common.String("(null)"), True), # insn/bytes - ("mimikatz", "function=0x40105D", capa.features.Bytes("SCardControl".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("SCardTransmit".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("ACR > ".encode("utf-16le")), True), - ("mimikatz", "function=0x40105D", capa.features.Bytes("nope".encode("ascii")), False), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardControl".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("SCardTransmit".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("ACR > ".encode("utf-16le")), True), + ("mimikatz", "function=0x40105D", capa.features.common.Bytes("nope".encode("ascii")), False), # IDA features included byte sequences read from invalid memory, fixed in #409 - ("mimikatz", "function=0x44570F", capa.features.Bytes(binascii.unhexlify("FF" * 256)), False), + ("mimikatz", "function=0x44570F", capa.features.common.Bytes(binascii.unhexlify("FF" * 256)), False), # insn/bytes, pointer to bytes - ("mimikatz", "function=0x44EDEF", capa.features.Bytes("INPUTEVENT".encode("utf-16le")), True), + ("mimikatz", "function=0x44EDEF", capa.features.common.Bytes("INPUTEVENT".encode("utf-16le")), True), # insn/characteristic(nzxor) - ("mimikatz", "function=0x410DFC", capa.features.Characteristic("nzxor"), True), - ("mimikatz", "function=0x40105D", capa.features.Characteristic("nzxor"), False), + ("mimikatz", "function=0x410DFC", capa.features.common.Characteristic("nzxor"), True), + ("mimikatz", "function=0x40105D", capa.features.common.Characteristic("nzxor"), False), # insn/characteristic(nzxor): no security cookies - ("mimikatz", "function=0x46D534", capa.features.Characteristic("nzxor"), False), + ("mimikatz", "function=0x46D534", capa.features.common.Characteristic("nzxor"), False), # insn/characteristic(nzxor): xorps # viv needs fixup to recognize function, see above - ("3b13b...", "function=0x10006860", capa.features.Characteristic("nzxor"), True), + ("3b13b...", "function=0x10006860", capa.features.common.Characteristic("nzxor"), True), # insn/characteristic(peb access) - ("kernel32-64", "function=0x1800017D0", capa.features.Characteristic("peb access"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("peb access"), False), + ("kernel32-64", "function=0x1800017D0", capa.features.common.Characteristic("peb access"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("peb access"), False), # insn/characteristic(gs access) - ("kernel32-64", "function=0x180001068", capa.features.Characteristic("gs access"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("gs access"), False), + ("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("gs access"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("gs access"), False), # insn/characteristic(cross section flow) - ("a1982...", "function=0x4014D0", capa.features.Characteristic("cross section flow"), True), + ("a1982...", "function=0x4014D0", capa.features.common.Characteristic("cross section flow"), True), # insn/characteristic(cross section flow): imports don't count - ("kernel32-64", "function=0x180001068", capa.features.Characteristic("cross section flow"), False), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("cross section flow"), False), + ("kernel32-64", "function=0x180001068", capa.features.common.Characteristic("cross section flow"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("cross section flow"), False), # insn/characteristic(recursive call) - ("mimikatz", "function=0x40640e", capa.features.Characteristic("recursive call"), True), + ("mimikatz", "function=0x40640e", capa.features.common.Characteristic("recursive call"), True), # before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386 - ("mimikatz", "function=0x4175FF", capa.features.Characteristic("recursive call"), False), + ("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("recursive call"), False), # insn/characteristic(indirect call) - ("mimikatz", "function=0x4175FF", capa.features.Characteristic("indirect call"), True), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("indirect call"), False), + ("mimikatz", "function=0x4175FF", capa.features.common.Characteristic("indirect call"), True), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("indirect call"), False), # insn/characteristic(calls from) - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls from"), True), - ("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), False), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls from"), True), + ("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), False), # function/characteristic(calls to) - ("mimikatz", "function=0x40105D", capa.features.Characteristic("calls to"), True), + ("mimikatz", "function=0x40105D", capa.features.common.Characteristic("calls to"), True), # before this we used ambiguous (0x4556E5, False), which has a data reference / indirect recursive call, see #386 - ("mimikatz", "function=0x456BB9", capa.features.Characteristic("calls to"), False), + ("mimikatz", "function=0x456BB9", capa.features.common.Characteristic("calls to"), False), # file/function-name ("pma16-01", "file", capa.features.file.FunctionName("__aulldiv"), True), ], @@ -510,10 +512,10 @@ FEATURE_PRESENCE_TESTS_IDA = [ FEATURE_COUNT_TESTS = [ ("mimikatz", "function=0x40E5C2", capa.features.basicblock.BasicBlock(), 7), - ("mimikatz", "function=0x4702FD", capa.features.Characteristic("calls from"), 0), - ("mimikatz", "function=0x40E5C2", capa.features.Characteristic("calls from"), 3), - ("mimikatz", "function=0x4556E5", capa.features.Characteristic("calls to"), 0), - ("mimikatz", "function=0x40B1F1", capa.features.Characteristic("calls to"), 3), + ("mimikatz", "function=0x4702FD", capa.features.common.Characteristic("calls from"), 0), + ("mimikatz", "function=0x40E5C2", capa.features.common.Characteristic("calls from"), 3), + ("mimikatz", "function=0x4556E5", capa.features.common.Characteristic("calls to"), 0), + ("mimikatz", "function=0x40B1F1", capa.features.common.Characteristic("calls to"), 3), ] diff --git a/tests/test_engine.py b/tests/test_engine.py index a04ddad3..642ddb49 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -10,6 +10,8 @@ import textwrap import capa.rules import capa.engine +import capa.features.insn +import capa.features.common from capa.engine import * from capa.features import * from capa.features.insn import * @@ -233,7 +235,7 @@ def test_match_adds_matched_rule_feature(): ) r = capa.rules.Rule.from_yaml(rule) features, matches = capa.engine.match([r], {capa.features.insn.Number(100): {1}}, 0x0) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_match_matched_rules(): @@ -264,22 +266,22 @@ def test_match_matched_rules(): ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), + capa.rules.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule1") in features - assert capa.features.MatchedRule("test rule2") in features + assert capa.features.common.MatchedRule("test rule1") in features + assert capa.features.common.MatchedRule("test rule2") in features # the ordering of the rules must not matter, # the engine should match rules in an appropriate order. features, matches = capa.engine.match( - capa.engine.topologically_order_rules(reversed(rules)), + capa.rules.topologically_order_rules(reversed(rules)), {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule1") in features - assert capa.features.MatchedRule("test rule2") in features + assert capa.features.common.MatchedRule("test rule1") in features + assert capa.features.common.MatchedRule("test rule2") in features def test_regex(): @@ -322,34 +324,34 @@ def test_regex(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), + capa.rules.topologically_order_rules(rules), {capa.features.insn.Number(100): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aaaa"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("aaaa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("aBBBBa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") not in features + assert capa.features.common.MatchedRule("test rule") not in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("abbbba"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("abbbba"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features - assert capa.features.MatchedRule("rule with implied wildcards") in features - assert capa.features.MatchedRule("rule with anchor") not in features + assert capa.features.common.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("rule with implied wildcards") in features + assert capa.features.common.MatchedRule("rule with anchor") not in features def test_regex_ignorecase(): @@ -368,11 +370,11 @@ def test_regex_ignorecase(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("aBBBBa"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("aBBBBa"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_regex_complex(): @@ -391,11 +393,11 @@ def test_regex_complex(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String(r"Hardware\Key\key with spaces\some value"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String(r"Hardware\Key\key with spaces\some value"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_match_namespace(): @@ -449,19 +451,19 @@ def test_match_namespace(): ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), + capa.rules.topologically_order_rules(rules), {capa.features.insn.API("CreateFile"): {1}}, 0x0, ) assert "CreateFile API" in matches assert "file-create" in matches assert "filesystem-any" in matches - assert capa.features.MatchedRule("file") in features - assert capa.features.MatchedRule("file/create") in features - assert capa.features.MatchedRule("file/create/CreateFile") in features + assert capa.features.common.MatchedRule("file") in features + assert capa.features.common.MatchedRule("file/create") in features + assert capa.features.common.MatchedRule("file/create/CreateFile") in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), + capa.rules.topologically_order_rules(rules), {capa.features.insn.API("WriteFile"): {1}}, 0x0, ) @@ -472,11 +474,11 @@ def test_match_namespace(): def test_render_number(): assert str(capa.features.insn.Number(1)) == "number(0x1)" - assert str(capa.features.insn.Number(1, arch=ARCH_X32)) == "number/x32(0x1)" - assert str(capa.features.insn.Number(1, arch=ARCH_X64)) == "number/x64(0x1)" + assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X32)) == "number/x32(0x1)" + assert str(capa.features.insn.Number(1, arch=capa.features.common.ARCH_X64)) == "number/x64(0x1)" def test_render_offset(): assert str(capa.features.insn.Offset(1)) == "offset(0x1)" - assert str(capa.features.insn.Offset(1, arch=ARCH_X32)) == "offset/x32(0x1)" - assert str(capa.features.insn.Offset(1, arch=ARCH_X64)) == "offset/x64(0x1)" + assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X32)) == "offset/x32(0x1)" + assert str(capa.features.insn.Offset(1, arch=capa.features.common.ARCH_X64)) == "offset/x64(0x1)" diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 514e98f3..0b3ba949 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -7,37 +7,39 @@ # See the License for the specific language governing permissions and limitations under the License. import textwrap -import pytest from fixtures import * import capa.main +import capa.rules import capa.helpers -import capa.features +import capa.features.file import capa.features.insn +import capa.features.common import capa.features.freeze -import capa.features.extractors +import capa.features.basicblock +import capa.features.extractors.base_extractor -EXTRACTOR = capa.features.extractors.NullFeatureExtractor( +EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor( { "base address": 0x401000, "file features": [ - (0x402345, capa.features.Characteristic("embedded pe")), + (0x402345, capa.features.common.Characteristic("embedded pe")), ], "functions": { 0x401000: { "features": [ - (0x401000, capa.features.Characteristic("indirect call")), + (0x401000, capa.features.common.Characteristic("indirect call")), ], "basic blocks": { 0x401000: { "features": [ - (0x401000, capa.features.Characteristic("tight loop")), + (0x401000, capa.features.common.Characteristic("tight loop")), ], "instructions": { 0x401000: { "features": [ (0x401000, capa.features.insn.Mnemonic("xor")), - (0x401000, capa.features.Characteristic("nzxor")), + (0x401000, capa.features.common.Characteristic("nzxor")), ], }, 0x401002: { @@ -111,7 +113,7 @@ def compare_extractors_viv_null(viv_ext, null_ext): and NullFeatureExtractor returns ints args: - viv_ext (capa.features.extractors.viv.VivisectFeatureExtractor) + viv_ext (capa.features.extractors.viv.extractor.VivisectFeatureExtractor) null_ext (capa.features.extractors.NullFeatureExtractor) """ assert list(viv_ext.extract_file_features()) == list(null_ext.extract_file_features()) @@ -154,12 +156,12 @@ def roundtrip_feature(feature): def test_serialize_features(): roundtrip_feature(capa.features.insn.API("advapi32.CryptAcquireContextW")) - roundtrip_feature(capa.features.String("SCardControl")) + roundtrip_feature(capa.features.common.String("SCardControl")) roundtrip_feature(capa.features.insn.Number(0xFF)) roundtrip_feature(capa.features.insn.Offset(0x0)) roundtrip_feature(capa.features.insn.Mnemonic("push")) roundtrip_feature(capa.features.file.Section(".rsrc")) - roundtrip_feature(capa.features.Characteristic("tight loop")) + roundtrip_feature(capa.features.common.Characteristic("tight loop")) roundtrip_feature(capa.features.basicblock.BasicBlock()) roundtrip_feature(capa.features.file.Export("BaseThreadInitThunk")) roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process")) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 9267164e..eab1efa4 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -13,9 +13,9 @@ from capa.features.extractors import helpers def test_all_zeros(): a = b"\x00\x00\x00\x00" - b = codecs.decode("00000000", "hex") + b = codecs.decode(b"00000000", "hex") c = b"\x01\x00\x00\x00" - d = codecs.decode("01000000", "hex") + d = codecs.decode(b"01000000", "hex") assert helpers.all_zeros(a) is True assert helpers.all_zeros(b) is True assert helpers.all_zeros(c) is False diff --git a/tests/test_ida_features.py b/tests/test_ida_features.py index 4539a1ad..a568606a 100644 --- a/tests/test_ida_features.py +++ b/tests/test_ida_features.py @@ -9,6 +9,7 @@ import pytest try: sys.path.append(os.path.dirname(__file__)) + import fixtures from fixtures import * finally: sys.path.pop() @@ -37,27 +38,27 @@ def get_ida_extractor(_path): check_input_file("5f66b82558ca92e54e77f216ef4c066c") # have to import import this inline so pytest doesn't bail outside of IDA - import capa.features.extractors.ida + import capa.features.extractors.ida.extractor - return capa.features.extractors.ida.IdaFeatureExtractor() + return capa.features.extractors.ida.extractor.IdaFeatureExtractor() @pytest.mark.skip(reason="IDA Pro tests must be run within IDA") def test_ida_features(): - for (sample, scope, feature, expected) in FEATURE_PRESENCE_TESTS + FEATURE_PRESENCE_TESTS_IDA: - id = make_test_id((sample, scope, feature, expected)) + for (sample, scope, feature, expected) in fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_PRESENCE_TESTS_IDA: + id = fixtures.make_test_id((sample, scope, feature, expected)) try: - check_input_file(get_sample_md5_by_name(sample)) + check_input_file(fixtures.get_sample_md5_by_name(sample)) except RuntimeError: print("SKIP %s" % (id)) continue - scope = resolve_scope(scope) - sample = resolve_sample(sample) + scope = fixtures.resolve_scope(scope) + sample = fixtures.resolve_sample(sample) try: - do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) except Exception as e: print("FAIL %s" % (id)) traceback.print_exc() @@ -67,20 +68,20 @@ def test_ida_features(): @pytest.mark.skip(reason="IDA Pro tests must be run within IDA") def test_ida_feature_counts(): - for (sample, scope, feature, expected) in FEATURE_COUNT_TESTS: - id = make_test_id((sample, scope, feature, expected)) + for (sample, scope, feature, expected) in fixtures.FEATURE_COUNT_TESTS: + id = fixtures.make_test_id((sample, scope, feature, expected)) try: - check_input_file(get_sample_md5_by_name(sample)) + check_input_file(fixtures.get_sample_md5_by_name(sample)) except RuntimeError: print("SKIP %s" % (id)) continue - scope = resolve_scope(scope) - sample = resolve_sample(sample) + scope = fixtures.resolve_scope(scope) + sample = fixtures.resolve_sample(sample) try: - do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) except Exception as e: print("FAIL %s" % (id)) traceback.print_exc() diff --git a/tests/test_main.py b/tests/test_main.py index d38ead5b..aa74bed3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,6 +9,7 @@ import json import textwrap +import fixtures from fixtures import * import capa.main @@ -362,7 +363,7 @@ def test_not_render_rules_also_matched(z9324d_extractor, capsys): def test_backend_option(capsys): # tests that main works with different backends - path = get_data_path_by_name("pma16-01") + path = fixtures.get_data_path_by_name("pma16-01") assert capa.main.main([path, "-j", "-b", capa.main.BACKEND_VIV]) == 0 std = capsys.readouterr() std_json = json.loads(std.out) diff --git a/tests/test_pefile_features.py b/tests/test_pefile_features.py index 5c24edab..8bb46d43 100644 --- a/tests/test_pefile_features.py +++ b/tests/test_pefile_features.py @@ -5,9 +5,8 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - import pytest +import fixtures from fixtures import * from fixtures import parametrize @@ -16,7 +15,7 @@ import capa.features.file @parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_pefile_features(sample, scope, feature, expected): @@ -26,4 +25,4 @@ def test_pefile_features(sample, scope, feature, expected): if isinstance(feature, capa.features.file.FunctionName): pytest.xfail("pefile only doesn't extract function names") - do_test_feature_presence(get_pefile_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_pefile_extractor, sample, scope, feature, expected) diff --git a/tests/test_render.py b/tests/test_render.py index 10fda1e9..da89e4d6 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -1,8 +1,8 @@ import textwrap import capa.rules -from capa.render import convert_meta_to_result_document -from capa.render.utils import format_parts_id +import capa.render.utils +import capa.render.result_document def test_render_meta_attack(): @@ -27,7 +27,7 @@ def test_render_meta_attack(): ) ) r = capa.rules.Rule.from_yaml(rule) - rule_meta = convert_meta_to_result_document(r.meta) + rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta) attack = rule_meta["att&ck"][0] assert attack["id"] == id @@ -35,7 +35,7 @@ def test_render_meta_attack(): assert attack["technique"] == technique assert attack["subtechnique"] == subtechnique - assert format_parts_id(attack) == canonical + assert capa.render.utils.format_parts_id(attack) == canonical def test_render_meta_mbc(): @@ -60,7 +60,7 @@ def test_render_meta_mbc(): ) ) r = capa.rules.Rule.from_yaml(rule) - rule_meta = convert_meta_to_result_document(r.meta) + rule_meta = capa.render.result_document.convert_meta_to_result_document(r.meta) attack = rule_meta["mbc"][0] assert attack["id"] == id @@ -68,4 +68,4 @@ def test_render_meta_mbc(): assert attack["behavior"] == behavior assert attack["method"] == method - assert format_parts_id(attack) == canonical + assert capa.render.utils.format_parts_id(attack) == canonical diff --git a/tests/test_rules.py b/tests/test_rules.py index 33ed2d2a..12791d2e 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -12,10 +12,10 @@ import pytest import capa.rules import capa.engine -import capa.features -from capa.features import ARCH_X32, ARCH_X64, String +import capa.features.common from capa.features.file import FunctionName from capa.features.insn import Number, Offset +from capa.features.common import ARCH_X32, ARCH_X64, String def test_rule_ctor(): @@ -751,18 +751,18 @@ def test_regex_values_always_string(): ), ] features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("123"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("123"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features features, matches = capa.engine.match( - capa.engine.topologically_order_rules(rules), - {capa.features.String("0x123"): {1}}, + capa.rules.topologically_order_rules(rules), + {capa.features.common.String("0x123"): {1}}, 0x0, ) - assert capa.features.MatchedRule("test rule") in features + assert capa.features.common.MatchedRule("test rule") in features def test_filter_rules(): diff --git a/tests/test_smda_features.py b/tests/test_smda_features.py index 9435f736..6614c24d 100644 --- a/tests/test_smda_features.py +++ b/tests/test_smda_features.py @@ -5,9 +5,8 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - import pytest +import fixtures from fixtures import * from fixtures import parametrize @@ -16,20 +15,20 @@ import capa.features.file @parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_smda_features(sample, scope, feature, expected): if scope.__name__ == "file" and isinstance(feature, capa.features.file.FunctionName) and expected is True: pytest.xfail("SMDA has no function ID") - do_test_feature_presence(get_smda_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_smda_extractor, sample, scope, feature, expected) @parametrize( "sample,scope,feature,expected", - FEATURE_COUNT_TESTS, + fixtures.FEATURE_COUNT_TESTS, indirect=["sample", "scope"], ) def test_smda_feature_counts(sample, scope, feature, expected): - do_test_feature_count(get_smda_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(fixtures.get_smda_extractor, sample, scope, feature, expected) diff --git a/tests/test_viv_features.py b/tests/test_viv_features.py index 3206e8cf..fa8bfda3 100644 --- a/tests/test_viv_features.py +++ b/tests/test_viv_features.py @@ -5,24 +5,23 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import sys - +import fixtures from fixtures import * -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", - FEATURE_PRESENCE_TESTS, + fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], ) def test_viv_features(sample, scope, feature, expected): - do_test_feature_presence(get_viv_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_presence(fixtures.get_viv_extractor, sample, scope, feature, expected) -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", - FEATURE_COUNT_TESTS, + fixtures.FEATURE_COUNT_TESTS, indirect=["sample", "scope"], ) def test_viv_feature_counts(sample, scope, feature, expected): - do_test_feature_count(get_viv_extractor, sample, scope, feature, expected) + fixtures.do_test_feature_count(fixtures.get_viv_extractor, sample, scope, feature, expected)