FeatureExtractor alias: add type casts to either StaticFeatureExtractor or DynamicFeatureExtractor

This commit is contained in:
Yacine Elhamer
2023-06-26 01:23:01 +01:00
parent 172e7a7649
commit 94fc7b4e9a
5 changed files with 33 additions and 7 deletions

View File

@@ -307,6 +307,13 @@ class DynamicFeatureExtractor:
This class is not instantiated directly; it is the base class for other implementations. This class is not instantiated directly; it is the base class for other implementations.
""" """
@abc.abstractmethod
def get_base_address(self) -> Union[AbsoluteVirtualAddress, capa.features.address._NoAddress]:
    """
    fetch the preferred load address at which the sample was analyzed.

    returns: either an absolute virtual address or the "no address" placeholder type,
      as declared by the return annotation.

    this is an abstract method: the base implementation always raises NotImplementedError,
    so concrete extractors must override it.
    """
    raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def get_processes(self) -> Iterator[ProcessHandle]: def get_processes(self) -> Iterator[ProcessHandle]:
""" """

View File

@@ -13,7 +13,7 @@ import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_ import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process import capa.features.extractors.cape.process
from capa.features.common import Feature from capa.features.common import Feature
from capa.features.address import Address from capa.features.address import NO_ADDRESS, Address
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,6 +27,9 @@ class CapeExtractor(DynamicFeatureExtractor):
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
def get_base_address(self):
    """
    report the base address for this extractor.

    this implementation does not compute an address: it unconditionally
    returns the NO_ADDRESS placeholder imported from capa.features.address.
    """
    return NO_ADDRESS
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features yield from self.global_features

View File

@@ -20,7 +20,7 @@ import textwrap
import itertools import itertools
import contextlib import contextlib
import collections import collections
from typing import Any, Dict, List, Tuple, Callable from typing import Any, Dict, List, Tuple, Callable, cast
import halo import halo
import tqdm import tqdm
@@ -231,7 +231,12 @@ def find_code_capabilities(
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
file_features = collections.defaultdict(set) # type: FeatureSet file_features = collections.defaultdict(set) # type: FeatureSet
for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): if isinstance(extractor, StaticFeatureExtractor):
extractor_: StaticFeatureExtractor = cast(StaticFeatureExtractor, extractor)
else:
extractor_: DynamicFeatureExtractor = cast(DynamicFeatureExtractor, extractor)
for feature, va in itertools.chain(extractor_.extract_file_features(), extractor_.extract_global_features()):
# not all file features may have virtual addresses. # not all file features may have virtual addresses.
# if not, then at least ensure the feature shows up in the index. # if not, then at least ensure the feature shows up in the index.
# the set of addresses will still be empty. # the set of addresses will still be empty.
@@ -249,7 +254,7 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
return matches, len(file_features) return matches, len(file_features)
def find_capabilities( def find_capabilities_static(
ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None
) -> Tuple[MatchResults, Any]: ) -> Tuple[MatchResults, Any]:
all_function_matches = collections.defaultdict(list) # type: MatchResults all_function_matches = collections.defaultdict(list) # type: MatchResults
@@ -334,6 +339,15 @@ def find_capabilities(
return matches, meta return matches, meta
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, **kwargs) -> Tuple[MatchResults, Any]:
    """
    find capabilities by dispatching to the routine that matches the kind of extractor.

    args:
      ruleset: the rules to match against the extracted features.
      extractor: the feature extractor to analyze.
      **kwargs: options forwarded by keyword to the extractor-specific routine
        (for example, `disable_progress` for `find_capabilities_static`).

    returns: the (matches, meta) tuple produced by the extractor-specific routine.

    raises:
      NotImplementedError: when the extractor is not a StaticFeatureExtractor,
        since no other analysis routine is wired up here yet.
    """
    if isinstance(extractor, StaticFeatureExtractor):
        # the isinstance() check already narrows the type for type checkers,
        # so an explicit cast() is unnecessary.
        #
        # expand kwargs rather than passing the dict positionally: a positional dict
        # would be bound to `disable_progress` and every caller option would be lost.
        return find_capabilities_static(ruleset, extractor, **kwargs)

    # fail loudly instead of printing a placeholder and implicitly returning None,
    # which would violate the declared return type and break callers that unpack the result.
    raise NotImplementedError(f"unsupported feature extractor type: {type(extractor).__name__}")
# TODO move all to helpers? # TODO move all to helpers?
def has_rule_with_namespace(rules, capabilities, rule_cat): def has_rule_with_namespace(rules, capabilities, rule_cat):
for rule_name in capabilities.keys(): for rule_name in capabilities.keys():
@@ -1252,7 +1266,7 @@ def main(argv=None):
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try: try:
extractor = get_extractor( extractor: FeatureExtractor = get_extractor(
args.sample, args.sample,
format_, format_,
args.os, args.os,

View File

@@ -46,6 +46,7 @@ import capa.helpers
import capa.features import capa.features
import capa.features.common import capa.features.common
import capa.features.freeze import capa.features.freeze
from capa.features.extractors.base_extractor import FeatureExtractor
logger = logging.getLogger("capa.profile") logger = logging.getLogger("capa.profile")
@@ -105,7 +106,7 @@ def main(argv=None):
with open(args.sample, "rb") as f: with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read()) extractor = capa.features.freeze.load(f.read())
else: else:
extractor = capa.main.get_extractor( extractor: FeatureExtractor = capa.main.get_extractor(
args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
) )

View File

@@ -70,6 +70,7 @@ import capa.render.result_document as rd
from capa.helpers import get_file_taste from capa.helpers import get_file_taste
from capa.features.common import FORMAT_AUTO from capa.features.common import FORMAT_AUTO
from capa.features.freeze import Address from capa.features.freeze import Address
from capa.features.extractors.base_extractor import FeatureExtractor
logger = logging.getLogger("capa.show-capabilities-by-function") logger = logging.getLogger("capa.show-capabilities-by-function")
@@ -166,7 +167,7 @@ def main(argv=None):
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
try: try:
extractor = capa.main.get_extractor( extractor: FeatureExtractor = capa.main.get_extractor(
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
) )
except capa.exceptions.UnsupportedFormatError: except capa.exceptions.UnsupportedFormatError: