Skip to content

Commit 03db094

Browse files
AMBARI-26556: Add a way to encrypt the configurations.json file in the ambari-agent's cache
- add unit test - fix unit test and add some comments - add comment for the new value - fix cryptography installation script - use ambari_pyaes instead of downloading a new one - revert install-helper.sh - fix typo - AMBARI-26556: remove salt from config and make it random
1 parent ec1b8d6 commit 03db094

10 files changed

Lines changed: 229 additions & 18 deletions

File tree

ambari-agent/conf/unix/ambari-agent.ini

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ ssl_verify_cert=0
5757
credential_lib_dir=/var/lib/ambari-agent/cred/lib
5858
credential_conf_dir=/var/lib/ambari-agent/cred/conf
5959
credential_shell_cmd=org.apache.hadoop.security.alias.CredentialShell
60+
; agent_secret is required if the user wants to enable encryption of the agent cache, and it
61+
; should be changed from the default value for production clusters. After enabling this for the
62+
; first time, the user will also need to clear the cache directory so it can be rebuilt in an encrypted form.
63+
; agent_secret=default-secret-change-me
6064

6165
[network]
6266
; this option apply only for Agent communication
@@ -78,4 +82,4 @@ idle_interval_max=10
7882

7983

8084
[logging]
81-
syslog_enabled=0
85+
syslog_enabled=0

ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,14 @@ def get_ca_cert_file_path(self):
469469
"""
470470
return self.get("security", "ca_cert_path", default="")
471471

472+
def get_agent_secret(self):
    """
    Get the agent secret used to encrypt the agent's on-disk cache files.

    Reads ``agent_secret`` from the ``[security]`` section. The value is
    handed to the cluster cache classes as their ``secret`` argument; an
    empty value makes those caches read and write their data unencrypted.

    :return: agent secret string, or "" when the option is not set
    """
    return self.get('security', 'agent_secret', default="")
479+
472480
@property
473481
def send_alert_changes_only(self):
474482
return bool(self.get("agent", "send_alert_changes_only", "0"))

ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ class ClusterAlertDefinitionsCache(ClusterCache):
3434
differently for every host.
3535
"""
3636

37-
def __init__(self, cluster_cache_dir):
37+
def __init__(self, cluster_cache_dir, secret=None):
3838
"""
3939
Initializes the host level params cache.
4040
:param cluster_cache_dir:
4141
:return:
4242
"""
43-
super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir)
43+
super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir, secret)
4444

4545
def get_alert_definition_index_by_id(self, alert_dict, cluster_id, alert_id):
4646
definitions = alert_dict[cluster_id]["alertDefinitions"]

ambari-agent/src/main/python/ambari_agent/ClusterCache.py

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import os
2323
import threading
2424
from collections import defaultdict
25+
import ambari_pyaes
26+
from ambari_pbkdf2.pbkdf2 import PBKDF2
27+
2528

2629
from ambari_agent.Utils import Utils
2730

@@ -38,14 +41,15 @@ class ClusterCache(dict):
3841

3942
file_locks = defaultdict(threading.RLock)
4043

41-
def __init__(self, cluster_cache_dir):
44+
def __init__(self, cluster_cache_dir, secret=None):
4245
"""
4346
Initializes the cache.
4447
:param cluster_cache_dir:
4548
:return:
4649
"""
4750

4851
self.cluster_cache_dir = cluster_cache_dir
52+
self.secret = secret
4953

