Skip to content

Commit e34d7fc

Browse files
committed
Refactor SASL authentication code
1 parent 7a5fbcb commit e34d7fc

11 files changed

+410
-587
lines changed

lib/kafka/sasl/gssapi.rb

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
module Kafka
2+
module Sasl
3+
class Gssapi
4+
GSSAPI_IDENT = "GSSAPI"
5+
GSSAPI_CONFIDENTIALITY = false
6+
7+
def initialize(logger:, principal:, keytab:)
8+
@logger = logger
9+
@principal = principal
10+
@keytab = keytab
11+
end
12+
13+
def configured?
14+
@principal && !@principal.empty?
15+
end
16+
17+
def ident
18+
GSSAPI_IDENT
19+
end
20+
21+
def authenticate!(host, encoder, decoder)
22+
load_gssapi
23+
initialize_gssapi_context(host)
24+
25+
@encoder = encoder
26+
@decoder = decoder
27+
28+
# send gssapi token and receive token to verify
29+
token_to_verify = send_and_receive_sasl_token
30+
31+
# verify incoming token
32+
unless @gssapi_ctx.init_context(token_to_verify)
33+
raise Kafka::Error, "GSSAPI context verification failed."
34+
end
35+
36+
# we can continue, so send OK
37+
@encoder.write([0, 2].pack('l>c'))
38+
39+
# read wrapped message and return it back with principal
40+
handshake_messages
41+
end
42+
43+
def handshake_messages
44+
msg = @decoder.bytes
45+
raise Kafka::Error, "GSSAPI negotiation failed." unless msg
46+
# unwrap with integrity only
47+
msg_unwrapped = @gssapi_ctx.unwrap_message(msg, GSSAPI_CONFIDENTIALITY)
48+
msg_wrapped = @gssapi_ctx.wrap_message(msg_unwrapped + @principal, GSSAPI_CONFIDENTIALITY)
49+
@encoder.write_bytes(msg_wrapped)
50+
end
51+
52+
def send_and_receive_sasl_token
53+
@encoder.write_bytes(@gssapi_token)
54+
@decoder.bytes
55+
end
56+
57+
def load_gssapi
58+
begin
59+
require "gssapi"
60+
rescue LoadError
61+
@logger.error "In order to use GSSAPI authentication you need to install the `gssapi` gem."
62+
raise
63+
end
64+
end
65+
66+
def initialize_gssapi_context(host)
67+
@logger.debug "GSSAPI: Initializing context with #{host}, principal #{@principal}"
68+
69+
@gssapi_ctx = GSSAPI::Simple.new(host, @principal, @keytab)
70+
@gssapi_token = @gssapi_ctx.init_context(nil)
71+
end
72+
end
73+
end
74+
end

lib/kafka/sasl/plain.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
module Kafka
2+
module Sasl
3+
class Plain
4+
PLAIN_IDENT = "PLAIN"
5+
6+
def initialize(logger:, authzid:, username:, password:)
7+
@logger = logger
8+
@authzid = authzid
9+
@username = username
10+
@password = password
11+
end
12+
13+
def ident
14+
PLAIN_IDENT
15+
end
16+
17+
def configured?
18+
@authzid && @username && @password
19+
end
20+
21+
def authenticate!(host, encoder, decoder)
22+
msg = [@authzid, @username, @password].join("\000").force_encoding("utf-8")
23+
24+
encoder.write_bytes(msg)
25+
26+
begin
27+
msg = decoder.bytes
28+
raise Kafka::Error, "SASL PLAIN authentication failed: unknown error" unless msg
29+
rescue Errno::ETIMEDOUT, EOFError => e
30+
raise Kafka::Error, "SASL PLAIN authentication failed: #{e.message}"
31+
end
32+
33+
@logger.debug "SASL PLAIN authentication successful."
34+
end
35+
end
36+
end
37+
end

