Skip to content

Support persistent connections #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ found in `lib/influxdb/config.rb` for the source of truth.
| | `:open_timeout` | 5 | socket timeout
| | `:read_timeout` | 300 | socket timeout
| | `:auth_method` | "params" | "params", "basic_auth" or "none"
| | `:persistent` | true | set to false to disable persistent connections
| Retry | `:retry` | -1 | max. number of retry attempts (reading and writing)
| | `:initial_delay` | 0.01 | initial wait time (doubles every retry attempt)
| | `:max_delay` | 30 | max. wait time when retrying
Expand Down
11 changes: 8 additions & 3 deletions lib/influxdb/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ class Client
# +:ssl_ca_cert+:: ssl CA certificate, chainfile or CA path.
# The system CA path is automatically included
# +:retry+:: number of times a failed request should be retried. Defaults to infinite.
def initialize(database = nil, **opts)
def initialize(database = nil, config = nil, **opts)
raise ArgumentError, "provide config or opts, not both" if
config && !opts.empty?

opts[:database] = database if database.is_a? String
@config = InfluxDB::Config.new(**opts)
@config = config || InfluxDB::Config.new(**opts)
@stopped = false
@writer = find_writer

Expand Down Expand Up @@ -82,7 +85,9 @@ def now

def find_writer
if config.async?
InfluxDB::Writer::Async.new(self, config.async)
client = InfluxDB::Client.new nil, config.writer_config

InfluxDB::Writer::Async.new(client, config.async)
elsif config.udp.is_a?(Hash)
InfluxDB::Writer::UDP.new(self, **config.udp)
elsif config.udp?
Expand Down
58 changes: 44 additions & 14 deletions lib/influxdb/client/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

module InfluxDB
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/ModuleLength
# rubocop:disable Metrics/AbcSize
module HTTP # :nodoc:
def get(url, options = {})
Expand Down Expand Up @@ -40,31 +41,34 @@ def post(url, data)
private

def connect_with_retry
host = config.next_host
delay = config.initial_delay
retry_count = 0

http = current_http

begin
http = build_http(host, config.port)
http.open_timeout = config.open_timeout
http.read_timeout = config.read_timeout
http.start unless http.started?

http = setup_ssl(http)
yield http
rescue *InfluxDB::NON_RECOVERABLE_EXCEPTIONS => e
http.finish if http.started?

raise InfluxDB::ConnectionError, InfluxDB::NON_RECOVERABLE_MESSAGE
rescue Timeout::Error, *InfluxDB::RECOVERABLE_EXCEPTIONS => e
http.finish if http.started?

retry_count += 1
unless (config.retry == -1 || retry_count <= config.retry) && !stopped?
raise InfluxDB::ConnectionError, "Tried #{retry_count - 1} times to reconnect but failed."
end

log(:warn) { "Failed to contact host #{host}: #{e.inspect} - retrying in #{delay}s." }
log(:warn) do
"Failed to contact host #{http.address}: #{e.inspect} - retrying in #{delay}s."
end
sleep delay
delay = [config.max_delay, delay * 2].min

retry
ensure
http.finish if http.started?
end
end

Expand Down Expand Up @@ -134,16 +138,42 @@ def generate_cert_store
store
end

def current_http
return build_http config.next_host unless config.persistent

@https ||=
begin
https = config.hosts.map do |host|
build_http host
end

Hash[config.hosts.zip(https)]
end

@https[config.next_host]
end

# Builds an http instance, taking into account any configured
# proxy configuration
def build_http(host, port)
if config.proxy_addr
Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port)
else
Net::HTTP.new(host, port)
end
def build_http(host)
port = config.port

http =
if config.proxy_addr
Net::HTTP.new(host, port, config.proxy_addr, config.proxy_port)
else
Net::HTTP.new(host, port)
end

http.open_timeout = config.open_timeout
http.read_timeout = config.read_timeout

setup_ssl http

http
end
end
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/ModuleLength
# rubocop:enable Metrics/AbcSize
end
22 changes: 21 additions & 1 deletion lib/influxdb/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module InfluxDB
auth_method: nil,
proxy_addr: nil,
proxy_port: nil,
persistent: true,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be false. If people are using this gem, we want to allow them to turn on new features when they are ready.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do


# SSL options
use_ssl: false,
Expand All @@ -44,6 +45,8 @@ module InfluxDB
denormalize: true,
}.freeze

# rubocop:disable Metrics/ClassLength

