From e745fa6aab863704d995c0b57cbb9e00faea79f2 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Wed, 22 Apr 2026 19:03:46 +0300 Subject: [PATCH] style: ruff format changed files --- capa/capabilities/common.py | 12 ++- capa/features/extractors/base_extractor.py | 12 ++- capa/features/extractors/common.py | 10 ++- capa/features/extractors/dotnetfile.py | 10 ++- capa/features/extractors/elf.py | 96 ++++++++++++++++++---- capa/features/extractors/elffile.py | 6 +- capa/features/freeze/__init__.py | 11 ++- capa/helpers.py | 11 ++- capa/loader.py | 10 ++- capa/main.py | 22 ++++- capa/optimizer.py | 18 +++- capa/render/vverbose.py | 64 +++++++++++++-- capa/rules/__init__.py | 94 +++++++++++++++++---- rules | 2 +- tests/data | 2 +- tests/test_engine.py | 18 +++- tests/test_freeze_static.py | 36 ++++++-- tests/test_render.py | 12 ++- 18 files changed, 369 insertions(+), 77 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index 4d7d1f4b..c785522d 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -21,8 +21,16 @@ from dataclasses import dataclass from capa.rules import Rule, Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS -from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor +from capa.render.result_document import ( + LibraryFunction, + StaticFeatureCounts, + DynamicFeatureCounts, +) +from capa.features.extractors.base_extractor import ( + FeatureExtractor, + StaticFeatureExtractor, + DynamicFeatureExtractor, +) logger = logging.getLogger(__name__) diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 704301eb..10841722 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -23,7 +23,13 @@ from dataclasses import dataclass import capa.features.address from capa.features.common import Feature -from capa.features.address import Address, ThreadAddress, ProcessAddress, DynamicCallAddress, AbsoluteVirtualAddress +from capa.features.address import ( + Address, + ThreadAddress, + ProcessAddress, + DynamicCallAddress, + AbsoluteVirtualAddress, +) # feature extractors may reference functions, BBs, insns by opaque handle values. # you can use the `.address` property to get and render the address of the feature. @@ -119,7 +125,9 @@ class StaticFeatureExtractor(abc.ABC): self._sample_hashes = hashes @abc.abstractmethod - def get_base_address(self) -> Union[AbsoluteVirtualAddress, capa.features.address._NoAddress]: + def get_base_address( + self, + ) -> Union[AbsoluteVirtualAddress, capa.features.address._NoAddress]: """ fetch the preferred load address at which the sample was analyzed. diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 35cbccab..c418af63 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -116,7 +116,10 @@ def extract_arch(buf) -> Iterator[tuple[Feature, Address]]: # rules that rely on arch conditions will fail to match on shellcode. # # for (2), this logic will need to be updated as the format is implemented. - logger.debug("unsupported file format: %s, will not guess Arch", binascii.hexlify(buf[:4]).decode("ascii")) + logger.debug( + "unsupported file format: %s, will not guess Arch", + binascii.hexlify(buf[:4]).decode("ascii"), + ) return @@ -148,5 +151,8 @@ def extract_os(buf, os=OS_AUTO) -> Iterator[tuple[Feature, Address]]: # rules that rely on OS conditions will fail to match on shellcode. # # for (2), this logic will need to be updated as the format is implemented. - logger.debug("unsupported file format: %s, will not guess OS", binascii.hexlify(buf[:4]).decode("ascii")) + logger.debug( + "unsupported file format: %s, will not guess OS", + binascii.hexlify(buf[:4]).decode("ascii"), + ) return diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py index 54edcb0b..ce0f2ab7 100644 --- a/capa/features/extractors/dotnetfile.py +++ b/capa/features/extractors/dotnetfile.py @@ -114,7 +114,10 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla typedefnamespace, typedefname = resolve_nested_typedef_name(nested_class_table, rid, typedef, pe) token = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid) - yield Class(DnType.format_name(typedefname, namespace=typedefnamespace)), DNTokenAddress(token) + yield ( + Class(DnType.format_name(typedefname, namespace=typedefnamespace)), + DNTokenAddress(token), + ) for rid, typeref in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number): # emit external .NET classes @@ -123,7 +126,10 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple[Cla typerefnamespace, typerefname = resolve_nested_typeref_name(typeref.ResolutionScope.row_index, typeref, pe) token = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid) - yield Class(DnType.format_name(typerefname, namespace=typerefnamespace)), DNTokenAddress(token) + yield ( + Class(DnType.format_name(typerefname, namespace=typerefnamespace)), + DNTokenAddress(token), + ) def extract_file_os(**kwargs) -> Iterator[tuple[OS, Address]]: diff --git a/capa/features/extractors/elf.py b/capa/features/extractors/elf.py index 9121a57e..732b4f67 100644 --- a/capa/features/extractors/elf.py +++ b/capa/features/extractors/elf.py @@ -181,7 +181,12 @@ class ELF: else: raise NotImplementedError() - logger.debug("e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d", e_phoff, self.e_phentsize, self.e_phnum) + logger.debug( + "e_phoff: 0x%02x e_phentsize: 0x%02x e_phnum: %d", + e_phoff, + self.e_phentsize, + self.e_phnum, + ) self.f.seek(e_phoff) program_header_size = self.e_phnum * self.e_phentsize @@ -362,13 +367,31 @@ class ELF: shent = self.shbuf[shent_offset : shent_offset + self.e_shentsize] if self.bitness == 32: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( - self.endian + "IIIIIIIIII", shent, 0x0 - ) + ( + sh_name, + sh_type, + sh_flags, + sh_addr, + sh_offset, + sh_size, + sh_link, + _, + _, + sh_entsize, + ) = struct.unpack_from(self.endian + "IIIIIIIIII", shent, 0x0) elif self.bitness == 64: - sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, _, _, sh_entsize = struct.unpack_from( - self.endian + "IIQQQQIIQQ", shent, 0x0 - ) + ( + sh_name, + sh_type, + sh_flags, + sh_addr, + sh_offset, + sh_size, + sh_link, + _, + _, + sh_entsize, + ) = struct.unpack_from(self.endian + "IIQQQQIIQQ", shent, 0x0) else: raise NotImplementedError() @@ -377,7 +400,17 @@ class ELF: if len(buf) != sh_size: raise ValueError("failed to read section header content") - return Shdr(sh_name, sh_type, sh_flags, sh_addr, sh_offset, sh_size, sh_link, sh_entsize, buf) + return Shdr( + sh_name, + sh_type, + sh_flags, + sh_addr, + sh_offset, + sh_size, + sh_link, + sh_entsize, + buf, + ) @property def section_headers(self): @@ -596,7 +629,12 @@ class PHNote: name_offset = 0xC self.desc_offset = name_offset + align(namesz, 0x4) - logger.debug("ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) + logger.debug( + "ph:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", + namesz, + self.descsz, + self.type_, + ) self.name = self.buf[name_offset : name_offset + namesz].partition(b"\x00")[0].decode("ascii") logger.debug("name: %s", self.name) @@ -623,7 +661,13 @@ class PHNote: if not os: return None - logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) + logger.debug( + "abi tag: %s earliest compatible kernel: %d.%d.%d", + os, + kmajor, + kminor, + kpatch, + ) return ABITag(os, kmajor, kminor, kpatch) @@ -645,7 +689,12 @@ class SHNote: name_offset = 0xC self.desc_offset = name_offset + align(namesz, 0x4) - logger.debug("sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", namesz, self.descsz, self.type_) + logger.debug( + "sh:namesz: 0x%02x descsz: 0x%02x type: 0x%04x", + namesz, + self.descsz, + self.type_, + ) name_buf = self.buf[name_offset : name_offset + namesz] self.name = read_cstr(name_buf, 0x0) @@ -667,7 +716,13 @@ class SHNote: if not os: return None - logger.debug("abi tag: %s earliest compatible kernel: %d.%d.%d", os, kmajor, kminor, kpatch) + logger.debug( + "abi tag: %s earliest compatible kernel: %d.%d.%d", + os, + kmajor, + kminor, + kpatch, + ) return ABITag(os, kmajor, kminor, kpatch) @@ -752,7 +807,8 @@ class SymTab: strtab_section = elf.sections[section.sh_link] sh_symtab = Shdr.from_viv(section, elf.readAtOffset(section.sh_offset, section.sh_size)) sh_strtab = Shdr.from_viv( - strtab_section, elf.readAtOffset(strtab_section.sh_offset, strtab_section.sh_size) + strtab_section, + elf.readAtOffset(strtab_section.sh_offset, strtab_section.sh_size), ) if sh_symtab is None or sh_strtab is None: @@ -1105,7 +1161,12 @@ def guess_os_from_go_buildinfo(elf: ELF) -> Optional[OS]: assert psize in (4, 8) is_big_endian = flags & 0b01 has_inline_strings = flags & 0b10 - logger.debug("go buildinfo: psize: %d big endian: %s inline: %s", psize, is_big_endian, has_inline_strings) + logger.debug( + "go buildinfo: psize: %d big endian: %s inline: %s", + psize, + is_big_endian, + has_inline_strings, + ) GOOS_TO_OS = { b"aix": OS.AIX, @@ -1466,7 +1527,12 @@ def guess_os_from_vdso_strings(elf: ELF) -> Optional[OS]: ("x86/32", b"__vdso_time", b"LINUX_2.6"), ): if symbol in buf and version in buf: - logger.debug("vdso string: %s %s %s", arch, symbol.decode("ascii"), version.decode("ascii")) + logger.debug( + "vdso string: %s %s %s", + arch, + symbol.decode("ascii"), + version.decode("ascii"), + ) return OS.LINUX return None diff --git a/capa/features/extractors/elffile.py b/capa/features/extractors/elffile.py index 55f68947..0460ab36 100644 --- a/capa/features/extractors/elffile.py +++ b/capa/features/extractors/elffile.py @@ -39,7 +39,11 @@ def extract_file_export_names(elf: ELFFile, **kwargs): logger.debug("Symbol table '%s' has a sh_entsize of zero!", section.name) continue - logger.debug("Symbol table '%s' contains %s entries:", section.name, section.num_symbols()) + logger.debug( + "Symbol table '%s' contains %s entries:", + section.name, + section.num_symbols(), + ) for symbol in section.iter_symbols(): # The following conditions are based on the following article diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index c975aeba..9ee6a8b2 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -97,7 +97,10 @@ class Address(HashableModel): return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid)) elif isinstance(a, capa.features.address.DynamicCallAddress): - return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id)) + return cls( + type=AddressType.CALL, + value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id), + ) elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress): return cls(type=AddressType.NO_ADDRESS, value=None) @@ -146,7 +149,8 @@ class Address(HashableModel): assert isinstance(pid, int) assert isinstance(tid, int) return capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid + process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), + tid=tid, ) elif self.type is AddressType.CALL: @@ -154,7 +158,8 @@ class Address(HashableModel): ppid, pid, tid, id_ = self.value return capa.features.address.DynamicCallAddress( thread=capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid + process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), + tid=tid, ), id=id_, ) diff --git a/capa/helpers.py b/capa/helpers.py index 17dc2e13..2e7f9755 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -309,7 +309,11 @@ def log_unsupported_vmray_report_error(error: str): def log_empty_sandbox_report_error(error: str, sandbox_name: str): logger.error("-" * 80) - logger.error(" %s report is empty or only contains little useful data: %s", sandbox_name, error) + logger.error( + " %s report is empty or only contains little useful data: %s", + sandbox_name, + error, + ) logger.error(" ") logger.error(" Please make sure the sandbox run captures useful behaviour of your sample.") logger.error("-" * 80) @@ -391,7 +395,10 @@ def is_cache_newer_than_rule_code(cache_dir: Path) -> bool: import capa.rules import capa.rules.cache - latest_rule_code_file = max([Path(capa.rules.__file__), Path(capa.rules.cache.__file__)], key=os.path.getmtime) + latest_rule_code_file = max( + [Path(capa.rules.__file__), Path(capa.rules.cache.__file__)], + key=os.path.getmtime, + ) rule_code_timestamp = Path(latest_rule_code_file).stat().st_mtime if rule_code_timestamp > cache_timestamp: diff --git a/capa/loader.py b/capa/loader.py index 463e0033..c7dac7b3 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -33,7 +33,11 @@ import capa.render.result_document as rdoc import capa.features.extractors.common from capa.rules import RuleSet from capa.engine import MatchResults -from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError +from capa.exceptions import ( + UnsupportedOSError, + UnsupportedArchError, + UnsupportedFormatError, +) from capa.features.common import ( OS_AUTO, FORMAT_PE, @@ -410,7 +414,9 @@ def get_extractor( # -1 - Generic errors (database already open, auto-analysis failed, etc.) # -2 - User cancelled operation ret = idapro.open_database( - str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R" + str(input_path), + run_auto_analysis=True, + args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R", ) if ret != 0: raise RuntimeError("failed to analyze input file") diff --git a/capa/main.py b/capa/main.py index d502a937..dcb9383b 100644 --- a/capa/main.py +++ b/capa/main.py @@ -239,10 +239,16 @@ def install_common_args(parser, wanted=None): parser.add_argument("--version", action="version", version="%(prog)s {:s}".format(capa.version.__version__)) parser.add_argument( - "-v", "--verbose", action="store_true", help="enable verbose result document (no effect with --json)" + "-v", + "--verbose", + action="store_true", + help="enable verbose result document (no effect with --json)", ) parser.add_argument( - "-vv", "--vverbose", action="store_true", help="enable very verbose result document (no effect with --json)" + "-vv", + "--vverbose", + action="store_true", + help="enable very verbose result document (no effect with --json)", ) parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR") parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors") @@ -857,7 +863,13 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr """ sig_paths = get_signatures_from_cli(args, input_format, backend) - should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) + should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ( + "0", + "no", + "NO", + "n", + None, + ) os_ = get_os_from_cli(args, backend) sample_path = get_sample_path_from_cli(args, backend) @@ -968,7 +980,9 @@ def main(argv: Optional[list[str]] = None): """) parser = argparse.ArgumentParser( - description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter + description=desc, + epilog=epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, ) install_common_args( parser, diff --git a/capa/optimizer.py b/capa/optimizer.py index 4aa712ec..694c23a1 100644 --- a/capa/optimizer.py +++ b/capa/optimizer.py @@ -21,7 +21,14 @@ logger = logging.getLogger(__name__) def get_node_cost(node): - if isinstance(node, (capa.features.common.OS, capa.features.common.Arch, capa.features.common.Format)): + if isinstance( + node, + ( + capa.features.common.OS, + capa.features.common.Arch, + capa.features.common.Format, + ), + ): # we assume these are the most restrictive features: # authors commonly use them at the start of rules to restrict the category of samples to inspect return 0 @@ -32,7 +39,14 @@ def get_node_cost(node): # this should be all hash-lookup features. # see below. - elif isinstance(node, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes)): + elif isinstance( + node, + ( + capa.features.common.Substring, + capa.features.common.Regex, + capa.features.common.Bytes, + ), + ): # substring and regex features require a full scan of each string # which we anticipate is more expensive then a hash lookup feature (e.g. mnemonic or count). # diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index bf722a9d..ebd7e9a8 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -55,7 +55,11 @@ def hanging_indent(s: str, indent: int) -> str: def render_locations( - console: Console, layout: rd.Layout, locations: Iterable[frz.Address], indent: int, use_short_format: bool = False + console: Console, + layout: rd.Layout, + locations: Iterable[frz.Address], + indent: int, + use_short_format: bool = False, ): """ Render the given locations, such as virtual address or pid/tid/callid with process name. @@ -117,7 +121,13 @@ def render_locations( raise RuntimeError("unreachable") -def render_statement(console: Console, layout: rd.Layout, match: rd.Match, statement: rd.Statement, indent: int): +def render_statement( + console: Console, + layout: rd.Layout, + match: rd.Match, + statement: rd.Statement, + indent: int, +): console.write(" " * indent) if isinstance(statement, rd.SubscopeStatement): @@ -191,7 +201,12 @@ def render_string_value(s: str) -> str: def render_feature( - console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, feature: frzf.Feature, indent: int + console: Console, + layout: rd.Layout, + rule: rd.RuleMatches, + match: rd.Match, + feature: frzf.Feature, + indent: int, ): console.write(" " * indent) @@ -220,7 +235,13 @@ def render_feature( value = render_string_value(value) elif isinstance( - feature, (frzf.NumberFeature, frzf.OffsetFeature, frzf.OperandNumberFeature, frzf.OperandOffsetFeature) + feature, + ( + frzf.NumberFeature, + frzf.OffsetFeature, + frzf.OperandNumberFeature, + frzf.OperandOffsetFeature, + ), ): assert isinstance(value, int) value = capa.helpers.hex(value) @@ -275,7 +296,14 @@ def render_feature( console.writeln() -def render_node(console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, node: rd.Node, indent: int): +def render_node( + console: Console, + layout: rd.Layout, + rule: rd.RuleMatches, + match: rd.Match, + node: rd.Node, + indent: int, +): if isinstance(node, rd.StatementNode): render_statement(console, layout, match, node.statement, indent=indent) elif isinstance(node, rd.FeatureNode): @@ -293,7 +321,12 @@ MODE_FAILURE = "failure" def render_match( - console: Console, layout: rd.Layout, rule: rd.RuleMatches, match: rd.Match, indent=0, mode=MODE_SUCCESS + console: Console, + layout: rd.Layout, + rule: rd.RuleMatches, + match: rd.Match, + indent=0, + mode=MODE_SUCCESS, ): child_mode = mode if mode == MODE_SUCCESS: @@ -449,7 +482,10 @@ def render_rules(console: Console, doc: rd.ResultDocument): rows.append(("scope", rule.meta.scopes.dynamic.value)) if rule.meta.attack: - rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack]))) + rows.append(( + "att&ck", + ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack]), + )) if rule.meta.mbc: rows.append(("mbc", ", ".join([rutils.format_parts_id(v) for v in rule.meta.mbc]))) @@ -502,9 +538,19 @@ def render_rules(console: Console, doc: rd.ResultDocument): console.write(v.render_thread(doc.meta.analysis.layout, location)) elif rule.meta.scopes.dynamic == capa.rules.Scope.SPAN_OF_CALLS: calls = sorted(set(collect_span_of_calls_locations(match))) - console.write(hanging_indent(v.render_span_of_calls(doc.meta.analysis.layout, calls), indent=1)) + console.write( + hanging_indent( + v.render_span_of_calls(doc.meta.analysis.layout, calls), + indent=1, + ) + ) elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL: - console.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1)) + console.write( + hanging_indent( + v.render_call(doc.meta.analysis.layout, location), + indent=1, + ) + ) else: capa.helpers.assert_never(rule.meta.scopes.dynamic) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 52a5ffc7..7219c99f 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -639,21 +639,35 @@ def build_statements(d, scopes: Scopes): key = list(d.keys())[0] description = pop_statement_description_entry(d[key]) if key == "and": - return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description) + return ceng.And( + unique(build_statements(dd, scopes) for dd in d[key]), + description=description, + ) elif key == "or": - return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description) + return ceng.Or( + unique(build_statements(dd, scopes) for dd in d[key]), + description=description, + ) elif key == "not": if len(d[key]) != 1: raise InvalidRule("not statement must have exactly one child statement") return ceng.Not(build_statements(d[key][0], scopes), description=description) elif key.endswith(" or more"): count = int(key[: -len("or more")]) - return ceng.Some(count, unique(build_statements(dd, scopes) for dd in d[key]), description=description) + return ceng.Some( + count, + unique(build_statements(dd, scopes) for dd in d[key]), + description=description, + ) elif key == "optional": # `optional` is an alias for `0 or more` # which is useful for documenting behaviors, # like with `write file`, we might say that `WriteFile` is optionally found alongside `CreateFileA`. - return ceng.Some(0, unique(build_statements(dd, scopes) for dd in d[key]), description=description) + return ceng.Some( + 0, + unique(build_statements(dd, scopes) for dd in d[key]), + description=description, + ) elif key == "process": if not is_subscope_compatible(scopes.dynamic, Scope.PROCESS): @@ -663,7 +677,9 @@ def build_statements(d, scopes: Scopes): raise InvalidRule("subscope must have exactly one child statement") return ceng.Subscope( - Scope.PROCESS, build_statements(d[key][0], Scopes(dynamic=Scope.PROCESS)), description=description + Scope.PROCESS, + build_statements(d[key][0], Scopes(dynamic=Scope.PROCESS)), + description=description, ) elif key == "thread": @@ -674,7 +690,9 @@ def build_statements(d, scopes: Scopes): raise InvalidRule("subscope must have exactly one child statement") return ceng.Subscope( - Scope.THREAD, build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)), description=description + Scope.THREAD, + build_statements(d[key][0], Scopes(dynamic=Scope.THREAD)), + description=description, ) elif key == "span of calls": @@ -698,7 +716,9 @@ def build_statements(d, scopes: Scopes): raise InvalidRule("subscope must have exactly one child statement") return ceng.Subscope( - Scope.CALL, build_statements(d[key][0], Scopes(dynamic=Scope.CALL)), description=description + Scope.CALL, + build_statements(d[key][0], Scopes(dynamic=Scope.CALL)), + description=description, ) elif key == "function": @@ -709,7 +729,9 @@ def build_statements(d, scopes: Scopes): raise InvalidRule("subscope must have exactly one child statement") return ceng.Subscope( - Scope.FUNCTION, build_statements(d[key][0], Scopes(static=Scope.FUNCTION)), description=description + Scope.FUNCTION, + build_statements(d[key][0], Scopes(static=Scope.FUNCTION)), + description=description, ) elif key == "basic block": @@ -720,7 +742,9 @@ def build_statements(d, scopes: Scopes): raise InvalidRule("subscope must have exactly one child statement") return ceng.Subscope( - Scope.BASIC_BLOCK, build_statements(d[key][0], Scopes(static=Scope.BASIC_BLOCK)), description=description + Scope.BASIC_BLOCK, + build_statements(d[key][0], Scopes(static=Scope.BASIC_BLOCK)), + description=description, ) elif key == "instruction": @@ -1589,7 +1613,14 @@ class RuleSet: # Other numbers are assumed to be uncommon. return 7 - elif isinstance(node, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes)): + elif isinstance( + node, + ( + capa.features.common.Substring, + capa.features.common.Regex, + capa.features.common.Bytes, + ), + ): # Scanning features (non-hashable), which we can't use for quick matching/filtering. return 0 @@ -1831,20 +1862,37 @@ class RuleSet: string_features = [ feature for feature in features - if isinstance(feature, (capa.features.common.Substring, capa.features.common.Regex)) + if isinstance( + feature, + (capa.features.common.Substring, capa.features.common.Regex), + ) ] hashable_features = [ feature for feature in features if not isinstance( - feature, (capa.features.common.Substring, capa.features.common.Regex, capa.features.common.Bytes) + feature, + ( + capa.features.common.Substring, + capa.features.common.Regex, + capa.features.common.Bytes, + ), ) ] - logger.debug("indexing: features: %d, score: %d, rule: %s", len(features), score, rule_name) + logger.debug( + "indexing: features: %d, score: %d, rule: %s", + len(features), + score, + rule_name, + ) scores_by_rule[rule_name] = score for feature in features: - logger.debug(" : [%d] %s", RuleSet._score_feature(scores_by_rule, feature), feature) + logger.debug( + " : [%d] %s", + RuleSet._score_feature(scores_by_rule, feature), + feature, + ) if string_features: string_rules[rule_name] = cast(list[Feature], string_features) @@ -1871,7 +1919,9 @@ class RuleSet: len([feature for feature, rules in rules_by_feature.items() if len(rules) > 3]), ) logger.debug( - "indexing: %d scanning string features, %d scanning bytes features", len(string_rules), bytes_rules_count + "indexing: %d scanning string features, %d scanning bytes features", + len(string_rules), + bytes_rules_count, ) return RuleSet._RuleFeatureIndex(rules_by_feature, string_rules, dict(bytes_prefix_index)) @@ -1934,13 +1984,23 @@ class RuleSet: for rule in rules: for k, v in rule.meta.items(): if isinstance(v, str) and tag in v: - logger.debug('using rule "%s" and dependencies, found tag in meta.%s: %s', rule.name, k, v) + logger.debug( + 'using rule "%s" and dependencies, found tag in meta.%s: %s', + rule.name, + k, + v, + ) rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name))) break if isinstance(v, list): for vv in v: if tag in vv: - logger.debug('using rule "%s" and dependencies, found tag in meta.%s: %s', rule.name, k, vv) + logger.debug( + 'using rule "%s" and dependencies, found tag in meta.%s: %s', + rule.name, + k, + vv, + ) rules_filtered.update(set(get_rules_and_dependencies(rules, rule.name))) break return RuleSet(list(rules_filtered)) diff --git a/rules b/rules index 2af9fbfc..03a20f69 160000 --- a/rules +++ b/rules @@ -1 +1 @@ -Subproject commit 2af9fbfc1c9b4634dbeb76b5d34fca9389fa7f80 +Subproject commit 03a20f69ae05e4c48467b06ee69faaa773957684 diff --git a/tests/data b/tests/data index f41a1998..413fd280 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit f41a1998b92d391a26858f8ae4e6c92ac7394411 +Subproject commit 413fd2803e0f45c7af1eb27a091a1d93221d5d04 diff --git a/tests/test_engine.py b/tests/test_engine.py index 88184f1a..1d4ba1f2 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -17,7 +17,13 @@ import pytest import capa.features.address from capa.engine import Or, And, Not, Some, Range from capa.features.insn import Number -from capa.features.address import ThreadAddress, ProcessAddress, DynamicCallAddress, DNTokenOffsetAddress, AbsoluteVirtualAddress +from capa.features.address import ( + ThreadAddress, + ProcessAddress, + DynamicCallAddress, + DNTokenOffsetAddress, + AbsoluteVirtualAddress, +) ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) ADDR2 = capa.features.address.AbsoluteVirtualAddress(0x401002) @@ -138,7 +144,10 @@ def test_some(): def test_complex(): assert True is bool( - Or([And([Number(1), Number(2)]), Or([Number(3), Some(2, [Number(4), Number(5), Number(6)])])]).evaluate({ + Or([ + And([Number(1), Number(2)]), + Or([Number(3), Some(2, [Number(4), Number(5), Number(6)])]), + ]).evaluate({ Number(5): {ADDR1}, Number(6): {ADDR1}, Number(7): {ADDR1}, @@ -147,7 +156,10 @@ def test_complex(): ) assert False is bool( - Or([And([Number(1), Number(2)]), Or([Number(3), Some(2, [Number(4), Number(5)])])]).evaluate({ + Or([ + And([Number(1), Number(2)]), + Or([Number(3), Some(2, [Number(4), Number(5)])]), + ]).evaluate({ Number(5): {ADDR1}, Number(6): {ADDR1}, Number(7): {ADDR1}, diff --git a/tests/test_freeze_static.py b/tests/test_freeze_static.py index 16003fce..60b05328 100644 --- a/tests/test_freeze_static.py +++ b/tests/test_freeze_static.py @@ -29,7 +29,11 @@ import capa.features.extractors.null import capa.features.freeze.features import capa.features.extractors.base_extractor from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.base_extractor import BBHandle, SampleHashes, FunctionHandle +from capa.features.extractors.base_extractor import ( + BBHandle, + SampleHashes, + FunctionHandle, +) EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor( base_address=AbsoluteVirtualAddress(0x401000), @@ -40,28 +44,46 @@ EXTRACTOR = capa.features.extractors.null.NullStaticFeatureExtractor( ), global_features=[], file_features=[ - (AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")), + ( + AbsoluteVirtualAddress(0x402345), + capa.features.common.Characteristic("embedded pe"), + ), ], functions={ AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.FunctionFeatures( features=[ - (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("indirect call")), + ( + AbsoluteVirtualAddress(0x401000), + capa.features.common.Characteristic("indirect call"), + ), ], basic_blocks={ AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.BasicBlockFeatures( features=[ - (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("tight loop")), + ( + AbsoluteVirtualAddress(0x401000), + capa.features.common.Characteristic("tight loop"), + ), ], instructions={ AbsoluteVirtualAddress(0x401000): capa.features.extractors.null.InstructionFeatures( features=[ - (AbsoluteVirtualAddress(0x401000), capa.features.insn.Mnemonic("xor")), - (AbsoluteVirtualAddress(0x401000), capa.features.common.Characteristic("nzxor")), + ( + AbsoluteVirtualAddress(0x401000), + capa.features.insn.Mnemonic("xor"), + ), + ( + AbsoluteVirtualAddress(0x401000), + capa.features.common.Characteristic("nzxor"), + ), ], ), AbsoluteVirtualAddress(0x401002): capa.features.extractors.null.InstructionFeatures( features=[ - (AbsoluteVirtualAddress(0x401002), capa.features.insn.Mnemonic("mov")), + ( + AbsoluteVirtualAddress(0x401002), + capa.features.insn.Mnemonic("mov"), + ), ], ), }, diff --git a/tests/test_render.py b/tests/test_render.py index c931dfea..316ea145 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -44,7 +44,12 @@ def test_render_offset(): def test_render_property(): assert ( - str(capa.features.insn.Property("System.IO.FileInfo::Length", access=capa.features.common.FeatureAccess.READ)) + str( + capa.features.insn.Property( + "System.IO.FileInfo::Length", + access=capa.features.common.FeatureAccess.READ, + ) + ) == "property/read(System.IO.FileInfo::Length)" ) @@ -184,7 +189,10 @@ def test_render_meta_maec(): (capa.features.common.Regex("^foo"), "regex: ^foo"), (capa.features.common.String("foo"), 'string: "foo" @ 0x401000'), (capa.features.common.Class("BeanFactory"), "class: BeanFactory @ 0x401000"), - (capa.features.common.Namespace("std::enterprise"), "namespace: std::enterprise @ 0x401000"), + ( + capa.features.common.Namespace("std::enterprise"), + "namespace: std::enterprise @ 0x401000", + ), (capa.features.insn.API("CreateFileW"), "api: CreateFileW @ 0x401000"), (capa.features.insn.Property("foo"), "property: foo @ 0x401000"), (capa.features.insn.Property("foo", "read"), "property/read: foo @ 0x401000"),