2024-02-09 13:44:47 +00:00
|
|
|
# Copyright 2024 Google LLC
|
|
|
|
|
#
|
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
|
#
|
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
#
|
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
2025-03-28 14:17:06 +00:00
|
|
|
import gzip
|
2025-03-26 14:46:15 +00:00
|
|
|
import math
|
2024-02-09 13:44:47 +00:00
|
|
|
import random
|
|
|
|
|
import string
|
|
|
|
|
from pathlib import Path
|
2025-10-10 15:58:02 +00:00
|
|
|
from typing import Any, Dict, List
|
2025-10-10 15:52:44 +00:00
|
|
|
|
|
|
|
|
import pytest
|
2024-02-09 13:44:47 +00:00
|
|
|
|
|
|
|
|
|
2025-03-25 16:26:52 +00:00
|
|
|
def get_repo_root_dir() -> Path:
    """Return the absolute path of the repository root (three levels above this file)."""
    return Path(__file__).parents[2].resolve()
|
|
|
|
|
|
|
|
|
|
|
2024-02-09 13:44:47 +00:00
|
|
|
def get_tests_data_dir() -> Path:
    """Return the tests_data directory.

    It is looked up first inside the repo root, then as a sibling of the
    repo root (some checkouts/layouts keep it one level up).
    """
    root_dir = get_repo_root_dir()
    candidate = root_dir / "tests_data"
    if candidate.is_dir():
        return candidate
    # Fallback layout: tests_data next to the repository root.
    candidate = root_dir.parent / "tests_data"
    assert candidate.is_dir()
    return candidate
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_basic_tests_files_dir() -> Path:
    """Return the directory holding the "basic" test files."""
    basic_dir = get_tests_data_dir() / "basic"
    assert basic_dir.is_dir()
    return basic_dir
|
|
|
|
|
|
|
|
|
|
|
2024-02-15 13:09:08 +00:00
|
|
|
def get_mitra_tests_files_dir() -> Path:
    """Return the directory holding the "mitra" test files."""
    mitra_dir = get_tests_data_dir() / "mitra"
    assert mitra_dir.is_dir()
    return mitra_dir
|
|
|
|
|
|
|
|
|
|
|
2024-08-23 14:41:20 +00:00
|
|
|
def get_previously_missdetected_files_dir() -> Path:
    """Return the directory holding regression samples for past misdetections."""
    missdetections_dir = get_tests_data_dir() / "previous_missdetections"
    assert missdetections_dir.is_dir()
    return missdetections_dir
|
|
|
|
|
|
|
|
|
|
|
2024-02-09 13:44:47 +00:00
|
|
|
def get_basic_test_files_paths() -> List[Path]:
    """Return every file under the "basic" tests directory, sorted by path."""
    basic_dir = get_basic_tests_files_dir()
    return sorted(p for p in basic_dir.rglob("*") if p.is_file())
|
|
|
|
|
|
|
|
|
|
|
2024-02-15 13:09:08 +00:00
|
|
|
def get_mitra_test_files_paths() -> List[Path]:
    """Return every file under the "mitra" tests directory, sorted by path."""
    mitra_dir = get_mitra_tests_files_dir()
    return sorted(p for p in mitra_dir.rglob("*") if p.is_file())
|
|
|
|
|
|
|
|
|
|
|
2024-08-23 14:41:20 +00:00
|
|
|
def get_previously_missdetected_files_paths() -> List[Path]:
    """Return every previously-misdetected sample file, sorted by path."""
    missdetections_dir = get_previously_missdetected_files_dir()
    return sorted(p for p in missdetections_dir.rglob("*") if p.is_file())
|
|
|
|
|
|
|
|
|
|
|
2025-03-26 17:24:23 +00:00
|
|
|
def get_reference_features_extraction_examples_path() -> Path:
    """Return the gzipped JSON file with reference features-extraction examples."""
    reference_dir = get_tests_data_dir() / "reference"
    return reference_dir / "features_extraction_examples.json.gz"
|
2025-03-25 12:46:20 +00:00
|
|
|
|
|
|
|
|
|
2025-03-25 20:51:13 +00:00
|
|
|
def get_reference_for_inference_examples_by_path_path(model_name: str) -> Path:
    """Return the reference "inference examples by path" file for `model_name`."""
    file_name = f"{model_name}-inference_examples_by_path.json.gz"
    return get_tests_data_dir() / "reference" / file_name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_reference_for_inference_examples_by_content_path(model_name: str) -> Path:
    """Return the reference "inference examples by content" file for `model_name`."""
    file_name = f"{model_name}-inference_examples_by_content.json.gz"
    return get_tests_data_dir() / "reference" / file_name
