From 92cd82620715512f7de6196dfc2634cd0e8c2821 Mon Sep 17 00:00:00 2001 From: Benjamin Krull Date: Mon, 16 Jun 2025 11:39:07 +0200 Subject: [PATCH 1/4] fix: comprehensive query param enconding --- sw360/base.py | 17 +++++++------ sw360/components.py | 49 +++++++++++++++++++------------------ sw360/moderationrequests.py | 30 ++++++++++++----------- sw360/packages.py | 39 +++++++++++++++-------------- sw360/project.py | 15 ++++++------ sw360/releases.py | 20 ++++++++------- 6 files changed, 90 insertions(+), 80 deletions(-) diff --git a/sw360/base.py b/sw360/base.py index 33fd6d6..9ce3b50 100644 --- a/sw360/base.py +++ b/sw360/base.py @@ -11,6 +11,8 @@ import requests +from urllib.parse import urlencode + from .sw360error import SW360Error @@ -217,16 +219,17 @@ def _update_external_ids(self, current_data: Dict[str, Any], ext_id_name: str, e update = False return (old_value, ext_id_data, update) - - def _add_param(self, url: str, param: str) -> str: + + def _add_params(self, url: str, params: dict) -> str: """Add the given parameter to the given url""" + + query_string = urlencode(params) + if "?" in url: - url = url + "&" + return f"{url}&{query_string}" else: - url = url + "?" - - return url + param - + return f"{url}?{query_string}" + @classmethod def get_id_from_href(cls, href: str) -> str: """"Extracts the identifier from the href and returns it diff --git a/sw360/components.py b/sw360/components.py index 99b0c31..9bd498e 100644 --- a/sw360/components.py +++ b/sw360/components.py @@ -37,25 +37,25 @@ def get_all_components(self, fields: str = "", page: int = -1, page_size: int = :rtype: list of JSON component objects :raises SW360Error: if there is a negative HTTP response """ - - url = self.url + "resource/api/components" + + fullbase_url = self.url + "resource/api/components" + params = {} if all_details: - url = self._add_param(url, "allDetails=true") + params["allDetails"] = "true" if fields: - url = self._add_param(url, "fields=" + fields) + params["fields"] = fields if page > -1: - url = self._add_param(url, "page=" + str(page)) - url = self._add_param(url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - url = self._add_param(url, "sort=" + sort) + params["sort"] = sort - resp = self.api_get(url) + full_url = self._add_params(fullbase_url, params) + resp = self.api_get(full_url) if not resp: return [] @@ -94,18 +94,18 @@ def get_components_by_type( :raises SW360Error: if there is a negative HTTP response """ - url = self.url + "resource/api/components?type=" + component_type + fullbase_url = self.url + "resource/api/components" + params = {"type": component_type} if page > -1: - url = self._add_param(url, "page=" + str(page)) - url = self._add_param(url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - url = self._add_param(url, "sort=" + sort) + params["sort"] = sort - resp = self.api_get(url) + full_url = self._add_params(fullbase_url, params) + resp = self.api_get(full_url) if resp and ("_embedded" in resp) and ("sw360:components" in resp["_embedded"]): return resp["_embedded"]["sw360:components"] @@ -165,17 +165,18 @@ def get_component_by_name( :raises SW360Error: if there is a negative HTTP response """ - url = self.url + "resource/api/components?name=" + component_name + fullbase_url = self.url + "resource/api/components?name=" + component_name + params = {"name": component_name} + if page > -1: - url = self._add_param(url, "page=" + str(page)) - url = self._add_param(url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - url = self._add_param(url, "sort=" + sort) + params["sort"] = sort - resp = self.api_get(url) + full_url = self._add_params(fullbase_url, params) + resp = self.api_get(full_url) return resp def get_components_by_external_id(self, ext_id_name: str, ext_id_value: str = "") -> List[Dict[str, Any]]: diff --git a/sw360/moderationrequests.py b/sw360/moderationrequests.py index fb66e7e..f8f8625 100644 --- a/sw360/moderationrequests.py +++ b/sw360/moderationrequests.py @@ -30,16 +30,17 @@ def get_all_moderation_requests(self, page: int = -1, page_size: int = -1, :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/moderationrequest" + fullbase_url = self.url + "resource/api/moderationrequest" + params = {} + if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort + full_url = self._add_params(fullbase_url, params) resp = self.api_get(full_url) return resp @@ -63,22 +64,23 @@ def get_moderation_requests_by_state(self, state: str, all_details: bool = False :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/moderationrequest/byState?state=" + state + fullbase_url = self.url + "resource/api/moderationrequest/byState" + params = {"state": state} + if all_details: - full_url = self._add_param(full_url, "allDetails=true") + params["allDetails"] = "true" if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort + full_url = self._add_params(fullbase_url, params) resp = self.api_get(full_url) return resp - + def get_moderation_request(self, mr_id: str) -> Optional[Dict[str, Any]]: """Get information of about a moderation request diff --git a/sw360/packages.py b/sw360/packages.py index 56f3db5..86996c4 100644 --- a/sw360/packages.py +++ b/sw360/packages.py @@ -65,26 +65,29 @@ def get_all_packages(self, name: str = "", version: str = "", purl: str = "", :rtype: list of JSON package objects :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/packages" + fullbase_url = self.url + "resource/api/packages" + params = {} + if all_details: - full_url = self._add_param(full_url, "allDetails=true") + params["allDetails"] = "true" if name: - full_url = self._add_param(full_url, "name=" + name) + params["name"] = name + if version: - full_url = self._add_param(full_url, "version=" + version) - if purl: - full_url = self._add_param(full_url, "purl=" + purl) + params["version"] = version + if purl: + params["purl"] = purl + if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort + full_url = self._add_params(fullbase_url, params) resp = self.api_get(full_url) if page == -1 and resp and ("_embedded" in resp) and ("sw360:packages" in resp["_embedded"]): @@ -110,19 +113,17 @@ def get_packages_by_packagemanager(self, manager: str, page: int = -1, :rtype: list of JSON package objects :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/packages" - full_url = self._add_param(full_url, "packageManager=" + str(manager)) + fullbase_url = self.url + "resource/api/packages" + params = {"packageManager": manager} if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort - resp = self.api_get(full_url) + resp = self.api_get(fullbase_url) if page == -1 and resp and ("_embedded" in resp) and ("sw360:packages" in resp["_embedded"]): return resp["_embedded"]["sw360:packages"] diff --git a/sw360/project.py b/sw360/project.py index 491be34..aab8ddb 100644 --- a/sw360/project.py +++ b/sw360/project.py @@ -83,19 +83,20 @@ def get_projects(self, all_details: bool = False, page: int = -1, :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/projects" + fullbase_url = self.url + "resource/api/projects" + params = {} + if all_details: - full_url = self._add_param(full_url, "allDetails=true") + params["allDetails"] = "true" if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort + full_url = self._add_params(fullbase_url, params) resp = self.api_get(full_url) return resp diff --git a/sw360/releases.py b/sw360/releases.py index 69c60bb..935f5eb 100644 --- a/sw360/releases.py +++ b/sw360/releases.py @@ -85,24 +85,26 @@ def get_all_releases(self, fields: str = "", all_details: bool = False, :rtype: list of JSON release objects :raises SW360Error: if there is a negative HTTP response """ - full_url = self.url + "resource/api/releases" + fullbase_url = self.url + "resource/api/releases" + params = {} + if all_details: - full_url = self._add_param(full_url, "allDetails=true") + params["allDetails"] = "true" if isNewClearingWithSourceAvailable: - full_url = self._add_param(full_url, "isNewClearingWithSourceAvailable=true") + params["isNewClearingWithSourceAvailable"] = "true" if fields: - full_url = self._add_param(full_url, "fields=" + fields) + params["fields"] = fields if page > -1: - full_url = self._add_param(full_url, "page=" + str(page)) - full_url = self._add_param(full_url, "page_entries=" + str(page_size)) + params["page"] = str(page) + params["page_entries"] = str(page_size) if sort: - # ensure HTML encoding - sort = sort.replace(",", "%2C") - full_url = self._add_param(full_url, "sort=" + sort) + params["sort"] = sort + + full_url = self._add_params(fullbase_url, params) resp = self.api_get(full_url) From a29779b5977d627255783608d2031988478c4e85 Mon Sep 17 00:00:00 2001 From: Benjamin Krull Date: Tue, 17 Jun 2025 15:00:20 +0200 Subject: [PATCH 2/4] fix: duplicate query param --- sw360/components.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sw360/components.py b/sw360/components.py index 9bd498e..177dd9b 100644 --- a/sw360/components.py +++ b/sw360/components.py @@ -165,7 +165,7 @@ def get_component_by_name( :raises SW360Error: if there is a negative HTTP response """ - fullbase_url = self.url + "resource/api/components?name=" + component_name + fullbase_url = self.url + "resource/api/components" params = {"name": component_name} if page > -1: From 4e5c448208603f7f7028fbcaf2caddb7d2da48b4 Mon Sep 17 00:00:00 2001 From: Benjamin Krull Date: Tue, 17 Jun 2025 15:11:27 +0200 Subject: [PATCH 3/4] fix: explicit dict type --- sw360/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sw360/base.py b/sw360/base.py index 9ce3b50..2de4288 100644 --- a/sw360/base.py +++ b/sw360/base.py @@ -220,7 +220,7 @@ def _update_external_ids(self, current_data: Dict[str, Any], ext_id_name: str, e return (old_value, ext_id_data, update) - def _add_params(self, url: str, params: dict) -> str: + def _add_params(self, url: str, params: Dict[str, str]) -> str: """Add the given parameter to the given url""" query_string = urlencode(params) From 61b5ebce0af51df36361a507a4a7b08e89960427 Mon Sep 17 00:00:00 2001 From: Thomas Graf Date: Fri, 20 Jun 2025 08:26:04 +0200 Subject: [PATCH 4/4] fix: apply proper query param encoding everywhere --- sw360/packages.py | 9 +++++---- sw360/vendor.py | 2 +- tests/test_sw360_packages.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sw360/packages.py b/sw360/packages.py index 86996c4..a2de045 100644 --- a/sw360/packages.py +++ b/sw360/packages.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------------------------- -# Copyright (c) 2024 Siemens +# Copyright (c) 2024-2025 Siemens # All Rights Reserved. # Authors: thomas.graf@siemens.com # @@ -67,7 +67,7 @@ def get_all_packages(self, name: str = "", version: str = "", purl: str = "", """ fullbase_url = self.url + "resource/api/packages" params = {} - + if all_details: params["allDetails"] = "true" @@ -79,7 +79,7 @@ def get_all_packages(self, name: str = "", version: str = "", purl: str = "", if purl: params["purl"] = purl - + if page > -1: params["page"] = str(page) params["page_entries"] = str(page_size) @@ -123,7 +123,8 @@ def get_packages_by_packagemanager(self, manager: str, page: int = -1, if sort: params["sort"] = sort - resp = self.api_get(fullbase_url) + full_url = self._add_params(fullbase_url, params) + resp = self.api_get(full_url) if page == -1 and resp and ("_embedded" in resp) and ("sw360:packages" in resp["_embedded"]): return resp["_embedded"]["sw360:packages"] diff --git a/sw360/vendor.py b/sw360/vendor.py index 2e51637..30d955d 100644 --- a/sw360/vendor.py +++ b/sw360/vendor.py @@ -52,7 +52,7 @@ def create_new_vendor(self, vendor: Dict[str, Any]) -> Dict[str, Any]: API endpoint: POST /vendors - :param vendor: the new vedor data + :param vendor: the new vendor data :type vendor: JSON vendor object :raises SW360Error: if there is a negative HTTP response """ diff --git a/tests/test_sw360_packages.py b/tests/test_sw360_packages.py index b6b108d..4c067b3 100644 --- a/tests/test_sw360_packages.py +++ b/tests/test_sw360_packages.py @@ -202,7 +202,7 @@ def test_get_packages_by_packagemanager_with_details(self) -> None: adding_headers={"Authorization": "Token " + self.MYTOKEN}, ) - packages = lib.get_packages_by_packagemanager("nuget", page=1, page_size=5, sort="name%2Cdesc") + packages = lib.get_packages_by_packagemanager("nuget", page=1, page_size=5, sort="name,desc") self.assertIsNotNone(packages) self.assertTrue(len(packages) > 0) pkgs = packages["_embedded"]["sw360:packages"] @@ -410,4 +410,4 @@ def test_delete_package_failed(self) -> None: if __name__ == "__main__": APP = Sw360TestPackages() - APP.test_get_packages_by_packagemanager() + APP.test_get_packages_by_packagemanager_with_details()