mirror of
https://github.com/mandiant/capa.git
synced 2025-12-09 06:10:36 -08:00
Compare commits
2 Commits
wb/library
...
add-cfg-in
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06fb21728c | ||
|
|
f46b5a4c02 |
@@ -4,6 +4,8 @@
|
||||
|
||||
### New Features
|
||||
|
||||
- meta: add function `calls` information, add `entry_points`, add `thunk_functions` #874 @mr-tz
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
### New Rules (0)
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import abc
|
||||
from typing import Tuple, Iterator, SupportsInt
|
||||
from typing import List, Tuple, Iterator, SupportsInt
|
||||
|
||||
from capa.features.common import Feature
|
||||
|
||||
@@ -59,6 +59,13 @@ class FeatureExtractor:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
def get_entry_points(self) -> List[int]:
    """
    fetch the program's entry points, e.g. AddressOfEntryPoint and exported functions.

    returns:
      List[int]: the entry point addresses.

    raises:
      NotImplementedError: always; concrete extractors must override this method.
    """
    raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def extract_global_features(self) -> Iterator[Tuple[Feature, int]]:
|
||||
"""
|
||||
@@ -99,6 +106,12 @@ class FeatureExtractor:
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def is_thunk_function(self, va: int) -> bool:
    """
    report whether the function at the given address is a thunk.

    this default implementation recognizes no thunks and always answers False.

    args:
      va (int): the address to check.

    returns:
      bool: always False here.
    """
    return False
|
||||
|
||||
def is_library_function(self, va: int) -> bool:
|
||||
"""
|
||||
is the given address a library function?
|
||||
@@ -134,6 +147,13 @@ class FeatureExtractor:
|
||||
"""
|
||||
raise KeyError(va)
|
||||
|
||||
@abc.abstractmethod
def get_calls_from(self, va: int) -> List[int]:
    """
    fetch the call targets of the function at the given address.

    args:
      va (int): the address of the calling function.

    returns:
      List[int]: the addresses called by the function.

    raises:
      NotImplementedError: always; concrete extractors must override this method.
    """
    raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def extract_function_features(self, f: FunctionHandle) -> Iterator[Tuple[Feature, int]]:
|
||||
"""
|
||||
@@ -273,6 +293,9 @@ class NullFeatureExtractor(FeatureExtractor):
|
||||
def get_base_address(self):
    """
    look up the base address recorded in the feature dictionary.

    raises:
      KeyError: when the features have no "base address" entry.
    """
    base_address = self.features["base address"]
    return base_address
|
||||
|
||||
def get_entry_points(self) -> List[int]:
    """
    look up the entry points recorded in the feature dictionary.

    raises:
      KeyError: when the features have no "entry points" entry.
    """
    entry_points = self.features["entry points"]
    return entry_points
|
||||
|
||||
def extract_global_features(self):
|
||||
for p in self.features.get("global features", []):
|
||||
va, feature = p
|
||||
@@ -287,6 +310,19 @@ class NullFeatureExtractor(FeatureExtractor):
|
||||
for va in sorted(self.features["functions"].keys()):
|
||||
yield va
|
||||
|
||||
def is_library_function(self, va: int) -> bool:
    """
    is the given address listed among the recorded library functions?

    args:
      va (int): the function address to check.

    raises:
      KeyError: when the features have no "library functions" entry.
    """
    library_functions = self.features["library functions"]
    return va in library_functions
|
||||
|
||||
def get_function_name(self, va: int) -> str:
    """
    look up the recorded name of the function at the given address.

    library function names take precedence; thunk function names are consulted next,
    so that a function known only as a thunk still resolves to its name.

    args:
      va (int): the function address.

    returns:
      str: the recorded name, or None when the address is in neither table.

    raises:
      KeyError: when the features have no "library functions" entry.
    """
    name = self.features["library functions"].get(va)
    if name is None:
        # tolerate feature dictionaries that carry no "thunk functions" table
        name = self.features.get("thunk functions", {}).get(va)
    return name
