diff --git a/README.md b/README.md index 3fc0839..48a8b43 100644 --- a/README.md +++ b/README.md @@ -770,6 +770,7 @@ found in `lib/influxdb/config.rb` for the source of truth. | | `:open_timeout` | 5 | socket timeout | | `:read_timeout` | 300 | socket timeout | | `:auth_method` | "params" | "params", "basic_auth" or "none" +| | `:persistent` | true | set to false to disable persistent connections | Retry | `:retry` | -1 | max. number of retry attempts (reading and writing) | | `:initial_delay` | 0.01 | initial wait time (doubles every retry attempt) | | `:max_delay` | 30 | max. wait time when retrying diff --git a/lib/influxdb/client.rb b/lib/influxdb/client.rb index 18ea2be..b5e2e5f 100644 --- a/lib/influxdb/client.rb +++ b/lib/influxdb/client.rb @@ -49,9 +49,12 @@ class Client # +:ssl_ca_cert+:: ssl CA certificate, chainfile or CA path. # The system CA path is automatically included # +:retry+:: number of times a failed request should be retried. Defaults to infinite. - def initialize(database = nil, **opts) + def initialize(database = nil, config = nil, **opts) + raise ArgumentError, "provide config or opts, not both" if + config && !opts.empty? + opts[:database] = database if database.is_a? String - @config = InfluxDB::Config.new(**opts) + @config = config || InfluxDB::Config.new(**opts) @stopped = false @writer = find_writer @@ -82,7 +85,9 @@ def now def find_writer if config.async? - InfluxDB::Writer::Async.new(self, config.async) + client = InfluxDB::Client.new nil, config.writer_config + + InfluxDB::Writer::Async.new(client, config.async) elsif config.udp.is_a?(Hash) InfluxDB::Writer::UDP.new(self, **config.udp) elsif config.udp? diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index c4a7cf1..a9ddcb7 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -5,6 +5,7 @@ module InfluxDB # rubocop:disable Metrics/MethodLength + # rubocop:disable Metrics/ModuleLength # rubocop:disable Metrics/AbcSize module HTTP # :nodoc: def get(url, options = {}) @@ -40,31 +41,34 @@ def post(url, data) private def connect_with_retry - host = config.next_host delay = config.initial_delay retry_count = 0 + http = current_http + begin - http = build_http(host, config.port) - http.open_timeout = config.open_timeout - http.read_timeout = config.read_timeout + http.start unless http.started? - http = setup_ssl(http) yield http rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e + http.finish if http.started? + raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e + http.finish if http.started? + retry_count += 1 unless (config.retry == -1 || retry_count <= config.retry) && !stopped? raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but failed." end - log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." } + log(:warn) do + "Failed to contact host #{http.address}: #{e.inspect} - retrying in #{delay}s." + end sleep delay delay = [config.max_delay, delay * 2].min + retry - ensure - http.finish if http.started? end end @@ -134,16 +138,42 @@ def generate_cert_store store end + def current_http + return build_http config.next_host unless config.persistent + + @https ||= + begin + https = config.hosts.map do |host| + build_http host + end + + Hash[config.hosts.zip(https)] + end + + @https[config.next_host] + end + # Builds an http instance, taking into account any configured # proxy configuration - def build_http(host, port) - if config.proxy_addr - Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port) - else - Net::HTTP.new(host, port) - end + def build_http(host) + port = config.port + + http = + if config.proxy_addr + Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port) + else + Net::HTTP.new(host, port) + end + + http.open_timeout = config.open_timeout + http.read_timeout = config.read_timeout + + setup_ssl http + + http end end # rubocop:enable Metrics/MethodLength + # rubocop:enable Metrics/ModuleLength # rubocop:enable Metrics/AbcSize end diff --git a/lib/influxdb/config.rb b/lib/influxdb/config.rb index 412052b..fe777d2 100644 --- a/lib/influxdb/config.rb +++ b/lib/influxdb/config.rb @@ -18,6 +18,7 @@ module InfluxDB auth_method: nil, proxy_addr: nil, proxy_port: nil, + persistent: true, # SSL options use_ssl: false, @@ -44,6 +45,8 @@ module InfluxDB denormalize: true, }.freeze + # rubocop:disable Metrics/ClassLength + # InfluxDB client configuration class Config # Valid values for the "auth_method" option. @@ -82,6 +85,12 @@ def initialize(url: nil, **opts) configure_hosts! opts[:hosts] || opts[:host] || "localhost".freeze end + def initialize_copy(source) + super + + configure_hosts! source.hosts + end + def udp? udp != false end @@ -104,7 +113,15 @@ def hosts end end - private + def writer_config + writer_config = dup + + writer_config.set_ivar! :async, false + + writer_config + end + + protected def set_ivar!(name, value) case name @@ -117,6 +134,8 @@ def set_ivar!(name, value) instance_variable_set "@#{name}", value end + private + def normalize_retry_option(value) case value when Integer then value @@ -188,4 +207,5 @@ def coerce(name, value) end end end + # rubocop:enable Metrics/ClassLength end diff --git a/lib/influxdb/logging.rb b/lib/influxdb/logging.rb index 882275e..d96185e 100644 --- a/lib/influxdb/logging.rb +++ b/lib/influxdb/logging.rb @@ -6,7 +6,6 @@ module Logging # :nodoc: class << self attr_writer :logger - attr_writer :log_level def logger return false if @logger == false @@ -15,18 +14,15 @@ def logger end def log_level - @log_level || Logger::INFO + return Logger::INFO unless logger + + logger.level end - def log?(level) - case level - when :debug then log_level <= Logger::DEBUG - when :info then log_level <= Logger::INFO - when :warn then log_level <= Logger::WARN - when :error then log_level <= Logger::ERROR - when :fatal then log_level <= Logger::FATAL - else true - end + def log_level=(level) + return unless logger + + logger.level = level end end @@ -34,7 +30,6 @@ def log?(level) def log(level, message = nil, &block) return unless InfluxDB::Logging.logger - return unless InfluxDB::Logging.log?(level) if block_given? InfluxDB::Logging.logger.send(level.to_sym, PREFIX, &block) diff --git a/spec/influxdb/cases/async_client_spec.rb b/spec/influxdb/cases/async_client_spec.rb index 8ee6389..6b880a4 100644 --- a/spec/influxdb/cases/async_client_spec.rb +++ b/spec/influxdb/cases/async_client_spec.rb @@ -6,7 +6,8 @@ let(:client) { described_class.new(async: async_options) } let(:subject) { client } let(:stub_url) { "http://localhost:8086/write?db=&p=root&precision=s&u=root" } - let(:worker) { client.writer.worker } + let(:writer) { client.writer } + let(:worker) { writer.worker } specify { expect(subject.writer).to be_a(InfluxDB::Writer::Async) } @@ -40,7 +41,7 @@ it "writes aggregate payload to the client" do queue = Queue.new - allow(client).to receive(:write) do |*args| + allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args| queue.push(args) end @@ -61,7 +62,7 @@ it "writes separated payloads for each {precision, retention_policy, database} set" do queue = Queue.new - allow(client).to receive(:write) do |*args| + allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args| queue.push(args) end diff --git a/spec/influxdb/client_spec.rb b/spec/influxdb/client_spec.rb index 3785e46..3735299 100644 --- a/spec/influxdb/client_spec.rb +++ b/spec/influxdb/client_spec.rb @@ -39,6 +39,19 @@ end end + describe "#connect_with_retry" do + it "raises InfluxDB::ConnectionError when the hostname is unknown" do + subject.config.retry = 0 + + allow_any_instance_of(Net::HTTP).to receive(:start) do + raise SocketError, "simulate getaddrinfo error" + end + + error = InfluxDB::ConnectionError.new "Tried 0 times to reconnect but failed." + expect { subject.send(:connect_with_retry) }.to raise_error(InfluxDB::ConnectionError) + end + end + describe "#full_url" do it "returns String" do expect(subject.send(:full_url, "/unknown")).to be_a String @@ -60,6 +73,24 @@ end end + describe "#current_http" do + it "returns an existing connection with persistence enabled" do + first = subject.send :current_http + second = subject.send :current_http + + expect(first).to equal(second) + end + + it "returns a new connection with persistence disabled" do + subject.config.persistent = false + + first = subject.send :current_http + second = subject.send :current_http + + expect(first).to_not equal(second) + end + end + describe "GET #ping" do it "returns OK" do stub_request(:get, "http://influxdb.test:9999/ping") diff --git a/spec/influxdb/config_spec.rb b/spec/influxdb/config_spec.rb index c81e513..36eae57 100644 --- a/spec/influxdb/config_spec.rb +++ b/spec/influxdb/config_spec.rb @@ -14,6 +14,7 @@ specify { expect(conf.port).to eq 8086 } specify { expect(conf.username).to eq "root" } specify { expect(conf.password).to eq "root" } + specify { expect(conf.persistent).to be_truthy } specify { expect(conf.use_ssl).to be_falsey } specify { expect(conf.time_precision).to eq "s" } specify { expect(conf.auth_method).to eq "params" }