1717
1818import pytest
1919
20- from couchbase .auth import CertificateAuthenticator , PasswordAuthenticator
20+ from couchbase .auth import (CertificateAuthenticator ,
21+ JwtAuthenticator ,
22+ PasswordAuthenticator )
2123from couchbase .cluster import Cluster
2224from couchbase .exceptions import InvalidArgumentException
2325from couchbase .options import ClusterOptions
2426
2527
28+ class JwtAuthenticatorUnitTests :
29+ """Unit tests for JwtAuthenticator that don't require a cluster connection."""
30+
31+ def test_jwt_authenticator_creation (self ):
32+ token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test.signature"
33+ auth = JwtAuthenticator (token )
34+ assert auth .as_dict () == {'jwt_token' : token }
35+
36+ def test_jwt_authenticator_valid_keys (self ):
37+ auth = JwtAuthenticator ("test.jwt.token" )
38+ assert auth .valid_keys () == ['jwt_token' ]
39+
40+ def test_jwt_authenticator_rejects_non_string (self ):
41+ with pytest .raises (InvalidArgumentException ):
42+ JwtAuthenticator (12345 )
43+
44+ def test_jwt_authenticator_rejects_none (self ):
45+ with pytest .raises (InvalidArgumentException ):
46+ JwtAuthenticator (None )
47+
48+
2649class ClassicCredentialsTests :
2750
28- def test_update_credentials_reflected_in_connection_info (self , couchbase_config ):
51+ def test_set_authenticator_reflected_in_connection_info (self , couchbase_config ):
2952 conn_string = couchbase_config .get_connection_string ()
3053 username , pw = couchbase_config .get_username_and_pw ()
3154
@@ -36,22 +59,22 @@ def test_update_credentials_reflected_in_connection_info(self, couchbase_config)
3659 assert 'credentials' in info_before
3760 orig_creds = info_before ['credentials' ]
3861
39- # update to new (likely invalid) creds; we only assert that core origin is updated
62+ # update to new creds; we only assert that core origin is updated
4063 new_user = f"pycbc_{ uuid .uuid4 ().hex [:8 ]} "
4164 new_pass = f"pw_{ uuid .uuid4 ().hex [:8 ]} "
42- cluster .update_credentials (PasswordAuthenticator (new_user , new_pass ))
65+ cluster .set_authenticator (PasswordAuthenticator (new_user , new_pass ))
4366
4467 info_after = cluster ._impl .get_connection_info ()
4568 assert info_after ['credentials' ]['username' ] == new_user
4669 assert info_after ['credentials' ]['password' ] == new_pass
4770
4871 # restore original to avoid impacting other tests
49- cluster .update_credentials (PasswordAuthenticator (orig_creds .get ('username' , username ),
50- orig_creds .get ('password' , pw )))
72+ cluster .set_authenticator (PasswordAuthenticator (orig_creds .get ('username' , username ),
73+ orig_creds .get ('password' , pw )))
5174
5275 cluster .close ()
5376
54- def test_update_to_certificate_auth_without_tls_fails (self , couchbase_config ):
77+ def test_set_authenticator_password_to_certificate_fails (self , couchbase_config ):
5578 conn_string = couchbase_config .get_connection_string ()
5679 username , pw = couchbase_config .get_username_and_pw ()
5780
@@ -60,12 +83,12 @@ def test_update_to_certificate_auth_without_tls_fails(self, couchbase_config):
6083
6184 # Core should reject this at validation step, surfacing as InvalidArgumentException
6285 with pytest .raises (InvalidArgumentException ):
63- cluster .update_credentials (CertificateAuthenticator (cert_path = 'path/to/cert' ,
64- key_path = 'path/to/key' ))
86+ cluster .set_authenticator (CertificateAuthenticator (cert_path = 'path/to/cert' ,
87+ key_path = 'path/to/key' ))
6588
6689 cluster .close ()
6790
68- def test_update_credentials_failure_does_not_change_state (self , couchbase_config ):
91+ def test_set_authenticator_failure_does_not_change_state (self , couchbase_config ):
6992 conn_string = couchbase_config .get_connection_string ()
7093 username , pw = couchbase_config .get_username_and_pw ()
7194
@@ -78,11 +101,24 @@ def test_update_credentials_failure_does_not_change_state(self, couchbase_config
78101
79102 # attempt to switch to certificate auth on non-TLS connection; expect failure
80103 with pytest .raises (InvalidArgumentException ):
81- cluster .update_credentials (CertificateAuthenticator (cert_path = 'path/to/cert' ,
82- key_path = 'path/to/key' ))
104+ cluster .set_authenticator (CertificateAuthenticator (cert_path = 'path/to/cert' ,
105+ key_path = 'path/to/key' ))
83106
84107 # ensure credentials are unchanged after the failed update
85108 info_after = cluster ._impl .get_connection_info ()
86109 assert info_after ['credentials' ] == orig_creds
87110
88111 cluster .close ()
112+
113+ def test_set_authenticator_password_to_jwt_fails (self , couchbase_config ):
114+ """RFC: Cannot switch from PasswordAuthenticator to JwtAuthenticator."""
115+ conn_string = couchbase_config .get_connection_string ()
116+ username , pw = couchbase_config .get_username_and_pw ()
117+
118+ cluster = Cluster .connect (conn_string , ClusterOptions (PasswordAuthenticator (username , pw )))
119+
120+ # RFC: Cannot switch authenticator types
121+ with pytest .raises (InvalidArgumentException ):
122+ cluster .set_authenticator (JwtAuthenticator ("some.jwt.token" ))
123+
124+ cluster .close ()
0 commit comments