Skip to content

Commit 3e2d5e8

Browse files
committed
feat(out_waterdrop): add basic waterdrop output plugin
Signed-off-by: Ray Tung <[email protected]>
1 parent ea0f10a commit 3e2d5e8

File tree

5 files changed

+213
-0
lines changed

5 files changed

+213
-0
lines changed

Rakefile

+3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ require 'rake/testtask'
55

66
# Default test task; the waterdrop tests need a live Kafka broker and are
# excluded for now.
# TODO: include the waterdrop tests after fixing up CI
Rake::TestTask.new(:test) do |t|
  t.libs << 'lib'
  t.libs << 'test'
  t.test_files = FileList['test/**/test_*.rb'].exclude('test/**/test_out_waterdrop.rb')
  t.verbose = true
end
1114

docker-compose.yml

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Local Kafka broker used by the waterdrop plugin integration tests
# (test/plugin/test_out_waterdrop.rb). Start with: docker-compose up -d
version: "3"
services:
  kafka:
    # Single-node Kafka image published by Confluent for local development.
    image: confluentinc/confluent-local:7.4.3
    ports:
      # Broker listener; matches the plugin's default bootstrap_servers
      # (localhost:9092).
      - "9092:9092"

fluent-plugin-kafka.gemspec

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
1919
gem.add_dependency "fluentd", [">= 0.10.58", "< 2"]
2020
gem.add_dependency 'ltsv'
2121
gem.add_dependency 'ruby-kafka', '>= 1.5.0', '< 2'
22+
gem.add_dependency 'waterdrop', '~> 2.6'
2223
gem.add_development_dependency "rake", ">= 0.9.2"
2324
gem.add_development_dependency "test-unit", ">= 3.0.8"
2425
gem.add_development_dependency "test-unit-rr", "~> 1.0"

lib/fluent/plugin/out_waterdrop.rb

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
require 'thread'
2+
require 'logger'
3+
require 'fluent/plugin/output'
4+
require 'fluent/plugin/kafka_plugin_util'
5+
require 'waterdrop'
6+
7+
module Fluent::Plugin
  # Fluentd output plugin that produces buffered records to Apache Kafka via
  # the WaterDrop client.
  #
  # NOTE(review): `class Fluent::WaterdropOutput` nested inside
  # `module Fluent::Plugin` defines ::Fluent::WaterdropOutput rather than
  # Fluent::Plugin::WaterdropOutput. Kept as-is because the test file
  # references that constant.
  class Fluent::WaterdropOutput < Output
    Fluent::Plugin.register_output('waterdrop', self)
    helpers :inject, :formatter, :record_accessor

    config_param :bootstrap_servers, :string, default: 'localhost:9092',
                 desc: <<-DESC
Set bootstrap servers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
DESC

    config_param :default_topic, :string, default: nil, desc: <<-DESC
Default output topic when record doesn't have topic field
DESC

    # Explicit output topic with placeholder support. #write already branched
    # on @topic, but no such parameter was declared, so the branch was dead;
    # declaring it (default nil) is backward compatible.
    config_param :topic, :string, default: nil,
                 desc: "Output topic (supports chunk-key placeholders)"

    config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"

    config_section :buffer do
      config_set_default :chunk_keys, ["topic"]
    end

    config_section :format do
      config_set_default :@type, 'json'
      config_set_default :add_newline, false
    end

    def initialize
      super
      # Config params (@bootstrap_servers, @topic_key, ...) are only populated
      # by #configure's `super`, so nothing that depends on them may run here.
      # The original built the producer with a nil bootstrap list and called
      # nil.to_sym for @topic_key_sym; both now happen in #configure.
      @producer = nil
      @formatter_proc = nil
      @topic_key_sym = nil
    end

    # Validates the <format> section, builds the formatter proc, and creates
    # the WaterDrop producer once config params are available.
    #
    # @param conf [Fluent::Config::Element] plugin configuration
    # @raise [Fluent::ConfigError] if <format> or its @type is missing
    def configure(conf)
      super

      formatter_conf = conf.elements('format').first
      unless formatter_conf
        raise Fluent::ConfigError, "<format> section is required."
      end
      unless formatter_conf["@type"]
        raise Fluent::ConfigError, "format/@type is required."
      end

      @formatter_proc = setup_formatter(formatter_conf)
      @topic_key_sym = @topic_key.to_sym

      kafka_config = {
        'bootstrap.servers': @bootstrap_servers
      }
      @producer = WaterDrop::Producer.new do |producer_config|
        producer_config.deliver = true
        producer_config.kafka = kafka_config
      end
    end

    # Creates the formatter plugin for the given <format> config and returns
    # its #format method as a callable.
    def setup_formatter(conf)
      @formatter = formatter_create(usage: 'waterdrop-plugin', conf: conf)
      @formatter.method(:format)
    end

    # Formats every record in the chunk, buffers it into the producer, and
    # flushes synchronously so delivery errors propagate to Fluentd's retry
    # machinery.
    def write(chunk)
      tag = chunk.metadata.tag
      # Topic resolution precedence: explicit `topic` param (with
      # placeholders) > chunk-key variable (topic_key) > default_topic > tag.
      topic = if @topic
                extract_placeholders(@topic, chunk)
              else
                (chunk.metadata.variables && chunk.metadata.variables[@topic_key_sym]) || @default_topic || tag
              end

      chunk.msgpack_each do |time, record|
        record_buf = @formatter_proc.call(tag, time, record)
        @producer.buffer(topic: topic, payload: record_buf)
      end
      @producer.flush_sync
    end

    def shutdown
      super
      # Guarded: the producer does not exist if #configure never ran or failed.
      @producer.close if @producer
    end
  end
