From 274a710bb1b457e431e80a7815716ddeef66fe2a Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Fri, 3 Nov 2023 13:15:29 +0000 Subject: [PATCH] report: better compute dynamic layout --- capa/main.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/capa/main.py b/capa/main.py index 3b238a33..1a856b86 100644 --- a/capa/main.py +++ b/capa/main.py @@ -1061,13 +1061,6 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti """ assert isinstance(extractor, DynamicFeatureExtractor) - matched_threads: Set[Address] = set() - for rule_name, matches in capabilities.items(): - rule = rules[rule_name] - if capa.rules.Scope.THREAD in rule.scopes: - for addr, _ in matches: - matched_threads.add(addr) - matched_calls: Set[Address] = set() def result_rec(result: capa.features.common.Result): @@ -1081,22 +1074,33 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti for _, result in matches: result_rec(result) - processes_by_thread: Dict[Address, Address] = {} - threads_by_processes: Dict[Address, List[Address]] = {} names_by_process: Dict[Address, str] = {} - calls_by_thread: Dict[Address, List[Address]] = {} names_by_call: Dict[Address, str] = {} + + matched_processes: Set[Address] = set() + matched_threads: Set[Address] = set() + + threads_by_process: Dict[Address, List[Address]] = {} + calls_by_thread: Dict[Address, List[Address]] = {} + for p in extractor.get_processes(): - threads_by_processes[p.address] = [] - names_by_process[p.address] = extractor.get_process_name(p) + threads_by_process[p.address] = [] + for t in extractor.get_threads(p): - processes_by_thread[t.address] = p.address - threads_by_processes[p.address].append(t.address) calls_by_thread[t.address] = [] + for c in extractor.get_calls(p, t): - calls_by_thread[t.address].append(c.address) if c.address in matched_calls: names_by_call[c.address] = extractor.get_call_name(p, t, c) + calls_by_thread[t.address].append(c.address) + + if calls_by_thread[t.address]: + matched_threads.add(t.address) + threads_by_process[p.address].append(t.address) + + if threads_by_process[p.address]: + matched_processes.add(p.address) + names_by_process[p.address] = extractor.get_process_name(p) layout = rdoc.DynamicLayout( processes=tuple( @@ -1120,8 +1124,8 @@ def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabiliti ) # this object is open to extension in the future, # such as with the function name, etc. ) - for p, threads in threads_by_processes.items() - if len([t for t in threads if t in matched_threads]) > 0 + for p, threads in threads_by_process.items() + if p in matched_processes ) )