|
||||
|
||||
def is_thunk_function(self, va: int) -> bool:
    """
    is the given address listed among the recorded thunk functions?

    args:
      va (int): the function address to check.

    raises:
      KeyError: when the features have no "thunk functions" entry.
    """
    thunk_functions = self.features["thunk functions"]
    return va in thunk_functions
|
||||
|
||||
def get_calls_from(self, va: int) -> List[int]:
    """
    return a function's call targets.

    call targets are not provided by this extractor yet, so an empty list is
    returned instead of None to honor the declared return type.

    args:
      va (int): the address of the calling function.

    returns:
      List[int]: always empty for now.
    """
    # TODO: return the recorded call targets once they are available.
    return []
|
||||
|
||||
def extract_function_features(self, f):
|
||||
for p in self.features.get("functions", {}).get(f, {}).get("features", []): # noqa: E127 line over-indented
|
||||
va, feature = p
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import idaapi
|
||||
import idautils
|
||||
|
||||
import capa.ida.helpers
|
||||
import capa.features.extractors.elf
|
||||
@@ -66,6 +67,10 @@ class IdaFeatureExtractor(FeatureExtractor):
|
||||
def get_base_address(self):
    """
    ask IDA for the image base of the loaded database.
    """
    image_base = idaapi.get_imagebase()
    return image_base
|
||||
|
||||
def get_entry_points(self):
    """
    collect the addresses of the entry points known to IDA.

    returns:
      list: the third element (ea) of each entry reported by idautils.Entries().
    """
    entry_addresses = []
    # each entry is a tuple (index, ordinal, ea, name); keep only the address
    for entry in idautils.Entries():
        entry_addresses.append(entry[2])
    return entry_addresses
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
@@ -102,6 +107,10 @@ class IdaFeatureExtractor(FeatureExtractor):
|
||||
def extract_basic_block_features(self, f, bb):
    """
    yield whatever the IDA basic block extractor produces for the given
    function and basic block.
    """
    bb_features = capa.features.extractors.ida.basicblock.extract_features(f, bb)
    yield from bb_features
|
||||
|
||||
def is_thunk_function(self, va):
    """
    is the function containing the given address a thunk function?

    args:
      va (int): the address to check.

    returns:
      bool: True when the function's flags include FUNC_THUNK,
        False otherwise, including when IDA finds no function at `va`.
    """
    f = idaapi.get_func(va)
    if f is None:
        # no function covers this address, so it cannot be a thunk
        return False
    # coerce the masked flag bits to a real bool
    return bool(f.flags & idaapi.FUNC_THUNK)
|
||||
|
||||
def get_instructions(self, f, bb):
|
||||
import capa.features.extractors.ida.helpers as ida_helpers
|
||||
|
||||
@@ -110,3 +119,7 @@ class IdaFeatureExtractor(FeatureExtractor):
|
||||
|
||||
def extract_insn_features(self, f, bb, insn):
    """
    yield whatever the IDA instruction extractor produces for the given
    function, basic block, and instruction.
    """
    insn_features = capa.features.extractors.ida.insn.extract_features(f, bb, insn)
    yield from insn_features
|
||||
|
||||
def get_calls_from(self, va):
    """
    return a function's call targets.

    call targets are not collected by this extractor yet, so an empty list is
    returned instead of None and callers always receive a list.

    args:
      va (int): the address of the calling function.

    returns:
      list: always empty for now.
    """
    # TODO: collect the call targets of the function at `va`.
    return []
|
||||
|
||||
@@ -25,6 +25,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
|
||||
def get_base_address(self):
    """
    read the base address from the SMDA report held by this extractor.
    """
    report = self.smda_report
    return report.base_addr
|
||||
|
||||
def get_entry_points(self):
    """
    get the program's entry points.

    entry points are not collected by this extractor yet, so an empty list is
    returned instead of None and callers always receive a list.

    returns:
      list: always empty for now.
    """
    # TODO: collect the entry points from the SMDA report.
    return []
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
@@ -35,6 +39,10 @@ class SmdaFeatureExtractor(FeatureExtractor):
|
||||
for function in self.smda_report.getFunctions():
|
||||
yield function
|
||||
|
||||
def get_calls_from(self, va):
    """
    return a function's call targets.

    call targets are not collected by this extractor yet, so an empty list is
    returned instead of None and callers always receive a list.

    args:
      va (int): the address of the calling function.

    returns:
      list: always empty for now.
    """
    # TODO: collect the call targets of the function at `va`.
    return []
