Load .json.gz files directly (#1990)

* Load .json.gz files directly

* Add helper function to load .json and replace json.load references

* add test and update change log

* add .json.gz in EXTENSIONS_DYNAMIC

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>

---------

Co-authored-by: Moritz <mr-tz@users.noreply.github.com>
This commit is contained in:
Rohit Konakalla
2024-02-25 20:31:36 +05:30
committed by GitHub
parent 65b5c46029
commit 9a449b6bd9
5 changed files with 25 additions and 12 deletions

View File

@@ -4,6 +4,7 @@
### New Features
- add function in capa/helpers to load plain and compressed JSON reports #1883 @Rohit1123
### Breaking Changes

View File

@@ -6,6 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import gzip
import json
import inspect
import logging
@@ -30,7 +31,7 @@ from capa.features.common import (
# File-name extension constants, grouped by the kind of input they denote.
# NOTE(review): presumably consulted when guessing an input file's format - confirm against callers.
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64")
# NOTE(review): EXTENSIONS_DYNAMIC is assigned twice in a row; only the second
# assignment (the one that also lists "json.gz") is in effect afterwards.
EXTENSIONS_DYNAMIC = ("json", "json_")
EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz")
EXTENSIONS_ELF = "elf_"
EXTENSIONS_FREEZE = "frz"
@@ -70,9 +71,19 @@ def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011
def get_format_from_report(sample: Path) -> str:
report = json.load(sample.open(encoding="utf-8"))
def load_json_from_path(json_path: Path):
    """
    Load a JSON document from a file that is either plain text or gzip-compressed.

    The file is first read as a gzip stream. If it does not contain gzip data,
    it is re-read as UTF-8 encoded plain JSON instead.

    Args:
      json_path: location of the report; a `Path` (a plain string path is accepted, too).

    Returns:
      the deserialized JSON value (whatever `json.load`/`json.loads` produce for the content).

    Raises:
      OSError: if the file cannot be opened or read.
      json.JSONDecodeError: if the (decompressed) content is not valid JSON.
    """
    try:
        with gzip.open(json_path, "r") as compressed_report:
            report_json = compressed_report.read()
    except gzip.BadGzipFile:
        # not gzip data: fall back to plain JSON.
        # use a context manager so the file handle is closed deterministically
        # rather than being left for the garbage collector.
        with open(json_path, encoding="utf-8") as plain_report:
            return json.load(plain_report)

    return json.loads(report_json)
def get_format_from_report(sample: Path) -> str:
report = load_json_from_path(sample)
if "CAPE" in report:
return FORMAT_CAPE

View File

@@ -6,7 +6,6 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import logging
import datetime
from typing import Set, Dict, List, Optional
@@ -180,7 +179,7 @@ def get_extractor(
if backend == BACKEND_CAPE:
import capa.features.extractors.cape.extractor
report = json.loads(input_path.read_text(encoding="utf-8"))
report = capa.helpers.load_json_from_path(input_path)
return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)
elif backend == BACKEND_DOTNET:
@@ -297,7 +296,7 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr
elif input_format == FORMAT_CAPE:
import capa.features.extractors.cape.extractor
report = json.loads(input_file.read_text(encoding="utf-8"))
report = capa.helpers.load_json_from_path(input_file)
file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report))
return file_extractors

View File

@@ -191,14 +191,10 @@ def get_binja_extractor(path: Path):
@lru_cache(maxsize=1)
def get_cape_extractor(path):
    """
    Build a CapeExtractor from the report stored at `path`.

    The report is read via `capa.helpers.load_json_from_path` only; the separate
    manual gzip read that preceded it was redundant, since its result was
    overwritten right away.

    Only the most recently requested result is memoized (`maxsize=1`),
    so `path` must be hashable.

    Args:
      path: location of the report file.

    Returns:
      the value of `CapeExtractor.from_report` for the loaded report.
    """
    # imported lazily, inside the function, as in the original fixture code
    from capa.helpers import load_json_from_path
    from capa.features.extractors.cape.extractor import CapeExtractor

    report = load_json_from_path(path)
    return CapeExtractor.from_report(report)

View File

@@ -356,3 +356,9 @@ def test_main_cape1(tmp_path):
assert capa.main.main([str(path), "-j", "-r", str(rules)]) == 0
assert capa.main.main([str(path), "-v", "-r", str(rules)]) == 0
assert capa.main.main([str(path), "-vv", "-r", str(rules)]) == 0
def test_main_cape_gzip():
    """capa's main entry point should exit with 0 when given the .json.gz fixture "0000a657"."""
    report_path = fixtures.get_data_path_by_name("0000a657")
    exit_code = capa.main.main([str(report_path)])
    assert exit_code == 0