lib/kafka/sasl/scram.rb

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
require 'securerandom'
2+
require 'base64'
3+
4+
module Kafka
5+
module Sasl
6+
class Scram
7+
MECHANISMS = {
8+
"sha256" => "SCRAM-SHA-256",
9+
"sha512" => "SCRAM-SHA-512",
10+
}.freeze
11+
12+
def initialize(username:, password:, mechanism: 'sha256', logger:)
13+
@username = username
14+
@password = password
15+
@logger = logger
16+
17+
if mechanism
18+
@mechanism = MECHANISMS.fetch(mechanism) do
19+
raise Kafka::SaslScramError, "SCRAM mechanism #{mechanism} is not supported."
20+
end
21+
end
22+
end
23+
24+
def ident
25+
@mechanism
26+
end
27+
28+
def configured?
29+
@username && @password && @mechanism
30+
end
31+
32+
def authenticate!(host, encoder, decoder)
33+
@logger.debug "Authenticating #{@username} with SASL #{@mechanism}"
34+
35+
begin
36+
msg = first_message
37+
@logger.debug "Sending first client SASL SCRAM message: #{msg}"
38+
encoder.write_bytes(msg)
39+
40+
@server_first_message = decoder.bytes
41+
@logger.debug "Received first server SASL SCRAM message: #{@server_first_message}"
42+
43+
msg = final_message
44+
@logger.debug "Sending final client SASL SCRAM message: #{msg}"
45+
encoder.write_bytes(msg)
46+
47+
response = parse_response(decoder.bytes)
48+
@logger.debug "Received last server SASL SCRAM message: #{response}"
49+
50+
raise FailedScramAuthentication, response['e'] if response['e']
51+
raise FailedScramAuthentication, "Invalid server signature" if response['v'] != server_signature
52+
rescue EOFError => e
53+
raise FailedScramAuthentication, e.message
54+
end
55+
56+
@logger.debug "SASL SCRAM authentication successful"
57+
end
58+
59+
private
60+
61+
def first_message
62+
"n,,#{first_message_bare}"
63+
end
64+
65+
def first_message_bare
66+
"n=#{encoded_username},r=#{nonce}"
67+
end
68+
69+
def final_message_without_proof
70+
"c=biws,r=#{rnonce}"
71+
end
72+
73+
def final_message
74+
"#{final_message_without_proof},p=#{client_proof}"
75+
end
76+
77+
def server_data
78+
parse_response(@server_first_message)
79+
end
80+
81+
def rnonce
82+
server_data['r']
83+
end
84+
85+
def salt
86+
Base64.strict_decode64(server_data['s'])
87+
end
88+
89+
def iterations
90+
server_data['i'].to_i
91+
end
92+
93+
def auth_message
94+
[first_message_bare, @server_first_message, final_message_without_proof].join(',')
95+
end
96+
97+
def salted_password
98+
hi(@password, salt, iterations)
99+
end
100+
101+
def client_key
102+
hmac(salted_password, 'Client Key')
103+
end
104+
105+
def stored_key
106+
h(client_key)
107+
end
108+
109+
def server_key
110+
hmac(salted_password, 'Server Key')
111+
end
112+
113+
def client_signature
114+
hmac(stored_key, auth_message)
115+
end
116+
117+
def server_signature
118+
Base64.strict_encode64(hmac(server_key, auth_message))
119+
end
120+
121+
def client_proof
122+
Base64.strict_encode64(xor(client_key, client_signature))
123+
end
124+
125+
def h(str)
126+
digest.digest(str)
127+
end
128+
129+
def hi(str, salt, iterations)
130+
OpenSSL::PKCS5.pbkdf2_hmac(
131+
str,
132+
salt,
133+
iterations,
134+
digest.size,
135+
digest
136+
)
137+
end
138+
139+
def hmac(data, key)
140+
OpenSSL::HMAC.digest(digest, data, key)
141+
end
142+
143+
def xor(first, second)
144+
first.bytes.zip(second.bytes).map { |(a, b)| (a ^ b).chr }.join('')
145+
end
146+
147+
def parse_response(data)
148+
data.split(',').map { |s| s.split('=', 2) }.to_h
149+
end
150+
151+
def encoded_username
152+
safe_str(@username.encode(Encoding::UTF_8))
153+
end
154+
155+
def nonce
156+
@nonce ||= SecureRandom.urlsafe_base64(32)
157+
end
158+
159+
def digest
160+
@digest ||= case @mechanism
161+
when 'SCRAM-SHA-256'
162+
OpenSSL::Digest::SHA256.new
163+
when 'SCRAM-SHA-512'
164+
OpenSSL::Digest::SHA512.new
165+
else
166+
raise ArgumentError, "Unknown SASL mechanism '#{@mechanism}'"
167+
end
168+
end
169+
170+
def safe_str(val)
171+
val.gsub('=', '=3D').gsub(',', '=2C')
172+
end
173+
end
174+
end
175+
end

