Compare commits

...

2 Commits

Author SHA1 Message Date
Moritz Raabe
06fb21728c add libs and thunks to frz format 2022-01-24 20:13:54 +01:00
Moritz Raabe
f46b5a4c02 add cfg calls info 2022-01-19 19:25:13 +01:00
8 changed files with 113 additions and 6 deletions

View File

@@ -4,6 +4,8 @@
### New Features
- meta: add function `calls` information, add `entry_points`, add `thunk_functions` #874 @mr-tz
### Breaking Changes
### New Rules (0)

View File

@@ -7,7 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.
import abc
from typing import Tuple, Iterator, SupportsInt
from typing import List, Tuple, Iterator, SupportsInt
from capa.features.common import Feature
@@ -59,6 +59,13 @@ class FeatureExtractor:
"""
raise NotImplementedError()
@abc.abstractmethod
def get_entry_points(self) -> List[int]:
    """
    fetch the program's entry points,
    for example AddressOfEntryPoint and the exported functions.

    subclasses must override this; the default implementation only raises.

    returns:
      List[int]: the entry points.

    raises:
      NotImplementedError: always, when not overridden.
    """
    raise NotImplementedError()
@abc.abstractmethod
def extract_global_features(self) -> Iterator[Tuple[Feature, int]]:
"""
@@ -99,6 +106,12 @@ class FeatureExtractor:
"""
raise NotImplementedError()
def is_thunk_function(self, va: int) -> bool:
    """
    report whether the function at the given address is a thunk.

    this default implementation answers False for every address.

    args:
      va (int): the address to check.

    returns:
      bool: always False here.
    """
    return False
def is_library_function(self, va: int) -> bool:
"""
is the given address a library function?
@@ -134,6 +147,13 @@ class FeatureExtractor:
"""
raise KeyError(va)
@abc.abstractmethod
def get_calls_from(self, va: int) -> List[int]:
    """
    fetch the call targets of the function at the given address.

    subclasses must override this; the default implementation only raises.

    args:
      va (int): the function address.

    returns:
      List[int]: the call targets.

    raises:
      NotImplementedError: always, when not overridden.
    """
    raise NotImplementedError()
@abc.abstractmethod
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, int]]:
"""
@@ -273,6 +293,9 @@ class NullFeatureExtractor(FeatureExtractor):
def get_base_address(self):
    """
    fetch the value stored under the "base address" key of the features.
    """
    base_address = self.features["base address"]
    return base_address
def get_entry_points(self) -> List[int]:
    """
    fetch the value stored under the "entry points" key of the features.
    """
    entry_points = self.features["entry points"]
    return entry_points
def extract_global_features(self):
for p in self.features.get("global features", []):
va, feature = p
@@ -287,6 +310,19 @@ class NullFeatureExtractor(FeatureExtractor):
for va in sorted(self.features["functions"].keys()):
yield va
def is_library_function(self, va: int) -> bool:
    """
    report whether the given address is a key of the "library functions" table.
    """
    library_functions = self.features["library functions"]
    return va in library_functions
def get_function_name(self, va: int) -> str:
    """
    look up the recorded name of the function at the given address.

    both the "library functions" and the "thunk functions" tables map
    function addresses to function names, so each of them is consulted.

    args:
      va (int): the function address.

    returns:
      str: the recorded name, or None when no name is recorded for `va`.
    """
    library_functions = self.features["library functions"]
    if va in library_functions:
        return library_functions[va]
    # thunk names live in their own table; tolerate feature sets that lack it.
    return self.features.get("thunk functions", {}).get(va)
def is_thunk_function(self, va: int) -> bool:
    """
    report whether the given address is a key of the "thunk functions" table.
    """
    thunk_functions = self.features["thunk functions"]
    return va in thunk_functions
def get_calls_from(self, va: int) -> List[int]:
    """
    return a function's call targets.

    args:
      va (int): the function address.

    returns:
      List[int]: currently always an empty list; call targets are not implemented here yet.
    """
    # TODO: provide the call targets.
    # return an empty list rather than an implicit None so the result honors
    # the declared return type and callers can iterate it safely.
    return []
def extract_function_features(self, f):
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
va, feature = p

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import idaapi
import idautils
import capa.ida.helpers
import capa.features.extractors.elf
@@ -66,6 +67,10 @@ class IdaFeatureExtractor(FeatureExtractor):
def get_base_address(self):
    """
    fetch the image base as reported by `idaapi.get_imagebase()`.
    """
    image_base = idaapi.get_imagebase()
    return image_base