|
||||
|
||||
def extract_function_features(self, f):
|
||||
yield from capa.features.extractors.smda.function.extract_features(f)
|
||||
|
||||
|
||||
@@ -51,6 +51,9 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
||||
# assume there is only one file loaded into the vw
|
||||
return list(self.vw.filemeta.values())[0]["imagebase"]
|
||||
|
||||
def get_entry_points(self):
    """
    ask the vivisect workspace for the program's entry points.
    """
    workspace = self.vw
    return workspace.getEntryPoints()
|
||||
|
||||
def extract_global_features(self):
|
||||
yield from self.global_features
|
||||
|
||||
@@ -80,5 +83,12 @@ class VivisectFeatureExtractor(FeatureExtractor):
|
||||
def is_library_function(self, va):
    """
    is the given address a library function, according to viv_utils.flirt?

    args:
      va (int): the function address to check.
    """
    workspace = self.vw
    return viv_utils.flirt.is_library_function(workspace, va)
|
||||
|
||||
def is_thunk_function(self, va):
    """
    is the given address a thunk function, according to the vivisect workspace?

    args:
      va (int): the function address to check.
    """
    workspace = self.vw
    return workspace.isFunctionThunk(va)
|
||||
|
||||
def get_function_name(self, va):
    """
    resolve the name of the function at the given address via viv_utils.

    args:
      va (int): the function address.
    """
    workspace = self.vw
    return viv_utils.get_function_name(workspace, va)
|
||||
|
||||
def get_calls_from(self, va):
    """
    return the call targets that the workspace's cfctx reports for the function at `va`.

    args:
      va (int): the address of the calling function.
    """
    # TODO compare vs. getXrefsFrom, e.g. on threads?
    call_context = self.vw.cfctx
    return call_context.getCallsFrom(va)