5054
self.__current_cache_json_file = os.path.join(
5155
self.cluster_cache_dir, self.get_cache_name() + ".json"
@@ -63,8 +67,10 @@ def __init__(self, cluster_cache_dir):
6367
try:
6468
with self.__file_lock:
6569
if os.path.isfile(self.__current_cache_json_file):
66-
with open(self.__current_cache_json_file, "r") as fp:
67-
cache_dict = json.load(fp)
70+
with open(self.__current_cache_json_file, "rb") as fp: # Note: 'rb' for binary
71+
encrypted_data = fp.read()
72+
decrypted_json = self._decrypt_data(encrypted_data)
73+
cache_dict = json.loads(decrypted_json)
6874

6975
if os.path.isfile(self.__current_cache_hash_file):
7076
with open(self.__current_cache_hash_file, "r") as fp:
@@ -83,6 +89,85 @@ def __init__(self, cluster_cache_dir):
8389
logger.exception(f"Loading saved cache for {self.__class__.__name__} failed")
8490
self.rewrite_cache({}, None)
8591

92+
def encrypt(self, plaintext, encryption_key):
    """
    Encrypt ``plaintext`` with AES-128 in CBC mode.

    A new random 16-byte salt and 16-byte IV are generated on every call,
    so two calls with the same input produce different results. The AES
    key is the first 16 bytes of PBKDF2 output (65536 iterations) computed
    from ``encryption_key`` and the salt.

    :param plaintext: data to encrypt; anything that is not ``bytes`` is
      encoded with ``str.encode()`` first
    :param encryption_key: passphrase handed to PBKDF2
    :return: str of the form ``${enc=aes128_hex, value=<hex>}`` where
      ``<hex>`` is the hex encoding of the text
      ``<salt hex>::<iv hex>::<ciphertext hex>``
    """
    block_size = 16

    # Salt first, IV second: keep the order in which randomness is drawn.
    salt = os.urandom(16)
    iv = os.urandom(16)

    derived_key = PBKDF2(encryption_key, salt, iterations=65536).read(16)
    cipher = ambari_pyaes.AESModeOfOperationCBC(derived_key, iv=iv)

    raw = plaintext if isinstance(plaintext, bytes) else plaintext.encode()

    # PKCS7 padding makes the length a multiple of the 16-byte block size.
    padded = ambari_pyaes.util.append_PKCS7_padding(raw)

    # The CBC object must be fed exactly one 16-byte block per call.
    ciphertext = b"".join(
        cipher.encrypt(padded[offset:offset + block_size])
        for offset in range(0, len(padded), block_size)
    )

    payload = "::".join((salt.hex(), iv.hex(), ciphertext.hex())).encode()

    return f"${{enc=aes128_hex, value={payload.hex()}}}"
120+
121+
def decrypt(self, encrypted_value, encryption_key):
    """
    Decrypt a value produced by ``encrypt``.

    Input that is not an ``${enc=aes128_hex, value=<hex>}`` envelope is
    returned unchanged (same object, same type). This lets a cache file
    written before encryption was enabled still be read. The envelope is
    recognised by its exact prefix and closing brace rather than by the
    mere presence of ``value=``, because plain JSON configuration data can
    legitimately contain that substring.

    :param encrypted_value: ``bytes`` or ``str`` holding the envelope
    :param encryption_key: passphrase handed to PBKDF2; must match the one
      used for encryption
    :return: decrypted plaintext as ``bytes``, or ``encrypted_value`` itself
      when it is not an envelope
    :raises ValueError: when the envelope payload is not valid hex or does
      not split into exactly salt, IV and ciphertext
    """
    envelope_prefix = "${enc=aes128_hex, value="
    envelope_suffix = "}"

    if isinstance(encrypted_value, bytes):
        try:
            envelope = encrypted_value.decode()
        except UnicodeDecodeError:
            # Not text at all, so it cannot be an envelope.
            return encrypted_value
    else:
        envelope = encrypted_value

    if (
        not isinstance(envelope, str)
        or not envelope.startswith(envelope_prefix)
        or not envelope.endswith(envelope_suffix)
    ):
        return encrypted_value

    payload_hex = envelope[len(envelope_prefix):-len(envelope_suffix)]

    # The payload decodes to "<salt hex>::<iv hex>::<ciphertext hex>";
    # each part is converted from hex to raw bytes.
    salt, iv, ciphertext = (
        bytes.fromhex(part)
        for part in bytes.fromhex(payload_hex).decode().split("::")
    )

    derived_key = PBKDF2(encryption_key, salt, iterations=65536).read(16)
    cipher = ambari_pyaes.AESModeOfOperationCBC(derived_key, iv=iv)

    # The CBC object must be fed exactly one 16-byte block per call.
    padded = b"".join(
        cipher.decrypt(ciphertext[offset:offset + 16])
        for offset in range(0, len(ciphertext), 16)
    )

    # Remove the PKCS7 padding added during encryption.
    return ambari_pyaes.util.strip_PKCS7_padding(padded)
153+
154+
def _is_encryption_enabled(self):
    """
    Report whether cache data is handled as plain text.

    NOTE: the result is the inverse of what the name suggests. True is
    returned when no secret is set (``None`` or empty), which is the case
    in which ``_encrypt_data`` and ``_decrypt_data`` pass data through
    untouched. False is returned when a secret is present. The helpers in
    this class and the unit tests depend on this meaning.

    :return: True when ``self.secret`` is falsy, False otherwise
    """
    if self.secret:
        return False
    return True
156+
157+
def _encrypt_data(self, data):
    """
    Prepare a JSON string for writing to the cache file.

    :param data: str to store
    :return: ``data`` unchanged when ``_is_encryption_enabled()`` is true
      (no secret set), otherwise the result of ``encrypt`` applied to the
      encoded data with ``self.secret``
    """
    passthrough = self._is_encryption_enabled()
    if not passthrough:
        return self.encrypt(data.encode(), self.secret)
    return data
163+
164+
def _decrypt_data(self, encrypted_data):
    """
    Turn the content of a cache file back into JSON text.

    :param encrypted_data: raw file content (``bytes`` or ``str``)
    :return: ``encrypted_data`` unchanged when ``_is_encryption_enabled()``
      is true (no secret set); otherwise the output of ``decrypt``, decoded
      to ``str`` when it is ``bytes``
    """
    if self._is_encryption_enabled():
        return encrypted_data

    decrypted = self.decrypt(encrypted_data, self.secret)

    # decrypt() returns non-envelope input as-is, which may already be a
    # str; only bytes need decoding.
    if isinstance(decrypted, bytes):
        return decrypted.decode()
    return decrypted
170+
86171
def get_cluster_indepedent_data(self):
87172
return self[ClusterCache.COMMON_DATA_CLUSTER]
88173

@@ -141,8 +226,12 @@ def persist_cache(self, cache_hash):
141226
os.makedirs(self.cluster_cache_dir)
142227

143228
with self.__file_lock:
229+
# Encrypt JSON data
230+
json_str = json.dumps(self, indent=2)
231+
encrypted_json = self._encrypt_data(json_str)
232+
144233
with open(self.__current_cache_json_file, "w") as f:
145-
json.dump(self, f, indent=2)
234+
f.write(encrypted_json)
146235

147236
if self.hash is not None:
148237
with open(self.__current_cache_hash_file, "w") as fp:
@@ -173,7 +262,7 @@ def get_cache_name(self):
173262
raise NotImplemented()
174263

175264
def __deepcopy__(self, memo):
176-
return self.__class__(self.cluster_cache_dir)
265+
return self.__class__(self.cluster_cache_dir, self.secret)
177266

178267
def __copy__(self):
179-
return self.__class__(self.cluster_cache_dir)
268+
return self.__class__(self.cluster_cache_dir, self.secret)

ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ class ClusterConfigurationCache(ClusterCache):
3131
configuration properties.
3232
"""
3333

34-
def __init__(self, cluster_cache_dir):
34+
def __init__(self, cluster_cache_dir, secret=None):
3535
"""
3636
Initializes the configuration cache.
3737
:param cluster_cache_dir: directory the changed json are saved
3838
:return:
3939
"""
40-
super(ClusterConfigurationCache, self).__init__(cluster_cache_dir)
40+
super(ClusterConfigurationCache, self).__init__(cluster_cache_dir, secret)
4141

4242
def get_cache_name(self):
4343
return "configurations"

ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ class ClusterHostLevelParamsCache(ClusterCache):
3434
differently for every host.
3535
"""
3636

37-
def __init__(self, cluster_cache_dir):
37+
def __init__(self, cluster_cache_dir, secret=None):
3838
"""
3939
Initializes the host level params cache.
4040
:param cluster_cache_dir:
4141
:return:
4242
"""
43-
super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir)
43+
super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir, secret)
4444

