From 0a5412dbf2a9060e4f566acb0ed95f286df97b16 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Fri, 29 May 2020 20:43:08 -0700 Subject: [PATCH 01/10] Do all http building in Client#build_http --- lib/influxdb/client/http.rb | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index c4a7cf1..6991b6a 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -40,16 +40,12 @@ def post(url, data) private def connect_with_retry - host = config.next_host delay = config.initial_delay retry_count = 0 - begin - http = build_http(host, config.port) - http.open_timeout = config.open_timeout - http.read_timeout = config.read_timeout + http = build_http - http = setup_ssl(http) + begin yield http rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE @@ -136,12 +132,23 @@ def generate_cert_store # Builds an http instance, taking into account any configured # proxy configuration - def build_http(host, port) - if config.proxy_addr - Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port) - else - Net::HTTP.new(host, port) - end + def build_http + host = config.next_host + port = config.port + + http = + if config.proxy_addr + Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port) + else + Net::HTTP.new(host, port) + end + + http.open_timeout = config.open_timeout + http.read_timeout = config.read_timeout + + setup_ssl http + + http end end # rubocop:enable Metrics/MethodLength From e4252e5a501c6a207a83d274f26f378c08c66435 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Fri, 29 May 2020 20:58:25 -0700 Subject: [PATCH 02/10] Save HTTP connection across requests This influxdb client is not using persistent connections. This causes a significant performance loss. Below is a benchmark script that writes a single point to influxdb. 
Running it takes 44 seconds: $ ruby -Ilib t.rb Writing 1000 points user system total real write_points 19.270388 1.394931 20.665319 ( 44.622170) This works out to a write speed of about 22 points per second, or about 50ms per point. This is: * Pretty slow. If you are generating points quickly, a maximum resolution of 50ms is not great. A user may lose data because they cannot sample quickly enough. * Pretty inefficient. For each point written the client library must establish a TCP connection, establish a TLS session, and finally write some data. Any data written may be restricted by the TCP slow-start windowing algorithm. Much more Ruby code must be run: almost half the time spent is in the "user" category, time that could be spent doing anything else in a Ruby application, or that could be used by other processes. This commit caches HTTP connections across requests. Running the same benchmark takes 4.26 seconds: $ ruby -Ilib t.rb Writing 1000 points user system total real write_points 0.551663 0.084603 0.636266 ( 4.261201) This works out to a speed of 234 points per second, or about 5ms per point. Writing points no longer needs to recreate a TCP connection, renegotiate a TLS session, or be held up by TCP window sizing limitations. This is much more efficient in terms of CPU time used per point: instead of ~46% of the time taken occurring in the "user" category, now only 13% of the time is in "user". The balance of the time is now spent waiting for IO to complete. 
Benchmark script: require "benchmark" require "influxdb" n = Integer ENV["N"] rescue 1_000 influxdb = InfluxDB::Client.new url: ENV["INFLUX_URL"], username: ENV["INFLUX_USER"], password: ENV["INFLUX_PASS"], time_precision: "u" def write_point counter, influxdb points = [ { series: "test", values: { counter: counter, }, }, ] influxdb.write_points points end puts "Writing #{n} points" Benchmark.bm 12 do |bm| bm.report "write_points" do n.times do |i| write_point i, influxdb end end end --- lib/influxdb/client/http.rb | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index 6991b6a..40ea94a 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -43,13 +43,19 @@ def connect_with_retry delay = config.initial_delay retry_count = 0 - http = build_http + http = get_http begin + http.start unless http.started? + yield http rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e + http.finish + raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e + http.finish + retry_count += 1 unless (config.retry == -1 || retry_count <= config.retry) && !stopped? raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but failed." @@ -58,9 +64,8 @@ def connect_with_retry log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." } sleep delay delay = [config.max_delay, delay * 2].min + retry - ensure - http.finish if http.started? 
end end @@ -130,11 +135,23 @@ def generate_cert_store store end + def get_http + @https ||= + begin + https = config.hosts.map { |host| + build_http host + } + + Hash[config.hosts.zip(https)] + end + + @https[config.next_host] + end + # Builds an http instance, taking into account any configured # proxy configuration - def build_http - host = config.next_host - port = config.port + def build_http(host) + port = config.port http = if config.proxy_addr From 4f556e62f9f5b3e337501de0c82b399b1e7b90ac Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Fri, 29 May 2020 22:36:48 -0700 Subject: [PATCH 03/10] Allow disabling persistent connections --- README.md | 1 + lib/influxdb/client/http.rb | 2 ++ lib/influxdb/config.rb | 1 + spec/influxdb/client_spec.rb | 18 ++++++++++++++++++ spec/influxdb/config_spec.rb | 1 + 5 files changed, 23 insertions(+) diff --git a/README.md b/README.md index c15ac96..b407afe 100644 --- a/README.md +++ b/README.md @@ -770,6 +770,7 @@ found in `lib/influxdb/config.rb` for the source of truth. | | `:open_timeout` | 5 | socket timeout | | `:read_timeout` | 300 | socket timeout | | `:auth_method` | "params" | "params", "basic_auth" or "none" +| | `:persistent` | true | set to false to disable persistent connections | Retry | `:retry` | -1 | max. number of retry attempts (reading and writing) | | `:initial_delay` | 0.01 | initial wait time (doubles every retry attempt) | | `:max_delay` | 30 | max. 
wait time when retrying diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index 40ea94a..ec982c6 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -136,6 +136,8 @@ def generate_cert_store end def get_http + return build_http config.next_host unless config.persistent + @https ||= begin https = config.hosts.map { |host| diff --git a/lib/influxdb/config.rb b/lib/influxdb/config.rb index 2bab493..52bb1ed 100644 --- a/lib/influxdb/config.rb +++ b/lib/influxdb/config.rb @@ -18,6 +18,7 @@ module InfluxDB auth_method: nil, proxy_addr: nil, proxy_port: nil, + persistent: true, # SSL options use_ssl: false, diff --git a/spec/influxdb/client_spec.rb b/spec/influxdb/client_spec.rb index bf40b8c..b60befe 100644 --- a/spec/influxdb/client_spec.rb +++ b/spec/influxdb/client_spec.rb @@ -60,6 +60,24 @@ end end + describe "#get_http" do + it "returns an existing connection with persistence enabled" do + first = subject.send :get_http + second = subject.send :get_http + + expect(first).to equal(second) + end + + it "returns a new connection with persistence disabled" do + subject.config.persistent = false + + first = subject.send :get_http + second = subject.send :get_http + + expect(first).to_not equal(second) + end + end + describe "GET #ping" do it "returns OK" do stub_request(:get, "http://influxdb.test:9999/ping") diff --git a/spec/influxdb/config_spec.rb b/spec/influxdb/config_spec.rb index 9a2a544..66d88cb 100644 --- a/spec/influxdb/config_spec.rb +++ b/spec/influxdb/config_spec.rb @@ -17,6 +17,7 @@ specify { expect(conf.port).to eq 8086 } specify { expect(conf.username).to eq "root" } specify { expect(conf.password).to eq "root" } + specify { expect(conf.persistent).to be_truthy } specify { expect(conf.use_ssl).to be_falsey } specify { expect(conf.time_precision).to eq "s" } specify { expect(conf.auth_method).to eq "params" } From 56baa9b54b8e77836ccd54a7cdad1e7a16cb44f2 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Fri, 29 
May 2020 22:54:05 -0700 Subject: [PATCH 04/10] Fix logging at level Logger::DEBUG Previously setting the log level to Logger::DEBUG would have no effect as the log level of the Logger object was not changed. If you set: InfluxDB::Logging.log_level = Logger::DEBUG InfluxDB::Logging::log? would return true, allowing the log statement to proceed, but #log would not do anything because the Logger object was still at its default from initialization, Logger::INFO. By setting the log level directly on the Logger object and removing ::log? we allow the Logger object to determine if a log level needs to be logged or not. This allows debug-level log messages to be displayed. --- lib/influxdb/logging.rb | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/lib/influxdb/logging.rb b/lib/influxdb/logging.rb index 882275e..68721e3 100644 --- a/lib/influxdb/logging.rb +++ b/lib/influxdb/logging.rb @@ -6,7 +6,6 @@ module Logging # :nodoc: class << self attr_writer :logger - attr_writer :log_level def logger return false if @logger == false @@ -15,18 +14,13 @@ def logger end def log_level - @log_level || Logger::INFO + logger&.level || Logger::INFO end - def log?(level) - case level - when :debug then log_level <= Logger::DEBUG - when :info then log_level <= Logger::INFO - when :warn then log_level <= Logger::WARN - when :error then log_level <= Logger::ERROR - when :fatal then log_level <= Logger::FATAL - else true - end + def log_level=(level) + return unless logger + + logger.level = level end end @@ -34,7 +28,6 @@ def log?(level) def log(level, message = nil, &block) return unless InfluxDB::Logging.logger - return unless InfluxDB::Logging.log?(level) if block_given? 
InfluxDB::Logging.logger.send(level.to_sym, PREFIX, &block) From d5c164981addf685c223e9b8881181a1b2525669 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Sat, 30 May 2020 00:51:33 -0700 Subject: [PATCH 05/10] Fix async persistent connections With the addition of persistent connections we need to make a separate InfluxDB::Client per worker as the persistent connection is attached to the client. Here this is done by copying the InfluxDB::Config object and creating a new host queue (so threads don't have to communicate to share hosts) and creating a new client without async enabled (so it will use HTTP or UDP write methods). --- lib/influxdb/client.rb | 11 ++++++++--- lib/influxdb/config.rb | 18 +++++++++++++++++- spec/influxdb/cases/async_client_spec.rb | 7 ++++--- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/lib/influxdb/client.rb b/lib/influxdb/client.rb index 18ea2be..b5e2e5f 100644 --- a/lib/influxdb/client.rb +++ b/lib/influxdb/client.rb @@ -49,9 +49,12 @@ class Client # +:ssl_ca_cert+:: ssl CA certificate, chainfile or CA path. # The system CA path is automatically included # +:retry+:: number of times a failed request should be retried. Defaults to infinite. - def initialize(database = nil, **opts) + def initialize(database = nil, config = nil, **opts) + raise ArgumentError, "provide config or opts, not both" if + config && !opts.empty? + opts[:database] = database if database.is_a? String - @config = InfluxDB::Config.new(**opts) + @config = config || InfluxDB::Config.new(**opts) @stopped = false @writer = find_writer @@ -82,7 +85,9 @@ def now def find_writer if config.async? - InfluxDB::Writer::Async.new(self, config.async) + client = InfluxDB::Client.new nil, config.writer_config + + InfluxDB::Writer::Async.new(client, config.async) elsif config.udp.is_a?(Hash) InfluxDB::Writer::UDP.new(self, **config.udp) elsif config.udp? 
diff --git a/lib/influxdb/config.rb b/lib/influxdb/config.rb index 52bb1ed..9a805c8 100644 --- a/lib/influxdb/config.rb +++ b/lib/influxdb/config.rb @@ -83,6 +83,12 @@ def initialize(url: nil, **opts) configure_hosts! opts[:hosts] || opts[:host] || "localhost".freeze end + def initialize_copy(source) + super + + configure_hosts! source.hosts + end + def udp? udp != false end @@ -105,7 +111,15 @@ def hosts end end - private + def writer_config + writer_config = dup + + writer_config.set_ivar! :async, false + + writer_config + end + + protected def set_ivar!(name, value) case name @@ -118,6 +132,8 @@ def set_ivar!(name, value) instance_variable_set "@#{name}", value end + private + def normalize_retry_option(value) case value when Integer then value diff --git a/spec/influxdb/cases/async_client_spec.rb b/spec/influxdb/cases/async_client_spec.rb index 8ee6389..6b880a4 100644 --- a/spec/influxdb/cases/async_client_spec.rb +++ b/spec/influxdb/cases/async_client_spec.rb @@ -6,7 +6,8 @@ let(:client) { described_class.new(async: async_options) } let(:subject) { client } let(:stub_url) { "http://localhost:8086/write?db=&p=root&precision=s&u=root" } - let(:worker) { client.writer.worker } + let(:writer) { client.writer } + let(:worker) { writer.worker } specify { expect(subject.writer).to be_a(InfluxDB::Writer::Async) } @@ -40,7 +41,7 @@ it "writes aggregate payload to the client" do queue = Queue.new - allow(client).to receive(:write) do |*args| + allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args| queue.push(args) end @@ -61,7 +62,7 @@ it "writes separated payloads for each {precision, retention_policy, database} set" do queue = Queue.new - allow(client).to receive(:write) do |*args| + allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args| queue.push(args) end From 5b1bbdbea73eb37929b022af0968a7b771d17a18 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Sat, 30 May 2020 01:12:44 -0700 Subject: [PATCH 06/10] rubocop --- 
lib/influxdb/client/http.rb | 10 ++++++---- lib/influxdb/config.rb | 3 +++ spec/influxdb/client_spec.rb | 10 +++++----- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index ec982c6..3a25d73 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -5,6 +5,7 @@ module InfluxDB # rubocop:disable Metrics/MethodLength + # rubocop:disable Metrics/ModuleLength # rubocop:disable Metrics/AbcSize module HTTP # :nodoc: def get(url, options = {}) @@ -43,7 +44,7 @@ def connect_with_retry delay = config.initial_delay retry_count = 0 - http = get_http + http = current_http begin http.start unless http.started? @@ -135,14 +136,14 @@ def generate_cert_store store end - def get_http + def current_http return build_http config.next_host unless config.persistent @https ||= begin - https = config.hosts.map { |host| + https = config.hosts.map do |host| build_http host - } + end Hash[config.hosts.zip(https)] end @@ -171,5 +172,6 @@ def build_http(host) end end # rubocop:enable Metrics/MethodLength + # rubocop:enable Metrics/ModuleLength # rubocop:enable Metrics/AbcSize end diff --git a/lib/influxdb/config.rb b/lib/influxdb/config.rb index 9a805c8..32e860c 100644 --- a/lib/influxdb/config.rb +++ b/lib/influxdb/config.rb @@ -45,6 +45,8 @@ module InfluxDB denormalize: true, }.freeze + # rubocop:disable Metrics/ClassLength + # InfluxDB client configuration class Config # Valid values for the "auth_method" option. 
@@ -205,4 +207,5 @@ def coerce(name, value) end end end + # rubocop:enable Metrics/ClassLength end diff --git a/spec/influxdb/client_spec.rb b/spec/influxdb/client_spec.rb index b60befe..8b46b03 100644 --- a/spec/influxdb/client_spec.rb +++ b/spec/influxdb/client_spec.rb @@ -60,10 +60,10 @@ end end - describe "#get_http" do + describe "#current_http" do it "returns an existing connection with persistence enabled" do - first = subject.send :get_http - second = subject.send :get_http + first = subject.send :current_http + second = subject.send :current_http expect(first).to equal(second) end @@ -71,8 +71,8 @@ it "returns a new connection with persistence disabled" do subject.config.persistent = false - first = subject.send :get_http - second = subject.send :get_http + first = subject.send :current_http + second = subject.send :current_http expect(first).to_not equal(second) end From de75723ae7d8354451805772c5065d4515d54e54 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Sat, 30 May 2020 01:12:50 -0700 Subject: [PATCH 07/10] Support ruby 2.2 --- lib/influxdb/logging.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/influxdb/logging.rb b/lib/influxdb/logging.rb index 68721e3..d96185e 100644 --- a/lib/influxdb/logging.rb +++ b/lib/influxdb/logging.rb @@ -14,7 +14,9 @@ def logger end def log_level - logger&.level || Logger::INFO + return Logger::INFO unless logger + + logger.level end def log_level=(level) From 9bf8da75ee28a8de0b64ffd330a13ba141920671 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Sun, 31 May 2020 10:16:05 -0700 Subject: [PATCH 08/10] Fix warning message --- lib/influxdb/client/http.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index 3a25d73..5a9276e 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -62,7 +62,9 @@ def connect_with_retry raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but 
failed." end - log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." } + log(:warn) do + "Failed to contact host #{http.address}: #{e.inspect} - retrying in #{delay}s." + end sleep delay delay = [config.max_delay, delay * 2].min From 175b1696c4fae85bddd1b5ab4850526a82649d71 Mon Sep 17 00:00:00 2001 From: Eric Hodel Date: Wed, 3 Jun 2020 14:00:17 -0700 Subject: [PATCH 09/10] Only finish started HTTP connections --- lib/influxdb/client/http.rb | 4 ++-- spec/influxdb/client_spec.rb | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index 5a9276e..a9ddcb7 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -51,11 +51,11 @@ def connect_with_retry yield http rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e - http.finish + http.finish if http.started? raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e - http.finish + http.finish if http.started? retry_count += 1 unless (config.retry == -1 || retry_count <= config.retry) && !stopped? diff --git a/spec/influxdb/client_spec.rb b/spec/influxdb/client_spec.rb index 8b46b03..7c956af 100644 --- a/spec/influxdb/client_spec.rb +++ b/spec/influxdb/client_spec.rb @@ -39,6 +39,19 @@ end end + describe "#connect_with_retry" do + it "raises InfluxDB::ConnectionError when the hostname is unknown" do + subject.config.retry = 0 + + allow_any_instance_of(Net::HTTP).to receive(:start) do + raise SocketError, "simulate getaddrinfo error" + end + + error = InfluxDB::ConnectionError.new "Tried 0 times to reconnect but failed." 
+ expect { subject.send(:connect_with_retry) }.to raise_error(InfluxDB::ConnectionError) + end + end + describe "#full_url" do it "returns String" do expect(subject.send(:full_url, "/unknown")).to be_a String From c3f1aa656baeca616e99effa37d94119bd7985a6 Mon Sep 17 00:00:00 2001 From: Henne Vogelsang Date: Sat, 6 Feb 2021 19:29:46 +0100 Subject: [PATCH 10/10] Address some linter warnings Let's not split this central method. --- lib/influxdb/client/http.rb | 4 ++++ spec/influxdb/client_spec.rb | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/influxdb/client/http.rb b/lib/influxdb/client/http.rb index a9ddcb7..8f1ada4 100644 --- a/lib/influxdb/client/http.rb +++ b/lib/influxdb/client/http.rb @@ -40,6 +40,8 @@ def post(url, data) private + # rubocop:disable Metrics/CyclomaticComplexity + # rubocop:disable Metrics/PerceivedComplexity def connect_with_retry delay = config.initial_delay retry_count = 0 @@ -71,6 +73,8 @@ def connect_with_retry retry end end + # rubocop:enable Metrics/CyclomaticComplexity + # rubocop:enable Metrics/PerceivedComplexity def do_request(http, req, data = nil) req.basic_auth config.username, config.password if basic_auth? diff --git a/spec/influxdb/client_spec.rb b/spec/influxdb/client_spec.rb index 7c956af..0ddbb49 100644 --- a/spec/influxdb/client_spec.rb +++ b/spec/influxdb/client_spec.rb @@ -47,8 +47,7 @@ raise SocketError, "simulate getaddrinfo error" end - error = InfluxDB::ConnectionError.new "Tried 0 times to reconnect but failed." - expect { subject.send(:connect_with_retry) }.to raise_error(InfluxDB::ConnectionError) + expect { subject.send(:connect_with_retry) }.to raise_error(InfluxDB::ConnectionError, "Tried 0 times to reconnect but failed.") end end