Merge branch 'dynamic-features' into cape-extractor

This commit is contained in:
Yacine Elhamer
2023-06-13 14:26:34 +01:00
4 changed files with 93 additions and 24 deletions

View File

@@ -272,6 +272,42 @@ class _MatchedSubstring(Substring):
return f'substring("{self.value}", matches = {matches})'
class Registry(String):
    """
    String feature describing a registry key.

    Equality is restricted to the same feature family: `__eq__` returns False
    for anything that is not a Registry instance, even when the wrapped value
    is the same.
    """

    # todo: add a way to tell whether this registry key was created, accessed, or deleted.
    def __init__(self, value: str, description=None):
        super().__init__(value, description)

    def __eq__(self, other):
        # Registry instance is in a ruleset
        if isinstance(other, Registry):
            return super().__eq__(other)
        return False

    def __hash__(self):
        # A class that overrides __eq__ without defining __hash__ gets
        # __hash__ = None and becomes unhashable, so instances could not be
        # stored in sets or used as dict keys. Delegate to the inherited hash.
        # NOTE(review): assumes the String base class is hashable - confirm.
        return super().__hash__()
class Filename(String):
    """
    String feature describing a file name.

    Equality is restricted to the same feature family: `__eq__` returns False
    for anything that is not a Filename instance, even when the wrapped value
    is the same.
    """

    # todo: add a way to tell whether this file was created, accessed, or deleted.
    def __init__(self, value: str, description=None):
        super().__init__(value, description)

    def __eq__(self, other):
        # Filename instance is in a ruleset
        if isinstance(other, Filename):
            return super().__eq__(other)
        return False

    def __hash__(self):
        # A class that overrides __eq__ without defining __hash__ gets
        # __hash__ = None and becomes unhashable, so instances could not be
        # stored in sets or used as dict keys. Delegate to the inherited hash.
        # NOTE(review): assumes the String base class is hashable - confirm.
        return super().__hash__()
class Mutex(String):
    """
    String feature describing a mutex name.

    Equality is restricted to the same feature family: `__eq__` returns False
    for anything that is not a Mutex instance, even when the wrapped value is
    the same.
    """

    # todo: add a way to tell whether this mutex was created or used
    def __init__(self, value: str, description=None):
        super().__init__(value, description)

    def __eq__(self, other):
        # Mutex instance is in a ruleset
        if isinstance(other, Mutex):
            return super().__eq__(other)
        return False

    def __hash__(self):
        # A class that overrides __eq__ without defining __hash__ gets
        # __hash__ = None and becomes unhashable, so instances could not be
        # stored in sets or used as dict keys. Delegate to the inherited hash.
        # NOTE(review): assumes the String base class is hashable - confirm.
        return super().__hash__()
class Regex(String):
def __init__(self, value: str, description=None):
super().__init__(value, description=description)

View File

