-
Notifications
You must be signed in to change notification settings - Fork 203
Expand file tree
/
Copy pathkubernetes.ex
More file actions
337 lines (269 loc) · 10.6 KB
/
kubernetes.ex
File metadata and controls
337 lines (269 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
defmodule Cluster.Strategy.Kubernetes do
@moduledoc """
This clustering strategy works by loading all endpoints in the current Kubernetes
namespace with the configured label. It will fetch the addresses of all endpoints with
that label and attempt to connect. It will continually monitor and update its
connections every 5s. Alternatively the IP can be looked up from the pods directly
by setting `kubernetes_ip_lookup_mode` to `:pods`.
In order for your endpoints to be found they should be returned when you run:
kubectl get endpoints -l app=myapp
In order for your pods to be found they should be returned when you run:
kubectl get pods -l app=myapp
It assumes that all nodes share a base name, are using longnames, and are unique
based on their FQDN, rather than the base hostname. In other words, in the following
longname, `<basename>@<domain>`, `basename` would be the value configured in
`kubernetes_node_basename`.
`domain` would be the value configured in `mode` and can be either of type `:ip`
(the pod's ip, can be obtained by setting an env variable to status.podIP), `:hostname`
or `:dns`, which is the pod's internal A Record. This A Record has the format
`<ip-with-dashes>.<namespace>.pod.cluster.local`, e.g.
`1-2-3-4.default.pod.cluster.local`.
Getting `:dns` to work requires setting the `POD_A_RECORD` environment variable before
the application starts. If you use Distillery you can set it in your `pre_configure` hook:
# deployment.yaml
command: ["sh", "-c"]
args: ["POD_A_RECORD"]
args: ["export POD_A_RECORD=$(echo $POD_IP | sed 's/\\./-/g') && /app/bin/app foreground"]
# vm.args
-name app@<%= "${POD_A_RECORD}.${NAMESPACE}.pod.cluster.local" %>
To set the `NAMESPACE` and `POD_IP` environment variables you can configure your pod as follows:
# deployment.yaml
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
The benefit of using `:dns` over `:ip` is that you can establish a remote shell (as well as
run observer) by using `kubectl port-forward` in combination with some entries in `/etc/hosts`.
Using `:hostname` is useful when deploying your app to K8S as a stateful set. In this case you can
set your erlang name as the fully qualified domain name of the pod which would be something similar to
`my-app-0.my-service-name.my-namespace.svc.cluster.local`.
e.g.
# vm.args
-name app@<%=`(hostname -f)`%>
In this case you must also set `kubernetes_service_name` to the name of the K8S service that is being queried.
`mode` defaults to `:ip`.
An example configuration is below:
config :libcluster,
topologies: [
k8s_example: [
strategy: #{__MODULE__},
config: [
mode: :ip,
kubernetes_node_basename: "myapp",
kubernetes_selector: "app=myapp",
kubernetes_namespace: "my_namespace",
polling_interval: 10_000]]]
"""
use GenServer
use Cluster.Strategy
import Cluster.Logger
alias Cluster.Strategy.State
# Delay, in milliseconds, between two polls of the Kubernetes API when the
# `:polling_interval` config option is not set.
@default_polling_interval 5_000
# API server host used when the `:kubernetes_master` config option is not set.
@kubernetes_master "kubernetes.default.svc.cluster.local"
# Directory from which the `token` and `namespace` files are read when the
# `:kubernetes_service_account_path` config option is not set.
@service_account_path "/var/run/secrets/kubernetes.io/serviceaccount"
@doc """
Starts the strategy process.

`args` is handed unchanged to `init/1`, which expects a single-element list
holding a `Cluster.Strategy.State` struct.
"""
def start_link(args) do
  GenServer.start_link(__MODULE__, args)
end
# GenServer callback. A state whose `meta` is still `nil` is first seeded with an
# empty MapSet (the set of known nodes); the first poll then runs synchronously
# via `load/1` before the process reports `{:ok, state}`.
@impl true
def init([%State{meta: nil} = state]) do
  seeded = %State{state | meta: MapSet.new()}
  init([seeded])
end

def init([%State{} = state]) do
  loaded = load(state)
  {:ok, loaded}
end
# GenServer callback.
#   * `:timeout` is treated exactly like `:load`.
#   * `:load` triggers a poll through `load/1`, which also schedules the next `:load`.
#   * Any other message is ignored and the state is returned untouched.
@impl true
def handle_info(:timeout, state), do: handle_info(:load, state)

def handle_info(:load, %State{} = state), do: {:noreply, load(state)}

def handle_info(_message, state), do: {:noreply, state}
# Runs one polling cycle:
#   1. fetches the current node list via `get_nodes/1`,
#   2. disconnects nodes that were known (`meta`) but are no longer reported,
#   3. connects nodes that are new or reported but absent from `Node.list/0`,
#   4. schedules the next `:load` message after `polling_interval/1` milliseconds.
# Returns the state with `meta` replaced by the resulting set of nodes.
defp load(%State{topology: topology, meta: meta} = state) do
  discovered = state |> get_nodes() |> MapSet.new()
  connected = Node.list()

  # Reported nodes we are not currently connected to are retried on every cycle,
  # even if they were already known.
  not_connected = discovered |> Enum.reject(&(&1 in connected)) |> MapSet.new()
  added = discovered |> MapSet.difference(meta) |> MapSet.union(not_connected)
  removed = MapSet.difference(meta, discovered)

  disconnect_result =
    Cluster.Strategy.disconnect_nodes(
      topology,
      state.disconnect,
      state.list_nodes,
      MapSet.to_list(removed)
    )

  after_disconnect =
    case disconnect_result do
      :ok ->
        discovered

      {:error, bad_nodes} ->
        # Keep tracking nodes that should have been removed but could not be.
        still_present = MapSet.new(bad_nodes, fn {n, _} -> n end)
        MapSet.union(discovered, still_present)
    end

  connect_result =
    Cluster.Strategy.connect_nodes(
      topology,
      state.connect,
      state.list_nodes,
      MapSet.to_list(added)
    )

  after_connect =
    case connect_result do
      :ok ->
        after_disconnect

      {:error, bad_nodes} ->
        # Forget nodes that should have been added but could not be.
        unreachable = MapSet.new(bad_nodes, fn {n, _} -> n end)
        MapSet.difference(after_disconnect, unreachable)
    end

  Process.send_after(self(), :load, polling_interval(state))

  %State{state | meta: after_connect}
end
# Returns the `:polling_interval` config value, or `@default_polling_interval`
# when the option is absent.
defp polling_interval(%State{} = state) do
  Keyword.get(state.config, :polling_interval, @default_polling_interval)
end
# Reads the `token` file located in `service_account_path` and returns its
# contents with surrounding whitespace trimmed. Returns "" when the file does
# not exist; raises (via `File.read!/1`) when it exists but cannot be read.
@spec get_token(String.t()) :: String.t()
defp get_token(service_account_path) do
  token_file = Path.join(service_account_path, "token")

  if File.exists?(token_file) do
    String.trim(File.read!(token_file))
  else
    ""
  end
end
# Resolves the Kubernetes namespace to query.
#
#   * When a namespace is configured (second argument is not `nil`) it is
#     returned unchanged.
#   * Otherwise the `namespace` file inside `service_account_path` is read and
#     trimmed; "" is returned when that file does not exist.
#   * When the module is compiled with `Mix.env() == :test`, the `nil` case
#     returns the fixed value "__libcluster_test" instead of touching the
#     filesystem.
#
# The second argument comes from `Keyword.get(config, :kubernetes_namespace)`
# and is therefore `nil` when unset, which the spec reflects.
@spec get_namespace(String.t(), String.t() | nil) :: String.t()
if Mix.env() == :test do
  defp get_namespace(_service_account_path, nil), do: "__libcluster_test"
else
  defp get_namespace(service_account_path, nil) do
    path = Path.join(service_account_path, "namespace")

    if File.exists?(path) do
      path |> File.read!() |> String.trim()
    else
      ""
    end
  end
end

defp get_namespace(_service_account_path, namespace), do: namespace
# Queries the Kubernetes API for the endpoints (or pods, depending on
# `:kubernetes_ip_lookup_mode`) matching `:kubernetes_selector` in the resolved
# namespace and returns the corresponding node names as a list of atoms.
#
# Config handling:
#   * `:kubernetes_node_basename` and `:kubernetes_selector` are mandatory;
#     `Keyword.fetch!/2` raises `KeyError` when either key is missing. An
#     explicit `nil` value only logs a warning and yields `[]`.
#   * All other options fall back to defaults (see the `Keyword.get/3` calls).
#
# Result per HTTP outcome:
#   * 200            -> node names built from the parsed response
#   * 403            -> `[]` (a warning with the API message is logged)
#   * any other code -> the previously known nodes (`meta`), as a list
#   * request error  -> the previously known nodes (`meta`), as a list
@spec get_nodes(State.t()) :: [atom()]
defp get_nodes(%State{topology: topology, config: config, meta: meta}) do
  service_account_path =
    Keyword.get(config, :kubernetes_service_account_path, @service_account_path)

  token = get_token(service_account_path)
  namespace = get_namespace(service_account_path, Keyword.get(config, :kubernetes_namespace))
  app_name = Keyword.fetch!(config, :kubernetes_node_basename)
  cluster_name = Keyword.get(config, :kubernetes_cluster_name, "cluster")
  service_name = Keyword.get(config, :kubernetes_service_name)
  selector = Keyword.fetch!(config, :kubernetes_selector)
  ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints)
  master = Keyword.get(config, :kubernetes_master, @kubernetes_master)
  mode = Keyword.get(config, :mode, :ip)

  # The three branches are exhaustive: if the first one does not hold, at least
  # one of `app_name` / `selector` is nil, so no catch-all branch is needed.
  cond do
    app_name != nil and selector != nil ->
      selector = URI.encode(selector)

      path =
        case ip_lookup_mode do
          :endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
          :pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}"
        end

      # :httpc expects charlists for the URL and header fields.
      url = ~c"https://#{master}/#{path}"
      headers = [{~c"authorization", ~c"Bearer #{token}"}]

      # NOTE(review): `verify: :verify_none` disables TLS peer verification, so the
      # API server certificate is not checked. Left unchanged to preserve behavior.
      http_options = [ssl: [verify: :verify_none], timeout: 15_000]

      case :httpc.request(:get, {url, headers}, http_options, []) do
        {:ok, {{_version, 200, _status}, _headers, body}} ->
          decoded =
            body
            |> Jason.decode!()
            |> debug_inspect(topology, label: "Kubernetes API", pretty: true, verbose: 5)

          ip_lookup_mode
          |> parse_response(decoded)
          |> debug_inspect(topology, label: "detected nodes", pretty: true, verbose: 3)
          |> Enum.map(&format_node(mode, &1, app_name, cluster_name, service_name))
          |> debug_inspect(topology, label: "node names", pretty: true, verbose: 2)

        {:ok, {{_version, 403, _status}, _headers, body}} ->
          %{"message" => msg} = Jason.decode!(body)
          warn(topology, "cannot query kubernetes (unauthorized): #{msg}")
          []

        {:ok, {{_version, code, status}, _headers, body}} ->
          warn(topology, "cannot query kubernetes (#{code} #{status}): #{inspect(body)}")
          # Keep the last known nodes, converted to a list to honour the spec.
          Enum.to_list(meta)

        {:error, reason} ->
          error(topology, "request to kubernetes failed!: #{inspect(reason)}")
          # Keep the last known nodes, converted to a list to honour the spec.
          Enum.to_list(meta)
      end

    app_name == nil ->
      warn(
        topology,
        "kubernetes strategy is selected, but :kubernetes_node_basename is not configured!"
      )

      []

    selector == nil ->
      warn(
        topology,
        "kubernetes strategy is selected, but :kubernetes_selector is not configured!"
      )

      []
  end
