Skip to content
Open
35 changes: 34 additions & 1 deletion lib/aerospike/aerospike_exception.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ def initialize(result_code, message = nil)
message ||= ResultCode.message(result_code)
super(message)
end

def retryable?
case @result_code
when ResultCode::COMMAND_REJECTED
true
when ResultCode::INVALID_NODE_ERROR
true
when ResultCode::SERVER_ERROR
true
when ResultCode::SERVER_MEM_ERROR
true
when ResultCode::TIMEOUT
true
when ResultCode::KEY_BUSY
true
when ResultCode::SERVER_NOT_AVAILABLE
true
when ResultCode::DEVICE_OVERLOAD
true
when ResultCode::QUERY_NET_IO
true
else
false
end
end
end

class Timeout < Aerospike
Expand All @@ -39,7 +64,11 @@ def initialize(timeout, iterations, failed_nodes=nil, failed_connections=nil)
@failed_nodes = failed_nodes
@failed_connections = failed_connections

super(ResultCode::TIMEOUT)
super(ResultCode::TIMEOUT, "Timeout after #{iterations} attempts!")
end

def retryable?
true
end
end

Expand All @@ -65,6 +94,10 @@ class Connection < Aerospike
def initialize(msg=nil)
super(ResultCode::SERVER_NOT_AVAILABLE, msg)
end

def retryable?
true
end
end