lib/kafka/sasl_authenticator.rb

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,48 @@
1-
require 'kafka/sasl_gssapi_authenticator'
2-
require 'kafka/sasl_plain_authenticator'
3-
require 'kafka/sasl_scram_authenticator'
1+
require 'kafka/sasl/plain'
2+
require 'kafka/sasl/gssapi'
3+
require 'kafka/sasl/scram'
44

55
module Kafka
66
class SaslAuthenticator
77
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
88
sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
99
sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
1010
@logger = logger
11-
@sasl_gssapi_principal = sasl_gssapi_principal
12-
@sasl_gssapi_keytab = sasl_gssapi_keytab
13-
@sasl_plain_authzid = sasl_plain_authzid
14-
@sasl_plain_username = sasl_plain_username
15-
@sasl_plain_password = sasl_plain_password
16-
@sasl_scram_username = sasl_scram_username
17-
@sasl_scram_password = sasl_scram_password
18-
@sasl_scram_mechanism = sasl_scram_mechanism
19-
end
20-
21-
def authenticate!(connection)
22-
if authenticate_using_sasl_gssapi?
23-
sasl_gssapi_authenticate(connection)
24-
elsif authenticate_using_sasl_plain?
25-
sasl_plain_authenticate(connection)
26-
elsif authenticate_using_sasl_scram?
27-
sasl_scram_authenticate(connection)
28-
end
29-
end
3011

31-
private
32-
33-
def sasl_scram_authenticate(connection)
34-
auth = SaslScramAuthenticator.new(
35-
username: @sasl_scram_username,
36-
password: @sasl_scram_password,
12+
@plain = Sasl::Plain.new(
13+
authzid: sasl_plain_authzid,
14+
username: sasl_plain_username,
15+
password: sasl_plain_password,
3716
logger: @logger,
38-
mechanism: @sasl_scram_mechanism,
39-
connection: connection
4017
)
4118

42-
auth.authenticate!
43-
end
44-
45-
def sasl_gssapi_authenticate(connection)
46-
auth = SaslGssapiAuthenticator.new(
19+
@gssapi = Sasl::Gssapi.new(
20+
principal: sasl_gssapi_principal,
21+
keytab: sasl_gssapi_keytab,
4722
logger: @logger,
48-
sasl_gssapi_principal: @sasl_gssapi_principal,
49-
sasl_gssapi_keytab: @sasl_gssapi_keytab,
50-
connection: connection
5123
)
5224

53-
auth.authenticate!
54-
end
55-
56-
def sasl_plain_authenticate(connection)
57-
auth = SaslPlainAuthenticator.new(
25+
@scram = Sasl::Scram.new(
26+
username: sasl_scram_username,
27+
password: sasl_scram_password,
28+
mechanism: sasl_scram_mechanism,
5829
logger: @logger,
59-
authzid: @sasl_plain_authzid,
60-
username: @sasl_plain_username,
61-
password: @sasl_plain_password,
62-
connection: connection
6330
)
64-
65-
auth.authenticate!
6631
end
6732

68-
def authenticate_using_sasl_scram?
69-
@sasl_scram_username && @sasl_scram_password
70-
end
33+
def authenticate!(connection)
34+
mechanism = [@gssapi, @plain, @scram].find(&:configured?)
7135

72-
def authenticate_using_sasl_gssapi?
73-
!@ssl_context && @sasl_gssapi_principal && !@sasl_gssapi_principal.empty?
74-
end
36+
return if mechanism.nil?
37+
38+
ident = mechanism.ident
39+
response = connection.send_request(Kafka::Protocol::SaslHandshakeRequest.new(ident))
40+
41+
unless response.error_code == 0 && response.enabled_mechanisms.include?(ident)
42+
raise Kafka::Error, "#{ident} is not supported."
43+
end
7544

76-
def authenticate_using_sasl_plain?
77-
@sasl_plain_authzid && @sasl_plain_username && @sasl_plain_password
45+
mechanism.authenticate!(connection.to_s, connection.encoder, connection.decoder)
7846
end
7947
end
8048
end

0 commit comments

Comments
 (0)