code style changes

This commit is contained in:
Yacine Elhamer
2023-06-15 13:21:17 +01:00
parent e1535dd574
commit dbad921fa5
7 changed files with 40 additions and 48 deletions

View File

@@ -296,12 +296,13 @@ class DynamicExtractor(FeatureExtractor):
"""
DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample.
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
for a specific sandbox.
This class is not instantiated directly; it is the base class for other implementations.
"""
@abc.abstractmethod
def get_processes(self) -> Iterator[ProcessHandle]:
"""

View File

@@ -9,13 +9,13 @@
import logging
from typing import Dict, Tuple, Iterator
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
logger = logging.getLogger(__name__)
@@ -28,13 +28,12 @@ class CapeExtractor(DynamicExtractor):
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features
def get_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.file.extract_features(self.static)
def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.cape.process.get_processes(self.behavior)
@@ -47,7 +46,6 @@ class CapeExtractor(DynamicExtractor):
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th)
@classmethod
def from_report(cls, report: Dict) -> "DynamicExtractor":
# todo:
@@ -64,4 +62,4 @@ class CapeExtractor(DynamicExtractor):
behavior = report.pop("behavior")
behavior["network"] = report.pop("network")
return cls(static, behavior)
return cls(static, behavior)

View File

@@ -9,10 +9,9 @@
import logging
from typing import Any, Dict, List, Tuple, Iterator
from capa.features.common import Feature, String
from capa.features.file import Section, Import, Export, FunctionName
from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
from capa.features.file import Export, Import, Section, FunctionName
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
logger = logging.getLogger(__name__)
@@ -72,4 +71,4 @@ FILE_HANDLERS = (
extract_used_regkeys,
extract_used_files,
extract_used_mutexes,
)
)

View File

@@ -9,23 +9,22 @@
import logging
from typing import Tuple, Iterator
from capa.features.address import Address, NO_ADDRESS
from capa.features.common import (
OS,
OS_ANY,
ARCH_I386,
ARCH_AMD64,
ARCH_ANY,
FORMAT_PE,
FORMAT_ELF,
FORMAT_UNKNOWN,
OS_WINDOWS,
OS_LINUX,
ARCH_I386,
FORMAT_PE,
ARCH_AMD64,
FORMAT_ELF,
OS_WINDOWS,
FORMAT_UNKNOWN,
Arch,
Format,
Feature,
)
from capa.features.address import NO_ADDRESS, Address
logger = logging.getLogger(__name__)
@@ -90,4 +89,4 @@ GLOBAL_HANDLER = (
extract_arch,
extract_format,
extract_os,
)
)

View File

@@ -9,13 +9,13 @@
import logging
from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
from capa.features.common import Feature, String
from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
logger = logging.getLogger(__name__)
@@ -54,8 +54,8 @@ def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple
if not environ:
return
for (variable, value) in environ.items():
for variable, value in environ.items():
if value:
yield String(value), NO_ADDRESS
@@ -66,6 +66,4 @@ def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Featur
yield feature, addr
PROCESS_HANDLERS = (
extract_environ_strings
)
PROCESS_HANDLERS = extract_environ_strings

View File

@@ -9,16 +9,15 @@
import logging
from typing import Any, Dict, List, Tuple, Iterator
from capa.features.common import Feature, String
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def extract_call_features(behavior: Dict, ph:ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
"""
this method goes through the specified thread's call trace, and extracts all possible
features such as: API, Number (for arguments), String (for arguments).
@@ -32,10 +31,10 @@ def extract_call_features(behavior: Dict, ph:ProcessHandle, th: ThreadHandle) ->
Feature, address; where Feature is either: API, Number, or String.
"""
calls:List[Dict] = None
calls: List[Dict] = None
for process in behavior["processes"]:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
calls:List[Dict] = process
calls: List[Dict] = process
tid = str(th.tid)
for call in calls:
@@ -51,6 +50,4 @@ def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Ite
yield feature, addr
THREAD_HANDLERS = (
extract_call_features,
)
THREAD_HANDLERS = (extract_call_features,)

View File

@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import abc
from typing import Tuple, Union, Optional, Dict
from typing import Dict, Tuple, Union, Optional
import capa.helpers
from capa.features.common import VALID_FEATURE_ACCESS, Feature
@@ -41,8 +41,8 @@ class API(Feature):
def __eq__(self, other):
if not isinstance(other, API):
return False
assert(isinstance(other, API))
assert isinstance(other, API)
if {} in (self.args, other.args) or False in (self.ret, other.ret):
# Legacy API feature
return super().__eq__(other)
@@ -64,12 +64,12 @@ class API(Feature):
match = re.findall(r"(.*)\((.*)\)", match[0][0])
if len(match[0]) == 2:
args = (match[0][1]+", ").split(", ")
args = (match[0][1] + ", ").split(", ")
map(lambda x: {f"arg{x[0]}": x[1]}, enumerate(args))
args = [{} | arg for arg in args][0]
return match[0][0], args, ret
class _AccessFeature(Feature, abc.ABC):
# superclass: don't use directly