# InfluxDB client configuration
class Config
# Valid values for the "auth_method" option.
Expand Down Expand Up @@ -82,6 +85,12 @@ def initialize(url: nil, **opts)
configure_hosts! opts[:hosts] || opts[:host] || "localhost".freeze
end

def initialize_copy(source)
super

configure_hosts! source.hosts
end

def udp?
udp != false
end
Expand All @@ -104,7 +113,15 @@ def hosts
end
end

private
def writer_config
writer_config = dup

writer_config.set_ivar! :async, false

writer_config
end

protected

def set_ivar!(name, value)
case name
Expand All @@ -117,6 +134,8 @@ def set_ivar!(name, value)
instance_variable_set "@#{name}", value
end

private

def normalize_retry_option(value)
case value
when Integer then value
Expand Down Expand Up @@ -188,4 +207,5 @@ def coerce(name, value)
end
end
end
# rubocop:enable Metrics/ClassLength
end
19 changes: 7 additions & 12 deletions lib/influxdb/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ module Logging # :nodoc:

class << self
attr_writer :logger
attr_writer :log_level

def logger
return false if @logger == false
Expand All @@ -15,26 +14,22 @@ def logger
end

def log_level
@log_level || Logger::INFO
return Logger::INFO unless logger

logger.level
end

def log?(level)
case level
when :debug then log_level <= Logger::DEBUG
when :info then log_level <= Logger::INFO
when :warn then log_level <= Logger::WARN
when :error then log_level <= Logger::ERROR
when :fatal then log_level <= Logger::FATAL
else true
end
def log_level=(level)
return unless logger

logger.level = level
end
end

private

def log(level, message = nil, &block)
return unless InfluxDB::Logging.logger
return unless InfluxDB::Logging.log?(level)

if block_given?
InfluxDB::Logging.logger.send(level.to_sym, PREFIX, &block)
Expand Down
7 changes: 4 additions & 3 deletions spec/influxdb/cases/async_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
let(:client) { described_class.new(async: async_options) }
let(:subject) { client }
let(:stub_url) { "http://localhost:8086/write?db=&p=root&precision=s&u=root" }
let(:worker) { client.writer.worker }
let(:writer) { client.writer }
let(:worker) { writer.worker }

specify { expect(subject.writer).to be_a(InfluxDB::Writer::Async) }

Expand Down Expand Up @@ -40,7 +41,7 @@

it "writes aggregate payload to the client" do
queue = Queue.new
allow(client).to receive(:write) do |*args|
allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args|
queue.push(args)
end

Expand All @@ -61,7 +62,7 @@

it "writes separated payloads for each {precision, retention_policy, database} set" do
queue = Queue.new
allow(client).to receive(:write) do |*args|
allow_any_instance_of(InfluxDB::Client).to receive(:write) do |_, *args|
queue.push(args)
end

Expand Down
31 changes: 31 additions & 0 deletions spec/influxdb/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@
end
end

describe "#connect_with_retry" do
it "raises InfluxDB::ConnectionError when the hostname is unknown" do
subject.config.retry = 0

allow_any_instance_of(Net::HTTP).to receive(:start) do
raise SocketError, "simulate getaddrinfo error"
end

error = InfluxDB::ConnectionError.new "Tried 0 times to reconnect but failed."
expect { subject.send(:connect_with_retry) }.to raise_error(InfluxDB::ConnectionError)
end
end

describe "#full_url" do
it "returns String" do
expect(subject.send(:full_url, "/unknown")).to be_a String
Expand All @@ -60,6 +73,24 @@
end
end

describe "#current_http" do
it "returns an existing connection with persistence enabled" do
first = subject.send :current_http
second = subject.send :current_http

expect(first).to equal(second)
end

it "returns a new connection with persistence disabled" do
subject.config.persistent = false

first = subject.send :current_http
second = subject.send :current_http

expect(first).to_not equal(second)
end
end

describe "GET #ping" do
it "returns OK" do
stub_request(:get, "http://influxdb.test:9999/ping")
Expand Down
1 change: 1 addition & 0 deletions spec/influxdb/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
specify { expect(conf.port).to eq 8086 }
specify { expect(conf.username).to eq "root" }
specify { expect(conf.password).to eq "root" }
specify { expect(conf.persistent).to be_truthy }
specify { expect(conf.use_ssl).to be_falsey }
specify { expect(conf.time_precision).to eq "s" }
specify { expect(conf.auth_method).to eq "params" }
Expand Down