Compare commits


17 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Willi Ballenthin | 2ec979469e | init show-object-layout using assemblage | 2024-10-22 09:40:09 +00:00 |
| Willi Ballenthin | 2db0cc457f | detect user code via entry points (main function name) | 2024-10-22 09:21:59 +00:00 |
| Willi Ballenthin | 3cad8d12af | mute unknown lines | 2024-10-22 09:21:40 +00:00 |
| Willi Ballenthin | 5be96d7ddc | consider thunks library functions | 2024-10-22 09:21:16 +00:00 |
| mr-tz | a3b6aef67f | render from doc | 2024-10-21 12:43:47 +00:00 |
| mr-tz | 077fa2e7e1 | simplify and include thunks | 2024-10-21 11:50:25 +00:00 |
| mr-tz | c3b8e7c638 | remove Python 3.12 syntax | 2024-10-21 11:49:45 +00:00 |
| Willi Ballenthin | 4346922b9a | library-detection: add json output format | 2024-10-21 10:42:30 +00:00 |
| Willi Ballenthin | d652192af1 | library-detection: cleanup script | 2024-10-21 10:26:19 +00:00 |
| Moritz | d83750c901 | Add LookupError exception | 2024-10-15 17:10:59 +02:00 |
| mr-tz | 8394b81841 | init add result structure and render | 2024-10-14 16:05:01 +00:00 |
| mr-tz | febda7d0e2 | add option to save idb | 2024-10-14 06:15:06 +00:00 |
| mr-tz | f9abb5e83f | ease/document extra dependency | 2024-10-14 05:53:03 +00:00 |
| Willi Ballenthin | f69602d085 | library detection: rough integration of algorithms | 2024-10-11 15:58:37 +00:00 |
| Willi Ballenthin | ad187fc3bd | library detection: merge flirt and string branches | 2024-10-11 13:43:10 +00:00 |
| mr-tz | 637926e0b6 | initial commit of out-of-the box flirt-based library id | 2024-10-11 12:36:42 +00:00 |
| Willi Ballenthin | 03ce40e781 | initial attempt at library identification via known strings | 2024-10-10 12:35:48 +00:00 |
32 changed files with 969 additions and 3 deletions


@@ -108,6 +108,7 @@ repos:
- "--check-untyped-defs"
- "--ignore-missing-imports"
- "--config-file=.github/mypy/mypy.ini"
- "--enable-incomplete-feature=NewGenericSyntax"
- "capa/"
- "scripts/"
- "tests/"


capa/analysis/flirt.py Normal file, 38 lines

@@ -0,0 +1,38 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from pydantic import BaseModel
import capa.features.extractors.ida.idalib as idalib
if not idalib.has_idalib():
raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
raise RuntimeError("failed to load IDA idalib module.")
import idaapi
import idautils
class FunctionId(BaseModel):
va: int
is_library: bool
name: str
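# IDA sets FUNC_LIB on functions it recognizes as library code, e.g. via FLIRT signature matches.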
def get_flirt_matches(lib_only=True):
for fva in idautils.Functions():
f = idaapi.get_func(fva)
is_lib = bool(f.flags & idaapi.FUNC_LIB)
fname = idaapi.get_func_name(fva)
if lib_only and not is_lib:
continue
yield FunctionId(va=fva, is_library=is_lib, name=fname)

capa/analysis/libraries.py Normal file, 240 lines

@@ -0,0 +1,240 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import sys
import logging
import argparse
import tempfile
import contextlib
from enum import Enum
from typing import List, Optional
from pathlib import Path
import rich
from pydantic import BaseModel
from rich.text import Text
from rich.console import Console
import capa.main
import capa.helpers
import capa.analysis.flirt
import capa.analysis.strings
import capa.features.extractors.ida.idalib as idalib
if not idalib.has_idalib():
raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
raise RuntimeError("failed to load IDA idalib module.")
import idaapi
import idapro
import ida_auto
import idautils
logger = logging.getLogger(__name__)
class Classification(str, Enum):
USER = "user"
LIBRARY = "library"
UNKNOWN = "unknown"
class Method(str, Enum):
FLIRT = "flirt"
STRINGS = "strings"
THUNK = "thunk"
ENTRYPOINT = "entrypoint"
class FunctionClassification(BaseModel):
va: int
classification: Classification
# name per the disassembler/analysis tool
# may be combined with the recovered/suspected name TODO below
name: str
# if is library, this must be provided
method: Optional[Method]
# TODO if is library, recovered/suspected name?
# if is library, these can optionally be provided.
library_name: Optional[str] = None
library_version: Optional[str] = None
class FunctionIdResults(BaseModel):
function_classifications: List[FunctionClassification]
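# open the input file in an idalib session (optionally copying it to a temporary directory first),
# wait for IDA auto-analysis to finish before yielding, and close the database on exit.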
@contextlib.contextmanager
def ida_session(input_path: Path, use_temp_dir=True):
if use_temp_dir:
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
else:
t = input_path
logger.debug("using %s", str(t))
# stderr=True is used here to redirect the spinner banner to stderr,
# so that users can redirect capa's output.
console = Console(stderr=True, quiet=False)
try:
if use_temp_dir:
t.write_bytes(input_path.read_bytes())
# idalib writes to stdout (ugh), so we have to capture that
# so as not to screw up structured output.
with capa.helpers.stdout_redirector(io.BytesIO()):
idapro.enable_console_messages(False)
with capa.main.timing("analyze program"):
with console.status("analyzing program...", spinner="dots"):
if idapro.open_database(str(t.absolute()), run_auto_analysis=True):
raise RuntimeError("failed to analyze input file")
logger.debug("idalib: waiting for analysis...")
ida_auto.auto_wait()
logger.debug("idalib: opened database.")
yield
finally:
idapro.close_database()
if use_temp_dir:
t.unlink()
def is_thunk_function(fva):
f = idaapi.get_func(fva)
return bool(f.flags & idaapi.FUNC_THUNK)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Identify library functions using various strategies.")
capa.main.install_common_args(parser, wanted={"input_file"})
parser.add_argument("--store-idb", action="store_true", default=False, help="store IDA database file")
parser.add_argument("--min-string-length", type=int, default=8, help="minimum string length")
parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
dbs = capa.analysis.strings.get_default_databases()
capa.analysis.strings.prune_databases(dbs, n=args.min_string_length)
function_classifications: List[FunctionClassification] = []
with ida_session(args.input_file, use_temp_dir=not args.store_idb):
with capa.main.timing("FLIRT-based library identification"):
# TODO: add more signature files
# TODO: apply more signatures
for flirt_match in capa.analysis.flirt.get_flirt_matches():
function_classifications.append(
FunctionClassification(
va=flirt_match.va,
name=flirt_match.name,
classification=Classification.LIBRARY,
method=Method.FLIRT,
# note: we cannot currently include which signature matched per function via the IDA API
)
)
# thunks
for fva in idautils.Functions():
if is_thunk_function(fva):
function_classifications.append(
FunctionClassification(
va=fva,
name=idaapi.get_func_name(fva),
classification=Classification.LIBRARY,
method=Method.THUNK,
)
)
with capa.main.timing("string-based library identification"):
for string_match in capa.analysis.strings.get_string_matches(dbs):
function_classifications.append(
FunctionClassification(
va=string_match.va,
name=idaapi.get_func_name(string_match.va),
classification=Classification.LIBRARY,
method=Method.STRINGS,
library_name=string_match.metadata.library_name,
library_version=string_match.metadata.library_version,
)
)
for va in idautils.Functions():
name = idaapi.get_func_name(va)
if name not in {"WinMain", }:
continue
function_classifications.append(
FunctionClassification(
va=va,
name=name,
classification=Classification.USER,
method=Method.ENTRYPOINT,
)
)
doc = FunctionIdResults(function_classifications=[])
classifications_by_va = capa.analysis.strings.create_index(function_classifications, "va")
for va in idautils.Functions():
if classifications := classifications_by_va.get(va):
doc.function_classifications.extend(classifications)
else:
doc.function_classifications.append(
FunctionClassification(
va=va,
name=idaapi.get_func_name(va),
classification=Classification.UNKNOWN,
method=None,
)
)
if args.json:
print(doc.model_dump_json()) # noqa: T201 print found
else:
table = rich.table.Table()
table.add_column("FVA")
table.add_column("CLASSIFICATION")
table.add_column("METHOD")
table.add_column("FNAME")
table.add_column("EXTRA INFO")
classifications_by_va = capa.analysis.strings.create_index(doc.function_classifications, "va", sorted_=True)
for va, classifications in classifications_by_va.items():
name = ", ".join({c.name for c in classifications})
if "sub_" in name:
name = Text(name, style="grey53")
classification = {c.classification for c in classifications}
method = {c.method for c in classifications if c.method}
extra = {f"{c.library_name}@{c.library_version}" for c in classifications if c.library_name}
table.add_row(
hex(va),
", ".join(classification) if classification != {"unknown"} else Text("unknown", style="grey53"),
", ".join(method),
name,
", ".join(extra),
)
rich.print(table)
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,2 @@
# temporary extra file to track dependencies of the analysis directory
nltk==3.9.1


@@ -0,0 +1,269 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
further requirements:
- nltk
"""
import gzip
import logging
import collections
from typing import Any, Dict, Mapping
from pathlib import Path
from dataclasses import dataclass
import msgspec
import capa.features.extractors.strings
logger = logging.getLogger(__name__)
class LibraryString(msgspec.Struct):
string: str
library_name: str
library_version: str
file_path: str | None = None
function_name: str | None = None
line_number: int | None = None
@dataclass
class LibraryStringDatabase:
metadata_by_string: Dict[str, LibraryString]
def __len__(self) -> int:
return len(self.metadata_by_string)
@classmethod
def from_file(cls, path: Path) -> "LibraryStringDatabase":
metadata_by_string: Dict[str, LibraryString] = {}
decoder = msgspec.json.Decoder(type=LibraryString)
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
if not line:
continue
s = decoder.decode(line)
metadata_by_string[s.string] = s
return cls(metadata_by_string=metadata_by_string)
DEFAULT_FILENAMES = (
"brotli.jsonl.gz",
"bzip2.jsonl.gz",
"cryptopp.jsonl.gz",
"curl.jsonl.gz",
"detours.jsonl.gz",
"jemalloc.jsonl.gz",
"jsoncpp.jsonl.gz",
"kcp.jsonl.gz",
"liblzma.jsonl.gz",
"libsodium.jsonl.gz",
"libpcap.jsonl.gz",
"mbedtls.jsonl.gz",
"openssl.jsonl.gz",
"sqlite3.jsonl.gz",
"tomcrypt.jsonl.gz",
"wolfssl.jsonl.gz",
"zlib.jsonl.gz",
)
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
)
def get_default_databases() -> list[LibraryStringDatabase]:
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
@dataclass
class WindowsApiStringDatabase:
dll_names: set[str]
api_names: set[str]
def __len__(self) -> int:
return len(self.dll_names) + len(self.api_names)
@classmethod
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
dll_names: set[str] = set()
api_names: set[str] = set()
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
if not line:
continue
dll_names.add(line)
for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
if not line:
continue
api_names.add(line)
return cls(dll_names=dll_names, api_names=api_names)
@classmethod
def from_defaults(cls) -> "WindowsApiStringDatabase":
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
def extract_strings(buf, n=4):
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
"""remove less trustyworthy database entries.
such as:
- those found in multiple databases
- those that are English words
- those that are too short
- Windows API and DLL names
"""
# TODO: consider applying these filters directly to the persisted databases, not at load time.
winapi = WindowsApiStringDatabase.from_defaults()
try:
from nltk.corpus import words as nltk_words
nltk_words.words()
except (ImportError, LookupError):
# one-time download of dataset.
# this probably doesn't work well for embedded use.
import nltk
nltk.download("words")
from nltk.corpus import words as nltk_words
words = set(nltk_words.words())
counter: collections.Counter[str] = collections.Counter()
to_remove = set()
for db in dbs:
for string in db.metadata_by_string.keys():
counter[string] += 1
if string in words:
to_remove.add(string)
continue
if len(string) < n:
to_remove.add(string)
continue
if string in winapi.api_names:
to_remove.add(string)
continue
if string in winapi.dll_names:
to_remove.add(string)
continue
for string, count in counter.most_common():
if count <= 1:
break
# remove strings that are seen in more than one database
to_remove.add(string)
for db in dbs:
for string in to_remove:
if string in db.metadata_by_string:
del db.metadata_by_string[string]
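# collect the strings referenced by each function, skipping functions IDA already marks as library or thunk code.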
def get_function_strings():
import idaapi
import idautils
import capa.features.extractors.ida.helpers as ida_helpers
strings_by_function = collections.defaultdict(set)
for ea in idautils.Functions():
f = idaapi.get_func(ea)
# ignore library functions and thunk functions as identified by IDA
if f.flags & idaapi.FUNC_THUNK:
continue
if f.flags & idaapi.FUNC_LIB:
continue
for bb in ida_helpers.get_function_blocks(f):
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref == insn.ea:
continue
string = capa.features.extractors.ida.helpers.find_string_at(ref)
if not string:
continue
strings_by_function[ea].add(string)
return strings_by_function
@dataclass
class LibraryStringClassification:
va: int
string: str
library_name: str
metadata: LibraryString
def create_index(s: list, k: str, sorted_: bool = False) -> Mapping[Any, list]:
"""create an index of the elements in `s` using the key `k`, optionally sorted by `k`"""
if sorted_:
s = sorted(s, key=lambda x: getattr(x, k))
s_by_k = collections.defaultdict(list)
for v in s:
p = getattr(v, k)
s_by_k[p].append(v)
return s_by_k
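# match each function's referenced strings against the library string databases,
# then drop libraries with too few hits and functions with conflicting hits.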
def get_string_matches(dbs: list[LibraryStringDatabase]) -> list[LibraryStringClassification]:
matches: list[LibraryStringClassification] = []
for function, strings in sorted(get_function_strings().items()):
for string in strings:
for db in dbs:
if metadata := db.metadata_by_string.get(string):
matches.append(
LibraryStringClassification(
va=function,
string=string,
library_name=metadata.library_name,
metadata=metadata,
)
)
# if a library has only a few matched strings (5 or fewer), ignore that library
matches_by_library = create_index(matches, "library_name")
for library_name, library_matches in matches_by_library.items():
if len(library_matches) > 5:
continue
logger.info("pruning library %s: only %d matched string", library_name, len(library_matches))
matches = [m for m in matches if m.library_name != library_name]
# if there are conflicts within a single function, don't label it
matches_by_function = create_index(matches, "va")
for va, function_matches in matches_by_function.items():
library_names = {m.library_name for m in function_matches}
if len(library_names) == 1:
continue
logger.info("conflicting matches: 0x%x: %s", va, sorted(library_names))
# this is potentially slow (O(n**2)) but hopefully fast enough in practice.
matches = [m for m in matches if m.va != va]
return matches


@@ -0,0 +1,130 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import collections
from pathlib import Path
import rich
from rich.text import Text
import capa.analysis.strings
import capa.features.extractors.strings
import capa.features.extractors.ida.helpers as ida_helpers
logger = logging.getLogger(__name__)
def open_ida(input_path: Path):
import tempfile
import idapro
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
t.write_bytes(input_path.read_bytes())
# resource leak: we should delete this upon exit
idapro.enable_console_messages(False)
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
import ida_auto
ida_auto.auto_wait()
def main():
logging.basicConfig(level=logging.DEBUG)
# use n=8 to ignore common words
N = 8
input_path = Path(sys.argv[1])
dbs = capa.analysis.strings.get_default_databases()
capa.analysis.strings.prune_databases(dbs, n=N)
strings_by_library = collections.defaultdict(set)
for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
for db in dbs:
if metadata := db.metadata_by_string.get(string.s):
strings_by_library[metadata.library_name].add(string.s)
console = rich.get_console()
console.print("found libraries:", style="bold")
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
for string in sorted(strings)[:10]:
console.print(f" - {string}", markup=False, style="grey37")
if len(strings) > 10:
console.print(" ...", style="grey37")
if not strings_by_library:
console.print(" (none)", style="grey37")
# since we're not going to find any strings
# return early and don't do IDA analysis
return
open_ida(input_path)
import idaapi
import idautils
import ida_funcs
strings_by_function = collections.defaultdict(set)
for ea in idautils.Functions():
f = idaapi.get_func(ea)
# ignore library functions and thunk functions as identified by IDA
if f.flags & idaapi.FUNC_THUNK:
continue
if f.flags & idaapi.FUNC_LIB:
continue
for bb in ida_helpers.get_function_blocks(f):
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref == insn.ea:
continue
string = capa.features.extractors.ida.helpers.find_string_at(ref)
if not string:
continue
for db in dbs:
if metadata := db.metadata_by_string.get(string):
strings_by_function[ea].add(string)
# ensure there are at least XXX functions renamed, or ignore those entries
console.print("functions:", style="bold")
for function, strings in sorted(strings_by_function.items()):
if strings:
name = ida_funcs.get_func_name(function)
console.print(f" [b]{name}[/]@{function:08x}:")
for string in strings:
for db in dbs:
if metadata := db.metadata_by_string.get(string):
location = Text(
f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
style="grey37",
)
console.print(" - ", location, ": ", string.rstrip())
console.print()
console.print(
f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
)
if __name__ == "__main__":
main()

Binary file not shown.


@@ -0,0 +1,3 @@
*.csv
*.jsonl
*.jsonl.gz

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.


@@ -0,0 +1,99 @@
# Strings from Open Source libraries
This directory contains databases of strings extracted from open source software.
capa uses these databases to ignore functions that are likely library code.
There is one file for each database. Each database is a gzip-compressed, JSONL (one JSON document per line) file.
The JSON document looks like this:
string: "1.0.8, 13-Jul-2019"
library_name: "bzip2"
library_version: "1.0.8#3"
file_path: "CMakeFiles/bz2.dir/bzlib.c.obj"
function_name: "BZ2_bzlibVersion"
line_number: null
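As a point of reference, here is a minimal sketch of reading one of these databases using only the standard library, assuming nothing beyond the record format shown above (within capa, `capa.analysis.strings.LibraryStringDatabase.from_file` plays this role using msgspec):

```python
import gzip
import json
from pathlib import Path


def load_string_database(path: Path) -> dict:
    """index a gzip-compressed JSONL string database by its `string` field."""
    metadata_by_string = {}
    for line in gzip.decompress(path.read_bytes()).splitlines():
        if not line:
            continue
        doc = json.loads(line)
        metadata_by_string[doc["string"]] = doc
    return metadata_by_string


# look up the bzip2 example record shown above
db = load_string_database(Path("bzip2.jsonl.gz"))
meta = db.get("1.0.8, 13-Jul-2019")
if meta:
    print(meta["library_name"], meta["library_version"], meta["function_name"])
```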
The following databases were extracted via the vkpkg & jh technique:
- brotli 1.0.9#5
- bzip2 1.0.8#3
- cryptopp 8.7.0
- curl 7.86.0#1
- detours 4.0.1#7
- jemalloc 5.3.0#1
- jsoncpp 1.9.5
- kcp 1.7
- liblzma 5.2.5#6
- libsodium 1.0.18#8
- libpcap 1.10.1#3
- mbedtls 2.28.1
- openssl 3.0.7#1
- sqlite3 3.40.0#1
- tomcrypt 1.18.2#2
- wolfssl 5.5.0
- zlib 1.2.13
This code was originally developed in FLOSS and imported into capa.
## The vkpkg & jh technique
Major steps:
1. build static libraries via vcpkg
2. extract features via jh
3. convert to JSONL format with `jh_to_qs.py`
4. compress with gzip
### Build static libraries via vcpkg
[vcpkg](https://vcpkg.io/en/) is a free C/C++ package manager for acquiring and managing libraries.
We use it to easily build common open source libraries, like zlib.
Use the triplet `x64-windows-static` to build static archives (.lib files that are AR archives containing COFF object files):
```console
PS > C:\vcpkg\vcpkg.exe install --triplet x64-windows-static zlib
```
### Extract features via jh
[jh](https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs)
is a lancelot-based utility that parses AR archives containing COFF object files,
reconstructs their control flow, finds functions, and extracts features.
jh extracts numbers, API calls, and strings; we are only interested in the string features.
For each feature, jh emits a CSV line with the following fields:
- target triplet
- compiler
- library
- version
- build profile
- path
- function
- feature type
- feature value
For example:
```csv
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
```
For example, to invoke jh:
```console
$ ~/lancelot/target/release/jh x64-windows-static msvc143 zlib 1.2.13 release /mnt/c/vcpkg/installed/x64-windows-static/lib/zlib.lib > ~/flare-floss/floss/qs/db/data/oss/zlib.csv
```
### Convert to OSS database format
We use the script `jh_to_qs.py` to convert these CSV lines into a JSONL file prepared for FLOSS:
```console
$ python3 jh_to_qs.py zlib.csv > zlib.jsonl
```
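The source of `jh_to_qs.py` is not part of this change; as a rough, hypothetical sketch of the conversion it performs (based only on the CSV fields listed above and the JSONL record format, and reading from stdin rather than a file argument):

```python
import csv
import json
import sys

# jh CSV fields, in order (see the field list above)
FIELDS = ["triplet", "compiler", "library", "version", "profile", "path", "function", "type", "value"]

for row in csv.DictReader(sys.stdin, fieldnames=FIELDS):
    # jh also emits number and API call features; only strings go into the database
    if row["type"] != "string":
        continue
    print(json.dumps({
        "string": row["value"],
        "library_name": row["library"],
        "library_version": row["version"],
        "file_path": row["path"],
        "function_name": row["function"],
        "line_number": None,
    }))
```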
These files are then gzip'd:
```console
$ gzip -c zlib.jsonl > zlib.jsonl.gz
```

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.


@@ -19,6 +19,7 @@ from pathlib import Path
from zipfile import ZipFile
from datetime import datetime
+import msgspec.json
from rich.console import Console
from rich.progress import (
Task,
@@ -153,7 +154,6 @@ def stdout_redirector(stream):
def load_json_from_path(json_path: Path):
-import msgspec.json
with gzip.open(json_path, "r") as compressed_report:
try:
report_json = compressed_report.read()
@@ -165,7 +165,6 @@ def load_json_from_path(json_path: Path):
def decode_json_lines(fd: Union[BinaryIO, gzip.GzipFile]):
-import msgspec.json
for line in fd:
try:
line_s = line.strip().decode()
@@ -187,7 +186,6 @@ def load_jsonl_from_path(jsonl_path: Path) -> Iterator[Dict]:
def load_one_jsonl_from_path(jsonl_path: Path):
# this loads one json line to avoid the overhead of loading the entire file
-import msgspec.json
try:
with gzip.open(jsonl_path, "rb") as f:
line = next(iter(f))


@@ -77,6 +77,8 @@ dependencies = [
"protobuf>=5",
"msgspec>=0.18.6",
"xmltodict>=0.13.0",
# for library detection (in development)
"nltk>=3",
# ---------------------------------------
# Dependencies that we develop


@@ -0,0 +1,184 @@
import sys
import sqlite3
import argparse
from typing import Iterator
from pathlib import Path
from dataclasses import dataclass
import pefile
import capa.main
@dataclass
class AssemblageRow:
# from table: binaries
binary_id: int
file_name: str
platform: str
build_mode: str
toolset_version: str
github_url: str
optimization: str
repo_last_update: int
size: int
path: str
license: str
binary_hash: str
repo_commit_hash: str
# from table: functions
function_id: int
function_name: str
function_hash: str
top_comments: str
source_codes: str
prototype: str
_source_file: str
# from table: rvas
rva_id: int
start_rva: int
end_rva: int
@property
def source_file(self):
# cleanup some extra metadata provided by assemblage
return self._source_file.partition(" (MD5: ")[0].partition(" (0x3: ")[0]
class Assemblage:
conn: sqlite3.Connection
samples: Path
def __init__(self, db: Path, samples: Path):
super().__init__()
self.db = db
self.samples = samples
self.conn = sqlite3.connect(self.db)
with self.conn:
self.conn.executescript("""
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000;
PRAGMA cache_size = -20000; -- 20MB
PRAGMA foreign_keys = true;
PRAGMA temp_store = memory;
BEGIN IMMEDIATE TRANSACTION;
CREATE INDEX IF NOT EXISTS idx__functions__binary_id ON functions (binary_id);
CREATE INDEX IF NOT EXISTS idx__rvas__function_id ON rvas (function_id);
CREATE VIEW IF NOT EXISTS assemblage AS
SELECT
binaries.id AS binary_id,
binaries.file_name AS file_name,
binaries.platform AS platform,
binaries.build_mode AS build_mode,
binaries.toolset_version AS toolset_version,
binaries.github_url AS github_url,
binaries.optimization AS optimization,
binaries.repo_last_update AS repo_last_update,
binaries.size AS size,
binaries.path AS path,
binaries.license AS license,
binaries.hash AS hash,
binaries.repo_commit_hash AS repo_commit_hash,
functions.id AS function_id,
functions.name AS function_name,
functions.hash AS function_hash,
functions.top_comments AS top_comments,
functions.source_codes AS source_codes,
functions.prototype AS prototype,
functions.source_file AS source_file,
rvas.id AS rva_id,
rvas.start AS start_rva,
rvas.end AS end_rva
FROM binaries
JOIN functions ON binaries.id = functions.binary_id
JOIN rvas ON functions.id = rvas.function_id;
""")
def get_row_by_binary_id(self, binary_id: int) -> AssemblageRow:
with self.conn:
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ? LIMIT 1;", (binary_id, ))
return AssemblageRow(*cur.fetchone())
def get_rows_by_binary_id(self, binary_id: int) -> Iterator[AssemblageRow]:
with self.conn:
cur = self.conn.execute("SELECT * FROM assemblage WHERE binary_id = ?;", (binary_id, ))
row = cur.fetchone()
while row:
yield AssemblageRow(*row)
row = cur.fetchone()
def get_path_by_binary_id(self, binary_id: int) -> Path:
with self.conn:
cur = self.conn.execute("""SELECT path FROM assemblage WHERE binary_id = ? LIMIT 1""", (binary_id, ))
return self.samples / cur.fetchone()[0]
def get_pe_by_binary_id(self, binary_id: int) -> pefile.PE:
path = self.get_path_by_binary_id(binary_id)
return pefile.PE(data=path.read_bytes(), fast_load=True)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Inspect object boundaries in compiled programs")
capa.main.install_common_args(parser, wanted={})
parser.add_argument("assemblage_database", type=Path, help="path to Assemblage database")
parser.add_argument("assemblage_directory", type=Path, help="path to Assemblage samples directory")
parser.add_argument("binary_id", type=int, help="primary key of binary to inspect")
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
if not args.assemblage_database.is_file():
raise ValueError("database doesn't exist")
db = Assemblage(args.assemblage_database, args.assemblage_directory)
# print(db.get_row_by_binary_id(args.binary_id))
# print(db.get_pe_by_binary_id(args.binary_id))
@dataclass
class Function:
file: str
name: str
start_rva: int
end_rva: int
functions = [
Function(
file=m.source_file,
name=m.function_name,
start_rva=m.start_rva,
end_rva=m.end_rva,
)
for m in db.get_rows_by_binary_id(args.binary_id)
]
import rich
import rich.table
print(db.get_path_by_binary_id(args.binary_id))
t = rich.table.Table()
t.add_column("rva")
t.add_column("filename")
t.add_column("name")
for function in sorted(functions, key=lambda f: f.start_rva):
t.add_row(hex(function.start_rva), function.file, function.name)
rich.print(t)
# db.conn.close()
if __name__ == "__main__":
sys.exit(main())