mirror of
https://github.com/mandiant/capa.git
synced 2025-12-22 07:10:29 -08:00
Merge branch 'dynamic-feature-extraction' into static-extractor
This commit is contained in:
@@ -69,6 +69,7 @@ import sys
|
||||
import logging
|
||||
import os.path
|
||||
import argparse
|
||||
from typing import cast
|
||||
|
||||
import capa.main
|
||||
import capa.rules
|
||||
@@ -77,11 +78,12 @@ import capa.helpers
|
||||
import capa.features
|
||||
import capa.exceptions
|
||||
import capa.render.verbose as v
|
||||
import capa.features.common
|
||||
import capa.features.freeze
|
||||
import capa.features.address
|
||||
from capa.helpers import log_unsupported_runtime_error
|
||||
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor
|
||||
from capa.helpers import get_auto_format, log_unsupported_runtime_error
|
||||
from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, is_global_feature
|
||||
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
|
||||
|
||||
|
||||
logger = logging.getLogger("capa.show-features")
|
||||
|
||||
@@ -98,6 +100,7 @@ def main(argv=None):
|
||||
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
|
||||
|
||||
parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
|
||||
parser.add_argument("-P", "--process", type=str, help="Show features for specific process name")
|
||||
args = parser.parse_args(args=argv)
|
||||
capa.main.handle_common_args(args)
|
||||
|
||||
@@ -113,16 +116,17 @@ def main(argv=None):
|
||||
logger.error("%s", str(e))
|
||||
return -1
|
||||
|
||||
if (args.format == "freeze") or (
|
||||
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
|
||||
):
|
||||
format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample)
|
||||
if format_ == FORMAT_FREEZE:
|
||||
# this should be moved above the previous if clause after implementing
|
||||
# feature freeze for the dynamic analysis flavor
|
||||
with open(args.sample, "rb") as f:
|
||||
extractor: FeatureExtractor = capa.features.freeze.load(f.read())
|
||||
else:
|
||||
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
|
||||
try:
|
||||
extractor = capa.main.get_extractor(
|
||||
args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
|
||||
args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace
|
||||
)
|
||||
except capa.exceptions.UnsupportedFormatError:
|
||||
capa.helpers.log_unsupported_format_error()
|
||||
@@ -131,7 +135,17 @@ def main(argv=None):
|
||||
log_unsupported_runtime_error()
|
||||
return -1
|
||||
|
||||
assert isinstance(extractor, StaticFeatureExtractor)
|
||||
if format_ in DYNAMIC_FORMATS:
|
||||
assert isinstance(extractor, DynamicFeatureExtractor)
|
||||
print_dynamic_analysis(extractor, args)
|
||||
else:
|
||||
assert isinstance(extractor, StaticFeatureExtractor)
|
||||
print_static_analysis(extractor, args)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def print_static_analysis(extractor: StaticFeatureExtractor, args):
|
||||
for feature, addr in extractor.extract_global_features():
|
||||
print(f"global: {format_address(addr)}: {feature}")
|
||||
|
||||
@@ -156,9 +170,90 @@ def main(argv=None):
|
||||
print(f"{args.function} not a function")
|
||||
return -1
|
||||
|
||||
print_features(function_handles, extractor)
|
||||
print_static_features(function_handles, extractor)
|
||||
|
||||
return 0
|
||||
|
||||
def print_dynamic_analysis(extractor: DynamicFeatureExtractor, args):
|
||||
for feature, addr in extractor.extract_global_features():
|
||||
print(f"global: {format_address(addr)}: {feature}")
|
||||
|
||||
if not args.process:
|
||||
for feature, addr in extractor.extract_file_features():
|
||||
print(f"file: {format_address(addr)}: {feature}")
|
||||
|
||||
process_handles = tuple(extractor.get_processes())
|
||||
|
||||
if args.process:
|
||||
process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles))
|
||||
if args.process not in [ph.inner["name"] for ph in args.process]:
|
||||
print(f"{args.process} not a process")
|
||||
return -1
|
||||
|
||||
print_dynamic_features(process_handles, extractor)
|
||||
|
||||
|
||||
def print_static_features(functions, extractor: StaticFeatureExtractor):
|
||||
for f in functions:
|
||||
if extractor.is_library_function(f.address):
|
||||
function_name = extractor.get_function_name(f.address)
|
||||
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
|
||||
continue
|
||||
|
||||
print(f"func: {format_address(f.address)}")
|
||||
|
||||
for feature, addr in extractor.extract_function_features(f):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if f.address != addr:
|
||||
print(f" func: {format_address(f.address)}: {feature} -> {format_address(addr)}")
|
||||
else:
|
||||
print(f" func: {format_address(f.address)}: {feature}")
|
||||
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
for feature, addr in extractor.extract_basic_block_features(f, bb):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if bb.address != addr:
|
||||
print(f" bb: {format_address(bb.address)}: {feature} -> {format_address(addr)}")
|
||||
else:
|
||||
print(f" bb: {format_address(bb.address)}: {feature}")
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
for feature, addr in extractor.extract_insn_features(f, bb, insn):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
try:
|
||||
if insn.address != addr:
|
||||
print(
|
||||
f" insn: {format_address(f.address)}: {format_address(insn.address)}: {feature} -> {format_address(addr)}"
|
||||
)
|
||||
else:
|
||||
print(f" insn: {format_address(insn.address)}: {feature}")
|
||||
|
||||
except UnicodeEncodeError:
|
||||
# may be an issue while piping to less and encountering non-ascii characters
|
||||
continue
|
||||
|
||||
|
||||
def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
|
||||
for p in processes:
|
||||
print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")
|
||||
|
||||
for feature, addr in extractor.extract_process_features(p):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
print(f" proc: {p.inner['name']}: {feature}")
|
||||
|
||||
for t in extractor.get_threads(p):
|
||||
for feature, addr in extractor.extract_thread_features(p, t):
|
||||
if is_global_feature(feature):
|
||||
continue
|
||||
|
||||
print(f" thread: {t.tid}: {feature}")
|
||||
|
||||
|
||||
def ida_main():
|
||||
@@ -185,57 +280,11 @@ def ida_main():
|
||||
print(f"{hex(function)} not a function")
|
||||
return -1
|
||||
|
||||
print_features(function_handles, extractor)
|
||||
print_static_features(function_handles, extractor)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def print_features(functions, extractor: StaticFeatureExtractor):
|
||||
for f in functions:
|
||||
if extractor.is_library_function(f.address):
|
||||
function_name = extractor.get_function_name(f.address)
|
||||
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
|
||||
continue
|
||||
|
||||
print(f"func: {format_address(f.address)}")
|
||||
|
||||
for feature, addr in extractor.extract_function_features(f):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if f.address != addr:
|
||||
print(f" func: {format_address(f.address)}: {feature} -> {format_address(addr)}")
|
||||
else:
|
||||
print(f" func: {format_address(f.address)}: {feature}")
|
||||
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
for feature, addr in extractor.extract_basic_block_features(f, bb):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
|
||||
if bb.address != addr:
|
||||
print(f" bb: {format_address(bb.address)}: {feature} -> {format_address(addr)}")
|
||||
else:
|
||||
print(f" bb: {format_address(bb.address)}: {feature}")
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
for feature, addr in extractor.extract_insn_features(f, bb, insn):
|
||||
if capa.features.common.is_global_feature(feature):
|
||||
continue
|
||||
|
||||
try:
|
||||
if insn.address != addr:
|
||||
print(
|
||||
f" insn: {format_address(f.address)}: {format_address(insn.address)}: {feature} -> {format_address(addr)}"
|
||||
)
|
||||
else:
|
||||
print(f" insn: {format_address(insn.address)}: {feature}")
|
||||
|
||||
except UnicodeEncodeError:
|
||||
# may be an issue while piping to less and encountering non-ascii characters
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if capa.main.is_runtime_ida():
|
||||
ida_main()
|
||||
|
||||
Reference in New Issue
Block a user