scripts/show-features.py: add dynamic feature extraction from cape reports

This commit is contained in:
Yacine Elhamer
2023-06-22 12:16:31 +01:00
parent 6712801b01
commit 64189a4d08
3 changed files with 84 additions and 26 deletions

View File

@@ -450,6 +450,7 @@ FORMAT_AUTO = "auto"
FORMAT_SC32 = "sc32"
FORMAT_SC64 = "sc64"
FORMAT_FREEZE = "freeze"
FORMAT_CAPE = "cape"
FORMAT_RESULT = "result"
FORMAT_UNKNOWN = "unknown"

View File

@@ -73,6 +73,7 @@ from capa.features.common import (
FORMAT_SC64,
FORMAT_DOTNET,
FORMAT_FREEZE,
FORMAT_CAPE,
FORMAT_RESULT,
)
from capa.features.address import NO_ADDRESS, Address
@@ -905,6 +906,7 @@ def install_common_args(parser, wanted=None):
(FORMAT_SC32, "32-bit shellcode"),
(FORMAT_SC64, "64-bit shellcode"),
(FORMAT_FREEZE, "features previously frozen by capa"),
(FORMAT_CAPE, "CAPE sandbox json report"),
]
format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
parser.add_argument(

View File

@@ -98,6 +98,7 @@ def main(argv=None):
capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
parser.add_argument("-P", "--process", type=str, help="Show features for specific process name")
args = parser.parse_args(args=argv)
capa.main.handle_common_args(args)
@@ -113,9 +114,17 @@ def main(argv=None):
logger.error("%s", str(e))
return -1
if (args.format == "freeze") or (
dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_"))
if dynamic:
with open(args.sample, "r+", encoding="utf-8") as f:
import json
report = json.loads(f.read())
extractor = capa.features.extractors.cape.from_report(report)
elif (args.format == "freeze") or (
args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
):
# this should be moved above the previous if clause after implementing
# feature freeze for the dynamic analysis flavor
with open(args.sample, "rb") as f:
extractor = capa.features.freeze.load(f.read())
else:
@@ -131,6 +140,17 @@ def main(argv=None):
log_unsupported_runtime_error()
return -1
if dynamic:
dynamic_analysis(extractor, args)
else:
static_analysis(extractor, args)
return 0
def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args):
for feature, addr in extractor.extract_global_features():
print(f"global: {format_address(addr)}: {feature}")
@@ -155,41 +175,47 @@ def main(argv=None):
print(f"{args.function} not a function")
return -1
print_features(function_handles, extractor)
return 0
print_function_features(function_handles, extractor)
def ida_main():
import idc
def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args):
for feature, addr in extractor.extract_global_features():
print(f"global: {format_address(addr)}: {feature}")
import capa.features.extractors.ida.extractor
function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
print(f"getting features for current function {hex(function)}")
extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
if not function:
if not args.process:
for feature, addr in extractor.extract_file_features():
print(f"file: {format_address(addr)}: {feature}")
return
function_handles = tuple(extractor.get_functions())
process_handles = tuple(extractor.get_processes())
if function:
function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))
if len(function_handles) == 0:
print(f"{hex(function)} not a function")
if args.process:
process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles)):
if args.process not in [ph.inner["name"] for ph in args.process]:
print(f"{args.process} not a process")
return -1
print_features(function_handles, extractor)
return 0
print_process_features(process_handles, extractor)
def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor):
for p in processes:
print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")
for feature, addr in extractor.extract_process_features(p):
if capa.features.common.is_global_feature(feature):
continue
print(f" proc: {p.inner['name']}: {feature}")
for t in extractor.get_threads(p):
for feature, addr in extractor.get_thread_features(p, t):
if capa.features.common.is_global_feature(feature):
continue
print(f" thread: {t.tid}": {feature})
def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
@@ -234,6 +260,35 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor
# may be an issue while piping to less and encountering non-ascii characters
continue
def ida_main():
import idc
import capa.features.extractors.ida.extractor
function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
print(f"getting features for current function {hex(function)}")
extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
if not function:
for feature, addr in extractor.extract_file_features():
print(f"file: {format_address(addr)}: {feature}")
return
function_handles = tuple(extractor.get_functions())
if function:
function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))
if len(function_handles) == 0:
print(f"{hex(function)} not a function")
return -1
print_features(function_handles, extractor)
return 0
if __name__ == "__main__":
if capa.main.is_runtime_ida():