|
2025-03-25 16:26:52 +00:00
|
|
|
|
|
|
|
|
|
2024-02-09 13:44:47 +00:00
|
|
|
def get_one_basic_test_file_path() -> Path:
    """Return a single basic test file (the first one in sorted order)."""
    all_paths = get_basic_test_files_paths()
    return all_paths[0]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_random_ascii_bytes(size: int) -> bytes:
    """Return `size` random alphanumeric ASCII bytes.

    The alphabet is string.printable[:62], i.e. digits + lowercase +
    uppercase letters. Not cryptographically secure; test data only.
    """
    # Hoisted out of the loop: the original re-sliced and re-encoded the
    # alphabet on every iteration. random.choice over a bytes object
    # yields ints, which bytes() accepts directly.
    alphabet = string.printable[:62].encode("ascii")
    return bytes(random.choice(alphabet) for _ in range(size))
|
|
|
|
|
|
|
|
|
|
|
2025-03-26 14:46:15 +00:00
|
|
|
def get_random_bytes(size: int) -> bytes:
    """Return `size` uniformly random bytes (not cryptographically secure)."""
    all_byte_values = range(256)
    return bytes([random.choice(all_byte_values) for _ in range(size)])
|
|
|
|
|
|
|
|
|
|
|
2024-02-09 13:44:47 +00:00
|
|
|
def get_lines_from_stream(stream: str) -> List[str]:
    """Split `stream` on newlines, strip each line, and drop the empty ones."""
    return [
        stripped for raw_line in stream.split("\n") if (stripped := raw_line.strip())
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_random_file_with_size(sample_path: Path, sample_size: int) -> None:
    """Create a file of `sample_size` bytes at `sample_path`, filled with "A".

    Note: despite the name, the content is a constant filler, not random --
    that is enough for size-related tests and fast to generate. The target
    file must not already exist.
    """
    print(f"Writing random file at {str(sample_path)} with size {sample_size}")
    assert not sample_path.is_file()
    block_size = 1024 * 1024 * 1024  # 1GB
    with open(sample_path, "wb") as f:
        # Write in at-most-1GB chunks so memory stays bounded for very large
        # sizes. This single loop replaces the original's two duplicated
        # branches and produces byte-identical output.
        remaining = sample_size
        while remaining > 0:
            chunk = min(block_size, remaining)
            f.write(b"A" * chunk)
            remaining -= chunk
    print("Random file created")
|
|
|
|
|
|
|
|
|
|
|
2024-09-17 08:44:59 +00:00
|
|
|
def get_models_dir() -> Path:
    """Return the directory that ships the bundled magika model files."""
    package_root = Path(__file__).parents[1]
    return package_root / "src" / "magika" / "models"
|
|
|
|
|
|
|
|
|
|
|
2024-02-09 13:44:47 +00:00
|
|
|
def get_default_model_dir() -> Path:
    """Return the directory of the default model bundled with magika."""
    # Imported lazily so this module does not require magika at import time.
    from magika.magika import Magika

    default_model_name = Magika._get_default_model_name()
    return get_models_dir() / default_model_name
|
2025-03-26 14:46:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_whitespaces(size: int) -> bytes:
    """Return `size` bytes cycling through the string.whitespace characters."""
    ws_bytes = string.whitespace.encode("ascii")
    repetitions = (size // len(ws_bytes)) + 1
    return (ws_bytes * repetitions)[:size]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_pattern(size: int, only_printable: bool) -> bytes:
    """Generate a pattern we can use to test features extraction.

    The pattern cycles through a base alphabet: digits + ASCII letters when
    `only_printable` is set, otherwise all 256 byte values.
    """
    if only_printable:
        # The first 10 + 26*2 chars of string.printable: digits, lowercase,
        # uppercase.
        base = string.printable[: 10 + 26 * 2].encode("ascii")
    else:
        base = bytes(range(256))

    # Repeat the base enough times, then cut down to the exact size.
    repetitions = -(-size // len(base))  # ceiling division
    pattern = (base * repetitions)[:size]
    assert len(pattern) == size
    return pattern
|
2025-03-28 14:17:06 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def gzip_compress(content: bytes) -> bytes:
    """Gzip-compress `content` deterministically.

    mtime=0 pins the timestamp field in the gzip header, so compressing the
    same content always produces the same bytes (needed for reference files
    checked into the repo). Fix: `mtime` is documented as an int; the
    original passed the float 0.0.
    """
    return gzip.compress(content, mtime=0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gzip_decompress(content: bytes) -> bytes:
    """Return the decompressed payload of gzip-compressed `content`."""
    decompressed = gzip.decompress(content)
    return decompressed
|
2025-10-10 15:52:44 +00:00
|
|
|
|
|
|
|
|
|
2025-10-10 15:58:02 +00:00
|
|
|
def get_imported_objects_after_wildcard() -> Dict[str, Any]:
    """Run `from magika import *` in a fresh namespace and return what it binds.

    Used to inspect the package's wildcard-importable public API surface.
    Fails the current pytest test if magika cannot be imported.
    """
    # A brand-new dict captures exactly the names the wildcard import binds.
    wildcard_namespace: Dict[str, Any] = {}
    try:
        exec("from magika import *", globals(), wildcard_namespace)
    except ImportError as import_error:
        pytest.fail(f"Could not import module 'magika': {import_error}")
    return wildcard_namespace
|