diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 2d2d10f4..2b627cc7 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import datetime +import collections from typing import Any, Dict, List, Tuple, Union, Optional from pydantic import Field, BaseModel @@ -262,6 +263,55 @@ def node_from_capa(node: Union[capa.engine.Statement, capa.engine.Feature]) -> N assert_never(node) +def node_to_capa( + node: Node, children: List[Union[capa.engine.Statement, capa.engine.Feature]] +) -> Union[capa.engine.Statement, capa.engine.Feature]: + if isinstance(node, StatementNode): + if isinstance(node.statement, CompoundStatement): + if node.statement.type == CompoundStatementType.AND: + return capa.engine.And(description=node.statement.description, children=children) + + elif node.statement.type == CompoundStatementType.OR: + return capa.engine.Or(description=node.statement.description, children=children) + + elif node.statement.type == CompoundStatementType.NOT: + return capa.engine.Not(description=node.statement.description, child=children[0]) + + elif node.statement.type == CompoundStatementType.OPTIONAL: + return capa.engine.Some(description=node.statement.description, count=0, children=children) + + else: + assert_never(node.statement.type) + + elif isinstance(node.statement, SomeStatement): + return capa.engine.Some( + description=node.statement.description, count=node.statement.count, children=children + ) + + elif isinstance(node.statement, RangeStatement): + return capa.engine.Range( + description=node.statement.description, + min=node.statement.min, + max=node.statement.max, + child=node.statement.child.to_capa(), + ) + + elif isinstance(node.statement, SubscopeStatement): + raise NotImplementedError("deserializing subscope statements are not supported") + return capa.engine.Subscope( + description=node.statement.description, scope=node.statement.scope, child=children[0] + ) + + else: + assert_never(node.statement) + + elif isinstance(node, FeatureNode): + return node.feature.to_capa() + + else: + assert_never(node) + + class Match(FrozenModel): """ args: @@ -394,6 +444,42 @@ class Match(FrozenModel): captures={capture: tuple(captures[capture]) for capture in captures}, ) + def to_capa(self, rules: RuleSet) -> capa.engine.Result: + children = [child.to_capa(rules) for child in self.children] + statement = node_to_capa(self.node, [child.statement for child in children]) + + if isinstance(self.node, FeatureNode): + feature = self.node.feature + + if isinstance(feature, (frzf.SubstringFeature, frzf.RegexFeature)): + matches = {capture: {loc.to_capa() for loc in locs} for capture, locs in self.captures.items()} + + if isinstance(feature, frzf.SubstringFeature): + assert isinstance(statement, capa.features.common.Substring) + statement = capa.features.common._MatchedSubstring(statement, matches) + elif isinstance(feature, frzf.RegexFeature): + assert isinstance(statement, capa.features.common.Regex) + statement = capa.features.common._MatchedRegex(statement, matches) + else: + assert_never(feature) + + if ( + isinstance(self.node, FeatureNode) + and isinstance(self.node.feature, frzf.MatchFeature) + # only add subtree on success, + # because there won't be results for the other rule on failure. + and self.success + ): + # TODO: work is needed fixup subscope matches here. + raise NotImplementedError("deserializing subscope matches are not yet supported") + + return capa.features.common.Result( + success=self.success, + statement=statement, + locations={loc.to_capa() for loc in self.locations}, + children=children, + ) + def parse_parts_id(s: str): id_ = "" @@ -581,35 +667,16 @@ class ResultDocument(FrozenModel): def to_capa(self) -> Tuple[Dict, Dict]: meta = self.meta.to_capa() - capabilities: Dict[str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]] = {} + capabilities: Dict[ + str, List[Tuple[capa.features.address.Address, capa.features.common.Result]] + ] = collections.defaultdict(list) + + rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_match.source) for rule_match in self.rules.values()]) for rule_name, rule_match in self.rules.items(): - # Parse the YAML source into a Rule instance - rule = capa.rules.Rule.from_yaml(rule_match.source) - - # Extract the capabilities from the RuleMatches object for addr, match in rule_match.matches: - if isinstance(match.node, StatementNode): - if isinstance(match.node.statement, CompoundStatement): - statement = rule.statement - else: - statement = statement_from_capa(match.node.statement) - elif isinstance(match.node, FeatureNode): - statement = match.node.feature.to_capa() - if isinstance(statement, (capa.features.common.String, capa.features.common.Regex)): - statement.matches = match.captures - else: - raise ValueError("Invalid node type") + result: capa.engine.Result = match.to_capa(rules) - result = capa.features.common.Result( - statement=statement, - success=match.success, - locations={loc.to_capa() for loc in match.locations}, - children=[], - ) - - if rule_name not in capabilities: - capabilities[rule_name] = [] capabilities[rule_name].append((addr.to_capa(), result)) return meta, capabilities