diff --git a/salt/modules/cmdmod.py b/salt/modules/cmdmod.py index 2c51c45e0b6..d5e54d24c84 100644 --- a/salt/modules/cmdmod.py +++ b/salt/modules/cmdmod.py @@ -57,7 +57,7 @@ import salt.platform.win from salt.utils.win_functions import escape_argument as _cmd_quote - from salt.utils.win_runas import runas as win_runas + import salt.utils.win_runas as win_runas HAS_WIN_RUNAS = True else: @@ -102,6 +102,24 @@ def _check_cb(cb_): return lambda x: x +def _validate_windows_runas(runas): + """ + Validate the requested runas user on Windows systems. + """ + if not runas or not salt.utils.platform.is_windows(): + return True + if not HAS_WIN_RUNAS: + raise CommandExecutionError("missing salt/utils/win_runas.py") + if not getattr(win_runas, "HAS_WIN32", False): + raise CommandExecutionError("This utility requires pywin32") + try: + win_runas.validate_username(runas, raise_on_error=True) + except CommandExecutionError as exc: + log.debug("runas validation failed for %s: %s", runas, exc) + raise CommandExecutionError(f"Invalid user: {runas}") + return True + + def _python_shell_default(python_shell, __pub_jid): """ Set python_shell default based on the shell parameter and __opts__['cmd_safe'] @@ -788,7 +806,7 @@ def _run( if change_windows_codepage: salt.utils.win_chcp.set_codepage_id(windows_codepage) try: - proc = win_runas(cmd, runas, password, **new_kwargs) + proc = win_runas.runas(cmd, runas, password, **new_kwargs) except (OSError, pywintypes.error) as exc: msg = "Unable to run command '{}' with the context '{}', reason: {}".format( cmd if output_loglevel is not None else "REDACTED", @@ -3023,10 +3041,7 @@ def _cleanup_tempfile(path): win_cwd = False if salt.utils.platform.is_windows() and runas: - # Let's make sure the user exists first - if not __salt__["user.info"](runas): - msg = f"Invalid user: {runas}" - raise CommandExecutionError(msg) + _validate_windows_runas(runas) if cwd is None: # Create a temp working directory cwd = tempfile.mkdtemp(dir=__opts__["cachedir"]) diff --git a/salt/states/cmd.py b/salt/states/cmd.py index e9fb2a2315f..1a018faaccc 100644 --- a/salt/states/cmd.py +++ b/salt/states/cmd.py @@ -364,6 +364,20 @@ def _is_true(val): raise ValueError(f"Failed parsing boolean value: {val}") +def _validate_windows_runas(runas): + if not runas or not salt.utils.platform.is_windows(): + return True, "" + try: + import salt.utils.win_runas as win_runas + except Exception: # pylint: disable=broad-except + return False, "win_runas is not available" + if not getattr(win_runas, "HAS_WIN32", False): + return False, "win_runas is not available" + if not win_runas.validate_username(runas): + return False, f"Invalid user: {runas}" + return True, "" + + def wait( name, cwd=None, @@ -872,6 +886,11 @@ def run( ret["comment"] = "Invalidly-formatted 'env' parameter. See documentation." return ret + valid_runas, runas_comment = _validate_windows_runas(runas) + if not valid_runas: + ret["comment"] = runas_comment + return ret + cmd_kwargs = copy.deepcopy(kwargs) cmd_kwargs.update( { @@ -1158,6 +1177,11 @@ def script( ret["comment"] = "Must supply a password if runas argument is used on Windows." return ret + valid_runas, runas_comment = _validate_windows_runas(runas) + if not valid_runas: + ret["comment"] = runas_comment + return ret + tmpctx = defaults if defaults else {} if context: tmpctx.update(context) diff --git a/salt/utils/win_runas.py b/salt/utils/win_runas.py index ea426e8a51b..5518c9518cf 100644 --- a/salt/utils/win_runas.py +++ b/salt/utils/win_runas.py @@ -81,6 +81,52 @@ def split_username(username): return str(user_name), str(domain) +def _is_upn(username): + """ + Return True when the username is in UPN format (user@domain). + """ + return "@" in username and "\\" not in username + + +def resolve_logon_credentials(username): + """ + Resolve logon credentials for Windows logon APIs. + """ + if not isinstance(username, str): + username = str(username) + is_upn = _is_upn(username) + try: + _, lookup_domain, _ = win32security.LookupAccountName(None, username) + except pywintypes.error as exc: + message = win32api.FormatMessage(exc.winerror).rstrip("\n") + raise CommandExecutionError(message) + user_name, domain_name = split_username(username) + logon_name = username if is_upn else user_name + logon_domain = "" if is_upn else lookup_domain + return { + "input": username, + "user_name": user_name, + "domain_name": domain_name, + "logon_name": logon_name, + "logon_domain": logon_domain, + "lookup_domain": lookup_domain, + "is_upn": is_upn, + } + + +def validate_username(username, raise_on_error=False): + """ + Validate that a username can be resolved on this host. + """ + try: + resolve_logon_credentials(username) + except CommandExecutionError: + if raise_on_error: + raise + return False + return True + + def create_env(user_token, inherit, timeout=1): """ CreateEnvironmentBlock might fail when we close a login session and then @@ -173,13 +219,10 @@ def runas(cmd, username, password=None, **kwargs): # Let's make it a string if it's anything other than a string if not isinstance(username, str): username = str(username) - # Validate the domain and sid exist for the username - try: - _, domain, _ = win32security.LookupAccountName(None, username) - username, _ = split_username(username) - except pywintypes.error as exc: - message = win32api.FormatMessage(exc.winerror).rstrip("\n") - raise CommandExecutionError(message) + resolved = resolve_logon_credentials(username) + domain = resolved["lookup_domain"] + logon_name = resolved["logon_name"] + logon_domain = resolved["logon_domain"] # Elevate the token from the current process access = win32security.TOKEN_QUERY | win32security.TOKEN_ADJUST_PRIVILEGES @@ -216,8 +259,8 @@ def runas(cmd, username, password=None, **kwargs): # Logon as a system level account, SYSTEM, LOCAL SERVICE, or NETWORK # SERVICE. user_token = win32security.LogonUser( - username, - domain, + logon_name, + logon_domain, "", win32con.LOGON32_LOGON_SERVICE, win32con.LOGON32_PROVIDER_DEFAULT, @@ -225,15 +268,15 @@ def runas(cmd, username, password=None, **kwargs): elif password: # Login with a password. user_token = win32security.LogonUser( - username, - domain, + logon_name, + logon_domain, password, win32con.LOGON32_LOGON_INTERACTIVE, win32con.LOGON32_PROVIDER_DEFAULT, ) else: # Login without a password. This always returns an elevated token. - user_token = salt.platform.win.logon_msv1_s4u(username).Token + user_token = salt.platform.win.logon_msv1_s4u(logon_name).Token # Get a linked user token to elevate if needed elevation_type = win32security.GetTokenInformation( @@ -374,13 +417,10 @@ def runas_unpriv(cmd, username, password, **kwargs): # Let's make it a string if it's anything other than a string if not isinstance(username, str): username = str(username) - # Validate the domain and sid exist for the username - try: - _, domain, _ = win32security.LookupAccountName(None, username) - username, _ = split_username(username) - except pywintypes.error as exc: - message = win32api.FormatMessage(exc.winerror).rstrip("\n") - raise CommandExecutionError(message) + resolved = resolve_logon_credentials(username) + username = resolved["user_name"] + logon_name = resolved["logon_name"] + logon_domain = resolved["logon_domain"] # Create inheritable copy of the stdin stdin = salt.platform.win.kernel32.GetStdHandle( @@ -445,8 +485,8 @@ def runas_unpriv(cmd, username, password, **kwargs): try: # Run command and return process info structure process_info = salt.platform.win.CreateProcessWithLogonW( - username=username, - domain=domain, + username=logon_name, + domain=logon_domain, password=password, logonflags=salt.platform.win.LOGON_WITH_PROFILE, applicationname=None, diff --git a/tests/pytests/unit/utils/test_win_runas.py b/tests/pytests/unit/utils/test_win_runas.py index 95443691920..208327b9672 100644 --- a/tests/pytests/unit/utils/test_win_runas.py +++ b/tests/pytests/unit/utils/test_win_runas.py @@ -1,6 +1,7 @@ import pytest from salt.utils import win_runas +from salt.exceptions import CommandExecutionError @pytest.mark.parametrize( @@ -21,3 +22,35 @@ def test_split_username(input_value, expected): """ result = win_runas.split_username(input_value) assert result == expected + + +def test_validate_username_returns_false(monkeypatch): + def _raise(_): + raise CommandExecutionError("lookup failed") + + monkeypatch.setattr(win_runas, "resolve_logon_credentials", _raise) + assert win_runas.validate_username("DOMAIN\\user") is False + + +def test_validate_username_raises(monkeypatch): + def _raise(_): + raise CommandExecutionError("lookup failed") + + monkeypatch.setattr(win_runas, "resolve_logon_credentials", _raise) + with pytest.raises(CommandExecutionError): + win_runas.validate_username("DOMAIN\\user", raise_on_error=True) + + +@pytest.mark.skipif( + not win_runas.HAS_WIN32, reason="win32 libraries are required for this test" +) +def test_resolve_logon_credentials_upn(monkeypatch): + def _lookup(_, username): + return "sid", "DOMAIN", 1 + + monkeypatch.setattr(win_runas.win32security, "LookupAccountName", _lookup) + result = win_runas.resolve_logon_credentials("user@domain.com") + assert result["user_name"] == "user" + assert result["domain_name"] == "domain.com" + assert result["logon_name"] == "user@domain.com" + assert result["logon_domain"] == ""