def get_entry_points(self):
    """
    collect the address of every entry that `idautils.Entries()` yields.
    """
    entry_points = []
    # each entry is a tuple (index, ordinal, ea, name); keep only the ea.
    for entry in idautils.Entries():
        entry_points.append(entry[2])
    return entry_points
def extract_global_features(self):
yield from self.global_features
@@ -102,6 +107,10 @@ class IdaFeatureExtractor(FeatureExtractor):
def extract_basic_block_features(self, f, bb):
    """
    yield whatever the IDA basic block feature extractor produces for `bb` within `f`.
    """
    features = capa.features.extractors.ida.basicblock.extract_features(f, bb)
    yield from features
def is_thunk_function(self, va):
    """
    is the given address a thunk function?

    args:
      va (int): the address to check.

    returns:
      bool: True when `idaapi.get_func(va)` yields a function whose flags
        include FUNC_THUNK, otherwise False.
    """
    f = idaapi.get_func(va)
    if f is None:
        # no function was found for this address, so there are no flags to inspect.
        return False
    # coerce the masked flag so callers get a real bool rather than the raw bit value.
    return bool(f.flags & idaapi.FUNC_THUNK)
def get_instructions(self, f, bb):
import capa.features.extractors.ida.helpers as ida_helpers
@@ -110,3 +119,7 @@ class IdaFeatureExtractor(FeatureExtractor):
def extract_insn_features(self, f, bb, insn):
    """
    yield whatever the IDA instruction feature extractor produces for `insn`.
    """
    features = capa.features.extractors.ida.insn.extract_features(f, bb, insn)
    yield from features
def get_calls_from(self, va):
    """
    return a function's call targets.

    args:
      va (int): the function address.

    returns:
      list: currently always empty; collecting call targets is not implemented here yet.
    """
    # TODO: collect the call targets.
    # return an empty list rather than an implicit None so callers can
    # iterate and serialize the result.
    return []

View File

@@ -25,6 +25,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
def get_base_address(self):
    """
    fetch the `base_addr` attribute of the SMDA report.
    """
    report = self.smda_report
    return report.base_addr
def get_entry_points(self):
    """
    get the program's entry points.

    returns:
      list: currently always empty; collecting entry points is not implemented here yet.
    """
    # TODO: collect the entry points.
    # return an empty list rather than an implicit None so callers can
    # iterate and serialize the result.
    return []
def extract_global_features(self):
yield from self.global_features
@@ -35,6 +39,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
for function in self.smda_report.getFunctions():
yield function
def get_calls_from(self, va):
    """
    return a function's call targets.

    args:
      va (int): the function address.

    returns:
      list: currently always empty; collecting call targets is not implemented here yet.
    """
    # TODO: collect the call targets.
    # return an empty list rather than an implicit None so callers can
    # iterate and serialize the result.
    return []
def extract_function_features(self, f):
yield from capa.features.extractors.smda.function.extract_features(f)

View File

@@ -51,6 +51,9 @@ class VivisectFeatureExtractor(FeatureExtractor):
# assume there is only one file loaded into the vw
return list(self.vw.filemeta.values())[0]["imagebase"]
def get_entry_points(self):
    """
    fetch the entry points reported by the workspace's `getEntryPoints()`.
    """
    workspace = self.vw
    return workspace.getEntryPoints()
def extract_global_features(self):
yield from self.global_features
@@ -80,5 +83,12 @@ class VivisectFeatureExtractor(FeatureExtractor):
def is_library_function(self, va):
    """
    ask `viv_utils.flirt` whether the given address is a library function in this workspace.
    """
    workspace = self.vw
    return viv_utils.flirt.is_library_function(workspace, va)
def is_thunk_function(self, va):
    """
    ask the workspace, via `isFunctionThunk()`, whether the given address is a thunk function.
    """
    workspace = self.vw
    return workspace.isFunctionThunk(va)
def get_function_name(self, va):
    """
    fetch the name that `viv_utils.get_function_name()` reports for the given address.
    """
    workspace = self.vw
    return viv_utils.get_function_name(workspace, va)
