Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .github/workflows/tests/common_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import os
import unittest
from typing import Any

import requests
import urllib3

# Default timeout for HTTP calls to the gateway (self-signed TLS, CI).
KNOX_REQUEST_TIMEOUT = 30

HSTS_HEADER_NAME = "Strict-Transport-Security"
HSTS_EXPECTED_VALUE = "max-age=300; includeSubDomains"

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def gateway_base_url() -> str:
"""Return KNOX_GATEWAY_URL with a trailing slash."""
url = os.environ.get("KNOX_GATEWAY_URL", "https://localhost:8443/")
return url if url.endswith("/") else (url + "/")


def knox_get(url: str, **kwargs: Any) -> requests.Response:
"""GET against Knox with verify=False and default timeout unless overridden."""
opts: dict[str, Any] = {"verify": False, "timeout": KNOX_REQUEST_TIMEOUT}
opts.update(kwargs)
return requests.get(url, **opts)


def knox_post(url: str, **kwargs: Any) -> requests.Response:
"""POST against Knox with verify=False and default timeout unless overridden."""
opts: dict[str, Any] = {"verify": False, "timeout": KNOX_REQUEST_TIMEOUT}
opts.update(kwargs)
return requests.post(url, **opts)


def collect_actor_group_values(
response: requests.Response, prefix: str = "x-knox-actor-groups"
) -> list[str]:
"""Comma-split values from all response headers whose names start with prefix (case-insensitive)."""
prefix_lower = prefix.lower()
all_groups: list[str] = []
for name in response.headers:
if name.lower().startswith(prefix_lower):
all_groups.extend(response.headers[name].split(","))
return all_groups


def assert_hsts_header(testcase: unittest.TestCase, response: requests.Response) -> None:
testcase.assertIn(HSTS_HEADER_NAME, response.headers)
testcase.assertEqual(response.headers[HSTS_HEADER_NAME], HSTS_EXPECTED_VALUE)
32 changes: 24 additions & 8 deletions .github/workflows/tests/test_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,47 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import unittest

import requests
import urllib3

# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from common_utils import assert_hsts_header, gateway_base_url, knox_get


class TestKnoxHealth(unittest.TestCase):
def test_admin_api_health(self):
def setUp(self):
self.base_url = gateway_base_url()

def test_health_ping_ok_and_hsts(self):
"""
Basic health check to ensure Knox is up and running.
We expect a response 200 to indicate the server is up.
"""
url = os.environ.get("KNOX_GATEWAY_URL", "https://localhost:8443/")
url = self.base_url + "gateway/health/v1/ping"
print(f"Checking connectivity to {url}...")
try:
response = requests.get(url + "health/v1/ping", verify=False, timeout=30)
response = knox_get(url)
print(f"Received status code: {response.status_code}")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text.strip(), "OK")

assert_hsts_header(self, response)
except requests.exceptions.ConnectionError:
self.fail("Failed to connect to Knox on port 8443 - Connection refused")
except Exception as e:
self.fail(f"Health check failed with unexpected error: {e}")

def test_health_metrics_returns_json(self):
url = self.base_url + "gateway/health/v1/metrics?pretty=true"
response = knox_get(url)
self.assertEqual(response.status_code, 200)

content_type = response.headers.get("Content-Type", "")
self.assertIn("application/json", content_type)

payload = json.loads(response.text)
self.assertIsInstance(payload, dict)

if __name__ == '__main__':
unittest.main()

43 changes: 13 additions & 30 deletions .github/workflows/tests/test_knox_auth_service_and_LDAP.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import requests
import urllib3
from requests.auth import HTTPBasicAuth

from common_utils import collect_actor_group_values, gateway_base_url, knox_get

########################################################
# This test is verifying the behavior of the Knox Auth Service + LDAP authentication.
# It is using the 'auth/api/v1/pre' endpoint to get the actor ID and group headers.
Expand All @@ -28,14 +27,9 @@
# It is verifying that the actor ID and group headers are not None.
########################################################

# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class TestKnoxAuthService(unittest.TestCase):
def setUp(self):
self.base_url = os.environ.get("KNOX_GATEWAY_URL", "https://localhost:8443/")
if not self.base_url.endswith("/"):
self.base_url += "/"
self.base_url = gateway_base_url()
# The topology name is based on the filename knoxldap.xml
self.topology_url = self.base_url + "gateway/knoxldap/auth/api/v1/extauthz"

