Merge branch 'master' of github.com:fireeye/capa into fix-507

This commit is contained in:
William Ballenthin
2021-06-15 12:28:17 -06:00
36 changed files with 506 additions and 346 deletions

67
.github/mypy/mypy.ini vendored Normal file
View File

@@ -0,0 +1,67 @@
[mypy]
[mypy-halo.*]
ignore_missing_imports = True
[mypy-tqdm.*]
ignore_missing_imports = True
[mypy-ruamel.*]
ignore_missing_imports = True
[mypy-networkx.*]
ignore_missing_imports = True
[mypy-pefile.*]
ignore_missing_imports = True
[mypy-viv_utils.*]
ignore_missing_imports = True
[mypy-flirt.*]
ignore_missing_imports = True
[mypy-smda.*]
ignore_missing_imports = True
[mypy-lief.*]
ignore_missing_imports = True
[mypy-idc.*]
ignore_missing_imports = True
[mypy-vivisect.*]
ignore_missing_imports = True
[mypy-envi.*]
ignore_missing_imports = True
[mypy-PE.*]
ignore_missing_imports = True
[mypy-idaapi.*]
ignore_missing_imports = True
[mypy-idautils.*]
ignore_missing_imports = True
[mypy-ida_bytes.*]
ignore_missing_imports = True
[mypy-ida_kernwin.*]
ignore_missing_imports = True
[mypy-ida_settings.*]
ignore_missing_imports = True
[mypy-ida_funcs.*]
ignore_missing_imports = True
[mypy-PyQt5.*]
ignore_missing_imports = True
[mypy-binaryninja.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True

View File

@@ -17,6 +17,7 @@ jobs:
run: |
number=$(grep '\- *$' CHANGELOG.md | wc -l)
if [ $number != 1 ]; then exit 1; fi
code_style:
runs-on: ubuntu-20.04
steps:
@@ -32,6 +33,8 @@ jobs:
run: isort --profile black --length-sort --line-width 120 -c .
- name: Lint with black
run: black -l 120 --check .
- name: Check types with mypy
run: mypy --config-file .github/mypy/mypy.ini capa/ scripts/ tests/
rule_linter:
runs-on: ubuntu-20.04

View File

@@ -131,6 +131,7 @@ It includes many new rules, including all new techniques introduced in MITRE ATT
- json: breaking change: correctly render negative offsets #619 @williballenthin
- library: breaking change: remove logic from `__init__.py` throughout #622 @williballenthin
- main: don't create .viv files unless CAPA_SAVE_WORKSPACE is set #507 @williballenthin
- library: add type annotations for use with mypy #447 @williballenthin
### Development

View File

@@ -8,11 +8,23 @@
import copy
import collections
from typing import Set, Dict, List, Tuple, Union, Mapping
import capa.rules
import capa.features.common
from capa.features.common import Feature
# a collection of features and the locations at which they are found.
#
# used throughout matching as the context in which features are searched:
# to check if a feature exists, do: `Number(0x10) in features`.
# to collect the locations of a feature, do: `features[Number(0x10)]`
#
# aliased here so that the type can be documented and xref'd.
FeatureSet = Dict[Feature, Set[int]]
class Statement(object):
class Statement:
"""
superclass for structural nodes, such as and/or/not.
this exists to provide a default impl for `__str__` and `__repr__`,
@@ -33,7 +45,7 @@ class Statement(object):
def __repr__(self):
return str(self)
def evaluate(self, ctx):
def evaluate(self, features: FeatureSet) -> "Result":
"""
classes that inherit `Statement` must implement `evaluate`
@@ -50,7 +62,7 @@ class Statement(object):
yield self.child
if hasattr(self, "children"):
for child in self.children:
for child in getattr(self, "children"):
yield child
def replace_child(self, existing, new):
@@ -59,12 +71,13 @@ class Statement(object):
self.child = new
if hasattr(self, "children"):
for i, child in enumerate(self.children):
children = getattr(self, "children")
for i, child in enumerate(children):
if child is existing:
self.children[i] = new
children[i] = new
class Result(object):
class Result:
"""
represents the results of an evaluation of statements against features.
@@ -78,7 +91,7 @@ class Result(object):
we need this so that we can render the tree of expressions and their results.
"""
def __init__(self, success, statement, children, locations=None):
def __init__(self, success: bool, statement: Union[Statement, Feature], children: List["Result"], locations=None):
"""
args:
success (bool)
@@ -199,7 +212,23 @@ class Subscope(Statement):
raise ValueError("cannot evaluate a subscope directly!")
def match(rules, features, va):
# mapping from rule name to list of: (location of match, result object)
#
# used throughout matching and rendering to collect the results
# of statement evaluation and their locations.
#
# to check if a rule matched, do: `"TCP client" in matches`.
# to find where a rule matched, do: `map(first, matches["TCP client"])`
# to see how a rule matched, do:
#
# for address, match_details in matches["TCP client"]:
# inspect(match_details)
#
# aliased here so that the type can be documented and xref'd.
MatchResults = Mapping[str, List[Tuple[int, Result]]]
def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
"""
Args:
rules (List[capa.rules.Rule]): these must already be ordered topologically by dependency.
@@ -207,11 +236,11 @@ def match(rules, features, va):
va (int): location of the features
Returns:
Tuple[List[capa.features.Feature], Dict[str, Tuple[int, capa.engine.Result]]]: two-tuple with entries:
- list of features used for matching (which may be greater than argument, due to rule match features), and
- mapping from rule name to (location of match, result object)
Tuple[FeatureSet, MatchResults]: two-tuple with entries:
- set of features used for matching (which may be greater than argument, due to rule match features), and
- mapping from rule name to [(location of match, result object)]
"""
results = collections.defaultdict(list)
results = collections.defaultdict(list) # type: MatchResults
# copy features so that we can modify it
# without affecting the caller (keep this function pure)

View File

@@ -10,9 +10,10 @@ import re
import codecs
import logging
import collections
from typing import Set, Dict, Union
import capa.engine
import capa.features.common
import capa.features
logger = logging.getLogger(__name__)
MAX_BYTES_FEATURE_SIZE = 0x100
@@ -27,16 +28,16 @@ ARCH_X64 = "x64"
VALID_ARCH = (ARCH_X32, ARCH_X64)
def bytes_to_str(b):
def bytes_to_str(b: bytes) -> str:
return str(codecs.encode(b, "hex").decode("utf-8"))
def hex_string(h):
def hex_string(h: str) -> str:
"""render hex string e.g. "0a40b1" as "0A 40 B1" """
return " ".join(h[i : i + 2] for i in range(0, len(h), 2)).upper()
def escape_string(s):
def escape_string(s: str) -> str:
"""escape special characters"""
s = repr(s)
if not s.startswith(('"', "'")):
@@ -50,8 +51,8 @@ def escape_string(s):
return s
class Feature(object):
def __init__(self, value, arch=None, description=None):
class Feature:
def __init__(self, value: Union[str, int, bytes], arch=None, description=None):
"""
Args:
value (any): the value of the feature, such as the number or string.
@@ -79,14 +80,14 @@ class Feature(object):
def __eq__(self, other):
return self.name == other.name and self.value == other.value and self.arch == other.arch
def get_value_str(self):
def get_value_str(self) -> str:
"""
render the value of this feature, for use by `__str__` and friends.
subclasses should override to customize the rendering.
Returns: str
"""
return self.value
return str(self.value)
def __str__(self):
if self.value is not None:
@@ -100,7 +101,7 @@ class Feature(object):
def __repr__(self):
return str(self)
def evaluate(self, ctx):
def evaluate(self, ctx: Dict["Feature", Set[int]]) -> "capa.engine.Result":
return capa.engine.Result(self in ctx, self, [], locations=ctx.get(self, []))
def freeze_serialize(self):
@@ -123,24 +124,26 @@ class Feature(object):
class MatchedRule(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
super(MatchedRule, self).__init__(value, description=description)
self.name = "match"
class Characteristic(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
super(Characteristic, self).__init__(value, description=description)
class String(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
super(String, self).__init__(value, description=description)
class Regex(String):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
super(Regex, self).__init__(value, description=description)
self.value = value
pat = self.value[len("/") : -len("/")]
flags = re.DOTALL
if value.endswith("/i"):
@@ -161,9 +164,13 @@ class Regex(String):
matches = collections.defaultdict(list)
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.common.String,)):
if not isinstance(feature, (String,)):
continue
if not isinstance(feature.value, str):
# this is a programming error: String should only contain str
raise ValueError("unexpected feature value type")
# `re.search` finds a match anywhere in the given string
# which implies leading and/or trailing whitespace.
# using this mode is more convenient for rule authors,
@@ -202,13 +209,13 @@ class _MatchedRegex(Regex):
note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API.
"""
def __init__(self, regex, matches):
def __init__(self, regex: Regex, matches):
"""
args:
regex (Regex): the regex feature that matches.
matches (Dict[str, List[int]]|None): mapping from matching string to its locations.
"""
super(_MatchedRegex, self).__init__(regex.value, description=regex.description)
super(_MatchedRegex, self).__init__(str(regex.value), description=regex.description)
# we want this to collide with the name of `Regex` above,
# so that it works nicely with the renderers.
self.name = "regex"
@@ -222,20 +229,21 @@ class _MatchedRegex(Regex):
)
class StringFactory(object):
def __new__(cls, value, description=None):
class StringFactory:
def __new__(cls, value: str, description=None):
if value.startswith("/") and (value.endswith("/") or value.endswith("/i")):
return Regex(value, description=description)
return String(value, description=description)
class Bytes(Feature):
def __init__(self, value, description=None):
def __init__(self, value: bytes, description=None):
super(Bytes, self).__init__(value, description=description)
self.value = value
def evaluate(self, ctx):
for feature, locations in ctx.items():
if not isinstance(feature, (capa.features.common.Bytes,)):
if not isinstance(feature, (Bytes,)):
continue
if feature.value.startswith(self.value):

View File

@@ -7,9 +7,26 @@
# See the License for the specific language governing permissions and limitations under the License.
import abc
from typing import Tuple, Iterator, SupportsInt
from capa.features.basicblock import Feature
# feature extractors may reference functions, BBs, insns by opaque handle values.
# the only requirement of these handles are that they support `__int__`,
# so that they can be rendered as addresses.
#
# these handles are only consumed by routines on
# the feature extractor from which they were created.
#
# int(FunctionHandle) -> function start address
# int(BBHandle) -> BasicBlock start address
# int(InsnHandle) -> instruction address
FunctionHandle = SupportsInt
BBHandle = SupportsInt
InsnHandle = SupportsInt
class FeatureExtractor(object):
class FeatureExtractor:
"""
FeatureExtractor defines the interface for fetching features from a sample.
@@ -36,16 +53,14 @@ class FeatureExtractor(object):
super(FeatureExtractor, self).__init__()
@abc.abstractmethod
def get_base_address(self):
def get_base_address(self) -> int:
"""
fetch the preferred load address at which the sample was analyzed.
returns: int
"""
raise NotImplemented
@abc.abstractmethod
def extract_file_features(self):
def extract_file_features(self) -> Iterator[Tuple[Feature, int]]:
"""
extract file-scope features.
@@ -56,27 +71,19 @@ class FeatureExtractor(object):
print('0x%x: %s', va, feature)
yields:
Tuple[capa.features.Feature, int]: feature and its location
Tuple[Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_functions(self):
def get_functions(self) -> Iterator[FunctionHandle]:
"""
enumerate the functions and provide opaque values that will
subsequently be provided to `.extract_function_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_function_features()`.
the opaque value should support casting to int (`__int__`) for the function start address.
yields:
any: the opaque function value.
"""
raise NotImplemented
def is_library_function(self, va):
def is_library_function(self, va: int) -> bool:
"""
is the given address a library function?
the backend may implement its own function matching algorithm, or none at all.
@@ -94,7 +101,7 @@ class FeatureExtractor(object):
"""
return False
def get_function_name(self, va):
def get_function_name(self, va: int) -> str:
"""
fetch any recognized name for the given address.
this is only guaranteed to return a value when the given function is a recognized library function.
@@ -112,7 +119,7 @@ class FeatureExtractor(object):
raise KeyError(va)
@abc.abstractmethod
def extract_function_features(self, f):
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, int]]:
"""
extract function-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
@@ -125,31 +132,23 @@ class FeatureExtractor(object):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
f [FunctionHandle]: an opaque value previously fetched from `.get_functions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
Tuple[Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_basic_blocks(self, f):
def get_basic_blocks(self, f: FunctionHandle) -> Iterator[BBHandle]:
"""
enumerate the basic blocks in the given function and provide opaque values that will
subsequently be provided to `.extract_basic_block_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_basic_block_features()`.
the opaque value should support casting to int (`__int__`) for the basic block start address.
yields:
any: the opaque basic block value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_basic_block_features(self, f, bb):
def extract_basic_block_features(self, f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, int]]:
"""
extract basic block-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
@@ -163,32 +162,24 @@ class FeatureExtractor(object):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
f [FunctionHandle]: an opaque value previously fetched from `.get_functions()`.
bb [BBHandle]: an opaque value previously fetched from `.get_basic_blocks()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
Tuple[Feature, int]: feature and its location
"""
raise NotImplemented
@abc.abstractmethod
def get_instructions(self, f, bb):
def get_instructions(self, f: FunctionHandle, bb: BBHandle) -> Iterator[InsnHandle]:
"""
enumerate the instructions in the given basic block and provide opaque values that will
subsequently be provided to `.extract_insn_features()`, etc.
by "opaque value", we mean that this can be any object, as long as it
provides enough context to `.extract_insn_features()`.
the opaque value should support casting to int (`__int__`) for the instruction address.
yields:
any: the opaque function value.
"""
raise NotImplemented
@abc.abstractmethod
def extract_insn_features(self, f, bb, insn):
def extract_insn_features(self, f: FunctionHandle, bb: BBHandle, insn: InsnHandle) -> Iterator[Tuple[Feature, int]]:
"""
extract instruction-scope features.
the arguments are opaque values previously provided by `.get_functions()`, etc.
@@ -203,12 +194,12 @@ class FeatureExtractor(object):
print('0x%x: %s', va, feature)
args:
f [any]: an opaque value previously fetched from `.get_functions()`.
bb [any]: an opaque value previously fetched from `.get_basic_blocks()`.
insn [any]: an opaque value previously fetched from `.get_instructions()`.
f [FunctionHandle]: an opaque value previously fetched from `.get_functions()`.
bb [BBHandle]: an opaque value previously fetched from `.get_basic_blocks()`.
insn [InsnHandle]: an opaque value previously fetched from `.get_instructions()`.
yields:
Tuple[capa.features.Feature, int]: feature and its location
Tuple[Feature, int]: feature and its location
"""
raise NotImplemented

