forked from discourse/discourse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcache_critical_dns
executable file
·475 lines (419 loc) · 13.6 KB
/
cache_critical_dns
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
#!/usr/bin/env ruby
# frozen_string_literal: true
require 'optparse'
# cache_critical_dns is intended to be used for performing DNS lookups against
# the services critical for Discourse to run - PostgreSQL and Redis. The
# cache mechanism is storing the resolved host addresses in /etc/hosts. This can
# protect against DNS lookup failures _after_ the resolved addresses have been
# written to /etc/hosts at least once. Example lookup failures may be NXDOMAIN
# or SERVFAIL responses from DNS requests.
#
# Before a resolved address is cached, a protocol-aware healthcheck is
# performed against the host with the authentication details found for that
# service in the process environment. Any hosts that fail the healthcheck will
# never be cached.
#
# This is as far as you need to read if you are using CNAME or A records for
# your services.
#
# The extended behaviour of cache_critical_dns is to add SRV RR lookup support
# for DNS Service Discovery (see http://www.dns-sd.org/). For any of the critical
# service environment variables (see CRITICAL_HOST_ENV_VARS), if a corresponding
# SRV environment variable is found (suffixed with _SRV), cache_critical_dns
# will assume that SRV RRs should exist and will begin to lookup SRV targets
# for resolving the host addresses for caching, and ignore the original service
# name variable. Healthy target addresses are cached against the original service
# environment variable, as the Discourse application expects. For example a
# healthy target found from the SRV lookup for DISCOURSE_DB_HOST_SRV will be
# cached against the name specified by the DISCOURSE_DB_HOST.
#
# Example environment variables for SRV lookups are:
# DISCOURSE_DB_HOST_SRV
# DISCOURSE_DB_REPLICA_HOST_SRV
# DISCOURSE_REDIS_HOST_SRV
# DISCOURSE_REDIS_REPLICA_HOST_SRV
#
# cache_critical_dns will keep an internal record of all names resolved within
# the last 30 minutes. This internal cache is to give a priority order to new
# SRV targets that have appeared during the program runtime (SRV records
# contain zero or more targets, which may appear or disappear at any time).
# If a target has not been seen for more than 30 minutes it will be evicted from
# the internal cache. The internal cache of healthy targets is a fallback for
# when errors occur during DNS lookups.
#
# Targets that are resolved and found healthy usually find themselves in the host
# cache, depending on if they are the newest or not. Targets that are resolved
# but never found healthy will never be cached or even stored in the internal
# cache. Targets that _begin_ healthy and are cached, and _become_ unhealthy
# will only be removed from the host cache if another newer target is resolved
# and found to be healthy. This is because we never write a resolved target to
# the hosts cache unless it is both the newest and healthy. We assume that
# cached hosts are healthy until they are superseded by a newer healthy target.
#
# An SRV RR specifies a priority value for each of the SRV targets that
# are present, ranging from 0 - 65535. When caching SRV records we may want to
# filter out any targets above or below a particular threshold. The LE (less
# than or equal to) and GE (greater than or equal to) environment variables
# (suffixed with _PRIORITY_LE or PRIORITY_GE) for a corresponding SRV variable
# will ignore targets above or below the threshold, respectively.
#
# This mechanism may be used for SRV RRs that share a single name and utilise
# the priority value for signalling to cache_critical_dns which targets are
# relevant to a given name. Any target found outside of the threshold is ignored.
# The host and internal caching behavior are otherwise the same.
#
# Example environment variables for SRV priority thresholds are:
# DISCOURSE_DB_HOST_SRV_PRIORITY_LE
# DISCOURSE_DB_REPLICA_HOST_SRV_PRIORITY_GE
# Specifying this env var ensures ruby can load gems installed via the Discourse
# project Gemfile (e.g. pg, redis).
ENV['BUNDLE_GEMFILE'] ||= '/var/www/discourse/Gemfile'
require 'bundler/setup'
require 'ipaddr'
require 'pg'
require 'redis'
require 'resolv'
require 'socket'
require 'time'
CRITICAL_HOST_ENV_VARS = %w{
DISCOURSE_DB_HOST
DISCOURSE_DB_REPLICA_HOST
DISCOURSE_REDIS_HOST
DISCOURSE_REDIS_SLAVE_HOST
DISCOURSE_REDIS_REPLICA_HOST
}
DEFAULT_DB_NAME = "discourse"
HOST_RESOLVER_CACHE = {}
HOST_HEALTHY_CACHE = {}
HOSTS_PATH = ENV['DISCOURSE_DNS_CACHE_HOSTS_FILE'] || "/etc/hosts"
PrioFilter = Struct.new(:min, :max) do
# min and max must be integers and relate to the minimum or maximum accepted
# priority of an SRV RR target.
# The return value from within_threshold? indicates if the priority is less
# than or equal to the upper threshold, or greater than or equal to the
# lower threshold.
def within_threshold?(p)
p >= min && p <= max
end
end
SRV_PRIORITY_THRESHOLD_MIN = 0
SRV_PRIORITY_THRESHOLD_MAX = 65535
SRV_PRIORITY_FILTERS = Hash.new(
PrioFilter.new(SRV_PRIORITY_THRESHOLD_MIN, SRV_PRIORITY_THRESHOLD_MAX))
REFRESH_SECONDS = 30
module DNSClient
def dns_client_with_timeout
Resolv::DNS.open do |dns_client|
dns_client.timeouts = 2
yield dns_client
end
end
end
class Name
include DNSClient
def initialize(hostname)
@name = hostname
end
def resolve
dns_client_with_timeout do |dns_client|
[].tap do |addresses|
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::A).map(&:address))
addresses.concat(dns_client.getresources(@name, Resolv::DNS::Resource::IN::AAAA).map(&:address))
end.map(&:to_s)
end
end
end
class SRVName
include DNSClient
def initialize(srv_hostname, prio_filter)
@name = srv_hostname
@prio_filter = prio_filter
end
def resolve
dns_client_with_timeout do |dns_client|
[].tap do |addresses|
targets = dns_client.getresources(@name, Resolv::DNS::Resource::IN::SRV)
targets.delete_if { |t| !@prio_filter.within_threshold?(t.priority) }
addresses.concat(targets.map { |t| Name.new(t.target.to_s).resolve }.flatten)
end
end
end
end
CacheMeta = Struct.new(:first_seen, :last_seen)
class ResolverCache
def initialize(name)
# instance of Name|SRVName
@name = name
# {IPv4/IPv6 address: CacheMeta}
@cached = {}
end
# resolve returns a list of resolved addresses ordered by the time first seen,
# most recently seen at the head of the list.
# Addresses last seen more than 30 minutes ago are evicted from the cache on
# a call to resolve().
# If an exception occurs during DNS resolution we return whatever addresses are
# cached.
def resolve
@name.resolve.each do |address|
if @cached[address]
@cached[address].last_seen = Time.now.utc
else
@cached[address] = CacheMeta.new(Time.now.utc, Time.now.utc)
end
end
ensure
@cached = @cached.delete_if { |_, meta| Time.now.utc - 30 * 60 > meta.last_seen }
@cached.sort_by { |_, meta| meta.first_seen }.reverse.map(&:first)
end
end
class HealthyCache
def initialize(resolver_cache, check)
@resolver_cache = resolver_cache # instance of ResolverCache
@check = check # lambda function to perform for health checks
@cached = nil # a single IP address that was most recently found to be healthy
end
def first_healthy
address = @resolver_cache.resolve.lazy.select { |addr| @check.call(addr) }.first
if !nilempty(address).nil?
@cached = address
end
@cached
end
end
def redis_healthcheck(host:, password:)
client_opts = {
host: host,
password: password,
timeout: 1,
}
if !nilempty(ENV['DISCOURSE_REDIS_USE_SSL']).nil? then
client_opts[:ssl] = true
client_opts[:ssl_params] = {
verify_mode: OpenSSL::SSL::VERIFY_NONE,
}
end
client = Redis.new(**client_opts)
response = client.ping
response == "PONG"
rescue
false
ensure
client.close if client
end
def postgres_healthcheck(host:, user:, password:, dbname:)
client = PG::Connection.new(
host: host,
user: user,
password: password,
dbname: dbname,
connect_timeout: 2, # minimum
)
client.exec(';').none?
rescue
false
ensure
client.close if client
end
HEALTH_CHECKS = {
"DISCOURSE_DB_HOST": lambda { |addr|
postgres_healthcheck(
host: addr,
user: ENV["DISCOURSE_DB_USERNAME"] || DEFAULT_DB_NAME,
dbname: ENV["DISCOURSE_DB_NAME"] || DEFAULT_DB_NAME,
password: ENV["DISCOURSE_DB_PASSWORD"])},
"DISCOURSE_DB_REPLICA_HOST": lambda { |addr|
postgres_healthcheck(
host: addr,
user: ENV["DISCOURSE_DB_USERNAME"] || DEFAULT_DB_NAME,
dbname: ENV["DISCOURSE_DB_NAME"] || DEFAULT_DB_NAME,
password: ENV["DISCOURSE_DB_PASSWORD"])},
"DISCOURSE_REDIS_HOST": lambda { |addr|
redis_healthcheck(
host: addr,
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
"DISCOURSE_REDIS_REPLICA_HOST": lambda { |addr|
redis_healthcheck(
host: addr,
password: ENV["DISCOURSE_REDIS_PASSWORD"])},
}
def log(msg)
STDERR.puts "#{Time.now.utc.iso8601}: #{msg}"
end
def error(msg)
log(msg)
end
def swap_address(hosts, name, ips)
new_file = []
hosts.split("\n").each do |line|
line.strip!
if line[0] != '#'
_, hostname = line.split(/\s+/)
next if hostname == name
end
new_file << line
new_file << "\n"
end
ips.each do |ip|
new_file << "#{ip} #{name} # AUTO GENERATED: #{Time.now.utc.iso8601}\n"
end
new_file.join
end
def send_counter(name, description, labels, value)
host = "localhost"
port = ENV.fetch("DISCOURSE_PROMETHEUS_COLLECTOR_PORT", 9405).to_i
if labels
labels = labels.map do |k, v|
"\"#{k}\": \"#{v}\""
end.join(",")
else
labels = ""
end
json = <<~JSON
{
"_type": "Custom",
"type": "Counter",
"name": "#{name}",
"description": "#{description}",
"labels": { #{labels} },
"value": #{value}
}
JSON
payload = +"POST /send-metrics HTTP/1.1\r\n"
payload << "Host: #{host}\r\n"
payload << "Connection: Close\r\n"
payload << "Content-Type: application/json\r\n"
payload << "Content-Length: #{json.bytesize}\r\n"
payload << "\r\n"
payload << json
socket = TCPSocket.new host, port
socket.write payload
socket.flush
result = socket.read
first_line = result.split("\n")[0]
if first_line.strip != "HTTP/1.1 200 OK"
error("Failed to report metric #{result}")
end
socket.close
rescue => e
error("Failed to send metric to Prometheus #{e}")
end
def report_success
send_counter('critical_dns_successes_total', 'critical DNS resolution success', nil, 1)
end
def report_failure(errors)
errors.each do |host, count|
send_counter('critical_dns_failures_total', 'critical DNS resolution failures', host ? { host: host } : nil, count)
end
end
def nilempty(v)
if v.nil?
nil
elsif v.respond_to?(:empty?) && v.empty?
nil
else
v
end
end
def env_srv_var(env_name)
"#{env_name}_SRV"
end
def env_srv_name(env_name)
nilempty(ENV[env_srv_var(env_name)])
end
def run_and_report(hostname_vars)
errors = run(hostname_vars)
if errors.empty?
report_success
else
report_failure(errors)
end
end
def run(hostname_vars)
# HOSTNAME: [IP_ADDRESS, ...]
# this will usually be a single address
resolved = {}
errors = Hash.new(0)
hostname_vars.each do |var|
name = ENV[var]
HOST_RESOLVER_CACHE[var] ||= ResolverCache.new(
if (srv_name = env_srv_name(var))
SRVName.new(srv_name, SRV_PRIORITY_FILTERS[env_srv_var(var)])
else
Name.new(name)
end
)
HOST_HEALTHY_CACHE[var] ||= HealthyCache.new(HOST_RESOLVER_CACHE[var], HEALTH_CHECKS[var.to_sym])
begin
if (address = HOST_HEALTHY_CACHE[var].first_healthy)
resolved[name] = [address]
else
error("#{var}: #{name}: no address")
errors[name] += 1
end
rescue => e
error("#{var}: #{name}: #{e}")
errors[name] += 1
end
end
hosts_content = File.read(HOSTS_PATH)
hosts = Resolv::Hosts.new(HOSTS_PATH)
changed = false
resolved.each do |hostname, ips|
if hosts.getaddresses(hostname).map(&:to_s).sort != ips.sort
log("IP addresses for #{hostname} changed to #{ips}")
hosts_content = swap_address(hosts_content, hostname, ips)
changed = true
end
end
if changed
File.write(HOSTS_PATH, hosts_content)
end
rescue => e
error("DNS lookup failed: #{e}")
errors[nil] = 1
ensure
return errors
end
# If any of the host variables are an explicit IP we will not attempt to cache
# them.
all_hostname_vars = CRITICAL_HOST_ENV_VARS.select do |name|
begin
host = ENV[name]
next if nilempty(host).nil?
IPAddr.new(host)
false
rescue IPAddr::InvalidAddressError, IPAddr::AddressFamilyError
true
end
end
# Populate the SRV_PRIORITY_FILTERS for any name that has a priority present in
# the environment. If no priority thresholds are found for the name, the default
# is that no filtering based on priority will be performed.
CRITICAL_HOST_ENV_VARS.each do |v|
if (name = env_srv_name(v))
max = ENV.fetch("#{env_srv_var(v)}_PRIORITY_LE", SRV_PRIORITY_THRESHOLD_MAX).to_i
min = ENV.fetch("#{env_srv_var(v)}_PRIORITY_GE", SRV_PRIORITY_THRESHOLD_MIN).to_i
if max > SRV_PRIORITY_THRESHOLD_MAX ||
min < SRV_PRIORITY_THRESHOLD_MIN ||
min > max
raise "invalid priority threshold set for #{v}"
end
SRV_PRIORITY_FILTERS[env_srv_var(v)] = PrioFilter.new(min, max)
end
end
options = {
once: false,
}
OptionParser.new do |opts|
opts.on("--once", "run script once instead of indefinitely") do |o|
options[:once] = true
end
end.parse!
if options[:once]
errors = run(all_hostname_vars)
exit errors.empty? ? 0 : 1
end
while true
run_and_report(all_hostname_vars)
sleep REFRESH_SECONDS
end