mirror of
https://github.com/mandiant/capa.git
synced 2025-12-21 23:00:29 -08:00
Merge branch 'master' into dynamic-feature-extraction
This commit is contained in:
19
CHANGELOG.md
19
CHANGELOG.md
@@ -4,23 +4,21 @@
|
||||
|
||||
### New Features
|
||||
- ELF: implement file import and export name extractor #1607 @Aayush-Goel-04
|
||||
- Add a dynamic feature extractor for the CAPE sandbox @yelhamer [#1535](https://github.com/mandiant/capa/issues/1535)
|
||||
- Add unit tests for the new CAPE extractor #1563 @yelhamer
|
||||
- Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer
|
||||
- Add a new process scope for the dynamic analysis flavor #1517 @yelhamer
|
||||
- Add a new thread scope for the dynamic analysis flavor #1517 @yelhamer
|
||||
- Add support for flavor-based rule scopes @yelhamer
|
||||
- Add ProcessesAddress and ThreadAddress #1612 @yelhamer
|
||||
- Add dynamic capability extraction @yelhamer
|
||||
- Add support for mixed-scopes rules @yelhamer
|
||||
- Add a call scope @yelhamer
|
||||
- bump pydantic from 1.10.9 to 2.1.1 #1582 @Aayush-Goel-04
|
||||
- develop script to highlight the features that are not used during matching #331 @Aayush-Goel-04
|
||||
- implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer
|
||||
- add call scope #771 @yelhamer
|
||||
- add process scope for the dynamic analysis flavor #1517 @yelhamer
|
||||
- Add thread scope for the dynamic analysis flavor #1517 @yelhamer
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
### New Rules (4)
|
||||
|
||||
- executable/pe/export/forwarded-export ronnie.salomonsen@mandiant.com
|
||||
- host-interaction/bootloader/get-uefi-variable jakub.jozwiak@mandiant.com
|
||||
- host-interaction/bootloader/set-uefi-variable jakub.jozwiak@mandiant.com
|
||||
- nursery/enumerate-device-drivers-on-linux @mr-tz
|
||||
-
|
||||
|
||||
### Bug Fixes
|
||||
@@ -29,6 +27,7 @@
|
||||
- linter: skip native API check for NtProtectVirtualMemory #1675 @williballenthin
|
||||
|
||||
### capa explorer IDA Pro plugin
|
||||
- fix unhandled exception when resolving rule path #1693 @mike-hunhoff
|
||||
|
||||
### Development
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
[](https://pypi.org/project/flare-capa)
|
||||
[](https://github.com/mandiant/capa/releases)
|
||||
[](https://github.com/mandiant/capa-rules)
|
||||
[](https://github.com/mandiant/capa-rules)
|
||||
[](https://github.com/mandiant/capa/actions?query=workflow%3ACI+event%3Apush+branch%3Amaster)
|
||||
[](https://github.com/mandiant/capa/releases)
|
||||
[](LICENSE.txt)
|
||||
|
||||
@@ -136,8 +136,8 @@ class Feature(abc.ABC): # noqa: B024
|
||||
import capa.features.freeze.features
|
||||
|
||||
return (
|
||||
capa.features.freeze.features.feature_from_capa(self).json()
|
||||
< capa.features.freeze.features.feature_from_capa(other).json()
|
||||
capa.features.freeze.features.feature_from_capa(self).model_dump_json()
|
||||
< capa.features.freeze.features.feature_from_capa(other).model_dump_json()
|
||||
)
|
||||
|
||||
def get_name_str(self) -> str:
|
||||
|
||||
@@ -13,6 +13,7 @@ from typing import Any, Dict, Tuple, Union, Iterator
|
||||
from dataclasses import dataclass
|
||||
|
||||
# TODO(williballenthin): use typing.TypeAlias directly when Python 3.9 is deprecated
|
||||
# https://github.com/mandiant/capa/issues/1699
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
import capa.features.address
|
||||
|
||||
@@ -14,7 +14,7 @@ import logging
|
||||
from enum import Enum
|
||||
from typing import List, Tuple, Union
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
from pydantic import Field, BaseModel, ConfigDict
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
import capa.helpers
|
||||
@@ -38,8 +38,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HashableModel(BaseModel):
|
||||
class Config:
|
||||
frozen = True
|
||||
model_config = ConfigDict(frozen=True)
|
||||
|
||||
|
||||
class AddressType(str, Enum):
|
||||
@@ -57,7 +56,7 @@ class AddressType(str, Enum):
|
||||
|
||||
class Address(HashableModel):
|
||||
type: AddressType
|
||||
value: Union[int, Tuple[int, ...], None]
|
||||
value: Union[int, Tuple[int, ...], None] = None # None default value to support deserialization of NO_ADDRESS
|
||||
|
||||
@classmethod
|
||||
def from_capa(cls, a: capa.features.address.Address) -> "Address":
|
||||
@@ -271,9 +270,7 @@ class BasicBlockFeature(HashableModel):
|
||||
basic_block: Address = Field(alias="basic block")
|
||||
address: Address
|
||||
feature: Feature
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
class InstructionFeature(HashableModel):
|
||||
@@ -306,9 +303,7 @@ class FunctionFeatures(BaseModel):
|
||||
address: Address
|
||||
features: Tuple[FunctionFeature, ...]
|
||||
basic_blocks: Tuple[BasicBlockFeatures, ...] = Field(alias="basic blocks")
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
class CallFeatures(BaseModel):
|
||||
@@ -332,9 +327,7 @@ class StaticFeatures(BaseModel):
|
||||
global_: Tuple[GlobalFeature, ...] = Field(alias="global")
|
||||
file: Tuple[FileFeature, ...]
|
||||
functions: Tuple[FunctionFeatures, ...]
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
class DynamicFeatures(BaseModel):
|
||||
@@ -352,9 +345,7 @@ Features: TypeAlias = Union[StaticFeatures, DynamicFeatures]
|
||||
class Extractor(BaseModel):
|
||||
name: str
|
||||
version: str = capa.version.__version__
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
class Freeze(BaseModel):
|
||||
@@ -363,9 +354,7 @@ class Freeze(BaseModel):
|
||||
sample_hashes: SampleHashes
|
||||
extractor: Extractor
|
||||
features: Features
|
||||
|
||||
class Config:
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
|
||||
def dumps_static(extractor: StaticFeatureExtractor) -> str:
|
||||
@@ -467,7 +456,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
|
||||
) # type: ignore
|
||||
# Mypy is unable to recognise `base_address` as a argument due to alias
|
||||
|
||||
return freeze.json()
|
||||
return freeze.model_dump_json()
|
||||
|
||||
|
||||
def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
import binascii
|
||||
from typing import Union, Optional
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
from pydantic import Field, BaseModel, ConfigDict
|
||||
|
||||
import capa.features.file
|
||||
import capa.features.insn
|
||||
@@ -17,9 +17,7 @@ import capa.features.basicblock
|
||||
|
||||
|
||||
class FeatureModel(BaseModel):
|
||||
class Config:
|
||||
frozen = True
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(frozen=True, populate_by_name=True)
|
||||
|
||||
def to_capa(self) -> capa.features.common.Feature:
|
||||
if isinstance(self, OSFeature):
|
||||
@@ -213,141 +211,141 @@ def feature_from_capa(f: capa.features.common.Feature) -> "Feature":
|
||||
class OSFeature(FeatureModel):
|
||||
type: str = "os"
|
||||
os: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class ArchFeature(FeatureModel):
|
||||
type: str = "arch"
|
||||
arch: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class FormatFeature(FeatureModel):
|
||||
type: str = "format"
|
||||
format: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class MatchFeature(FeatureModel):
|
||||
type: str = "match"
|
||||
match: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class CharacteristicFeature(FeatureModel):
|
||||
type: str = "characteristic"
|
||||
characteristic: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class ExportFeature(FeatureModel):
|
||||
type: str = "export"
|
||||
export: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class ImportFeature(FeatureModel):
|
||||
type: str = "import"
|
||||
import_: str = Field(alias="import")
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class SectionFeature(FeatureModel):
|
||||
type: str = "section"
|
||||
section: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class FunctionNameFeature(FeatureModel):
|
||||
type: str = "function name"
|
||||
function_name: str = Field(alias="function name")
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class SubstringFeature(FeatureModel):
|
||||
type: str = "substring"
|
||||
substring: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class RegexFeature(FeatureModel):
|
||||
type: str = "regex"
|
||||
regex: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class StringFeature(FeatureModel):
|
||||
type: str = "string"
|
||||
string: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class ClassFeature(FeatureModel):
|
||||
type: str = "class"
|
||||
class_: str = Field(alias="class")
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class NamespaceFeature(FeatureModel):
|
||||
type: str = "namespace"
|
||||
namespace: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class BasicBlockFeature(FeatureModel):
|
||||
type: str = "basic block"
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class APIFeature(FeatureModel):
|
||||
type: str = "api"
|
||||
api: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class PropertyFeature(FeatureModel):
|
||||
type: str = "property"
|
||||
access: Optional[str]
|
||||
access: Optional[str] = None
|
||||
property: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class NumberFeature(FeatureModel):
|
||||
type: str = "number"
|
||||
number: Union[int, float]
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class BytesFeature(FeatureModel):
|
||||
type: str = "bytes"
|
||||
bytes: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class OffsetFeature(FeatureModel):
|
||||
type: str = "offset"
|
||||
offset: int
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class MnemonicFeature(FeatureModel):
|
||||
type: str = "mnemonic"
|
||||
mnemonic: str
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class OperandNumberFeature(FeatureModel):
|
||||
type: str = "operand number"
|
||||
index: int
|
||||
operand_number: int = Field(alias="operand number")
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class OperandOffsetFeature(FeatureModel):
|
||||
type: str = "operand offset"
|
||||
index: int
|
||||
operand_offset: int = Field(alias="operand offset")
|
||||
description: Optional[str]
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
Feature = Union[
|
||||
|
||||
@@ -573,10 +573,11 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
def ensure_capa_settings_rule_path(self):
|
||||
try:
|
||||
path: Path = Path(settings.user.get(CAPA_SETTINGS_RULE_PATH, ""))
|
||||
path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
|
||||
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not path.exists():
|
||||
# pathlib.Path considers "" equivalent to "." so we first check if rule path is an empty string
|
||||
if not path or not Path(path).exists():
|
||||
# configure rules selection messagebox
|
||||
rules_message = QtWidgets.QMessageBox()
|
||||
rules_message.setIcon(QtWidgets.QMessageBox.Information)
|
||||
@@ -594,15 +595,15 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
if pressed == QtWidgets.QMessageBox.Cancel:
|
||||
raise UserCancelledError()
|
||||
|
||||
path = Path(self.ask_user_directory())
|
||||
path = self.ask_user_directory()
|
||||
if not path:
|
||||
raise UserCancelledError()
|
||||
|
||||
if not path.exists():
|
||||
if not Path(path).exists():
|
||||
logger.error("rule path %s does not exist or cannot be accessed", path)
|
||||
return False
|
||||
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = str(path)
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = path
|
||||
except UserCancelledError:
|
||||
capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules")
|
||||
logger.warning(
|
||||
@@ -1307,7 +1308,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
idaapi.info("No program analysis to save.")
|
||||
return
|
||||
|
||||
s = self.resdoc_cache.json().encode("utf-8")
|
||||
s = self.resdoc_cache.model_dump_json().encode("utf-8")
|
||||
|
||||
path = Path(self.ask_user_capa_json_file())
|
||||
if not path.exists():
|
||||
|
||||
@@ -11,4 +11,4 @@ from capa.engine import MatchResults
|
||||
|
||||
|
||||
def render(meta, rules: RuleSet, capabilities: MatchResults) -> str:
|
||||
return rd.ResultDocument.from_capa(meta, rules, capabilities).json(exclude_none=True)
|
||||
return rd.ResultDocument.from_capa(meta, rules, capabilities).model_dump_json(exclude_none=True)
|
||||
|
||||
@@ -127,7 +127,7 @@ def metadata_to_pb2(meta: rd.Metadata) -> capa_pb2.Metadata:
|
||||
timestamp=str(meta.timestamp),
|
||||
version=meta.version,
|
||||
argv=meta.argv,
|
||||
sample=google.protobuf.json_format.ParseDict(meta.sample.dict(), capa_pb2.Sample()),
|
||||
sample=google.protobuf.json_format.ParseDict(meta.sample.model_dump(), capa_pb2.Sample()),
|
||||
analysis=capa_pb2.Analysis(
|
||||
format=meta.analysis.format,
|
||||
arch=meta.analysis.arch,
|
||||
@@ -394,7 +394,7 @@ def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
|
||||
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
|
||||
# after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser
|
||||
# conversions include tuple -> list and rd.Enum -> proto.enum
|
||||
meta = dict_tuple_to_list_values(rule_metadata.dict())
|
||||
meta = dict_tuple_to_list_values(rule_metadata.model_dump())
|
||||
meta["scope"] = scope_to_pb2(meta["scope"])
|
||||
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
|
||||
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
|
||||
|
||||
@@ -7,9 +7,9 @@
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import datetime
|
||||
import collections
|
||||
from typing import Dict, List, Tuple, Union, Optional
|
||||
from typing import Dict, List, Tuple, Union, Literal, Optional
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
from pydantic import Field, BaseModel, ConfigDict
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
import capa.rules
|
||||
@@ -24,14 +24,11 @@ from capa.helpers import assert_never
|
||||
|
||||
|
||||
class FrozenModel(BaseModel):
|
||||
class Config:
|
||||
frozen = True
|
||||
extra = "forbid"
|
||||
model_config = ConfigDict(frozen=True, extra="forbid")
|
||||
|
||||
|
||||
class Model(BaseModel):
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
|
||||
class Sample(Model):
|
||||
@@ -148,13 +145,13 @@ class CompoundStatement(StatementModel):
|
||||
|
||||
|
||||
class SomeStatement(StatementModel):
|
||||
type = "some"
|
||||
type: Literal["some"] = "some"
|
||||
description: Optional[str] = None
|
||||
count: int
|
||||
|
||||
|
||||
class RangeStatement(StatementModel):
|
||||
type = "range"
|
||||
type: Literal["range"] = "range"
|
||||
description: Optional[str] = None
|
||||
min: int
|
||||
max: int
|
||||
@@ -162,7 +159,7 @@ class RangeStatement(StatementModel):
|
||||
|
||||
|
||||
class SubscopeStatement(StatementModel):
|
||||
type = "subscope"
|
||||
type: Literal["subscope"] = "subscope"
|
||||
description: Optional[str] = None
|
||||
scope: capa.rules.Scope
|
||||
|
||||
@@ -177,7 +174,7 @@ Statement = Union[
|
||||
|
||||
|
||||
class StatementNode(FrozenModel):
|
||||
type = "statement"
|
||||
type: Literal["statement"] = "statement"
|
||||
statement: Statement
|
||||
|
||||
|
||||
@@ -214,7 +211,7 @@ def statement_from_capa(node: capa.engine.Statement) -> Statement:
|
||||
|
||||
|
||||
class FeatureNode(FrozenModel):
|
||||
type = "feature"
|
||||
type: Literal["feature"] = "feature"
|
||||
feature: frz.Feature
|
||||
|
||||
|
||||
@@ -543,15 +540,12 @@ class MaecMetadata(FrozenModel):
|
||||
malware_family: Optional[str] = Field(None, alias="malware-family")
|
||||
malware_category: Optional[str] = Field(None, alias="malware-category")
|
||||
malware_category_ov: Optional[str] = Field(None, alias="malware-category-ov")
|
||||
|
||||
class Config:
|
||||
frozen = True
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(frozen=True, populate_by_name=True)
|
||||
|
||||
|
||||
class RuleMetadata(FrozenModel):
|
||||
name: str
|
||||
namespace: Optional[str]
|
||||
namespace: Optional[str] = None
|
||||
authors: Tuple[str, ...]
|
||||
scopes: capa.rules.Scopes
|
||||
attack: Tuple[AttackSpec, ...] = Field(alias="att&ck")
|
||||
@@ -589,9 +583,7 @@ class RuleMetadata(FrozenModel):
|
||||
) # type: ignore
|
||||
# Mypy is unable to recognise arguments due to alias
|
||||
|
||||
class Config:
|
||||
frozen = True
|
||||
allow_population_by_field_name = True
|
||||
model_config = ConfigDict(frozen=True, populate_by_name=True)
|
||||
|
||||
|
||||
class RuleMatches(FrozenModel):
|
||||
|
||||
@@ -88,7 +88,7 @@ def render_statement(ostream, match: rd.Match, statement: rd.Statement, indent=0
|
||||
# so, we have to inline some of the feature rendering here.
|
||||
|
||||
child = statement.child
|
||||
value = child.dict(by_alias=True).get(child.type)
|
||||
value = child.model_dump(by_alias=True).get(child.type)
|
||||
|
||||
if value:
|
||||
if isinstance(child, frzf.StringFeature):
|
||||
@@ -141,7 +141,7 @@ def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0):
|
||||
value = feature.class_
|
||||
else:
|
||||
# convert attributes to dictionary using aliased names, if applicable
|
||||
value = feature.dict(by_alias=True).get(key)
|
||||
value = feature.model_dump(by_alias=True).get(key)
|
||||
|
||||
if value is None:
|
||||
raise ValueError(f"{key} contains None")
|
||||
|
||||
@@ -887,6 +887,33 @@ class Rule:
|
||||
|
||||
yield from self._extract_subscope_rules_rec(self.statement)
|
||||
|
||||
def _extract_all_features_rec(self, statement) -> Set[Feature]:
|
||||
feature_set: Set[Feature] = set()
|
||||
|
||||
for child in statement.get_children():
|
||||
if isinstance(child, Statement):
|
||||
feature_set.update(self._extract_all_features_rec(child))
|
||||
else:
|
||||
feature_set.add(child)
|
||||
return feature_set
|
||||
|
||||
def extract_all_features(self) -> Set[Feature]:
|
||||
"""
|
||||
recursively extracts all feature statements in this rule.
|
||||
|
||||
returns:
|
||||
set: A set of all feature statements contained within this rule.
|
||||
"""
|
||||
if not isinstance(self.statement, ceng.Statement):
|
||||
# For rules with single feature like
|
||||
# anti-analysis\obfuscation\obfuscated-with-advobfuscator.yml
|
||||
# contains a single feature - substring , which is of type String
|
||||
return {
|
||||
self.statement,
|
||||
}
|
||||
|
||||
return self._extract_all_features_rec(self.statement)
|
||||
|
||||
def evaluate(self, features: FeatureSet, short_circuit=True):
|
||||
capa.perf.counters["evaluate.feature"] += 1
|
||||
capa.perf.counters["evaluate.feature.rule"] += 1
|
||||
|
||||
@@ -48,7 +48,7 @@ dependencies = [
|
||||
"pyelftools==0.29",
|
||||
"dnfile==0.13.0",
|
||||
"dncil==1.0.2",
|
||||
"pydantic==1.10.9",
|
||||
"pydantic==2.1.1",
|
||||
"protobuf==4.23.4",
|
||||
]
|
||||
dynamic = ["version"]
|
||||
@@ -66,7 +66,7 @@ dev = [
|
||||
"pytest-sugar==0.9.7",
|
||||
"pytest-instafail==0.5.0",
|
||||
"pytest-cov==4.1.0",
|
||||
"flake8==6.0.0",
|
||||
"flake8==6.1.0",
|
||||
"flake8-bugbear==23.7.10",
|
||||
"flake8-encodings==0.5.0.post1",
|
||||
"flake8-comprehensions==3.14.0",
|
||||
@@ -77,14 +77,14 @@ dev = [
|
||||
"flake8-simplify==0.20.0",
|
||||
"flake8-use-pathlib==0.3.0",
|
||||
"flake8-copyright==0.2.4",
|
||||
"ruff==0.0.280",
|
||||
"ruff==0.0.282",
|
||||
"black==23.7.0",
|
||||
"isort==5.11.4",
|
||||
"mypy==1.4.1",
|
||||
"psutil==5.9.2",
|
||||
"stix2==3.0.1",
|
||||
"requests==2.31.0",
|
||||
"mypy-protobuf==3.4.0",
|
||||
"mypy-protobuf==3.5.0",
|
||||
# type stubs for mypy
|
||||
"types-backports==0.1.3",
|
||||
"types-colorama==0.4.15.11",
|
||||
@@ -93,7 +93,7 @@ dev = [
|
||||
"types-termcolor==1.1.4",
|
||||
"types-psutil==5.8.23",
|
||||
"types_requests==2.31.0.2",
|
||||
"types-protobuf==4.23.0.2",
|
||||
"types-protobuf==4.23.0.3",
|
||||
]
|
||||
build = [
|
||||
"pyinstaller==5.10.1",
|
||||
|
||||
2
rules
2
rules
Submodule rules updated: 7685a232d9...149cf2d133
@@ -142,8 +142,7 @@ def get_capa_results(args):
|
||||
meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
|
||||
|
||||
doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
|
||||
|
||||
return {"path": path, "status": "ok", "ok": doc.dict(exclude_none=True)}
|
||||
return {"path": path, "status": "ok", "ok": doc.model_dump()}
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
@@ -212,7 +211,9 @@ def main(argv=None):
|
||||
if result["status"] == "error":
|
||||
logger.warning(result["error"])
|
||||
elif result["status"] == "ok":
|
||||
results[result["path"].as_posix()] = rd.ResultDocument.parse_obj(result["ok"]).json(exclude_none=True)
|
||||
results[result["path"].as_posix()] = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(
|
||||
exclude_none=True
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"unexpected status: {result['status']}")
|
||||
|
||||
|
||||
@@ -8,38 +8,17 @@
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
from typing import Set
|
||||
from pathlib import Path
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.engine as ceng
|
||||
from capa.features.common import Feature
|
||||
|
||||
logger = logging.getLogger("detect_duplicate_features")
|
||||
|
||||
|
||||
def get_child_features(feature: ceng.Statement) -> list:
|
||||
"""
|
||||
Recursively extracts all feature statements from a given rule statement.
|
||||
|
||||
Args:
|
||||
feature (capa.engine.Statement): The feature statement to extract features from.
|
||||
|
||||
Returns:
|
||||
list: A list of all feature statements contained within the given feature statement.
|
||||
"""
|
||||
children = []
|
||||
|
||||
if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
|
||||
for child in feature.children:
|
||||
children.extend(get_child_features(child))
|
||||
elif isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
|
||||
children.extend(get_child_features(feature.child))
|
||||
else:
|
||||
children.append(feature)
|
||||
return children
|
||||
|
||||
|
||||
def get_features(rule_path: str) -> list:
|
||||
def get_features(rule_path: str) -> Set[Feature]:
|
||||
"""
|
||||
Extracts all features from a given rule file.
|
||||
|
||||
@@ -47,17 +26,15 @@ def get_features(rule_path: str) -> list:
|
||||
rule_path (str): The path to the rule file to extract features from.
|
||||
|
||||
Returns:
|
||||
list: A list of all feature statements contained within the rule file.
|
||||
set: A set of all feature statements contained within the rule file.
|
||||
"""
|
||||
feature_list = []
|
||||
with Path(rule_path).open("r", encoding="utf-8") as f:
|
||||
try:
|
||||
new_rule = capa.rules.Rule.from_yaml(f.read())
|
||||
feature_list = get_child_features(new_rule.statement)
|
||||
return new_rule.extract_all_features()
|
||||
except Exception as e:
|
||||
logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
|
||||
sys.exit(-1)
|
||||
return feature_list
|
||||
|
||||
|
||||
def find_overlapping_rules(new_rule_path, rules_path):
|
||||
@@ -67,7 +44,6 @@ def find_overlapping_rules(new_rule_path, rules_path):
|
||||
|
||||
# Loads features of new rule in a list.
|
||||
new_rule_features = get_features(new_rule_path)
|
||||
|
||||
count = 0
|
||||
overlapping_rules = []
|
||||
|
||||
@@ -75,7 +51,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
|
||||
ruleset = capa.main.get_rules(rules_path)
|
||||
|
||||
for rule_name, rule in ruleset.rules.items():
|
||||
rule_features = get_child_features(rule.statement)
|
||||
rule_features = rule.extract_all_features()
|
||||
|
||||
if not len(rule_features):
|
||||
continue
|
||||
|
||||
@@ -309,7 +309,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
|
||||
|
||||
logger.debug("analyzing sample: %s", nice_path)
|
||||
extractor = capa.main.get_extractor(
|
||||
nice_path, format_, OS_AUTO, "", DEFAULT_SIGNATURES, False, disable_progress=True
|
||||
nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True
|
||||
)
|
||||
|
||||
capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True)
|
||||
|
||||
@@ -78,7 +78,7 @@ def main(argv=None):
|
||||
rdpb.ParseFromString(pb)
|
||||
|
||||
rd = capa.render.proto.doc_from_pb2(rdpb)
|
||||
print(rd.json(exclude_none=True, indent=2, sort_keys=True))
|
||||
print(rd.model_dump_json(exclude_none=True, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
220
scripts/show-unused-features.py
Normal file
220
scripts/show-unused-features.py
Normal file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and limitations under the License.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import typing
|
||||
import logging
|
||||
import argparse
|
||||
from typing import Set, Tuple
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
|
||||
import tabulate
|
||||
from termcolor import colored
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
import capa.helpers
|
||||
import capa.features
|
||||
import capa.exceptions
|
||||
import capa.render.verbose as v
|
||||
import capa.features.common
|
||||
import capa.features.freeze
|
||||
import capa.features.address
|
||||
import capa.features.extractors.pefile
|
||||
import capa.features.extractors.base_extractor
|
||||
from capa.helpers import log_unsupported_runtime_error
|
||||
from capa.features.common import Feature
|
||||
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor
|
||||
|
||||
logger = logging.getLogger("show-unused-features")
|
||||
|
||||
|
||||
def format_address(addr: capa.features.address.Address) -> str:
|
||||
return v.format_address(capa.features.freeze.Address.from_capa((addr)))
|
||||
|
||||
|
||||
def get_rules_feature_set(rules_path) -> Set[Feature]:
|
||||
ruleset = capa.main.get_rules(rules_path)
|
||||
rules_feature_set: Set[Feature] = set()
|
||||
for _, rule in ruleset.rules.items():
|
||||
rules_feature_set.update(rule.extract_all_features())
|
||||
|
||||
return rules_feature_set
|
||||
|
||||
|
||||
def get_file_features(
|
||||
functions: Tuple[FunctionHandle, ...], extractor: capa.features.extractors.base_extractor.StaticFeatureExtractor
|
||||
) -> typing.Counter[Feature]:
|
||||
feature_map: typing.Counter[Feature] = Counter()
|
||||
|
||||
for f in functions:
|
||||
if extractor.is_library_function(f.address):
|
||||
function_name = extractor.get_function_name(f.address)
|
||||
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
|
||||
continue
|
||||
|
||||
for feature, _ in extractor.extract_function_features(f):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
feature_map.update([feature])
|
||||
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
for feature, _ in extractor.extract_basic_block_features(f, bb):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
feature_map.update([feature])
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
for feature, _ in extractor.extract_insn_features(f, bb, insn):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
feature_map.update([feature])
|
||||
return feature_map
|
||||
|
||||
|
||||
def get_colored(s: str):
|
||||
if "(" in s and ")" in s:
|
||||
s_split = s.split("(", 1)
|
||||
s_color = colored(s_split[1][:-1], "cyan")
|
||||
return f"{s_split[0]}({s_color})"
|
||||
else:
|
||||
return colored(s, "cyan")
|
||||
|
||||
|
||||
def print_unused_features(feature_map: typing.Counter[Feature], rules_feature_set: Set[Feature]):
|
||||
unused_features = []
|
||||
for feature, count in reversed(feature_map.most_common()):
|
||||
if feature in rules_feature_set:
|
||||
continue
|
||||
unused_features.append((str(count), get_colored(str(feature))))
|
||||
print("\n")
|
||||
print(tabulate.tabulate(unused_features, headers=["Count", "Feature"], tablefmt="plain"))
|
||||
print("\n")
|
||||
|
||||
|
||||
def main(argv=None):
    """CLI entry point: report features extracted from a sample that no rule uses.

    Parses common capa arguments, builds a feature extractor for the sample,
    collects feature counts (optionally restricted to one function via -F),
    and prints the features absent from the loaded rule set.

    Args:
        argv: argument list; defaults to sys.argv[1:].

    Returns:
        0 on success, -1 on any error (bad input, unsupported format, etc.).
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})

    parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    # the pefile backend only extracts file-scope features, so -F is meaningless there
    if args.function and args.backend == "pefile":
        print("pefile backend does not support extracting function features")
        return -1

    try:
        taste = capa.helpers.get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    # freeze files carry pre-extracted features; load them directly instead of
    # running a backend over the sample.
    if (args.format == "freeze") or (
        args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
    ):
        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        # opt out of workspace caching via CAPA_SAVE_WORKSPACE=0/no/n (unset also disables)
        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
        try:
            extractor = capa.main.get_extractor(
                args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
            )
        except capa.exceptions.UnsupportedFormatError:
            capa.helpers.log_unsupported_format_error()
            return -1
        except capa.exceptions.UnsupportedRuntimeError:
            log_unsupported_runtime_error()
            return -1

    assert isinstance(extractor, StaticFeatureExtractor), "only static analysis supported today"

    feature_map: typing.Counter[Feature] = Counter()

    feature_map.update([feature for feature, _ in extractor.extract_global_features()])

    function_handles: Tuple[FunctionHandle, ...]
    if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
        # pefile extractor doesn't extract function features
        function_handles = ()
    else:
        function_handles = tuple(extractor.get_functions())

    if args.function:
        if args.format == "freeze":
            # freeze handles store addresses directly; compare against the raw -F value.
            # NOTE(review): fh.address is compared to the -F string here, while the
            # non-freeze branch compares formatted addresses — confirm the freeze
            # path matches the user-supplied form.
            function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
        else:
            function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles))

            if args.function not in [format_address(fh.address) for fh in function_handles]:
                print(f"{args.function} not a function")
                return -1

        if len(function_handles) == 0:
            print(f"{args.function} not a function")
            return -1

    feature_map.update(get_file_features(function_handles, extractor))

    rules_feature_set = get_rules_feature_set(args.rules)

    print_unused_features(feature_map, rules_feature_set)
    return 0
|
||||
|
||||
|
||||
def ida_main():
    """IDA Pro entry point: show unused features for the function under the cursor.

    Uses the IDA-backed extractor against the currently open database, limits
    analysis to the function containing the cursor, and compares against the
    default rule set shipped alongside capa.

    Returns:
        0 on success, -1 if the cursor is not inside a function.
    """
    # imported lazily: idc only exists inside the IDA runtime
    import idc

    import capa.main
    import capa.features.extractors.ida.extractor

    # start address of the function containing the current cursor position
    function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
    print(f"getting features for current function {hex(function)}")

    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
    feature_map: typing.Counter[Feature] = Counter()

    feature_map.update([feature for feature, _ in extractor.extract_file_features()])

    function_handles = tuple(extractor.get_functions())

    if function:
        # restrict to the single function whose start EA matches the cursor's function
        function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))

        if len(function_handles) == 0:
            print(f"{hex(function)} not a function")
            return -1

    feature_map.update(get_file_features(function_handles, extractor))

    # compare against the default rules directory next to the capa installation
    rules_path = capa.main.get_default_root() / "rules"
    rules_feature_set = get_rules_feature_set([rules_path])

    print_unused_features(feature_map, rules_feature_set)

    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    if capa.helpers.is_runtime_ida():
        # NOTE(review): ida_main()'s return code is ignored — presumably calling
        # sys.exit() inside IDA would tear down the host session; confirm.
        ida_main()
    else:
        sys.exit(main())
|
||||
@@ -236,7 +236,7 @@ def test_basic_block_node_from_capa():
|
||||
def assert_round_trip(rd: rdoc.ResultDocument):
|
||||
one = rd
|
||||
|
||||
doc = one.json(exclude_none=True)
|
||||
doc = one.model_dump_json(exclude_none=True)
|
||||
two = rdoc.ResultDocument.parse_raw(doc)
|
||||
|
||||
# show the round trip works
|
||||
@@ -244,14 +244,14 @@ def assert_round_trip(rd: rdoc.ResultDocument):
|
||||
# which works thanks to pydantic model equality.
|
||||
assert one == two
|
||||
# second by showing their json representations are the same.
|
||||
assert one.json(exclude_none=True) == two.json(exclude_none=True)
|
||||
assert one.model_dump_json(exclude_none=True) == two.model_dump_json(exclude_none=True)
|
||||
|
||||
# now show that two different versions are not equal.
|
||||
three = copy.deepcopy(two)
|
||||
three.meta.__dict__.update({"version": "0.0.0"})
|
||||
assert one.meta.version != three.meta.version
|
||||
assert one != three
|
||||
assert one.json(exclude_none=True) != three.json(exclude_none=True)
|
||||
assert one.model_dump_json(exclude_none=True) != three.model_dump_json(exclude_none=True)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -51,6 +51,7 @@ def get_rule_path():
|
||||
),
|
||||
pytest.param("show-features.py", [get_file_path()]),
|
||||
pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
|
||||
pytest.param("show-unused-features.py", [get_file_path()]),
|
||||
pytest.param(
|
||||
"capa_as_library.py", [get_file_path()], marks=pytest.mark.xfail(reason="relies on legacy ruleset")
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user