Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions awscli/customizations/configure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class SectionNotFoundError(Exception):
pass


class SubsectionNotFoundError(Exception):
pass


def mask_value(current_value):
if current_value is None:
return 'None'
Expand Down
96 changes: 90 additions & 6 deletions awscli/customizations/configure/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
import sys

from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import validate_mutually_exclusive

from . import PREDEFINED_SECTION_NAMES
from . import (
PREDEFINED_SECTION_NAMES,
SUBSECTION_TYPE_ALLOWLIST,
SubsectionNotFoundError,
)

LOG = logging.getLogger(__name__)

Expand All @@ -35,29 +40,80 @@ class ConfigureGetCommand(BasicCommand):
'cli_type_name': 'string',
'positional_arg': True,
},
{
'name': 'sso-session',
'help_text': 'The name of the sub-section from which to retrieve a value.',
'action': 'store',
'cli_type_name': 'string',
},
{
'name': 'services',
'help_text': 'The name of the sub-section from which to retrieve a value.',
'action': 'store',
'cli_type_name': 'string',
},
]

def __init__(self, session, stream=None, error_stream=None):
super(ConfigureGetCommand, self).__init__(session)
super().__init__(session)
if stream is None:
stream = sys.stdout
if error_stream is None:
error_stream = sys.stderr
self._stream = stream
self._error_stream = error_stream

def _subsection_parameter_to_argument_name(self, parameter_name):
return parameter_name.replace("-", "_")

def _get_subsection_config_name_from_args(self, args):
# Validate mutual exclusivity of sub-section type parameters
groups = [
[self._subsection_parameter_to_argument_name(key)]
for key in SUBSECTION_TYPE_ALLOWLIST.keys()
]
validate_mutually_exclusive(args, *groups)

subsection_name = None
subsection_config_name = None

for (
section_type,
section_properties,
) in SUBSECTION_TYPE_ALLOWLIST.items():
cli_parameter_name = self._subsection_parameter_to_argument_name(
section_type
)

if hasattr(args, cli_parameter_name):
subsection_name = getattr(args, cli_parameter_name)
if subsection_name is not None:
subsection_config_name = section_properties[
'full_config_name'
]
break

return (subsection_config_name, subsection_name)

def _run_main(self, args, parsed_globals):
varname = args.varname
section_type, section_name = self._get_subsection_config_name_from_args(args)

if '.' not in varname:
if section_type:
value = self._get_subsection_property(
section_type.replace("-", "_"),
section_name,
varname
)
elif '.' not in varname:
# get_scoped_config() returns the config variables in the config
# file (not the logical_var names), which is what we want.
config = self._session.get_scoped_config()
value = config.get(varname)
else:
value = self._get_dotted_config_value(varname)

LOG.debug('Config value retrieved: %s' % value)
LOG.debug(f'Config value retrieved: {value}')

if isinstance(value, str):
self._stream.write(value)
Expand All @@ -67,8 +123,8 @@ def _run_main(self, args, parsed_globals):
# TODO: add support for this. We would need to print it off in
# the same format as the config file.
self._error_stream.write(
'varname (%s) must reference a value, not a section or '
'sub-section.' % varname
f'varname ({varname}) must reference a value, not a section '
'or sub-section.'
)
return 1
else:
Expand Down Expand Up @@ -121,3 +177,31 @@ def _get_dotted_config_value(self, varname):
except AttributeError:
value = None
return value

def _get_subsection_property(self, section_type, section_name, varname):
full_config = self._session.full_config

if section_type not in full_config:
raise SubsectionNotFoundError(
f"The config sub-section ({section_name}) could not be found"
)

section_type_config = full_config[section_type]
section_config = section_type_config.get(section_name, None)

if section_config is None:
raise SubsectionNotFoundError(
f"The config sub-section ({section_name}) could not be found"
)

# Handle nested properties
if '.' in varname:
parts = varname.split('.')
if len(parts) == 2:
value = section_config.get(parts[0])
if isinstance(value, dict):
return value.get(parts[1])
else:
return None
else:
return section_config.get(varname)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws configure get --sso-session non-existent sso_region        

'NoneType' object has no attribute 'get'

Can we throw a nicer error when the sub-section doesn't exist? And expand the test for this case to check more than exit code?

Something like we do for profiles?

aws configure get region --profile non-existent

The config profile (non-existent) could not be found

Copy link
Member Author

@kdaily kdaily Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 139528d

27 changes: 27 additions & 0 deletions awscli/examples/configure/get/_examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@ Suppose you had the following config file::
aws_secret_access_key=testing_secret_key
region=us-west-2

[profile my-dev-profile]
services = my-services
sso_session = my-sso
sso_account_id = 123456789011
sso_role_name = readOnly
region = us-west-2
output = json

[sso-session my-sso]
sso_region = us-east-1
sso_start_url = https://my-sso-portal.awsapps.com/start
sso_registration_scopes = sso:account:access

[services my-services]
ec2 =
endpoint_url = http://localhost:4567/

