|
| 1 | +""" |
| 2 | +Tests for ensure_running_as() function in rimport script. |
| 3 | +""" |
| 4 | + |
| 5 | +import os |
| 6 | +import sys |
| 7 | +import importlib.util |
| 8 | +from importlib.machinery import SourceFileLoader |
| 9 | +from unittest.mock import patch, MagicMock |
| 10 | + |
| 11 | +import pytest |
| 12 | + |
| 13 | +# Import rimport module from file without .py extension |
| 14 | +rimport_path = os.path.join( |
| 15 | + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), |
| 16 | + "rimport", |
| 17 | +) |
| 18 | +loader = SourceFileLoader("rimport", rimport_path) |
| 19 | +spec = importlib.util.spec_from_loader("rimport", loader) |
| 20 | +if spec is None: |
| 21 | + raise ImportError(f"Could not create spec for rimport from {rimport_path}") |
| 22 | +rimport = importlib.util.module_from_spec(spec) |
| 23 | +sys.modules["rimport"] = rimport |
| 24 | +loader.exec_module(rimport) |
| 25 | + |
| 26 | + |
| 27 | +class TestEnsureRunningAs: |
| 28 | + """Test suite for ensure_running_as() function.""" |
| 29 | + |
| 30 | + def test_does_nothing_when_already_running_as_target_user(self): |
| 31 | + """Test that function returns normally when already running as target user.""" |
| 32 | + # Get current user |
| 33 | + current_uid = os.geteuid() |
| 34 | + |
| 35 | + # Mock pwd.getpwnam to return current UID |
| 36 | + with patch("pwd.getpwnam") as mock_getpwnam: |
| 37 | + mock_pw = MagicMock() |
| 38 | + mock_pw.pw_uid = current_uid |
| 39 | + mock_getpwnam.return_value = mock_pw |
| 40 | + |
| 41 | + # Mock stdin.isatty and os.execvp to ensure they're not called |
| 42 | + with patch("sys.stdin.isatty") as mock_isatty: |
| 43 | + with patch("os.execvp") as mock_execvp: |
| 44 | + # Should not raise or exec |
| 45 | + rimport.ensure_running_as("testuser", ["rimport", "-file", "test.nc"]) |
| 46 | + |
| 47 | + # Verify stdin.isatty and os.execvp were NOT called |
| 48 | + mock_isatty.assert_not_called() |
| 49 | + mock_execvp.assert_not_called() |
| 50 | + |
| 51 | + def test_execs_sudo_when_different_user_and_interactive(self): |
| 52 | + """Test that function execs sudo when running as different user with TTY.""" |
| 53 | + current_uid = os.geteuid() |
| 54 | + different_uid = current_uid + 1000 |
| 55 | + |
| 56 | + # Mock pwd.getpwnam to return different UID |
| 57 | + with patch("pwd.getpwnam") as mock_getpwnam: |
| 58 | + mock_pw = MagicMock() |
| 59 | + mock_pw.pw_uid = different_uid |
| 60 | + mock_getpwnam.return_value = mock_pw |
| 61 | + |
| 62 | + # Mock stdin.isatty to return True |
| 63 | + with patch("sys.stdin.isatty", return_value=True): |
| 64 | + # Mock os.execvp to prevent actual exec |
| 65 | + with patch("os.execvp") as mock_execvp: |
| 66 | + rimport.ensure_running_as( |
| 67 | + "otheruser", ["rimport", "-file", "test.nc"] |
| 68 | + ) |
| 69 | + |
| 70 | + # Verify execvp was called with correct arguments |
| 71 | + mock_execvp.assert_called_once() |
| 72 | + call_args = mock_execvp.call_args[0] |
| 73 | + assert call_args[0] == "sudo" |
| 74 | + assert call_args[1][0] == "sudo" |
| 75 | + assert call_args[1][1] == "-u" |
| 76 | + assert call_args[1][2] == "otheruser" |
| 77 | + assert call_args[1][3] == "--" |
| 78 | + assert call_args[1][4:] == ["rimport", "-file", "test.nc"] |
| 79 | + |
| 80 | + def test_error_message_for_nonexistent_user(self, capsys): |
| 81 | + """Test that appropriate error message is shown for nonexistent user.""" |
| 82 | + # Mock pwd.getpwnam to raise KeyError |
| 83 | + with patch("pwd.getpwnam", side_effect=KeyError("user not found")): |
| 84 | + with pytest.raises(SystemExit) as exc_info: |
| 85 | + rimport.ensure_running_as("baduser", ["rimport", "-file", "test.nc"]) |
| 86 | + |
| 87 | + assert exc_info.value.code == 2 |
| 88 | + captured = capsys.readouterr() |
| 89 | + assert "baduser" in captured.err |
| 90 | + assert "not found" in captured.err |
| 91 | + |
| 92 | + def test_error_message_for_non_interactive(self, capsys): |
| 93 | + """Test that appropriate error message is shown when not interactive.""" |
| 94 | + current_uid = os.geteuid() |
| 95 | + different_uid = current_uid + 1000 |
| 96 | + |
| 97 | + # Mock pwd.getpwnam to return different UID |
| 98 | + with patch("pwd.getpwnam") as mock_getpwnam: |
| 99 | + mock_pw = MagicMock() |
| 100 | + mock_pw.pw_uid = different_uid |
| 101 | + mock_getpwnam.return_value = mock_pw |
| 102 | + |
| 103 | + # Mock stdin.isatty to return False |
| 104 | + with patch("sys.stdin.isatty", return_value=False): |
| 105 | + with pytest.raises(SystemExit) as exc_info: |
| 106 | + rimport.ensure_running_as( |
| 107 | + "otheruser", ["rimport", "-file", "test.nc"] |
| 108 | + ) |
| 109 | + |
| 110 | + assert exc_info.value.code == 2 |
| 111 | + captured = capsys.readouterr() |
| 112 | + assert "interactive TTY" in captured.err |
| 113 | + assert "2FA" in captured.err |
0 commit comments