diff --git a/.github/flake8.ini b/.github/flake8.ini new file mode 100644 index 00000000..f97dacc2 --- /dev/null +++ b/.github/flake8.ini @@ -0,0 +1,29 @@ +[flake8] +max-line-length = 120 + +extend-ignore = + # E203: whitespace before ':' (black does this) + E203, + # F401: `foo` imported but unused (prefer ruff) + F401, + # F811 Redefinition of unused `foo` (prefer ruff) + F811, + # E501 line too long (prefer black) + E501, + # B010 Do not call setattr with a constant attribute value + B010, + # G200 Logging statement uses exception in arguments + G200 + + +per-file-ignores = + # T201 print found. + # + # scripts are meant to print output + scripts/*: T201 + # capa.exe is meant to print output + capa/main.py: T201 + # IDA tests emit results to output window so need to print + tests/test_ida_features.py: T201 + # utility used to find the Binary Ninja API via invoking python.exe + capa/features/extractors/binja/find_binja_api.py: T201 \ No newline at end of file diff --git a/.github/ruff.toml b/.github/ruff.toml index 2740b0e0..3a5254a9 100644 --- a/.github/ruff.toml +++ b/.github/ruff.toml @@ -1,10 +1,61 @@ -# Enable pycodestyle (`E`) codes -select = ["E"] +# Enable the pycodestyle (`E`) and Pyflakes (`F`) rules by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +select = ["E", "F"] + +# Allow autofix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] # E402 module level import not at top of file # E722 do not use bare 'except' -ignore = ["E402", "E722"] -exclude = ["*_pb2.py", "*_pb2.pyi"] +# E501 line too long +ignore = ["E402", "E722", "E501"] -# Same as pycodestyle. -line-length = 180 +line-length = 120 + +exclude = [ + # Exclude a variety of commonly ignored directories. 
+ ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", + # protobuf generated files + "*_pb2.py", + "*_pb2.pyi" +] + +[per-file-ignores] +# until we address #1592 and move test fixtures into conftest.py +# then we need to ignore imports done to enable pytest fixtures. +# +# F401: `foo` imported but unused +# F811 Redefinition of unused `foo` +"tests/test_main.py" = ["F401", "F811"] +"tests/test_proto.py" = ["F401", "F811"] +"tests/test_freeze.py" = ["F401", "F811"] +"tests/test_function_id.py" = ["F401", "F811"] +"tests/test_viv_features.py" = ["F401", "F811"] +"tests/test_binja_features.py" = ["F401", "F811"] +"tests/test_pefile_features.py" = ["F401", "F811"] +"tests/test_dnfile_features.py" = ["F401", "F811"] +"tests/test_dotnet_features.py" = ["F401", "F811"] +"tests/test_result_document.py" = ["F401", "F811"] +"tests/test_dotnetfile_features.py" = ["F401", "F811"] diff --git a/.github/tox.ini b/.github/tox.ini deleted file mode 100644 index bb8dcdbe..00000000 --- a/.github/tox.ini +++ /dev/null @@ -1,10 +0,0 @@ -[pycodestyle] -; E402: module level import not at top of file -; W503: line break before binary operator -; E231 missing whitespace after ',' (emitted by black) -; E203 whitespace before ':' (emitted by black) -ignore = E402,W503,E203,E231 -max-line-length = 160 -statistics = True -count = True -exclude = .* diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index acd7d807..08797988 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,5 +1,8 @@ name: build +permissions: + contents: write + on: pull_request: branches: [ master ] diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 002a7095..390a814f 100644 --- a/.github/workflows/publish.yml +++ 
b/.github/workflows/publish.yml @@ -1,15 +1,21 @@ -# This workflows will upload a Python Package using Twine when a release is created -# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries - +# use PyPI trusted publishing, as described here: +# https://blog.trailofbits.com/2023/05/23/trusted-publishing-a-new-benchmark-for-packaging-security/ name: publish to pypi +permissions: + contents: write + on: release: types: [published] jobs: - deploy: - runs-on: ubuntu-20.04 + pypi-publish: + runs-on: ubuntu-latest + environment: + name: release + permissions: + id-token: write steps: - uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3.3.0 - name: Set up Python @@ -19,11 +25,24 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install setuptools wheel twine - - name: Build and publish - env: - TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} - TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + pip install -e .[build] + - name: build package run: | - python setup.py sdist bdist_wheel - twine upload --skip-existing dist/* + python -m build + - name: upload package artifacts + uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 + with: + name: ${{ matrix.asset_name }} + path: dist/* + - name: upload package to GH Release + uses: svenstaro/upload-release-action@2728235f7dc9ff598bd86ce3c274b74f802d2208 # v2 + with: + repo_token: ${{ secrets.GITHUB_TOKEN}} + file: dist/* + tag: ${{ github.ref }} + - name: publish package + uses: pypa/gh-action-pypi-publish@f5622bde02b04381239da3573277701ceca8f6a0 # release/v1 + with: + skip-existing: true + verbose: true + print-hash: true diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b6db661b..8ea4cc48 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -34,15 +34,15 @@ jobs: - name: Install dependencies run: 
pip install -e .[dev] - name: Lint with ruff - run: ruff check --config .github/ruff.toml . + run: pre-commit run ruff - name: Lint with isort - run: isort --profile black --length-sort --line-width 120 --skip-glob "*_pb2.py" -c . + run: pre-commit run isort - name: Lint with black - run: black -l 120 --extend-exclude ".*_pb2.py" --check . - - name: Lint with pycodestyle - run: pycodestyle --exclude="*_pb2.py" --show-source capa/ scripts/ tests/ + run: pre-commit run black + - name: Lint with flake8 + run: pre-commit run flake8 - name: Check types with mypy - run: mypy --config-file .github/mypy/mypy.ini --check-untyped-defs capa/ scripts/ tests/ + run: pre-commit run mypy rule_linter: runs-on: ubuntu-20.04 @@ -56,7 +56,7 @@ jobs: with: python-version: "3.8" - name: Install capa - run: pip install -e . + run: pip install -e .[dev] - name: Run rule linter run: python scripts/lint.py rules/ diff --git a/.gitignore b/.gitignore index 6898dcb6..95bc3044 100644 --- a/.gitignore +++ b/.gitignore @@ -108,13 +108,10 @@ venv.bak/ *.viv *.idb *.i64 +.vscode + !rules/lib -# hooks/ci.sh output -isort-output.log -black-output.log -rule-linter-output.log -.vscode scripts/perf/*.txt scripts/perf/*.svg scripts/perf/*.zip diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..dbc6e80f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,111 @@ +# install the pre-commit hooks: +# +# ❯ pre-commit install --hook-type pre-commit +# pre-commit installed at .git/hooks/pre-commit +# +# ❯ pre-commit install --hook-type pre-push +# pre-commit installed at .git/hooks/pre-push +# +# run all linters liks: +# +# ❯ pre-commit run --all-files +# isort....................................................................Passed +# black....................................................................Passed +# ruff.....................................................................Passed +# 
flake8...................................................................Passed +# mypy.....................................................................Passed +# +# run a single linter like: +# +# ❯ pre-commit run --all-files isort +# isort....................................................................Passed + +repos: +- repo: local + hooks: + - id: isort + name: isort + stages: [commit, push] + language: system + entry: isort + args: + - "--length-sort" + - "--profile" + - "black" + - "--line-length=120" + - "--skip-glob" + - "*_pb2.py" + - "capa/" + - "scripts/" + - "tests/" + always_run: true + pass_filenames: false + +- repo: local + hooks: + - id: black + name: black + stages: [commit, push] + language: system + entry: black + args: + - "--line-length=120" + - "--extend-exclude" + - ".*_pb2.py" + - "capa/" + - "scripts/" + - "tests/" + always_run: true + pass_filenames: false + +- repo: local + hooks: + - id: ruff + name: ruff + stages: [commit, push] + language: system + entry: ruff + args: + - "check" + - "--config" + - ".github/ruff.toml" + - "capa/" + - "scripts/" + - "tests/" + always_run: true + pass_filenames: false + +- repo: local + hooks: + - id: flake8 + name: flake8 + stages: [commit, push] + language: system + entry: flake8 + args: + - "--config" + - ".github/flake8.ini" + - "--extend-exclude" + - "capa/render/proto/capa_pb2.py" + - "capa/" + - "scripts/" + - "tests/" + always_run: true + pass_filenames: false + +- repo: local + hooks: + - id: mypy + name: mypy + stages: [commit, push] + language: system + entry: mypy + args: + - "--check-untyped-defs" + - "--ignore-missing-imports" + - "--config-file=.github/mypy/mypy.ini" + - "capa/" + - "scripts/" + - "tests/" + always_run: true + pass_filenames: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 12e09f53..49eebfef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,13 +6,16 @@ ### New Features - Utility script to detect feature overlap between new and existing CAPA rules 
[#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04) - use fancy box drawing characters for default output #1586 @williballenthin +- use [pre-commit](https://pre-commit.com/) to invoke linters #1579 @williballenthin +- publish via PyPI trusted publishing #1491 @williballenthin +- migrate to pyproject.toml #1301 @williballenthin ### Breaking Changes - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat - Python 3.8 is now the minimum supported Python version #1578 @williballenthin - Updated file paths to use pathlib.Path for improved path handling and compatibility [#1534](https://github.com/mandiant/capa/issues/1534) [@Aayush-Goel-04](https://github.com/aayush-goel-04) -### New Rules (22) +### New Rules (24) - load-code/shellcode/execute-shellcode-via-windows-callback-function ervin.ocampo@mandiant.com jakub.jozwiak@mandiant.com - nursery/execute-shellcode-via-indirect-call ronnie.salomonsen@mandiant.com @@ -35,6 +38,8 @@ - anti-analysis/anti-debugging/debugger-evasion/hide-thread-from-debugger michael.hunhoff@mandiant.com jakub.jozwiak@mandiant.com - host-interaction/memory/create-new-application-domain-in-dotnet jakub.jozwiak@mandiant.com - host-interaction/gui/switch-active-desktop jakub.jozwiak@mandiant.com +- host-interaction/service/query-service-configuration @mr-tz +- anti-analysis/anti-av/patch-event-tracing-for-windows-function jakub.jozwiak@mandiant.com - ### Bug Fixes diff --git a/README.md b/README.md index 723671a4..6094a3b3 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/flare-capa)](https://pypi.org/project/flare-capa) [![Last release](https://img.shields.io/github/v/release/mandiant/capa)](https://github.com/mandiant/capa/releases) -[![Number of 
rules](https://img.shields.io/badge/rules-810-blue.svg)](https://github.com/mandiant/capa-rules) +[![Number of rules](https://img.shields.io/badge/rules-812-blue.svg)](https://github.com/mandiant/capa-rules) [![CI status](https://github.com/mandiant/capa/workflows/CI/badge.svg)](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster) [![Downloads](https://img.shields.io/github/downloads/mandiant/capa/total)](https://github.com/mandiant/capa/releases) [![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)](LICENSE.txt) diff --git a/capa/engine.py b/capa/engine.py index 4d9e0980..d498639d 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -8,7 +8,7 @@ import copy import collections -from typing import TYPE_CHECKING, Set, Dict, List, Tuple, Union, Mapping, Iterable, Iterator, cast +from typing import TYPE_CHECKING, Set, Dict, List, Tuple, Union, Mapping, Iterable, Iterator import capa.perf import capa.features.common @@ -71,7 +71,7 @@ class Statement: yield child if hasattr(self, "children"): - for child in getattr(self, "children"): + for child in self.children: assert isinstance(child, (Statement, Feature)) yield child @@ -83,7 +83,7 @@ class Statement: self.child = new if hasattr(self, "children"): - children = getattr(self, "children") + children = self.children for i, child in enumerate(children): if child is existing: children[i] = new diff --git a/capa/features/common.py b/capa/features/common.py index 5060ebaa..5ea1612b 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -100,7 +100,10 @@ class Result: return self.success -class Feature(abc.ABC): +class Feature(abc.ABC): # noqa: B024 + # this is an abstract class, since we don't want anyone to instantiate it directly, + # but it doesn't have any abstract methods. 
+ def __init__( self, value: Union[str, int, float, bytes], @@ -124,7 +127,12 @@ class Feature(abc.ABC): return self.name == other.name and self.value == other.value def __lt__(self, other): - # TODO: this is a huge hack! + # implementing sorting by serializing to JSON is a huge hack. + # its slow, inelegant, and probably doesn't work intuitively; + # however, we only use it for deterministic output, so it's good enough for now. + + # circular import + # we should fix if this wasn't already a huge hack. import capa.features.freeze.features return ( @@ -267,7 +275,7 @@ class _MatchedSubstring(Substring): self.matches = matches def __str__(self): - matches = ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())) + matches = ", ".join(f'"{s}"' for s in (self.matches or {}).keys()) assert isinstance(self.value, str) return f'substring("{self.value}", matches = {matches})' @@ -359,7 +367,7 @@ class _MatchedRegex(Regex): self.matches = matches def __str__(self): - matches = ", ".join(map(lambda s: '"' + s + '"', (self.matches or {}).keys())) + matches = ", ".join(f'"{s}"' for s in (self.matches or {}).keys()) assert isinstance(self.value, str) return f"regex(string =~ {self.value}, matches = {matches})" diff --git a/capa/features/extractors/binja/basicblock.py b/capa/features/extractors/binja/basicblock.py index e354669d..02855ad8 100644 --- a/capa/features/extractors/binja/basicblock.py +++ b/capa/features/extractors/binja/basicblock.py @@ -6,7 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
-import sys import string import struct from typing import Tuple, Iterator @@ -15,7 +14,6 @@ from binaryninja import Function, Settings from binaryninja import BasicBlock as BinjaBasicBlock from binaryninja import ( BinaryView, - DataBuffer, SymbolType, RegisterValueType, VariableSourceType, @@ -26,7 +24,7 @@ from binaryninja import ( ) from capa.features.common import Feature, Characteristic -from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.address import Address from capa.features.basicblock import BasicBlock from capa.features.extractors.helpers import MIN_STACKSTRING_LEN from capa.features.extractors.base_extractor import BBHandle, FunctionHandle @@ -183,30 +181,3 @@ BASIC_BLOCK_HANDLERS = ( extract_bb_tight_loop, extract_bb_stackstring, ) - - -def main(): - if len(sys.argv) < 2: - return - - from binaryninja import BinaryViewType - - from capa.features.extractors.binja.extractor import BinjaFeatureExtractor - - bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1]) - if bv is None: - return - - features = [] - extractor = BinjaFeatureExtractor(bv) - for fh in extractor.get_functions(): - for bbh in extractor.get_basic_blocks(fh): - features.extend(list(extract_features(fh, bbh))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/binja/file.py b/capa/features/extractors/binja/file.py index 9700a8c1..c36b5b48 100644 --- a/capa/features/extractors/binja/file.py +++ b/capa/features/extractors/binja/file.py @@ -6,11 +6,10 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
-import sys import struct from typing import Tuple, Iterator -from binaryninja import Symbol, Segment, BinaryView, SymbolType, SymbolBinding +from binaryninja import Segment, BinaryView, SymbolType, SymbolBinding import capa.features.extractors.common import capa.features.extractors.helpers @@ -166,23 +165,3 @@ FILE_HANDLERS = ( extract_file_function_names, extract_file_format, ) - - -def main(): - """ """ - if len(sys.argv) < 2: - return - - from binaryninja import BinaryViewType - - bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1]) - if bv is None: - return - - import pprint - - pprint.pprint(list(extract_features(bv))) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/binja/function.py b/capa/features/extractors/binja/function.py index 96c58eea..2bf735db 100644 --- a/capa/features/extractors/binja/function.py +++ b/capa/features/extractors/binja/function.py @@ -5,7 +5,6 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
-import sys from typing import Tuple, Iterator from binaryninja import Function, BinaryView, LowLevelILOperation @@ -19,7 +18,6 @@ from capa.features.extractors.base_extractor import FunctionHandle def extract_function_calls_to(fh: FunctionHandle): """extract callers to a function""" func: Function = fh.inner - bv: BinaryView = func.view for caller in func.caller_sites: # Everything that is a code reference to the current function is considered a caller, which actually includes @@ -68,30 +66,3 @@ def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call) - - -def main(): - """ """ - if len(sys.argv) < 2: - return - - from binaryninja import BinaryViewType - - from capa.features.extractors.binja.extractor import BinjaFeatureExtractor - - bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1]) - if bv is None: - return - - features = [] - extractor = BinjaFeatureExtractor(bv) - for fh in extractor.get_functions(): - features.extend(list(extract_features(fh))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/binja/global_.py b/capa/features/extractors/binja/global_.py index f8ce7003..afcec4ad 100644 --- a/capa/features/extractors/binja/global_.py +++ b/capa/features/extractors/binja/global_.py @@ -1,10 +1,8 @@ import logging -import contextlib from typing import Tuple, Iterator from binaryninja import BinaryView -import capa.features.extractors.elf from capa.features.common import OS, OS_MACOS, ARCH_I386, ARCH_AMD64, OS_WINDOWS, Arch, Feature from capa.features.address import NO_ADDRESS, Address diff --git a/capa/features/extractors/binja/helpers.py b/capa/features/extractors/binja/helpers.py index 499de97b..4baae3f9 100644 --- a/capa/features/extractors/binja/helpers.py +++ b/capa/features/extractors/binja/helpers.py @@ -41,7 +41,10 @@ def unmangle_c_name(name: str) -> str: 
# _lstrlenWStub@4 # A small optimization to avoid running the regex too many times - # TODO: this still increases the unit test execution time from 170s to 200s, should be able to accelerate it + # this still increases the unit test execution time from 170s to 200s, should be able to accelerate it + # + # TODO(xusheng): performance optimizations to improve test execution time + # https://github.com/mandiant/capa/issues/1610 if name[0] in ["@", "_"]: match = re.match(r"^[@|_](.*?)(Stub)?(@\d+)?$", name) if match: diff --git a/capa/features/extractors/binja/insn.py b/capa/features/extractors/binja/insn.py index fe5cd228..53690b6e 100644 --- a/capa/features/extractors/binja/insn.py +++ b/capa/features/extractors/binja/insn.py @@ -5,8 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
-import sys -from typing import Any, Dict, List, Tuple, Iterator, Optional +from typing import Any, List, Tuple, Iterator, Optional from binaryninja import Function from binaryninja import BasicBlock as BinjaBasicBlock @@ -18,12 +17,11 @@ from binaryninja import ( RegisterValueType, LowLevelILOperation, LowLevelILInstruction, - InstructionTextTokenType, ) import capa.features.extractors.helpers from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset -from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic +from capa.features.common import MAX_BYTES_FEATURE_SIZE, Bytes, String, Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress from capa.features.extractors.binja.helpers import DisassemblyInstruction, visit_llil_exprs from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle @@ -73,7 +71,6 @@ def extract_insn_api_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) example: call dword [0x00473038] """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner bv: BinaryView = func.view @@ -128,12 +125,9 @@ def extract_insn_number_features( example: push 3136B0h ; dwControlCode """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner - bv: BinaryView = func.view results: List[Tuple[Any[Number, OperandNumber], Address]] = [] - address_size = func.view.arch.address_size * 8 def llil_checker(il: LowLevelILInstruction, parent: LowLevelILInstruction, index: int) -> bool: if il.operation == LowLevelILOperation.LLIL_LOAD: @@ -171,7 +165,6 @@ def extract_insn_bytes_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl example: push offset iid_004118d4_IShellLinkA ; riid """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner bv: BinaryView = func.view @@ -220,7 +213,6 @@ def extract_insn_string_features( example: push offset aAcr ; "ACR > 
" """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner bv: BinaryView = func.view @@ -278,7 +270,6 @@ def extract_insn_offset_features( example: .text:0040112F cmp [esi+4], ebx """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner results: List[Tuple[Any[Offset, OperandOffset], Address]] = [] @@ -333,7 +324,8 @@ def extract_insn_offset_features( def is_nzxor_stack_cookie(f: Function, bb: BinjaBasicBlock, llil: LowLevelILInstruction) -> bool: """check if nzxor exists within stack cookie delta""" - # TODO: we can do a much accurate analysi using LLIL SSA + # TODO(xusheng): use LLIL SSA to do more accurate analysis + # https://github.com/mandiant/capa/issues/1609 reg_names = [] if llil.left.operation == LowLevelILOperation.LLIL_REG: @@ -364,7 +356,6 @@ def extract_insn_nzxor_characteristic_features( parse instruction non-zeroing XOR instruction ignore expected non-zeroing XORs, e.g. security cookies """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner results = [] @@ -414,7 +405,6 @@ def extract_insn_peb_access_characteristic_features( fs:[0x30] on x86, gs:[0x60] on x64 """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner results = [] @@ -456,7 +446,6 @@ def extract_insn_segment_access_features( fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """parse instruction fs or gs access""" - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner results = [] @@ -485,7 +474,6 @@ def extract_insn_cross_section_cflow( fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Feature, Address]]: """inspect the instruction for a CALL or JMP that crosses section boundaries""" - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner bv: BinaryView = func.view @@ -509,7 +497,6 @@ def extract_function_calls_from(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandl most relevant at the function scope, however, its most efficient to extract 
at the instruction scope """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner bv: BinaryView = func.view @@ -555,7 +542,6 @@ def extract_function_indirect_call_characteristic_features( most relevant at the function or basic block scope; however, its most efficient to extract at the instruction scope """ - insn: DisassemblyInstruction = ih.inner func: Function = fh.inner llil = func.get_llil_at(ih.address) @@ -599,32 +585,3 @@ INSTRUCTION_HANDLERS = ( extract_function_calls_from, extract_function_indirect_call_characteristic_features, ) - - -def main(): - """ """ - if len(sys.argv) < 2: - return - - from binaryninja import BinaryViewType - - from capa.features.extractors.binja.extractor import BinjaFeatureExtractor - - bv: BinaryView = BinaryViewType.get_view_of_file(sys.argv[1]) - if bv is None: - return - - features = [] - extractor = BinjaFeatureExtractor(bv) - for fh in extractor.get_functions(): - for bbh in extractor.get_basic_blocks(fh): - for insn in extractor.get_instructions(fh, bbh): - features.extend(list(extract_features(fh, bbh, insn))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/dnfile/insn.py b/capa/features/extractors/dnfile/insn.py index 431423a6..b70f39dc 100644 --- a/capa/features/extractors/dnfile/insn.py +++ b/capa/features/extractors/dnfile/insn.py @@ -9,7 +9,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, Dict, Tuple, Union, Iterator, Optional +from typing import TYPE_CHECKING, Tuple, Union, Iterator, Optional if TYPE_CHECKING: from capa.features.extractors.dnfile.extractor import DnFileFeatureExtractorCache diff --git a/capa/features/extractors/dnfile/types.py b/capa/features/extractors/dnfile/types.py index 822b5d67..5982a1fe 100644 --- a/capa/features/extractors/dnfile/types.py +++ b/capa/features/extractors/dnfile/types.py @@ -6,8 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from enum import Enum -from typing import Union, Optional +from typing import Optional class DnType(object): diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 1f23d53e..b5960866 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -411,7 +411,7 @@ class ELF: # there should be vn_cnt of these. # each entry describes an ABI name required by the shared object. vna_offset = vn_offset + vn_aux - for i in range(vn_cnt): + for _ in range(vn_cnt): # ElfXX_Vernaux layout is the same on 32 and 64 bit _, _, _, vna_name, vna_next = struct.unpack_from(self.endian + "IHHII", shdr.buf, vna_offset) @@ -727,7 +727,7 @@ class SymTab: return cls(endian, bitness, sh_symtab, sh_strtab) except NameError: return None - except: + except Exception: # all exceptions that could be encountered by # cls._parse() imply a faulty symbol's table. raise CorruptElfFile("malformed symbol's table") @@ -824,7 +824,7 @@ def guess_os_from_abi_versions_needed(elf: ELF) -> Optional[OS]: # this will let us guess about linux/hurd in some cases. 
versions_needed = elf.versions_needed - if any(map(lambda abi: abi.startswith("GLIBC"), itertools.chain(*versions_needed.values()))): + if any(abi.startswith("GLIBC") for abi in itertools.chain(*versions_needed.values())): # there are any GLIBC versions needed if elf.e_machine != "i386": @@ -881,7 +881,7 @@ def guess_os_from_symtab(elf: ELF) -> Optional[OS]: sym_name = symtab.get_name(symbol) for os, hints in keywords.items(): - if any(map(lambda x: x in sym_name, hints)): + if any(hint in sym_name for hint in hints): return os return None diff --git a/capa/features/extractors/elffile.py b/capa/features/extractors/elffile.py index 1ed1aa9e..edf74dc2 100644 --- a/capa/features/extractors/elffile.py +++ b/capa/features/extractors/elffile.py @@ -37,8 +37,8 @@ def extract_file_import_names(elf, **kwargs): for _, symbol in enumerate(section.iter_symbols()): if symbol.name and symbol.entry.st_info.type == "STT_FUNC": - # TODO symbol address - # TODO symbol version info? + # TODO(williballenthin): extract symbol address + # https://github.com/mandiant/capa/issues/1608 yield Import(symbol.name), FileOffsetAddress(0x0) @@ -69,7 +69,6 @@ def extract_file_format(**kwargs): def extract_file_arch(elf, **kwargs): - # TODO merge with capa.features.extractors.elf.detect_elf_arch() arch = elf.get_machine_arch() if arch == "x86": yield Arch("i386"), NO_ADDRESS @@ -86,7 +85,8 @@ def extract_file_features(elf: ELFFile, buf: bytes) -> Iterator[Tuple[Feature, i FILE_HANDLERS = ( - # TODO extract_file_export_names, + # TODO(williballenthin): implement extract_file_export_names + # https://github.com/mandiant/capa/issues/1607 extract_file_import_names, extract_file_section_names, extract_file_strings, diff --git a/capa/features/extractors/ida/basicblock.py b/capa/features/extractors/ida/basicblock.py index 34f7d1ce..88a1247e 100644 --- a/capa/features/extractors/ida/basicblock.py +++ b/capa/features/extractors/ida/basicblock.py @@ -104,19 +104,3 @@ BASIC_BLOCK_HANDLERS = ( 
extract_bb_tight_loop, extract_bb_stackstring, ) - - -def main(): - features = [] - for fhandle in helpers.get_functions(skip_thunks=True, skip_libs=True): - f: idaapi.func_t = fhandle.inner - for bb in idaapi.FlowChart(f, flags=idaapi.FC_PREDS): - features.extend(list(extract_features(fhandle, bb))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/ida/file.py b/capa/features/extractors/ida/file.py index a3da4c6a..e75d59e1 100644 --- a/capa/features/extractors/ida/file.py +++ b/capa/features/extractors/ida/file.py @@ -199,14 +199,3 @@ FILE_HANDLERS = ( extract_file_function_names, extract_file_format, ) - - -def main(): - """ """ - import pprint - - pprint.pprint(list(extract_features())) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/ida/function.py b/capa/features/extractors/ida/function.py index 8d0c6778..cb4d6329 100644 --- a/capa/features/extractors/ida/function.py +++ b/capa/features/extractors/ida/function.py @@ -50,18 +50,3 @@ def extract_features(fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: FUNCTION_HANDLERS = (extract_function_calls_to, extract_function_loop, extract_recursive_call) - - -def main(): - """ """ - features = [] - for fhandle in capa.features.extractors.ida.helpers.get_functions(skip_thunks=True, skip_libs=True): - features.extend(list(extract_features(fhandle))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index 0279f6fd..7d269166 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. +import functools from typing import Any, Dict, Tuple, Iterator, Optional import idc @@ -27,7 +28,8 @@ def find_byte_sequence(start: int, end: int, seq: bytes) -> Iterator[int]: """ seqstr = " ".join([f"{b:02x}" for b in seq]) while True: - # TODO find_binary: Deprecated. Please use ida_bytes.bin_search() instead. + # TODO(mike-hunhoff): find_binary is deprecated. Please use ida_bytes.bin_search() instead. + # https://github.com/mandiant/capa/issues/1606 ea = idaapi.find_binary(start, end, seqstr, 0, idaapi.SEARCH_DOWN) if ea == idaapi.BADADDR: break @@ -80,9 +82,22 @@ def get_segment_buffer(seg: idaapi.segment_t) -> bytes: return buff if buff else b"" +def inspect_import(imports, library, ea, function, ordinal): + if function and function.startswith("__imp_"): + # handle mangled PE imports + function = function[len("__imp_") :] + + if function and "@@" in function: + # handle mangled ELF imports, like "fopen@@glibc_2.2.5" + function, _, _ = function.partition("@@") + + imports[ea] = (library.lower(), function, ordinal) + return True + + def get_file_imports() -> Dict[int, Tuple[str, str, int]]: """get file imports""" - imports = {} + imports: Dict[int, Tuple[str, str, int]] = {} for idx in range(idaapi.get_import_module_qty()): library = idaapi.get_import_module_name(idx) @@ -92,23 +107,13 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]: # IDA uses section names for the library of ELF imports, like ".dynsym". 
# These are not useful to us, we may need to expand this list over time - # TODO: exhaust this list, see #1419 + # TODO(williballenthin): find all section names used by IDA + # https://github.com/mandiant/capa/issues/1419 if library == ".dynsym": library = "" - def inspect_import(ea, function, ordinal): - if function and function.startswith("__imp_"): - # handle mangled PE imports - function = function[len("__imp_") :] - - if function and "@@" in function: - # handle mangled ELF imports, like "fopen@@glibc_2.2.5" - function, _, _ = function.partition("@@") - - imports[ea] = (library.lower(), function, ordinal) - return True - - idaapi.enum_import_names(idx, inspect_import) + cb = functools.partial(inspect_import, imports, library) + idaapi.enum_import_names(idx, cb) return imports diff --git a/capa/features/extractors/ida/insn.py b/capa/features/extractors/ida/insn.py index 7acae816..e38f0bb3 100644 --- a/capa/features/extractors/ida/insn.py +++ b/capa/features/extractors/ida/insn.py @@ -398,14 +398,16 @@ def extract_insn_peb_access_characteristic_features( if insn.itype not in (idaapi.NN_push, idaapi.NN_mov): return - if all(map(lambda op: op.type != idaapi.o_mem, insn.ops)): + if all(op.type != idaapi.o_mem for op in insn.ops): # try to optimize for only memory references return disasm = idc.GetDisasm(insn.ea) if " fs:30h" in disasm or " gs:60h" in disasm: - # TODO: replace above with proper IDA + # TODO(mike-hunhoff): use proper IDA API for fetching segment access + # scanning the disassembly text is a hack. 
+ # https://github.com/mandiant/capa/issues/1605 yield Characteristic("peb access"), ih.address @@ -419,18 +421,22 @@ def extract_insn_segment_access_features( """ insn: idaapi.insn_t = ih.inner - if all(map(lambda op: op.type != idaapi.o_mem, insn.ops)): + if all(op.type != idaapi.o_mem for op in insn.ops): # try to optimize for only memory references return disasm = idc.GetDisasm(insn.ea) if " fs:" in disasm: - # TODO: replace above with proper IDA + # TODO(mike-hunhoff): use proper IDA API for fetching segment access + # scanning the disassembly text is a hack. + # https://github.com/mandiant/capa/issues/1605 yield Characteristic("fs access"), ih.address if " gs:" in disasm: - # TODO: replace above with proper IDA + # TODO(mike-hunhoff): use proper IDA API for fetching segment access + # scanning the disassembly text is a hack. + # https://github.com/mandiant/capa/issues/1605 yield Characteristic("gs access"), ih.address @@ -501,20 +507,3 @@ INSTRUCTION_HANDLERS = ( extract_function_calls_from, extract_function_indirect_call_characteristic_features, ) - - -def main(): - """ """ - features = [] - for f in capa.features.extractors.ida.helpers.get_functions(skip_thunks=True, skip_libs=True): - for bb in idaapi.FlowChart(f, flags=idaapi.FC_PREDS): - for insn in capa.features.extractors.ida.helpers.get_instructions_in_range(bb.start_ea, bb.end_ea): - features.extend(list(extract_features(f, bb, insn))) - - import pprint - - pprint.pprint(features) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/strings.py b/capa/features/extractors/strings.py index 924b9be9..4896e045 100644 --- a/capa/features/extractors/strings.py +++ b/capa/features/extractors/strings.py @@ -85,20 +85,3 @@ def extract_unicode_strings(buf, n=4): yield String(match.group().decode("utf-16"), match.start()) except UnicodeDecodeError: pass - - -def main(): - import sys - - with open(sys.argv[1], "rb") as f: - b = f.read() - - for s in extract_ascii_strings(b): - 
print("0x{:x}: {:s}".format(s.offset, s.s)) - - for s in extract_unicode_strings(b): - print("0x{:x}: {:s}".format(s.offset, s.s)) - - -if __name__ == "__main__": - main() diff --git a/capa/features/extractors/viv/basicblock.py b/capa/features/extractors/viv/basicblock.py index 24a753ae..ef13a925 100644 --- a/capa/features/extractors/viv/basicblock.py +++ b/capa/features/extractors/viv/basicblock.py @@ -92,7 +92,6 @@ def is_mov_imm_to_stack(instr: envi.archs.i386.disasm.i386Opcode) -> bool: if not src.isImmed(): return False - # TODO what about 64-bit operands? if not isinstance(dst, envi.archs.i386.disasm.i386SibOper) and not isinstance( dst, envi.archs.i386.disasm.i386RegMemOper ): diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index a28045b3..0deee2c7 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -16,7 +16,7 @@ import capa.features.extractors.common import capa.features.extractors.helpers import capa.features.extractors.strings from capa.features.file import Export, Import, Section, FunctionName -from capa.features.common import String, Feature, Characteristic +from capa.features.common import Feature, Characteristic from capa.features.address import Address, FileOffsetAddress, AbsoluteVirtualAddress diff --git a/capa/features/extractors/viv/function.py b/capa/features/extractors/viv/function.py index 87403a65..153065f9 100644 --- a/capa/features/extractors/viv/function.py +++ b/capa/features/extractors/viv/function.py @@ -39,7 +39,7 @@ def extract_function_symtab_names(fh: FunctionHandle) -> Iterator[Tuple[Feature, if "symtab" not in fh.ctx["cache"]: try: fh.ctx["cache"]["symtab"] = SymTab.from_Elf(fh.inner.vw.parsedbin) - except: + except Exception: fh.ctx["cache"]["symtab"] = None symtab = fh.ctx["cache"]["symtab"] diff --git a/capa/features/extractors/viv/indirect_calls.py b/capa/features/extractors/viv/indirect_calls.py index f76c13f8..896da807 100644 --- 
a/capa/features/extractors/viv/indirect_calls.py +++ b/capa/features/extractors/viv/indirect_calls.py @@ -87,8 +87,8 @@ def find_definition(vw: VivWorkspace, va: int, reg: int) -> Tuple[int, Union[int raises: NotFoundError: when the definition cannot be found. """ - q = collections.deque() # type: Deque[int] - seen = set([]) # type: Set[int] + q: Deque[int] = collections.deque() + seen: Set[int] = set() q.extend(get_previous_instructions(vw, va)) while q: diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index d8d6edbe..677bd3b3 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -19,11 +19,10 @@ import envi.archs.amd64.disasm import capa.features.extractors.helpers import capa.features.extractors.viv.helpers -from capa.features.file import FunctionName from capa.features.insn import API, MAX_STRUCTURE_SIZE, Number, Offset, Mnemonic, OperandNumber, OperandOffset from capa.features.common import MAX_BYTES_FEATURE_SIZE, THUNK_CHAIN_DEPTH_DELTA, Bytes, String, Feature, Characteristic from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.elf import Shdr, SymTab +from capa.features.extractors.elf import SymTab from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.viv.indirect_calls import NotFoundError, resolve_indirect_call @@ -117,7 +116,7 @@ def extract_insn_api_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Iterato # this code everytime the call is made, thus preventing the computational overhead. 
try: fh.ctx["cache"]["symtab"] = SymTab.from_Elf(f.vw.parsedbin) - except: + except Exception: fh.ctx["cache"]["symtab"] = None symtab = fh.ctx["cache"]["symtab"] @@ -289,16 +288,16 @@ def extract_insn_bytes_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Itera else: continue - for v in derefs(f.vw, v): + for vv in derefs(f.vw, v): try: - buf = read_bytes(f.vw, v) + buf = read_bytes(f.vw, vv) except envi.exc.SegmentationViolation: continue if capa.features.extractors.helpers.all_zeros(buf): continue - if f.vw.isProbablyString(v) or f.vw.isProbablyUnicode(v): + if f.vw.isProbablyString(vv) or f.vw.isProbablyUnicode(vv): # don't extract byte features for obvious strings continue @@ -352,7 +351,6 @@ def is_security_cookie(f, bb, insn) -> bool: if oper.isReg() and oper.reg not in [ envi.archs.i386.regs.REG_ESP, envi.archs.i386.regs.REG_EBP, - # TODO: do x64 support for real. envi.archs.amd64.regs.REG_RBP, envi.archs.amd64.regs.REG_RSP, ]: @@ -423,7 +421,6 @@ def extract_insn_peb_access_characteristic_features(f, bb, ih: InsnHandle) -> It """ parse peb access from the given function. fs:[0x30] on x86, gs:[0x60] on x64 """ - # TODO handle where fs/gs are loaded into a register or onto the stack and used later insn: envi.Opcode = ih.inner if insn.mnem not in ["push", "mov"]: @@ -647,7 +644,6 @@ def extract_op_offset_features( if oper.reg == envi.archs.i386.regs.REG_EBP: return - # TODO: do x64 support for real. 
if oper.reg == envi.archs.amd64.regs.REG_RBP: return @@ -701,9 +697,9 @@ def extract_op_string_features( else: return - for v in derefs(f.vw, v): + for vv in derefs(f.vw, v): try: - s = read_string(f.vw, v).rstrip("\x00") + s = read_string(f.vw, vv).rstrip("\x00") except ValueError: continue else: diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 360868da..ea935139 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and limitations import zlib import logging from enum import Enum -from typing import Any, List, Tuple, Union +from typing import List, Tuple, Union from pydantic import Field, BaseModel diff --git a/capa/helpers.py b/capa/helpers.py index c6b5e7ac..0e77dcaa 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -9,6 +9,7 @@ import os import inspect import logging import contextlib +import importlib.util from typing import NoReturn from pathlib import Path @@ -40,16 +41,13 @@ def get_file_taste(sample_path: Path) -> bytes: def is_runtime_ida(): - try: - import idc - except ImportError: - return False - else: - return True + return importlib.util.find_spec("idc") is not None def assert_never(value) -> NoReturn: - assert False, f"Unhandled value: {value} ({type(value).__name__})" + # careful: python -O will remove this assertion. + # but this is only used for type checking, so it's ok. + assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011 def get_format_from_extension(sample: Path) -> str: @@ -97,7 +95,7 @@ def redirecting_print_to_tqdm(disable_progress): with one that is compatible with tqdm. 
via: https://stackoverflow.com/a/42424890/87207 """ - old_print = print + old_print = print # noqa: T202 [reserved word print used] def new_print(*args, **kwargs): # If tqdm.tqdm.write raises error, use builtin print @@ -106,7 +104,7 @@ def redirecting_print_to_tqdm(disable_progress): else: try: tqdm.tqdm.write(*args, **kwargs) - except: + except Exception: old_print(*args, **kwargs) try: diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index b8f1b2d2..4f3d2a0b 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -47,7 +47,8 @@ NETNODE_RULES_CACHE_ID = "rules-cache-id" def inform_user_ida_ui(message): - idaapi.info(f"{message}. Please refer to IDA Output window for more information.") + # this isn't a logger, this is IDA's logging facility + idaapi.info(f"{message}. Please refer to IDA Output window for more information.") # noqa: G004 def is_supported_ida_version(): @@ -55,7 +56,7 @@ def is_supported_ida_version(): if version < 7.4 or version >= 9: warning_msg = "This plugin does not support your IDA Pro version" logger.warning(warning_msg) - logger.warning("Your IDA Pro version is: %s. Supported versions are: IDA >= 7.4 and IDA < 9.0." % version) + logger.warning("Your IDA Pro version is: %s. Supported versions are: IDA >= 7.4 and IDA < 9.0.", version) return False return True @@ -160,15 +161,15 @@ def collect_metadata(rules: List[Path]): rules=tuple(r.resolve().absolute().as_posix() for r in rules), base_address=capa.features.freeze.Address.from_capa(idaapi.get_imagebase()), layout=rdoc.Layout( - functions=tuple() + functions=(), # this is updated after capabilities have been collected. # will look like: # # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } ), # ignore these for now - not used by IDA plugin. 
- feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()), - library_functions=tuple(), + feature_counts=rdoc.FeatureCounts(file=0, functions=()), + library_functions=(), ), ) @@ -213,7 +214,7 @@ def idb_contains_cached_results() -> bool: n = netnode.Netnode(CAPA_NETNODE) return bool(n.get(NETNODE_RESULTS)) except netnode.NetnodeCorruptError as e: - logger.error("%s", e, exc_info=True) + logger.exception(str(e)) return False diff --git a/capa/ida/plugin/cache.py b/capa/ida/plugin/cache.py index 5226df9f..4ef4b907 100644 --- a/capa/ida/plugin/cache.py +++ b/capa/ida/plugin/cache.py @@ -10,7 +10,7 @@ from __future__ import annotations import itertools import collections -from typing import Set, Dict, List, Tuple, Union, Optional +from typing import Set, Dict, Tuple, Union, Optional import capa.engine from capa.rules import Scope, RuleSet @@ -37,13 +37,15 @@ class CapaRuleGenFeatureCacheNode: self.children: Set[CapaRuleGenFeatureCacheNode] = set() def __hash__(self): - # TODO: unique enough? + # TODO(mike-hunhoff): confirm this is unique enough + # https://github.com/mandiant/capa/issues/1604 return hash((self.address,)) def __eq__(self, other): if not isinstance(other, type(self)): return NotImplemented - # TODO: unique enough? 
+ # TODO(mike-hunhoff): confirm this is unique enough + # https://github.com/mandiant/capa/issues/1604 return self.address == other.address diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index d36647a6..92272c12 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -10,6 +10,7 @@ import copy import logging import itertools import collections +from enum import IntFlag from typing import Any, List, Optional from pathlib import Path @@ -58,9 +59,6 @@ CAPA_OFFICIAL_RULESET_URL = f"https://github.com/mandiant/capa-rules/releases/ta CAPA_RULESET_DOC_URL = "https://github.com/mandiant/capa/blob/master/doc/rules.md" -from enum import IntFlag - - class Options(IntFlag): NO_ANALYSIS = 0 # No auto analysis ANALYZE_AUTO = 1 # Runs the analysis when starting the explorer, see details below @@ -588,7 +586,7 @@ class CapaExplorerForm(idaapi.PluginForm): rules_message.setText("You must specify a directory containing capa rules before running analysis.") rules_message.setInformativeText( "Click 'Ok' to specify a local directory of rules or you can download and extract the official " - f"rules from the URL listed in the details." + + "rules from the URL listed in the details." 
) rules_message.setDetailedText(f"{CAPA_OFFICIAL_RULESET_URL}") rules_message.setStandardButtons(QtWidgets.QMessageBox.Ok | QtWidgets.QMessageBox.Cancel) @@ -607,7 +605,7 @@ class CapaExplorerForm(idaapi.PluginForm): return False settings.user[CAPA_SETTINGS_RULE_PATH] = str(path) - except UserCancelledError as e: + except UserCancelledError: capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules") logger.warning( "You must specify a directory containing capa rules before running analysis.%s", @@ -616,7 +614,7 @@ class CapaExplorerForm(idaapi.PluginForm): return False except Exception as e: capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules") - logger.error("Failed to load capa rules (error: %s).", e, exc_info=True) + logger.exception("Failed to load capa rules (error: %s).", e) return False if ida_kernwin.user_cancelled(): @@ -649,9 +647,9 @@ class CapaExplorerForm(idaapi.PluginForm): logger.error("Failed to load capa rules from %s (error: %s).", settings.user[CAPA_SETTINGS_RULE_PATH], e) logger.error( - "Make sure your file directory contains properly " - "formatted capa rules. You can download and extract the official rules from %s. " - "Or, for more details, see the rules documentation here: %s", + "Make sure your file directory contains properly " # noqa: G003 [logging statement uses +] + + "formatted capa rules. You can download and extract the official rules from %s. 
" + + "Or, for more details, see the rules documentation here: %s", CAPA_OFFICIAL_RULESET_URL, CAPA_RULESET_DOC_URL, ) @@ -717,7 +715,7 @@ class CapaExplorerForm(idaapi.PluginForm): cached_results_time = self.resdoc_cache.meta.timestamp.strftime("%Y-%m-%d %H:%M:%S") new_view_status = f"capa rules: {view_status_rules}, cached results (created {cached_results_time})" except Exception as e: - logger.error("Failed to load cached capa results (error: %s).", e, exc_info=True) + logger.exception("Failed to load cached capa results (error: %s).", e) return False else: # load results from fresh anlaysis @@ -734,7 +732,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.feature_extractor = CapaExplorerFeatureExtractor() self.feature_extractor.indicator.progress.connect(slot_progress_feature_extraction) except Exception as e: - logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True) + logger.exception("Failed to initialize feature extractor (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -746,7 +744,7 @@ class CapaExplorerForm(idaapi.PluginForm): try: self.process_total += len(tuple(self.feature_extractor.get_functions())) except Exception as e: - logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True) + logger.exception("Failed to calculate analysis (error: %s).", e) return False if ida_kernwin.user_cancelled(): @@ -782,7 +780,7 @@ class CapaExplorerForm(idaapi.PluginForm): logger.info("User cancelled analysis.") return False except Exception as e: - logger.error("Failed to extract capabilities from database (error: %s)", e, exc_info=True) + logger.exception("Failed to extract capabilities from database (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -794,7 +792,8 @@ class CapaExplorerForm(idaapi.PluginForm): try: # support binary files specifically for x86/AMD64 shellcode # warn user binary file is loaded but still allow capa to process it - # TODO: check specific architecture of binary 
files based on how user configured IDA processors + # TODO(mike-hunhoff): check specific architecture of binary files based on how user configured IDA processors + # https://github.com/mandiant/capa/issues/1603 if idaapi.get_file_type_name() == "Binary file": logger.warning("-" * 80) logger.warning(" Input file appears to be a binary file.") @@ -815,7 +814,7 @@ class CapaExplorerForm(idaapi.PluginForm): if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: - logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True) + logger.exception("Failed to check for file limitations (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -829,7 +828,7 @@ class CapaExplorerForm(idaapi.PluginForm): meta, ruleset, capabilities ) except Exception as e: - logger.error("Failed to collect results (error: %s)", e, exc_info=True) + logger.exception("Failed to collect results (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -845,7 +844,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.save_rules_cache_id(ruleset_id) logger.info("Saved cached results to database") except Exception as e: - logger.error("Failed to save results to database (error: %s)", e, exc_info=True) + logger.exception("Failed to save results to database (error: %s)", e) return False user_settings = settings.user[CAPA_SETTINGS_RULE_PATH] count_source_rules = self.program_analysis_ruleset_cache.source_rule_count @@ -866,7 +865,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked()) except Exception as e: - logger.error("Failed to render results (error: %s)", e, exc_info=True) + logger.exception("Failed to render results (error: %s)", e) return False self.set_view_status_label(new_view_status) @@ -918,7 +917,7 @@ class 
CapaExplorerForm(idaapi.PluginForm): has_cache: bool = capa.ida.helpers.idb_contains_cached_results() except Exception as e: capa.ida.helpers.inform_user_ida_ui("Failed to check for cached results, reanalyzing program") - logger.error("Failed to check for cached results (error: %s)", e, exc_info=True) + logger.exception("Failed to check for cached results (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -938,7 +937,7 @@ class CapaExplorerForm(idaapi.PluginForm): ] = capa.ida.helpers.load_and_verify_cached_results() except Exception as e: capa.ida.helpers.inform_user_ida_ui("Failed to verify cached results, reanalyzing program") - logger.error("Failed to verify cached results (error: %s)", e, exc_info=True) + logger.exception("Failed to verify cached results (error: %s)", e) return False if results is None: @@ -951,9 +950,9 @@ class CapaExplorerForm(idaapi.PluginForm): "Reanalyze program", "", ida_kernwin.ASKBTN_YES, - f"This database contains capa results generated on " - f"{results.meta.timestamp.strftime('%Y-%m-%d at %H:%M:%S')}.\n" - f"Load existing data or analyze program again?", + "This database contains capa results generated on " + + results.meta.timestamp.strftime("%Y-%m-%d at %H:%M:%S") + + ".\nLoad existing data or analyze program again?", ) if btn_id == ida_kernwin.ASKBTN_CANCEL: @@ -991,7 +990,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.rulegen_feature_extractor = CapaExplorerFeatureExtractor() self.rulegen_feature_cache = CapaRuleGenFeatureCache(self.rulegen_feature_extractor) except Exception as e: - logger.error("Failed to initialize feature extractor (error: %s)", e, exc_info=True) + logger.exception("Failed to initialize feature extractor (error: %s)", e) return False else: logger.info("Reusing prior rulegen cache") @@ -1008,7 +1007,7 @@ class CapaExplorerForm(idaapi.PluginForm): if f is not None: self.rulegen_current_function = self.rulegen_feature_extractor.get_function(f.start_ea) except Exception as e: - 
logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e, exc_info=True) + logger.exception("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e) return False if ida_kernwin.user_cancelled(): @@ -1034,7 +1033,7 @@ class CapaExplorerForm(idaapi.PluginForm): for addr, _ in result: all_function_features[capa.features.common.MatchedRule(name)].add(addr) except Exception as e: - logger.error("Failed to generate rule matches (error: %s)", e, exc_info=True) + logger.exception("Failed to generate rule matches (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -1055,7 +1054,7 @@ class CapaExplorerForm(idaapi.PluginForm): for addr, _ in result: all_file_features[capa.features.common.MatchedRule(name)].add(addr) except Exception as e: - logger.error("Failed to generate file rule matches (error: %s)", e, exc_info=True) + logger.exception("Failed to generate file rule matches (error: %s)", e) return False if ida_kernwin.user_cancelled(): @@ -1078,7 +1077,7 @@ class CapaExplorerForm(idaapi.PluginForm): f"capa rules: {settings.user[CAPA_SETTINGS_RULE_PATH]} ({settings.user[CAPA_SETTINGS_RULE_PATH]} rules)" ) except Exception as e: - logger.error("Failed to render views (error: %s)", e, exc_info=True) + logger.exception("Failed to render views (error: %s)", e) return False return True @@ -1163,7 +1162,7 @@ class CapaExplorerForm(idaapi.PluginForm): assert self.rulegen_ruleset_cache is not None assert self.rulegen_feature_cache is not None except Exception as e: - logger.error("Failed to access cache (error: %s)", e, exc_info=True) + logger.exception("Failed to access cache (error: %s)", e) self.set_rulegen_status("Error: see console output for more details") return diff --git a/capa/ida/plugin/model.py b/capa/ida/plugin/model.py index b53007ae..547d5349 100644 --- a/capa/ida/plugin/model.py +++ b/capa/ida/plugin/model.py @@ -372,7 +372,8 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): display += f" 
({statement.description})" return CapaExplorerDefaultItem(parent, display) elif isinstance(statement, rd.CompoundStatement) and statement.type == rd.CompoundStatementType.NOT: - # TODO: do we display 'not' + # TODO(mike-hunhoff): verify that we can display NOT statements + # https://github.com/mandiant/capa/issues/1602 pass elif isinstance(statement, rd.SomeStatement): display = f"{statement.count} or more" @@ -421,12 +422,13 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): @param doc: result doc """ if not match.success: - # TODO: display failed branches at some point? Help with debugging rules? + # TODO(mike-hunhoff): display failed branches at some point? Help with debugging rules? + # https://github.com/mandiant/capa/issues/1601 return # optional statement with no successful children is empty if isinstance(match.node, rd.StatementNode) and match.node.statement.type == rd.CompoundStatementType.OPTIONAL: - if not any(map(lambda m: m.success, match.children)): + if not any(m.success for m in match.children): return if isinstance(match.node, rd.StatementNode): diff --git a/capa/ida/plugin/view.py b/capa/ida/plugin/view.py index 4dba4ca1..0ccd91d1 100644 --- a/capa/ida/plugin/view.py +++ b/capa/ida/plugin/view.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
import re +from typing import Dict, Optional from collections import Counter import idc @@ -63,7 +64,7 @@ def parse_yaml_line(feature): feature, _, comment = feature.partition("#") feature, _, description = feature.partition("=") - return map(lambda o: o.strip(), (feature, description, comment)) + return (o.strip() for o in (feature, description, comment)) def parse_node_for_feature(feature, description, comment, depth): @@ -498,12 +499,13 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): rule_text += "\n features:\n" for o in iterate_tree(self): - feature, description, comment = map(lambda o: o.strip(), tuple(o.text(i) for i in range(3))) + feature, description, comment = (o.strip() for o in tuple(o.text(i) for i in range(3))) rule_text += parse_node_for_feature(feature, description, comment, calc_item_depth(o)) - # FIXME we avoid circular update by disabling signals when updating + # TODO(mike-hunhoff): we avoid circular update by disabling signals when updating # the preview. Preferably we would refactor the code to avoid this - # in the first place + # in the first place. 
+ # https://github.com/mandiant/capa/issues/1600 self.preview.blockSignals(True) self.preview.setPlainText(rule_text) self.preview.blockSignals(False) @@ -646,7 +648,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): counted = list(zip(Counter(features).keys(), Counter(features).values())) # single features - for k, v in filter(lambda t: t[1] == 1, counted): + for k, _ in filter(lambda t: t[1] == 1, counted): if isinstance(k, (capa.features.common.String,)): value = f'"{capa.features.common.escape_string(k.get_value_str())}"' else: @@ -682,10 +684,12 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): # we don't add a new node for description; either set description column of parent's last child # or the parent itself - if parent.childCount(): - parent.child(parent.childCount() - 1).setText(1, feature.lstrip("description:").lstrip()) - else: - parent.setText(1, feature.lstrip("description:").lstrip()) + if feature.startswith("description:"): + description = feature[len("description:") :].lstrip() + if parent.childCount(): + parent.child(parent.childCount() - 1).setText(1, description) + else: + parent.setText(1, description) return None elif feature.startswith("- description:"): if not parent: @@ -693,7 +697,8 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): return None # we don't add a new node for description; set the description column of the parent instead - parent.setText(1, feature.lstrip("- description:").lstrip()) + description = feature[len("- description:") :].lstrip() + parent.setText(1, description) return None node = QtWidgets.QTreeWidgetItem(parent) @@ -1010,7 +1015,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget): return o - def load_features(self, file_features, func_features={}): + def load_features(self, file_features, func_features: Optional[Dict] = None): """ """ self.parse_features_for_tree(self.new_parent_node(self, ("File Scope",)), file_features) if func_features: diff --git a/capa/main.py 
b/capa/main.py index f9a5805c..7698ca0c 100644 --- a/capa/main.py +++ b/capa/main.py @@ -33,6 +33,7 @@ from elftools.common.exceptions import ELFError import capa.perf import capa.rules import capa.engine +import capa.helpers import capa.version import capa.render.json import capa.rules.cache @@ -248,8 +249,8 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro all_bb_matches = collections.defaultdict(list) # type: MatchResults all_insn_matches = collections.defaultdict(list) # type: MatchResults - feature_counts = rdoc.FeatureCounts(file=0, functions=tuple()) - library_functions: Tuple[rdoc.LibraryFunction, ...] = tuple() + feature_counts = rdoc.FeatureCounts(file=0, functions=()) + library_functions: Tuple[rdoc.LibraryFunction, ...] = () with redirecting_print_to_tqdm(disable_progress): with tqdm.contrib.logging.logging_redirect_tqdm(): @@ -298,16 +299,15 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro for rule_name, results in itertools.chain( all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() ): - locations = set(map(lambda p: p[0], results)) + locations = {p[0] for p in results} rule = ruleset[rule_name] capa.engine.index_rule_matches(function_and_lower_features, rule, locations) all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) feature_counts.file = feature_count - matches = { - rule_name: results - for rule_name, results in itertools.chain( + matches = dict( + itertools.chain( # each rule exists in exactly one scope, # so there won't be any overlap among these following MatchResults, # and we can merge the dictionaries naively. 
@@ -316,7 +316,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro all_function_matches.items(), all_file_matches.items(), ) - } + ) meta = { "feature_counts": feature_counts, @@ -326,10 +326,9 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro return matches, meta -# TODO move all to helpers? -def has_rule_with_namespace(rules, capabilities, rule_cat): +def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: for rule_name in capabilities.keys(): - if rules.rules[rule_name].meta.get("namespace", "").startswith(rule_cat): + if rules.rules[rule_name].meta.get("namespace", "").startswith(namespace): return True return False @@ -434,13 +433,10 @@ def get_default_root() -> Path: # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. # its injected by pyinstaller. # so we'll fetch this attribute dynamically. - try: - meipass = Path(getattr(sys, "_MEIPASS")) - return meipass - except AttributeError: - pass - # return the root directory of the project when not running from a PyInstaller'd executable - return Path(__file__).resolve().parent.parent + assert hasattr(sys, "_MEIPASS") + return Path(getattr(sys, "_MEIPASS")) + else: + return Path(__file__).resolve().parent.parent def get_default_signatures() -> List[Path]: @@ -479,7 +475,6 @@ def get_workspace(path: Path, format_: str, sigpaths: List[Path]): import viv_utils.flirt logger.debug("generating vivisect workspace for: %s", path) - # TODO should not be auto at this point, anymore if format_ == FORMAT_AUTO: if not is_supported_format(path): raise UnsupportedFormatError() @@ -504,7 +499,6 @@ def get_workspace(path: Path, format_: str, sigpaths: List[Path]): return vw -# TODO get_extractors -> List[FeatureExtractor]? def get_extractor( path: Path, format_: str, @@ -550,7 +544,7 @@ def get_extractor( except ImportError: raise RuntimeError( "Cannot import binaryninja module. 
Please install the Binary Ninja Python API first: " - "https://docs.binary.ninja/dev/batch.html#install-the-api)." + + "https://docs.binary.ninja/dev/batch.html#install-the-api)." ) import capa.features.extractors.binja.extractor @@ -582,7 +576,7 @@ def get_extractor( return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) -def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]: +def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: file_extractors: List[FeatureExtractor] = list() if format_ == FORMAT_PE: @@ -767,14 +761,14 @@ def collect_metadata( rules=rules, base_address=frz.Address.from_capa(extractor.get_base_address()), layout=rdoc.Layout( - functions=tuple(), + functions=(), # this is updated after capabilities have been collected. # will look like: # # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } ), - feature_counts=rdoc.FeatureCounts(file=0, functions=tuple()), - library_functions=tuple(), + feature_counts=rdoc.FeatureCounts(file=0, functions=()), + library_functions=(), ), ) @@ -1055,9 +1049,9 @@ def handle_common_args(args): if not sigs_path.exists(): logger.error( - "Using default signature path, but it doesn't exist. " - "Please install the signatures first: " - "https://github.com/mandiant/capa/blob/master/doc/installation.md#method-2-using-capa-as-a-python-library." + "Using default signature path, but it doesn't exist. " # noqa: G003 [logging statement uses +] + + "Please install the signatures first: " + + "https://github.com/mandiant/capa/blob/master/doc/installation.md#method-2-using-capa-as-a-python-library." ) raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") else: @@ -1151,13 +1145,13 @@ def main(argv=None): rules = rules.filter_rules_by_meta(args.tag) logger.debug("selected %d rules", len(rules)) for i, r in enumerate(rules.rules, 1): - # TODO don't display subscope rules? logger.debug(" %d. 
%s", i, r) + except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: logger.error("%s", str(e)) logger.error( - "Make sure your file directory contains properly formatted capa rules. You can download the standard " - "collection of capa rules from https://github.com/mandiant/capa-rules/releases." + "Make sure your file directory contains properly formatted capa rules. You can download the standard " # noqa: G003 [logging statement uses +] + + "collection of capa rules from https://github.com/mandiant/capa-rules/releases." ) logger.error( "Please ensure you're using the rules that correspond to your major version of capa (%s)", @@ -1325,17 +1319,8 @@ def ida_main(): print(capa.render.default.render(meta, rules, capabilities)) -def is_runtime_ida(): - try: - import idc - except ImportError: - return False - else: - return True - - if __name__ == "__main__": - if is_runtime_ida(): + if capa.helpers.is_runtime_ida(): ida_main() else: sys.exit(main()) diff --git a/capa/optimizer.py b/capa/optimizer.py index 997abd6c..1c8e529a 100644 --- a/capa/optimizer.py +++ b/capa/optimizer.py @@ -22,7 +22,7 @@ def get_node_cost(node): # substring and regex features require a full scan of each string # which we anticipate is more expensive then a hash lookup feature (e.g. mnemonic or count). # - # TODO: compute the average cost of these feature relative to hash feature + # fun research: compute the average cost of these feature relative to hash feature # and adjust the factor accordingly. 
return 2 diff --git a/capa/render/default.py b/capa/render/default.py index 15e2a5e8..556c82a4 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -11,7 +11,6 @@ import collections import tabulate import capa.render.utils as rutils -import capa.features.freeze as frz import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.rules import RuleSet @@ -49,7 +48,7 @@ def find_subrule_matches(doc: rd.ResultDocument): collect the rule names that have been matched as a subrule match. this way we can avoid displaying entries for things that are too specific. """ - matches = set([]) + matches = set() def rec(match: rd.Match): if not match.success: @@ -65,7 +64,7 @@ def find_subrule_matches(doc: rd.ResultDocument): matches.add(match.node.feature.match) for rule in rutils.capability_rules(doc): - for address, match in rule.matches: + for _, match in rule.matches: rec(match) return matches diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index cd2251ec..2457b7ec 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -39,7 +39,7 @@ from capa.features.freeze import AddressType def dict_tuple_to_list_values(d: Dict) -> Dict: - o = dict() + o = {} for k, v in d.items(): if isinstance(v, tuple): o[k] = list(v) diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 944365e6..00c3eb9b 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -302,7 +302,7 @@ class Match(FrozenModel): # pull matches from the referenced rule into our tree here. rule_name = name rule = rules[rule_name] - rule_matches = {address: result for (address, result) in capabilities[rule_name]} + rule_matches = dict(capabilities[rule_name]) if rule.is_subscope_rule(): # for a subscope rule, fixup the node to be a scope node, rather than a match feature node. @@ -347,7 +347,7 @@ class Match(FrozenModel): # we could introduce an intermediate node here. 
# this would be a breaking change and require updates to the renderers. # in the meantime, the above might be sufficient. - rule_matches = {address: result for (address, result) in capabilities[rule.name]} + rule_matches = dict(capabilities[rule.name]) for location in result.locations: # doc[locations] contains all matches for the given namespace. # for example, the feature might be `match: anti-analysis/packer` diff --git a/capa/render/utils.py b/capa/render/utils.py index 20a817d7..d9cb9308 100644 --- a/capa/render/utils.py +++ b/capa/render/utils.py @@ -37,7 +37,7 @@ def format_parts_id(data: Union[rd.AttackSpec, rd.MBCSpec]): def capability_rules(doc: rd.ResultDocument) -> Iterator[rd.RuleMatches]: """enumerate the rules in (namespace, name) order that are 'capability' rules (not lib/subscope/disposition/etc).""" - for _, _, rule in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())): + for _, _, rule in sorted((rule.meta.namespace or "", rule.meta.name, rule) for rule in doc.rules.values()): if rule.meta.lib: continue if rule.meta.is_subscope_rule: diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 536e7242..227c5fc7 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -96,8 +96,7 @@ def render_meta(ostream, doc: rd.ResultDocument): ("library function count", len(doc.meta.analysis.library_functions)), ( "total feature count", - doc.meta.analysis.feature_counts.file - + sum(map(lambda f: f.count, doc.meta.analysis.feature_counts.functions)), + doc.meta.analysis.feature_counts.file + sum(f.count for f in doc.meta.analysis.feature_counts.functions), ), ] @@ -141,7 +140,7 @@ def render_rules(ostream, doc: rd.ResultDocument): rows.append((key, v)) if rule.meta.scope != capa.rules.FILE_SCOPE: - locations = list(map(lambda m: m[0], doc.rules[rule.meta.name].matches)) + locations = [m[0] for m in doc.rules[rule.meta.name].matches] rows.append(("matches", "\n".join(map(format_address, 
locations)))) ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index 0c782853..ba90f76a 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -29,7 +29,7 @@ def render_locations(ostream, locations: Iterable[frz.Address]): # its possible to have an empty locations array here, # such as when we're in MODE_FAILURE and showing the logic # under a `not` statement (which will have no matched locations). - locations = list(sorted(locations)) + locations = sorted(locations) if len(locations) == 0: return @@ -222,7 +222,7 @@ def render_match(ostream, match: rd.Match, indent=0, mode=MODE_SUCCESS): # optional statement with no successful children is empty if isinstance(match.node, rd.StatementNode) and match.node.statement.type == rd.CompoundStatementType.OPTIONAL: - if not any(map(lambda m: m.success, match.children)): + if not any(m.success for m in match.children): return # not statement, so invert the child mode to show failed evaluations @@ -236,7 +236,7 @@ def render_match(ostream, match: rd.Match, indent=0, mode=MODE_SUCCESS): # optional statement with successful children is not relevant if isinstance(match.node, rd.StatementNode) and match.node.statement.type == rd.CompoundStatementType.OPTIONAL: - if any(map(lambda m: m.success, match.children)): + if any(m.success for m in match.children): return # not statement, so invert the child mode to show successful evaluations @@ -277,7 +277,7 @@ def render_rules(ostream, doc: rd.ResultDocument): had_match = False - for _, _, rule in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())): + for _, _, rule in sorted((rule.meta.namespace or "", rule.meta.name, rule) for rule in doc.rules.values()): # default scope hides things like lib rules, malware-category rules, etc. # but in vverbose mode, we really want to show everything. 
# diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 64fd7e37..4d0f9ed8 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -510,7 +510,9 @@ def build_statements(d, scope: str): # arg is string (which doesn't support inline descriptions), like: # # count(string(error)) - # TODO: what about embedded newlines? + # + # known problem that embedded newlines may not work here? + # this may become a problem (or not), so address it when encountered. feature = Feature(arg) else: feature = Feature() @@ -634,7 +636,7 @@ class Rule: Returns: List[str]: names of rules upon which this rule depends. """ - deps: Set[str] = set([]) + deps: Set[str] = set() def rec(statement): if isinstance(statement, capa.features.common.MatchedRule): @@ -648,7 +650,7 @@ class Rule: # but, namespaces tend to use `-` while rule names use ` `. so, unlikely, but possible. if statement.value in namespaces: # matches a namespace, so take precedence and don't even check rule names. - deps.update(map(lambda r: r.name, namespaces[statement.value])) + deps.update(r.name for r in namespaces[statement.value]) else: # not a namespace, assume its a rule name. assert isinstance(statement.value, str) @@ -778,7 +780,7 @@ class Rule: # on Windows, get WHLs from pyyaml.org/pypi logger.debug("using libyaml CLoader.") return yaml.CLoader - except: + except Exception: logger.debug("unable to import libyaml CLoader, falling back to Python yaml parser.") logger.debug("this will be slower to load rules.") return yaml.Loader @@ -950,7 +952,7 @@ def get_rules_with_scope(rules, scope) -> List[Rule]: from the given collection of rules, select those with the given scope. `scope` is one of the capa.rules.*_SCOPE constants. 
""" - return list(rule for rule in rules if rule.scope == scope) + return [rule for rule in rules if rule.scope == scope] def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Rule]: @@ -961,7 +963,7 @@ def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Ru rules = list(rules) namespaces = index_rules_by_namespace(rules) rules_by_name = {rule.name: rule for rule in rules} - wanted = set([rule_name]) + wanted = {rule_name} def rec(rule): wanted.add(rule.name) @@ -976,7 +978,7 @@ def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Ru def ensure_rules_are_unique(rules: List[Rule]) -> None: - seen = set([]) + seen = set() for rule in rules: if rule.name in seen: raise InvalidRule("duplicate rule name: " + rule.name) @@ -1041,7 +1043,7 @@ def topologically_order_rules(rules: List[Rule]) -> List[Rule]: rules = list(rules) namespaces = index_rules_by_namespace(rules) rules_by_name = {rule.name: rule for rule in rules} - seen = set([]) + seen = set() ret = [] def rec(rule): @@ -1190,7 +1192,6 @@ class RuleSet: # so thats not helpful to decide how to downselect. # # and, a global rule will never be the sole selector in a rule. - # TODO: probably want a lint for this. pass else: # easy feature: hash lookup @@ -1247,7 +1248,7 @@ class RuleSet: # the set of subtypes of type A is unbounded, # because any user might come along and create a new subtype B, # so mypy can't reason about this set of types. - assert False, f"Unhandled value: {node} ({type(node).__name__})" + assert_never(node) else: # programming error assert_never(node) @@ -1284,7 +1285,7 @@ class RuleSet: don't include auto-generated "subscope" rules. we want to include general "lib" rules here - even if they are not dependencies of other rules, see #398 """ - scope_rules: Set[Rule] = set([]) + scope_rules: Set[Rule] = set() # we need to process all rules, not just rules with the given scope. # this is because rules with a higher scope, e.g. 
file scope, may have subscope rules @@ -1329,7 +1330,7 @@ class RuleSet: TODO support -t=metafield """ rules = list(self.rules.values()) - rules_filtered = set([]) + rules_filtered = set() for rule in rules: for k, v in rule.meta.items(): if isinstance(v, str) and tag in v: diff --git a/capa/rules/cache.py b/capa/rules/cache.py index 628ddfe6..1269712b 100644 --- a/capa/rules/cache.py +++ b/capa/rules/cache.py @@ -29,7 +29,7 @@ def compute_cache_identifier(rule_content: List[bytes]) -> CacheIdentifier: hash.update(version.encode("utf-8")) hash.update(b"\x00") - rule_hashes = list(sorted([hashlib.sha256(buf).hexdigest() for buf in rule_content])) + rule_hashes = sorted([hashlib.sha256(buf).hexdigest() for buf in rule_content]) for rule_hash in rule_hashes: hash.update(rule_hash.encode("ascii")) hash.update(b"\x00") diff --git a/doc/installation.md b/doc/installation.md index 1edf8aa0..65258e45 100644 --- a/doc/installation.md +++ b/doc/installation.md @@ -93,28 +93,43 @@ For more details about creating and using virtual environments, check out the [v We use the following tools to ensure consistent code style and formatting: - [black](https://github.com/psf/black) code formatter - - [isort 5](https://pypi.org/project/isort/) code formatter - - [dos2unix](https://linux.die.net/man/1/dos2unix) for UNIX-style LF newlines + - [isort](https://pypi.org/project/isort/) code formatter + - [ruff](https://beta.ruff.rs/docs/) code linter + - [flake8](https://flake8.pycqa.org/en/latest/) code linter + - [mypy](https://mypy-lang.org/) type checking - [capafmt](https://github.com/mandiant/capa/blob/master/scripts/capafmt.py) rule formatter To install these development dependencies, run: `$ pip install -e /local/path/to/src[dev]` -To check the code style, formatting and run the tests you can run the script `scripts/ci.sh`. 
-You can run it with the argument `no_tests` to skip the tests and only run the code style and formatting: `scripts/ci.sh no_tests` +We use [pre-commit](https://pre-commit.com/) so that it's trivial to run the same linters & configuration locally as in CI. -##### Setup hooks [optional] +Run all linters like: -If you plan to contribute to capa, you may want to setup the provided hooks. -Run `scripts/setup-hooks.sh` to set the following hooks up: -- The `pre-commit` hook runs checks before every `git commit`. - It runs `scripts/ci.sh no_tests` aborting the commit if there are code style or rule linter offenses you need to fix. -- The `pre-push` hook runs checks before every `git push`. - It runs `scripts/ci.sh` aborting the push if there are code style or rule linter offenses or if the tests fail. - This way you can ensure everything is alright before sending a pull request. + ❯ pre-commit run --all-files + isort....................................................................Passed + black....................................................................Passed + ruff.....................................................................Passed + flake8...................................................................Passed + mypy.....................................................................Passed -You can skip the checks by using the `-n`/`--no-verify` git option. +Or run a single linter like: + + ❯ pre-commit run --all-files isort + isort....................................................................Passed + + +Importantly, you can configure pre-commit to run automatically before every commit by running: + + ❯ pre-commit install --hook-type pre-commit + pre-commit installed at .git/hooks/pre-commit + + ❯ pre-commit install --hook-type pre-push + pre-commit installed at .git/hooks/pre-push + +This way you can ensure that you don't commit code style or formatting offenses.
+You can always temporarily skip the checks by using the `-n`/`--no-verify` git option. ### 3. Compile binary using PyInstaller We compile capa standalone binaries using PyInstaller. To reproduce the build process check out the source code as described above and follow the following steps. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..8353e161 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,109 @@ +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "flare-capa" +authors = [ + {name = "Willi Ballenthin", email = "william.ballenthin@mandiant.com"}, + {name = "Moritz Raabe", email = "moritz.raabe@mandiant.com"}, + {name = "Mike Hunhoff", email = "michael.hunhoff@mandiant.com"}, +] +description = "The FLARE team's open-source tool to identify capabilities in executable files." 
+license = {file = "LICENSE.txt"} +requires-python = ">=3.8" +keywords = ["malware analysis", "reverse engineering", "capability detection", "software behaviors", "capa", "FLARE"] +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Intended Audience :: Information Technology", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Programming Language :: Python :: 3", + "Topic :: Security", +] +dependencies = [ + "tqdm==4.65.0", + "pyyaml==6.0", + "tabulate==0.9.0", + "colorama==0.4.6", + "termcolor==2.3.0", + "wcwidth==0.2.6", + "ida-settings==2.1.0", + "viv-utils[flirt]==0.7.9", + "halo==0.0.31", + "networkx==3.1", + "ruamel.yaml==0.17.32", + "vivisect==1.1.1", + "pefile==2023.2.7", + "pyelftools==0.29", + "dnfile==0.13.0", + "dncil==1.0.2", + "pydantic==1.10.9", + "protobuf==4.23.2", +] +dynamic = ["version", "readme"] + +[tool.setuptools.dynamic] +version = {attr = "capa.version.__version__"} +readme = {file = "README.md"} + +[tool.setuptools] +packages = ["capa"] + +[project.optional-dependencies] +dev = [ + "pre-commit==3.3.3", + "pytest==7.4.0", + "pytest-sugar==0.9.7", + "pytest-instafail==0.5.0", + "pytest-cov==4.1.0", + "flake8==6.0.0", + "flake8-bugbear==23.6.5", + "flake8-encodings==0.5.0.post1", + "flake8-comprehensions==3.14.0", + "flake8-logging-format==0.9.0", + "flake8-no-implicit-concat==0.3.4", + "flake8-print==5.0.0", + "flake8-todos==0.3.0", + "ruff==0.0.275", + "black==23.3.0", + "isort==5.11.4", + "mypy==1.4.1", + "psutil==5.9.2", + "stix2==3.0.1", + "requests==2.31.0", + "mypy-protobuf==3.4.0", + # type stubs for mypy + "types-backports==0.1.3", + "types-colorama==0.4.15.11", + "types-PyYAML==6.0.8", + "types-tabulate==0.9.0.1", + "types-termcolor==1.1.4", + "types-psutil==5.8.23", + "types_requests==2.31.0.1", + "types-protobuf==4.23.0.1", +] +build = [ + "pyinstaller==5.10.1", + "setuptools==68.0.0", + "build==0.10.0" +] + +[project.urls] +Homepage = 
"https://github.com/mandiant/capa" +Repository = "https://github.com/mandiant/capa.git" +Documentation = "https://github.com/mandiant/capa/tree/master/doc" +Rules = "https://github.com/mandiant/capa-rules" +"Rules Documentation" = "https://github.com/mandiant/capa-rules/tree/master/doc" + +[project.scripts] +capa = "capa.main:main" diff --git a/rules b/rules index a2989e6b..ec223d1a 160000 --- a/rules +++ b/rules @@ -1 +1 @@ -Subproject commit a2989e6ba5e145617d2aa3a23d365bff6f752284 +Subproject commit ec223d1a1468f1d18887191ddb2e28e0a4a8e8d2 diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 29e83bf0..dc75ac76 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -172,11 +172,13 @@ def main(argv=None): for file in Path(args.input).rglob("*"): samples.append(file) - def pmap(f, args, parallelism=multiprocessing.cpu_count()): + cpu_count = multiprocessing.cpu_count() + + def pmap(f, args, parallelism=cpu_count): """apply the given function f to the given args using subprocesses""" return multiprocessing.Pool(parallelism).imap(f, args) - def tmap(f, args, parallelism=multiprocessing.cpu_count()): + def tmap(f, args, parallelism=cpu_count): """apply the given function f to the given args using threads""" return multiprocessing.pool.ThreadPool(parallelism).imap(f, args) diff --git a/scripts/cache-ruleset.py b/scripts/cache-ruleset.py index 962cdd06..7cf2f9f8 100644 --- a/scripts/cache-ruleset.py +++ b/scripts/cache-ruleset.py @@ -1,6 +1,6 @@ """ Create a cache of the given rules. -This is only really intended to be used by CI to pre-cache rulesets +This is only really intended to be used by CI to pre-cache rulesets that will be distributed within PyInstaller binaries. 
Usage: @@ -17,7 +17,6 @@ See the License for the specific language governing permissions and limitations """ import os import sys -import time import logging import argparse from pathlib import Path diff --git a/scripts/capa2yara.py b/scripts/capa2yara.py index 15c0e190..dad59baf 100644 --- a/scripts/capa2yara.py +++ b/scripts/capa2yara.py @@ -55,7 +55,7 @@ var_names = ["".join(letters) for letters in itertools.product(string.ascii_lowe # this have to be the internal names used by capa.py which are sometimes different to the ones written out in the rules, e.g. "2 or more" is "Some", count is Range unsupported = ["characteristic", "mnemonic", "offset", "subscope", "Range"] -# TODO shorten this list, possible stuff: +# further idea: shorten this list, possible stuff: # - 2 or more strings: e.g. # -- https://github.com/mandiant/capa-rules/blob/master/collection/file-managers/gather-direct-ftp-information.yml # -- https://github.com/mandiant/capa-rules/blob/master/collection/browser/gather-firefox-profile-information.yml @@ -95,7 +95,7 @@ private rule capa_pe_file : CAPA { def check_feature(statement, rulename): if statement in unsupported: - logger.info("unsupported: " + statement + " in rule: " + rulename) + logger.info("unsupported: %s in rule: %s", statement, rulename) return True else: return False @@ -113,7 +113,7 @@ def convert_capa_number_to_yara_bytes(number): sys.exit() number = re.sub(r"^0[xX]", "", number) - logger.info("number ok: " + repr(number)) + logger.info("number ok: %r", number) # include spaces every 2 hex bytesv = re.sub(r"(..)", r"\1 ", number) @@ -143,9 +143,9 @@ def convert_description(statement): desc = statement.description if desc: yara_desc = " // " + desc - logger.info("using desc: " + repr(yara_desc)) + logger.info("using desc: %r", yara_desc) return yara_desc - except: + except Exception: # no description pass @@ -154,7 +154,7 @@ def convert_description(statement): def convert_rule(rule, rulename, cround, depth): depth += 1 - 
logger.info("recursion depth: " + str(depth)) + logger.info("recursion depth: %d", depth) global var_names @@ -165,7 +165,7 @@ def convert_rule(rule, rulename, cround, depth): return "BREAK", s_type elif s_type == "string": string = kid.value - logger.info("doing string: " + repr(string)) + logger.info("doing string: %r", string) string = string.replace("\\", "\\\\") string = string.replace("\n", "\\n") string = string.replace("\t", "\\t") @@ -173,14 +173,16 @@ def convert_rule(rule, rulename, cround, depth): yara_strings += "\t$" + var_name + ' = "' + string + '" ascii wide' + convert_description(kid) + "\n" yara_condition += "\t$" + var_name + " " elif s_type == "api" or s_type == "import": - # TODO: is it possible in YARA to make a difference between api & import? + # research needed to decide if its possible in YARA to make a difference between api & import? # https://github.com/mandiant/capa-rules/blob/master/doc/format.md#api api = kid.value - logger.info("doing api: " + repr(api)) + logger.info("doing api: %r", api) # e.g. kernel32.CreateNamedPipe => look for kernel32.dll and CreateNamedPipe - # TODO: improve .NET API call handling + # + # note: the handling of .NET API calls could be improved here. + # once we have a motivation and some examples, lets do that. if "::" in api: mod, api = api.split("::") @@ -205,20 +207,20 @@ def convert_rule(rule, rulename, cround, depth): var_name = "api_" + var_names.pop(0) # limit regex with word boundary \b but also search for appended A and W - # TODO: better use something like /(\\x00|\\x01|\\x02|\\x03|\\x04)' + api + '(A|W)?\\x00/ ??? + # alternatively: use something like /(\\x00|\\x01|\\x02|\\x03|\\x04)' + api + '(A|W)?\\x00/ ??? 
yara_strings += "\t$" + var_name + " = /\\b" + api + "(A|W)?\\b/ ascii wide\n" yara_condition += "\t$" + var_name + " " elif s_type == "export": export = kid.value - logger.info("doing export: " + repr(export)) + logger.info("doing export: %r", export) yara_condition += '\tpe.exports("' + export + '") ' elif s_type == "section": # https://github.com/mandiant/capa-rules/blob/master/doc/format.md#section section = kid.value - logger.info("doing section: " + repr(section)) + logger.info("doing section: %r", section) # e.g. - section: .rsrc var_name_sec = var_names.pop(0) @@ -230,14 +232,14 @@ def convert_rule(rule, rulename, cround, depth): elif s_type == "match": # https://github.com/mandiant/capa-rules/blob/master/doc/format.md#matching-prior-rule-matches-and-namespaces match = kid.value - logger.info("doing match: " + repr(match)) + logger.info("doing match: %r", match) # e.g. - match: create process # - match: host-interaction/file-system/write match_rule_name = convert_rule_name(match) if match.startswith(rulename + "/"): - logger.info("Depending on myself = basic block: " + match) + logger.info("Depending on myself = basic block: %s", match) return "BREAK", "Depending on myself = basic block" if match_rule_name in converted_rules: @@ -245,14 +247,14 @@ def convert_rule(rule, rulename, cround, depth): else: # don't complain in the early rounds as there should be 3+ rounds (if all rules are converted) if cround > min_rounds - 2: - logger.info("needed sub-rule not converted (yet, maybe in next round): " + repr(match)) + logger.info("needed sub-rule not converted (yet, maybe in next round): %r", match) return "BREAK", "needed sub-rule not converted" else: return "BREAK", "NOLOG" elif s_type == "bytes": bytesv = kid.get_value_str() - logger.info("doing bytes: " + repr(bytesv)) + logger.info("doing bytes: %r", bytesv) var_name = var_names.pop(0) yara_strings += "\t$" + var_name + " = { " + bytesv + " }" + convert_description(kid) + "\n" @@ -260,19 +262,19 @@ def 
convert_rule(rule, rulename, cround, depth): elif s_type == "number": number = kid.get_value_str() - logger.info("doing number: " + repr(number)) + logger.info("doing number: %r", number) if len(number) < 10: - logger.info("too short for byte search (until I figure out how to do it properly)" + repr(number)) + logger.info("too short for byte search (until I figure out how to do it properly): %r", number) return "BREAK", "Number too short" # there's just one rule which contains 0xFFFFFFF but yara gives a warning if if used if number == "0xFFFFFFFF": return "BREAK", "slow byte pattern for YARA search" - logger.info("number ok: " + repr(number)) + logger.info("number ok: %r", number) number = convert_capa_number_to_yara_bytes(number) - logger.info("number ok: " + repr(number)) + logger.info("number ok: %r", number) var_name = "num_" + var_names.pop(0) yara_strings += "\t$" + var_name + " = { " + number + "}" + convert_description(kid) + "\n" @@ -280,7 +282,7 @@ def convert_rule(rule, rulename, cround, depth): elif s_type == "regex": regex = kid.get_value_str() - logger.info("doing regex: " + repr(regex)) + logger.info("doing regex: %r", regex) # change capas /xxx/i to yaras /xxx/ nocase, count will be used later to decide appending 'nocase' regex, count = re.subn(r"/i$", "/", regex) @@ -316,7 +318,7 @@ def convert_rule(rule, rulename, cround, depth): elif s_type == "Not" or s_type == "And" or s_type == "Or": pass else: - logger.info("something unhandled: " + repr(s_type)) + logger.info("something unhandled: %r", s_type) sys.exit() return yara_strings, yara_condition @@ -330,7 +332,7 @@ def convert_rule(rule, rulename, cround, depth): statement = rule.name - logger.info("doing statement: " + statement) + logger.info("doing statement: %s", statement) if check_feature(statement, rulename): return "BREAK", statement, rule_comment, incomplete @@ -338,18 +340,18 @@ def convert_rule(rule, rulename, cround, depth): if statement == "And" or statement == "Or": desc = 
convert_description(rule) if desc: - logger.info("description of bool statement: " + repr(desc)) + logger.info("description of bool statement: %r", desc) yara_strings_list.append("\t" * depth + desc + "\n") elif statement == "Not": - logger.info("one of those seldom nots: " + rule.name) + logger.info("one of those seldom nots: %s", rule.name) # check for nested statements try: kids = rule.children num_kids = len(kids) - logger.info("kids: " + kids) - except: - logger.info("no kids in rule: " + rule.name) + logger.info("kids: %s", kids) + except Exception: + logger.info("no kids in rule: %s", rule.name) try: # maybe it's "Not" = only one child: @@ -357,31 +359,31 @@ def convert_rule(rule, rulename, cround, depth): kids = [kid] num_kids = 1 logger.info("kid: %s", kids) - except: + except Exception: logger.info("no kid in rule: %s", rule.name) # just a single statement without 'and' or 'or' before it in this rule if "kids" not in locals().keys(): - logger.info("no kids: " + rule.name) + logger.info("no kids: %s", rule.name) yara_strings_sub, yara_condition_sub = do_statement(statement, rule) if yara_strings_sub == "BREAK": - logger.info("Unknown feature at1: " + rule.name) + logger.info("Unknown feature at1: %s", rule.name) return "BREAK", yara_condition_sub, rule_comment, incomplete yara_strings_list.append(yara_strings_sub) yara_condition_list.append(yara_condition_sub) else: x = 0 - logger.info("doing kids: %r - len: %s", kids, num_kids) + logger.info("doing kids: %r - len: %d", kids, num_kids) for kid in kids: s_type = kid.name - logger.info("doing type: " + s_type + " kidnum: " + str(x)) + logger.info("doing type: %s kidnum: %d", s_type, x) if s_type == "Some": cmin = kid.count - logger.info("Some type with minimum: " + str(cmin)) + logger.info("Some type with minimum: %d", cmin) if not cmin: logger.info("this is optional: which means, we can just ignore it") @@ -396,8 +398,8 @@ def convert_rule(rule, rulename, cround, depth): return "BREAK", "Some aka x or more 
(TODO)", rule_comment, incomplete if s_type == "And" or s_type == "Or" or s_type == "Not" and not kid.name == "Some": - logger.info("doing bool with recursion: " + repr(kid)) - logger.info("kid coming: " + repr(kid.name)) + logger.info("doing bool with recursion: %r", kid) + logger.info("kid coming: %r", kid.name) # logger.info("grandchildren: " + repr(kid.children)) # @@ -407,22 +409,24 @@ def convert_rule(rule, rulename, cround, depth): kid, rulename, cround, depth ) - logger.info("coming out of this recursion, depth: " + repr(depth) + " s_type: " + s_type) + logger.info("coming out of this recursion, depth: %d s_type: %s", depth, s_type) if yara_strings_sub == "BREAK": logger.info( - "Unknown feature at2: " + rule.name + " - s_type: " + s_type + " - depth: " + str(depth) + "Unknown feature at2: %s - s_type: %s - depth: %d", + rule.name, + s_type, + depth, ) # luckily this is only a killer, if we're inside an 'And', inside 'Or' we're just missing some coverage # only accept incomplete rules in rounds > 3 because the reason might be a reference to another rule not converted yet because of missing dependencies - logger.info("rule.name, depth, cround: " + rule.name + ", " + str(depth) + ", " + str(cround)) + logger.info("rule.name, depth, cround: %s, %d, %d", rule.name, depth, cround) if rule.name == "Or" and depth == 1 and cround > min_rounds - 1: logger.info( - "Unknown feature, just ignore this branch and keep the rest bec we're in Or (1): " - + s_type - + " - depth: " - + str(depth) + "Unknown feature, just ignore this branch and keep the rest bec we're in Or (1): %s - depth: %s", + s_type, + depth, ) # remove last 'or' # yara_condition = re.sub(r'\sor $', ' ', yara_condition) @@ -443,14 +447,13 @@ def convert_rule(rule, rulename, cround, depth): yara_strings_sub, yara_condition_sub = do_statement(s_type, kid) if yara_strings_sub == "BREAK": - logger.info("Unknown feature at3: " + rule.name) - logger.info("rule.name, depth, cround: " + rule.name + ", " + 
str(depth) + ", " + str(cround)) + logger.info("Unknown feature at3: %s", rule.name) + logger.info("rule.name, depth, cround: %s, %d, %d", rule.name, depth, cround) if rule.name == "Or" and depth == 1 and cround > min_rounds - 1: logger.info( - "Unknown feature, just ignore this branch and keep the rest bec we're in Or (2): " - + s_type - + " - depth: " - + str(depth) + "Unknown feature, just ignore this branch and keep the rest bec we're in Or (2): %s - depth: %d", + s_type, + depth, ) rule_comment += "This rule is incomplete because a branch inside an Or-statement had an unsupported feature and was skipped" @@ -488,7 +491,7 @@ def convert_rule(rule, rulename, cround, depth): elif statement == "Some": cmin = rule.count - logger.info("Some type with minimum at2: " + str(cmin)) + logger.info("Some type with minimum at2: %d", cmin) if not cmin: logger.info("this is optional: which means, we can just ignore it") @@ -501,7 +504,7 @@ def convert_rule(rule, rulename, cround, depth): yara_condition = "not " + "".join(yara_condition_list) + " " else: if len(yara_condition_list) != 1: - logger.info("something wrong around here" + repr(yara_condition_list) + " - " + statement) + logger.info("something wrong around here %r - %s", yara_condition_list, statement) sys.exit() # strings might be empty with only conditions @@ -510,8 +513,10 @@ def convert_rule(rule, rulename, cround, depth): yara_condition = "\n\t" + yara_condition_list[0] - logger.info(f"# end of convert_rule() #strings: {len(yara_strings_list)} #conditions: {len(yara_condition_list)}") - logger.info(f"strings: {yara_strings} conditions: {yara_condition}") + logger.info( + "# end of convert_rule() #strings: %d #conditions: %d", len(yara_strings_list), len(yara_condition_list) + ) + logger.info("strings: %s conditions: %s", yara_strings, yara_condition) return yara_strings, yara_condition, rule_comment, incomplete @@ -523,7 +528,7 @@ def output_yar(yara): def output_unsupported_capa_rules(yaml, capa_rulename, url, 
reason): if reason != "NOLOG": if capa_rulename not in unsupported_capa_rules_list: - logger.info("unsupported: " + capa_rulename + " - reason: " + reason + " - url: " + url) + logger.info("unsupported: %s - reason: %s - url: %s", capa_rulename, reason, url) unsupported_capa_rules_list.append(capa_rulename) unsupported_capa_rules.write(yaml.encode("utf-8") + b"\n") @@ -547,32 +552,32 @@ def convert_rules(rules, namespaces, cround, make_priv): rule_name = convert_rule_name(rule.name) if rule.is_subscope_rule(): - logger.info("skipping sub scope rule capa: " + rule.name) + logger.info("skipping sub scope rule capa: %s", rule.name) continue if rule_name in converted_rules: - logger.info("skipping already converted rule capa: " + rule.name + " - yara rule: " + rule_name) + logger.info("skipping already converted rule capa: %s - yara rule: %s", rule.name, rule_name) continue - logger.info("-------------------------- DOING RULE CAPA: " + rule.name + " - yara rule: " + rule_name) + logger.info("-------------------------- DOING RULE CAPA: %s - yara rule: %s", rule.name, rule_name) if "capa/path" in rule.meta: url = get_rule_url(rule.meta["capa/path"]) else: url = "no url" - logger.info("URL: " + url) - logger.info("statements: " + repr(rule.statement)) + logger.info("URL: %s", url) + logger.info("statements: %r", rule.statement) # don't really know what that passed empty string is good for :) dependencies = rule.get_dependencies(namespaces) if len(dependencies): - logger.info("Dependencies at4: " + rule.name + " - dep: " + str(dependencies)) + logger.info("Dependencies at4: %s - dep: %s", rule.name, dependencies) for dep in dependencies: - logger.info("Dependencies at44: " + dep) + logger.info("Dependencies at44: %s", dep) if not dep.startswith(rule.name + "/"): - logger.info("Depending on another rule: " + dep) + logger.info("Depending on another rule: %s", dep) continue yara_strings, yara_condition, rule_comment, incomplete = convert_rule(rule.statement, rule.name,
cround, 0) @@ -581,7 +586,7 @@ def convert_rules(rules, namespaces, cround, make_priv): # only give up if in final extra round #9000 if cround == 9000: output_unsupported_capa_rules(rule.to_yaml(), rule.name, url, yara_condition) - logger.info("Unknown feature at5: " + rule.name) + logger.info("Unknown feature at5: %s", rule.name) else: yara_meta = "" metas = rule.meta @@ -597,24 +602,24 @@ def convert_rules(rules, namespaces, cround, make_priv): if meta_name == "att&ck": meta_name = "attack" for attack in list(metas[meta]): - logger.info("attack:" + attack) + logger.info("attack: %s", attack) # cut out tag in square brackets, e.g. Defense Evasion::Obfuscated Files or Information [T1027] => T1027 r = re.search(r"\[(T[^\]]*)", attack) if r: tag = r.group(1) - logger.info("attack tag:" + tag) + logger.info("attack tag: %s", tag) tag = re.sub(r"\W", "_", tag) rule_tags += tag + " " # also add a line "attack = ..." to yaras 'meta:' to keep the long description: yara_meta += '\tattack = "' + attack + '"\n' elif meta_name == "mbc": for mbc in list(metas[meta]): - logger.info("mbc:" + mbc) + logger.info("mbc: %s", mbc) # cut out tag in square brackets, e.g. 
Cryptography::Encrypt Data::RC6 [C0027.010] => C0027.010 r = re.search(r"\[(.[^\]]*)", mbc) if r: tag = r.group(1) - logger.info("mbc tag:" + tag) + logger.info("mbc tag: %s", tag) tag = re.sub(r"\W", "_", tag) rule_tags += tag + " " @@ -677,8 +682,6 @@ def convert_rules(rules, namespaces, cround, make_priv): yara += " condition:" + condition_header + yara_condition + "\n}" - # TODO: now the rule is finished and could be automatically checked with the capa-testfile(s) named in meta - # (doing it for all of them using yara-ci upload at the moment) output_yar(yara) converted_rules.append(rule_name) count_incomplete += incomplete @@ -714,10 +717,10 @@ def main(argv=None): try: rules = capa.main.get_rules([Path(args.rules)]) namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values())) - logger.info("successfully loaded %s rules (including subscope rules which will be ignored)", len(rules)) + logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules)) if args.tag: rules = rules.filter_rules_by_meta(args.tag) - logger.debug("selected %s rules", len(rules)) + logger.debug("selected %d rules", len(rules)) for i, r in enumerate(rules.rules, 1): logger.debug(" %d. 
%s", i, r) except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: @@ -749,7 +752,7 @@ def main(argv=None): count_incomplete = 0 while num_rules != len(converted_rules) or cround < min_rounds: cround += 1 - logger.info("doing convert_rules(), round: " + str(cround)) + logger.info("doing convert_rules(), round: %d", cround) num_rules = len(converted_rules) count_incomplete += convert_rules(rules, namespaces, cround, make_priv) @@ -759,7 +762,7 @@ def main(argv=None): stats = "\n// converted rules : " + str(len(converted_rules)) stats += "\n// among those are incomplete : " + str(count_incomplete) stats += "\n// unconverted rules : " + str(len(unsupported_capa_rules_list)) + "\n" - logger.info(stats) + logger.info("%s", stats) output_yar(stats) return 0 diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index f0a4f5cc..96b57988 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -14,7 +14,6 @@ import capa.render.utils as rutils import capa.render.default import capa.render.result_document as rd import capa.features.freeze.features as frzf -from capa.engine import * from capa.features.common import OS_AUTO, FORMAT_AUTO @@ -31,7 +30,7 @@ def find_subrule_matches(doc: rd.ResultDocument) -> Set[str]: collect the rule names that have been matched as a subrule match. this way we can avoid displaying entries for things that are too specific. """ - matches = set([]) + matches = set() def rec(node: rd.Match): if not node.success: @@ -67,7 +66,7 @@ def render_capabilities(doc: rd.ResultDocument, result): """ subrule_matches = find_subrule_matches(doc) - result["CAPABILITY"] = dict() + result["CAPABILITY"] = {} for rule in rutils.capability_rules(doc): if rule.meta.name in subrule_matches: # rules that are also matched by other rules should not get rendered by default. 
@@ -81,7 +80,7 @@ def render_capabilities(doc: rd.ResultDocument, result): else: capability = f"{rule.meta.name} ({count} matches)" - result["CAPABILITY"].setdefault(rule.meta.namespace, list()) + result["CAPABILITY"].setdefault(rule.meta.namespace, []) result["CAPABILITY"][rule.meta.namespace].append(capability) @@ -98,7 +97,7 @@ def render_attack(doc, result): 'EXECUTION': ['Shared Modules [T1129]']} } """ - result["ATTCK"] = dict() + result["ATTCK"] = {} tactics = collections.defaultdict(set) for rule in rutils.capability_rules(doc): if not rule.meta.attack: @@ -131,7 +130,7 @@ def render_mbc(doc, result): '[C0021.004]']} } """ - result["MBC"] = dict() + result["MBC"] = {} objectives = collections.defaultdict(set) for rule in rutils.capability_rules(doc): if not rule.meta.mbc: @@ -151,7 +150,7 @@ def render_mbc(doc, result): def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]: - result: Dict[str, Any] = dict() + result: Dict[str, Any] = {} render_meta(doc, result) render_attack(doc, result) render_mbc(doc, result) diff --git a/scripts/ci.sh b/scripts/ci.sh deleted file mode 100755 index 1d359c7c..00000000 --- a/scripts/ci.sh +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env bash - -# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. 
- -# Use a console with emojis support for a better experience -# Use venv to ensure that `python` calls the correct python version - -# Stash uncommitted changes -MSG="pre-push-$(date +%s)"; -git stash push -kum "$MSG" &>/dev/null ; -STASH_LIST=$(git stash list); -if [[ "$STASH_LIST" == *"$MSG"* ]]; then - echo "Uncommitted changes stashed with message '$MSG', if you abort before they are restored run \`git stash pop\`"; -fi - -restore_stashed() { - if [[ "$STASH_LIST" == *"$MSG"* ]]; then - git stash pop --index &>/dev/null ; - echo "Stashed changes '$MSG' restored"; - fi -} - -# Run isort and print state -python -m isort --profile black --length-sort --line-width 120 -c . > isort-output.log 2>&1; -if [ $? == 0 ]; then - echo 'isort succeeded!! 💖'; -else - echo 'isort FAILED! 😭'; - echo 'Check isort-output.log for details'; - restore_stashed; - exit 1; -fi - -# Run black and print state -python -m black -l 120 --check . > black-output.log 2>&1; -if [ $? == 0 ]; then - echo 'black succeeded!! 💝'; -else - echo 'black FAILED! 😭'; - echo 'Check black-output.log for details'; - restore_stashed; - exit 2; -fi - -# Run rule linter and print state -python ./scripts/lint.py ./rules/ > rule-linter-output.log 2>&1; -if [ $? == 0 ]; then - echo 'Rule linter succeeded!! 💘'; -else - echo 'Rule linter FAILED! 😭'; - echo 'Check rule-linter-output.log for details'; - restore_stashed; - exit 3; -fi - -# Run tests except if first argument is no_tests -if [ "$1" != 'no_tests' ]; then - echo 'Running tests, please wait ⌛'; - python -m pytest tests/ --maxfail=1; - if [ $? == 0 ]; then - echo 'Tests succeed!! 🎉'; - else - echo 'Tests FAILED! 
😓'; - echo 'Run `pytest -v --cov=capa test/` if you need more details'; - restore_stashed; - exit 4; - fi -fi - -restore_stashed; -echo 'SUCCEEDED 🎉🎉'; - diff --git a/scripts/detect_duplicate_features.py b/scripts/detect_duplicate_features.py index 88476b9d..3d37bb89 100644 --- a/scripts/detect_duplicate_features.py +++ b/scripts/detect_duplicate_features.py @@ -43,12 +43,12 @@ def get_features(rule_path: str) -> list: list: A list of all feature statements contained within the rule file. """ feature_list = [] - with open(rule_path, "r") as f: + with open(rule_path, "r", encoding="utf-8") as f: try: new_rule = capa.rules.Rule.from_yaml(f.read()) feature_list = get_child_features(new_rule.statement) except Exception as e: - logger.error("Error: New rule " + rule_path + " " + str(type(e)) + " " + str(e)) + logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e)) sys.exit(-1) return feature_list @@ -74,7 +74,7 @@ def find_overlapping_rules(new_rule_path, rules_path): continue count += 1 # Checks if any features match between existing and new rule. 
- if any([feature in rule_features for feature in new_rule_features]): + if any(feature in rule_features for feature in new_rule_features): overlapping_rules.append(rule_name) result = {"overlapping_rules": overlapping_rules, "count": count} diff --git a/scripts/lint.py b/scripts/lint.py index e289b356..0e14a62c 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -43,7 +43,7 @@ import capa.engine import capa.helpers import capa.features.insn from capa.rules import Rule, RuleSet -from capa.features.common import OS_AUTO, FORMAT_PE, FORMAT_DOTNET, String, Feature, Substring +from capa.features.common import OS_AUTO, String, Feature, Substring from capa.render.result_document import RuleMetadata logger = logging.getLogger("lint") @@ -517,7 +517,7 @@ class FeatureNegativeNumber(Lint): recommendation = "specify the number's two's complement representation" recommendation_template = ( "capa treats number features as unsigned values; you may specify the number's two's complement " - 'representation; will not match on "{:d}"' + + 'representation; will not match on "{:d}"' ) def check_features(self, ctx: Context, features: List[Feature]): @@ -535,7 +535,7 @@ class FeatureNtdllNtoskrnlApi(Lint): level = Lint.WARN recommendation_template = ( "check if {:s} is exported by both ntdll and ntoskrnl; if true, consider removing {:s} " - "module requirement to improve detection" + + "module requirement to improve detection" ) def check_features(self, ctx: Context, features: List[Feature]): @@ -826,7 +826,7 @@ def lint_rule(ctx: Context, rule: Rule): print("") if is_nursery_rule(rule): - has_examples = not any(map(lambda v: v.level == Lint.FAIL and v.name == "missing examples", violations)) + has_examples = not any(v.level == Lint.FAIL and v.name == "missing examples" for v in violations) lints_failed = len( tuple( filter( diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 523497a8..d9b5197f 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ 
-27,7 +27,7 @@ example: |--------------------------------------|----------------------|-------------|-------------|-------------| | 18c30e4 main: remove perf debug msgs | 66,561,622 | 132.13s | 125.14s | 139.12s | - ^^^ --label or git hash + ^^^ --label or git hash """ import sys import timeit @@ -119,12 +119,12 @@ def main(argv=None): samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) - logger.debug("perf: find capabilities: min: %0.2fs" % (min(samples) / float(args.number))) - logger.debug("perf: find capabilities: avg: %0.2fs" % (sum(samples) / float(args.repeat) / float(args.number))) - logger.debug("perf: find capabilities: max: %0.2fs" % (max(samples) / float(args.number))) + logger.debug("perf: find capabilities: min: %0.2fs", (min(samples) / float(args.number))) + logger.debug("perf: find capabilities: avg: %0.2fs", (sum(samples) / float(args.repeat) / float(args.number))) + logger.debug("perf: find capabilities: max: %0.2fs", (max(samples) / float(args.number))) for counter, count in capa.perf.counters.most_common(): - logger.debug("perf: counter: {:}: {:,}".format(counter, count)) + logger.debug("perf: counter: %s: %s", counter, count) print( tabulate.tabulate( diff --git a/scripts/setup-hooks.sh b/scripts/setup-hooks.sh deleted file mode 100755 index 0ff607bd..00000000 --- a/scripts/setup-hooks.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env bash -# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. 
- -set -euo pipefail - -GIT_DIR=$(git rev-parse --show-toplevel); -cd "$GIT_DIR"; - -# hooks may exist already (e.g. git-lfs configuration) -# If the `.git/hooks/$arg` file doesn't exist it, initialize with `#!/usr/bin/env bash` -# After that append `scripts/hooks/$arg` and ensure they can be run -create_hook() { - if [[ ! -e .git/hooks/$1 ]]; then - echo "#!/usr/bin/env bash" > ".git/hooks/$1"; - fi - echo "scripts/ci.sh ${2:-}" >> ".git/hooks/$1"; - chmod +x .git/hooks/"$1"; -} - -printf 'Adding scripts/ci.sh to .git/hooks/'; -create_hook 'pre-commit' 'no_tests'; -create_hook 'pre-push'; diff --git a/scripts/setup-linter-dependencies.py b/scripts/setup-linter-dependencies.py index a59d32f0..66c4a5d8 100644 --- a/scripts/setup-linter-dependencies.py +++ b/scripts/setup-linter-dependencies.py @@ -65,7 +65,7 @@ class MitreExtractor: if self.url == "": raise ValueError(f"URL not specified in class {self.__class__.__name__}") - logging.info(f"Downloading STIX data at: {self.url}") + logging.info("Downloading STIX data at: %s", self.url) stix_json = requests.get(self.url).json() self._memory_store = MemoryStore(stix_data=stix_json["objects"]) @@ -170,12 +170,12 @@ def main(args: argparse.Namespace) -> None: logging.info("Extracting MBC behaviors...") data["mbc"] = MbcExtractor().run() - logging.info(f"Writing results to {args.output}") + logging.info("Writing results to %s", args.output) try: - with open(args.output, "w") as jf: + with open(args.output, "w", encoding="utf-8") as jf: json.dump(data, jf, indent=2) except BaseException as e: - logging.error(f"Exception encountered when writing results: {e}") + logging.error("Exception encountered when writing results: %s", e) if __name__ == "__main__": diff --git a/scripts/show-features.py b/scripts/show-features.py index 3400da7a..4c327c8d 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -142,7 +142,6 @@ def main(argv=None): if args.function: if args.format == "freeze": - # TODO fix 
function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles)) else: function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles)) @@ -236,7 +235,7 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor if __name__ == "__main__": - if capa.main.is_runtime_ida(): + if capa.helpers.is_runtime_ida(): ida_main() else: sys.exit(main()) diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 87eef850..00000000 --- a/setup.cfg +++ /dev/null @@ -1,28 +0,0 @@ -[bdist_wheel] -universal = 1 - -[aliases] -test = pytest - -[pycodestyle] -# the following suppress lints that conflict with the project's style: -# -# E203 Whitespace before : -# E302 expected 2 blank lines, found 1 -# E402 module level import not at top of file -# E501 line too long (209 > 180 characters) -# E712 comparison to False should be 'if cond is False:' or 'if not cond:' -# E722 do not use bare 'except' -# E731 do not assign a lambda expression, use a def -# W291 trailing whitespace -# W503 line break before binary operator -ignore = E203, E302, E402, E501, E712, E722, E731, W291, W503 -max-line-length = 180 -statistics = True - - -[pylint.FORMAT] -max-line-length = 180 - -[pylint] -disable = missing-docstring,invalid-name,import-outside-toplevel,redefined-outer-name,consider-using-f-string \ No newline at end of file diff --git a/tests/fixtures.py b/tests/fixtures.py index 9950fa66..ba00dde0 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -9,7 +9,6 @@ import os import binascii -import itertools import contextlib import collections from typing import Set, Dict @@ -69,7 +68,7 @@ def xfail(condition, reason=None): try: # do the block yield - except: + except Exception: if condition: # we expected the test to fail, so raise and register this via pytest pytest.xfail(reason) @@ -652,7 +651,6 @@ FEATURE_PRESENCE_TESTS = sorted( # insn/api: call via jmp ("mimikatz", "function=0x40B3C6", 
capa.features.insn.API("LocalFree"), True), ("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True), - # TODO ignore thunk functions that call via jmp? # insn/api: resolve indirect calls ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), True), ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.SetHandleInformation"), True), diff --git a/tests/test_binja_features.py b/tests/test_binja_features.py index c5c3d33f..5819e0dc 100644 --- a/tests/test_binja_features.py +++ b/tests/test_binja_features.py @@ -5,11 +5,13 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import os import logging from pathlib import Path +import pytest import fixtures -from fixtures import * +from fixtures import scope, sample import capa.main @@ -23,7 +25,7 @@ try: try: binaryninja.load(source=b"\x90") - except RuntimeError as e: + except RuntimeError: logger.warning("Binary Ninja license is not valid, provide via $BN_LICENSE or license.dat") else: binja_present = True @@ -38,6 +40,8 @@ except ImportError: indirect=["sample", "scope"], ) def test_binja_features(sample, scope, feature, expected): + if feature == capa.features.common.Characteristic("stack string"): + pytest.xfail("skip failing Binja stack string detection temporarily, see #1473") fixtures.do_test_feature_presence(fixtures.get_binja_extractor, sample, scope, feature, expected) diff --git a/tests/test_dnfile_features.py b/tests/test_dnfile_features.py index 76540468..5ffa2ead 100644 --- a/tests/test_dnfile_features.py +++ b/tests/test_dnfile_features.py @@ -6,13 +6,11 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. -import pytest import fixtures -from fixtures import * -from fixtures import parametrize +from fixtures import scope, sample -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", fixtures.FEATURE_PRESENCE_TESTS_DOTNET, indirect=["sample", "scope"], @@ -21,7 +19,7 @@ def test_dnfile_features(sample, scope, feature, expected): fixtures.do_test_feature_presence(fixtures.get_dnfile_extractor, sample, scope, feature, expected) -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", fixtures.FEATURE_COUNT_TESTS_DOTNET, indirect=["sample", "scope"], diff --git a/tests/test_dotnet_features.py b/tests/test_dotnet_features.py index 9980a730..af633386 100644 --- a/tests/test_dotnet_features.py +++ b/tests/test_dotnet_features.py @@ -6,13 +6,11 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
-import pytest import fixtures -from fixtures import * -from fixtures import parametrize +from fixtures import scope, sample, b9f5b_dotnetfile_extractor, mixed_mode_64_dotnetfile_extractor -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", fixtures.FEATURE_PRESENCE_TESTS_DOTNET, indirect=["sample", "scope"], @@ -21,7 +19,7 @@ def test_dnfile_features(sample, scope, feature, expected): fixtures.do_test_feature_presence(fixtures.get_dnfile_extractor, sample, scope, feature, expected) -@parametrize( +@fixtures.parametrize( "extractor,function,expected", [ ("b9f5b_dotnetfile_extractor", "is_dotnet_file", True), diff --git a/tests/test_dotnetfile_features.py b/tests/test_dotnetfile_features.py index 28d63cdb..3b4858a1 100644 --- a/tests/test_dotnetfile_features.py +++ b/tests/test_dotnetfile_features.py @@ -8,13 +8,20 @@ import pytest import fixtures -from fixtures import * -from fixtures import parametrize +from fixtures import ( + FEATURE_PRESENCE_TESTS_DOTNET, + scope, + sample, + b9f5b_dotnetfile_extractor, + mixed_mode_64_dotnetfile_extractor, +) + +import capa.features.file -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", - fixtures.FEATURE_PRESENCE_TESTS_DOTNET, + FEATURE_PRESENCE_TESTS_DOTNET, indirect=["sample", "scope"], ) def test_dotnetfile_features(sample, scope, feature, expected): @@ -27,7 +34,7 @@ def test_dotnetfile_features(sample, scope, feature, expected): fixtures.do_test_feature_presence(fixtures.get_dotnetfile_extractor, sample, scope, feature, expected) -@parametrize( +@fixtures.parametrize( "extractor,function,expected", [ ("b9f5b_dotnetfile_extractor", "is_dotnet_file", True), diff --git a/tests/test_engine.py b/tests/test_engine.py index c070edb8..785896a3 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -6,9 +6,8 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and limitations under the License. import capa.features.address -from capa.engine import * -from capa.features import * -from capa.features.insn import * +from capa.engine import Or, And, Not, Some, Range +from capa.features.insn import Number ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002) diff --git a/tests/test_freeze.py b/tests/test_freeze.py index 2c5f1920..62a1aacd 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -8,7 +8,8 @@ import textwrap from typing import List -from fixtures import * +import pytest +from fixtures import z9324d_extractor import capa.main import capa.rules @@ -20,7 +21,8 @@ import capa.features.freeze import capa.features.basicblock import capa.features.extractors.null import capa.features.extractors.base_extractor -from capa.features.address import AbsoluteVirtualAddress +from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import BBHandle, FunctionHandle EXTRACTOR = capa.features.extractors.null.NullFeatureExtractor( base_address=AbsoluteVirtualAddress(0x401000), @@ -59,7 +61,7 @@ EXTRACTOR = capa.features.extractors.null.NullFeatureExtractor( def addresses(s) -> List[Address]: - return list(sorted(map(lambda i: i.address, s))) + return sorted(i.address for i in s) def test_null_feature_extractor(): @@ -102,17 +104,17 @@ def compare_extractors(a, b): assert addresses(a.get_functions()) == addresses(b.get_functions()) for f in a.get_functions(): assert addresses(a.get_basic_blocks(f)) == addresses(b.get_basic_blocks(f)) - assert list(sorted(set(a.extract_function_features(f)))) == list(sorted(set(b.extract_function_features(f)))) + assert sorted(set(a.extract_function_features(f))) == sorted(set(b.extract_function_features(f))) for bb in a.get_basic_blocks(f): assert addresses(a.get_instructions(f, bb)) == 
addresses(b.get_instructions(f, bb)) - assert list(sorted(set(a.extract_basic_block_features(f, bb)))) == list( - sorted(set(b.extract_basic_block_features(f, bb))) + assert sorted(set(a.extract_basic_block_features(f, bb))) == sorted( + set(b.extract_basic_block_features(f, bb)) ) for insn in a.get_instructions(f, bb): - assert list(sorted(set(a.extract_insn_features(f, bb, insn)))) == list( - sorted(set(b.extract_insn_features(f, bb, insn))) + assert sorted(set(a.extract_insn_features(f, bb, insn))) == sorted( + set(b.extract_insn_features(f, bb, insn)) ) diff --git a/tests/test_ida_features.py b/tests/test_ida_features.py index 77897194..3eb3b053 100644 --- a/tests/test_ida_features.py +++ b/tests/test_ida_features.py @@ -55,7 +55,6 @@ import pytest try: sys.path.append(str(Path(__file__).parent)) import fixtures - from fixtures import * finally: sys.path.pop() @@ -104,7 +103,7 @@ def test_ida_features(): try: fixtures.do_test_feature_presence(get_ida_extractor, sample, scope, feature, expected) - except Exception as e: + except Exception: f = io.StringIO() traceback.print_exc(file=f) yield this_name, id, "fail", f.getvalue() @@ -130,7 +129,7 @@ def test_ida_feature_counts(): try: fixtures.do_test_feature_count(get_ida_extractor, sample, scope, feature, expected) - except Exception as e: + except Exception: f = io.StringIO() traceback.print_exc(file=f) yield this_name, id, "fail", f.getvalue() diff --git a/tests/test_main.py b/tests/test_main.py index bf711e9a..45554760 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -10,8 +10,11 @@ import json import textwrap import fixtures -from fixtures import * from fixtures import ( + z499c2_extractor, + z9324d_extractor, + pma16_01_extractor, + pingtaest_extractor, _692f_dotnetfile_extractor, _1c444_dotnetfile_extractor, _039a6_dotnetfile_extractor, @@ -22,7 +25,6 @@ import capa.main import capa.rules import capa.engine import capa.features -from capa.engine import * def test_main(z9324d_extractor): @@ -357,7 
+359,7 @@ def test_instruction_scope(z9324d_extractor): ) capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) assert "push 1000" in capabilities - assert 0x4071A4 in set(map(lambda result: result[0], capabilities["push 1000"])) + assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} def test_instruction_subscope(z9324d_extractor): @@ -387,7 +389,7 @@ def test_instruction_subscope(z9324d_extractor): ) capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) assert "push 1000 on i386" in capabilities - assert 0x406F60 in set(map(lambda result: result[0], capabilities["push 1000 on i386"])) + assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} def test_fix262(pma16_01_extractor, capsys): @@ -431,9 +433,9 @@ def test_json_meta(capsys): std = capsys.readouterr() std_json = json.loads(std.out) - assert {"type": "absolute", "value": 0x10001010} in list( - map(lambda f: f["address"], std_json["meta"]["analysis"]["layout"]["functions"]) - ) + assert {"type": "absolute", "value": 0x10001010} in [ + f["address"] for f in std_json["meta"]["analysis"]["layout"]["functions"] + ] for addr, info in std_json["meta"]["analysis"]["layout"]["functions"]: if addr == ["absolute", 0x10001010]: diff --git a/tests/test_match.py b/tests/test_match.py index fc3583f7..99301cce 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -13,8 +13,7 @@ import capa.engine import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.insn import * -from capa.features.common import * +from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule def match(rules, features, va, scope=Scope.FUNCTION): diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index d07ba330..40c008fa 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -8,15 +8,13 @@ import textwrap -import pytest - import capa.rules import capa.engine import capa.optimizer 
import capa.features.common from capa.engine import Or, And from capa.features.insn import Mnemonic -from capa.features.common import Arch, Bytes, Substring +from capa.features.common import Arch, Substring def test_optimizer_order(): diff --git a/tests/test_os_detection.py b/tests/test_os_detection.py index 585aada2..d7bdb55c 100644 --- a/tests/test_os_detection.py +++ b/tests/test_os_detection.py @@ -9,8 +9,7 @@ import io import zlib -import pytest -from fixtures import * +from fixtures import get_data_path_by_name import capa.features.extractors.elf diff --git a/tests/test_pefile_features.py b/tests/test_pefile_features.py index 79a68088..c9013d50 100644 --- a/tests/test_pefile_features.py +++ b/tests/test_pefile_features.py @@ -9,13 +9,12 @@ from pathlib import PosixPath import pytest import fixtures -from fixtures import * -from fixtures import parametrize +from fixtures import scope, sample import capa.features.file -@parametrize( +@fixtures.parametrize( "sample,scope,feature,expected", fixtures.FEATURE_PRESENCE_TESTS, indirect=["sample", "scope"], diff --git a/tests/test_proto.py b/tests/test_proto.py index 64c950bc..8a76ccfc 100644 --- a/tests/test_proto.py +++ b/tests/test_proto.py @@ -8,7 +8,8 @@ import copy from typing import Any -from fixtures import * +import pytest +from fixtures import a3f3bbc_rd, a076114_rd, pma0101_rd, al_khaserx64_rd, al_khaserx86_rd, dotnet_1c444e_rd import capa.rules import capa.render @@ -19,6 +20,7 @@ import capa.features.address import capa.render.proto.capa_pb2 as capa_pb2 import capa.render.result_document as rd import capa.features.freeze.features +from capa.helpers import assert_never @pytest.mark.parametrize( @@ -305,7 +307,7 @@ def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode): else: # unhandled statement - assert False + assert_never(sa) def assert_round_trip(doc: rd.ResultDocument): diff --git a/tests/test_result_document.py b/tests/test_result_document.py index 5ae9af26..161628ff 100644 --- 
a/tests/test_result_document.py +++ b/tests/test_result_document.py @@ -9,7 +9,7 @@ import copy import pytest import fixtures -from fixtures import * +from fixtures import a3f3bbc_rd, a076114_rd, pma0101_rd, al_khaserx64_rd, al_khaserx86_rd, dotnet_1c444e_rd import capa import capa.engine as ceng diff --git a/tests/test_rules.py b/tests/test_rules.py index 9f07f31d..afd9c012 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -344,7 +344,7 @@ def test_subscope_rules(): def test_duplicate_rules(): with pytest.raises(capa.rules.InvalidRule): - rules = capa.rules.RuleSet( + _ = capa.rules.RuleSet( [ capa.rules.Rule.from_yaml( textwrap.dedent( @@ -374,7 +374,7 @@ def test_duplicate_rules(): def test_missing_dependency(): with pytest.raises(capa.rules.InvalidRule): - rules = capa.rules.RuleSet( + _ = capa.rules.RuleSet( [ capa.rules.Rule.from_yaml( textwrap.dedent( @@ -393,7 +393,7 @@ def test_missing_dependency(): def test_invalid_rules(): with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -406,7 +406,7 @@ def test_invalid_rules(): ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -420,7 +420,7 @@ def test_invalid_rules(): # att&ck and mbc must be lists with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -433,7 +433,7 @@ def test_invalid_rules(): ) ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -497,7 +497,7 @@ def test_count_number_symbol(): def test_invalid_number(): with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -510,7 +510,7 @@ def test_invalid_number(): ) with pytest.raises(capa.rules.InvalidRule): - r = 
capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -523,7 +523,7 @@ def test_invalid_number(): ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -583,7 +583,7 @@ def test_count_offset_symbol(): def test_invalid_offset(): with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -596,7 +596,7 @@ def test_invalid_offset(): ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -609,7 +609,7 @@ def test_invalid_offset(): ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -624,7 +624,7 @@ def test_invalid_offset(): def test_invalid_string_values_int(): with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -637,7 +637,7 @@ def test_invalid_string_values_int(): ) with pytest.raises(capa.rules.InvalidRule): - r = capa.rules.Rule.from_yaml( + _ = capa.rules.Rule.from_yaml( textwrap.dedent( """ rule: @@ -874,12 +874,12 @@ def test_rules_namespace_dependencies(): ), ] - r3 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 3"))) + r3 = {r.name for r in capa.rules.get_rules_and_dependencies(rules, "rule 3")} assert "rule 1" in r3 assert "rule 2" not in r3 assert "rule 4" not in r3 - r4 = set(map(lambda r: r.name, capa.rules.get_rules_and_dependencies(rules, "rule 4"))) + r4 = {r.name for r in capa.rules.get_rules_and_dependencies(rules, "rule 4")} assert "rule 1" in r4 assert "rule 2" in r4 assert "rule 3" not in r4 diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 496df577..4b2b6bc1 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -8,12 +8,14 @@ 
import os import sys +import logging import textwrap import subprocess from pathlib import Path import pytest -from fixtures import * + +logger = logging.getLogger(__name__) CD = Path(__file__).resolve().parent @@ -69,7 +71,7 @@ def test_bulk_process(tmp_path): def run_program(script_path, args): args = [sys.executable] + [script_path] + args - print(f"running: '{args}'") + logger.debug("running: %r", args) return subprocess.run(args, stdout=subprocess.PIPE) @@ -169,7 +171,7 @@ def test_detect_duplicate_features(tmpdir): """ The rule_overlaps list represents the number of overlaps between each rule in the RULESET. An overlap includes a rule overlap with itself. - The scripts + The scripts The overlaps are like: - Rule 0 has zero overlaps in RULESET - Rule 1 overlaps with 3 other rules in RULESET diff --git a/tests/test_viv_features.py b/tests/test_viv_features.py index 58ce5ace..1e07ca74 100644 --- a/tests/test_viv_features.py +++ b/tests/test_viv_features.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import fixtures -from fixtures import * +from fixtures import scope, sample @fixtures.parametrize(