report: better compute dynamic layout

This commit is contained in:
Willi Ballenthin
2023-11-03 13:15:29 +00:00
parent 4a7e488e4c
commit 274a710bb1

View File

@@ -1061,13 +1061,6 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
"""
assert isinstance(extractor, DynamicFeatureExtractor)
matched_threads: Set[Address] = set()
for rule_name, matches in capabilities.items():
rule = rules[rule_name]
if capa.rules.Scope.THREAD in rule.scopes:
for addr, _ in matches:
matched_threads.add(addr)
matched_calls: Set[Address] = set()
def result_rec(result: capa.features.common.Result):
@@ -1081,22 +1074,33 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
for _, result in matches:
result_rec(result)
processes_by_thread: Dict[Address, Address] = {}
threads_by_processes: Dict[Address, List[Address]] = {}
names_by_process: Dict[Address, str] = {}
calls_by_thread: Dict[Address, List[Address]] = {}
names_by_call: Dict[Address, str] = {}
matched_processes: Set[Address] = set()
matched_threads: Set[Address] = set()
threads_by_process: Dict[Address, List[Address]] = {}
calls_by_thread: Dict[Address, List[Address]] = {}
for p in extractor.get_processes():
threads_by_processes[p.address] = []
names_by_process[p.address] = extractor.get_process_name(p)
threads_by_process[p.address] = []
for t in extractor.get_threads(p):
processes_by_thread[t.address] = p.address
threads_by_processes[p.address].append(t.address)
calls_by_thread[t.address] = []
for c in extractor.get_calls(p, t):
calls_by_thread[t.address].append(c.address)
if c.address in matched_calls:
names_by_call[c.address] = extractor.get_call_name(p, t, c)
calls_by_thread[t.address].append(c.address)
if calls_by_thread[t.address]:
matched_threads.add(t.address)
threads_by_process[p.address].append(t.address)
if threads_by_process[p.address]:
matched_processes.add(p.address)
names_by_process[p.address] = extractor.get_process_name(p)
layout = rdoc.DynamicLayout(
processes=tuple(
@@ -1120,8 +1124,8 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti
) # this object is open to extension in the future,
# such as with the function name, etc.
)
for p, threads in threads_by_processes.items()
if len([t for t in threads if t in matched_threads]) > 0
for p, threads in threads_by_process.items()
if p in matched_processes
)
)