4545
def get_cache_name(self):
4646
return "host_level_params"

ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(self, cluster_cache_dir, config):
3838
:return:
3939
"""
4040
self.config = config
41-
super(ClusterMetadataCache, self).__init__(cluster_cache_dir)
41+
super(ClusterMetadataCache, self).__init__(cluster_cache_dir, config.get_agent_secret())
4242

4343
def on_cache_update(self):
4444
try:

ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def __init__(self, cluster_cache_dir, config):
5050
self.cluster_local_components = {}
5151
self.cluster_host_info = None
5252
self.component_version_map = {}
53-
super(ClusterTopologyCache, self).__init__(cluster_cache_dir)
53+
super(ClusterTopologyCache, self).__init__(cluster_cache_dir, config.get_agent_secret())
5454

5555
def get_cache_name(self):
5656
return "topology"

ambari-agent/src/main/python/ambari_agent/InitializerModule.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,16 @@ def init(self):
9090
self.config.cluster_cache_dir, self.config
9191
)
9292
self.host_level_params_cache = ClusterHostLevelParamsCache(
93-
self.config.cluster_cache_dir
93+
self.config.cluster_cache_dir,
94+
self.config.get_agent_secret()
95+
)
96+
self.configurations_cache = ClusterConfigurationCache(
97+
self.config.cluster_cache_dir,
98+
self.config.get_agent_secret()
9499
)
95-
self.configurations_cache = ClusterConfigurationCache(self.config.cluster_cache_dir)
96100
self.alert_definitions_cache = ClusterAlertDefinitionsCache(
97-
self.config.cluster_cache_dir
101+
self.config.cluster_cache_dir,
102+
self.config.get_agent_secret()
98103
)
99104
self.configuration_builder = ConfigurationBuilder(self)
100105
self.stale_alerts_monitor = StaleAlertsMonitor(self)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Licensed to the Apache Software Foundation (ASF) under one
5+
or more contributor license agreements. See the NOTICE file
6+
distributed with this work for additional information
7+
regarding copyright ownership. The ASF licenses this file
8+
to you under the Apache License, Version 2.0 (the
9+
"License"); you may not use this file except in compliance
10+
with the License. You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
"""
20+
21+
from ambari_agent import main
22+
23+
main.MEMORY_LEAK_DEBUG_FILEPATH = "/tmp/memory_leak_debug.out"
24+
import os
25+
import tempfile
26+
import shutil
27+
from unittest import TestCase
28+
29+
from ambari_agent.ClusterCache import ClusterCache
30+
from mock.mock import patch, MagicMock
31+
from ambari_commons import OSCheck
32+
from only_for_platform import os_distro_value
33+
34+
35+
class TestClusterCache(TestCase):
    """
    Tests for the optional encryption support in ClusterCache.

    Covered scenarios:
    - encrypt/decrypt round-trip when a secret is provided
    - pass-through behavior when no secret is provided
    """

    # Patch the OS distribution lookup so that ClusterCache initialization is
    # OS-agnostic in this test.
    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    def setUp(self):
        # Scratch directory; removed again in tearDown.
        self.tmpdir = tempfile.mkdtemp()
        cache_dir = f"{self.tmpdir}/cluster_cache_dir"

        # Cache built with a secret.
        self.cluster_cache_encrypted = DummyClusterCache(cache_dir, "super_secret")

        # Cache built without a secret.
        self.cluster_cache_unencrypted = DummyClusterCache(cache_dir)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    @patch.object(os, "chmod")
    def test_enc(self, chmod_mock):
        """
        Verify that:
        - the cache with a secret transforms the data and restores it again
        - the cache without a secret leaves the data untouched both ways
        """
        payload = '{"a": 1, "b": 2}'
        with_secret = self.cluster_cache_encrypted
        without_secret = self.cluster_cache_unencrypted

        # With a secret the stored form must differ from the raw JSON ...
        stored = with_secret._encrypt_data(payload)
        self.assertNotEqual(payload, stored)
        # ... and the round-trip must give the original JSON back.
        restored = with_secret._decrypt_data(stored)
        self.assertEqual(payload, restored)

        # Without a secret both directions are a pass-through.
        stored = without_secret._encrypt_data(payload)
        self.assertEqual(payload, stored)
        restored = without_secret._decrypt_data(stored)
        self.assertEqual(payload, restored)

    @patch.object(os, "chmod")
    def test_encryption_enable(self, chmod_mock):
        """
        Verify what _is_encryption_enabled reports with and without a secret.

        The flag is False for the cache built with a secret and True for the
        cache built without one.
        """
        flag_with_secret = self.cluster_cache_encrypted._is_encryption_enabled()
        self.assertFalse(flag_with_secret)

        flag_without_secret = self.cluster_cache_unencrypted._is_encryption_enabled()
        self.assertTrue(flag_without_secret)
96+
97+
class DummyClusterCache(ClusterCache):
    """
    Bare-bones ClusterCache subclass for the tests in this module.

    Only get_cache_name is overridden, so the tests do not depend on the
    cache names used by the production subclasses.
    """

    def get_cache_name(self):
        # Fixed name used only by these tests.
        cache_name = "configuration"
        return cache_name

0 commit comments

Comments
 (0)