diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9ef86252..fd907c4b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -92,7 +92,8 @@ repos: # F401: `foo` imported but unused (prefer ruff) # F811 Redefinition of unused `foo` (prefer ruff) # E501 line too long (prefer black) - - "--extend-ignore=E203,F401,F811,E501" + # B010 Do not call setattr with a constant attribute value + - "--extend-ignore=E203,F401,F811,E501,B010" - "--extend-exclude" - "capa/render/proto/capa_pb2.py" - "capa/" diff --git a/capa/engine.py b/capa/engine.py index 90a3308f..d498639d 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -71,7 +71,7 @@ class Statement: yield child if hasattr(self, "children"): - for child in getattr(self, "children"): + for child in self.children: assert isinstance(child, (Statement, Feature)) yield child @@ -83,7 +83,7 @@ class Statement: self.child = new if hasattr(self, "children"): - children = getattr(self, "children") + children = self.children for i, child in enumerate(children): if child is existing: children[i] = new diff --git a/capa/features/common.py b/capa/features/common.py index 5060ebaa..3bca4ba5 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -100,7 +100,10 @@ class Result: return self.success -class Feature(abc.ABC): +class Feature(abc.ABC): # noqa: B024 + # this is an abstract class, since we don't want anyone to instantiate it directly, + # but it doesn't have any abstract methods. + def __init__( self, value: Union[str, int, float, bytes], diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index cbc1ab3d..6436902d 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -411,7 +411,7 @@ class ELF: # there should be vn_cnt of these. # each entry describes an ABI name required by the shared object. vna_offset = vn_offset + vn_aux - for i in range(vn_cnt): + for _ in range(vn_cnt): # ElfXX_Vernaux layout is the same on 32 and 64 bit _, _, _, vna_name, vna_next = struct.unpack_from(self.endian + "IHHII", shdr.buf, vna_offset) diff --git a/capa/features/extractors/ida/helpers.py b/capa/features/extractors/ida/helpers.py index 0279f6fd..9ca6be85 100644 --- a/capa/features/extractors/ida/helpers.py +++ b/capa/features/extractors/ida/helpers.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import functools from typing import Any, Dict, Tuple, Iterator, Optional import idc @@ -80,9 +81,22 @@ def get_segment_buffer(seg: idaapi.segment_t) -> bytes: return buff if buff else b"" +def inspect_import(imports, library, ea, function, ordinal): + if function and function.startswith("__imp_"): + # handle mangled PE imports + function = function[len("__imp_") :] + + if function and "@@" in function: + # handle mangled ELF imports, like "fopen@@glibc_2.2.5" + function, _, _ = function.partition("@@") + + imports[ea] = (library.lower(), function, ordinal) + return True + + def get_file_imports() -> Dict[int, Tuple[str, str, int]]: """get file imports""" - imports = {} + imports: Dict[int, Tuple[str, str, int]] = {} for idx in range(idaapi.get_import_module_qty()): library = idaapi.get_import_module_name(idx) @@ -96,19 +110,8 @@ def get_file_imports() -> Dict[int, Tuple[str, str, int]]: if library == ".dynsym": library = "" - def inspect_import(ea, function, ordinal): - if function and function.startswith("__imp_"): - # handle mangled PE imports - function = function[len("__imp_") :] - - if function and "@@" in function: - # handle mangled ELF imports, like "fopen@@glibc_2.2.5" - function, _, _ = function.partition("@@") - - imports[ea] = (library.lower(), function, ordinal) - return True - - idaapi.enum_import_names(idx, inspect_import) + cb = functools.partial(inspect_import, imports, library) + idaapi.enum_import_names(idx, cb) return imports diff --git a/capa/features/extractors/viv/insn.py b/capa/features/extractors/viv/insn.py index 93c4bfa9..e52c98fd 100644 --- a/capa/features/extractors/viv/insn.py +++ b/capa/features/extractors/viv/insn.py @@ -288,16 +288,16 @@ def extract_insn_bytes_features(fh: FunctionHandle, bb, ih: InsnHandle) -> Itera else: continue - for v in derefs(f.vw, v): + for vv in derefs(f.vw, v): try: - buf = read_bytes(f.vw, v) + buf = read_bytes(f.vw, vv) except envi.exc.SegmentationViolation: continue if capa.features.extractors.helpers.all_zeros(buf): continue - if f.vw.isProbablyString(v) or f.vw.isProbablyUnicode(v): + if f.vw.isProbablyString(vv) or f.vw.isProbablyUnicode(vv): # don't extract byte features for obvious strings continue @@ -700,9 +700,9 @@ def extract_op_string_features( else: return - for v in derefs(f.vw, v): + for vv in derefs(f.vw, v): try: - s = read_string(f.vw, v).rstrip("\x00") + s = read_string(f.vw, vv).rstrip("\x00") except ValueError: continue else: diff --git a/capa/helpers.py b/capa/helpers.py index e1bfd7ba..b098a4f2 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -45,7 +45,9 @@ def is_runtime_ida(): def assert_never(value) -> NoReturn: - assert False, f"Unhandled value: {value} ({type(value).__name__})" + # careful: python -O will remove this assertion. + # but this is only used for type checking, so it's ok. + assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011 def get_format_from_extension(sample: str) -> str: diff --git a/capa/ida/plugin/view.py b/capa/ida/plugin/view.py index 4dba4ca1..680abf2a 100644 --- a/capa/ida/plugin/view.py +++ b/capa/ida/plugin/view.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import re +from typing import Dict, Optional from collections import Counter import idc @@ -646,7 +647,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): counted = list(zip(Counter(features).keys(), Counter(features).values())) # single features - for k, v in filter(lambda t: t[1] == 1, counted): + for k, _ in filter(lambda t: t[1] == 1, counted): if isinstance(k, (capa.features.common.String,)): value = f'"{capa.features.common.escape_string(k.get_value_str())}"' else: @@ -682,10 +683,12 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): # we don't add a new node for description; either set description column of parent's last child # or the parent itself - if parent.childCount(): - parent.child(parent.childCount() - 1).setText(1, feature.lstrip("description:").lstrip()) - else: - parent.setText(1, feature.lstrip("description:").lstrip()) + if feature.startswith("description:"): + description = feature[len("description:") :].lstrip() + if parent.childCount(): + parent.child(parent.childCount() - 1).setText(1, description) + else: + parent.setText(1, description) return None elif feature.startswith("- description:"): if not parent: @@ -693,7 +696,8 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): return None # we don't add a new node for description; set the description column of the parent instead - parent.setText(1, feature.lstrip("- description:").lstrip()) + description = feature[len("- description:") :].lstrip() + parent.setText(1, description) return None node = QtWidgets.QTreeWidgetItem(parent) @@ -1010,7 +1014,7 @@ class CapaExplorerRulegenFeatures(QtWidgets.QTreeWidget): return o - def load_features(self, file_features, func_features={}): + def load_features(self, file_features, func_features: Optional[Dict] = None): """ """ self.parse_features_for_tree(self.new_parent_node(self, ("File Scope",)), file_features) if func_features: diff --git a/capa/main.py b/capa/main.py index 0db61f69..0b1e046d 100644 --- a/capa/main.py +++ b/capa/main.py @@ -440,7 +440,8 @@ def get_default_root() -> str: # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. # its injected by pyinstaller. # so we'll fetch this attribute dynamically. - return getattr(sys, "_MEIPASS") + assert hasattr(sys, "_MEIPASS") + return sys._MEIPASS else: return os.path.join(os.path.dirname(__file__), "..") diff --git a/capa/render/default.py b/capa/render/default.py index 1cd07c38..21b0104b 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -64,7 +64,7 @@ def find_subrule_matches(doc: rd.ResultDocument): matches.add(match.node.feature.match) for rule in rutils.capability_rules(doc): - for address, match in rule.matches: + for _, match in rule.matches: rec(match) return matches diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index ec69063e..0f2184a7 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1247,7 +1247,7 @@ class RuleSet: # the set of subtypes of type A is unbounded, # because any user might come along and create a new subtype B, # so mypy can't reason about this set of types. - assert False, f"Unhandled value: {node} ({type(node).__name__})" + assert_never(node) else: # programming error assert_never(node) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 5e3ed0a1..b5d48691 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -169,15 +169,17 @@ def main(argv=None): return -1 samples = [] - for base, directories, files in os.walk(args.input): + for base, _, files in os.walk(args.input): for file in files: samples.append(os.path.join(base, file)) - def pmap(f, args, parallelism=multiprocessing.cpu_count()): + cpu_count = multiprocessing.cpu_count() + + def pmap(f, args, parallelism=cpu_count): """apply the given function f to the given args using subprocesses""" return multiprocessing.Pool(parallelism).imap(f, args) - def tmap(f, args, parallelism=multiprocessing.cpu_count()): + def tmap(f, args, parallelism=cpu_count): """apply the given function f to the given args using threads""" return multiprocessing.pool.ThreadPool(parallelism).imap(f, args) diff --git a/scripts/lint.py b/scripts/lint.py index ff2e8bd8..3ba073d9 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -888,7 +888,7 @@ def collect_samples(path) -> Dict[str, Path]: recurse through the given path, collecting all file paths, indexed by their content sha256, md5, and filename. """ samples = {} - for root, dirs, files in os.walk(path): + for root, _, files in os.walk(path): for name in files: if name.endswith(".viv"): continue diff --git a/setup.py b/setup.py index 1a0d8b3f..2b1be6be 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,7 @@ setuptools.setup( "pytest-instafail==0.5.0", "pytest-cov==4.1.0", "flake8==6.0.0", + "flake8-bugbear==23.6.5", "ruff==0.0.275", "black==23.3.0", "isort==5.11.4", diff --git a/tests/test_proto.py b/tests/test_proto.py index 43a0ad78..8a76ccfc 100644 --- a/tests/test_proto.py +++ b/tests/test_proto.py @@ -20,6 +20,7 @@ import capa.features.address import capa.render.proto.capa_pb2 as capa_pb2 import capa.render.result_document as rd import capa.features.freeze.features +from capa.helpers import assert_never @pytest.mark.parametrize( @@ -306,7 +307,7 @@ def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode): else: # unhandled statement - assert False + assert_never(sa) def assert_round_trip(doc: rd.ResultDocument):