View File

@@ -8,15 +8,16 @@
import struct
import builtins
from typing import Tuple, Iterator
MIN_STACKSTRING_LEN = 8
def xor_static(data, i):
def xor_static(data: bytes, i: int) -> bytes:
return bytes(c ^ i for c in data)
def is_aw_function(symbol):
def is_aw_function(symbol: str) -> bool:
"""
is the given function name an A/W function?
these are variants of functions that, on Windows, accept either a narrow or wide string.
@@ -32,7 +33,7 @@ def is_aw_function(symbol):
return "a" <= symbol[-2] <= "z" or "0" <= symbol[-2] <= "9"
def is_ordinal(symbol):
def is_ordinal(symbol: str) -> bool:
"""
is the given symbol an ordinal that is prefixed by "#"?
"""
@@ -41,7 +42,7 @@ def is_ordinal(symbol):
return False
def generate_symbols(dll, symbol):
def generate_symbols(dll: str, symbol: str) -> Iterator[str]:
"""
for a given dll and symbol name, generate variants.
we over-generate features to make matching easier.
@@ -67,11 +68,11 @@ def generate_symbols(dll, symbol):
yield symbol[:-1]
def all_zeros(bytez):
def all_zeros(bytez: bytes) -> bool:
return all(b == 0 for b in builtins.bytes(bytez))
def twos_complement(val, bits):
def twos_complement(val: int, bits: int) -> int:
"""
compute the 2's complement of int value val
@@ -86,9 +87,9 @@ def twos_complement(val, bits):
return val
def carve_pe(pbytes, offset=0):
def carve_pe(pbytes: bytes, offset: int = 0) -> Iterator[Tuple[int, int]]:
"""
Return a list of (offset, size, xor) tuples of embedded PEs
Generate (offset, key) tuples of embedded PEs
Based on the version from vivisect:
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
@@ -97,20 +98,20 @@ def carve_pe(pbytes, offset=0):
"""
mz_xor = [
(
xor_static(b"MZ", i),
xor_static(b"PE", i),
i,
xor_static(b"MZ", key),
xor_static(b"PE", key),
key,
)
for i in range(256)
for key in range(256)
]
pblen = len(pbytes)
todo = [(pbytes.find(mzx, offset), mzx, pex, i) for mzx, pex, i in mz_xor]
todo = [(off, mzx, pex, i) for (off, mzx, pex, i) in todo if off != -1]
todo = [(pbytes.find(mzx, offset), mzx, pex, key) for mzx, pex, key in mz_xor]
todo = [(off, mzx, pex, key) for (off, mzx, pex, key) in todo if off != -1]
while len(todo):
off, mzx, pex, i = todo.pop()
off, mzx, pex, key = todo.pop()
# The MZ header has one field we will check
# e_lfanew is at 0x3c
@@ -118,15 +119,15 @@ def carve_pe(pbytes, offset=0):
if pblen < (e_lfanew + 4):
continue
newoff = struct.unpack("<I", xor_static(pbytes[e_lfanew : e_lfanew + 4], i))[0]
newoff = struct.unpack("<I", xor_static(pbytes[e_lfanew : e_lfanew + 4], key))[0]
nextres = pbytes.find(mzx, off + 1)
if nextres != -1:
todo.append((nextres, mzx, pex, i))
todo.append((nextres, mzx, pex, key))
peoff = off + newoff
if pblen < (peoff + 2):
continue
if pbytes[peoff : peoff + 2] == pex:
yield (off, i)
yield (off, key)

View File

@@ -15,8 +15,8 @@ import idautils
import capa.features.extractors.helpers
import capa.features.extractors.strings
import capa.features.extractors.ida.helpers
from capa.features import String, Characteristic
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import String, Characteristic
def check_segment_for_pe(seg):

View File

@@ -136,7 +136,7 @@ FILE_HANDLERS = (
class PefileFeatureExtractor(FeatureExtractor):
def __init__(self, path):
def __init__(self, path: str):
super(PefileFeatureExtractor, self).__init__()
self.path = path
self.pe = pefile.PE(path)

View File

@@ -116,7 +116,7 @@ def extract_features(f, bb):
bb (smda.common.SmdaBasicBlock): the basic block to process.
yields:
Feature, set[VA]: the features and their location found in this basic block.
Tuple[Feature, int]: the features and their location found in this basic block.
"""
yield BasicBlock(), bb.offset
for bb_handler in BASIC_BLOCK_HANDLERS:

View File

@@ -28,7 +28,7 @@ def extract_features(f):
f (smda.common.SmdaFunction): the function from which to extract features
yields:
Feature, set[VA]: the features and their location found in this function.
Tuple[Feature, int]: the features and their location found in this function.
"""
for func_handler in FUNCTION_HANDLERS:
for feature, va in func_handler(f):

View File

@@ -370,7 +370,7 @@ def extract_features(f, bb, insn):
insn (smda.common.SmdaInstruction): the instruction to process.
yields:
Feature, set[VA]: the features and their location found in this insn.
Tuple[Feature, int]: the features and their location found in this insn.
"""
for insn_handler in INSTRUCTION_HANDLERS:
for feature, va in insn_handler(f, bb, insn):

View File

@@ -73,7 +73,7 @@ def extract_stackstring(f, bb):
yield Characteristic("stack string"), bb.va
def is_mov_imm_to_stack(instr):
def is_mov_imm_to_stack(instr: envi.archs.i386.disasm.i386Opcode) -> bool:
"""
Return if instruction moves immediate onto stack
"""
@@ -105,7 +105,7 @@ def is_mov_imm_to_stack(instr):
return True
def get_printable_len(oper):
def get_printable_len(oper: envi.archs.i386.disasm.i386ImmOper) -> int:
"""
Return string length if all operand bytes are ascii or utf16-le printable
"""
@@ -128,7 +128,7 @@ def get_printable_len(oper):
return 0
def is_printable_ascii(chars):
def is_printable_ascii(chars: bytes) -> bool:
try:
chars_str = chars.decode("ascii")
except UnicodeDecodeError:
@@ -137,9 +137,10 @@ def is_printable_ascii(chars):
return all(c in string.printable for c in chars_str)
def is_printable_utf16le(chars):
def is_printable_utf16le(chars: bytes) -> bool:
if all(c == b"\x00" for c in chars[1::2]):
return is_printable_ascii(chars[::2])
return False
def extract_features(f, bb):
@@ -151,7 +152,7 @@ def extract_features(f, bb):
bb (viv_utils.BasicBlock): the basic block to process.
yields:
Feature, set[VA]: the features and their location found in this basic block.
Tuple[Feature, int]: the features and their location found in this basic block.
"""
yield BasicBlock(), bb.va
for bb_handler in BASIC_BLOCK_HANDLERS:

View File

@@ -10,7 +10,6 @@ import logging
import viv_utils
import viv_utils.flirt
import capa.features.extractors
import capa.features.extractors.viv.file
import capa.features.extractors.viv.insn
import capa.features.extractors.viv.function

View File

@@ -50,7 +50,7 @@ def extract_file_import_names(vw, file_path):
yield Import(name), va
def is_viv_ord_impname(impname):
def is_viv_ord_impname(impname: str) -> bool:
"""
return if import name matches vivisect's ordinal naming scheme `'ord%d' % ord`
"""

View File

@@ -61,7 +61,7 @@ def extract_features(f):
f (viv_utils.Function): the function from which to extract features
yields:
Feature, set[VA]: the features and their location found in this function.
Tuple[Feature, int]: the features and their location found in this function.
"""
for func_handler in FUNCTION_HANDLERS:
for feature, va in func_handler(f):

View File

@@ -5,10 +5,13 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Optional
from vivisect import VivWorkspace
from vivisect.const import XR_TO, REF_CODE
def get_coderef_from(vw, va):
def get_coderef_from(vw: VivWorkspace, va: int) -> Optional[int]:
"""
return first code `tova` whose origin is the specified va
return None if no code reference is found

View File

@@ -7,11 +7,16 @@
# See the License for the specific language governing permissions and limitations under the License.
import collections
from typing import TYPE_CHECKING, Set, List, Deque, Tuple, Union, Optional
import envi
import vivisect.const
import envi.archs.i386.disasm
import envi.archs.amd64.disasm
from vivisect import VivWorkspace
if TYPE_CHECKING:
from capa.features.extractors.viv.extractor import InstructionHandle
# pull out consts for lookup performance
i386RegOper = envi.archs.i386.disasm.i386RegOper
@@ -26,7 +31,7 @@ FAR_BRANCH_MASK = envi.BR_PROC | envi.BR_DEREF | envi.BR_ARCH
DESTRUCTIVE_MNEMONICS = ("mov", "lea", "pop", "xor")
def get_previous_instructions(vw, va):
def get_previous_instructions(vw: VivWorkspace, va: int) -> List[int]:
"""
collect the instructions that flow to the given address, local to the current function.
@@ -43,12 +48,14 @@ def get_previous_instructions(vw, va):
# ensure that it fallsthrough to this one.
loc = vw.getPrevLocation(va, adjacent=True)
if loc is not None:
# from vivisect.const:
# location: (L_VA, L_SIZE, L_LTYPE, L_TINFO)
(pva, _, ptype, pinfo) = vw.getPrevLocation(va, adjacent=True)
ploc = vw.getPrevLocation(va, adjacent=True)
if ploc is not None:
# from vivisect.const:
# location: (L_VA, L_SIZE, L_LTYPE, L_TINFO)
(pva, _, ptype, pinfo) = ploc
if ptype == LOC_OP and not (pinfo & IF_NOFALL):
ret.append(pva)
if ptype == LOC_OP and not (pinfo & IF_NOFALL):
ret.append(pva)
# find any code refs, e.g. jmp, to this location.
# ignore any calls.
@@ -67,7 +74,7 @@ class NotFoundError(Exception):
pass
def find_definition(vw, va, reg):
def find_definition(vw: VivWorkspace, va: int, reg: int) -> Tuple[int, Union[int, None]]:
"""
scan backwards from the given address looking for assignments to the given register.
if a constant, return that value.
@@ -83,8 +90,8 @@ def find_definition(vw, va, reg):
raises:
NotFoundError: when the definition cannot be found.
"""
q = collections.deque()
seen = set([])
q = collections.deque() # type: Deque[int]
seen = set([]) # type: Set[int]
q.extend(get_previous_instructions(vw, va))
while q:
@@ -128,14 +135,16 @@ def find_definition(vw, va, reg):
raise NotFoundError()
def is_indirect_call(vw, va, insn=None):
def is_indirect_call(vw: VivWorkspace, va: int, insn: Optional["InstructionHandle"] = None) -> bool:
if insn is None:
insn = vw.parseOpcode(va)
return insn.mnem in ("call", "jmp") and isinstance(insn.opers[0], envi.archs.i386.disasm.i386RegOper)
def resolve_indirect_call(vw, va, insn=None):
def resolve_indirect_call(
vw: VivWorkspace, va: int, insn: Optional["InstructionHandle"] = None
) -> Tuple[int, Optional[int]]:
"""
inspect the given indirect call instruction and attempt to resolve the target address.

View File

@@ -15,7 +15,6 @@ import envi.archs.amd64.regs
import envi.archs.i386.disasm
import envi.archs.amd64.disasm
import capa.features.extractors.viv
import capa.features.extractors.helpers
import capa.features.extractors.viv.helpers
from capa.features.insn import API, Number, Offset, Mnemonic
@@ -225,7 +224,7 @@ def derefs(vw, p):
p = next
def read_memory(vw, va, size):
def read_memory(vw, va: int, size: int) -> bytes:
# as documented in #176, vivisect will not readMemory() when the section is not marked readable.
#
# but here, we don't care about permissions.
@@ -241,7 +240,7 @@ def read_memory(vw, va, size):
raise envi.exc.SegmentationViolation(va)
def read_bytes(vw, va):
def read_bytes(vw, va: int) -> bytes:
"""
read up to MAX_BYTES_FEATURE_SIZE from the given address.
@@ -300,7 +299,7 @@ def extract_insn_bytes_features(f, bb, insn):
yield Bytes(buf), insn.va
def read_string(vw, offset):
def read_string(vw, offset: int) -> str:
try:
alen = vw.detectString(offset)
except envi.exc.SegmentationViolation:
@@ -400,7 +399,7 @@ def extract_insn_offset_features(f, bb, insn):
yield Offset(v, arch=get_arch(f.vw)), insn.va
def is_security_cookie(f, bb, insn):
def is_security_cookie(f, bb, insn) -> bool:
"""
check if an instruction is related to security cookie checks
"""
@@ -497,7 +496,7 @@ def extract_insn_segment_access_features(f, bb, insn):
yield Characteristic("gs access"), insn.va
def get_section(vw, va):
def get_section(vw, va: int):
for start, length, _, __ in vw.getMemoryMaps():
if start <= va < start + length:
return start
@@ -608,7 +607,7 @@ def extract_features(f, bb, insn):
insn (vivisect...Instruction): the instruction to process.
yields:
Feature, set[VA]: the features and their location found in this insn.
Tuple[Feature, int]: the features and their location found in this insn.
"""
for insn_handler in INSTRUCTION_HANDLERS:
for feature, va in insn_handler(f, bb, insn):

View File

@@ -10,19 +10,19 @@ from capa.features.common import Feature
class Export(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
# value is export name
super(Export, self).__init__(value, description=description)
class Import(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
# value is import name
super(Import, self).__init__(value, description=description)
class Section(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
# value is section name
super(Section, self).__init__(value, description=description)
@@ -30,7 +30,7 @@ class Section(Feature):
class FunctionName(Feature):
"""recognized name for statically linked function"""
def __init__(self, name, description=None):
def __init__(self, name: str, description=None):
# value is function name
super(FunctionName, self).__init__(name, description=description)
# override the name property set by `capa.features.Feature`

View File

@@ -228,7 +228,7 @@ def dump(extractor):
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
def is_freeze(buf):
def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC

View File

@@ -11,7 +11,7 @@ from capa.features.common import Feature
class API(Feature):
def __init__(self, name, description=None):
def __init__(self, name: str, description=None):
# Downcase library name if given
if "." in name:
modname, _, impname = name.rpartition(".")
@@ -21,7 +21,7 @@ class API(Feature):
class Number(Feature):
def __init__(self, value, arch=None, description=None):
def __init__(self, value: int, arch=None, description=None):
super(Number, self).__init__(value, arch=arch, description=description)
def get_value_str(self):
@@ -29,7 +29,7 @@ class Number(Feature):
class Offset(Feature):
def __init__(self, value, arch=None, description=None):
def __init__(self, value: int, arch=None, description=None):
super(Offset, self).__init__(value, arch=arch, description=description)
def get_value_str(self):
@@ -37,5 +37,5 @@ class Offset(Feature):
class Mnemonic(Feature):
def __init__(self, value, description=None):
def __init__(self, value: str, description=None):
super(Mnemonic, self).__init__(value, description=description)

View File

@@ -15,7 +15,7 @@ def hex(i):
return _hex(int(i))
def get_file_taste(sample_path):
def get_file_taste(sample_path: str) -> bytes:
if not os.path.exists(sample_path):
raise IOError("sample path %s does not exist or cannot be accessed" % sample_path)
with open(sample_path, "rb") as f:

View File

@@ -31,7 +31,7 @@ def location_to_hex(location):
return "%08X" % location
class CapaExplorerDataItem(object):
class CapaExplorerDataItem:
"""store data for CapaExplorerDataModel"""
def __init__(self, parent, data, can_check=True):

View File

@@ -21,6 +21,7 @@ import textwrap
import itertools
import contextlib
import collections
from typing import Any, Dict, List, Tuple
import halo
import tqdm
@@ -37,7 +38,10 @@ import capa.features.freeze
import capa.render.vverbose
import capa.features.extractors
import capa.features.extractors.pefile
from capa.rules import Rule, RuleSet
from capa.engine import FeatureSet, MatchResults
from capa.helpers import get_file_taste
from capa.features.extractors.base_extractor import FunctionHandle, FeatureExtractor
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SUPPORTED_FILE_MAGIC = set([b"MZ"])
@@ -51,7 +55,7 @@ logger = logging.getLogger("capa")
@contextlib.contextmanager
def timing(msg):
def timing(msg: str):
t0 = time.time()
yield
t1 = time.time()
@@ -67,12 +71,12 @@ def set_vivisect_log_level(level):
logging.getLogger("envi.codeflow").setLevel(level)
def find_function_capabilities(ruleset, extractor, f):
def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle):
# contains features from:
# - insns
# - function
function_features = collections.defaultdict(set)
bb_matches = collections.defaultdict(list)
function_features = collections.defaultdict(set) # type: FeatureSet
bb_matches = collections.defaultdict(list) # type: MatchResults
for feature, va in extractor.extract_function_features(f):
function_features[feature].add(va)
@@ -103,8 +107,8 @@ def find_function_capabilities(ruleset, extractor, f):
return function_matches, bb_matches, len(function_features)
def find_file_capabilities(ruleset, extractor, function_features):
file_features = collections.defaultdict(set)
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
file_features = collections.defaultdict(set) # type: FeatureSet
for feature, va in extractor.extract_file_features():
# not all file features may have virtual addresses.
@@ -124,9 +128,9 @@ def find_file_capabilities(ruleset, extractor, function_features):
return matches, len(file_features)
def find_capabilities(ruleset, extractor, disable_progress=None):
all_function_matches = collections.defaultdict(list)
all_bb_matches = collections.defaultdict(list)
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults
all_bb_matches = collections.defaultdict(list) # type: MatchResults
meta = {
"feature_counts": {
@@ -134,7 +138,7 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
"functions": {},
},
"library_functions": {},
}
} # type: Dict[str, Any]
pbar = tqdm.tqdm
if disable_progress:
@@ -170,19 +174,25 @@ def find_capabilities(ruleset, extractor, disable_progress=None):
# collection of features that captures the rule matches within function and BB scopes.
# mapping from feature (matched rule) to set of addresses at which it matched.
# schema: Dict[MatchedRule: Set[int]
function_and_lower_features = {
capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results))
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items())
}
} # type: FeatureSet
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
meta["feature_counts"]["file"] = feature_count
matches = {}
matches.update(all_bb_matches)
matches.update(all_function_matches)
matches.update(all_file_matches)
matches = {
rule_name: results
for rule_name, results in itertools.chain(
# each rule exists in exactly one scope,
# so there won't be any overlap among these following MatchResults,
# and we can merge the dictionaries naively.
all_bb_matches.items(),
all_function_matches.items(),
all_file_matches.items(),
)
}
return matches, meta
@@ -194,15 +204,15 @@ def has_rule_with_namespace(rules, capabilities, rule_cat):
return False
def is_internal_rule(rule):
def is_internal_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "").startswith("internal/")
def is_file_limitation_rule(rule):
def is_file_limitation_rule(rule: Rule) -> bool:
return rule.meta.get("namespace", "") == "internal/limitation/file"
def has_file_limitation(rules, capabilities, is_standalone=True):
def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool:
file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values()))
for file_limitation_rule in file_limitation_rules:
@@ -224,7 +234,7 @@ def has_file_limitation(rules, capabilities, is_standalone=True):
return False
def is_supported_file_type(sample):
def is_supported_file_type(sample: str) -> bool:
"""
Return if this is a supported file based on magic header values
"""
@@ -329,15 +339,35 @@ def register_flirt_signature_analyzers(vw, sigpaths):
viv_utils.flirt.addFlirtFunctionAnalyzer(vw, analyzer)
def get_default_signatures():
if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
logger.debug("detected running under PyInstaller")
sigs_path = os.path.join(sys._MEIPASS, "sigs")
logger.debug("default signatures path (PyInstaller method): %s", sigs_path)
def is_running_standalone() -> bool:
"""
are we running from a PyInstaller'd executable?
if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
"""
return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
def get_default_root() -> str:
"""
get the file system path to the default resources directory.
under PyInstaller, this comes from _MEIPASS.
under source, this is the root directory of the project.
"""
if is_running_standalone():
# pylance/mypy don't like `sys._MEIPASS` because this isn't standard.
# its injected by pyinstaller.
# so we'll fetch this attribute dynamically.
return getattr(sys, "_MEIPASS")
else:
logger.debug("detected running from source")
sigs_path = os.path.join(os.path.dirname(__file__), "..", "sigs")
logger.debug("default signatures path (source method): %s", sigs_path)
return os.path.join(os.path.dirname(__file__), "..")
def get_default_signatures() -> List[str]:
"""
compute a list of file system paths to the default FLIRT signatures.
"""
sigs_path = os.path.join(get_default_root(), "sigs")
logger.debug("signatures path: %s", sigs_path)
ret = []
for root, dirs, files in os.walk(sigs_path):
@@ -401,7 +431,9 @@ class UnsupportedRuntimeError(RuntimeError):
pass
def get_extractor(path, format, backend, sigpaths, should_save_workspace, disable_progress=False):
def get_extractor(
path: str, format: str, backend: str, sigpaths: List[str], should_save_workspace, disable_progress=False
) -> FeatureExtractor:
"""
raises:
UnsupportedFormatError:
@@ -443,7 +475,7 @@ def get_extractor(path, format, backend, sigpaths, should_save_workspace, disabl
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path)
def is_nursery_rule_path(path):
def is_nursery_rule_path(path: str) -> bool:
"""
The nursery is a spot for rules that have not yet been fully polished.
For example, they may not have references to public example of a technique.
@@ -456,7 +488,7 @@ def is_nursery_rule_path(path):
return "nursery" in path
def get_rules(rule_path, disable_progress=False):
def get_rules(rule_path: str, disable_progress=False) -> List[Rule]:
if not os.path.exists(rule_path):
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
@@ -483,7 +515,7 @@ def get_rules(rule_path, disable_progress=False):
rule_path = os.path.join(root, file)
rule_paths.append(rule_path)
rules = []
rules = [] # type: List[Rule]
pbar = tqdm.tqdm
if disable_progress:
@@ -753,14 +785,8 @@ def main(argv=None):
logger.debug(" https://github.com/fireeye/capa-rules")
logger.debug("-" * 80)
if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
logger.debug("detected running under PyInstaller")
rules_path = os.path.join(sys._MEIPASS, "rules")
logger.debug("default rule path (PyInstaller method): %s", rules_path)
else:
logger.debug("detected running from source")
rules_path = os.path.join(os.path.dirname(__file__), "..", "rules")
logger.debug("default rule path (source method): %s", rules_path)
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
if not os.path.exists(rules_path):
# when a users installs capa via pip,
@@ -880,15 +906,8 @@ def ida_main():
logger.debug(" https://github.com/fireeye/capa-rules")
logger.debug("-" * 80)
if hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS"):
logger.debug("detected running under PyInstaller")
rules_path = os.path.join(sys._MEIPASS, "rules")
logger.debug("default rule path (PyInstaller method): %s", rules_path)
else:
logger.debug("detected running from source")
rules_path = os.path.join(os.path.dirname(__file__), "..", "rules")
logger.debug("default rule path (source method): %s", rules_path)
rules_path = os.path.join(get_default_root(), "rules")
logger.debug("rule path: %s", rules_path)
rules = get_rules(rules_path)
rules = capa.rules.RuleSet(rules)

View File

@@ -7,16 +7,20 @@
# See the License for the specific language governing permissions and limitations under the License.
import collections
from typing import Dict, List
import tabulate
import capa.render.utils as rutils
import capa.render.result_document
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.render.utils import StringIO
tabulate.PRESERVE_WHITESPACE = True
def width(s, character_count):
def width(s: str, character_count: int) -> str:
"""pad the given string to at least `character_count`"""
if len(s) < character_count:
return s + " " * (character_count - len(s))
@@ -24,7 +28,7 @@ def width(s, character_count):
return s
def render_meta(doc, ostream):
def render_meta(doc, ostream: StringIO):
rows = [
(width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)),
("sha1", doc["meta"]["sample"]["sha1"]),
@@ -64,7 +68,7 @@ def find_subrule_matches(doc):
return matches
def render_capabilities(doc, ostream):
def render_capabilities(doc, ostream: StringIO):
"""
example::
@@ -102,7 +106,7 @@ def render_capabilities(doc, ostream):
ostream.writeln(rutils.bold("no capabilities found"))
def render_attack(doc, ostream):
def render_attack(doc, ostream: StringIO):
"""
example::
@@ -124,23 +128,16 @@ def render_attack(doc, ostream):
continue
for attack in rule["meta"]["att&ck"]:
if attack.get("subtechnique"):
tactics[attack["tactic"]].add((attack["technique"], attack["subtechnique"], attack["id"]))
else:
tactics[attack["tactic"]].add((attack["technique"], attack["id"]))
tactics[attack["tactic"]].add((attack["technique"], attack.get("subtechnique"), attack["id"]))
rows = []
for tactic, techniques in sorted(tactics.items()):
inner_rows = []
for spec in sorted(techniques):
if len(spec) == 2:
technique, id = spec
for (technique, subtechnique, id) in sorted(techniques):
if subtechnique is None:
inner_rows.append("%s %s" % (rutils.bold(technique), id))
elif len(spec) == 3:
technique, subtechnique, id = spec
inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id))
else:
raise RuntimeError("unexpected ATT&CK spec format")
inner_rows.append("%s::%s %s" % (rutils.bold(technique), subtechnique, id))
rows.append(
(
rutils.bold(tactic.upper()),
@@ -157,7 +154,7 @@ def render_attack(doc, ostream):
ostream.write("\n")
def render_mbc(doc, ostream):
def render_mbc(doc, ostream: StringIO):
"""
example::
@@ -177,23 +174,16 @@ def render_mbc(doc, ostream):
continue
for mbc in rule["meta"]["mbc"]:
if mbc.get("method"):
objectives[mbc["objective"]].add((mbc["behavior"], mbc["method"], mbc["id"]))
else:
objectives[mbc["objective"]].add((mbc["behavior"], mbc["id"]))
objectives[mbc["objective"]].add((mbc["behavior"], mbc.get("method"), mbc["id"]))
rows = []
for objective, behaviors in sorted(objectives.items()):
inner_rows = []
for spec in sorted(behaviors):
if len(spec) == 2:
behavior, id = spec
for (behavior, method, id) in sorted(behaviors):
if method is None:
inner_rows.append("%s [%s]" % (rutils.bold(behavior), id))
elif len(spec) == 3:
behavior, method, id = spec
inner_rows.append("%s::%s [%s]" % (rutils.bold(behavior), method, id))
else:
raise RuntimeError("unexpected MBC spec format")
inner_rows.append("%s::%s [%s]" % (rutils.bold(behavior), method, id))
rows.append(
(
rutils.bold(objective.upper()),
@@ -222,6 +212,6 @@ def render_default(doc):
return ostream.getvalue()
def render(meta, rules, capabilities):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_default(doc)

View File

@@ -8,6 +8,8 @@
import json
import capa.render.result_document
from capa.rules import RuleSet
from capa.engine import MatchResults
class CapaJsonObjectEncoder(json.JSONEncoder):
@@ -23,7 +25,7 @@ class CapaJsonObjectEncoder(json.JSONEncoder):
return json.JSONEncoder.default(self, obj)
def render(meta, rules, capabilities):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
return json.dumps(
capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities),
cls=CapaJsonObjectEncoder,

View File

@@ -5,10 +5,14 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Dict, List, Tuple
import capa.rules
import capa.engine
import capa.render.utils
import capa.features.common
from capa.rules import RuleSet
from capa.engine import MatchResults
def convert_statement_to_result_document(statement):
@@ -209,7 +213,7 @@ def convert_meta_to_result_document(meta):
return meta
def parse_canonical_attack(attack):
def parse_canonical_attack(attack: str):
"""
parse capa's canonical ATT&CK representation: `Tactic::Technique::Subtechnique [Identifier]`
"""
@@ -233,7 +237,7 @@ def parse_canonical_attack(attack):
}
def parse_canonical_mbc(mbc):
def parse_canonical_mbc(mbc: str):
"""
parse capa's canonical MBC representation: `Objective::Behavior::Method [Identifier]`
"""
@@ -257,7 +261,7 @@ def parse_canonical_mbc(mbc):
}
def convert_capabilities_to_result_document(meta, rules, capabilities):
def convert_capabilities_to_result_document(meta, rules: RuleSet, capabilities: MatchResults):
"""
convert the given rule set and capabilities result to a common, Python-native data structure.
this format can be directly emitted to JSON, or passed to the other `capa.render.*.render()` routines

View File

@@ -11,17 +11,17 @@ import io
import termcolor
def bold(s):
def bold(s: str) -> str:
"""draw attention to the given string"""
return termcolor.colored(s, "blue")
def bold2(s):
def bold2(s: str) -> str:
"""draw attention to the given string, within a `bold` section"""
return termcolor.colored(s, "green")
def hex(n):
def hex(n: int) -> str:
"""render the given number using upper case hex, like: 0x123ABC"""
if n < 0:
return "-0x%X" % (-n)
@@ -29,7 +29,7 @@ def hex(n):
return "0x%X" % n
def parse_parts_id(s):
def parse_parts_id(s: str):
id = ""
parts = s.split("::")
if len(parts) > 0:

View File

@@ -27,6 +27,8 @@ import tabulate
import capa.rules
import capa.render.utils as rutils
import capa.render.result_document
from capa.rules import RuleSet
from capa.engine import MatchResults
def render_meta(ostream, doc):
@@ -123,6 +125,6 @@ def render_verbose(doc):
return ostream.getvalue()
def render(meta, rules, capabilities):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_verbose(doc)

View File

@@ -13,6 +13,8 @@ import capa.render.utils as rutils
import capa.render.verbose
import capa.features.common
import capa.render.result_document
from capa.rules import RuleSet
from capa.engine import MatchResults
def render_locations(ostream, match):
@@ -264,6 +266,6 @@ def render_vverbose(doc):
return ostream.getvalue()
def render(meta, rules, capabilities):
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
doc = capa.render.result_document.convert_capabilities_to_result_document(meta, rules, capabilities)
return render_vverbose(doc)

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import re
import uuid
import codecs
@@ -17,22 +18,23 @@ import collections
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
# need to type ignore this due to mypy bug here (duplicate name):
# https://github.com/python/mypy/issues/1153
from backports.functools_lru_cache import lru_cache # type: ignore
import io
from typing import Any, Set, Dict, List, Union, Iterator
import yaml
import ruamel.yaml
import capa.rules
import capa.engine
import capa.engine as ceng
import capa.features
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.basicblock
from capa.features.common import MAX_BYTES_FEATURE_SIZE
from capa.engine import Statement, FeatureSet
from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature
logger = logging.getLogger(__name__)
@@ -143,22 +145,32 @@ class InvalidRuleSet(ValueError):
return str(self)
def ensure_feature_valid_for_scope(scope, feature):
if isinstance(feature, capa.features.common.Characteristic):
if capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]:
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
elif not isinstance(feature, tuple(filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope]))):
def ensure_feature_valid_for_scope(scope: str, feature: Union[Feature, Statement]):
# if the given feature is a characteristic,
# check that is a valid characteristic for the given scope.
if (
isinstance(feature, capa.features.common.Characteristic)
and isinstance(feature.value, str)
and capa.features.common.Characteristic(feature.value) not in SUPPORTED_FEATURES[scope]
):
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
if not isinstance(feature, capa.features.common.Characteristic):
# features of this scope that are not Characteristics will be Type instances.
# check that the given feature is one of these types.
types_for_scope = filter(lambda t: isinstance(t, type), SUPPORTED_FEATURES[scope])
if not isinstance(feature, tuple(types_for_scope)): # type: ignore
raise InvalidRule("feature %s not support for scope %s" % (feature, scope))
def parse_int(s):
def parse_int(s: str) -> int:
if s.startswith("0x"):
return int(s, 0x10)
else:
return int(s, 10)
def parse_range(s):
def parse_range(s: str):
"""
parse a string "(0, 1)" into a range (min, max).
min and/or max may by None to indicate an unbound range.
@@ -171,23 +183,21 @@ def parse_range(s):
raise InvalidRule("invalid range: %s" % (s))
s = s[len("(") : -len(")")]
min, _, max = s.partition(",")
min = min.strip()
max = max.strip()
min_spec, _, max_spec = s.partition(",")
min_spec = min_spec.strip()
max_spec = max_spec.strip()
if min:
min = parse_int(min.strip())
min = None
if min_spec:
min = parse_int(min_spec)
if min < 0:
raise InvalidRule("range min less than zero")
else:
min = None
if max:
max = parse_int(max.strip())
max = None
if max_spec:
max = parse_int(max_spec)
if max < 0:
raise InvalidRule("range max less than zero")
else:
max = None
if min is not None and max is not None:
if max < min:
@@ -196,7 +206,7 @@ def parse_range(s):
return min, max
def parse_feature(key):
def parse_feature(key: str):
# keep this in sync with supported features
if key == "api":
return capa.features.insn.API
@@ -247,38 +257,70 @@ def parse_feature(key):
DESCRIPTION_SEPARATOR = " = "
def parse_description(s, value_type, description=None):
"""
s can be an int or a string
"""
if value_type != "string" and isinstance(s, str) and DESCRIPTION_SEPARATOR in s:
if description:
raise InvalidRule(
'unexpected value: "%s", only one description allowed (inline description with `%s`)'
% (s, DESCRIPTION_SEPARATOR)
)
value, _, description = s.partition(DESCRIPTION_SEPARATOR)
if description == "":
raise InvalidRule('unexpected value: "%s", description cannot be empty' % s)
else:
def parse_bytes(s: str) -> bytes:
try:
b = codecs.decode(s.replace(" ", "").encode("ascii"), "hex")
except binascii.Error:
raise InvalidRule('unexpected bytes value: must be a valid hex sequence: "%s"' % s)
if len(b) > MAX_BYTES_FEATURE_SIZE:
raise InvalidRule(
"unexpected bytes value: byte sequences must be no larger than %s bytes" % MAX_BYTES_FEATURE_SIZE
)
return b
def parse_description(s: Union[str, int, bytes], value_type: str, description=None):
if value_type == "string":
# string features cannot have inline descriptions,
# so we assume the entire value is the string,
# like: `string: foo = bar` -> "foo = bar"
value = s
else:
# other features can have inline descriptions, like `number: 10 = CONST_FOO`.
# in this case, the RHS will be like `10 = CONST_FOO` or some other string
if isinstance(s, str):
if DESCRIPTION_SEPARATOR in s:
if description:
# there is already a description passed in as a sub node, like:
#
# - number: 10 = CONST_FOO
# description: CONST_FOO
raise InvalidRule(
'unexpected value: "%s", only one description allowed (inline description with `%s`)'
% (s, DESCRIPTION_SEPARATOR)
)
if isinstance(value, str):
if value_type == "bytes":
try:
value = codecs.decode(value.replace(" ", "").encode("ascii"), "hex")
except binascii.Error:
raise InvalidRule('unexpected bytes value: "%s", must be a valid hex sequence' % value)
value, _, description = s.partition(DESCRIPTION_SEPARATOR)
if description == "":
# sanity check:
# there is an empty description, like `number: 10 =`
raise InvalidRule('unexpected value: "%s", description cannot be empty' % s)
else:
# this is a string, but there is no description,
# like: `api: CreateFileA`
value = s
if len(value) > MAX_BYTES_FEATURE_SIZE:
raise InvalidRule(
"unexpected bytes value: byte sequences must be no larger than %s bytes" % MAX_BYTES_FEATURE_SIZE
)
elif value_type in ("number", "offset") or value_type.startswith(("number/", "offset/")):
try:
value = parse_int(value)
except ValueError:
raise InvalidRule('unexpected value: "%s", must begin with numerical value' % value)
# cast from the received string value to the appropriate type.
#
# without a description, this type would already be correct,
# but since we parsed the description from a string,
# we need to convert the value to the expected type.
#
# for example, from `number: 10 = CONST_FOO` we have
# the string "10" that needs to become the number 10.
if value_type == "bytes":
value = parse_bytes(value)
elif value_type in ("number", "offset") or value_type.startswith(("number/", "offset/")):
try:
value = parse_int(value)
except ValueError:
raise InvalidRule('unexpected value: "%s", must begin with numerical value' % value)
else:
# the value might be a number, like: `number: 10`
value = s
return value, description
@@ -318,7 +360,7 @@ def pop_statement_description_entry(d):
return description["description"]
def build_statements(d, scope):
def build_statements(d, scope: str):
if len(d.keys()) > 2:
raise InvalidRule("too many statements")
@@ -425,16 +467,16 @@ def build_statements(d, scope):
return feature
def first(s):
def first(s: List[Any]) -> Any:
return s[0]
def second(s):
def second(s: List[Any]) -> Any:
return s[1]
class Rule(object):
def __init__(self, name, scope, statement, meta, definition=""):
class Rule:
def __init__(self, name: str, scope: str, statement: Statement, meta, definition=""):
super(Rule, self).__init__()
self.name = name
self.scope = scope
@@ -493,9 +535,7 @@ class Rule(object):
def _extract_subscope_rules_rec(self, statement):
if isinstance(statement, ceng.Statement):
# for each child that is a subscope,
for subscope in filter(
lambda statement: isinstance(statement, capa.engine.Subscope), statement.get_children()
):
for subscope in filter(lambda statement: isinstance(statement, ceng.Subscope), statement.get_children()):
# create a new rule from it.
# the name is a randomly generated, hopefully unique value.
@@ -557,7 +597,7 @@ class Rule(object):
for new_rule in self._extract_subscope_rules_rec(self.statement):
yield new_rule
def evaluate(self, features):
def evaluate(self, features: FeatureSet):
return self.statement.evaluate(features)
@classmethod
@@ -574,7 +614,7 @@ class Rule(object):
if len(statements) != 1:
raise InvalidRule("rule must begin with a single top level statement")
if isinstance(statements[0], capa.engine.Subscope):
if isinstance(statements[0], ceng.Subscope):
raise InvalidRule("top level statement may not be a subscope")
if scope not in SUPPORTED_FEATURES.keys():
@@ -754,50 +794,37 @@ class Rule(object):
return doc
def get_rules_with_scope(rules, scope):
def get_rules_with_scope(rules, scope) -> List[Rule]:
"""
from the given collection of rules, select those with the given scope.
args:
rules (List[capa.rules.Rule]):
scope (str): one of the capa.rules.*_SCOPE constants.
returns:
List[capa.rules.Rule]:
`scope` is one of the capa.rules.*_SCOPE constants.
"""
return list(rule for rule in rules if rule.scope == scope)
def get_rules_and_dependencies(rules, rule_name):
def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Rule]:
"""
from the given collection of rules, select a rule and its dependencies (transitively).
args:
rules (List[Rule]):
rule_name (str):
yields:
Rule:
"""
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
rules_by_name = {rule.name: rule for rule in rules}
wanted = set([rule_name])
def rec(rule):
wanted.add(rule.name)
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
rec(rules_by_name[dep])
rec(rules[rule_name])
rec(rules_by_name[rule_name])
for rule in rules.values():
for rule in rules_by_name.values():
if rule.name in wanted:
yield rule
def ensure_rules_are_unique(rules):
def ensure_rules_are_unique(rules: List[Rule]) -> None:
seen = set([])
for rule in rules:
if rule.name in seen:
@@ -805,7 +832,7 @@ def ensure_rules_are_unique(rules):
seen.add(rule.name)
def ensure_rule_dependencies_are_met(rules):
def ensure_rule_dependencies_are_met(rules: List[Rule]) -> None:
"""
raise an exception if a rule dependency does not exist.
@@ -815,14 +842,14 @@ def ensure_rule_dependencies_are_met(rules):
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
for rule in rules.values():
rules_by_name = {rule.name: rule for rule in rules}
for rule in rules_by_name.values():
for dep in rule.get_dependencies(namespaces):
if dep not in rules:
if dep not in rules_by_name:
raise InvalidRule('rule "%s" depends on missing rule "%s"' % (rule.name, dep))
def index_rules_by_namespace(rules):
def index_rules_by_namespace(rules: List[Rule]) -> Dict[str, List[Rule]]:
"""
compute the rules that fit into each namespace found within the given rules.
@@ -836,11 +863,6 @@ def index_rules_by_namespace(rules):
c2/shell: [create reverse shell]
c2/file-transfer: [download and write a file]
c2: [create reverse shell, download and write a file]
Args:
rules (List[Rule]):
Returns: Dict[str, List[Rule]]
"""
namespaces = collections.defaultdict(list)
@@ -856,7 +878,7 @@ def index_rules_by_namespace(rules):
return dict(namespaces)
def topologically_order_rules(rules):
def topologically_order_rules(rules: List[Rule]) -> List[Rule]:
"""
order the given rules such that dependencies show up before dependents.
this means that as we match rules, we can add features for the matches, and these
@@ -867,7 +889,7 @@ def topologically_order_rules(rules):
# we evaluate `rules` multiple times, so if its a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules}
rules_by_name = {rule.name: rule for rule in rules}
seen = set([])
ret = []
@@ -876,18 +898,18 @@ def topologically_order_rules(rules):
return
for dep in rule.get_dependencies(namespaces):
rec(rules[dep])
rec(rules_by_name[dep])
ret.append(rule)
seen.add(rule.name)
for rule in rules.values():
for rule in rules_by_name.values():
rec(rule)
return ret
class RuleSet(object):
class RuleSet:
"""
a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes.
each set of scoped rules is sorted topologically, which enables rules to match on past rule matches.
@@ -902,7 +924,7 @@ class RuleSet(object):
capa.engine.match(ruleset.file_rules, ...)
"""
def __init__(self, rules):
def __init__(self, rules: List[Rule]):
super(RuleSet, self).__init__()
ensure_rules_are_unique(rules)
@@ -949,7 +971,7 @@ class RuleSet(object):
continue
scope_rules.update(get_rules_and_dependencies(rules, rule.name))
return get_rules_with_scope(topologically_order_rules(scope_rules), scope)
return get_rules_with_scope(topologically_order_rules(list(scope_rules)), scope)
@staticmethod
def _extract_subscope_rules(rules):
@@ -973,7 +995,7 @@ class RuleSet(object):
return done
def filter_rules_by_meta(self, tag):
def filter_rules_by_meta(self, tag: str) -> "RuleSet":
"""
return new rule set with rules filtered based on all meta field values, adds all dependency rules
apply tag-based rule filter assuming that all required rules are loaded
@@ -982,7 +1004,7 @@ class RuleSet(object):
TODO handle circular dependencies?
TODO support -t=metafield <k>
"""
rules = self.rules.values()
rules = list(self.rules.values())
rules_filtered = set([])
for rule in rules:
for k, v in rule.meta.items():