end

test/plugin/test_out_waterdrop.rb

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
require 'fluent/test'
2+
require 'fluent/test/helpers'
3+
require 'fluent/test/driver/output'
4+
require 'fluent/plugin/out_waterdrop'
5+
require 'rdkafka'
6+
require 'json'
7+
8+
# Integration tests for the waterdrop output plugin. They require a live
# Kafka broker:
# 1. run docker-compose to spin up the Kafka broker
# 2. Run these tests
class WaterdropOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  def setup
    Fluent::Test.setup
  end

  # Builds an output test driver configured with `conf`.
  # NOTE(review): the `tag` parameter is accepted but never used — confirm
  # whether it was meant to be passed to the driver.
  def create_driver(conf, tag = 'test')
    Fluent::Test::Driver::Output.new(Fluent::WaterdropOutput).configure(conf)
  end

  sub_test_case 'configure' do
    test 'basic configuration' do
      assert_nothing_raised(Fluent::ConfigError) do
        config = %[
          @type waterdrop
          <format>
            @type json
          </format>
        ]
        driver = create_driver(config)

        # bootstrap_servers falls back to the plugin default.
        assert_equal 'localhost:9092', driver.instance.bootstrap_servers
      end
    end

    test 'missing format section' do
      assert_raise(Fluent::ConfigError) do
        config = %[
          @type waterdrop
        ]
        create_driver(config)
      end
    end

    test 'formatter section missing @type' do
      assert_raise(Fluent::ConfigError) do
        config = %[
          @type waterdrop
          <format>
            literally 'anything else'
          </format>
        ]
        create_driver(config)
      end
    end
  end

  sub_test_case 'produce' do
    # Shared librdkafka settings for the admin and consumer handles; the
    # metadata propagation/refresh intervals pace the sleep in 'basic produce'.
    GLOBAL_CONFIG = {
      "bootstrap.servers" => "localhost:9092",
      "topic.metadata.propagation.max.ms" => 11 * 1_000,
      "topic.metadata.refresh.interval.ms" => 10 * 1_000,
    }
    TOPIC = 'produce.basic-produce'

    # Recreates the test topic and subscribes a fresh consumer before each test.
    def setup
      @kafka_admin = Rdkafka::Config.new(GLOBAL_CONFIG).admin
      @kafka_consumer = Rdkafka::Config.new(
        GLOBAL_CONFIG.merge(
          {
            "group.id" => "waterdrop",
            "auto.offset.reset" => "earliest",
          }
        )
      ).consumer

      @kafka_admin.delete_topic(TOPIC)
      @kafka_admin.create_topic(TOPIC, 1, 1)
        .wait(max_wait_timeout: 30)
      @kafka_consumer.subscribe(TOPIC)
    end

    # Tears down the consumer and removes the test topic.
    def teardown
      @kafka_consumer.close
      @kafka_admin.delete_topic(TOPIC)
      @kafka_admin.close
    end

    test 'basic produce' do
      config = %[
        @type waterdrop
        default_topic #{TOPIC}
        <format>
          @type json
        </format>
      ]
      d = create_driver(config)
      d.run(default_tag: TOPIC, flush: true) do
        d.feed(Fluent::EventTime.now, { topic: TOPIC, body: '123' })
      end

      # Wait out topic.metadata.propagation.max.ms (11s above) so the broker
      # metadata settles before polling for the produced message.
      sleep(12)

      raw_message = @kafka_consumer.poll(5_000)

      message = JSON.parse!(raw_message.payload)
      assert_equal '123', message["body"]
    end
  end
end

0 commit comments

Comments
 (0)