library detection: merge flirt and string branches

This commit is contained in:
Willi Ballenthin
2024-10-11 13:43:10 +00:00
parent 637926e0b6
commit ad187fc3bd
5 changed files with 244 additions and 258 deletions

View File

@@ -1,3 +1,11 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import sys
import time
@@ -6,6 +14,7 @@ import argparse
from pathlib import Path
import rich
from pydantic import BaseModel
from rich.console import Console
from rich.logging import RichHandler
@@ -27,9 +36,6 @@ import ida_funcs
logger = logging.getLogger(__name__)
from pydantic import BaseModel
def colorbool(v: bool) -> str:
if v:
return f"[green]{str(v)}[/green]"

View File

@@ -1,193 +1,8 @@
"""
further requirements:
- nltk
"""
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import collections
from pathlib import Path
import rich
from rich.text import Text
import capa.analysis.strings
import capa.features.extractors.strings
from capa.analysis.strings import LibraryStringDatabase
logger = logging.getLogger(__name__)
def extract_strings(buf, n=4):
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
"""remove less trustyworthy database entries.
such as:
- those found in multiple databases
- those that are English words
- those that are too short
- Windows API and DLL names
"""
# TODO: consider applying these filters directly to the persisted databases, not at load time.
winapi = capa.analysis.strings.WindowsApiStringDatabase.from_defaults()
try:
from nltk.corpus import words as nltk_words
except ImportError:
# one-time download of dataset.
# this probably doesn't work well for embedded use.
import nltk
nltk.download("words")
from nltk.corpus import words as nltk_words
words = set(nltk_words.words())
counter = collections.Counter()
to_remove = set()
for db in dbs:
for string in db.metadata_by_string.keys():
counter[string] += 1
if string in words:
to_remove.add(string)
continue
if len(string) < n:
to_remove.add(string)
continue
if string in winapi.api_names:
to_remove.add(string)
continue
if string in winapi.dll_names:
to_remove.add(string)
continue
for string, count in counter.most_common():
if count <= 1:
break
# remove strings that are seen in more than one database
to_remove.add(string)
for db in dbs:
for string in to_remove:
if string in db.metadata_by_string:
del db.metadata_by_string[string]
def open_ida(input_path: Path):
import tempfile
import idapro
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
t.write_bytes(input_path.read_bytes())
# resource leak: we should delete this upon exit
idapro.enable_console_messages(False)
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
import ida_auto
ida_auto.auto_wait()
def main():
logging.basicConfig(level=logging.DEBUG)
# use n=8 to ignore common words
N = 8
input_path = Path(sys.argv[1])
input_buf = input_path.read_bytes()
dbs = capa.analysis.strings.get_default_databases()
prune_databases(dbs, n=N)
strings_by_library = collections.defaultdict(set)
for string in extract_strings(input_path.read_bytes(), n=N):
for db in dbs:
if (metadata := db.metadata_by_string.get(string.s)):
strings_by_library[metadata.library_name].add(string.s)
console = rich.get_console()
console.print(f"found libraries:", style="bold")
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
for string in sorted(strings)[:10]:
console.print(f" - {string}", markup=False, style="grey37")
if len(strings) > 10:
console.print(" ...", style="grey37")
if not strings_by_library:
console.print(" (none)", style="grey37")
# since we're not going to find any strings
# return early and don't do IDA analysis
return
# TODO: ensure there are XXX matches for each library, or ignore those entries
open_ida(input_path)
import idaapi
import idautils
import ida_funcs
import capa.features.extractors.ida.helpers as ida_helpers
strings_by_function = collections.defaultdict(set)
for ea in idautils.Functions():
f = idaapi.get_func(ea)
# ignore library functions and thunk functions as identified by IDA
if f.flags & idaapi.FUNC_THUNK:
continue
if f.flags & idaapi.FUNC_LIB:
continue
for bb in ida_helpers.get_function_blocks(f):
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref == insn.ea:
continue
string = capa.features.extractors.ida.helpers.find_string_at(ref)
if not string:
continue
for db in dbs:
if (metadata := db.metadata_by_string.get(string)):
strings_by_function[ea].add(string)
# ensure there are at least XXX functions renamed, or ignore those entries
console.print("functions:", style="bold")
for function, strings in sorted(strings_by_function.items()):
if strings:
name = ida_funcs.get_func_name(function)
console.print(f" [b]{name}[/]@{function:08x}:")
for string in strings:
for db in dbs:
if (metadata := db.metadata_by_string.get(string)):
location = Text(f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}", style="grey37")
console.print(" - ", location, ": ", string.rstrip())
# TODO: ensure there aren't conflicts among the matches
console.print()
console.print(f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions")
if __name__ == "__main__":
main()

View File

@@ -1,10 +1,28 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
"""
further requirements:
- nltk
"""
import gzip
import pathlib
from typing import Dict, Sequence
import logging
import collections
from typing import Dict
from pathlib import Path
from dataclasses import dataclass
import msgspec
import capa.features.extractors.strings
logger = logging.getLogger(__name__)
class LibraryString(msgspec.Struct):
string: str
@@ -23,7 +41,7 @@ class LibraryStringDatabase:
return len(self.metadata_by_string)
@classmethod
def from_file(cls, path: pathlib.Path) -> "LibraryStringDatabase":
def from_file(cls, path: Path) -> "LibraryStringDatabase":
metadata_by_string: Dict[str, LibraryString] = {}
decoder = msgspec.json.Decoder(type=LibraryString)
for line in gzip.decompress(path.read_bytes()).split(b"\n"):
@@ -55,12 +73,12 @@ DEFAULT_FILENAMES = (
"zlib.jsonl.gz",
)
DEFAULT_PATHS = tuple(
pathlib.Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES
) + (pathlib.Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",)
DEFAULT_PATHS = tuple(Path(__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES) + (
Path(__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",
)
def get_default_databases() -> Sequence[LibraryStringDatabase]:
def get_default_databases() -> list[LibraryStringDatabase]:
return [LibraryStringDatabase.from_file(path) for path in DEFAULT_PATHS]
@@ -73,9 +91,9 @@ class WindowsApiStringDatabase:
return len(self.dll_names) + len(self.api_names)
@classmethod
def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase":
dll_names: Set[str] = set()
api_names: Set[str] = set()
def from_dir(cls, path: Path) -> "WindowsApiStringDatabase":
dll_names: set[str] = set()
api_names: set[str] = set()
for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
if not line:
@@ -91,5 +109,69 @@ class WindowsApiStringDatabase:
@classmethod
def from_defaults(cls) -> "WindowsApiStringDatabase":
return cls.from_dir(pathlib.Path(__file__).parent / "data" / "winapi")
return cls.from_dir(Path(__file__).parent / "data" / "winapi")
def extract_strings(buf, n=4):
yield from capa.features.extractors.strings.extract_ascii_strings(buf, n=n)
yield from capa.features.extractors.strings.extract_unicode_strings(buf, n=n)
def prune_databases(dbs: list[LibraryStringDatabase], n=8):
"""remove less trustyworthy database entries.
such as:
- those found in multiple databases
- those that are English words
- those that are too short
- Windows API and DLL names
"""
# TODO: consider applying these filters directly to the persisted databases, not at load time.
winapi = WindowsApiStringDatabase.from_defaults()
try:
from nltk.corpus import words as nltk_words
except ImportError:
# one-time download of dataset.
# this probably doesn't work well for embedded use.
import nltk
nltk.download("words")
from nltk.corpus import words as nltk_words
words = set(nltk_words.words())
counter: collections.Counter[str] = collections.Counter()
to_remove = set()
for db in dbs:
for string in db.metadata_by_string.keys():
counter[string] += 1
if string in words:
to_remove.add(string)
continue
if len(string) < n:
to_remove.add(string)
continue
if string in winapi.api_names:
to_remove.add(string)
continue
if string in winapi.dll_names:
to_remove.add(string)
continue
for string, count in counter.most_common():
if count <= 1:
break
# remove strings that are seen in more than one database
to_remove.add(string)
for db in dbs:
for string in to_remove:
if string in db.metadata_by_string:
del db.metadata_by_string[string]

View File

@@ -0,0 +1,135 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import collections
from pathlib import Path
import rich
from rich.text import Text
import capa.analysis.strings
import capa.features.extractors.strings
logger = logging.getLogger(__name__)
def open_ida(input_path: Path):
import tempfile
import idapro
t = Path(tempfile.mkdtemp(prefix="ida-")) / input_path.name
t.write_bytes(input_path.read_bytes())
# resource leak: we should delete this upon exit
idapro.enable_console_messages(False)
idapro.open_database(str(t.absolute()), run_auto_analysis=True)
import ida_auto
ida_auto.auto_wait()
def main():
logging.basicConfig(level=logging.DEBUG)
# use n=8 to ignore common words
N = 8
input_path = Path(sys.argv[1])
dbs = capa.analysis.strings.get_default_databases()
capa.analysis.strings.prune_databases(dbs, n=N)
strings_by_library = collections.defaultdict(set)
for string in capa.analysis.strings.extract_strings(input_path.read_bytes(), n=N):
for db in dbs:
if metadata := db.metadata_by_string.get(string.s):
strings_by_library[metadata.library_name].add(string.s)
console = rich.get_console()
console.print("found libraries:", style="bold")
for library, strings in sorted(strings_by_library.items(), key=lambda p: len(p[1]), reverse=True):
console.print(f" - [b]{library}[/] ({len(strings)} strings)")
for string in sorted(strings)[:10]:
console.print(f" - {string}", markup=False, style="grey37")
if len(strings) > 10:
console.print(" ...", style="grey37")
if not strings_by_library:
console.print(" (none)", style="grey37")
# since we're not going to find any strings
# return early and don't do IDA analysis
return
# TODO: ensure there are XXX matches for each library, or ignore those entries
open_ida(input_path)
import idaapi
import idautils
import ida_funcs
import capa.features.extractors.ida.helpers as ida_helpers
strings_by_function = collections.defaultdict(set)
for ea in idautils.Functions():
f = idaapi.get_func(ea)
# ignore library functions and thunk functions as identified by IDA
if f.flags & idaapi.FUNC_THUNK:
continue
if f.flags & idaapi.FUNC_LIB:
continue
for bb in ida_helpers.get_function_blocks(f):
for insn in ida_helpers.get_instructions_in_range(bb.start_ea, bb.end_ea):
ref = capa.features.extractors.ida.helpers.find_data_reference_from_insn(insn)
if ref == insn.ea:
continue
string = capa.features.extractors.ida.helpers.find_string_at(ref)
if not string:
continue
for db in dbs:
if metadata := db.metadata_by_string.get(string):
strings_by_function[ea].add(string)
# ensure there are at least XXX functions renamed, or ignore those entries
console.print("functions:", style="bold")
for function, strings in sorted(strings_by_function.items()):
if strings:
name = ida_funcs.get_func_name(function)
console.print(f" [b]{name}[/]@{function:08x}:")
for string in strings:
for db in dbs:
if metadata := db.metadata_by_string.get(string):
location = Text(
f"{metadata.library_name}@{metadata.library_version}::{metadata.function_name}",
style="grey37",
)
console.print(" - ", location, ": ", string.rstrip())
# TODO: ensure there aren't conflicts among the matches
console.print()
console.print(
f"found {len(strings_by_function)} library functions across {len(list(idautils.Functions()))} functions"
)
if __name__ == "__main__":
main()

View File

@@ -1,52 +0,0 @@
"""
convert from a jh CSV file to a .jsonl.gz OpenSourceString database.
the jh file looks like:
# triplet,compiler,library,version,profile,path,function,type,value
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0x00000100
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffff8
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,number,0xfffffffe
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,BZ2_bzCompressInit
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffCompress,api,handle_compress
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0x0000fa90
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff8
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffff9
x64-windows-static,msvc143,bzip2,1.0.8#3,release,CMakeFiles/bz2.dir/bzlib.c.obj,BZ2_bzBuffToBuffDecompress,number,0xfffffffd
jh is found here: https://github.com/williballenthin/lancelot/blob/master/bin/src/bin/jh.rs
"""
import sys
import json
import pathlib
import msgspec
from capa.analysis.strings import LibraryString
p = pathlib.Path(sys.argv[1])
for line in p.read_text().split("\n"):
if not line:
continue
if line.startswith("#"):
continue
triplet, compiler, library, version, profile, path, function, rest = line.split(",", 7)
type, _, value = rest.partition(",")
if type != "string":
continue
if value.startswith('"'):
value = json.loads(value)
s = LibraryString(
string=value,
library_name=library,
library_version=version,
file_path=path,
function_name=function,
)
sys.stdout.buffer.write(msgspec.json.encode(s))
sys.stdout.buffer.write(b"\n")