diff --git a/CHANGELOG.md b/CHANGELOG.md index 2985c720..e8f5f4e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Python 3.11 support #1192 @williballenthin - dotnet: emit calls to/from MethodDef methods #1236 @mike-hunhoff - dotnet: emit namespace/class features for ldvirtftn/ldftn instructions #1241 @mike-hunhoff +- dotnet: emit namespace/class features for type references #1242 @mike-hunhoff ### Breaking Changes diff --git a/capa/features/extractors/dnfile/extractor.py b/capa/features/extractors/dnfile/extractor.py index 3adb4947..bd4b9c9e 100644 --- a/capa/features/extractors/dnfile/extractor.py +++ b/capa/features/extractors/dnfile/extractor.py @@ -8,7 +8,8 @@ from __future__ import annotations -from typing import Dict, List, Tuple, Iterator, Optional +from enum import Enum +from typing import Dict, List, Tuple, Union, Iterator, Optional import dnfile from dncil.cil.opcode import OpCodes @@ -19,8 +20,51 @@ import capa.features.extractors.dnfile.insn import capa.features.extractors.dnfile.function from capa.features.common import Feature from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress +from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor -from capa.features.extractors.dnfile.helpers import get_dotnet_managed_method_bodies +from capa.features.extractors.dnfile.helpers import ( + get_dotnet_types, + get_dotnet_fields, + get_dotnet_managed_imports, + get_dotnet_managed_methods, + get_dotnet_unmanaged_imports, + get_dotnet_managed_method_bodies, +) + + +class DnFileFeatureExtractorCache: + def __init__(self, pe: dnfile.dnPE): + self.imports: Dict[int, Union[DnType, DnUnmanagedMethod]] = {} + self.native_imports: Dict[int, Union[DnType, DnUnmanagedMethod]] = {} + self.methods: Dict[int, Union[DnType, DnUnmanagedMethod]] = {} + self.fields: Dict[int, Union[DnType, DnUnmanagedMethod]] = {} + self.types: Dict[int, Union[DnType, DnUnmanagedMethod]] = {} + + for import_ in get_dotnet_managed_imports(pe): + self.imports[import_.token] = import_ + for native_import in get_dotnet_unmanaged_imports(pe): + self.native_imports[native_import.token] = native_import + for method in get_dotnet_managed_methods(pe): + self.methods[method.token] = method + for field in get_dotnet_fields(pe): + self.fields[field.token] = field + for type_ in get_dotnet_types(pe): + self.types[type_.token] = type_ + + def get_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: + return self.imports.get(token, None) + + def get_native_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: + return self.native_imports.get(token, None) + + def get_method(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: + return self.methods.get(token, None) + + def get_field(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: + return self.fields.get(token, None) + + def get_type(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]: + return self.types.get(token, None) class DnfileFeatureExtractor(FeatureExtractor): @@ -28,6 +72,10 @@ class DnfileFeatureExtractor(FeatureExtractor): super().__init__() self.pe: dnfile.dnPE = dnfile.dnPE(path) + # pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction + # most relevant at instruction scope + self.token_cache: DnFileFeatureExtractorCache = DnFileFeatureExtractorCache(self.pe) + # pre-compute these because we'll yield them at *every* scope. self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe)) @@ -47,7 +95,9 @@ class DnfileFeatureExtractor(FeatureExtractor): methods: Dict[Address, FunctionHandle] = {} for (token, method) in get_dotnet_managed_method_bodies(self.pe): fh: FunctionHandle = FunctionHandle( - address=DNTokenAddress(token), inner=method, ctx={"pe": self.pe, "calls_from": set(), "calls_to": set()} + address=DNTokenAddress(token), + inner=method, + ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache}, ) # method tokens should be unique diff --git a/capa/features/extractors/dnfile/helpers.py b/capa/features/extractors/dnfile/helpers.py index 086ad3f8..d79d802b 100644 --- a/capa/features/extractors/dnfile/helpers.py +++ b/capa/features/extractors/dnfile/helpers.py @@ -18,6 +18,7 @@ from dncil.clr.token import Token, StringToken, InvalidToken from dncil.cil.body.reader import CilMethodBodyReaderBase from capa.features.common import FeatureAccess +from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod logger = logging.getLogger(__name__) @@ -40,70 +41,6 @@ class DnfileMethodBodyReader(CilMethodBodyReaderBase): return self.offset -class DnType(object): - def __init__(self, token: int, class_: str, namespace: str = "", member: str = "", access: Optional[str] = None): - self.token = token - # property access - self.access = access - self.namespace = namespace - self.class_ = class_ - if member == ".ctor": - member = "ctor" - if member == ".cctor": - member = "cctor" - self.member = member - - def __hash__(self): - return hash((self.token, self.access, self.namespace, self.class_, self.member)) - - def __eq__(self, other): - return ( - self.token == other.token - and self.access == other.access - and self.namespace == other.namespace - and self.class_ == other.class_ - and self.member == other.member - ) - - def __str__(self): - return DnType.format_name(self.class_, namespace=self.namespace, member=self.member) - - def __repr__(self): - return str(self) - - @staticmethod - def format_name(class_: str, namespace: str = "", member: str = ""): - # like File::OpenRead - name: str = f"{class_}::{member}" if member else class_ - if namespace: - # like System.IO.File::OpenRead - name = f"{namespace}.{name}" - return name - - -class DnUnmanagedMethod: - def __init__(self, token: int, module: str, method: str): - self.token: int = token - self.module: str = module - self.method: str = method - - def __hash__(self): - return hash((self.token, self.module, self.method)) - - def __eq__(self, other): - return self.token == other.token and self.module == other.module and self.method == other.method - - def __str__(self): - return DnUnmanagedMethod.format_name(self.module, self.method) - - def __repr__(self): - return str(self) - - @staticmethod - def format_name(module, method): - return f"{module}.{method}" - - def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Union[dnfile.base.MDTableRow, InvalidToken, str]: """map generic token to string or table row""" assert pe.net is not None @@ -363,6 +300,21 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod] yield DnUnmanagedMethod(token, module, method) +def get_dotnet_types(pe: dnfile.dnPE) -> Iterator[DnType]: + """get .NET types from TypeDef and TypeRef tables""" + for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number): + assert isinstance(typedef, dnfile.mdtable.TypeDefRow) + + typedef_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid) + yield DnType(typedef_token, typedef.TypeName, namespace=typedef.TypeNamespace) + + for (rid, typeref) in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number): + assert isinstance(typeref, dnfile.mdtable.TypeRefRow) + + typeref_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid) + yield DnType(typeref_token, typeref.TypeName, namespace=typeref.TypeNamespace) + + def calculate_dotnet_token_value(table: int, rid: int) -> int: return ((table & 0xFF) << Token.TABLE_SHIFT) | (rid & Token.RID_MASK) diff --git a/capa/features/extractors/dnfile/insn.py b/capa/features/extractors/dnfile/insn.py index 2e8b7b73..fb95d5cd 100644 --- a/capa/features/extractors/dnfile/insn.py +++ b/capa/features/extractors/dnfile/insn.py @@ -9,7 +9,10 @@ from __future__ import annotations import logging -from typing import Dict, Tuple, Union, Iterator, Optional +from typing import TYPE_CHECKING, Any, Dict, Tuple, Union, Iterator, Optional + +if TYPE_CHECKING: + from capa.features.extractors.dnfile.extractor import DnFileFeatureExtractorCache import dnfile from dncil.clr.token import Token, StringToken, InvalidToken @@ -19,78 +22,42 @@ import capa.features.extractors.helpers from capa.features.insn import API, Number, Property from capa.features.common import Class, String, Feature, Namespace, FeatureAccess, Characteristic from capa.features.address import Address +from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.dnfile.helpers import ( - DnType, - DnUnmanagedMethod, - get_dotnet_fields, resolve_dotnet_token, read_dotnet_user_string, - get_dotnet_managed_imports, - get_dotnet_managed_methods, calculate_dotnet_token_value, - get_dotnet_unmanaged_imports, ) logger = logging.getLogger(__name__) -def get_managed_imports(ctx: Dict) -> Dict: - if "managed_imports_cache" not in ctx: - ctx["managed_imports_cache"] = {} - for method in get_dotnet_managed_imports(ctx["pe"]): - ctx["managed_imports_cache"][method.token] = method - return ctx["managed_imports_cache"] - - -def get_unmanaged_imports(ctx: Dict) -> Dict: - if "unmanaged_imports_cache" not in ctx: - ctx["unmanaged_imports_cache"] = {} - for imp in get_dotnet_unmanaged_imports(ctx["pe"]): - ctx["unmanaged_imports_cache"][imp.token] = imp - return ctx["unmanaged_imports_cache"] - - -def get_methods(ctx: Dict) -> Dict: - if "methods_cache" not in ctx: - ctx["methods_cache"] = {} - for method in get_dotnet_managed_methods(ctx["pe"]): - ctx["methods_cache"][method.token] = method - return ctx["methods_cache"] - - -def get_fields(ctx: Dict) -> Dict: - if "fields_cache" not in ctx: - ctx["fields_cache"] = {} - for field in get_dotnet_fields(ctx["pe"]): - ctx["fields_cache"][field.token] = field - return ctx["fields_cache"] - - -def get_callee(ctx: Dict, token: Token) -> Union[DnType, DnUnmanagedMethod, None]: +def get_callee( + pe: dnfile.dnPE, cache: DnFileFeatureExtractorCache, token: Token +) -> Optional[Union[DnType, DnUnmanagedMethod]]: """map .NET token to un/managed (generic) method""" - row: Union[dnfile.base.MDTableRow, InvalidToken, str] = resolve_dotnet_token(ctx["pe"], token) - if not isinstance(row, (dnfile.mdtable.MethodDefRow, dnfile.mdtable.MemberRefRow, dnfile.mdtable.MethodSpecRow)): - # we only handle MethodDef (internal), MemberRef (external), and MethodSpec (generic) - return None - token_: int - if isinstance(row, dnfile.mdtable.MethodSpecRow): + if token.table == dnfile.mdtable.MethodSpec.number: # map MethodSpec to MethodDef or MemberRef + row: Union[dnfile.base.MDTableRow, InvalidToken, str] = resolve_dotnet_token(pe, token) + assert isinstance(row, dnfile.mdtable.MethodSpecRow) + if row.Method.table is None: logger.debug("MethodSpec[0x%X] Method table is None", token.rid) return None + token_ = calculate_dotnet_token_value(row.Method.table.number, row.Method.row_index) else: token_ = token.value - callee: Union[DnType, DnUnmanagedMethod, None] = get_managed_imports(ctx).get(token_, None) + callee: Optional[Union[DnType, DnUnmanagedMethod]] = cache.get_import(token_) if callee is None: # we must check unmanaged imports before managed methods because we map forwarded managed methods # to their unmanaged imports; we prefer a forwarded managed method be mapped to its unmanaged import for analysis - callee = get_unmanaged_imports(ctx).get(token_, None) + callee = cache.get_native_import(token_) if callee is None: - callee = get_methods(ctx).get(token_, None) + callee = cache.get_method(token_) return callee @@ -104,7 +71,7 @@ def extract_insn_api_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterato ): return - callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand) + callee: Optional[Union[DnType, DnUnmanagedMethod]] = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand) if isinstance(callee, DnType): # ignore methods used to access properties if callee.access is None: @@ -123,7 +90,7 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It if ih.inner.opcode in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp): # property access via MethodDef or MemberRef - callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand) + callee: Optional[Union[DnType, DnUnmanagedMethod]] = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand) if isinstance(callee, DnType): if callee.access is not None: name = str(callee) @@ -131,14 +98,14 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It elif ih.inner.opcode in (OpCodes.Ldfld, OpCodes.Ldflda, OpCodes.Ldsfld, OpCodes.Ldsflda): # property read via Field - read_field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) + read_field: Optional[Union[DnType, DnUnmanagedMethod]] = fh.ctx["cache"].get_field(ih.inner.operand.value) if read_field is not None: name = str(read_field) access = FeatureAccess.READ elif ih.inner.opcode in (OpCodes.Stfld, OpCodes.Stsfld): # property write via Field - write_field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) + write_field: Optional[Union[DnType, DnUnmanagedMethod]] = fh.ctx["cache"].get_field(ih.inner.operand.value) if write_field is not None: name = str(write_field) access = FeatureAccess.WRITE @@ -149,8 +116,12 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It yield Property(name), ih.address -def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Class, Address]]: - """parse instruction class features""" +def extract_insn_namespace_class_features( + fh: FunctionHandle, bh, ih: InsnHandle +) -> Iterator[Tuple[Union[Namespace, Class], Address]]: + """parse instruction namespace and class features""" + type_: Optional[Union[DnType, DnUnmanagedMethod]] = None + if ih.inner.opcode in ( OpCodes.Call, OpCodes.Callvirt, @@ -160,9 +131,7 @@ def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Itera OpCodes.Newobj, ): # method call - includes managed methods (MethodDef, TypeRef) and properties (MethodSemantics, TypeRef) - callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand) - if isinstance(callee, DnType): - yield Class(DnType.format_name(callee.class_, namespace=callee.namespace)), ih.address + type_ = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand) elif ih.inner.opcode in ( OpCodes.Ldfld, @@ -173,37 +142,35 @@ def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Itera OpCodes.Stsfld, ): # field access - field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) - if isinstance(field, DnType): - yield Class(DnType.format_name(field.class_, namespace=field.namespace)), ih.address - - -def extract_insn_namespace_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Namespace, Address]]: - """parse instruction namespace features""" - if ih.inner.opcode in ( - OpCodes.Call, - OpCodes.Callvirt, - OpCodes.Jmp, - OpCodes.Ldvirtftn, - OpCodes.Ldftn, - OpCodes.Newobj, - ): - # method call - includes managed methods (MethodDef, TypeRef) and properties (MethodSemantics, TypeRef) - callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand) - if isinstance(callee, DnType) and callee.namespace is not None: - yield Namespace(callee.namespace), ih.address + type_ = fh.ctx["cache"].get_field(ih.inner.operand.value) + # ECMA 335 VI.C.4.10 elif ih.inner.opcode in ( - OpCodes.Ldfld, - OpCodes.Ldflda, - OpCodes.Ldsfld, - OpCodes.Ldsflda, - OpCodes.Stfld, - OpCodes.Stsfld, + OpCodes.Initobj, + OpCodes.Box, + OpCodes.Castclass, + OpCodes.Cpobj, + OpCodes.Isinst, + OpCodes.Ldelem, + OpCodes.Ldelema, + OpCodes.Ldobj, + OpCodes.Mkrefany, + OpCodes.Newarr, + OpCodes.Refanyval, + OpCodes.Sizeof, + OpCodes.Stobj, + OpCodes.Unbox, + OpCodes.Constrained, + OpCodes.Stelem, + OpCodes.Unbox_Any, ): - field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) - if isinstance(field, DnType) and field.namespace is not None: - yield Namespace(field.namespace), ih.address + # type access + type_ = fh.ctx["cache"].get_type(ih.inner.operand.value) + + if isinstance(type_, DnType): + yield Class(DnType.format_name(type_.class_, namespace=type_.namespace)), ih.address + if type_.namespace: + yield Namespace(type_.namespace), ih.address def extract_insn_number_features(fh, bh, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: @@ -230,7 +197,7 @@ def extract_insn_string_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iter def extract_unmanaged_call_characteristic_features( fh: FunctionHandle, bb: BBHandle, ih: InsnHandle ) -> Iterator[Tuple[Characteristic, Address]]: - if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli): + if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp): return row: Union[str, InvalidToken, dnfile.base.MDTableRow] = resolve_dotnet_token(fh.ctx["pe"], ih.inner.operand) @@ -254,7 +221,6 @@ INSTRUCTION_HANDLERS = ( extract_insn_property_features, extract_insn_number_features, extract_insn_string_features, - extract_insn_namespace_features, - extract_insn_class_features, + extract_insn_namespace_class_features, extract_unmanaged_call_characteristic_features, ) diff --git a/capa/features/extractors/dnfile/types.py b/capa/features/extractors/dnfile/types.py new file mode 100644 index 00000000..822b5d67 --- /dev/null +++ b/capa/features/extractors/dnfile/types.py @@ -0,0 +1,75 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +from enum import Enum +from typing import Union, Optional + + +class DnType(object): + def __init__(self, token: int, class_: str, namespace: str = "", member: str = "", access: Optional[str] = None): + self.token: int = token + self.access: Optional[str] = access + self.namespace: str = namespace + self.class_: str = class_ + + if member == ".ctor": + member = "ctor" + if member == ".cctor": + member = "cctor" + + self.member: str = member + + def __hash__(self): + return hash((self.token, self.access, self.namespace, self.class_, self.member)) + + def __eq__(self, other): + return ( + self.token == other.token + and self.access == other.access + and self.namespace == other.namespace + and self.class_ == other.class_ + and self.member == other.member + ) + + def __str__(self): + return DnType.format_name(self.class_, namespace=self.namespace, member=self.member) + + def __repr__(self): + return str(self) + + @staticmethod + def format_name(class_: str, namespace: str = "", member: str = ""): + # like File::OpenRead + name: str = f"{class_}::{member}" if member else class_ + if namespace: + # like System.IO.File::OpenRead + name = f"{namespace}.{name}" + return name + + +class DnUnmanagedMethod: + def __init__(self, token: int, module: str, method: str): + self.token: int = token + self.module: str = module + self.method: str = method + + def __hash__(self): + return hash((self.token, self.module, self.method)) + + def __eq__(self, other): + return self.token == other.token and self.module == other.module and self.method == other.method + + def __str__(self): + return DnUnmanagedMethod.format_name(self.module, self.method) + + def __repr__(self): + return str(self) + + @staticmethod + def format_name(module, method): + return f"{module}.{method}" diff --git a/tests/fixtures.py b/tests/fixtures.py index 05373d17..ce0effc4 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -760,6 +760,12 @@ FEATURE_PRESENCE_TESTS_DOTNET = sorted( ("_1c444", "function=0x1F68", capa.features.insn.Number(0x0), True), ("_1c444", "function=0x1F68", capa.features.insn.Number(0x1), False), ("_692f", "token=0x6000004", capa.features.insn.API("System.Linq.Enumerable::First"), True), # generic method + ( + "_692f", + "token=0x6000004", + capa.features.insn.Property("System.Linq.Enumerable::First"), + False, + ), # generic method ("_692f", "token=0x6000004", capa.features.common.Namespace("System.Linq"), True), # generic method ("_692f", "token=0x6000004", capa.features.common.Class("System.Linq.Enumerable"), True), # generic method ("_1c444", "token=0x6000020", capa.features.common.Namespace("Reqss"), True), # ldftn