View File

@@ -17,8 +17,7 @@ from capa.engine import *
RULES_PATH = "/tmp/capa/rules/"
# load rules from disk
rules = capa.main.get_rules(RULES_PATH, disable_progress=True)
rules = capa.rules.RuleSet(rules)
rules = capa.rules.RuleSet(capa.main.get_rules(RULES_PATH, disable_progress=True))
# == Render ddictionary helpers
def render_meta(doc, ostream):

View File

@@ -25,7 +25,8 @@ Derived from: https://github.com/fireeye/capa/blob/master/scripts/import-to-ida.
import os
import json
from binaryninja import *
import binaryninja
import binaryninja.interaction
def append_func_cmt(bv, va, cmt):
@@ -46,31 +47,31 @@ def append_func_cmt(bv, va, cmt):
def load_analysis(bv):
shortname = os.path.splitext(os.path.basename(bv.file.filename))[0]
dirname = os.path.dirname(bv.file.filename)
log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
binaryninja.log_info(f"dirname: {dirname}\nshortname: {shortname}\n")
if os.access(os.path.join(dirname, shortname + ".js"), os.R_OK):
path = os.path.join(dirname, shortname + ".js")
elif os.access(os.path.join(dirname, shortname + ".json"), os.R_OK):
path = os.path.join(dirname, shortname + ".json")
else:
path = interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
path = binaryninja.interaction.get_open_filename_input("capa report:", "JSON (*.js *.json);;All Files (*)")
if not path or not os.access(path, os.R_OK):
log_error("Invalid filename.")
binaryninja.log_error("Invalid filename.")
return 0
log_info("Using capa file %s" % path)
binaryninja.log_info("Using capa file %s" % path)
with open(path, "rb") as f:
doc = json.loads(f.read().decode("utf-8"))
if "meta" not in doc or "rules" not in doc:
log_error("doesn't appear to be a capa report")
binaryninja.log_error("doesn't appear to be a capa report")
return -1
a = doc["meta"]["sample"]["md5"].lower()
md5 = Transform["MD5"]
rawhex = Transform["RawHex"]
md5 = binaryninja.Transform["MD5"]
rawhex = binaryninja.Transform["RawHex"]
b = rawhex.encode(md5.encode(bv.parent_view.read(bv.parent_view.start, bv.parent_view.end))).decode("utf-8")
if not a == b:
log_error("sample mismatch")
binaryninja.log_error("sample mismatch")
return -2
rows = []
@@ -96,7 +97,7 @@ def load_analysis(bv):
else:
cmt = "%s" % (name,)
log_info("0x%x: %s" % (va, cmt))
binaryninja.log_info("0x%x: %s" % (va, cmt))
try:
# message will look something like:
#
@@ -105,7 +106,7 @@ def load_analysis(bv):
except ValueError:
continue
log_info("ok")
binaryninja.log_info("ok")
PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)
binaryninja.PluginCommand.register("Load capa file", "Loads an analysis file from capa", load_analysis)

View File

@@ -49,7 +49,7 @@ def green(s):
return termcolor.colored(s, "green")
class Lint(object):
class Lint:
WARN = orange("WARN")
FAIL = red("FAIL")

View File

@@ -25,6 +25,7 @@ requirements = [
"vivisect==1.0.3",
"smda==1.5.18",
"pefile==2021.5.24",
"typing==3.7.4.3",
]
# this sets __version__
@@ -72,7 +73,14 @@ setuptools.setup(
"pycodestyle==2.7.0",
"black==21.6b0",
"isort==5.8.0",
]
"mypy==0.901",
# type stubs for mypy
"types-backports==0.1.2",
"types-colorama==0.4.0",
"types-PyYAML==0.1.6",
"types-tabulate==0.1.0",
"types-termcolor==0.1.0",
],
},
zip_safe=False,
keywords="capa malware analysis capability detection FLARE",