mirror of
https://github.com/mandiant/capa.git
synced 2025-12-09 06:10:36 -08:00
Compare commits
25 Commits
v7.0.0-beta ... v7.0.1
| Author | SHA1 | Date |
|---|---|---|
| | 2ddb6b0773 | |
| | 5fd532845c | |
| | 2a59284621 | |
| | 9adb669921 | |
| | 034894330b | |
| | a3a8e36911 | |
| | 2c93c5fc83 | |
| | 9929967634 | |
| | 3436aab3fd | |
| | 9a76558fdf | |
| | 2e5761a414 | |
| | 2f2d4a1d6b | |
| | 1a4f2559fa | |
| | 66c2f07ca8 | |
| | 75800b9d2e | |
| | bae4091661 | |
| | ba044a980f | |
| | 2e7642ef8a | |
| | 3e4479e3bb | |
| | 437732174b | |
| | f845382471 | |
| | 06aa3f6528 | |
| | 45ebc3e3d6 | |
| | c3301d3b3f | |
| | d2e1a47192 | |
.github/flake8.ini (vendored) — 2 changes

@@ -10,6 +10,8 @@ extend-ignore =
     F811,
     # E501 line too long (prefer black)
     E501,
     # E701 multiple statements on one line (colon) (prefer black, see https://github.com/psf/black/issues/4173)
     E701,
     # B010 Do not call setattr with a constant attribute value
     B010,
     # G200 Logging statement uses exception in arguments
.gitmodules (vendored) — 2 changes

@@ -1,8 +1,6 @@
 [submodule "rules"]
 	path = rules
 	url = ../capa-rules.git
-	branch = dynamic-syntax
 [submodule "tests/data"]
 	path = tests/data
 	url = ../capa-testfiles.git
-	branch = dynamic-feature-extractor
CHANGELOG.md — 48 changes

@@ -4,29 +4,47 @@
 ### New Features

 ### Breaking Changes

 ### New Rules (0)
 -

 ### Bug Fixes

 ### capa explorer IDA Pro plugin

 ### Development

 ### Raw diffs
-- [capa v7.0.0-beta...master](https://github.com/mandiant/capa/compare/v7.0.0-beta...master)
-- [capa-rules v7.0.0-beta...master](https://github.com/mandiant/capa-rules/compare/v7.0.0-beta...master)
+- [capa v7.0.1...master](https://github.com/mandiant/capa/compare/v7.0.1...master)
+- [capa-rules v7.0.1...master](https://github.com/mandiant/capa-rules/compare/v7.0.1...master)

-## v7.0.0-beta
-This is the beta release of capa v7.0 which was mainly worked on during the Google Summer of Code (GSoC) 2023. A huge
-shoutout to @colton-gabertan and @yelhamer for their amazing work.
+## v7.0.1
+
+This release fixes a circular import error when using capa as a library.
+
+### Bug Fixes
+
+- fix potentially circular import errors #1969 @williballenthin
+
+### Raw diffs
+- [capa v7.0.0...v7.0.1](https://github.com/mandiant/capa/compare/v7.0.0...v7.0.1)
+- [capa-rules v7.0.0...v7.0.1](https://github.com/mandiant/capa-rules/compare/v7.0.0...v7.0.1)
+
+## v7.0.0
+This is the v7.0.0 release of capa which was mainly worked on during the Google Summer of Code (GSoC) 2023. A huge
+shoutout to our GSoC contributors @colton-gabertan and @yelhamer for their amazing work.

-Also a big thanks to the other contributors: @aaronatp, @Aayush-Goel-04, @bkojusner, @doomedraven, @ruppde, and @xusheng6.
+Also, a big thanks to the other contributors: @aaronatp, @Aayush-Goel-04, @bkojusner, @doomedraven, @ruppde, @larchchen, @JCoonradt, and @xusheng6.

 ### New Features

 - add Ghidra backend #1770 #1767 @colton-gabertan @mike-hunhoff
 - add Ghidra UI integration #1734 @colton-gabertan @mike-hunhoff
 - add dynamic analysis via CAPE sandbox reports #48 #1535 @yelhamer
 - add call scope #771 @yelhamer
 - add thread scope #1517 @yelhamer

@@ -45,6 +63,9 @@ Also a big thanks to the other contributors: @aaronatp, @Aayush-Goel-04, @bkojus
 - protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin
 - update freeze format to v3, adding support for dynamic analysis @williballenthin
 - extractor: ignore DLL name for api features #1815 @mr-tz
+- main: introduce wrapping routines within main for working with CLI args #1813 @williballenthin
+- move functions from `capa.main` to new `capa.loader` namespace #1821 @williballenthin
+- proto: add `package` declaration #1960 @larchchen

 ### New Rules (41)

@@ -95,9 +116,12 @@ Also a big thanks to the other contributors: @aaronatp, @Aayush-Goel-04, @bkojus
 - binja: use `binaryninja.load` to open files @xusheng6
 - binja: bump binja version to 3.5 #1789 @xusheng6
+- elf: better detect ELF OS via GCC .ident directives #1928 @williballenthin
+- elf: better detect ELF OS via Android dependencies #1947 @williballenthin
 - fix setuptools package discovery #1886 @gmacon @mr-tz
+- remove unnecessary scripts/vivisect-py2-vs-py3.sh file #1949 @JCoonradt

 ### capa explorer IDA Pro plugin
 - various integration updates and minor bug fixes

 ### Development
 - update ATT&CK/MBC data for linting #1932 @mr-tz

@@ -114,9 +138,17 @@ of importing the relevant logic from the main file.
 For sandbox-based feature extractors, we are using Pydantic models. Contributions of more models for other sandboxes
 are very welcome!
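As an editor's illustration of this style — a minimal, hypothetical sketch using Pydantic v2; the field names below are invented and are not capa's actual CAPE model:

```python
from typing import List, Optional

from pydantic import BaseModel


class ProcessCall(BaseModel):
    # hypothetical fields, for illustration only
    api: str
    arguments: List[str]
    return_value: Optional[int] = None


class ProcessReport(BaseModel):
    pid: int
    calls: List[ProcessCall]


# pydantic validates raw sandbox JSON into typed objects and
# raises a ValidationError when the report is malformed:
report = ProcessReport.model_validate(
    {"pid": 4, "calls": [{"api": "CreateFileW", "arguments": ["c:\\evil.txt"]}]}
)
assert report.calls[0].api == "CreateFileW"
```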
 With this release we've reorganized the logic found in `main()` to localize logic and to ease readability, changes,
 and integrations. The new "main routines" are expected to be used only within main functions, either capa main or
 related scripts. These functions should not be invoked from library code.

 Beyond copying code around, we've refined the handling of the input file/format/backend. The logic for picking the
 format and backend is more consistent. We've documented that the input file is not necessarily the sample itself
 (cape/freeze/etc. inputs are not actually the sample).
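Concretely, a script built on these main routines follows the shape below (a sketch assuming only the `capa.main` helpers that appear in the diffs later in this compare: `install_common_args`, `handle_common_args`, `ensure_input_exists_from_cli`, `get_input_format_from_cli`, `get_backend_from_cli`, `get_extractor_from_cli`, and `ShouldExitError`):

```python
import sys
import argparse

import capa.main


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="example capa-based script")
    # request the standard capa CLI arguments (input file, format, backend, ...)
    capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"})
    args = parser.parse_args(args=argv)

    try:
        # each helper validates one step and raises ShouldExitError on failure,
        # so scripts don't have to duplicate capa's error handling.
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    # ... use the extractor here ...
    return 0


if __name__ == "__main__":
    sys.exit(main())
```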
 ### Raw diffs
-- [capa v6.1.0...v7.0.0-beta](https://github.com/mandiant/capa/compare/v6.1.0...v7.0.0-beta)
-- [capa-rules v6.1.0...v7.0.0-beta](https://github.com/mandiant/capa-rules/compare/v6.1.0...v7.0.0-beta)
+- [capa v6.1.0...v7.0.0](https://github.com/mandiant/capa/compare/v6.1.0...v7.0.0)
+- [capa-rules v6.1.0...v7.0.0](https://github.com/mandiant/capa-rules/compare/v6.1.0...v7.0.0)

 ## v6.1.0
README.md — 11 changes

@@ -11,11 +11,12 @@ capa detects capabilities in executable files.
 You run it against a PE, ELF, .NET module, shellcode file, or a sandbox report and it tells you what it thinks the program can do.
 For example, it might suggest that the file is a backdoor, is capable of installing services, or relies on HTTP to communicate.

-Check out:
-- the overview in our first [capa blog post](https://www.mandiant.com/resources/capa-automatically-identify-malware-capabilities)
-- the major version 2.0 updates described in our [second blog post](https://www.mandiant.com/resources/capa-2-better-stronger-faster)
-- the major version 3.0 (ELF support) described in the [third blog post](https://www.mandiant.com/resources/elfant-in-the-room-capa-v3)
-- the major version 4.0 (.NET support) described in the [fourth blog post](https://www.mandiant.com/resources/blog/capa-v4-casting-wider-net)
+Check out our capa blog posts:
+- [Dynamic capa: Exploring Executable Run-Time Behavior with the CAPE Sandbox](https://www.mandiant.com/resources/blog/dynamic-capa-executable-behavior-cape-sandbox)
+- [capa v4: casting a wider .NET](https://www.mandiant.com/resources/blog/capa-v4-casting-wider-net) (.NET support)
+- [ELFant in the Room – capa v3](https://www.mandiant.com/resources/elfant-in-the-room-capa-v3) (ELF support)
+- [capa 2.0: Better, Stronger, Faster](https://www.mandiant.com/resources/capa-2-better-stronger-faster)
+- [capa: Automatically Identify Malware Capabilities](https://www.mandiant.com/resources/capa-automatically-identify-malware-capabilities)

 ```
 $ capa.exe suspicious.exe
@@ -10,8 +10,7 @@ import abc

 class Address(abc.ABC):
     @abc.abstractmethod
-    def __eq__(self, other):
-        ...
+    def __eq__(self, other): ...

     @abc.abstractmethod
     def __lt__(self, other):
@@ -458,18 +458,22 @@ FORMAT_AUTO = "auto"
 FORMAT_SC32 = "sc32"
 FORMAT_SC64 = "sc64"
 FORMAT_CAPE = "cape"
+FORMAT_FREEZE = "freeze"
+FORMAT_RESULT = "result"
 STATIC_FORMATS = {
     FORMAT_SC32,
     FORMAT_SC64,
     FORMAT_PE,
     FORMAT_ELF,
     FORMAT_DOTNET,
+    FORMAT_FREEZE,
+    FORMAT_RESULT,
 }
 DYNAMIC_FORMATS = {
     FORMAT_CAPE,
+    FORMAT_FREEZE,
+    FORMAT_RESULT,
 }
-FORMAT_FREEZE = "freeze"
-FORMAT_RESULT = "result"
 FORMAT_UNKNOWN = "unknown"
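These sets let callers decide by membership test whether an input can be handled by static analysis, dynamic analysis, or both; a minimal sketch (note that the freeze and result formats deliberately appear in both sets):

```python
from capa.features.common import STATIC_FORMATS, DYNAMIC_FORMATS


def analysis_kinds(input_format: str) -> list:
    """return the analysis kinds ("static", "dynamic") that can handle this format."""
    kinds = []
    if input_format in STATIC_FORMATS:
        kinds.append("static")
    if input_format in DYNAMIC_FORMATS:
        kinds.append("dynamic")
    return kinds


assert analysis_kinds("cape") == ["dynamic"]
assert analysis_kinds("freeze") == ["static", "dynamic"]
```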
@@ -45,7 +45,7 @@ MATCH_RESULT = b'{"meta":'
 MATCH_JSON_OBJECT = b'{"'


-def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
+def extract_file_strings(buf: bytes, **kwargs) -> Iterator[Tuple[String, Address]]:
     """
     extract ASCII and UTF-16 LE strings from file
     """

@@ -56,7 +56,7 @@ def extract_file_strings(buf, **kwargs) -> Iterator[Tuple[String, Address]]:
         yield String(s.s), FileOffsetAddress(s.offset)


-def extract_format(buf) -> Iterator[Tuple[Feature, Address]]:
+def extract_format(buf: bytes) -> Iterator[Tuple[Feature, Address]]:
     if buf.startswith(MATCH_PE):
         yield Format(FORMAT_PE), NO_ADDRESS
     elif buf.startswith(MATCH_ELF):
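As a usage sketch, these generators sniff a file's format and strings from raw bytes (assuming only the module path `capa.features.extractors.common`, which the loader code later in this compare also imports):

```python
from pathlib import Path

import capa.features.extractors.common

buf = Path("suspicious.exe").read_bytes()

# each generator yields (feature, address) pairs;
# Format features carry the detected format, e.g. format(pe)
for feature, _ in capa.features.extractors.common.extract_format(buf):
    print(feature)

# strings come with their file-offset addresses
for string, addr in capa.features.extractors.common.extract_file_strings(buf):
    print(addr, string)
```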
@@ -866,6 +866,8 @@ def guess_os_from_ident_directive(elf: ELF) -> Optional[OS]:
             return OS.LINUX
         elif "Red Hat" in comment:
             return OS.LINUX
+        elif "Android" in comment:
+            return OS.ANDROID

     return None

@@ -921,6 +923,8 @@ def guess_os_from_needed_dependencies(elf: ELF) -> Optional[OS]:
             return OS.HURD
         if needed.startswith("libandroid.so"):
             return OS.ANDROID
+        if needed.startswith("liblog.so"):
+            return OS.ANDROID

     return None

@@ -1023,10 +1027,6 @@ def detect_elf_os(f) -> str:
     if osabi_guess:
         ret = osabi_guess

-    elif ident_guess:
-        # we don't trust this too much due to non-cross-compilation assumptions
-        ret = ident_guess
-
     elif ph_notes_guess:
         ret = ph_notes_guess

@@ -1045,6 +1045,11 @@ def detect_elf_os(f) -> str:
     elif symtab_guess:
         ret = symtab_guess

+    elif ident_guess:
+        # at the bottom because we don't trust this too much
+        # due to potential for bugs with cross-compilation.
+        ret = ident_guess
+
     return ret.value if ret is not None else "unknown"
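The net effect of the last two hunks is that the GCC `.ident` heuristic drops to the lowest priority in the guess cascade. Schematically — a sketch using only the guess names visible above; the real `detect_elf_os` consults additional sources between these:

```python
def detect_elf_os_sketch(osabi_guess, ph_notes_guess, symtab_guess, ident_guess):
    # earlier guesses are considered more trustworthy; the .ident comment now
    # comes last because cross-compiled binaries can carry misleading values.
    for guess in (osabi_guess, ph_notes_guess, symtab_guess, ident_guess):
        if guess is not None:
            return guess.value
    return "unknown"
```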
@@ -9,6 +9,7 @@ Unless required by applicable law or agreed to in writing, software distributed
 is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and limitations under the License.
 """
 import json
 import zlib
 import logging

@@ -681,14 +682,18 @@ def main(argv=None):
         argv = sys.argv[1:]

     parser = argparse.ArgumentParser(description="save capa features to a file")
-    capa.main.install_common_args(parser, {"sample", "format", "backend", "os", "signatures"})
+    capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"})
     parser.add_argument("output", type=str, help="Path to output file")
     args = parser.parse_args(args=argv)
-    capa.main.handle_common_args(args)
-
-    sigpaths = capa.main.get_signatures(args.signatures)
-
-    extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False)
+    try:
+        capa.main.handle_common_args(args)
+        capa.main.ensure_input_exists_from_cli(args)
+        input_format = capa.main.get_input_format_from_cli(args)
+        backend = capa.main.get_backend_from_cli(args, input_format)
+        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
+    except capa.main.ShouldExitError as e:
+        return e.status_code

     Path(args.output).write_bytes(dump(extractor))
@@ -2,23 +2,46 @@
 <img src="/doc/img/ghidra_backend_logo.png" width=300 height=175>
 </div>

-The Ghidra feature extractor is an application of the FLARE team's open-source project, Ghidrathon, to integrate capa with Ghidra using Python 3. capa is a framework that uses a well-defined collection of rules to identify capabilities in a program. You can run capa against a PE file, ELF file, or shellcode and it tells you what it thinks the program can do. For example, it might suggest that the program is a backdoor, can install services, or relies on HTTP to communicate. The Ghidra feature extractor can be used to run capa analysis on your Ghidra databases without needing access to the original binary file.
+The Ghidra feature extractor is an application of the FLARE team's open-source project, Ghidrathon, to integrate capa with Ghidra using Python 3. capa is a framework that uses a well-defined collection of rules to identify capabilities in a program. You can run capa against a PE file, ELF file, or shellcode and it tells you what it thinks the program can do. For example, it might suggest that the program is a backdoor, can install services, or relies on HTTP to communicate. The Ghidra feature extractor can be used to run capa analysis on your Ghidra databases without needing access to the original binary file. As a part of this integration, we've developed two scripts, [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py), to display capa results directly in Ghidra.
+
+### Using `capa_explorer.py`
+
+`capa_explorer.py` integrates capa results directly into Ghidra's UI. In the Symbol Tree Window, under the Namespaces section, you can find the matched rules as well as the corresponding functions that contain the matched features:
+
+(screenshot)
+
+Labeled functions may be clicked in the Symbol Tree Window to navigate Ghidra's Disassembly Listing and Decompilation windows to the function locations. A comment listing each matched capa rule is inserted at the beginning of the function and a comment for each matched capa feature is added at the matched address within the function. These comments can be viewed using Ghidra's Disassembly Listing and Decompilation windows:
+
+(screenshot)
+
+The script also adds bookmarks for capa matches that are categorized under MITRE ATT&CK and Malware Behavior Catalog. These may be found and navigated using Ghidra's Bookmarks Window:
+
+(screenshot)
+
+### Using `capa_ghidra.py`
+
+`capa_ghidra.py` displays capa results in Ghidra's Console window and can be executed using Ghidra's Headless Analyzer. The following is an example of running `capa_ghidra.py` using the Ghidra Script Manager:
+
+Selecting capa rules:
+<img src="/doc/img/ghidra_script_mngr_rules.png">
+
+Choosing output format:
+<img src="/doc/img/ghidra_script_mngr_verbosity.png">
+
+Viewing results in Ghidra Console Window:
+<img src="/doc/img/ghidra_script_mngr_output.png">

-## Getting Started
+## Installation

-### Installation
+### Requirements

 Please ensure that you have the following dependencies installed before continuing:

-| Dependency | Version | Source |
+| Tool | Version | Source |
 |------------|---------|--------|
-| Ghidrathon | `>= 3.0.0` | https://github.com/mandiant/Ghidrathon |
-| Python | `>= 3.8` | https://www.python.org/downloads |
-| Ghidra | `>= 10.2` | https://ghidra-sre.org |
+| Ghidrathon | `>= 3.0.0` | https://github.com/mandiant/Ghidrathon/releases |
+| Ghidra | `>= 10.3.2` | https://github.com/NationalSecurityAgency/ghidra/releases |
+| Python | `>= 3.8.0` | https://www.python.org/downloads |

-In order to run capa using Ghidra, you must install capa as a library, obtain the official capa rules that match the capa version you have installed, and configure the Python 3 script [capa_ghidra.py](/capa/ghidra/capa_ghidra.py). You can do this by completing the following steps using the Python 3 interpreter that you have configured for your Ghidrathon installation:
+You can run capa in Ghidra by completing the following steps using the Python 3 interpreter that you have configured for your Ghidrathon installation:

 1. Install capa and its dependencies from PyPI using the following command:
 ```bash

@@ -32,63 +55,52 @@ OR
 $ capa --version
 ```

-3. Copy [capa_ghidra.py](/capa/ghidra/capa_ghidra.py) to your `$USER_HOME/ghidra_scripts` directory or manually add `</path/to/ghidra_capa.py/>` to the Ghidra Script Manager.
+3. Copy [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) to your `$USER_HOME/ghidra_scripts` directory or manually add the absolute path of each script to the Ghidra Script Manager.

 ## Usage

-After completing the installation steps you can execute `capa_ghidra.py` using the Ghidra Script Manager or Headless Analyzer.
+After completing the installation steps you can execute [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) using the Ghidra Script Manager. You can also execute [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) using Ghidra's Headless Analyzer.

 ### Ghidra Script Manager

-To execute `capa_ghidra.py` using the Ghidra Script Manager, first open the Ghidra Script Manager by navigating to `Window > Script Manager` in the Ghidra Code Browser. Next, locate `capa_ghidra.py` by selecting the `Python 3 > capa` category or using the Ghidra Script Manager search functionality. Finally, double-click `capa_ghidra.py` to execute the script. If you don't see `capa_ghidra.py`, make sure you have copied the script to your `$USER_HOME/ghidra_scripts` directory or manually added `</path/to/ghidra_capa.py/>` to the Ghidra Script Manager.
+Use the following steps to execute [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) using Ghidra's Script Manager:
+1. Open the Ghidra Script Manager by navigating to `Window > Script Manager`
+2. Locate [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) by selecting the `Python 3 > capa` category or using the Ghidra Script Manager search functionality
+3. Double-click [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) or [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) to execute the script

-When executed, `capa_ghidra.py` asks you to provide your capa rules directory and preferred output format. `capa_ghidra.py` supports `default`, `verbose`, and `vverbose` output formats when executed from the Ghidra Script Manager. `capa_ghidra.py` writes output to the Ghidra Console Window.
+If you don't see [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_explorer.py) and [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) make sure you have copied these scripts to your `$USER_HOME/ghidra_scripts` directory or manually added the absolute path of each script to the Ghidra Script Manager.

-#### Example
-
-The following is an example of running `capa_ghidra.py` using the Ghidra Script Manager:
-
-Selecting capa rules:
-<img src="/doc/img/ghidra_script_mngr_rules.png">
-
-Choosing output format:
-<img src="/doc/img/ghidra_script_mngr_verbosity.png">
-
-Viewing results in Ghidra Console Window:
-<img src="/doc/img/ghidra_script_mngr_output.png">
+Both scripts ask you to provide the path of your capa rules directory. [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) also asks you to select the `default`, `verbose`, or `vverbose` output format used when writing output to the Ghidra Console Window.

 ### Ghidra Headless Analyzer

-To execute `capa_ghidra.py` using the Ghidra Headless Analyzer, you can use the Ghidra `analyzeHeadless` script located in your `$GHIDRA_HOME/support` directory. You will need to provide the following arguments to the Ghidra `analyzeHeadless` script:
+To execute [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) using the Ghidra Headless Analyzer, you can use the Ghidra `analyzeHeadless` script located in your `<ghidra_install_path>/support` directory. You will need to provide the following arguments to the Ghidra `analyzeHeadless` script:

-1. `</path/to/ghidra/project/>`: path to Ghidra project
+1. `<ghidra_project_path>`: path to Ghidra project
 2. `<ghidra_project_name>`: name of Ghidra Project
 3. `-process <sample_name>`: name of sample `<sample_name>`
-4. `-ScriptPath </path/to/capa_ghidra/>`: OPTIONAL argument specifying path `</path/to/capa_ghidra/>` to `capa_ghidra.py`
-5. `-PostScript capa_ghidra.py`: executes `capa_ghidra.py` as post-analysis script
-6. `"<capa_args>"`: single, quoted string containing capa arguments that must specify capa rules directory and output format, e.g. `"<path/to/capa/rules> --verbose"`. `capa_ghidra.py` supports `default`, `verbose`, `vverbose` and `json` formats when executed using the Ghidra Headless Analyzer. `capa_ghidra.py` writes output to the console window used to execute the Ghidra `analyzeHeadless` script.
-7. `-processor <languageID>`: required ONLY if sample `<sample_name>` is shellcode. More information on specifying the `<languageID>` can be found in the `$GHIDRA_HOME/support/analyzeHeadlessREADME.html` documentation.
+4. `-ScriptPath <capa_ghidra_path>`: OPTIONAL argument specifying the absolute path of [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py)
+5. `-PostScript capa_ghidra.py`: execute [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) as post-analysis script
+6. `"<capa_args>"`: single, quoted string containing capa arguments that must specify capa rules directory and output format, e.g. `"<capa_rules_path> --verbose"`. [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) supports `default`, `verbose`, `vverbose` and `json` formats when executed using the Ghidra Headless Analyzer. [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) writes output to the console window used to execute the Ghidra `analyzeHeadless` script.

 The following is an example of combining these arguments into a single `analyzeHeadless` script command:

 ```
-$GHIDRA_HOME/support/analyzeHeadless </path/to/ghidra/project/> <ghidra_project_name> -process <sample_name> -PostScript capa_ghidra.py "/path/to/capa/rules/ --verbose"
+<ghidra_install_path>/support/analyzeHeadless <ghidra_project_path> <ghidra_project_name> -process <sample_name> -PostScript capa_ghidra.py "<capa_rules_path> --verbose"
 ```

-You may also want to run capa against a sample that you have not yet imported into your Ghidra project. The following is an example of importing a sample and running `capa_ghidra.py` using a single `analyzeHeadless` script command:
+You may also want to run capa against a sample that you have not yet imported into your Ghidra project. The following is an example of importing a sample and running [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) using a single `analyzeHeadless` script command:

 ```
-$GHIDRA_HOME/support/analyzeHeadless </path/to/ghidra/project/> <ghidra_project_name> -Import </path/to/sample> -PostScript capa_ghidra.py "/path/to/capa/rules/ --verbose"
+<ghidra_install_path>/support/analyzeHeadless <ghidra_project_path> <ghidra_project_name> -Import <sample_path> -PostScript capa_ghidra.py "<capa_rules_path> --verbose"
 ```

-You can also provide `capa_ghidra.py` the single argument `"help"` to view supported arguments when running the script using the Ghidra Headless Analyzer:
+You can also provide [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) the single argument `"help"` to view supported arguments when running the script using the Ghidra Headless Analyzer:
 ```
-$GHIDRA_HOME/support/analyzeHeadless </path/to/ghidra/project/> <ghidra_project_name> -process <sample_name> -PostScript capa_ghidra.py "help"
+<ghidra_install_path>/support/analyzeHeadless <ghidra_project_path> <ghidra_project_name> -process <sample_name> -PostScript capa_ghidra.py "help"
 ```

 #### Example

-The following is an example of running `capa_ghidra.py` against a shellcode sample using the Ghidra `analyzeHeadless` script:
+The following is an example of running [capa_ghidra.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ghidra/capa_ghidra.py) against a shellcode sample using the Ghidra `analyzeHeadless` script:
 ```
 $ analyzeHeadless /home/wumbo/Desktop/ghidra_projects/ capa_test -process 499c2a85f6e8142c3f48d4251c9c7cd6.raw32 -processor x86:LE:32:default -PostScript capa_ghidra.py "/home/wumbo/capa/rules -vv"
 [...]
capa/ghidra/capa_explorer.py — 378 lines (new file)

@@ -0,0 +1,378 @@
# Integrate capa results with Ghidra UI
# @author Colton Gabertan (gabertan.colton@gmail.com)
# @category Python 3.capa

# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import logging
import pathlib
from typing import Any, Dict, List

from ghidra.app.cmd.label import AddLabelCmd, CreateNamespacesCmd
from ghidra.program.model.symbol import Namespace, SourceType, SymbolType

import capa
import capa.main
import capa.rules
import capa.render.json
import capa.ghidra.helpers
import capa.capabilities.common
import capa.features.extractors.ghidra.extractor

logger = logging.getLogger("capa_explorer")


def add_bookmark(addr, txt, category="CapaExplorer"):
    """create bookmark at addr"""
    currentProgram().getBookmarkManager().setBookmark(addr, "Info", category, txt)  # type: ignore [name-defined] # noqa: F821


def create_namespace(namespace_str):
    """create new Ghidra namespace for each capa namespace"""

    cmd = CreateNamespacesCmd(namespace_str, SourceType.USER_DEFINED)
    cmd.applyTo(currentProgram())  # type: ignore [name-defined] # noqa: F821
    return cmd.getNamespace()


def create_label(ghidra_addr, name, capa_namespace):
    """custom label cmd to overlay symbols under capa-generated namespaces"""

    # prevent duplicate labels under the same capa-generated namespace
    symbol_table = currentProgram().getSymbolTable()  # type: ignore [name-defined] # noqa: F821
    for sym in symbol_table.getSymbols(ghidra_addr):
        if sym.getName(True) == capa_namespace.getName(True) + Namespace.DELIMITER + name:
            return

    # create SymbolType.LABEL at addr
    # prioritize capa-generated namespace (duplicate match @ new addr), else put under global Ghidra one (new match)
    cmd = AddLabelCmd(ghidra_addr, name, True, SourceType.USER_DEFINED)
    cmd.applyTo(currentProgram())  # type: ignore [name-defined] # noqa: F821

    # assign new match overlay label to capa-generated namespace
    cmd.getSymbol().setNamespace(capa_namespace)
    return


class CapaMatchData:
    def __init__(
        self,
        namespace,
        scope,
        capability,
        matches,
        attack: List[Dict[Any, Any]],
        mbc: List[Dict[Any, Any]],
    ):
        self.namespace = namespace
        self.scope = scope
        self.capability = capability
        self.matches = matches
        self.attack = attack
        self.mbc = mbc

    def bookmark_functions(self):
        """create bookmarks for MITRE ATT&CK & MBC mappings"""

        if self.attack == [] and self.mbc == []:
            return

        for key in self.matches.keys():
            addr = toAddr(hex(key))  # type: ignore [name-defined] # noqa: F821
            func = getFunctionContaining(addr)  # type: ignore [name-defined] # noqa: F821

            # bookmark & tag MITRE ATT&CK tactics & MBC @ function scope
            if func is not None:
                func_addr = func.getEntryPoint()

                if self.attack != []:
                    for item in self.attack:
                        attack_txt = ""
                        for part in item.get("parts", {}):
                            attack_txt = attack_txt + part + Namespace.DELIMITER
                        attack_txt = attack_txt + item.get("id", {})
                        add_bookmark(func_addr, attack_txt, "CapaExplorer::MITRE ATT&CK")

                if self.mbc != []:
                    for item in self.mbc:
                        mbc_txt = ""
                        for part in item.get("parts", {}):
                            mbc_txt = mbc_txt + part + Namespace.DELIMITER
                        mbc_txt = mbc_txt + item.get("id", {})
                        add_bookmark(func_addr, mbc_txt, "CapaExplorer::MBC")

    def set_plate_comment(self, ghidra_addr):
        """set plate comments at matched functions"""
        comment = getPlateComment(ghidra_addr)  # type: ignore [name-defined] # noqa: F821
        rule_path = self.namespace.replace(Namespace.DELIMITER, "/")
        # 2 calls to avoid duplicate comments via subsequent script runs
        if comment is None:
            # first comment @ function
            comment = rule_path + "\n"
            setPlateComment(ghidra_addr, comment)  # type: ignore [name-defined] # noqa: F821
        elif rule_path not in comment:
            comment = comment + rule_path + "\n"
            setPlateComment(ghidra_addr, comment)  # type: ignore [name-defined] # noqa: F821
        else:
            return

    def set_pre_comment(self, ghidra_addr, sub_type, description):
        """set pre comments at subscoped matches of main rules"""
        comment = getPreComment(ghidra_addr)  # type: ignore [name-defined] # noqa: F821
        if comment is None:
            comment = "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n'
            setPreComment(ghidra_addr, comment)  # type: ignore [name-defined] # noqa: F821
        elif self.capability not in comment:
            comment = (
                comment + "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n'
            )
            setPreComment(ghidra_addr, comment)  # type: ignore [name-defined] # noqa: F821
        else:
            return

    def label_matches(self):
        """label findings at function scopes and comment on subscope matches"""
        capa_namespace = create_namespace(self.namespace)
        symbol_table = currentProgram().getSymbolTable()  # type: ignore [name-defined] # noqa: F821

        # handle function main scope of matched rule
        # these will typically contain further matches within
        if self.scope == "function":
            for addr in self.matches.keys():
                ghidra_addr = toAddr(hex(addr))  # type: ignore [name-defined] # noqa: F821

                # classify new function label under capa-generated namespace
                sym = symbol_table.getPrimarySymbol(ghidra_addr)
                if sym is not None:
                    if sym.getSymbolType() == SymbolType.FUNCTION:
                        create_label(ghidra_addr, sym.getName(), capa_namespace)
                        self.set_plate_comment(ghidra_addr)

                # parse the corresponding nodes, and pre-comment subscope matched features
                # under the encompassing function(s)
                for sub_match in self.matches.get(addr):
                    for loc, node in sub_match.items():
                        sub_ghidra_addr = toAddr(hex(loc))  # type: ignore [name-defined] # noqa: F821
                        if sub_ghidra_addr == ghidra_addr:
                            # skip duplicates
                            continue

                        # precomment subscope matches under the function
                        if node != {}:
                            for sub_type, description in parse_node(node):
                                self.set_pre_comment(sub_ghidra_addr, sub_type, description)
        else:
            # resolve the encompassing function for the capa namespace
            # of non-function scoped main matches
            for addr in self.matches.keys():
                ghidra_addr = toAddr(hex(addr))  # type: ignore [name-defined] # noqa: F821

                # basic block / insn scoped main matches
                # Ex. See "Create Process on Windows" Rule
                func = getFunctionContaining(ghidra_addr)  # type: ignore [name-defined] # noqa: F821
                if func is not None:
                    func_addr = func.getEntryPoint()
                    create_label(func_addr, func.getName(), capa_namespace)
                    self.set_plate_comment(func_addr)

                # create subscope match precomments
                for sub_match in self.matches.get(addr):
                    for loc, node in sub_match.items():
                        sub_ghidra_addr = toAddr(hex(loc))  # type: ignore [name-defined] # noqa: F821

                        if node != {}:
                            if func is not None:
                                # basic block/ insn scope under resolved function
                                for sub_type, description in parse_node(node):
                                    self.set_pre_comment(sub_ghidra_addr, sub_type, description)
                            else:
                                # this would be a global/file scoped main match
                                # try to resolve the encompassing function via the subscope match, instead
                                # Ex. "run as service" rule
                                sub_func = getFunctionContaining(sub_ghidra_addr)  # type: ignore [name-defined] # noqa: F821
                                if sub_func is not None:
                                    sub_func_addr = sub_func.getEntryPoint()
                                    # place function in capa namespace & create the subscope match label in Ghidra's global namespace
                                    create_label(sub_func_addr, sub_func.getName(), capa_namespace)
                                    self.set_plate_comment(sub_func_addr)
                                    for sub_type, description in parse_node(node):
                                        self.set_pre_comment(sub_ghidra_addr, sub_type, description)
                                else:
                                    # addr is in some other file section like .data
                                    # represent this location with a label symbol under the capa namespace
                                    # Ex. See "Reference Base64 String" rule
                                    for sub_type, description in parse_node(node):
                                        # in many cases, these will be ghidra-labeled data, so just add the existing
                                        # label symbol to the capa namespace
                                        for sym in symbol_table.getSymbols(sub_ghidra_addr):
                                            if sym.getSymbolType() == SymbolType.LABEL:
                                                sym.setNamespace(capa_namespace)
                                        self.set_pre_comment(sub_ghidra_addr, sub_type, description)


def get_capabilities():
    rules_dir: str = ""
    try:
        selected_dir = askDirectory("Choose capa rules directory", "Ok")  # type: ignore [name-defined] # noqa: F821
        if selected_dir:
            rules_dir = selected_dir.getPath()
    except RuntimeError:
        # RuntimeError thrown when user selects "Cancel"
        pass

    if not rules_dir:
        logger.info("You must choose a capa rules directory before running capa.")
        return ""  # return empty str to avoid handling both int and str types

    rules_path: pathlib.Path = pathlib.Path(rules_dir)
    logger.info("running capa using rules from %s", str(rules_path))

    rules = capa.rules.get_rules([rules_path])
    meta = capa.ghidra.helpers.collect_metadata([rules_path])
    extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()

    capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True)

    if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False):
        popup("capa explorer encountered warnings during analysis. Please check the console output for more information.")  # type: ignore [name-defined] # noqa: F821
        logger.info("capa encountered warnings during analysis")

    return capa.render.json.render(meta, rules, capabilities)


def get_locations(match_dict):
    """recursively collect match addresses and associated nodes"""

    for loc in match_dict.get("locations", {}):
        # either an rva (absolute)
        # or an offset into a file (file)
        if loc.get("type", "") in ("absolute", "file"):
            yield loc.get("value"), match_dict.get("node")

    for child in match_dict.get("children", {}):
        yield from get_locations(child)


def parse_node(node_data):
    """pull match descriptions and sub features by parsing node dicts"""

    node = node_data.get(node_data.get("type"))

    if "description" in node:
        yield "description", node.get("description")

    data = node.get(node.get("type"))
    if isinstance(data, (str, int)):
        feat_type = node.get("type")
        if isinstance(data, int):
            data = hex(data)
        yield feat_type, data


def parse_json(capa_data):
    """Parse json produced by capa"""

    for rule, capability in capa_data.get("rules", {}).items():
        # structure to contain rule match address & supporting feature data
        # {rule match addr:[{feature addr:{node_data}}]}
        rule_matches: Dict[Any, List[Any]] = {}
        for i in range(len(capability.get("matches"))):
            # grab rule match location
            match_loc = capability.get("matches")[i][0].get("value")
            if match_loc is None:
                # Ex. See "Reference Base64 string"
                # {'type':'no address'}
                match_loc = i
            rule_matches[match_loc] = []

            # grab extracted feature locations & corresponding node data
            # feature[0]: location
            # feature[1]: node
            features = capability.get("matches")[i][1]
            feat_dict = {}
            for feature in get_locations(features):
                feat_dict[feature[0]] = feature[1]
            rule_matches[match_loc].append(feat_dict)

        # dict data of currently matched rule
        meta = capability["meta"]

        # get MITRE ATT&CK and MBC
        attack = meta.get("attack")
        if attack is None:
            attack = []
        mbc = meta.get("mbc")
        if mbc is None:
            mbc = []

        # scope match for the rule
        scope = meta["scopes"].get("static")

        fmt_rule = Namespace.DELIMITER + rule.replace(" ", "-")
        if "namespace" in meta:
            # split into list to help define child namespaces
            # this requires the correct delimiter used by Ghidra
            # Ex. 'communication/named-pipe/create/create pipe' -> capa::communication::named-pipe::create::create-pipe
            namespace_str = Namespace.DELIMITER.join(meta["namespace"].split("/"))
            namespace = "capa" + Namespace.DELIMITER + namespace_str + fmt_rule
        else:
            # lib rules via the official rules repo will not contain data
            # for the "namespaces" key, so format using rule itself
            # Ex. 'contain loop' -> capa::lib::contain-loop
            namespace = "capa" + Namespace.DELIMITER + "lib" + fmt_rule

        yield CapaMatchData(namespace, scope, rule, rule_matches, attack, mbc)


def main():
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)

    if isRunningHeadless():  # type: ignore [name-defined] # noqa: F821
        logger.error("unsupported Ghidra execution mode")
        return capa.main.E_UNSUPPORTED_GHIDRA_EXECUTION_MODE

    if not capa.ghidra.helpers.is_supported_ghidra_version():
        logger.error("unsupported Ghidra version")
        return capa.main.E_UNSUPPORTED_GHIDRA_VERSION

    if not capa.ghidra.helpers.is_supported_file_type():
        logger.error("unsupported file type")
        return capa.main.E_INVALID_FILE_TYPE

    if not capa.ghidra.helpers.is_supported_arch_type():
        logger.error("unsupported file architecture")
        return capa.main.E_INVALID_FILE_ARCH

    # capa_data will always contain {'meta':..., 'rules':...}
    # if the 'rules' key contains no values, then there were no matches
    capa_data = json.loads(get_capabilities())
    if capa_data.get("rules") is None:
        logger.info("capa explorer found no matches")
        popup("capa explorer found no matches.")  # type: ignore [name-defined] # noqa: F821
        return capa.main.E_EMPTY_REPORT

    for item in parse_json(capa_data):
        item.bookmark_functions()
        item.label_matches()

    logger.info("capa explorer analysis complete")
    popup("capa explorer analysis complete.\nPlease see results in the Bookmarks Window and Namespaces section of the Symbol Tree Window.")  # type: ignore [name-defined] # noqa: F821
    return 0


if __name__ == "__main__":
    if sys.version_info < (3, 8):
        from capa.exceptions import UnsupportedRuntimeError

        raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
    exit_code = main()
    if exit_code != 0:
        popup("capa explorer encountered errors during analysis. Please check the console output for more information.")  # type: ignore [name-defined] # noqa: F821
    sys.exit(exit_code)
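For orientation, `parse_json` above consumes capa's JSON result document shaped roughly like the following — a trimmed, hand-written sketch inferred from the keys the script reads, not the full schema:

```python
# keys read by parse_json / get_locations / parse_node above
capa_data_sketch = {
    "meta": {"...": "..."},
    "rules": {
        "create pipe": {
            "meta": {
                "namespace": "communication/named-pipe/create",
                "scopes": {"static": "function"},
                "attack": [],  # list of {"parts": [...], "id": "..."} dicts
                "mbc": [],
            },
            "matches": [
                [
                    # [0]: rule match location
                    {"type": "absolute", "value": 0x401000},
                    # [1]: match tree, walked recursively via "children"
                    {
                        "locations": [{"type": "absolute", "value": 0x401010}],
                        "node": {"type": "feature", "feature": {"type": "api", "api": "CreateNamedPipeA"}},
                        "children": [],
                    },
                ]
            ],
        }
    },
}
```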
@@ -69,7 +69,7 @@ def run_headless():
     rules_path = pathlib.Path(args.rules)

     logger.debug("rule path: %s", rules_path)
-    rules = capa.main.get_rules([rules_path])
+    rules = capa.rules.get_rules([rules_path])

     meta = capa.ghidra.helpers.collect_metadata([rules_path])
     extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()

@@ -78,7 +78,7 @@ def run_headless():
     meta.analysis.feature_counts = counts["feature_counts"]
     meta.analysis.library_functions = counts["library_functions"]
-    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
+    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)

     if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True):
         logger.info("capa encountered warnings during analysis")

@@ -119,7 +119,7 @@ def run_ui():
     rules_path: pathlib.Path = pathlib.Path(rules_dir)
     logger.info("running capa using rules from %s", str(rules_path))

-    rules = capa.main.get_rules([rules_path])
+    rules = capa.rules.get_rules([rules_path])

     meta = capa.ghidra.helpers.collect_metadata([rules_path])
     extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor()

@@ -128,7 +128,7 @@ def run_ui():
     meta.analysis.feature_counts = counts["feature_counts"]
     meta.analysis.library_functions = counts["library_functions"]
-    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
+    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)

     if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False):
         logger.info("capa encountered warnings during analysis")
@@ -5,6 +5,7 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License
 # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and limitations under the License.
+import sys
 import json
 import inspect
 import logging

@@ -16,12 +17,22 @@ from pathlib import Path
 import tqdm

 from capa.exceptions import UnsupportedFormatError
-from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_UNKNOWN, Format
+from capa.features.common import (
+    FORMAT_PE,
+    FORMAT_CAPE,
+    FORMAT_SC32,
+    FORMAT_SC64,
+    FORMAT_DOTNET,
+    FORMAT_FREEZE,
+    FORMAT_UNKNOWN,
+    Format,
+)

 EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
 EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
 EXTENSIONS_DYNAMIC = ("json", "json_")
 EXTENSIONS_ELF = "elf_"
+EXTENSIONS_FREEZE = "frz"

 logger = logging.getLogger("capa")

@@ -81,6 +92,8 @@ def get_format_from_extension(sample: Path) -> str:
         format_ = FORMAT_SC64
     elif sample.name.endswith(EXTENSIONS_DYNAMIC):
         format_ = get_format_from_report(sample)
+    elif sample.name.endswith(EXTENSIONS_FREEZE):
+        format_ = FORMAT_FREEZE
     return format_

@@ -201,3 +214,16 @@ def log_unsupported_runtime_error():
         " If you're seeing this message on the command line, please ensure you're running a supported Python version."
     )
     logger.error("-" * 80)
+
+
+def is_running_standalone() -> bool:
+    """
+    are we running from a PyInstaller'd executable?
+    if so, then we'll be able to access `sys._MEIPASS` for the packaged resources.
+    """
+    # typically we only expect capa.main to be packaged via PyInstaller.
+    # therefore, this *should* be in capa.main; however,
+    # the Binary Ninja extractor uses this to resolve the BN API code,
+    # so we keep this in a common area.
+    # generally, other library code should not use this function.
+    return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS")
@@ -636,7 +636,7 @@ class CapaExplorerForm(idaapi.PluginForm):
                 if ida_kernwin.user_cancelled():
                     raise UserCancelledError("user cancelled")

-            return capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
+            return capa.rules.get_rules([rule_path], on_load_rule=on_load_rule)
         except UserCancelledError:
             logger.info("User cancelled analysis.")
             return None

@@ -775,7 +775,7 @@ class CapaExplorerForm(idaapi.PluginForm):
             meta.analysis.feature_counts = counts["feature_counts"]
             meta.analysis.library_functions = counts["library_functions"]
-            meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities)
+            meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities)
         except UserCancelledError:
             logger.info("User cancelled analysis.")
             return False

@@ -932,9 +932,9 @@ class CapaExplorerForm(idaapi.PluginForm):
             update_wait_box("verifying cached results")

             try:
-                results: Optional[
-                    capa.render.result_document.ResultDocument
-                ] = capa.ida.helpers.load_and_verify_cached_results()
+                results: Optional[capa.render.result_document.ResultDocument] = (
+                    capa.ida.helpers.load_and_verify_cached_results()
+                )
             except Exception as e:
                 capa.ida.helpers.inform_user_ida_ui("Failed to verify cached results, reanalyzing program")
                 logger.exception("Failed to verify cached results (error: %s)", e)

@@ -1073,9 +1073,7 @@ class CapaExplorerForm(idaapi.PluginForm):
             self.view_rulegen_features.load_features(all_file_features, all_function_features)

-            self.set_view_status_label(
-                f"capa rules: {settings.user[CAPA_SETTINGS_RULE_PATH]} ({settings.user[CAPA_SETTINGS_RULE_PATH]} rules)"
-            )
+            self.set_view_status_label(f"capa rules: {settings.user[CAPA_SETTINGS_RULE_PATH]}")
         except Exception as e:
             logger.exception("Failed to render views (error: %s)", e)
             return False

@@ -1324,10 +1322,17 @@ class CapaExplorerForm(idaapi.PluginForm):
             idaapi.info("No rule to save.")
             return

-        path = Path(self.ask_user_capa_rule_file())
-        if not path.exists():
+        rule_file_path = self.ask_user_capa_rule_file()
+        if not rule_file_path:
+            # dialog canceled
+            return
+
+        path = Path(rule_file_path)
+        if not path.parent.exists():
+            logger.warning("Failed to save file: parent directory '%s' does not exist.", path.parent)
             return

         logger.info("Saving rule to %s.", path)
         write_file(path, s)

     def slot_checkbox_limit_by_changed(self, state):

@@ -200,9 +200,11 @@ class CapaExplorerRulegenPreview(QtWidgets.QTextEdit):
             " references:",
             " - <insert_references>",
             " examples:",
-            f" - {capa.ida.helpers.get_file_md5().upper()}:{hex(ea)}"
-            if ea
-            else f" - {capa.ida.helpers.get_file_md5().upper()}",
+            (
+                f" - {capa.ida.helpers.get_file_md5().upper()}:{hex(ea)}"
+                if ea
+                else f" - {capa.ida.helpers.get_file_md5().upper()}"
+            ),
             " features:",
         ]
         self.setText("\n".join(metadata_default))
|
||||
|
||||
544
capa/loader.py
Normal file
544
capa/loader.py
Normal file
@@ -0,0 +1,544 @@
|
||||
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at: [package root]/LICENSE.txt
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
import sys
|
||||
import json
|
||||
import logging
|
||||
import datetime
|
||||
from typing import Set, Dict, List, Optional
|
||||
from pathlib import Path
|
||||
|
||||
import halo
|
||||
from typing_extensions import assert_never
|
||||
|
||||
import capa.perf
|
||||
import capa.rules
|
||||
import capa.engine
|
||||
import capa.helpers
|
||||
import capa.version
|
||||
import capa.render.json
|
||||
import capa.rules.cache
|
||||
import capa.render.default
|
||||
import capa.render.verbose
|
||||
import capa.features.common
|
||||
import capa.features.freeze as frz
|
||||
import capa.render.vverbose
|
||||
import capa.features.extractors
|
||||
import capa.render.result_document
|
||||
import capa.render.result_document as rdoc
|
||||
import capa.features.extractors.common
|
||||
import capa.features.extractors.pefile
|
||||
import capa.features.extractors.elffile
|
||||
import capa.features.extractors.dotnetfile
|
||||
import capa.features.extractors.base_extractor
|
||||
import capa.features.extractors.cape.extractor
|
||||
from capa.rules import RuleSet
|
||||
from capa.engine import MatchResults
|
||||
from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError
|
||||
from capa.features.common import (
|
||||
OS_AUTO,
|
||||
FORMAT_PE,
|
||||
FORMAT_ELF,
|
||||
FORMAT_AUTO,
|
||||
FORMAT_CAPE,
|
||||
FORMAT_SC32,
|
||||
FORMAT_SC64,
|
||||
FORMAT_DOTNET,
|
||||
)
|
||||
from capa.features.address import Address
|
||||
from capa.features.extractors.base_extractor import (
|
||||
SampleHashes,
|
||||
    FeatureExtractor,
    StaticFeatureExtractor,
    DynamicFeatureExtractor,
)

logger = logging.getLogger(__name__)

BACKEND_VIV = "vivisect"
BACKEND_DOTNET = "dotnet"
BACKEND_BINJA = "binja"
BACKEND_PEFILE = "pefile"
BACKEND_CAPE = "cape"
BACKEND_FREEZE = "freeze"


def is_supported_format(sample: Path) -> bool:
    """
    Return if this is a supported file based on magic header values
    """
    taste = sample.open("rb").read(0x100)

    return len(list(capa.features.extractors.common.extract_format(taste))) == 1


def is_supported_arch(sample: Path) -> bool:
    buf = sample.read_bytes()

    return len(list(capa.features.extractors.common.extract_arch(buf))) == 1


def get_arch(sample: Path) -> str:
    buf = sample.read_bytes()

    for feature, _ in capa.features.extractors.common.extract_arch(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def is_supported_os(sample: Path) -> bool:
    buf = sample.read_bytes()

    return len(list(capa.features.extractors.common.extract_os(buf))) == 1


def get_os(sample: Path) -> str:
    buf = sample.read_bytes()

    for feature, _ in capa.features.extractors.common.extract_os(buf):
        assert isinstance(feature.value, str)
        return feature.value

    return "unknown"


def get_meta_str(vw):
    """
    Return workspace meta information string
    """
    meta = []
    for k in ["Format", "Platform", "Architecture"]:
        if k in vw.metadata:
            meta.append(f"{k.lower()}: {vw.metadata[k]}")
    return f"{', '.join(meta)}, number of functions: {len(vw.getFunctions())}"


def get_workspace(path: Path, input_format: str, sigpaths: List[Path]):
    """
    load the program at the given path into a vivisect workspace using the given format.
    also apply the given FLIRT signatures.

    supported formats:
      - pe
      - elf
      - shellcode 32-bit
      - shellcode 64-bit
      - auto

    this creates and analyzes the workspace; however, it does *not* save the workspace.
    this is the responsibility of the caller.
    """

    # lazy import enables us to not require viv if user wants another backend.
    import viv_utils
    import viv_utils.flirt

    logger.debug("generating vivisect workspace for: %s", path)
    if input_format == FORMAT_AUTO:
        if not is_supported_format(path):
            raise UnsupportedFormatError()

        # don't analyze, so that we can add our Flirt function analyzer first.
        vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
    elif input_format in {FORMAT_PE, FORMAT_ELF}:
        vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
    elif input_format == FORMAT_SC32:
        # these are not analyzed nor saved.
        vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False)
    elif input_format == FORMAT_SC64:
        vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False)
    else:
        raise ValueError("unexpected format: " + input_format)

    viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])

    vw.analyze()

    logger.debug("%s", get_meta_str(vw))
    return vw
def get_extractor(
    input_path: Path,
    input_format: str,
    os_: str,
    backend: str,
    sigpaths: List[Path],
    should_save_workspace=False,
    disable_progress=False,
    sample_path: Optional[Path] = None,
) -> FeatureExtractor:
    """
    raises:
      UnsupportedFormatError
      UnsupportedArchError
      UnsupportedOSError
    """
    if backend == BACKEND_CAPE:
        import capa.features.extractors.cape.extractor

        report = json.loads(input_path.read_text(encoding="utf-8"))
        return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)

    elif backend == BACKEND_DOTNET:
        import capa.features.extractors.dnfile.extractor

        if input_format not in (FORMAT_PE, FORMAT_DOTNET):
            raise UnsupportedFormatError()

        return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path)

    elif backend == BACKEND_BINJA:
        import capa.helpers
        from capa.features.extractors.binja.find_binja_api import find_binja_path

        # when we are running as a standalone executable, we cannot directly import binaryninja.
        # we need to first find the binja API installation path and add it into sys.path.
        if capa.helpers.is_running_standalone():
            bn_api = find_binja_path()
            if bn_api.exists():
                sys.path.append(str(bn_api))

        try:
            import binaryninja
            from binaryninja import BinaryView
        except ImportError:
            raise RuntimeError(
                "Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
                + "https://docs.binary.ninja/dev/batch.html#install-the-api."
            )

        import capa.features.extractors.binja.extractor

        if input_format not in (FORMAT_SC32, FORMAT_SC64):
            if not is_supported_format(input_path):
                raise UnsupportedFormatError()

            if not is_supported_arch(input_path):
                raise UnsupportedArchError()

            if os_ == OS_AUTO and not is_supported_os(input_path):
                raise UnsupportedOSError()

        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            bv: BinaryView = binaryninja.load(str(input_path))
            if bv is None:
                raise RuntimeError(f"Binary Ninja cannot open file {input_path}")

        return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)

    elif backend == BACKEND_PEFILE:
        import capa.features.extractors.pefile

        return capa.features.extractors.pefile.PefileFeatureExtractor(input_path)

    elif backend == BACKEND_VIV:
        import capa.features.extractors.viv.extractor

        if input_format not in (FORMAT_SC32, FORMAT_SC64):
            if not is_supported_format(input_path):
                raise UnsupportedFormatError()

            if not is_supported_arch(input_path):
                raise UnsupportedArchError()

            if os_ == OS_AUTO and not is_supported_os(input_path):
                raise UnsupportedOSError()

        with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
            vw = get_workspace(input_path, input_format, sigpaths)

            if should_save_workspace:
                logger.debug("saving workspace")
                try:
                    vw.saveWorkspace()
                except IOError:
                    # see #168 for discussion around how to handle non-writable directories
                    logger.info("source directory is not writable, won't save intermediate workspace")
            else:
                logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

        return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_)

    elif backend == BACKEND_FREEZE:
        return frz.load(input_path.read_bytes())

    else:
        raise ValueError("unexpected backend: " + backend)
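For orientation, a minimal usage sketch of this loader API, assuming the module is importable as capa.loader (as elsewhere in this release) and that the vivisect backend is installed; the sample path is illustrative:

from pathlib import Path

import capa.loader
from capa.features.common import OS_AUTO, FORMAT_AUTO

# hypothetical input path; any PE or ELF sample would do.
sample = Path("/tmp/sample.exe_")

extractor = capa.loader.get_extractor(
    sample,
    FORMAT_AUTO,              # sniff the file format from magic bytes
    OS_AUTO,                  # sniff the operating system
    capa.loader.BACKEND_VIV,  # analyze with vivisect
    sigpaths=[],              # no FLIRT signatures in this sketch
    disable_progress=True,
)

# global features describe the whole file: format, arch, OS, ...
for feature, addr in extractor.extract_global_features():
    print(feature, addr)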
def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtractor]:
    file_extractors: List[FeatureExtractor] = []

    if input_format == FORMAT_PE:
        file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))

    elif input_format == FORMAT_DOTNET:
        file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file))
        file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file))

    elif input_format == FORMAT_ELF:
        file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file))

    elif input_format == FORMAT_CAPE:
        report = json.loads(input_file.read_text(encoding="utf-8"))
        file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))

    return file_extractors
def get_signatures(sigs_path: Path) -> List[Path]:
    if not sigs_path.exists():
        raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed")

    paths: List[Path] = []
    if sigs_path.is_file():
        paths.append(sigs_path)
    elif sigs_path.is_dir():
        logger.debug("reading signatures from directory %s", sigs_path.resolve())
        for file in sigs_path.rglob("*"):
            # match on the full file name rather than Path.suffix,
            # since ".pat.gz" spans two suffixes and Path.suffix would only see ".gz".
            if file.is_file() and file.name.lower().endswith((".pat", ".pat.gz", ".sig")):
                paths.append(file)

    # convert paths to their absolute and normalized forms
    paths = [path.resolve().absolute() for path in paths]

    # load signatures in deterministic order: the alphabetic sorting of filename.
    # this means that `0_sigs.pat` loads before `1_sigs.pat`.
    paths = sorted(paths, key=lambda path: path.name)

    for path in paths:
        logger.debug("found signature file: %s", path)

    return paths
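A quick sketch of loading signatures from disk; the directory path is illustrative:

from pathlib import Path

import capa.loader

# recursively collects .pat, .pat.gz, and .sig files, sorted by file name
sigpaths = capa.loader.get_signatures(Path("./sigs"))
for p in sigpaths:
    print(p.name)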
def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
    if isinstance(extractor, StaticFeatureExtractor):
        return rdoc.StaticAnalysis(
            format=format_,
            arch=arch,
            os=os_,
            extractor=extractor.__class__.__name__,
            rules=tuple(rules_path),
            base_address=frz.Address.from_capa(extractor.get_base_address()),
            layout=rdoc.StaticLayout(
                functions=(),
                # this is updated after capabilities have been collected.
                # will look like:
                #
                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
            ),
            feature_counts=counts["feature_counts"],
            library_functions=counts["library_functions"],
        )
    elif isinstance(extractor, DynamicFeatureExtractor):
        return rdoc.DynamicAnalysis(
            format=format_,
            arch=arch,
            os=os_,
            extractor=extractor.__class__.__name__,
            rules=tuple(rules_path),
            layout=rdoc.DynamicLayout(
                processes=(),
            ),
            feature_counts=counts["feature_counts"],
        )
    else:
        raise ValueError("invalid extractor type")
def collect_metadata(
    argv: List[str],
    input_path: Path,
    input_format: str,
    os_: str,
    rules_path: List[Path],
    extractor: FeatureExtractor,
    counts: dict,
) -> rdoc.Metadata:
    # if it's a binary sample we hash it, if it's a report
    # we fetch the hashes from the report
    sample_hashes: SampleHashes = extractor.get_sample_hashes()
    md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256

    global_feats = list(extractor.extract_global_features())
    extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)]
    extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)]
    extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)]

    input_format = (
        str(extractor_format[0]) if extractor_format else "unknown" if input_format == FORMAT_AUTO else input_format
    )
    arch = str(extractor_arch[0]) if extractor_arch else "unknown"
    os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_

    if isinstance(extractor, StaticFeatureExtractor):
        meta_class: type = rdoc.StaticMetadata
    elif isinstance(extractor, DynamicFeatureExtractor):
        meta_class = rdoc.DynamicMetadata
    else:
        assert_never(extractor)

    rules = tuple(r.resolve().absolute().as_posix() for r in rules_path)

    return meta_class(
        timestamp=datetime.datetime.now(),
        version=capa.version.__version__,
        argv=tuple(argv) if argv else None,
        sample=rdoc.Sample(
            md5=md5,
            sha1=sha1,
            sha256=sha256,
            path=input_path.resolve().as_posix(),
        ),
        analysis=get_sample_analysis(
            input_format,
            arch,
            os_,
            extractor,
            rules,
            counts,
        ),
    )
def compute_dynamic_layout(
    rules: RuleSet, extractor: DynamicFeatureExtractor, capabilities: MatchResults
) -> rdoc.DynamicLayout:
    """
    compute a metadata structure that links threads
    to the processes in which they're found.

    only collect the threads at which some rule matched.
    otherwise, we may pollute the json document with
    a large amount of un-referenced data.
    """
    assert isinstance(extractor, DynamicFeatureExtractor)

    matched_calls: Set[Address] = set()

    def result_rec(result: capa.features.common.Result):
        for loc in result.locations:
            if isinstance(loc, capa.features.address.DynamicCallAddress):
                matched_calls.add(loc)
        for child in result.children:
            result_rec(child)

    for matches in capabilities.values():
        for _, result in matches:
            result_rec(result)

    names_by_process: Dict[Address, str] = {}
    names_by_call: Dict[Address, str] = {}

    matched_processes: Set[Address] = set()
    matched_threads: Set[Address] = set()

    threads_by_process: Dict[Address, List[Address]] = {}
    calls_by_thread: Dict[Address, List[Address]] = {}

    for p in extractor.get_processes():
        threads_by_process[p.address] = []

        for t in extractor.get_threads(p):
            calls_by_thread[t.address] = []

            for c in extractor.get_calls(p, t):
                if c.address in matched_calls:
                    names_by_call[c.address] = extractor.get_call_name(p, t, c)
                    calls_by_thread[t.address].append(c.address)

            if calls_by_thread[t.address]:
                matched_threads.add(t.address)
                threads_by_process[p.address].append(t.address)

        if threads_by_process[p.address]:
            matched_processes.add(p.address)
            names_by_process[p.address] = extractor.get_process_name(p)

    layout = rdoc.DynamicLayout(
        processes=tuple(
            rdoc.ProcessLayout(
                address=frz.Address.from_capa(p),
                name=names_by_process[p],
                matched_threads=tuple(
                    rdoc.ThreadLayout(
                        address=frz.Address.from_capa(t),
                        matched_calls=tuple(
                            rdoc.CallLayout(
                                address=frz.Address.from_capa(c),
                                name=names_by_call[c],
                            )
                            for c in calls_by_thread[t]
                            if c in matched_calls
                        ),
                    )
                    for t in threads
                    if t in matched_threads
                ),  # this object is open to extension in the future,
                # such as with the function name, etc.
            )
            for p, threads in threads_by_process.items()
            if p in matched_processes
        )
    )

    return layout
def compute_static_layout(rules: RuleSet, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout:
    """
    compute a metadata structure that links basic blocks
    to the functions in which they're found.

    only collect the basic blocks at which some rule matched.
    otherwise, we may pollute the json document with
    a large amount of un-referenced data.
    """
    functions_by_bb: Dict[Address, Address] = {}
    bbs_by_function: Dict[Address, List[Address]] = {}
    for f in extractor.get_functions():
        bbs_by_function[f.address] = []
        for bb in extractor.get_basic_blocks(f):
            functions_by_bb[bb.address] = f.address
            bbs_by_function[f.address].append(bb.address)

    matched_bbs = set()
    for rule_name, matches in capabilities.items():
        rule = rules[rule_name]
        if capa.rules.Scope.BASIC_BLOCK in rule.scopes:
            for addr, _ in matches:
                assert addr in functions_by_bb
                matched_bbs.add(addr)

    layout = rdoc.StaticLayout(
        functions=tuple(
            rdoc.FunctionLayout(
                address=frz.Address.from_capa(f),
                matched_basic_blocks=tuple(
                    rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs
                ),  # this object is open to extension in the future,
                # such as with the function name, etc.
            )
            for f, bbs in bbs_by_function.items()
            if len([bb for bb in bbs if bb in matched_bbs]) > 0
        )
    )

    return layout
def compute_layout(rules: RuleSet, extractor, capabilities) -> rdoc.Layout:
    if isinstance(extractor, StaticFeatureExtractor):
        return compute_static_layout(rules, extractor, capabilities)
    elif isinstance(extractor, DynamicFeatureExtractor):
        return compute_dynamic_layout(rules, extractor, capabilities)
    else:
        raise ValueError("extractor must be either a static or dynamic extractor")
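Taken together, these helpers compose into the typical library workflow; a hedged end-to-end sketch, with illustrative paths and the APIs as they appear in this release:

from pathlib import Path

import capa.rules
import capa.loader
import capa.capabilities.common
from capa.features.common import OS_AUTO, FORMAT_AUTO

input_file = Path("/tmp/sample.exe_")  # hypothetical sample
rules_dir = Path("./rules")            # hypothetical rules checkout

rules = capa.rules.get_rules([rules_dir])
extractor = capa.loader.get_extractor(
    input_file, FORMAT_AUTO, OS_AUTO, capa.loader.BACKEND_VIV, sigpaths=[], disable_progress=True
)

capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)

# attach layout metadata so renderers can group matches by function or process
meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_dir], extractor, counts)
meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)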
capa/main.py: 1171 changed lines
File diff suppressed because it is too large
@@ -1,5 +1,7 @@
syntax = "proto3";

package mandiant.capa;

message APIFeature {
    string type = 1;
    string api = 2;

File diff suppressed because one or more lines are too long
@@ -160,8 +160,7 @@ class CompoundStatementType:
    OPTIONAL = "optional"


class StatementModel(FrozenModel):
    ...
class StatementModel(FrozenModel): ...


class CompoundStatement(StatementModel):
@@ -650,9 +649,9 @@ class ResultDocument(FrozenModel):
        return ResultDocument(meta=meta, rules=rule_matches)

    def to_capa(self) -> Tuple[Metadata, Dict]:
        capabilities: Dict[
            str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]
        ] = collections.defaultdict(list)
        capabilities: Dict[str, List[Tuple[capa.features.address.Address, capa.features.common.Result]]] = (
            collections.defaultdict(list)
        )

        # this doesn't quite work because we don't have the rule source for rules that aren't matched.
        rules_by_name = {
@@ -22,6 +22,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""

from typing import cast

import tabulate
@@ -7,6 +7,7 @@
# See the License for the specific language governing permissions and limitations under the License.

import io
import os
import re
import uuid
import codecs
@@ -25,7 +26,7 @@ except ImportError:
    # https://github.com/python/mypy/issues/1153
    from backports.functools_lru_cache import lru_cache  # type: ignore

from typing import Any, Set, Dict, List, Tuple, Union, Iterator, Optional
from typing import Any, Set, Dict, List, Tuple, Union, Callable, Iterator, Optional
from dataclasses import asdict, dataclass

import yaml
@@ -1691,3 +1692,105 @@ class RuleSet:
        matches.update(hard_matches)

        return (features3, matches)


def is_nursery_rule_path(path: Path) -> bool:
    """
    The nursery is a spot for rules that have not yet been fully polished.
    For example, they may not have references to a public example of a technique.
    Yet, we still want to capture and report on their matches.
    The nursery is currently a subdirectory of the rules directory with that name.

    When nursery rules are loaded, their metadata section should be updated with:
    `nursery=True`.
    """
    return "nursery" in path.parts


def collect_rule_file_paths(rule_paths: List[Path]) -> List[Path]:
    """
    collect all rule file paths, including those in subdirectories.
    """
    rule_file_paths = []
    for rule_path in rule_paths:
        if not rule_path.exists():
            raise IOError(f"rule path {rule_path} does not exist or cannot be accessed")

        if rule_path.is_file():
            rule_file_paths.append(rule_path)
        elif rule_path.is_dir():
            logger.debug("reading rules from directory %s", rule_path)
            for root, _, files in os.walk(rule_path):
                if ".git" in root:
                    # the .github directory contains CI config in capa-rules
                    # this includes some .yml files
                    # these are not rules
                    # additionally, .git has files that are not .yml and generate the warning
                    # skip those too
                    continue
                for file in files:
                    if not file.endswith(".yml"):
                        if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))):
                            # expect to see .git* files, readme.md, format.md, and maybe a .git directory
                            # other things maybe are rules, but are mis-named.
                            logger.warning("skipping non-.yml file: %s", file)
                        continue
                    rule_file_paths.append(Path(root) / file)
    return rule_file_paths


# TypeAlias. note: using `foo: TypeAlias = bar` is Python 3.10+
RulePath = Path


def on_load_rule_default(_path: RulePath, i: int, _total: int) -> None:
    return


def get_rules(
    rule_paths: List[RulePath],
    cache_dir=None,
    on_load_rule: Callable[[RulePath, int, int], None] = on_load_rule_default,
) -> RuleSet:
    """
    args:
      rule_paths: list of paths to rules files or directories containing rules files
      cache_dir: directory to use for caching rules, or will use the default detected cache directory if None
      on_load_rule: callback to invoke before a rule is loaded, use for progress or cancellation
    """
    if cache_dir is None:
        cache_dir = capa.rules.cache.get_default_cache_directory()
    # rule_paths may contain directory paths,
    # so search for file paths recursively.
    rule_file_paths = collect_rule_file_paths(rule_paths)

    # this list is parallel to `rule_file_paths`:
    # rule_file_paths[i] corresponds to rule_contents[i].
    rule_contents = [file_path.read_bytes() for file_path in rule_file_paths]

    ruleset = capa.rules.cache.load_cached_ruleset(cache_dir, rule_contents)
    if ruleset is not None:
        return ruleset

    rules: List[Rule] = []

    total_rule_count = len(rule_file_paths)
    for i, (path, content) in enumerate(zip(rule_file_paths, rule_contents)):
        on_load_rule(path, i, total_rule_count)

        try:
            rule = capa.rules.Rule.from_yaml(content.decode("utf-8"))
        except capa.rules.InvalidRule:
            raise
        else:
            rule.meta["capa/path"] = path.as_posix()
            rule.meta["capa/nursery"] = is_nursery_rule_path(path)

            rules.append(rule)
            logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

    ruleset = capa.rules.RuleSet(rules)

    capa.rules.cache.cache_ruleset(cache_dir, ruleset)

    return ruleset
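A small sketch of how the new on_load_rule hook might be used for progress reporting; the rules directory is illustrative:

from pathlib import Path

import capa.rules

def report_progress(path: Path, i: int, total: int) -> None:
    # invoked before each rule is parsed; handy for progress bars or cancellation
    print(f"[{i + 1}/{total}] loading {path.name}")

rules = capa.rules.get_rules([Path("./rules")], on_load_rule=report_progress)
print("loaded", len(rules), "rules")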
@@ -5,7 +5,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
__version__ = "7.0.0-beta"
__version__ = "7.0.1"


def get_major_version():
@@ -63,12 +63,12 @@ namespaces = false
[project.optional-dependencies]
dev = [
    "pre-commit==3.5.0",
    "pytest==7.4.4",
    "pytest==8.0.0",
    "pytest-sugar==0.9.7",
    "pytest-instafail==0.5.0",
    "pytest-cov==4.1.0",
    "flake8==7.0.0",
    "flake8-bugbear==23.12.2",
    "flake8-bugbear==24.1.17",
    "flake8-encodings==0.5.1",
    "flake8-comprehensions==3.14.0",
    "flake8-logging-format==0.9.0",
@@ -79,7 +79,7 @@ dev = [
    "flake8-use-pathlib==0.3.0",
    "flake8-copyright==0.2.4",
    "ruff==0.1.14",
    "black==23.12.1",
    "black==24.1.1",
    "isort==5.13.2",
    "mypy==1.8.0",
    "psutil==5.9.2",
@@ -93,7 +93,7 @@ dev = [
    "types-tabulate==0.9.0.20240106",
    "types-termcolor==1.1.4",
    "types-psutil==5.8.23",
    "types_requests==2.31.0.20240106",
    "types_requests==2.31.0.20240125",
    "types-protobuf==4.23.0.3",
]
build = [
@@ -36,7 +36,7 @@ example:
usage:

usage: bulk-process.py [-h] [-r RULES] [-d] [-q] [-n PARALLELISM] [--no-mp]
                       input
                       input_directory

detect capabilities in programs.
@@ -62,7 +62,6 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import json
import logging
@@ -74,10 +73,10 @@ from pathlib import Path

import capa
import capa.main
import capa.rules
import capa.loader
import capa.render.json
import capa.capabilities.common
import capa.render.result_document as rd
from capa.features.common import OS_AUTO

logger = logging.getLogger("capa")

@@ -87,11 +86,8 @@ def get_capa_results(args):
    run capa against the file at the given path, using the given rules.

    args is a tuple, containing:
        rules (capa.rules.RuleSet): the rules to match
        signatures (List[str]): list of file system paths to signature files
        format (str): the name of the sample file format
        os (str): the name of the operating system
        path (str): the file system path to the sample to process
        rules, signatures, format, backend, os, input_file
        as provided via the CLI arguments.

    args is a tuple because i'm not quite sure how to unpack multiple arguments using `map`.

@@ -106,44 +102,58 @@ def get_capa_results(args):
        meta (dict): the meta analysis results
        capabilities (dict): the matched capabilities and their result objects
    """
    rules, sigpaths, format, os_, path = args
    should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
    logger.info("computing capa results for: %s", path)
    rules, signatures, format_, backend, os_, input_file = args

    parser = argparse.ArgumentParser(description="detect capabilities in programs.")
    capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend", "input_file"})
    argv = [
        "--signatures",
        signatures,
        "--format",
        format_,
        "--backend",
        backend,
        "--os",
        os_,
        input_file,
    ]
    if rules:
        argv += ["--rules", rules]
    args = parser.parse_args(args=argv)

    try:
        extractor = capa.main.get_extractor(
            path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True
        )
    except capa.exceptions.UnsupportedFormatError:
        # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        rules = capa.main.get_rules_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        sample_path = capa.main.get_sample_path_from_cli(args, backend)
        if sample_path is None:
            os_ = "unknown"
        else:
            os_ = capa.loader.get_os(sample_path)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        # i'm not 100% sure if multiprocessing will reliably raise exceptions across process boundaries.
        # so instead, return an object with explicit success/failure status.
        #
        # if success, then status=ok, and results found in property "ok"
        # if error, then status=error, and human readable message in property "error"
        return {
            "path": path,
            "status": "error",
            "error": f"input file does not appear to be a PE file: {path}",
        }
    except capa.exceptions.UnsupportedRuntimeError:
        return {
            "path": path,
            "status": "error",
            "error": "unsupported runtime or Python interpreter",
        }
        return {"path": input_file, "status": "error", "error": str(e), "status_code": e.status_code}
    except Exception as e:
        return {
            "path": path,
            "path": input_file,
            "status": "error",
            "error": f"unexpected error: {e}",
        }

    capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)

    meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts)
    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
    meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, counts)
    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)

    doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
    return {"path": path, "status": "ok", "ok": doc.model_dump()}
    return {"path": input_file, "status": "ok", "ok": doc.model_dump()}


def main(argv=None):
@@ -151,30 +161,16 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="detect capabilities in programs.")
    capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"})
    parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze")
    capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend"})
    parser.add_argument("input_directory", type=str, help="Path to directory of files to recursively analyze")
    parser.add_argument(
        "-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor"
    )
    parser.add_argument("--no-mp", action="store_true", help="disable subprocesses")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    try:
        rules = capa.main.get_rules(args.rules)
        logger.info("successfully loaded %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    samples = []
    for file in Path(args.input).rglob("*"):
    for file in Path(args.input_directory).rglob("*"):
        samples.append(file)

    cpu_count = multiprocessing.cpu_count()
@@ -203,18 +199,22 @@ def main(argv=None):
        logger.debug("using process mapper")
        mapper = pmap

    rules = args.rules
    if rules == [capa.main.RULES_PATH_DEFAULT_STRING]:
        rules = None

    results = {}
    for result in mapper(
        get_capa_results,
        [(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples],
        [(rules, args.signatures, args.format, args.backend, args.os, str(sample)) for sample in samples],
        parallelism=args.parallelism,
    ):
        if result["status"] == "error":
            logger.warning(result["error"])
        elif result["status"] == "ok":
            results[result["path"].as_posix()] = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(
                exclude_none=True
            )
            doc = rd.ResultDocument.model_validate(result["ok"]).model_dump_json(exclude_none=True)
            results[result["path"]] = json.loads(doc)

        else:
            raise ValueError(f"unexpected status: {result['status']}")
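The refactored worker above reuses capa's own CLI plumbing by synthesizing an argv; a condensed sketch of that pattern, with illustrative argument values:

import argparse

import capa.main

parser = argparse.ArgumentParser(description="detect capabilities in programs.")
capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os", "backend", "input_file"})
args = parser.parse_args(args=["--format", "auto", "--backend", "auto", "--os", "auto", "/tmp/sample.exe_"])

try:
    capa.main.handle_common_args(args)
    capa.main.ensure_input_exists_from_cli(args)
    input_format = capa.main.get_input_format_from_cli(args)
    backend = capa.main.get_backend_from_cli(args, input_format)
    extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
except capa.main.ShouldExitError as e:
    raise SystemExit(e.status_code)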
@@ -15,6 +15,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""

import sys
import logging
import argparse
@@ -36,20 +37,27 @@ def main(argv=None):

    parser = argparse.ArgumentParser(description="Cache ruleset.")
    capa.main.install_common_args(parser)
    parser.add_argument("rules", type=str, action="append", help="Path to rules")
    parser.add_argument("rules", type=str, help="Path to rules directory")
    parser.add_argument("cache", type=str, help="Path to cache directory")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    if args.debug:
        logging.getLogger("capa").setLevel(logging.DEBUG)
    # don't use capa.main.handle_common_args
    # because it expects a different format for the --rules argument

    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger("capa").setLevel(logging.ERROR)
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    try:
        cache_dir = Path(args.cache)
        cache_dir.mkdir(parents=True, exist_ok=True)
        rules = capa.main.get_rules(args.rules, cache_dir)
        rules = capa.rules.get_rules([Path(args.rules)], cache_dir)
        logger.info("successfully loaded %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
@@ -723,36 +723,33 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Capa to YARA rule converter")
    parser.add_argument("rules", type=str, help="Path to rules")
    parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False)
    capa.main.install_common_args(parser, wanted={"tag"})

    parser.add_argument("--private", "-p", action="store_true", help="Create private rules", default=False)
    parser.add_argument("rules", type=str, help="Path to rules directory")
    args = parser.parse_args(args=argv)
    make_priv = args.private

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    # don't use capa.main.handle_common_args
    # because it expects a different format for the --rules argument

    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        level = logging.INFO

    logging.basicConfig(level=level)
    logging.getLogger("capa2yara").setLevel(level)
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    try:
        rules = capa.main.get_rules([Path(args.rules)])
        namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))
        logger.info("successfully loaded %d rules (including subscope rules which will be ignored)", len(rules))
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.debug("selected %d rules", len(rules))
            for i, r in enumerate(rules.rules, 1):
                logger.debug(" %d. %s", i, r)
        rules = capa.rules.get_rules([Path(args.rules)])
        logger.info("successfully loaded %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1

    namespaces = capa.rules.index_rules_by_namespace(list(rules.rules.values()))

    output_yar(
        "// Rules from Mandiant's https://github.com/mandiant/capa-rules converted to YARA using https://github.com/mandiant/capa/blob/master/scripts/capa2yara.py by Arnim Rupp"
    )
@@ -780,10 +777,10 @@ def main(argv=None):
        cround += 1
        logger.info("doing convert_rules(), round: %d", cround)
        num_rules = len(converted_rules)
        count_incomplete += convert_rules(rules, namespaces, cround, make_priv)
        count_incomplete += convert_rules(rules, namespaces, cround, args.private)

    # one last round to collect all unconverted rules
    count_incomplete += convert_rules(rules, namespaces, 9000, make_priv)
    count_incomplete += convert_rules(rules, namespaces, 9000, args.private)

    stats = "\n// converted rules : " + str(len(converted_rules))
    stats += "\n// among those are incomplete : " + str(count_incomplete)
@@ -15,6 +15,7 @@ from pathlib import Path

import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.features
import capa.render.json
import capa.render.utils as rutils
@@ -168,19 +169,19 @@ def render_dictionary(doc: rd.ResultDocument) -> Dict[str, Any]:


# ==== render dictionary helpers
def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"):
def capa_details(rules_path: Path, input_file: Path, output_format="dictionary"):
    # load rules from disk
    rules = capa.main.get_rules([rules_path])
    rules = capa.rules.get_rules([rules_path])

    # extract features and find capabilities
    extractor = capa.main.get_extractor(
        file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True
    extractor = capa.loader.get_extractor(
        input_file, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], should_save_workspace=False, disable_progress=True
    )
    capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True)

    # collect metadata (used only to make rendering more complete)
    meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
    meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts)
    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)

    capa_output: Any = False

@@ -206,7 +207,7 @@ if __name__ == "__main__":
    RULES_PATH = capa.main.get_default_root() / "rules"

    parser = argparse.ArgumentParser(description="Extract capabilities from a file")
    parser.add_argument("file", help="file to extract capabilities from")
    parser.add_argument("input_file", help="file to extract capabilities from")
    parser.add_argument("--rules", help="path to rules directory", default=RULES_PATH)
    parser.add_argument(
        "--output", help="output format", choices=["dictionary", "json", "texttable"], default="dictionary"
@@ -214,5 +215,5 @@ if __name__ == "__main__":
    args = parser.parse_args()
    if args.rules != RULES_PATH:
        args.rules = Path(args.rules)
    print(capa_details(args.rules, Path(args.file), args.output))
    print(capa_details(args.rules, Path(args.input_file), args.output))
    sys.exit(0)
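Called as a library, the updated helper reduces to a one-liner; a hedged example, assuming capa_details() is imported or defined as in scripts/capa_as_library.py above, with illustrative paths:

from pathlib import Path

# returns a plain dict for output_format="dictionary"
result = capa_details(Path("./rules"), Path("/tmp/sample.exe_"), output_format="dictionary")
print(sorted(result.keys()))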
@@ -14,11 +14,13 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""

import sys
import logging
import argparse
from pathlib import Path

import capa.main
import capa.rules

logger = logging.getLogger("capafmt")
@@ -29,6 +31,7 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Capa rule formatter.")
    capa.main.install_common_args(parser)
    parser.add_argument("path", type=str, help="Path to rule to format")
    parser.add_argument(
        "-i",
@@ -37,8 +40,6 @@
        dest="in_place",
        help="Format the rule in place, otherwise, write formatted rule to STDOUT",
    )
    parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
    parser.add_argument(
        "-c",
        "--check",
@@ -47,15 +48,10 @@
    )
    args = parser.parse_args(args=argv)

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO

    logging.basicConfig(level=level)
    logging.getLogger("capafmt").setLevel(level)
    try:
        capa.main.handle_common_args(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    rule = capa.rules.Rule.from_yaml_file(args.path, use_ruamel=True)
    reformatted_rule = rule.to_yaml()
@@ -17,8 +17,8 @@ import logging
import argparse
import contextlib
from typing import BinaryIO
from pathlib import Path

import capa.main
import capa.helpers
import capa.features.extractors.elf
@@ -36,28 +36,16 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Detect the underlying OS for the given ELF file")
    parser.add_argument("sample", type=str, help="path to ELF file")

    logging_group = parser.add_argument_group("logging arguments")

    logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    logging_group.add_argument(
        "-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
    )

    capa.main.install_common_args(parser, wanted={"input_file"})
    args = parser.parse_args(args=argv)

    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)
    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    f = Path(args.sample).open("rb")
    f = args.input_file.open("rb")

    with contextlib.closing(f):
        try:
@@ -48,7 +48,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
    overlapping_rules = []

    # capa.rules.RuleSet stores all rules in given paths
    ruleset = capa.main.get_rules(rules_path)
    ruleset = capa.rules.get_rules(rules_path)

    for rule_name, rule in ruleset.rules.items():
        rule_features = rule.extract_all_features()
@@ -28,6 +28,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""

import logging
import binascii
from pathlib import Path
@@ -13,6 +13,7 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""

import gc
import os
import re
@@ -39,6 +40,7 @@ import tqdm.contrib.logging
import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features.insn
import capa.capabilities.common
@@ -307,9 +309,8 @@ class InvalidAttckOrMbcTechnique(Lint):
            with data_path.open("rb") as fd:
                self.data = json.load(fd)
                self.enabled_frameworks = self.data.keys()
        except BaseException:
            # If linter-data.json is not present, or if an error happen
            # we log an error and lint nothing.
        except (FileNotFoundError, json.decoder.JSONDecodeError):
            # linter-data.json missing, or JSON error: log an error and skip this lint
            logger.warning(
                "Could not load 'scripts/linter-data.json'. The att&ck and mbc information will not be linted."
            )
@@ -355,16 +356,20 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]:
        logger.debug("found cached results: %s: %d capabilities", nice_path, len(ctx.capabilities_by_sample[path]))
        return ctx.capabilities_by_sample[path]

    if nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_32):
        format_ = "sc32"
    elif nice_path.name.endswith(capa.helpers.EXTENSIONS_SHELLCODE_64):
        format_ = "sc64"
    else:
        format_ = capa.helpers.get_auto_format(nice_path)

    logger.debug("analyzing sample: %s", nice_path)
    extractor = capa.main.get_extractor(
        nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True

    args = argparse.Namespace(input_file=nice_path, format=capa.main.FORMAT_AUTO, backend=capa.main.BACKEND_AUTO)
    format_ = capa.main.get_input_format_from_cli(args)
    backend = capa.main.get_backend_from_cli(args, format_)

    extractor = capa.loader.get_extractor(
        nice_path,
        format_,
        OS_AUTO,
        backend,
        DEFAULT_SIGNATURES,
        should_save_workspace=False,
        disable_progress=True,
    )

    capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True)
@@ -649,16 +654,6 @@ class FeatureNtdllNtoskrnlApi(Lint):
        return False


class FormatLineFeedEOL(Lint):
    name = "line(s) end with CRLF (\\r\\n)"
    recommendation = "convert line endings to LF (\\n) for example using dos2unix"

    def check_rule(self, ctx: Context, rule: Rule):
        if len(rule.definition.split("\r\n")) > 0:
            return False
        return True


class FormatSingleEmptyLineEOF(Lint):
    name = "EOF format"
    recommendation = "end file with a single empty line"
@@ -674,16 +669,14 @@ class FormatIncorrect(Lint):
    recommendation_template = "use scripts/capafmt.py or adjust as follows\n{:s}"

    def check_rule(self, ctx: Context, rule: Rule):
        actual = rule.definition
        # EOL depends on Git and our .gitattributes defines text=auto (Git handles files it thinks is best)
        # we prefer LF only, but enforcing across OSs seems tedious and unnecessary
        actual = rule.definition.replace("\r\n", "\n")
        expected = capa.rules.Rule.from_yaml(rule.definition, use_ruamel=True).to_yaml()

        if actual != expected:
            diff = difflib.ndiff(actual.splitlines(1), expected.splitlines(True))
            recommendation_template = self.recommendation_template
            if "\r\n" in actual:
                recommendation_template = (
                    self.recommendation_template + "\nplease make sure that the file uses LF (\\n) line endings only"
                )
            self.recommendation = recommendation_template.format("".join(diff))
            return True
|
||||
@@ -797,7 +790,6 @@ def lint_features(ctx: Context, rule: Rule):
|
||||
|
||||
|
||||
FORMAT_LINTS = (
|
||||
FormatLineFeedEOL(),
|
||||
FormatSingleEmptyLineEOF(),
|
||||
FormatStringQuotesIncorrect(),
|
||||
FormatIncorrect(),
|
||||
@@ -990,7 +982,11 @@ def main(argv=None):
        help="Enable thorough linting - takes more time, but does a better job",
    )
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    try:
        capa.main.handle_common_args(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    if args.debug:
        logging.getLogger("capa").setLevel(logging.DEBUG)
@@ -1002,16 +998,9 @@ def main(argv=None):
    time0 = time.time()

    try:
        rules = capa.main.get_rules(args.rules)
        logger.info("successfully loaded %s rules", rules.source_rule_count)
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.debug("selected %s rules", len(rules))
            for i, r in enumerate(rules.rules, 1):
                logger.debug(" %d. %s", i, r)
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1
        rules = capa.main.get_rules_from_cli(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    logger.info("collecting potentially referenced samples")
    samples_path = Path(args.samples)
@@ -62,6 +62,7 @@ import capa.engine
import capa.helpers
import capa.features
import capa.features.freeze
from capa.loader import BACKEND_VIV

logger = logging.getLogger("capa.match-function-id")

@@ -71,61 +72,53 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="FLIRT match each function")
    parser.add_argument("sample", type=str, help="Path to sample to analyze")
    capa.main.install_common_args(parser, wanted={"input_file", "signatures", "format"})
    parser.add_argument(
        "-F",
        "--function",
        type=lambda x: int(x, 0x10),
        help="match a specific function by VA, rather than add functions",
    )
    parser.add_argument(
        "--signature",
        action="append",
        dest="signatures",
        type=str,
        default=[],
        help="use the given signatures to identify library functions, file system paths to .sig/.pat files.",
    )
    parser.add_argument("-d", "--debug", action="store_true", help="Enable debugging output on STDERR")
    parser.add_argument("-q", "--quiet", action="store_true", help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    if args.quiet:
        logging.basicConfig(level=logging.ERROR)
        logging.getLogger().setLevel(logging.ERROR)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)

    # disable vivisect-related logging, it's verbose and not relevant for capa users
    capa.main.set_vivisect_log_level(logging.CRITICAL)
    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        sig_paths = capa.main.get_signatures_from_cli(args, input_format, BACKEND_VIV)
    except capa.main.ShouldExitError as e:
        return e.status_code

    analyzers = []
    for sigpath in args.signatures:
        sigs = viv_utils.flirt.load_flirt_signature(sigpath)
    for sigpath in sig_paths:
        sigs = viv_utils.flirt.load_flirt_signature(str(sigpath))

        with capa.main.timing("flirt: compiling sigs"):
            matcher = flirt.compile(sigs)

        analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, sigpath)
        analyzer = viv_utils.flirt.FlirtFunctionAnalyzer(matcher, str(sigpath))
        logger.debug("registering viv function analyzer: %s", repr(analyzer))
        analyzers.append(analyzer)

    vw = viv_utils.getWorkspace(args.sample, analyze=True, should_save=False)
    vw = viv_utils.getWorkspace(str(args.input_file), analyze=True, should_save=False)

    functions = vw.getFunctions()
    if args.function:
        functions = [args.function]

    seen = set()
    for function in functions:
        logger.debug("matching function: 0x%04x", function)
        for analyzer in analyzers:
            name = viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
            viv_utils.flirt.match_function_flirt_signatures(analyzer.matcher, vw, function)
            name = viv_utils.get_function_name(vw, function)
            if name:
                print(f"0x{function:04x}: {name}")
                key = (function, name)
                if key in seen:
                    continue
                else:
                    print(f"0x{function:04x}: {name}")
                    seen.add(key)

    return 0
@@ -41,7 +41,6 @@ import timeit
import logging
import argparse
import subprocess
from pathlib import Path

import tqdm
import tabulate
@@ -50,6 +49,7 @@ import capa.main
import capa.perf
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features
import capa.features.common
@@ -74,42 +74,22 @@ def main(argv=None):
        label += " (dirty)"

    parser = argparse.ArgumentParser(description="Profile capa performance")
    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "rules"})

    capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "rules"})
    parser.add_argument("--number", type=int, default=3, help="batch size of profile collection")
    parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection")
    parser.add_argument("--label", type=str, default=label, help="description of the profile collection")

    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    try:
        taste = capa.helpers.get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        with capa.main.timing("load rules"):
            rules = capa.main.get_rules(args.rules)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    if (args.format == "freeze") or (
        args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
    ):
        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        extractor = capa.main.get_extractor(
            args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False
        )
        rules = capa.main.get_rules_from_cli(args)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    with tqdm.tqdm(total=args.number * args.repeat, leave=False) as pbar:
@@ -33,6 +33,7 @@ import logging
import argparse
from pathlib import Path

import capa.main
import capa.render.proto
import capa.render.result_document

@@ -44,26 +45,14 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Convert a capa JSON result document into the protobuf format")
    capa.main.install_common_args(parser)
    parser.add_argument("json", type=str, help="path to JSON result document file, produced by `capa --json`")

    logging_group = parser.add_argument_group("logging arguments")

    logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
    logging_group.add_argument(
        "-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
    )

    args = parser.parse_args(args=argv)

    if args.quiet:
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger().setLevel(logging.WARNING)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger().setLevel(logging.INFO)
    try:
        capa.main.handle_common_args(args)
    except capa.main.ShouldExitError as e:
        return e.status_code

    rd = capa.render.result_document.ResultDocument.from_file(Path(args.json))
    pb = capa.render.proto.doc_to_pb2(rd)
@@ -36,6 +36,7 @@ import logging
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import capa.main
|
||||
import capa.render.json
|
||||
import capa.render.proto
|
||||
import capa.render.proto.capa_pb2
|
||||
@@ -49,28 +50,16 @@ def main(argv=None):
|
||||
argv = sys.argv[1:]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Convert a capa protobuf result document into the JSON format")
|
||||
capa.main.install_common_args(parser)
|
||||
parser.add_argument(
|
||||
"pb", type=str, help="path to protobuf result document file, produced by `proto-from-results.py`"
|
||||
)
|
||||
|
||||
logging_group = parser.add_argument_group("logging arguments")
|
||||
|
||||
logging_group.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
|
||||
logging_group.add_argument(
|
||||
"-q", "--quiet", action="store_true", help="disable all status output except fatal errors"
|
||||
)
|
||||
|
||||
args = parser.parse_args(args=argv)
|
||||
|
||||
if args.quiet:
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
logging.getLogger().setLevel(logging.WARNING)
|
||||
elif args.debug:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
try:
|
||||
capa.main.handle_common_args(args)
|
||||
except capa.main.ShouldExitError as e:
|
||||
return e.status_code
|
||||
|
||||
pb = Path(args.pb).read_bytes()
|
||||
|
||||
|
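Together, the two conversion scripts above round-trip a result document between JSON and protobuf: `doc_to_pb2` serializes a `ResultDocument`, and the second script parses the bytes back. A hedged sketch of that round trip; `doc_from_pb2`, the `capa_pb2.ResultDocument` message type, and the file names are assumptions inferred from the imports shown above:

from pathlib import Path

import capa.render.proto
import capa.render.proto.capa_pb2
import capa.render.result_document as rd

# JSON result document (from `capa --json`) -> protobuf bytes
doc = rd.ResultDocument.from_file(Path("results.json"))  # shown in the hunk above
pb = capa.render.proto.doc_to_pb2(doc)                   # shown in the hunk above
Path("results.pb").write_bytes(pb.SerializeToString())   # assumes pb is a protobuf message

# protobuf bytes -> ResultDocument again
msg = capa.render.proto.capa_pb2.ResultDocument()        # assumed message type
msg.ParseFromString(Path("results.pb").read_bytes())
doc2 = capa.render.proto.doc_from_pb2(msg)               # assumed inverse helper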
@@ -178,11 +178,8 @@ def main(args: argparse.Namespace) -> None:
    data["mbc"] = MbcExtractor().run()

    logging.info("Writing results to %s", args.output)
    try:
        with Path(args.output).open("w", encoding="utf-8") as jf:
            json.dump(data, jf, indent=2)
    except BaseException as e:
        logging.error("Exception encountered when writing results: %s", e)
    with Path(args.output).open("w", encoding="utf-8") as jf:
        json.dump(data, jf, indent=2)


if __name__ == "__main__":

@@ -55,13 +55,11 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import logging
import argparse
import collections
from typing import Dict
from pathlib import Path

import colorama

@@ -76,10 +74,7 @@ import capa.render.verbose
import capa.features.freeze
import capa.capabilities.common
import capa.render.result_document as rd
from capa.helpers import get_file_taste
from capa.features.common import FORMAT_AUTO
from capa.features.freeze import Address
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor

logger = logging.getLogger("capa.show-capabilities-by-function")

@@ -142,67 +137,37 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="detect capabilities in programs.")
    capa.main.install_common_args(parser, wanted={"format", "os", "backend", "sample", "signatures", "rules", "tag"})
    capa.main.install_common_args(
        parser, wanted={"format", "os", "backend", "input_file", "signatures", "rules", "tag"}
    )
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    try:
        taste = get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        rules = capa.main.get_rules(args.rules)
        logger.info("successfully loaded %s rules", len(rules))
        if args.tag:
            rules = rules.filter_rules_by_meta(args.tag)
            logger.info("selected %s rules", len(rules))
    except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)):
        format_ = "freeze"
        extractor: FeatureExtractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        format_ = args.format
        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

        try:
            extractor = capa.main.get_extractor(
                args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
            )
            assert isinstance(extractor, StaticFeatureExtractor)
        except capa.exceptions.UnsupportedFormatError:
            capa.helpers.log_unsupported_format_error()
            return -1
        except capa.exceptions.UnsupportedRuntimeError:
            capa.helpers.log_unsupported_runtime_error()
            return -1
    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        rules = capa.main.get_rules_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        sample_path = capa.main.get_sample_path_from_cli(args, backend)
        if sample_path is None:
            os_ = "unknown"
        else:
            os_ = capa.loader.get_os(sample_path)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor)

    meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts)
    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
    meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
    meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities)

    if capa.capabilities.common.has_file_limitation(rules, capabilities):
        # bail if capa encountered file limitation e.g. a packed binary
        # do show the output in verbose mode, though.
        if not (args.verbose or args.vverbose or args.json):
            return -1
            return capa.main.E_FILE_LIMITATION

    # colorama will detect:
    # - when on Windows console, and fixup coloring, and
    # - when not an interactive session, and disable coloring
    # renderers should use coloring and assume it will be stripped out if necessary.
    colorama.init()
    doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
    print(render_matches_by_function(doc))
    colorama.deinit()

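The hunk above is representative of the whole migration: the per-script file-taste checks, signature loading, and format sniffing collapse into a single try block of `capa.main` helpers that raise `ShouldExitError`. A condensed sketch of the resulting script skeleton, using only helper names visible in the diffs above; the script body itself is hypothetical:

import sys
import argparse

import capa.main
import capa.capabilities.common


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="example capa-based script")
    capa.main.install_common_args(parser, wanted={"input_file", "format", "os", "backend", "signatures", "rules"})
    args = parser.parse_args(args=argv)

    try:
        # each helper logs its own errors and raises ShouldExitError on failure,
        # replacing the per-script try/except and logging boilerplate
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        rules = capa.main.get_rules_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor)
    return 0


if __name__ == "__main__":
    sys.exit(main())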
@@ -64,16 +64,15 @@ Example::
    insn: 0x10001027: mnemonic(shl)
    ...
"""
import os
import sys
import logging
import argparse
from typing import Tuple
from pathlib import Path

import capa.main
import capa.rules
import capa.engine
import capa.loader
import capa.helpers
import capa.features
import capa.exceptions
@@ -81,17 +80,9 @@ import capa.render.verbose as v
import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
from capa.helpers import get_auto_format, log_unsupported_runtime_error
from capa.helpers import assert_never
from capa.features.insn import API, Number
from capa.features.common import (
    FORMAT_AUTO,
    FORMAT_CAPE,
    FORMAT_FREEZE,
    DYNAMIC_FORMATS,
    String,
    Feature,
    is_global_feature,
)
from capa.features.common import String, Feature, is_global_feature
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor, DynamicFeatureExtractor

logger = logging.getLogger("capa.show-features")
@@ -106,56 +97,33 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample")
    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
    capa.main.install_common_args(parser, wanted={"input_file", "format", "os", "signatures", "backend"})

    parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
    parser.add_argument("-P", "--process", type=str, help="Show features for specific process name")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    if args.function and args.backend == "pefile":
        print("pefile backend does not support extracting function features")
        return -1

    try:
        _ = capa.helpers.get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1
    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample)
    if format_ == FORMAT_FREEZE:
        # this should be moved above the previous if clause after implementing
        # feature freeze for the dynamic analysis flavor
        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
        try:
            extractor = capa.main.get_extractor(
                args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace
            )
        except capa.exceptions.UnsupportedFormatError as e:
            if format_ == FORMAT_CAPE:
                capa.helpers.log_unsupported_cape_report_error(str(e))
            else:
                capa.helpers.log_unsupported_format_error()
            return -1
        except capa.exceptions.UnsupportedRuntimeError:
            log_unsupported_runtime_error()
        if args.function and args.backend == "pefile":
            print("pefile backend does not support extracting function features")
            return -1

    if format_ in DYNAMIC_FORMATS:
        assert isinstance(extractor, DynamicFeatureExtractor)
        input_format = capa.main.get_input_format_from_cli(args)

        backend = capa.main.get_backend_from_cli(args, input_format)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    if isinstance(extractor, DynamicFeatureExtractor):
        print_dynamic_analysis(extractor, args)
    else:
        assert isinstance(extractor, StaticFeatureExtractor)
    elif isinstance(extractor, StaticFeatureExtractor):
        print_static_analysis(extractor, args)
    else:
        assert_never(extractor)

    return 0

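The rewritten dispatch above replaces a format-string check plus bare assert with `assert_never` exhaustiveness: each extractor flavor gets an explicit isinstance branch, and the final `assert_never(extractor)` lets a type checker flag any flavor added later without a branch. A small self-contained illustration of the idiom; the classes and the `assert_never` body here are stand-ins, not capa's:

from typing import NoReturn, Union


class StaticExtractor: ...


class DynamicExtractor: ...


def assert_never(value: NoReturn) -> NoReturn:
    # at type-check time, reaching here means a case was missed;
    # at runtime, fail loudly instead of silently doing nothing
    raise AssertionError(f"unhandled value: {value!r}")


def dispatch(extractor: Union[StaticExtractor, DynamicExtractor]) -> str:
    if isinstance(extractor, DynamicExtractor):
        return "dynamic"
    elif isinstance(extractor, StaticExtractor):
        return "static"
    else:
        # mypy narrows `extractor` to Never here, so this only
        # type-checks while every Union member has a branch above
        assert_never(extractor)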
@@ -8,13 +8,11 @@ Unless required by applicable law or agreed to in writing, software distributed
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import os
import sys
import typing
import logging
import argparse
from typing import Set, Tuple
from pathlib import Path
from collections import Counter

import tabulate
@@ -31,8 +29,7 @@ import capa.features.freeze
import capa.features.address
import capa.features.extractors.pefile
import capa.features.extractors.base_extractor
from capa.helpers import log_unsupported_runtime_error
from capa.features.common import Feature
from capa.features.common import FORMAT_FREEZE, Feature
from capa.features.extractors.base_extractor import FunctionHandle, StaticFeatureExtractor

logger = logging.getLogger("show-unused-features")
@@ -42,10 +39,9 @@ def format_address(addr: capa.features.address.Address) -> str:
    return v.format_address(capa.features.freeze.Address.from_capa((addr)))


def get_rules_feature_set(rules_path) -> Set[Feature]:
    ruleset = capa.main.get_rules(rules_path)
def get_rules_feature_set(rules: capa.rules.RuleSet) -> Set[Feature]:
    rules_feature_set: Set[Feature] = set()
    for _, rule in ruleset.rules.items():
    for _, rule in rules.rules.items():
        rules_feature_set.update(rule.extract_all_features())

    return rules_feature_set
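`get_rules_feature_set` now takes an already-loaded `capa.rules.RuleSet` instead of a path, so callers that have loaded rules once (for example via `get_rules_from_cli`) don't parse them a second time. A hedged usage sketch with the function defined above in scope; the rules directory path is hypothetical:

from pathlib import Path

import capa.rules

# load a RuleSet once, then reuse it for both matching and feature enumeration
rules = capa.rules.get_rules([Path("rules/")])
rules_feature_set = get_rules_feature_set(rules)
print(f"{len(rules_feature_set)} distinct features referenced by {len(rules.rules)} rules")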
@@ -106,44 +102,23 @@ def main(argv=None):
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})

    capa.main.install_common_args(parser, wanted={"format", "os", "input_file", "signatures", "backend", "rules"})
    parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
    args = parser.parse_args(args=argv)
    capa.main.handle_common_args(args)

    if args.function and args.backend == "pefile":
        print("pefile backend does not support extracting function features")
        return -1

    try:
        taste = capa.helpers.get_file_taste(Path(args.sample))
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    try:
        sig_paths = capa.main.get_signatures(args.signatures)
    except IOError as e:
        logger.error("%s", str(e))
        return -1

    if (args.format == "freeze") or (
        args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
    ):
        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
    else:
        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
        try:
            extractor = capa.main.get_extractor(
                args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
            )
        except capa.exceptions.UnsupportedFormatError:
            capa.helpers.log_unsupported_format_error()
            return -1
        except capa.exceptions.UnsupportedRuntimeError:
            log_unsupported_runtime_error()
            return -1
    try:
        capa.main.handle_common_args(args)
        capa.main.ensure_input_exists_from_cli(args)
        rules = capa.main.get_rules_from_cli(args)
        input_format = capa.main.get_input_format_from_cli(args)
        backend = capa.main.get_backend_from_cli(args, input_format)
        extractor = capa.main.get_extractor_from_cli(args, input_format, backend)
    except capa.main.ShouldExitError as e:
        return e.status_code

    assert isinstance(extractor, StaticFeatureExtractor), "only static analysis supported today"

@@ -159,7 +134,7 @@ def main(argv=None):
    function_handles = tuple(extractor.get_functions())

    if args.function:
        if args.format == "freeze":
        if input_format == FORMAT_FREEZE:
            function_handles = tuple(filter(lambda fh: fh.address == args.function, function_handles))
        else:
            function_handles = tuple(filter(lambda fh: format_address(fh.address) == args.function, function_handles))
@@ -174,7 +149,7 @@ def main(argv=None):

    feature_map.update(get_file_features(function_handles, extractor))

    rules_feature_set = get_rules_feature_set(args.rules)
    rules_feature_set = get_rules_feature_set(rules)

    print_unused_features(feature_map, rules_feature_set)
    return 0
@@ -206,7 +181,8 @@ def ida_main():
    feature_map.update(get_file_features(function_handles, extractor))

    rules_path = capa.main.get_default_root() / "rules"
    rules_feature_set = get_rules_feature_set([rules_path])
    rules = capa.rules.get_rules([rules_path])
    rules_feature_set = get_rules_feature_set(rules)

    print_unused_features(feature_map, rules_feature_set)

@@ -1,69 +0,0 @@
#!/usr/bin/env bash

int() {
    int=$(bc <<< "scale=0; ($1 + 0.5)/1")
}

export TIMEFORMAT='%3R'
threshold_time=90
threshold_py3_time=60 # Do not warn if it doesn't take at least 1 minute to run
rm tests/data/*.viv 2>/dev/null
mkdir results
for file in tests/data/*
do
    file=$(printf %q "$file") # Handle names with white spaces
    file_name=$(basename $file)
    echo $file_name

    rm "$file.viv" 2>/dev/null
    py3_time=$(sh -c "time python3 scripts/show-features.py $file >> results/p3-$file_name.out 2>/dev/null" 2>&1)
    rm "$file.viv" 2>/dev/null
    py2_time=$(sh -c "time python2 scripts/show-features.py $file >> results/p2-$file_name.out 2>/dev/null" 2>&1)

    int $py3_time
    if (($int > $threshold_py3_time))
    then
        percentage=$(bc <<< "scale=3; $py2_time/$py3_time*100 + 0.5")
        int $percentage
        if (($int < $threshold_py3_time))
        then
            echo -n " SLOWER ($percentage): "
        fi
    fi
    echo " PY2($py2_time) PY3($py3_time)"
done

threshold_features=98
counter=0
average=0
results_for() {
    py3=$(cat "results/p3-$file_name.out" | grep "$1" | wc -l)
    py2=$(cat "results/p2-$file_name.out" | grep "$1" | wc -l)
    if (($py2 > 0))
    then
        percentage=$(bc <<< "scale=2; 100*$py3/$py2")
        average=$(bc <<< "scale=2; $percentage + $average")
        count=$(($count + 1))
        int $percentage
        if (($int < $threshold_features))
        then
            echo -e "$1: py2($py2) py3($py3) $percentage% - $file_name"
        fi
    fi
}

rm tests/data/*.viv 2>/dev/null
echo -e '\nRESULTS:'
for file in tests/data/*
do
    file_name=$(basename $file)
    if test -f "results/p2-$file_name.out"; then
        results_for 'insn'
        results_for 'file'
        results_for 'func'
        results_for 'bb'
    fi
done

average=$(bc <<< "scale=2; $average/$count")
echo "TOTAL: $average"
@@ -106,11 +106,11 @@ def get_viv_extractor(path: Path):
    ]

    if "raw32" in path.name:
        vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths)
        vw = capa.loader.get_workspace(path, "sc32", sigpaths=sigpaths)
    elif "raw64" in path.name:
        vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths)
        vw = capa.loader.get_workspace(path, "sc64", sigpaths=sigpaths)
    else:
        vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
        vw = capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
    vw.saveWorkspace()
    extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO)
    fixup_viv(path, extractor)

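As the test fixture above reflects, `get_workspace` moved from `capa.main` to `capa.loader`. A hedged sketch of building a vivisect-backed extractor with the relocated helper; the call shapes are taken from the fixture, while the sample path, the empty signature list, and the `OS_AUTO` import location are assumptions:

from pathlib import Path

import capa.loader
import capa.features.extractors.viv.extractor
from capa.features.common import FORMAT_AUTO, OS_AUTO

path = Path("tests/data/sample.exe_")  # hypothetical sample
sigpaths = []  # FLIRT signature paths, if any

vw = capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths)
vw.saveWorkspace()  # cache the analyzed workspace next to the sample, as the fixture does
extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO)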
@@ -40,7 +40,10 @@ def get_rule_path():
    [
        pytest.param("capa2yara.py", [get_rules_path()]),
        pytest.param("capafmt.py", [get_rule_path()]),
        # not testing lint.py as it runs regularly anyway
        # testing some variations of linter script
        pytest.param("lint.py", ["-t", "create directory", get_rules_path()]),
        # `create directory` rule has native and .NET example PEs
        pytest.param("lint.py", ["--thorough", "-t", "create directory", get_rules_path()]),
        pytest.param("match-function-id.py", [get_file_path()]),
        pytest.param("show-capabilities-by-function.py", [get_file_path()]),
        pytest.param("show-features.py", [get_file_path()]),