dotnet: emit namespace/class features for type references (#1242)

* dotnet: emit namespace/class features for type references

* dotnet: pre-compute .NET token caches
This commit is contained in:
Mike Hunhoff
2022-12-21 15:59:29 -07:00
committed by GitHub
parent 50490e6a93
commit b68be0c2ce
6 changed files with 206 additions and 156 deletions

View File

@@ -10,6 +10,7 @@
- Python 3.11 support #1192 @williballenthin
- dotnet: emit calls to/from MethodDef methods #1236 @mike-hunhoff
- dotnet: emit namespace/class features for ldvirtftn/ldftn instructions #1241 @mike-hunhoff
- dotnet: emit namespace/class features for type references #1242 @mike-hunhoff
### Breaking Changes

View File

@@ -8,7 +8,8 @@
from __future__ import annotations
from typing import Dict, List, Tuple, Iterator, Optional
from enum import Enum
from typing import Dict, List, Tuple, Union, Iterator, Optional
import dnfile
from dncil.cil.opcode import OpCodes
@@ -19,8 +20,51 @@ import capa.features.extractors.dnfile.insn
import capa.features.extractors.dnfile.function
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
from capa.features.extractors.dnfile.helpers import get_dotnet_managed_method_bodies
from capa.features.extractors.dnfile.helpers import (
get_dotnet_types,
get_dotnet_fields,
get_dotnet_managed_imports,
get_dotnet_managed_methods,
get_dotnet_unmanaged_imports,
get_dotnet_managed_method_bodies,
)
class DnFileFeatureExtractorCache:
def __init__(self, pe: dnfile.dnPE):
self.imports: Dict[int, Union[DnType, DnUnmanagedMethod]] = {}
self.native_imports: Dict[int, Union[DnType, DnUnmanagedMethod]] = {}
self.methods: Dict[int, Union[DnType, DnUnmanagedMethod]] = {}
self.fields: Dict[int, Union[DnType, DnUnmanagedMethod]] = {}
self.types: Dict[int, Union[DnType, DnUnmanagedMethod]] = {}
for import_ in get_dotnet_managed_imports(pe):
self.imports[import_.token] = import_
for native_import in get_dotnet_unmanaged_imports(pe):
self.native_imports[native_import.token] = native_import
for method in get_dotnet_managed_methods(pe):
self.methods[method.token] = method
for field in get_dotnet_fields(pe):
self.fields[field.token] = field
for type_ in get_dotnet_types(pe):
self.types[type_.token] = type_
def get_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.imports.get(token, None)
def get_native_import(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.native_imports.get(token, None)
def get_method(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.methods.get(token, None)
def get_field(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.fields.get(token, None)
def get_type(self, token: int) -> Optional[Union[DnType, DnUnmanagedMethod]]:
return self.types.get(token, None)
class DnfileFeatureExtractor(FeatureExtractor):
@@ -28,6 +72,10 @@ class DnfileFeatureExtractor(FeatureExtractor):
super().__init__()
self.pe: dnfile.dnPE = dnfile.dnPE(path)
# pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction
# most relevant at instruction scope
self.token_cache: DnFileFeatureExtractorCache = DnFileFeatureExtractorCache(self.pe)
# pre-compute these because we'll yield them at *every* scope.
self.global_features: List[Tuple[Feature, Address]] = []
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe))
@@ -47,7 +95,9 @@ class DnfileFeatureExtractor(FeatureExtractor):
methods: Dict[Address, FunctionHandle] = {}
for (token, method) in get_dotnet_managed_method_bodies(self.pe):
fh: FunctionHandle = FunctionHandle(
address=DNTokenAddress(token), inner=method, ctx={"pe": self.pe, "calls_from": set(), "calls_to": set()}
address=DNTokenAddress(token),
inner=method,
ctx={"pe": self.pe, "calls_from": set(), "calls_to": set(), "cache": self.token_cache},
)
# method tokens should be unique

View File

@@ -18,6 +18,7 @@ from dncil.clr.token import Token, StringToken, InvalidToken
from dncil.cil.body.reader import CilMethodBodyReaderBase
from capa.features.common import FeatureAccess
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
logger = logging.getLogger(__name__)
@@ -40,70 +41,6 @@ class DnfileMethodBodyReader(CilMethodBodyReaderBase):
return self.offset
class DnType(object):
def __init__(self, token: int, class_: str, namespace: str = "", member: str = "", access: Optional[str] = None):
self.token = token
# property access
self.access = access
self.namespace = namespace
self.class_ = class_
if member == ".ctor":
member = "ctor"
if member == ".cctor":
member = "cctor"
self.member = member
def __hash__(self):
return hash((self.token, self.access, self.namespace, self.class_, self.member))
def __eq__(self, other):
return (
self.token == other.token
and self.access == other.access
and self.namespace == other.namespace
and self.class_ == other.class_
and self.member == other.member
)
def __str__(self):
return DnType.format_name(self.class_, namespace=self.namespace, member=self.member)
def __repr__(self):
return str(self)
@staticmethod
def format_name(class_: str, namespace: str = "", member: str = ""):
# like File::OpenRead
name: str = f"{class_}::{member}" if member else class_
if namespace:
# like System.IO.File::OpenRead
name = f"{namespace}.{name}"
return name
class DnUnmanagedMethod:
def __init__(self, token: int, module: str, method: str):
self.token: int = token
self.module: str = module
self.method: str = method
def __hash__(self):
return hash((self.token, self.module, self.method))
def __eq__(self, other):
return self.token == other.token and self.module == other.module and self.method == other.method
def __str__(self):
return DnUnmanagedMethod.format_name(self.module, self.method)
def __repr__(self):
return str(self)
@staticmethod
def format_name(module, method):
return f"{module}.{method}"
def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Union[dnfile.base.MDTableRow, InvalidToken, str]:
"""map generic token to string or table row"""
assert pe.net is not None
@@ -363,6 +300,21 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod]
yield DnUnmanagedMethod(token, module, method)
def get_dotnet_types(pe: dnfile.dnPE) -> Iterator[DnType]:
"""get .NET types from TypeDef and TypeRef tables"""
for (rid, typedef) in iter_dotnet_table(pe, dnfile.mdtable.TypeDef.number):
assert isinstance(typedef, dnfile.mdtable.TypeDefRow)
typedef_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeDef.number, rid)
yield DnType(typedef_token, typedef.TypeName, namespace=typedef.TypeNamespace)
for (rid, typeref) in iter_dotnet_table(pe, dnfile.mdtable.TypeRef.number):
assert isinstance(typeref, dnfile.mdtable.TypeRefRow)
typeref_token: int = calculate_dotnet_token_value(dnfile.mdtable.TypeRef.number, rid)
yield DnType(typeref_token, typeref.TypeName, namespace=typeref.TypeNamespace)
def calculate_dotnet_token_value(table: int, rid: int) -> int:
return ((table & 0xFF) << Token.TABLE_SHIFT) | (rid & Token.RID_MASK)