end
# Extracts address maps from a decoded Kubernetes API list response.
#
# `:endpoints` mode: for every item with a list under "subsets", every subset
# with a list under "addresses" contributes one
# `%{ip: _, namespace: _, hostname: _}` map per address (`hostname` is `nil`
# when the address has no "hostname" key). Items and subsets of any other shape
# are skipped. An address lacking "ip" or "targetRef"/"namespace" raises
# `FunctionClauseError`.
#
# `:pods` mode: every item exposing "status"/"podIP" and "metadata"/"namespace"
# contributes one `%{ip: _, namespace: _}` map; other items are skipped.
#
# In both modes a response without a list under "items" yields `[]`, and the
# order of the response is preserved. Results are built with `Enum.flat_map/2`
# so no list is repeatedly appended to.
defp parse_response(:endpoints, %{"items" => items}) when is_list(items) do
  Enum.flat_map(items, fn
    %{"subsets" => subsets} when is_list(subsets) ->
      Enum.flat_map(subsets, &subset_addresses/1)

    _item ->
      []
  end)
end

defp parse_response(:pods, %{"items" => items}) when is_list(items) do
  Enum.flat_map(items, fn
    %{"status" => %{"podIP" => ip}, "metadata" => %{"namespace" => ns}} ->
      [%{ip: ip, namespace: ns}]

    _pod ->
      []
  end)