The following commands would have the corresponding output::

$ aws configure get aws_access_key_id
Expand All @@ -27,3 +44,13 @@ The following commands would have the corresponding output::
$
$ echo $?
1

$ aws configure get --profile my-dev-profile sso_account_id
123456789011

$ aws configure get --sso-session my-sso-session sso_region
us-east-1

$ aws configure get --services my-services ec2.endpoint_url
http://localhost:4567

70 changes: 70 additions & 0 deletions tests/functional/configure/test_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ def test_get_command_with_profile_set(self):
)
self.assertEqual(stdout.strip(), "testing_access_key")

def test_get_command_with_nonexisting_profile(self):
self.set_config_file_contents(
"\n"
"[default]\n"
"aws_access_key_id=default_access_key\n"
"\n"
)
_, stderr, _ = self.run_cmd(
"configure get --profile doesntexist sso_region",
expected_rc=255,
)
self.assertIn("could not be found", stderr)

def test_get_with_fq_name(self):
# test get configs with fully qualified name.
self.set_config_file_contents(
Expand Down Expand Up @@ -129,6 +142,63 @@ def test_get_fq_with_quoted_profile_name(self):
)
self.assertEqual(stdout.strip(), "testing_access_key")

def test_get_command_with_subsection(self):
self.set_config_file_contents(
"\n"
"[default]\n"
"aws_access_key_id=default_access_key\n"
"\n"
"[sso-session my-sso-session]\n"
"sso_region = us-west-2\n"
)
stdout, _, _ = self.run_cmd(
"configure get --sso-session my-sso-session sso_region",
)
self.assertEqual(stdout.strip(), "us-west-2")

def test_get_command_with_subsection_nested_property(self):
self.set_config_file_contents(
"\n"
"[default]\n"
"aws_access_key_id=default_access_key\n"
"\n"
"[services my-services]\n"
"ec2 = \n"
" endpoint_url = http://localhost:4567/"
)
stdout, _, _ = self.run_cmd(
"configure get --services my-services ec2.endpoint_url",
)
self.assertEqual(stdout.strip(), "http://localhost:4567/")

def test_get_command_with_nonexisting_subsection(self):
self.set_config_file_contents(
"\n"
"[default]\n"
"aws_access_key_id=default_access_key\n"
"\n"
)
_, stderr, _ = self.run_cmd(
"configure get --sso-session my-sso-session sso_region",
expected_rc=255,
)
self.assertIn("could not be found", stderr)

def test_get_command_with_subsection_set_nonexisting_subsection(self):
self.set_config_file_contents(
"\n"
"[default]\n"
"aws_access_key_id=default_access_key\n"
"\n"
"[sso-session my-sso-session]\n"
"sso_region = us-west-2\n"
)
_, stderr, _ = self.run_cmd(
"configure get --services my-services ec2.endpoint_url",
expected_rc=255,
)
self.assertIn("could not be found", stderr)

def test_set_with_config_file_no_exist(self):
self.run_cmd("configure set region us-west-1")
self.assertEqual(
Expand Down
60 changes: 60 additions & 0 deletions tests/unit/customizations/configure/test_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,63 @@ def test_get_non_string_returns_error(self):
self.assertEqual(rc, 1)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(error_stream.getvalue(), '')


class TestConfigureGetCommandSubSections(unittest.TestCase):
def create_command(self, session):
stdout = StringIO()
stderr = StringIO()
command = ConfigureGetCommand(session, stdout, stderr)
return stdout, stderr, command

def test_get_simple_property(self):
session = FakeSession({})
session.full_config = {
'sso_sessions': {
'my-session': {
'sso_region': 'us-west-2',
'sso_start_url': 'https://example.awsapps.com/start'
}
}
}
stream, error_stream, config_get = self.create_command(session)
rc = config_get(args=['--sso-session', 'my-session', 'sso_region'],
parsed_globals=None)
rendered = stream.getvalue()
self.assertEqual(rendered.strip(), 'us-west-2')
self.assertEqual(rc, 0)


def test_get_nested_property(self):
session = FakeSession({})
session.full_config = {
'services': {
'my-services': {
's3': {
'endpoint_url': 'http://localhost:4566',
}
}
}
}
stream, error_stream, config_get = self.create_command(session)
rc = config_get(args=['--services', 'my-services', 's3.endpoint_url'],
parsed_globals=None)
rendered = stream.getvalue()
self.assertEqual(rendered.strip(), 'http://localhost:4566')
self.assertEqual(rc, 0)

def test_non_existent_property_returns_exit_code_1(self):
session = FakeSession({})
session.full_config = {
'sso_sessions': {
'my-session': {
'sso_region': 'us-west-2'
}
}
}
stream, _, config_get = self.create_command(session)
rc = config_get(args=['--sso-session', 'my-session', 'non_existent_property'],
parsed_globals=None)

self.assertEqual(rc, 1)
self.assertEqual(stream.getvalue(), '')
Loading