From fc1709ba6c94c4281cac173befdf8a05c47f255b Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Fri, 8 Apr 2022 11:53:42 -0600 Subject: [PATCH] extractor: add types throughout --- capa/features/extractors/common.py | 28 ++++++++------- capa/features/extractors/dnfile_.py | 27 ++++++++------- capa/features/extractors/smda/extractor.py | 6 +++- capa/features/extractors/viv/basicblock.py | 25 ++++++++------ capa/features/extractors/viv/extractor.py | 9 +++-- capa/features/extractors/viv/file.py | 40 ++++++++++++---------- capa/features/extractors/viv/function.py | 24 +++++++------ capa/features/extractors/viv/global_.py | 10 +++--- capa/features/extractors/viv/insn.py | 31 ++++++++++------- capa/main.py | 8 ++--- 10 files changed, 118 insertions(+), 90 deletions(-) diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 786e4faf..0981f547 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -2,36 +2,38 @@ import io import logging import binascii import contextlib +from typing import Tuple, Iterator import pefile import capa.features import capa.features.extractors.elf import capa.features.extractors.pefile -from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String +from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String, Feature from capa.features.freeze import is_freeze +from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress logger = logging.getLogger(__name__) -def extract_file_strings(buf, **kwargs): +def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]: """ extract ASCII and UTF-16 LE strings from file """ for s in capa.features.extractors.strings.extract_ascii_strings(buf): - yield String(s.s), s.offset + yield String(s.s), FileOffsetAddress(s.offset) for s in capa.features.extractors.strings.extract_unicode_strings(buf): - yield String(s.s), 
s.offset + yield String(s.s), FileOffsetAddress(s.offset) -def extract_format(buf): +def extract_format(buf) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(b"MZ"): - yield Format(FORMAT_PE), 0x0 + yield Format(FORMAT_PE), NO_ADDRESS elif buf.startswith(b"\x7fELF"): - yield Format(FORMAT_ELF), 0x0 + yield Format(FORMAT_ELF), NO_ADDRESS elif is_freeze(buf): - yield Format(FORMAT_FREEZE), 0x0 + yield Format(FORMAT_FREEZE), NO_ADDRESS else: # we likely end up here: # 1. handling a file format (e.g. macho) @@ -41,7 +43,7 @@ def extract_format(buf): return -def extract_arch(buf): +def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(b"MZ"): yield from capa.features.extractors.pefile.extract_file_arch(pe=pefile.PE(data=buf)) @@ -53,7 +55,7 @@ def extract_arch(buf): logger.debug("unsupported arch: %s", arch) return - yield Arch(arch), 0x0 + yield Arch(arch), NO_ADDRESS else: # we likely end up here: @@ -70,9 +72,9 @@ def extract_arch(buf): return -def extract_os(buf): +def extract_os(buf) -> Iterator[Tuple[Feature, Address]]: if buf.startswith(b"MZ"): - yield OS(OS_WINDOWS), 0x0 + yield OS(OS_WINDOWS), NO_ADDRESS elif buf.startswith(b"\x7fELF"): with contextlib.closing(io.BytesIO(buf)) as f: os = capa.features.extractors.elf.detect_elf_os(f) @@ -81,7 +83,7 @@ def extract_os(buf): logger.debug("unsupported os: %s", os) return - yield OS(os), 0x0 + yield OS(os), NO_ADDRESS else: # we likely end up here: diff --git a/capa/features/extractors/dnfile_.py b/capa/features/extractors/dnfile_.py index af9befcc..e99e9f8d 100644 --- a/capa/features/extractors/dnfile_.py +++ b/capa/features/extractors/dnfile_.py @@ -5,34 +5,35 @@ import dnfile import pefile from capa.features.common import OS, OS_ANY, ARCH_ANY, ARCH_I386, ARCH_AMD64, FORMAT_DOTNET, Arch, Format, Feature +from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import 
FeatureExtractor logger = logging.getLogger(__name__) def extract_file_format(**kwargs): - yield Format(FORMAT_DOTNET), 0x0 + yield Format(FORMAT_DOTNET), NO_ADDRESS def extract_file_os(**kwargs): - yield OS(OS_ANY), 0x0 + yield OS(OS_ANY), NO_ADDRESS def extract_file_arch(pe, **kwargs): # to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020 # .NET 4.5 added option: any CPU, 32-bit preferred if pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE: - yield Arch(ARCH_I386), 0x0 + yield Arch(ARCH_I386), NO_ADDRESS elif not pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS: - yield Arch(ARCH_AMD64), 0x0 + yield Arch(ARCH_AMD64), NO_ADDRESS else: - yield Arch(ARCH_ANY), 0x0 + yield Arch(ARCH_ANY), NO_ADDRESS -def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]: +def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, Address]]: for file_handler in FILE_HANDLERS: - for feature, va in file_handler(pe=pe): # type: ignore - yield feature, va + for feature, address in file_handler(pe=pe): # type: ignore + yield feature, address FILE_HANDLERS = ( @@ -45,10 +46,10 @@ FILE_HANDLERS = ( ) -def extract_global_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]: +def extract_global_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, Address]]: for handler in GLOBAL_HANDLERS: - for feature, va in handler(pe=pe): # type: ignore - yield feature, va + for feature, addr in handler(pe=pe): # type: ignore + yield feature, addr GLOBAL_HANDLERS = ( @@ -63,8 +64,8 @@ class DnfileFeatureExtractor(FeatureExtractor): self.path: str = path self.pe: dnfile.dnPE = dnfile.dnPE(path) - def get_base_address(self) -> int: - return 0x0 + def get_base_address(self) -> AbsoluteVirtualAddress: + return AbsoluteVirtualAddress(0x0) def get_entry_point(self) -> int: # self.pe.net.Flags.CLT_NATIVE_ENTRYPOINT diff --git a/capa/features/extractors/smda/extractor.py 
b/capa/features/extractors/smda/extractor.py index 56788e43..ed20e84c 100644 --- a/capa/features/extractors/smda/extractor.py +++ b/capa/features/extractors/smda/extractor.py @@ -1,3 +1,5 @@ +from typing import List, Tuple + from smda.common.SmdaReport import SmdaReport import capa.features.extractors.common @@ -6,6 +8,8 @@ import capa.features.extractors.smda.insn import capa.features.extractors.smda.global_ import capa.features.extractors.smda.function import capa.features.extractors.smda.basicblock +from capa.features.common import Feature +from capa.features.address import Address from capa.features.extractors.base_extractor import FeatureExtractor @@ -18,7 +22,7 @@ class SmdaFeatureExtractor(FeatureExtractor): self.buf = f.read() # pre-compute these because we'll yield them at *every* scope. - self.global_features = [] + self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(capa.features.extractors.common.extract_os(self.buf)) self.global_features.extend(capa.features.extractors.smda.global_.extract_arch(self.smda_report)) diff --git a/capa/features/extractors/viv/basicblock.py b/capa/features/extractors/viv/basicblock.py index e1d6cc89..fcc989a7 100644 --- a/capa/features/extractors/viv/basicblock.py +++ b/capa/features/extractors/viv/basicblock.py @@ -8,27 +8,30 @@ import string import struct +from typing import Tuple, Iterator import envi import envi.archs.i386.disasm -from capa.features.common import Characteristic +from capa.features.common import Feature, Characteristic +from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN +from capa.features.extractors.base_extractor import BBHandle, FunctionHandle -def interface_extract_basic_block_XXX(f, bb): +def interface_extract_basic_block_XXX(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]: """ parse features from the given basic block. 
args: - f (viv_utils.Function): the function to process. - bb (viv_utils.BasicBlock): the basic block to process. + f: the function to process. + bb: the basic block to process. yields: - (Feature, int): the feature and the address at which its found. + (Feature, Address): the feature and the address at which its found. """ - yield NotImplementedError("feature"), NotImplementedError("virtual address") + ... def _bb_has_tight_loop(f, bb): @@ -44,7 +47,7 @@ def _bb_has_tight_loop(f, bb): return False -def extract_bb_tight_loop(f, bb): +def extract_bb_tight_loop(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]: """check basic block for tight loop indicators""" if _bb_has_tight_loop(f, bb): yield Characteristic("tight loop"), bb.va @@ -67,7 +70,7 @@ def _bb_has_stackstring(f, bb): return False -def extract_stackstring(f, bb): +def extract_stackstring(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]: """check basic block for stackstring indicators""" if _bb_has_stackstring(f, bb): yield Characteristic("stack string"), bb.va @@ -143,7 +146,7 @@ def is_printable_utf16le(chars: bytes) -> bool: return False -def extract_features(f, bb): +def extract_features(f: FunctionHandle, bb: BBHandle) -> Iterator[Tuple[Feature, Address]]: """ extract features from the given basic block. @@ -156,8 +159,8 @@ def extract_features(f, bb): """ yield BasicBlock(), bb.va for bb_handler in BASIC_BLOCK_HANDLERS: - for feature, va in bb_handler(f, bb): - yield feature, va + for feature, addr in bb_handler(f, bb): + yield feature, addr BASIC_BLOCK_HANDLERS = ( diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py index 34f952bc..04083e34 100644 --- a/capa/features/extractors/viv/extractor.py +++ b/capa/features/extractors/viv/extractor.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. import logging +from typing import List, Tuple import viv_utils import viv_utils.flirt @@ -16,7 +17,9 @@ import capa.features.extractors.viv.insn import capa.features.extractors.viv.global_ import capa.features.extractors.viv.function import capa.features.extractors.viv.basicblock -from capa.features.extractors.base_extractor import FeatureExtractor +from capa.features.common import Feature +from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor logger = logging.getLogger(__name__) @@ -43,13 +46,13 @@ class VivisectFeatureExtractor(FeatureExtractor): self.buf = f.read() # pre-compute these because we'll yield them at *every* scope. - self.global_features = [] + self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(capa.features.extractors.common.extract_os(self.buf)) self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw)) def get_base_address(self): # assume there is only one file loaded into the vw - return list(self.vw.filemeta.values())[0]["imagebase"] + return AbsoluteVirtualAddress(list(self.vw.filemeta.values())[0]["imagebase"]) def extract_global_features(self): yield from self.global_features diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index 718e014e..c7d02e64 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
+from typing import Tuple, Iterator import PE.carve as pe_carve # vivisect PE import viv_utils @@ -15,20 +16,21 @@ import capa.features.extractors.common import capa.features.extractors.helpers import capa.features.extractors.strings from capa.features.file import Export, Import, Section, FunctionName -from capa.features.common import String, Characteristic +from capa.features.common import String, Feature, Characteristic +from capa.features.address import Address, FileOffsetAddress, AbsoluteVirtualAddress -def extract_file_embedded_pe(buf, **kwargs): +def extract_file_embedded_pe(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]: for offset, _ in pe_carve.carve(buf, 1): - yield Characteristic("embedded pe"), offset + yield Characteristic("embedded pe"), FileOffsetAddress(offset) -def extract_file_export_names(vw, **kwargs): +def extract_file_export_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]: for va, _, name, _ in vw.getExports(): - yield Export(name), va + yield Export(name), AbsoluteVirtualAddress(va) -def extract_file_import_names(vw, **kwargs): +def extract_file_import_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]: """ extract imported function names 1. 
imports by ordinal: @@ -44,8 +46,9 @@ def extract_file_import_names(vw, **kwargs): # replace ord prefix with # impname = "#%s" % impname[len("ord") :] + addr = AbsoluteVirtualAddress(va) for name in capa.features.extractors.helpers.generate_symbols(modname, impname): - yield Import(name), va + yield Import(name), addr def is_viv_ord_impname(impname: str) -> bool: @@ -62,36 +65,37 @@ def is_viv_ord_impname(impname: str) -> bool: return True -def extract_file_section_names(vw, **kwargs): +def extract_file_section_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]: for va, _, segname, _ in vw.getSegments(): - yield Section(segname), va + yield Section(segname), AbsoluteVirtualAddress(va) -def extract_file_strings(buf, **kwargs): +def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.common.extract_file_strings(buf) -def extract_file_function_names(vw, **kwargs): +def extract_file_function_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]]: """ extract the names of statically-linked library functions. """ for va in sorted(vw.getFunctions()): + addr = AbsoluteVirtualAddress(va) if viv_utils.flirt.is_library_function(vw, va): name = viv_utils.get_function_name(vw, va) - yield FunctionName(name), va + yield FunctionName(name), addr if name.startswith("_"): # some linkers may prefix linked routines with a `_` to avoid name collisions. # extract features for both the mangled and un-mangled representations. # e.g. 
`_fwrite` -> `fwrite` # see: https://stackoverflow.com/a/2628384/87207 - yield FunctionName(name[1:]), va + yield FunctionName(name[1:]), addr -def extract_file_format(buf, **kwargs): +def extract_file_format(buf, **kwargs) -> Iterator[Tuple[Feature, Address]]: yield from capa.features.extractors.common.extract_format(buf) -def extract_features(vw, buf: bytes): +def extract_features(vw, buf: bytes) -> Iterator[Tuple[Feature, Address]]: """ extract file features from given workspace @@ -100,12 +104,12 @@ def extract_features(vw, buf: bytes): buf: the raw input file bytes yields: - Tuple[Feature, VA]: a feature and its location. + Tuple[Feature, Address]: a feature and its location. """ for file_handler in FILE_HANDLERS: - for feature, va in file_handler(vw=vw, buf=buf): # type: ignore - yield feature, va + for feature, addr in file_handler(vw=vw, buf=buf): # type: ignore + yield feature, addr FILE_HANDLERS = ( diff --git a/capa/features/extractors/viv/function.py b/capa/features/extractors/viv/function.py index 56bc2668..62d0158d 100644 --- a/capa/features/extractors/viv/function.py +++ b/capa/features/extractors/viv/function.py @@ -5,33 +5,37 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
+from typing import Tuple, Iterator import envi +import viv_utils import vivisect.const -from capa.features.common import Characteristic +from capa.features.common import Feature, Characteristic +from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.extractors import loops +from capa.features.extractors.base_extractor import FunctionHandle -def interface_extract_function_XXX(f): +def interface_extract_function_XXX(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: """ parse features from the given function. args: - f (viv_utils.Function): the function to process. + f: the function to process. yields: - (Feature, int): the feature and the address at which its found. + (Feature, Address): the feature and the address at which its found. """ - yield NotImplementedError("feature"), NotImplementedError("virtual address") + ... -def extract_function_calls_to(f): +def extract_function_calls_to(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: for src, _, _, _ in f.vw.getXrefsTo(f.va, rtype=vivisect.const.REF_CODE): yield Characteristic("calls to"), src -def extract_function_loop(f): +def extract_function_loop(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: """ parse if a function has a loop """ @@ -53,7 +57,7 @@ def extract_function_loop(f): yield Characteristic("loop"), f.va -def extract_features(f): +def extract_features(f: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: """ extract features from the given function. @@ -64,8 +68,8 @@ def extract_features(f): Tuple[Feature, int]: the features and their location found in this function. 
""" for func_handler in FUNCTION_HANDLERS: - for feature, va in func_handler(f): - yield feature, va + for feature, addr in func_handler(f): + yield feature, addr FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop) diff --git a/capa/features/extractors/viv/global_.py b/capa/features/extractors/viv/global_.py index 8fc08ee2..6d890d40 100644 --- a/capa/features/extractors/viv/global_.py +++ b/capa/features/extractors/viv/global_.py @@ -1,19 +1,21 @@ import logging +from typing import Tuple, Iterator import envi.archs.i386 import envi.archs.amd64 -from capa.features.common import ARCH_I386, ARCH_AMD64, Arch +from capa.features.common import ARCH_I386, ARCH_AMD64, Arch, Feature +from capa.features.address import NO_ADDRESS, Address logger = logging.getLogger(__name__) -def extract_arch(vw): +def extract_arch(vw) -> Iterator[Tuple[Feature, Address]]: if isinstance(vw.arch, envi.archs.amd64.Amd64Module): - yield Arch(ARCH_AMD64), 0x0 + yield Arch(ARCH_AMD64), NO_ADDRESS elif isinstance(vw.arch, envi.archs.i386.i386Module): - yield Arch(ARCH_I386), 0x0 + yield Arch(ARCH_I386), NO_ADDRESS else: # we likely end up here: diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index e721606f..1d849fda 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
+from typing import List, Tuple, Callable, Iterator import envi import envi.exc import viv_utils @@ -18,7 +19,9 @@ import envi.archs.amd64.disasm import capa.features.extractors.helpers import capa.features.extractors.viv.helpers from capa.features.insn import API, Number, Offset, Mnemonic, OperandNumber, OperandOffset -from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Characteristic +from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic +from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call # security cookie checks may perform non-zeroing XORs, these are expected within a certain @@ -26,19 +29,21 @@ from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_i SECURITY_COOKIE_BYTES_DELTA = 0x40 -def interface_extract_instruction_XXX(f, bb, insn): +def interface_extract_instruction_XXX( + f: FunctionHandle, bb: BBHandle, insn: InsnHandle +) -> Iterator[Tuple[Feature, Address]]: """ parse features from the given instruction. args: - f (viv_utils.Function): the function to process. - bb (viv_utils.BasicBlock): the basic block to process. - insn (vivisect...Instruction): the instruction to process. + f: the function to process. + bb: the basic block to process. + insn: the instruction to process. yields: - (Feature, int): the feature and the address at which its found. + (Feature, Address): the feature and the address at which its found. """ - yield NotImplementedError("feature"), NotImplementedError("virtual address") + ... 
def get_imports(vw): @@ -610,8 +615,8 @@ def extract_op_string_features(f, bb, insn, i, oper): def extract_operand_features(f, bb, insn): for i, oper in enumerate(insn.opers): for op_handler in OPERAND_HANDLERS: - for feature, va in op_handler(f, bb, insn, i, oper): - yield feature, va + for feature, addr in op_handler(f, bb, insn, i, oper): + yield feature, addr OPERAND_HANDLERS = ( @@ -621,7 +626,7 @@ OPERAND_HANDLERS = ( ) -def extract_features(f, bb, insn): +def extract_features(f, bb, insn) -> Iterator[Tuple[Feature, Address]]: """ extract features from the given insn. @@ -631,11 +636,11 @@ def extract_features(f, bb, insn): insn (vivisect...Instruction): the instruction to process. yields: - Tuple[Feature, int]: the features and their location found in this insn. + Tuple[Feature, Address]: the features and their location found in this insn. """ for insn_handler in INSTRUCTION_HANDLERS: - for feature, va in insn_handler(f, bb, insn): - yield feature, va + for feature, addr in insn_handler(f, bb, insn): + yield feature, addr INSTRUCTION_HANDLERS = ( diff --git a/capa/main.py b/capa/main.py index cdb4a164..9c086fce 100644 --- a/capa/main.py +++ b/capa/main.py @@ -114,18 +114,18 @@ def find_instruction_capabilities( # all features found for the instruction. features = collections.defaultdict(set) # type: FeatureSet - for feature, va in itertools.chain( + for feature, addr in itertools.chain( extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() ): - features[feature].add(va) + features[feature].add(addr) # matches found at this instruction. _, matches = ruleset.match(Scope.INSTRUCTION, features, int(insn)) for rule_name, res in matches.items(): rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) return features, matches