View File

@@ -9,7 +9,10 @@
from __future__ import annotations
import logging
from typing import Dict, Tuple, Union, Iterator, Optional
from typing import TYPE_CHECKING, Any, Dict, Tuple, Union, Iterator, Optional
if TYPE_CHECKING:
from capa.features.extractors.dnfile.extractor import DnFileFeatureExtractorCache
import dnfile
from dncil.clr.token import Token, StringToken, InvalidToken
@@ -19,78 +22,42 @@ import capa.features.extractors.helpers
from capa.features.insn import API, Number, Property
from capa.features.common import Class, String, Feature, Namespace, FeatureAccess, Characteristic
from capa.features.address import Address
from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
from capa.features.extractors.dnfile.helpers import (
DnType,
DnUnmanagedMethod,
get_dotnet_fields,
resolve_dotnet_token,
read_dotnet_user_string,
get_dotnet_managed_imports,
get_dotnet_managed_methods,
calculate_dotnet_token_value,
get_dotnet_unmanaged_imports,
)
logger = logging.getLogger(__name__)
def get_managed_imports(ctx: Dict) -> Dict:
if "managed_imports_cache" not in ctx:
ctx["managed_imports_cache"] = {}
for method in get_dotnet_managed_imports(ctx["pe"]):
ctx["managed_imports_cache"][method.token] = method
return ctx["managed_imports_cache"]
def get_unmanaged_imports(ctx: Dict) -> Dict:
if "unmanaged_imports_cache" not in ctx:
ctx["unmanaged_imports_cache"] = {}
for imp in get_dotnet_unmanaged_imports(ctx["pe"]):
ctx["unmanaged_imports_cache"][imp.token] = imp
return ctx["unmanaged_imports_cache"]
def get_methods(ctx: Dict) -> Dict:
if "methods_cache" not in ctx:
ctx["methods_cache"] = {}
for method in get_dotnet_managed_methods(ctx["pe"]):
ctx["methods_cache"][method.token] = method
return ctx["methods_cache"]
def get_fields(ctx: Dict) -> Dict:
if "fields_cache" not in ctx:
ctx["fields_cache"] = {}
for field in get_dotnet_fields(ctx["pe"]):
ctx["fields_cache"][field.token] = field
return ctx["fields_cache"]
def get_callee(ctx: Dict, token: Token) -> Union[DnType, DnUnmanagedMethod, None]:
def get_callee(
pe: dnfile.dnPE, cache: DnFileFeatureExtractorCache, token: Token
) -> Optional[Union[DnType, DnUnmanagedMethod]]:
"""map .NET token to un/managed (generic) method"""
row: Union[dnfile.base.MDTableRow, InvalidToken, str] = resolve_dotnet_token(ctx["pe"], token)
if not isinstance(row, (dnfile.mdtable.MethodDefRow, dnfile.mdtable.MemberRefRow, dnfile.mdtable.MethodSpecRow)):
# we only handle MethodDef (internal), MemberRef (external), and MethodSpec (generic)
return None
token_: int
if isinstance(row, dnfile.mdtable.MethodSpecRow):
if token.table == dnfile.mdtable.MethodSpec.number:
# map MethodSpec to MethodDef or MemberRef
row: Union[dnfile.base.MDTableRow, InvalidToken, str] = resolve_dotnet_token(pe, token)
assert isinstance(row, dnfile.mdtable.MethodSpecRow)
if row.Method.table is None:
logger.debug("MethodSpec[0x%X] Method table is None", token.rid)
return None
token_ = calculate_dotnet_token_value(row.Method.table.number, row.Method.row_index)
else:
token_ = token.value
callee: Union[DnType, DnUnmanagedMethod, None] = get_managed_imports(ctx).get(token_, None)
callee: Optional[Union[DnType, DnUnmanagedMethod]] = cache.get_import(token_)
if callee is None:
# we must check unmanaged imports before managed methods because we map forwarded managed methods
# to their unmanaged imports; we prefer a forwarded managed method be mapped to its unmanaged import for analysis
callee = get_unmanaged_imports(ctx).get(token_, None)
callee = cache.get_native_import(token_)
if callee is None:
callee = get_methods(ctx).get(token_, None)
callee = cache.get_method(token_)
return callee
@@ -104,7 +71,7 @@ def extract_insn_api_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterato
):
return
callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand)
callee: Optional[Union[DnType, DnUnmanagedMethod]] = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand)
if isinstance(callee, DnType):
# ignore methods used to access properties
if callee.access is None:
@@ -123,7 +90,7 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It
if ih.inner.opcode in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp):
# property access via MethodDef or MemberRef
callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand)
callee: Optional[Union[DnType, DnUnmanagedMethod]] = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand)
if isinstance(callee, DnType):
if callee.access is not None:
name = str(callee)
@@ -131,14 +98,14 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It
elif ih.inner.opcode in (OpCodes.Ldfld, OpCodes.Ldflda, OpCodes.Ldsfld, OpCodes.Ldsflda):
# property read via Field
read_field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None)
read_field: Optional[Union[DnType, DnUnmanagedMethod]] = fh.ctx["cache"].get_field(ih.inner.operand.value)
if read_field is not None:
name = str(read_field)
access = FeatureAccess.READ
elif ih.inner.opcode in (OpCodes.Stfld, OpCodes.Stsfld):
# property write via Field
write_field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None)
write_field: Optional[Union[DnType, DnUnmanagedMethod]] = fh.ctx["cache"].get_field(ih.inner.operand.value)
if write_field is not None:
name = str(write_field)
access = FeatureAccess.WRITE
@@ -149,8 +116,12 @@ def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> It
yield Property(name), ih.address
def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Class, Address]]:
"""parse instruction class features"""
def extract_insn_namespace_class_features(
fh: FunctionHandle, bh, ih: InsnHandle
) -> Iterator[Tuple[Union[Namespace, Class], Address]]:
"""parse instruction namespace and class features"""
type_: Optional[Union[DnType, DnUnmanagedMethod]] = None
if ih.inner.opcode in (
OpCodes.Call,
OpCodes.Callvirt,
@@ -160,9 +131,7 @@ def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Itera
OpCodes.Newobj,
):
# method call - includes managed methods (MethodDef, TypeRef) and properties (MethodSemantics, TypeRef)
callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand)
if isinstance(callee, DnType):
yield Class(DnType.format_name(callee.class_, namespace=callee.namespace)), ih.address
type_ = get_callee(fh.ctx["pe"], fh.ctx["cache"], ih.inner.operand)
elif ih.inner.opcode in (
OpCodes.Ldfld,
@@ -173,37 +142,35 @@ def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Itera
OpCodes.Stsfld,
):
# field access
field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None)
if isinstance(field, DnType):
yield Class(DnType.format_name(field.class_, namespace=field.namespace)), ih.address
def extract_insn_namespace_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Namespace, Address]]:
"""parse instruction namespace features"""
if ih.inner.opcode in (
OpCodes.Call,
OpCodes.Callvirt,
OpCodes.Jmp,
OpCodes.Ldvirtftn,
OpCodes.Ldftn,
OpCodes.Newobj,
):
# method call - includes managed methods (MethodDef, TypeRef) and properties (MethodSemantics, TypeRef)
callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand)
if isinstance(callee, DnType) and callee.namespace is not None:
yield Namespace(callee.namespace), ih.address
type_ = fh.ctx["cache"].get_field(ih.inner.operand.value)
# ECMA 335 VI.C.4.10
elif ih.inner.opcode in (
OpCodes.Ldfld,
OpCodes.Ldflda,
OpCodes.Ldsfld,
OpCodes.Ldsflda,
OpCodes.Stfld,
OpCodes.Stsfld,
OpCodes.Initobj,
OpCodes.Box,
OpCodes.Castclass,
OpCodes.Cpobj,
OpCodes.Isinst,
OpCodes.Ldelem,
OpCodes.Ldelema,
OpCodes.Ldobj,
OpCodes.Mkrefany,
OpCodes.Newarr,
OpCodes.Refanyval,
OpCodes.Sizeof,
OpCodes.Stobj,
OpCodes.Unbox,
OpCodes.Constrained,
OpCodes.Stelem,
OpCodes.Unbox_Any,
):
field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None)
if isinstance(field, DnType) and field.namespace is not None:
yield Namespace(field.namespace), ih.address
# type access
type_ = fh.ctx["cache"].get_type(ih.inner.operand.value)
if isinstance(type_, DnType):
yield Class(DnType.format_name(type_.class_, namespace=type_.namespace)), ih.address
if type_.namespace:
yield Namespace(type_.namespace), ih.address
def extract_insn_number_features(fh, bh, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]:
@@ -230,7 +197,7 @@ def extract_insn_string_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iter
def extract_unmanaged_call_characteristic_features(
fh: FunctionHandle, bb: BBHandle, ih: InsnHandle
) -> Iterator[Tuple[Characteristic, Address]]:
if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli):
if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp):
return
row: Union[str, InvalidToken, dnfile.base.MDTableRow] = resolve_dotnet_token(fh.ctx["pe"], ih.inner.operand)
@@ -254,7 +221,6 @@ INSTRUCTION_HANDLERS = (
extract_insn_property_features,
extract_insn_number_features,
extract_insn_string_features,
extract_insn_namespace_features,
extract_insn_class_features,
extract_insn_namespace_class_features,
extract_unmanaged_call_characteristic_features,
)