|
||||
|
||||
@@ -4,8 +4,14 @@ capa freeze file format: `| capa0000 | + zlib(utf-8(json(...)))`
|
||||
json format:
|
||||
|
||||
{
|
||||
'version': 1,
|
||||
'version': 2,
|
||||
'base address': int(base address),
'entry points': [int(entry point va), ...],
|
||||
'library functions': {
|
||||
int(function va): str(function name)
|
||||
},
|
||||
'thunk functions': {
|
||||
int(function va): str(function name)
|
||||
},
|
||||
'functions': {
|
||||
int(function va): {
|
||||
int(basic block va): [int(instruction va), ...]
|
||||
@@ -59,6 +65,8 @@ import capa.features.basicblock
|
||||
import capa.features.extractors.base_extractor
|
||||
from capa.helpers import hex
|
||||
|
||||
FREEZE_FORMAT_VERSION = 2
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -85,8 +93,11 @@ def dumps(extractor):
|
||||
str: the serialized features.
|
||||
"""
|
||||
ret = {
|
||||
"version": 1,
|
||||
"version": FREEZE_FORMAT_VERSION,
|
||||
"base address": extractor.get_base_address(),
|
||||
"entry points": extractor.get_entry_points(),
|
||||
"library functions": {},
|
||||
"thunk functions": {},
|
||||
"functions": {},
|
||||
"scopes": {
|
||||
"global": [],
|
||||
@@ -105,6 +116,12 @@ def dumps(extractor):
|
||||
for f in extractor.get_functions():
|
||||
ret["functions"][hex(f)] = {}
|
||||
|
||||
if extractor.is_library_function(int(f)):
|
||||
ret["library functions"][hex(f)] = extractor.get_function_name(int(f))
|
||||
|
||||
if extractor.is_thunk_function(int(f)):
|
||||
ret["thunk functions"][hex(f)] = extractor.get_function_name(int(f))
|
||||
|
||||
for feature, va in extractor.extract_function_features(f):
|
||||
ret["scopes"]["function"].append(serialize_feature(feature) + (hex(va), (hex(f),)))
|
||||
|
||||
@@ -147,11 +164,14 @@ def loads(s):
|
||||
"""deserialize a set of features (as a NullFeatureExtractor) from a string."""
|
||||
doc = json.loads(s)
|
||||
|
||||
if doc.get("version") != 1:
|
||||
if doc.get("version") != FREEZE_FORMAT_VERSION:
|
||||
raise ValueError("unsupported freeze format version: %d" % (doc.get("version")))
|
||||
|
||||
features = {
|
||||
"base address": doc.get("base address"),
|
||||
"entry points": doc.get("entry points"),
|
||||
"library functions": {int(k, 0x10): v for k, v in doc.get("library functions", {}).items()},
|
||||
"thunk functions": {int(k, 0x10): v for k, v in doc.get("thunk functions", {}).items()},
|
||||
"global features": [],
|
||||
"file features": [],
|
||||
"functions": {},
|
||||
|
||||
19
capa/main.py
19
capa/main.py
@@ -157,6 +157,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
"functions": {},
|
||||
},
|
||||
"library_functions": {},
|
||||
"thunk_functions": {},
|
||||
} # type: Dict[str, Any]
|
||||
|
||||
pbar = tqdm.tqdm
|
||||
@@ -181,6 +182,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
if isinstance(pb, tqdm.tqdm):
|
||||
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
|
||||
continue
|
||||
elif extractor.is_thunk_function(function_address):
|
||||
function_name = extractor.get_function_name(function_address)
|
||||
logger.debug("skipping thunk function 0x%x (%s)", function_address, function_name)
|
||||
meta["thunk_functions"][function_address] = function_name
|
||||
continue
|
||||
|
||||
function_matches, bb_matches, feature_count = find_function_capabilities(ruleset, extractor, f)
|
||||
meta["feature_counts"]["functions"][function_address] = feature_count
|
||||
@@ -603,11 +609,12 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
|
||||
"extractor": extractor.__class__.__name__,
|
||||
"rules": rules_path,
|
||||
"base_address": extractor.get_base_address(),
|
||||
"entry_points": extractor.get_entry_points(),
|
||||
"layout": {
|
||||
# this is updated after capabilities have been collected.
|
||||
# will look like:
|
||||
#
|
||||
# "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
|
||||
# "functions": { 0x401000: { ... }
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -624,15 +631,22 @@ def compute_layout(rules, extractor, capabilities):
|
||||
"""
|
||||
functions_by_bb = {}
|
||||
bbs_by_function = {}
|
||||
calls_by_function = collections.defaultdict(list)
|
||||
for f in extractor.get_functions():
|
||||
bbs_by_function[int(f)] = []
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
functions_by_bb[int(bb)] = int(f)
|
||||
bbs_by_function[int(f)].append(int(bb))
|
||||
calls_by_function[int(f)] = extractor.get_calls_from(int(f))
|
||||
|
||||
matched_bbs = set()
|
||||
for rule_name, matches in capabilities.items():
|
||||
rule = rules[rule_name]
|
||||
|
||||
if rule.meta.get("capa/subscope-rule"):
|
||||
# not included in result document
|
||||
continue
|
||||
|
||||
if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
|
||||
for (addr, match) in matches:
|
||||
assert addr in functions_by_bb
|
||||
@@ -641,7 +655,8 @@ def compute_layout(rules, extractor, capabilities):
|
||||
layout = {
|
||||
"functions": {
|
||||
f: {
|
||||
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
|
||||
"matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs],
|
||||
"calls": calls_by_function.get(f, []),
|
||||
# this object is open to extension in the future,
|
||||
# such as with the function name, etc.
|
||||
}
|
||||
|
||||
@@ -22,6 +22,9 @@ import capa.features.extractors.base_extractor
|
||||
EXTRACTOR = capa.features.extractors.base_extractor.NullFeatureExtractor(
|
||||
{
|
||||
"base address": 0x401000,
|
||||
"entry points": [0x401000],
|
||||
"library functions": {},
|
||||
"thunk functions": {},
|
||||
"file features": [
|
||||
(0x402345, capa.features.common.Characteristic("embedded pe")),
|
||||
],
|
||||
|
||||
Reference in New Issue
Block a user