Expand All @@ -44,11 +38,9 @@ def test_auth_service_guest(self):
Verify that guest user gets the correct actor ID header.
"""
print(f"\nTesting guest authentication against {self.topology_url}")
response = requests.get(
self.topology_url,
response = knox_get(
self.topology_url,
auth=HTTPBasicAuth('guest', 'guest-password'),
verify=False,
timeout=30
)

print(f"Status Code: {response.status_code}")
Expand All @@ -66,11 +58,9 @@ def test_auth_service_admin_groups(self):
Verify that admin user gets actor ID and group headers.
"""
print(f"\nTesting admin authentication against {self.topology_url}")
response = requests.get(
self.topology_url,
response = knox_get(
self.topology_url,
auth=HTTPBasicAuth('admin', 'admin-password'),
verify=False,
timeout=30
)

print(f"Status Code: {response.status_code}")
Expand All @@ -82,22 +72,15 @@ def test_auth_service_admin_groups(self):
self.assertEqual(response.headers[actor_id_header], 'admin')
print(f"Verified {actor_id_header}: {response.headers[actor_id_header]}")

# Check for Group headers
# Config: 'preauth.auth.header.actor.groups.prefix' = 'x-knox-actor-groups'
# We mapped admin to 'longGroupName1,longGroupName2,longGroupName3,longGroupName4'

# We just verify that at least one header starting with the prefix exists
prefix = 'x-knox-actor-groups'
group_headers = [h for h in response.headers.keys() if h.lower().startswith(prefix.lower())]

self.assertTrue(len(group_headers) > 0, f"No headers found starting with {prefix}")

# Verify content of groups
all_groups = []
for h in group_headers:
all_groups.extend(response.headers[h].split(','))
print(f"Found group header {h}: {response.headers[h]}")

all_groups = collect_actor_group_values(response, prefix=prefix)
self.assertTrue(len(all_groups) > 0, f"No headers found starting with {prefix}")
for h in response.headers:
if h.lower().startswith(prefix.lower()):
print(f"Found group header {h}: {response.headers[h]}")

expected_groups = ['longGroupName1', 'longGroupName2', 'longGroupName3', 'longGroupName4']
for group in expected_groups:
self.assertIn(group, all_groups)
Expand Down
22 changes: 6 additions & 16 deletions .github/workflows/tests/test_knox_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import requests
import urllib3
from requests.auth import HTTPBasicAuth

from common_utils import assert_hsts_header, gateway_base_url, knox_get


########################################################
# This test is verifying the global HSTS headers for 404 response.
# It executes new GET request on non-existent Knox path
# It verifies header is present with the correct value.
########################################################

# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class TestKnoxConfigs(unittest.TestCase):
def setUp(self):
self.base_url = os.environ.get("KNOX_GATEWAY_URL", "https://localhost:8443/")
if not self.base_url.endswith("/"):
self.base_url += "/"
self.base_url = gateway_base_url()
self.non_existent_path = self.base_url + "gateway/not-exists"

def test_auth_service_guest(self):
"""
Verifies header is present with the correct value
"""
print(f"\nTesting global HSTS config for 404 response")
response = requests.get(
response = knox_get(
self.non_existent_path,
auth=HTTPBasicAuth('admin', 'admin-password'),
verify=False,
timeout=30
)

print(f"Status Code: {response.status_code}")
self.assertEqual(response.status_code, 404)

hsts_header = 'Strict-Transport-Security'
self.assertIn(hsts_header, response.headers)
self.assertEqual(response.headers[hsts_header], 'max-age=300; includeSubDomains')
print(f"Verified {hsts_header}: {response.headers[hsts_header]}")
assert_hsts_header(self, response)
print(f"Verified Strict-Transport-Security: {response.headers['Strict-Transport-Security']}")

60 changes: 60 additions & 0 deletions .github/workflows/tests/test_knoxauth_preauth_and_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

from requests.auth import HTTPBasicAuth

from common_utils import gateway_base_url, knox_get, knox_post


class TestKnoxAuthServicePreAuthAndPaths(unittest.TestCase):
def setUp(self):
self.base_url = gateway_base_url()
self.preauth_url = self.base_url + "gateway/knoxldap/auth/api/v1/pre"
self.extauthz_url = self.base_url + "gateway/knoxldap/auth/api/v1/extauthz"

def test_preauth_requires_auth(self):
response = knox_get(self.preauth_url)
self.assertEqual(response.status_code, 401)

def test_preauth_bad_credentials_unauthorized(self):
response = knox_get(
self.preauth_url,
auth=HTTPBasicAuth("baduser", "badpass"),
)
self.assertEqual(response.status_code, 401)

def test_preauth_post_supported(self):
response = knox_post(
self.preauth_url,
auth=HTTPBasicAuth("guest", "guest-password"),
)
self.assertEqual(response.status_code, 200)

actor_id_header = "x-knox-actor-username"
self.assertIn(actor_id_header, response.headers)
self.assertEqual(response.headers[actor_id_header], "guest")

def test_extauthz_additional_path_not_ignored_in_knoxldap(self):
response = knox_get(
self.extauthz_url + "/does-not-exist",
auth=HTTPBasicAuth("guest", "guest-password"),
)
self.assertEqual(response.status_code, 404)


if __name__ == "__main__":
unittest.main()

Loading
Loading