end

# Any other response shape carries no usable items.
defp parse_response(mode, _resp) when mode in [:endpoints, :pods], do: []

# Converts the "addresses" of a single endpoint subset into address maps.
defp subset_addresses(%{"addresses" => addresses}) when is_list(addresses) do
  Enum.map(addresses, fn %{"ip" => ip, "targetRef" => %{"namespace" => namespace}} = address ->
    %{ip: ip, namespace: namespace, hostname: address["hostname"]}
  end)
end

defp subset_addresses(_subset), do: []
# Builds the node name atom `<app_name>@<host>` for one discovered address.
# The host part depends on the mode:
#   * `:ip`       -> the address IP
#   * `:hostname` -> `<hostname>.<service_name>.<namespace>.svc.<cluster_name>.local`
#   * `:dns`      -> `<ip-with-dashes>.<namespace>.pod.<cluster_name>.local`
defp format_node(:ip, %{ip: ip}, app_name, _cluster_name, _service_name) do
  String.to_atom("#{app_name}@#{ip}")
end

defp format_node(
       :hostname,
       %{hostname: hostname, namespace: namespace},
       app_name,
       cluster_name,
       service_name
     ) do
  fqdn = "#{hostname}.#{service_name}.#{namespace}.svc.#{cluster_name}.local"
  String.to_atom("#{app_name}@#{fqdn}")
end

defp format_node(:dns, %{ip: ip, namespace: namespace}, app_name, cluster_name, _service_name) do
  dashed_ip = String.replace(ip, ".", "-")
  a_record = "#{dashed_ip}.#{namespace}.pod.#{cluster_name}.local"
  String.to_atom("#{app_name}@#{a_record}")
end
end