Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions AddonCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class AddonCatalogEntry:
curated: bool = True # Generated by the cache system
sparse_cache: bool = False # Generated by the cache system
relative_cache_path: str = "" # Generated by the cache system
git_hash: Optional[str] = None # Generated by the cache system
git_tag: Optional[str] = None # Generated by the cache system

def __init__(self, raw_data: Dict[str, str]) -> None:
"""Create an AddonDictionaryEntry from the raw JSON data"""
Expand Down Expand Up @@ -340,6 +342,19 @@ def add_metadata_to_entry(
raise RuntimeError(f"Addon {addon_id} index out of range")
self._dictionary[addon_id][index].metadata = metadata

def add_git_info_to_entry(
self, addon_id: str, index: int, commit_hash: str | None, tag: str | None
) -> None:
"""Adds git commit hash and tag to an AddonCatalogEntry"""
entries = self._dictionary.get(addon_id)
if entries is None:
raise RuntimeError(f"Addon {addon_id} does not exist")
if index >= len(entries):
raise RuntimeError(f"Addon {addon_id} index out of range")
entry = entries[index]
entry.git_hash = commit_hash
entry.git_tag = tag

def get_available_branches(self, addon_id: str) -> List[str]:
"""For a given ID, get the list of available branches compatible with this version of
FreeCAD.
Expand Down
110 changes: 98 additions & 12 deletions AddonCatalogCacheCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@
import shutil
import sys
from dataclasses import is_dataclass, fields
from typing import Any, List, Optional
from typing import Any, List, Optional, Dict, Tuple

import base64
import enum
import hashlib
import io
import json
import os
import re
import requests
import subprocess
from typing import List
import xml.etree.ElementTree
import zipfile

import AddonCatalog
import addonmanager_metadata
import addonmanager_utilities as utils

import addonmanager_icon_utilities as icon_utils

ADDON_CATALOG_URL = "https://raw.githubusercontent.com/FreeCAD/Addons/main/Data/Index.json"
BASE_DIRECTORY = "./CatalogCache"
Expand Down Expand Up @@ -114,6 +114,8 @@ def __init__(self):
else:
self.cwd = os.path.normpath(os.path.join(os.getcwd(), BASE_DIRECTORY))
self._cache = {}
self._sanitize_counter = 0
self._directory_name_cache: Dict[str, str] = {}

def write(self, addon_id: Optional[str] = None) -> None:
original_working_directory = os.getcwd()
Expand Down Expand Up @@ -227,8 +229,35 @@ def create_local_copy_of_single_addon(
continue
metadata = self.generate_cache_entry(addon_id, index, catalog_entry)
self.catalog.add_metadata_to_entry(addon_id, index, metadata)
git_hash, git_tag = self.get_git_info(addon_id, index, catalog_entry)
self.catalog.add_git_info_to_entry(addon_id, index, git_hash, git_tag)
self.create_zip_of_entry(addon_id, index, catalog_entry)

def get_git_info(
self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
) -> Tuple[str | None, str | None]:
"""Get git commit hash and tag if available."""
dirname = self.get_directory_name(addon_id, index, catalog_entry)
if not os.path.exists(os.path.join(self.cwd, dirname, ".git")):
return None, None
repo = os.path.join(self.cwd, dirname)
hash_cmd = ["git", "rev-parse", "HEAD"]
tag_cmd = ["git", "describe", "--tags", "--exact-match"]
results = []
for cmd in (hash_cmd, tag_cmd):
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
cwd=repo,
)
results.append(result.stdout.strip())
except (subprocess.CalledProcessError, OSError, FileNotFoundError):
results.append(None)
return tuple(results)

def generate_cache_entry(
self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
) -> Optional[AddonCatalog.CatalogEntryMetadata]:
Expand All @@ -253,7 +282,7 @@ def generate_cache_entry(
with open(path_to_metadata, "r", encoding="utf-8") as f:
cache_entry.metadata_txt = f.read()

dirname = CacheWriter.get_directory_name(addon_id, index, catalog_entry)
dirname = self.get_directory_name(addon_id, index, catalog_entry)
if os.path.exists(os.path.join(self.cwd, dirname, ".git")):
old_dir = os.getcwd()
os.chdir(os.path.join(self.cwd, dirname))
Expand Down Expand Up @@ -293,17 +322,46 @@ def generate_cache_entry_from_package_xml(
os.path.dirname(path_to_package_xml), relative_icon_path
)
if os.path.exists(absolute_icon_path):
icon_data_is_good = True
with open(absolute_icon_path, "rb") as f:
icon_data = None
try:
cache_entry.icon_data = base64.b64encode(f.read()).decode("utf-8")
icon_data = f.read()
except IOError as e:
print(f"ERROR: IO Error while reading icon file {absolute_icon_path}")
print(e)
icon_data_is_good = False
except Exception as e:
print(f"ERROR: Unknown error while reading icon file {absolute_icon_path}")
print(e)
icon_data_is_good = False
if icon_data is not None:
if absolute_icon_path.lower().endswith(".svg"):
try:
if not icon_utils.is_svg_bytes(icon_data):
self.icon_errors[metadata.name] = {
"valid_icon_path": relative_icon_path,
"error_message": "SVG file does not have valid XML header",
}
icon_data_is_good = False
except icon_utils.BadIconData as e:
self.icon_errors[metadata.name] = {
"valid_icon_path": relative_icon_path,
"error_message": str(e),
}
icon_data_is_good = False
elif absolute_icon_path.lower().endswith(".png"):
if icon_utils.png_has_duplicate_iccp(icon_data):
self.icon_errors[metadata.name] = {
"valid_icon_path": relative_icon_path,
"error_message": "PNG data has duplicate iCCP chunk",
}
icon_data_is_good = False

if icon_data_is_good:
cache_entry.icon_data = base64.b64encode(icon_data).decode("utf-8")
else:
self.icon_errors[metadata.name] = relative_icon_path
self.icon_errors[metadata.name] = {"bad_icon_path": relative_icon_path}
print(f"ERROR: Could not find icon file {absolute_icon_path}")
return cache_entry

Expand Down Expand Up @@ -334,16 +392,44 @@ def create_local_copy_of_single_addon_with_git_sparse(
print(f"ERROR: Failed to clone or update {addon_id} from {catalog_entry.repository}.")
print(f"ERROR: {e}")

@staticmethod
def get_directory_name(addon_id, index, catalog_entry):
def sanitize_directory_name(self, expected_name: str) -> str:
"""Take a string and return a sanitized version suitable for use as a directory name."""
if expected_name in self._directory_name_cache:
return self._directory_name_cache[expected_name]
self._sanitize_counter += 1
forbidden_chars = r'<>:"|?*'
if os.path.sep == "/":
forbidden_chars += "\\\\"
else:
forbidden_chars += "/"
sanitized = re.sub(f"[{forbidden_chars}]", str(self._sanitize_counter), expected_name)
sanitized = sanitized.rstrip(" .")
reserved = {
"con",
"prn",
"aux",
"nul",
*(f"com{i}" for i in range(1, 10)),
*(f"lpt{i}" for i in range(1, 10)),
}
components = sanitized.split(os.path.sep)
for i, comp in enumerate(components):
if comp.lower() in reserved:
components[i] = comp + "-RES"
sanitized = os.path.sep.join(components)

self._directory_name_cache[expected_name] = sanitized
return sanitized

def get_directory_name(self, addon_id, index, catalog_entry):
expected_name = os.path.join(addon_id, str(index) + "-")
if catalog_entry.branch_display_name:
expected_name += catalog_entry.branch_display_name.replace("/", "-")
expected_name += catalog_entry.branch_display_name
elif catalog_entry.git_ref:
expected_name += catalog_entry.git_ref.replace("/", "-")
expected_name += catalog_entry.git_ref
Comment on lines 426 to +429
else:
expected_name += "unknown-branch-name"
return expected_name
return self.sanitize_directory_name(expected_name)

def create_local_copy_of_single_addon_with_zip(
self, addon_id: str, index: int, catalog_entry: AddonCatalog.AddonCatalogEntry
Expand Down Expand Up @@ -562,7 +648,7 @@ def create_zip_of_entry(
zip file is written to a file with the same name as the calculated addon cache directory
in the current working directory."""

dirname = CacheWriter.get_directory_name(addon_id, index, catalog_entry)
dirname = self.get_directory_name(addon_id, index, catalog_entry)
start_dir = os.path.join(self.cwd, dirname)
zip_file_path = os.path.join(self.cwd, f"{dirname}.zip")
temp_file_path = zip_file_path + ".new"
Expand Down
20 changes: 12 additions & 8 deletions AddonManagerTest/app/test_addon_catalog_cache_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,32 @@ def setUp(self):

def test_get_directory_name_with_branch_name(self):
"""If a branch display name is present, that should be appended to the name."""
writer = accc.CacheWriter()
ace = AddonCatalog.AddonCatalogEntry({"branch_display_name": "test_branch"})
result = accc.CacheWriter.get_directory_name("test_addon", 99, ace)
result = writer.get_directory_name("test_addon", 99, ace)
self.assertEqual(result, os.path.join("test_addon", "99-test_branch"))

def test_get_directory_name_with_git_ref(self):
"""If a branch display name is present, that should be appended to the name."""
writer = accc.CacheWriter()
ace = AddonCatalog.AddonCatalogEntry({"git_ref": "test_ref"})
result = accc.CacheWriter.get_directory_name("test_addon", 99, ace)
result = writer.get_directory_name("test_addon", 99, ace)
self.assertEqual(result, os.path.join("test_addon", "99-test_ref"))

def test_get_directory_name_with_branch_and_ref(self):
"""If a branch and git ref are both present, then the branch display name is used."""
writer = accc.CacheWriter()
ace = AddonCatalog.AddonCatalogEntry(
{"branch_display_name": "test_branch", "git_ref": "test_ref"}
)
result = accc.CacheWriter.get_directory_name("test_addon", 99, ace)
result = writer.get_directory_name("test_addon", 99, ace)
self.assertEqual(result, os.path.join("test_addon", "99-test_branch"))

def test_get_directory_name_with_no_information(self):
"""If there is no branch name or git ref information, a valid directory name is still generated."""
writer = accc.CacheWriter()
ace = AddonCatalog.AddonCatalogEntry({})
result = accc.CacheWriter.get_directory_name("test_addon", 99, ace)
result = writer.get_directory_name("test_addon", 99, ace)
self.assertTrue(result.startswith(os.path.join("test_addon", "99")))

def test_find_file_with_existing_file(self):
Expand Down Expand Up @@ -189,23 +193,23 @@ def test_generate_cache_entry_from_package_xml(self, _):
@patch("AddonCatalogCacheCreator.addonmanager_metadata.MetadataReader.from_bytes")
@patch("AddonCatalogCacheCreator.CacheWriter.get_icon_from_metadata")
def test_generate_cache_entry_from_package_xml_with_icon(self, mock_icon, _):
"""Given a metadata file that contains an icon, that icon's contents are
base64-encoded and embedded in the cache."""
"""Given a metadata file that contains an icon, that icon's contents are base64-encoded and embedded in the cache."""

file_path = os.path.abspath(
os.path.join("home", "cache", "TestMod", "1-main", "package.xml")
)
icon_path = os.path.abspath(
os.path.join("home", "cache", "TestMod", "1-main", "icons", "TestMod.svg")
)
icon_data = "<svg xmlns='http://www.w3.org/2000/svg'/>"
self.fake_fs().create_file(file_path, contents="test data")
self.fake_fs().create_file(icon_path, contents="test icon data")
self.fake_fs().create_file(icon_path, contents=icon_data)
mock_icon.return_value = os.path.join("icons", "TestMod.svg")
writer = accc.CacheWriter()
writer.cwd = os.path.abspath(os.path.join("home", "cache"))
cache_entry = writer.generate_cache_entry_from_package_xml(file_path)
self.assertEqual(
base64.b64encode("test icon data".encode("utf-8")).decode("utf-8"),
base64.b64encode(icon_data.encode("utf-8")).decode("utf-8"),
cache_entry.icon_data,
)

Expand Down
21 changes: 8 additions & 13 deletions AddonManagerTest/app/test_freecad_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,39 +75,34 @@ def test_log_no_freecad(self):
"""Test that if the FreeCAD import fails, the logger is set up correctly, and
implements PrintLog"""
sys.modules["FreeCAD"] = None
with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging:
import addonmanager_freecad_interface as fc
import addonmanager_freecad_interface as fc

with self.assertLogs("addonmanager", level="DEBUG") as cm:
fc.Console.PrintLog("Test output")
self.assertTrue(isinstance(fc.Console, fc.ConsoleReplacement))
self.assertTrue(mock_logging.log.called)

def test_message_no_freecad(self):
"""Test that if the FreeCAD import fails the logger implements PrintMessage"""
sys.modules["FreeCAD"] = None
with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging:
import addonmanager_freecad_interface as fc
import addonmanager_freecad_interface as fc

with self.assertLogs("addonmanager", level="INFO") as cm:
fc.Console.PrintMessage("Test output")
self.assertTrue(mock_logging.info.called)

def test_warning_no_freecad(self):
"""Test that if the FreeCAD import fails the logger implements PrintWarning"""
sys.modules["FreeCAD"] = None
with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging:
import addonmanager_freecad_interface as fc
import addonmanager_freecad_interface as fc

with self.assertLogs("addonmanager", level="WARNING") as cm:
fc.Console.PrintWarning("Test output")
self.assertTrue(mock_logging.warning.called)

def test_error_no_freecad(self):
"""Test that if the FreeCAD import fails the logger implements PrintError"""
sys.modules["FreeCAD"] = None
with patch("addonmanager_freecad_interface.logging", new=MagicMock()) as mock_logging:
import addonmanager_freecad_interface as fc
import addonmanager_freecad_interface as fc

with self.assertLogs("addonmanager", level="ERROR") as cm:
fc.Console.PrintError("Test output")
self.assertTrue(mock_logging.error.called)


class TestParameters(WrapTestFreeCADImports):
Expand Down
28 changes: 28 additions & 0 deletions AddonManagerTest/app/test_macro_cache_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from unittest.mock import patch, MagicMock

from MacroCacheCreator import MacroCatalog, CacheWriter
import addonmanager_freecad_interface as fci


class TestMacroCatalog(unittest.TestCase):
Expand Down Expand Up @@ -84,3 +85,30 @@ def test_retrieve_macros_fetch_failure(self, mock_console, mock_add_macro, mock_
instance.retrieve_macros_from_wiki()

mock_add_macro.assert_not_called()

def test_log_error(self):
instance = MacroCatalog()
instance.log_error("macro", "Test error")
instance.log_error("macro", "Test error 2")
self.assertIn("macro", instance.macro_errors)
self.assertEqual(len(instance.macro_errors["macro"]), 2)

def test_fetch_macros_logs_errors(self):

def fake_git(self):
fci.Console.PrintError("git failure")

def fake_wiki(self):
fci.Console.PrintWarning("wiki failure")

instance = MacroCatalog()
with patch.object(type(instance), "retrieve_macros_from_git", fake_git), patch.object(
type(instance), "retrieve_macros_from_wiki", fake_wiki
):
instance.fetch_macros()

messages = [record["msg"] for record in instance.log_buffer]

self.assertIn("git failure", messages)
self.assertIn("wiki failure", messages)
self.assertEqual(len(messages), 2)
Loading
Loading