@@ -327,7 +327,7 @@ class DynamicExtractor(FeatureExtractor):
raise NotImplementedError()
@abc.abstractmethod
def get_threads(self, ph: ProcessHandle) -> Iterator[ProcessHandle]:
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
"""
Yields all the threads that a process created.
@@ -341,26 +341,7 @@ class DynamicExtractor(FeatureExtractor):
"""
Yields all the features of a thread. These include:
- sequenced api traces
- files/registries interacted with
- file/registry interactions
- network activity
"""
raise NotImplementedError()
@classmethod
@abc.abstractmethod
def from_trace(cls, trace: TextIO) -> "DynamicExtractor":
    """
    Build an extractor from a sandbox analysis trace.

    Most sandboxes provide reports in a serialized text format (i.e. JSON for Cuckoo and CAPE).
    This routine takes a file descriptor of such a report (analysis trace) and returns
    a corresponding DynamicExtractor object.

    Args:
      trace: file descriptor of the analysis report

    Raises:
      NotImplementedError: always; concrete extractors must override this method.
    """
    # `abc.abstractclassmethod` is deprecated; stacking `classmethod` on top of
    # `abc.abstractmethod` is the supported spelling of the same thing.
    raise NotImplementedError()
@classmethod
@abc.abstractmethod
def submit_sample(cls, sample: BinaryIO, api: Dict[str, str]) -> "DynamicExtractor":
    """
    Submit a sample for analysis to the provided api.

    The resulting trace should then ideally be passed to the from_trace() method.

    Args:
      sample: file descriptor of the sample
      api: contains information such as the uri, api key, etc.

    Raises:
      NotImplementedError: always; concrete extractors must override this method.
    """
    # `abc.abstractclassmethod` is deprecated; stacking `classmethod` on top of
    # `abc.abstractmethod` is the supported spelling of the same thing.
    raise NotImplementedError()

View File

@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import abc
from typing import Union, Optional
from typing import Tuple, Union, Optional, Dict
import capa.helpers
from capa.features.common import VALID_FEATURE_ACCESS, Feature
@@ -21,9 +21,55 @@ def hex(n: int) -> str:
class API(Feature):
    """
    Feature describing an API call.

    Two input formats are accepted:
      - legacy: just the API name, e.g. `CreateFileA` or `kernel32.CreateFileA`
      - strace-like: `name(arg, arg, ...) = ret`, which is parsed into a name,
        positional arguments and a return value.

    Attributes:
      args: `{"arg0": ..., "arg1": ...}` for parsed calls, `{}` when no
        arguments are known (legacy format or empty argument list).
      ret: the textual return value for parsed calls, `False` for the legacy format.
      signature: the string the feature was constructed from, unmodified.
    """

    def __init__(self, signature: str, description=None):
        if signature.isidentifier():
            # api call is in the legacy format
            name, args, ret = signature, {}, False
        else:
            # api call may be in the strace format and therefore has to be parsed
            name, args, ret = self.parse_signature(signature)
            if not name:
                # no call syntax found (e.g. a dotted name such as
                # "kernel32.CreateFileA"): keep the whole string as a legacy
                # name instead of collapsing it to an empty name.
                name, args, ret = signature, {}, False

        super().__init__(name, description=description)
        self.args = args
        self.ret = ret
        # keep the original signature around for callers that want the raw text
        self.signature = signature

    def __hash__(self):
        # __eq__ treats a legacy feature as equal to any API feature with the
        # same name, so the hash must not depend on arguments/return value:
        # equal objects are required to have equal hashes, otherwise set and
        # dict lookups silently miss.
        # NOTE(review): assumes Feature.__hash__ is consistent with Feature.__eq__ - confirm.
        return super().__hash__()

    def __eq__(self, other):
        if not isinstance(other, API):
            return False

        same_feature = super().__eq__(other)
        if {} in (self.args, other.args) or False in (self.ret, other.ret):
            # Legacy API feature: only the base feature comparison matters
            return same_feature

        # API call with arguments
        return same_feature and self.args == other.args and self.ret == other.ret

    def parse_signature(self, signature: str) -> Tuple[str, Optional[Dict[str, str]], Optional[str]]:
        """
        Split a `name(arg, arg, ...) = ret` signature into its parts.

        For example `open(path, flags) = 3` yields
        `("open", {"arg0": "path", "arg1": "flags"}, "3")`.

        Args:
          signature: the textual API call.

        Returns:
          `(name, args, ret)`. `args` maps `arg<position>` to the argument text
          and is empty for an empty argument list; `ret` is the text after `=`
          (empty when there is none). `("", None, None)` is returned when the
          string contains no call syntax at all.
        """
        import re

        call = re.search(r"(.+\(.*\)) ?=? ?([^=]*)", signature)
        if call is None:
            return "", None, None
        call_expr, ret = call.groups()

        # the name is everything before the first opening parenthesis
        parts = re.search(r"([^(]*)\((.*)\)", call_expr)
        if parts is None:
            return "", None, None
        name, arglist = parts.groups()

        arg_values = arglist.split(", ") if arglist else []
        args = {f"arg{position}": value for position, value in enumerate(arg_values)}
        return name, args, ret
class _AccessFeature(Feature, abc.ABC):
# superclass: don't use directly

View File

@@ -261,6 +261,12 @@ def parse_feature(key: str):
return capa.features.common.StringFactory
elif key == "substring":
return capa.features.common.Substring
elif key == "registry":
return capa.features.common.Registry
elif key == "filename":
return capa.features.common.Filename
elif key == "mutex":
return capa.features.common.Mutex
elif key == "bytes":
return capa.features.common.Bytes
elif key == "number":