def get_calls_from(self, va):
    """
    fetch the call targets that the workspace's `cfctx.getCallsFrom()` reports for `va`.
    """
    # TODO compare vs. getXrefsFrom, e.g. on threads?
    code_flow_context = self.vw.cfctx
    return code_flow_context.getCallsFrom(va)

View File

@@ -4,8 +4,14 @@ capa freeze file format: `| capa0000 | + zlib(utf-8(json(...)))`
json format:
{
'version': 1,
'version': 2,
'base address': int(base address),
'entry points': [int(entry point va), ...],
'library functions': {
int(function va): str(function name)
},
'thunk functions': {
int(function va): str(function name)
},
'functions': {
int(function va): {
int(basic block va): [int(instruction va), ...]
@@ -59,6 +65,8 @@ import capa.features.basicblock
import capa.features.extractors.base_extractor
from capa.helpers import hex
FREEZE_FORMAT_VERSION = 2
logger = logging.getLogger(__name__)
@@ -85,8 +93,11 @@ def dumps(extractor):
str: the serialized features.
"""
ret = {
"version": 1,
"version": FREEZE_FORMAT_VERSION,
"base address": extractor.get_base_address(),
"entry points": extractor.get_entry_points(),
"library functions": {},
"thunk functions": {},
"functions": {},
"scopes": {
"global": [],
@@ -105,6 +116,12 @@ def dumps(extractor):
for f in extractor.get_functions():
ret["functions"][hex(f)] = {}
if extractor.is_library_function(int(f)):
ret["library functions"][hex(f)] = extractor.get_function_name(int(f))
if extractor.is_thunk_function(int(f)):
ret["thunk functions"][hex(f)] = extractor.get_function_name(int(f))
for feature, va in extractor.extract_function_features(f):
ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),)))
@@ -147,11 +164,14 @@ def loads(s):
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
doc = json.loads(s)
if doc.get("version") != 1:
if doc.get("version") != FREEZE_FORMAT_VERSION:
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
features = {
"base address": doc.get("base address"),
"entry points": doc.get("entry points"),
"library functions": {int(k, 0x10): v for k, v in doc.get("library functions", {}).items()},
"thunk functions": {int(k, 0x10): v for k, v in doc.get("thunk functions", {}).items()},
"global features": [],
"file features": [],
"functions": {},

View File

@@ -157,6 +157,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
"functions": {},
},
"library_functions": {},
"thunk_functions": {},
} # type: Dict[str, Any]
pbar = tqdm.tqdm
@@ -181,6 +182,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
if isinstance(pb, tqdm.tqdm):
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
continue
elif extractor.is_thunk_function(function_address):
function_name = extractor.get_function_name(function_address)
logger.debug("skipping thunk function 0x%x (%s)", function_address, function_name)
meta["thunk_functions"][function_address] = function_name
continue
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
meta["feature_counts"]["functions"][function_address] = feature_count
@@ -603,11 +609,12 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
"extractor": extractor.__class__.__name__,
"rules": rules_path,
"base_address": extractor.get_base_address(),
"entry_points": extractor.get_entry_points(),
"layout": {
# this is updated after capabilities have been collected.
# will look like:
#
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
# "functions": { 0x401000: { ... }
},
},
}
@@ -624,15 +631,22 @@ def compute_layout(rules, extractor, capabilities):
"""
functions_by_bb = {}
bbs_by_function = {}
calls_by_function = collections.defaultdict(list)
for f in extractor.get_functions():
bbs_by_function[int(f)] = []
for bb in extractor.get_basic_blocks(f):
functions_by_bb[int(bb)] = int(f)
bbs_by_function[int(f)].append(int(bb))
calls_by_function[int(f)] = extractor.get_calls_from(int(f))
matched_bbs = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if rule.meta.get("capa/subscope-rule"):
# not included in result document
continue
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
for (addr, match) in matches:
assert addr in functions_by_bb
@@ -641,7 +655,8 @@ def compute_layout(rules, extractor, capabilities):
layout = {
"functions": {
f: {
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs],
"calls": calls_by_function.get(f, []),
# this object is open to extension in the future,
# such as with the function name, etc.
}

View File

@@ -22,6 +22,9 @@ import capa.features.extractors.base_extractor
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
{
"base address": 0x401000,
"entry points": [0x401000],
"library functions": {},
"thunk functions": {},
"file features": [
(0x402345, capa.features.common.Characteristic("embedded pe")),
],