View File

@@ -0,0 +1,75 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from enum import Enum
from typing import Union, Optional
class DnType(object):
def __init__(self, token: int, class_: str, namespace: str = "", member: str = "", access: Optional[str] = None):
self.token: int = token
self.access: Optional[str] = access
self.namespace: str = namespace
self.class_: str = class_
if member == ".ctor":
member = "ctor"
if member == ".cctor":
member = "cctor"
self.member: str = member
def __hash__(self):
return hash((self.token, self.access, self.namespace, self.class_, self.member))
def __eq__(self, other):
return (
self.token == other.token
and self.access == other.access
and self.namespace == other.namespace
and self.class_ == other.class_
and self.member == other.member
)
def __str__(self):
return DnType.format_name(self.class_, namespace=self.namespace, member=self.member)
def __repr__(self):
return str(self)
@staticmethod
def format_name(class_: str, namespace: str = "", member: str = ""):
# like File::OpenRead
name: str = f"{class_}::{member}" if member else class_
if namespace:
# like System.IO.File::OpenRead
name = f"{namespace}.{name}"
return name
class DnUnmanagedMethod:
def __init__(self, token: int, module: str, method: str):
self.token: int = token
self.module: str = module
self.method: str = method
def __hash__(self):
return hash((self.token, self.module, self.method))
def __eq__(self, other):
return self.token == other.token and self.module == other.module and self.method == other.method
def __str__(self):
return DnUnmanagedMethod.format_name(self.module, self.method)
def __repr__(self):
return str(self)
@staticmethod
def format_name(module, method):
return f"{module}.{method}"

View File

@@ -760,6 +760,12 @@ FEATURE_PRESENCE_TESTS_DOTNET = sorted(
("_1c444", "function=0x1F68", capa.features.insn.Number(0x0), True),
("_1c444", "function=0x1F68", capa.features.insn.Number(0x1), False),
("_692f", "token=0x6000004", capa.features.insn.API("System.Linq.Enumerable::First"), True), # generic method
(
"_692f",
"token=0x6000004",
capa.features.insn.Property("System.Linq.Enumerable::First"),
False,
), # generic method
("_692f", "token=0x6000004", capa.features.common.Namespace("System.Linq"), True), # generic method
("_692f", "token=0x6000004", capa.features.common.Class("System.Linq.Enumerable"), True), # generic method
("_1c444", "token=0x6000020", capa.features.common.Namespace("Reqss"), True), # ldftn