class InvalidNode < Aerospike
Expand Down
6 changes: 4 additions & 2 deletions lib/aerospike/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,8 @@ def execute_udf_on_query(statement, package_name, function_name, function_args =
statement = statement.clone
statement.set_aggregate_function(package_name, function_name, function_args, false)
# Use a thread per node
nodes.each do |node|
threads = nodes.map do |node|
Thread.new do
Thread.current.abort_on_exception = true
begin
command = ServerCommand.new(@cluster, node, policy, statement, true, statement.task_id)
execute_command(command)
Expand All @@ -545,6 +544,9 @@ def execute_udf_on_query(statement, package_name, function_name, function_args =
end
end

# check for value of each threads. If any thread fails, this will raise an error.
threads.each(&:join)

ExecuteTask.new(@cluster, statement)
end

Expand Down
20 changes: 9 additions & 11 deletions lib/aerospike/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def batch_read_node(partition, replica_policy)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

Expand All @@ -147,18 +147,18 @@ def read_node(partition, replica_policy, seq)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

# Returns a node on the cluster for read operations
def master_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array

node = node_array.get[partition.partition_id]
raise Aerospike::Exceptions::InvalidNode if !node || !node.active?
Expand All @@ -170,7 +170,7 @@ def master_node(partition)
def rack_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand Down Expand Up @@ -202,7 +202,7 @@ def rack_node(partition, seq)
def master_proles_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -221,7 +221,7 @@ def master_proles_node(partition)
def sequence_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand Down Expand Up @@ -251,11 +251,10 @@ def node_partitions(node, namespace)

partition_map = partitions
replica_array = partition_map[namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array

raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array

pid = 0
for tnode in node_array.get
Expand Down Expand Up @@ -451,7 +450,6 @@ def refresh_nodes
cluster_config_changed = true
end


cluster_config_changed
end

Expand Down
48 changes: 24 additions & 24 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -686,20 +686,18 @@ def set_query(cluster, policy, statement, background, node_partitions)
def execute
iterations = 0

# set timeout outside the loop
limit = Time.now + @policy.timeout
retries = @policy.max_retries

# Execute command until successful, timed out or maximum iterations have been reached:
while iterations <= retries
# Sleep before trying again, after the first iteration:
if iterations > 0 && @policy.sleep_between_retries > 0
# Use a back-off according to the number of iterations:
sleep(@policy.sleep_between_retries * iterations)
end

# Execute command until successful, timed out or maximum iterations have been reached.
while true
# too many retries
# Next iteration:
iterations += 1
break if (@policy.max_retries > 0) && (iterations > @policy.max_retries + 1)

# Sleep before trying again, after the first iteration
sleep(@policy.sleep_between_retries) if iterations > 1 && @policy.sleep_between_retries > 0

# check for command timeout
break if @policy.timeout > 0 && Time.now > limit

begin
@node = get_node
Expand All @@ -716,7 +714,7 @@ def execute
next
end

# Draw a buffer from buffer pool, and make sure it will be put back
# Draw a buffer from buffer pool, and make sure it will be put back:
begin
@data_buffer = Buffer.get

Expand Down Expand Up @@ -752,21 +750,22 @@ def execute
# Parse results.
begin
parse_result
rescue => e
case e
# do not log the following exceptions
when Aerospike::Exceptions::ScanTerminated
when Aerospike::Exceptions::QueryTerminated
else
Aerospike.logger.error(e)
end

rescue Aerospike::Exceptions::Aerospike => exception
# close the connection
# cancelling/closing the batch/multi commands will return an error, which will
# close the connection to throw away its data and signal the server about the
# situation. We will not put back the connection in the buffer.
@conn.close if @conn
raise e

# Some exceptions are non-fatal and retrying may succeed:
if exception.retryable?
next
else
raise
end
rescue
@conn.close if @conn
next
end

# Reflect healthy status.
Expand All @@ -779,11 +778,12 @@ def execute
return
ensure
Buffer.put(@data_buffer)
@data_buffer = nil
end
end # while

# execution timeout
raise Aerospike::Exceptions::Timeout.new(limit, iterations)
raise Aerospike::Exceptions::Timeout.new(@policy.timeout, iterations)
end

protected
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/policy/policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def initialize(opt = {})
# A retry is attempted when there is a network error other than timeout.
# If max_retries is exceeded, the abort will occur even if the timeout
# has not yet been exceeded.
@max_retries = opt[:max_retries] || 2
@max_retries = opt[:max_retries] || 3

# Determines how record TTL (time to live) is affected on reads. When enabled, the server can
# efficiently operate as a read-based LRU cache where the least recently used records are expired.
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/result_code.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ module ResultCode
# Bin not found on update-only operation.
BIN_NOT_FOUND = 17

# Specified bin name does not exist in record.
# Device not keeping up with writes.
DEVICE_OVERLOAD = 18

# Key type mismatch.
Expand Down
4 changes: 4 additions & 0 deletions spec/aerospike/cluster_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
let(:instance) { described_class.new(policy, hosts) }
let(:hosts) { [] }

after do
instance.close
end

describe '#create_node' do
let(:nv) { double('nv') }
let(:node) { instance_double(Aerospike::Node) }
Expand Down
42 changes: 42 additions & 0 deletions spec/aerospike/command_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

RSpec.describe Aerospike::Command do
context Aerospike::ExistsCommand do
let(:cluster) { Support.client.cluster }
let(:policy) { Support.client.default_read_policy }
let(:key) { Support.gen_random_key }

subject { described_class.new(cluster, policy, key) }

describe '#execute' do
it "can retry even if parse_result fails" do
expect(policy.max_retries).to be > 0

expect(subject).to receive(:parse_result).once do
expect(subject).to receive(:parse_result).and_call_original

raise Errno::ECONNRESET
end

subject.execute

expect(subject.exists).to be false
end
end
end
end
2 changes: 1 addition & 1 deletion spec/aerospike/policy/policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

expect(policy.class).to eq described_class
expect(policy.timeout).to eq 0
expect(policy.max_retries).to eq 2
expect(policy.max_retries).to eq 3
expect(policy.sleep_between_retries).to eq 0.5

end
Expand Down
2 changes: 1 addition & 1 deletion spec/aerospike/policy/write_policy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

expect(policy.class).to eq described_class
expect(policy.timeout).to eq 1
expect(policy.max_retries).to eq 2
expect(policy.max_retries).to eq 3
expect(policy.sleep_between_retries).to eq 0.5
expect(policy.record_exists_action).to eq Aerospike::RecordExistsAction::UPDATE
expect(policy.generation_policy).to eq Aerospike::GenerationPolicy::NONE
Expand Down