diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1433992 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +/.bundle/ +/.yardoc +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ + +# rspec failure tracking +.rspec_status + +# build artifacts +lib/pulsar/bindings.bundle +*.gem diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..34c5164 --- /dev/null +++ b/.rspec @@ -0,0 +1,3 @@ +--format documentation +--color +--require spec_helper diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..bb27cea --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +sudo: false +language: ruby +rvm: + - 2.4.3 +before_install: gem install bundler -v 1.16.1 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..8e2d1ab --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,26 @@ + + +#### Initial contributors + +* Jacob Fugal +* Matteo Merli +* JD Harrington diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..2ccaf5f --- /dev/null +++ b/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source "https://rubygems.org" + +git_source(:github) {|repo_name| "https://github.com/#{repo_name}" } + +# Specify your gem's dependencies in pulsar-client.gemspec +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..49a5fba --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,40 @@ +PATH + remote: . + specs: + pulsar-client (0.0.1) + rake-compiler + rice + +GEM + remote: https://rubygems.org/ + specs: + diff-lcs (1.3) + rake (10.5.0) + rake-compiler (1.0.7) + rake + rice (2.1.3) + rspec (3.8.0) + rspec-core (~> 3.8.0) + rspec-expectations (~> 3.8.0) + rspec-mocks (~> 3.8.0) + rspec-core (3.8.0) + rspec-support (~> 3.8.0) + rspec-expectations (3.8.3) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.8.0) + rspec-mocks (3.8.0) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.8.0) + rspec-support (3.8.0) + +PLATFORMS + ruby + +DEPENDENCIES + bundler (~> 1.16) + pulsar-client! + rake (~> 10.0) + rspec (~> 3.0) + +BUNDLED WITH + 1.16.1 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..96c2045 --- /dev/null +++ b/NOTICE @@ -0,0 +1,6 @@ + +Apache Pulsar Ruby Client +Copyright 2019 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index ed9f38a..fe374aa 100644 --- a/README.md +++ b/README.md @@ -1 +1,114 @@ # Apache Pulsar Ruby client + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem "pulsar-client", "~> 2.4.1.pre" +``` + +And then execute: + + $ bundle + +Or install it yourself as: + + $ gem install pulsar-client --pre + +Note #1: You will need libpulsar (for linking) and libpulsar-dev (for +C++ client header files, for compiling) installed first. For both, the +Gem currently targets version 2.4.1. If your libpulsar is older, it will +fail to compile. If it is newer, it _might_ compile is not guaranteed. + +Note #2: This is a pre-release version of this Gem. You will need the +`--pre` flag to `gem install` to install it manually, and must include +the `.pre` suffix in the Gemfile to install it via Bundler. + +## Usage + +Setup and basic `consumer.receive` example: + +```ruby +# have these in your shell with appropriate values +# export PULSAR_BROKER_URI=pulsar://your-pulsar-broker:6651 +# export PULSAR_CERT_PATH=/path/to/your/pulsar-ca.pem +# export PULSAR_AUTH_TOKEN=your-auth-token + +# create client using values from environment +client = Pulsar::Client.from_environment + +# produce a message on the "hello-world" topic in the "namespace" +# namespace of the "tenant" tenant +topic = "tenant/namespace/topic" +producer = client.create_producer(topic) +producer.send("Hello, world!") + +# consumer that message from the topic with an exclusive subscription +# named "hello-consumer" +subscription = "hello-consumer" +consumer = client.subscribe(topic, subscription) + +msg = consumer.receive +message = msg.data +puts "got #{message}" +consumer.acknolwedge(msg) +``` + +Convenience method for listening to messages in a loop: + +```ruby +consumer.listen do |message, _, done| + # process message here; call done to stop the loop. + # messages are auto-acknowledged. + puts "got #{message}" + done.call() +end +``` + +Convenience method for listening on a separate thread: + +```ruby +listenerThread = consumer.listen_in_thread do |message, _, done| + # process message here; call done to stop the loop. + # messages are auto-acknowledged. + puts "got #{message}" + done.call() +end +# ... +listenerThread.join # wait for the thread to finish +``` + +(more documentation coming; see TODO.md) + +## Development + +If your ruby is not already compiled with `--enable-shared`, you'll need +to rebuild it. Example for rbenv: + +``` +CONFIGURE_OPTS="--enable-shared" rbenv install +``` + +If you don't already have them installed, you need libpulsar and +automake for the compilation and linking to work. Example with brew: + +``` +brew install libpulsar automake +``` + +Next, run `bin/setup` to install dependencies -- Rice in particular. +Once that successfully completes, you can `rake compile` to build the +extension. It is then ready to use locally. + +You can run `bin/console` for an interactive prompt that will +allow you to experiment. You can also run `rake spec` to run the tests. + +To install this gem onto your local machine, run `bundle exec rake +install`. + +## Contributing + +Bug reports and pull requests are welcome on GitHub at +https://github.com/apache/pulsar-client-ruby or +https://github.com/instructure/pulsar-client-ruby. diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..e553acc --- /dev/null +++ b/Rakefile @@ -0,0 +1,10 @@ +require "rspec/core/rake_task" +require "rake/extensiontask" + +RSpec::Core::RakeTask.new(:spec) + +task :default => :spec + +Rake::ExtensionTask.new "bindings" do |ext| + ext.lib_dir = "lib/pulsar" +end diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..2069c4f --- /dev/null +++ b/TODO.md @@ -0,0 +1,24 @@ +# Confirm Licensing and Copyright + +Currently, the repository LICENSE file states APL 2.0, reiterated in the +Ruby source files. The NOTICE file attributes Copyright to the ASF. Get +approval for this from Instructure OSC. + +Aside: The uncompiled C++ in the ext/bindings/ directory are also +distributed, to be compiled on the user's machine during gem install. +Should these then also have license information per file, as the Ruby +files do? + +# README Detail + +The README has very minimal information on installing and building +locally right now. It needs to be fleshed out more. In particular, usage +of the library, specifically around significant divergences from the C++ +code (e.g. `ClientConfiguration#authentication_token=` and +`Consumer#listen`) + +# Write Some Specs + +# Code TODOs + +* `producer.schema` diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..c60569e --- /dev/null +++ b/bin/console @@ -0,0 +1,8 @@ +#!/usr/bin/env ruby + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" +require "irb" + +IRB.start(__FILE__) diff --git a/bin/example-consumer b/bin/example-consumer new file mode 100755 index 0000000..12033ae --- /dev/null +++ b/bin/example-consumer @@ -0,0 +1,45 @@ +#!/usr/bin/env ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" + +unless ARGV.size >= 1 && ARGV.size <= 2 && Pulsar::Client.sufficient_environment? + puts "Usage: #{__FILE__} [consumer-name]" + puts + puts "If not specified, the consumer name defaults to 'example-consumer'." + puts + puts "Additionally, the PULSAR_BROKER_URI environment variable must be set. Optional" + puts "PULSAR_CERT_PATH and PULSAR_AUTH_TOKEN environment variables are also" + puts "recognized." + exit 1 +end + +topic = ARGV[0] +consumer_name = ARGV[1] || 'example-consumer' +client = Pulsar::Client.from_environment +consumer = client.subscribe(topic, consumer_name) +consumer.listen do |message, id, finish| + puts "Received message '#{message}' [id: #{id}]" + finish.call if message == "exit" +end +client.close diff --git a/bin/example-producer b/bin/example-producer new file mode 100755 index 0000000..ba15d4d --- /dev/null +++ b/bin/example-producer @@ -0,0 +1,43 @@ +#!/usr/bin/env ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" + +unless ARGV.size == 1 && Pulsar::Client.sufficient_environment? + puts "Usage: #{__FILE__} " + puts + puts "Additionally, the PULSAR_BROKER_URI environment variable must be set. Optional" + puts "PULSAR_CERT_PATH and PULSAR_AUTH_TOKEN environment variables are also" + puts "recognized." + exit 1 +end + +topic = ARGV.shift +client = Pulsar::Client.from_environment +producer = client.create_producer(topic) +while data = gets + data.chomp! + producer.send(data) + break if data == "exit" +end +client.close diff --git a/bin/example-reader b/bin/example-reader new file mode 100755 index 0000000..8faa0fd --- /dev/null +++ b/bin/example-reader @@ -0,0 +1,42 @@ +#!/usr/bin/env ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" + +unless ARGV.size == 1 && Pulsar::Client.sufficient_environment? + puts "Usage: #{__FILE__} " + puts + puts "Additionally, the PULSAR_BROKER_URI environment variable must be set. Optional" + puts "PULSAR_CERT_PATH and PULSAR_AUTH_TOKEN environment variables are also" + puts "recognized." + exit 1 +end + +topic = ARGV.shift +client = Pulsar::Client.from_environment +reader = client.create_reader(topic, Pulsar::MessageId.earliest, reader_name: "example-reader") +while reader.message_available? + message = reader.next_message + puts "Read message '#{message.data}' [id: #{message.message_id}]" +end +client.close diff --git a/bin/setup b/bin/setup new file mode 100755 index 0000000..cf4ad25 --- /dev/null +++ b/bin/setup @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail +IFS=$'\n\t' +set -vx + +bundle install diff --git a/ext/bindings/bindings.cpp b/ext/bindings/bindings.cpp new file mode 100644 index 0000000..3652054 --- /dev/null +++ b/ext/bindings/bindings.cpp @@ -0,0 +1,22 @@ +#include "rice/Module.hpp" + +#include "message.hpp" +#include "producer.hpp" +#include "consumer.hpp" +#include "reader.hpp" +#include "client.hpp" +#include "util.hpp" + +using namespace Rice; + +extern "C" +void Init_bindings() +{ + Module rb_mPulsar = define_module("Pulsar"); + bind_errors(rb_mPulsar); + bind_message(rb_mPulsar); + bind_producer(rb_mPulsar); + bind_consumer(rb_mPulsar); + bind_reader(rb_mPulsar); + bind_client(rb_mPulsar); +} diff --git a/ext/bindings/client.cpp b/ext/bindings/client.cpp new file mode 100644 index 0000000..1f69814 --- /dev/null +++ b/ext/bindings/client.cpp @@ -0,0 +1,210 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "client.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +ClientConfiguration::ClientConfiguration() : _config() { +} + +void ClientConfiguration::setAuthFromToken(const std::string &token) { + _config.setAuth(pulsar::AuthToken::createWithToken(token)); +} + +int ClientConfiguration::getOperationTimeoutSeconds() { + return _config.getOperationTimeoutSeconds(); +} + +void ClientConfiguration::setOperationTimeoutSeconds(int timeout) { + _config.setOperationTimeoutSeconds(timeout); +} + +int ClientConfiguration::getIOThreads() { + return _config.getIOThreads(); +} + +void ClientConfiguration::setIOThreads(int threads) { + _config.setIOThreads(threads); +} + +int ClientConfiguration::getMessageListenerThreads() { + return _config.getMessageListenerThreads(); +} + +void ClientConfiguration::setMessageListenerThreads(int threads) { + _config.setMessageListenerThreads(threads); +} + +int ClientConfiguration::getConcurrentLookupRequest() { + return _config.getConcurrentLookupRequest(); +} + +void ClientConfiguration::setConcurrentLookupRequest(int n) { + _config.setConcurrentLookupRequest(n); +} + +std::string ClientConfiguration::getLogConfFilePath() { + return _config.getLogConfFilePath(); +} + +void ClientConfiguration::setLogConfFilePath(const std::string& path) { + _config.setLogConfFilePath(path); +} + +bool ClientConfiguration::isUseTls() { + return _config.isUseTls(); +} + +void ClientConfiguration::setUseTls(bool enable) { + _config.setUseTls(enable); +} + +std::string ClientConfiguration::getTlsTrustCertsFilePath() { + return _config.getTlsTrustCertsFilePath(); +} + +void ClientConfiguration::setTlsTrustCertsFilePath(const std::string& path) { + _config.setTlsTrustCertsFilePath(path); +} + +bool ClientConfiguration::isTlsAllowInsecureConnection() { + return _config.isTlsAllowInsecureConnection(); +} + +void ClientConfiguration::setTlsAllowInsecureConnection(bool enable) { + _config.setTlsAllowInsecureConnection(enable); +} + +bool ClientConfiguration::isValidateHostName() { + return _config.isValidateHostName(); +} + +void ClientConfiguration::setValidateHostName(bool enable) { + _config.setValidateHostName(enable); +} + +Client::Client(Rice::String service_url, const ClientConfiguration& config) : _client(service_url.str(), config._config) { +} + +typedef struct { + pulsar::Client& client; + const Rice::String& topic; + const pulsar::ProducerConfiguration& config; + pulsar::Producer producer; + pulsar::Result result; +} client_create_producer_task; + +void* client_create_producer_worker(void* taskPtr) { + client_create_producer_task& task = *(client_create_producer_task*)taskPtr; + task.result = task.client.createProducer(task.topic.str(), task.config, task.producer); + return nullptr; +} + +Producer::ptr Client::create_producer(Rice::String topic, const ProducerConfiguration& config) { + client_create_producer_task task = { _client, topic, config }; + rb_thread_call_without_gvl(&client_create_producer_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Producer::ptr(new Producer(task.producer)); +} + +typedef struct { + pulsar::Client& client; + const Rice::String& topic; + const Rice::String& subscriptionName; + const pulsar::ConsumerConfiguration& config; + pulsar::Consumer consumer; + pulsar::Result result; +} client_subscribe_task; + +void* client_subscribe_worker(void* taskPtr) { + client_subscribe_task& task = *(client_subscribe_task*)taskPtr; + task.result = task.client.subscribe(task.topic.str(), task.subscriptionName.str(), task.config, task.consumer); + return nullptr; +} + +Consumer::ptr Client::subscribe(Rice::String topic, Rice::String subscriptionName, const ConsumerConfiguration& config) { + client_subscribe_task task = { _client, topic, subscriptionName, config }; + rb_thread_call_without_gvl(&client_subscribe_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Consumer::ptr(new Consumer(task.consumer)); +} + +typedef struct { + pulsar::Client& client; + const Rice::String& topic; + const pulsar::MessageId& startMessageId; + const pulsar::ReaderConfiguration& config; + pulsar::Reader reader; + pulsar::Result result; +} client_create_reader_task; + +void* client_create_reader_worker(void* taskPtr) { + client_create_reader_task& task = *(client_create_reader_task*)taskPtr; + task.result = task.client.createReader(task.topic.str(), task.startMessageId, task.config, task.reader); + return nullptr; +} + +Reader::ptr Client::create_reader(Rice::String topic, const MessageId& startMessageId, const ReaderConfiguration& config) { + client_create_reader_task task = { _client, topic, startMessageId._msgId, config }; + rb_thread_call_without_gvl(&client_create_reader_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Reader::ptr(new Reader(task.reader)); +} + +typedef struct { + pulsar::Client& client; + pulsar::Result result; +} client_close_task; + +void* client_close_worker(void* taskPtr) { + client_close_task& task = *(client_close_task*)taskPtr; + task.result = task.client.close(); + return nullptr; +} + +void Client::close() { + client_close_task task = { _client }; + rb_thread_call_without_gvl(&client_close_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + +using namespace Rice; + +void bind_client(Module& module) { + define_class_under(module, "Client") + .define_constructor(Constructor()) + .define_method("create_producer", &pulsar_rb::Client::create_producer) + .define_method("subscribe", &pulsar_rb::Client::subscribe) + .define_method("create_reader", &pulsar_rb::Client::create_reader) + .define_method("close", &pulsar_rb::Client::close) + ; + + define_class_under(module, "ClientConfiguration") + .define_constructor(Constructor()) + .define_method("authentication_token=", &pulsar_rb::ClientConfiguration::setAuthFromToken) + .define_method("operation_timeout_seconds", &pulsar_rb::ClientConfiguration::getOperationTimeoutSeconds) + .define_method("operation_timeout_seconds=", &pulsar_rb::ClientConfiguration::setOperationTimeoutSeconds) + .define_method("io_threads", &pulsar_rb::ClientConfiguration::getIOThreads) + .define_method("io_threads=", &pulsar_rb::ClientConfiguration::setIOThreads) + .define_method("message_listener_threads", &pulsar_rb::ClientConfiguration::getMessageListenerThreads) + .define_method("message_listener_threads=", &pulsar_rb::ClientConfiguration::setMessageListenerThreads) + .define_method("concurrent_lookup_requests", &pulsar_rb::ClientConfiguration::getConcurrentLookupRequest) + .define_method("concurrent_lookup_requests=", &pulsar_rb::ClientConfiguration::setConcurrentLookupRequest) + .define_method("log_conf_file_path", &pulsar_rb::ClientConfiguration::getLogConfFilePath) + .define_method("log_conf_file_path=", &pulsar_rb::ClientConfiguration::setLogConfFilePath) + .define_method("use_tls?", &pulsar_rb::ClientConfiguration::isUseTls) + .define_method("use_tls=", &pulsar_rb::ClientConfiguration::setUseTls) + .define_method("tls_trust_certs_file_path", &pulsar_rb::ClientConfiguration::getTlsTrustCertsFilePath) + .define_method("tls_trust_certs_file_path=", &pulsar_rb::ClientConfiguration::setTlsTrustCertsFilePath) + .define_method("tls_allow_insecure_connection?", &pulsar_rb::ClientConfiguration::isTlsAllowInsecureConnection) + .define_method("tls_allow_insecure_connection=", &pulsar_rb::ClientConfiguration::setTlsAllowInsecureConnection) + .define_method("tls_validate_hostname?", &pulsar_rb::ClientConfiguration::isValidateHostName) + .define_method("tls_validate_hostname=", &pulsar_rb::ClientConfiguration::setValidateHostName) + ; +} diff --git a/ext/bindings/client.hpp b/ext/bindings/client.hpp new file mode 100644 index 0000000..f381ab3 --- /dev/null +++ b/ext/bindings/client.hpp @@ -0,0 +1,59 @@ +#ifndef __PULSAR_RUBY_CLIENT_CLIENT_HPP +#define __PULSAR_RUBY_CLIENT_CLIENT_HPP + +#include "rice/Module.hpp" +#include "rice/String.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "producer.hpp" +#include "consumer.hpp" +#include "reader.hpp" + +namespace pulsar_rb { + class ClientConfiguration { + public: + pulsar::ClientConfiguration _config; + ClientConfiguration(); + + void setAuthFromToken(const std::string &token); + int getOperationTimeoutSeconds(); + void setOperationTimeoutSeconds(int timeout); + int getIOThreads(); + void setIOThreads(int threads); + int getMessageListenerThreads(); + void setMessageListenerThreads(int threads); + int getConcurrentLookupRequest(); + void setConcurrentLookupRequest(int n); + std::string getLogConfFilePath(); + void setLogConfFilePath(const std::string& path); + bool isUseTls(); + void setUseTls(bool enable); + std::string getTlsTrustCertsFilePath(); + void setTlsTrustCertsFilePath(const std::string& path); + bool isTlsAllowInsecureConnection(); + void setTlsAllowInsecureConnection(bool enable); + bool isValidateHostName(); + void setValidateHostName(bool enable); + + typedef Rice::Data_Object ptr; + }; + + class Client { + public: + pulsar::Client _client; + Client(Rice::String service_url, const ClientConfiguration& config); + + Producer::ptr create_producer(Rice::String topic, const ProducerConfiguration& config); + Consumer::ptr subscribe(Rice::String topic, Rice::String subscriptionName, const ConsumerConfiguration& config); + Reader::ptr create_reader(Rice::String topic, const MessageId& startMessageId, const ReaderConfiguration& config); + + void close(); + + typedef Rice::Data_Object ptr; + }; +}; + +void bind_client(Rice::Module &module); + +#endif diff --git a/ext/bindings/consumer.cpp b/ext/bindings/consumer.cpp new file mode 100644 index 0000000..b7c0620 --- /dev/null +++ b/ext/bindings/consumer.cpp @@ -0,0 +1,95 @@ +#include "rice/Data_Type.hpp" +#include "rice/Enum.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "consumer.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +typedef struct { + pulsar::Consumer& consumer; + unsigned int timeout_ms; + pulsar::Message message; + pulsar::Result result; +} consumer_receive_job; + +void* consumer_receive_nogvl(void* jobPtr) { + consumer_receive_job& job = *(consumer_receive_job*)jobPtr; + if (job.timeout_ms > 0) { + job.result = job.consumer.receive(job.message, job.timeout_ms); + } else { + job.result = job.consumer.receive(job.message); + } + return nullptr; +} + +pulsar::Message consumer_receive(pulsar::Consumer& consumer, unsigned int timeout_ms) { + consumer_receive_job job = { consumer, timeout_ms }; + rb_thread_call_without_gvl(&consumer_receive_nogvl, &job, RUBY_UBF_IO, nullptr); + CheckResult(job.result); + return job.message; +} + +Message::ptr Consumer::receive(unsigned int timeout_ms) { + pulsar::Message message = consumer_receive(_consumer, timeout_ms); + return Message::ptr(new Message(message)); +} + +void Consumer::acknowledge(const Message& message) { + _consumer.acknowledgeAsync(message._msg, nullptr); +} + +void Consumer::negative_acknowledge(const Message& message) { + _consumer.negativeAcknowledge(message._msg); +} + +} + +using namespace Rice; + +void bind_consumer(Module &module) { + define_class_under(module, "Consumer") + .define_constructor(Constructor()) + .define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0)) + .define_method("acknowledge", &pulsar_rb::Consumer::acknowledge) + .define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge) + ; + + define_enum("ConsumerType", module) + .define_value("Exclusive", ConsumerExclusive) + .define_value("Shared", ConsumerShared) + .define_value("Failover", ConsumerFailover) + .define_value("KeyShared", ConsumerKeyShared); + + define_class_under(module, "ConsumerConfiguration") + .define_constructor(Constructor()) + .define_method("consumer_type", &ConsumerConfiguration::getConsumerType) + .define_method("consumer_type=", &ConsumerConfiguration::setConsumerType) + // TODO .define_method("schema", &ConsumerConfiguration::getSchema) + // TODO .define_method("schema=", &ConsumerConfiguration::setSchema) + // TODO .define_method("message_listener", &ConsumerConfiguration_setMessageListener) + .define_method("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) + .define_method("receiver_queue_size=", &ConsumerConfiguration::setReceiverQueueSize) + .define_method("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) + .define_method("max_total_receiver_queue_size_across_partitions=", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) + .define_method("consumer_name", &ConsumerConfiguration::getConsumerName) + .define_method("consumer_name=", &ConsumerConfiguration::setConsumerName) + .define_method("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) + .define_method("unacked_messages_timeout_ms=", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) + .define_method("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs) + .define_method("negative_ack_redelivery_delay_ms=", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs) + .define_method("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) + .define_method("broker_consumer_stats_cache_time_ms=", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) + .define_method("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod) + .define_method("pattern_auto_discovery_period=", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) + .define_method("read_compacted?", &ConsumerConfiguration::isReadCompacted) + .define_method("read_compacted=", &ConsumerConfiguration::setReadCompacted) + .define_method("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) + .define_method("subscription_initial_position=", &ConsumerConfiguration::setSubscriptionInitialPosition) + .define_method("[]", &ConsumerConfiguration::getProperty) + .define_method("[]=", &ConsumerConfiguration::setProperty) + ; +} diff --git a/ext/bindings/consumer.hpp b/ext/bindings/consumer.hpp new file mode 100644 index 0000000..bc4bfa1 --- /dev/null +++ b/ext/bindings/consumer.hpp @@ -0,0 +1,31 @@ +#ifndef __PULSAR_RUBY_CLIENT_CONSUMER_HPP +#define __PULSAR_RUBY_CLIENT_CONSUMER_HPP + +#include "rice/Module.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + class Consumer { + public: + pulsar::Consumer _consumer; + Consumer() {}; + Consumer(const pulsar::Consumer& consumer) : _consumer(consumer) {} + + Message::ptr receive(unsigned int timeout_ms=0); + void acknowledge(const Message& message); + void negative_acknowledge(const Message& message); + + typedef Rice::Data_Object ptr; + }; + + // direct typedef instead of wrapping because implementations don't need any + // wrapping. but still re-namespaced for consistency + typedef pulsar::ConsumerConfiguration ConsumerConfiguration; +}; + +void bind_consumer(Rice::Module& module); + +#endif diff --git a/ext/bindings/extconf.rb b/ext/bindings/extconf.rb new file mode 100644 index 0000000..068abd9 --- /dev/null +++ b/ext/bindings/extconf.rb @@ -0,0 +1,3 @@ +require 'mkmf-rice' +$LOCAL_LIBS << "-lpulsar" +create_makefile('pulsar/bindings') diff --git a/ext/bindings/message.cpp b/ext/bindings/message.cpp new file mode 100644 index 0000000..6ef005e --- /dev/null +++ b/ext/bindings/message.cpp @@ -0,0 +1,57 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + +Rice::String MessageId::toString() { + std::stringstream ss; + ss << _msgId; + return Rice::String(ss.str()); +} + +Rice::String MessageId::serialize() { + std::string serialized; + _msgId.serialize(serialized); + return Rice::String(serialized); +} + +Message::Message(const std::string& data) { + pulsar::MessageBuilder mb; + mb.setContent(data); + _msg = mb.build(); +} + +Rice::String Message::getData() { + std::string str((const char*)_msg.getData(), _msg.getLength()); + return Rice::String(str); +} + +MessageId::ptr Message::getMessageId() { + pulsar::MessageId messageId = _msg.getMessageId(); + return MessageId::ptr(new MessageId(messageId)); +} + +} + +using namespace Rice; + +void bind_message(Module& module) { + define_class_under(module, "MessageId") + .define_constructor(Constructor()) + .define_method("to_s", &pulsar_rb::MessageId::toString) + .define_method("serialize", &pulsar_rb::MessageId::serialize) + .define_singleton_method("deserialize", &pulsar_rb::MessageId::deserialize) + .define_singleton_method("earliest", &pulsar_rb::MessageId::earliest) + .define_singleton_method("latest", &pulsar_rb::MessageId::latest) + ; + + define_class_under(module, "Message") + .define_constructor(Constructor()) + .define_constructor(Constructor()) + .define_method("data", &pulsar_rb::Message::getData) + .define_method("message_id", &pulsar_rb::Message::getMessageId) + ; +} diff --git a/ext/bindings/message.hpp b/ext/bindings/message.hpp new file mode 100644 index 0000000..b542701 --- /dev/null +++ b/ext/bindings/message.hpp @@ -0,0 +1,48 @@ +#ifndef __PULSAR_RUBY_CLIENT_MESSAGE_HPP +#define __PULSAR_RUBY_CLIENT_MESSAGE_HPP + +#include "rice/Module.hpp" +#include "rice/String.hpp" +#include "rice/Data_Object.hpp" +#include + +namespace pulsar_rb { + class MessageId { + public: + pulsar::MessageId _msgId; + MessageId(const pulsar::MessageId& msgId) : _msgId(msgId) {}; + + Rice::String toString(); + Rice::String serialize(); + + typedef Rice::Data_Object ptr; + + static MessageId::ptr deserialize(const Rice::String& serialized) { + return MessageId::ptr(new MessageId(pulsar::MessageId::deserialize(serialized.str()))); + } + + static MessageId::ptr earliest() { + return MessageId::ptr(new MessageId(pulsar::MessageId::earliest())); + } + + static MessageId::ptr latest() { + return MessageId::ptr(new MessageId(pulsar::MessageId::latest())); + } + }; + + class Message { + public: + pulsar::Message _msg; + Message(const pulsar::Message& msg) : _msg(msg) {}; + Message(const std::string& data); + + Rice::String getData(); + MessageId::ptr getMessageId(); + + typedef Rice::Data_Object ptr; + }; +}; + +void bind_message(Rice::Module& module); + +#endif diff --git a/ext/bindings/producer.cpp b/ext/bindings/producer.cpp new file mode 100644 index 0000000..4565370 --- /dev/null +++ b/ext/bindings/producer.cpp @@ -0,0 +1,69 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "producer.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +typedef struct { + pulsar::Producer& producer; + const pulsar::Message& message; + pulsar::Result result; +} producer_send_task; + +void* producer_send_worker(void* taskPtr) { + producer_send_task& task = *(producer_send_task*)taskPtr; + task.result = task.producer.send(task.message); + return nullptr; +} + +void Producer::send(const Message& message) { + producer_send_task task = { _producer, message._msg }; + rb_thread_call_without_gvl(&producer_send_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + +using namespace Rice; + +void bind_producer(Module& module) { + define_class_under(module, "Producer") + .define_constructor(Constructor()) + .define_method("send", &pulsar_rb::Producer::send) + ; + + define_class_under(module, "ProducerConfiguration") + .define_constructor(Constructor()) + .define_method("producer_name", &ProducerConfiguration::getProducerName) + .define_method("producer_name=", &ProducerConfiguration::setProducerName) + // TODO .define_method("schema", &ProducerConfiguration::getSchema) + // TODO .define_method("schema=", &ProducerConfiguration::setSchema) + .define_method("send_timeout_millis", &ProducerConfiguration::getSendTimeout) + .define_method("send_timeout_millis=", &ProducerConfiguration::setSendTimeout) + .define_method("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) + .define_method("initial_sequence_id=", &ProducerConfiguration::setInitialSequenceId) + .define_method("compression_type", &ProducerConfiguration::getCompressionType) + .define_method("compression_type=", &ProducerConfiguration::setCompressionType) + .define_method("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) + .define_method("max_pending_messages=", &ProducerConfiguration::setMaxPendingMessages) + .define_method("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) + .define_method("max_pending_messages_across_partitions=", &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions) + .define_method("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) + .define_method("block_if_queue_full=", &ProducerConfiguration::setBlockIfQueueFull) + .define_method("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) + .define_method("partitions_routing_mode=", &ProducerConfiguration::setPartitionsRoutingMode) + .define_method("batching_enabled", &ProducerConfiguration::getBatchingEnabled) + .define_method("batching_enabled=", &ProducerConfiguration::setBatchingEnabled) + .define_method("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages) + .define_method("batching_max_messages=", &ProducerConfiguration::setBatchingMaxMessages) + .define_method("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes) + .define_method("batching_max_allowed_size_in_bytes=", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes) + .define_method("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs) + .define_method("batching_max_publish_delay_ms=", &ProducerConfiguration::setBatchingMaxPublishDelayMs) + .define_method("[]", &ProducerConfiguration::getProperty) + .define_method("[]=", &ProducerConfiguration::setProperty); +} diff --git a/ext/bindings/producer.hpp b/ext/bindings/producer.hpp new file mode 100644 index 0000000..718f80d --- /dev/null +++ b/ext/bindings/producer.hpp @@ -0,0 +1,29 @@ +#ifndef __PULSAR_RUBY_CLIENT_PRODUCER_HPP +#define __PULSAR_RUBY_CLIENT_PRODUCER_HPP + +#include "rice/Module.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + class Producer { + public: + pulsar::Producer _producer; + Producer() {}; + Producer(const pulsar::Producer& producer) : _producer(producer) {} + + void send(const Message& message); + + typedef Rice::Data_Object ptr; + }; + + // direct typedef instead of wrapping because implementations don't need any + // wrapping. but still re-namespaced for consistency + typedef pulsar::ProducerConfiguration ProducerConfiguration; +}; + +void bind_producer(Rice::Module& module); + +#endif diff --git a/ext/bindings/reader.cpp b/ext/bindings/reader.cpp new file mode 100644 index 0000000..6034e54 --- /dev/null +++ b/ext/bindings/reader.cpp @@ -0,0 +1,101 @@ +#include "rice/Data_Type.hpp" +#include "rice/Enum.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "reader.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +/* TODO in newer pulsar C++ lib +typedef struct { + pulsar::Reader& reader; + const pulsar::MessageId& msgId; + pulsar::Result result; +} reader_seek_task; + +void* reader_seek_nogvl(void* taskPtr) { + reader_seek_task& task = *(reader_seek_task*)taskPtr; + task.result = task.reader.seek(task.msgId); + return nullptr; +} + +void Reader::seek(MessageId::ptr msgId) { + reader_seek_task task = { _reader, msgId->_msgId }; + rb_thread_call_without_gvl(&reader_seek_nogvl, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} +*/ + +typedef struct { + pulsar::Reader& reader; + pulsar::Result result; + bool answer; +} reader_message_available_task; + +void* reader_message_available_nogvl(void* taskPtr) { + reader_message_available_task& task = *(reader_message_available_task*)taskPtr; + task.result = task.reader.hasMessageAvailable(task.answer); + return nullptr; +} + +bool Reader::hasMessageAvailable() { + reader_message_available_task task = { _reader }; + rb_thread_call_without_gvl(&reader_message_available_nogvl, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return task.answer; +} + +typedef struct { + pulsar::Reader& reader; + unsigned int timeout_ms; + pulsar::Result result; + pulsar::Message message; +} reader_read_next_task; + +void* reader_read_next_nogvl(void* taskPtr) { + reader_read_next_task& task = *(reader_read_next_task*)taskPtr; + if (task.timeout_ms > 0) { + task.result = task.reader.readNext(task.message, task.timeout_ms); + } else { + task.result = task.reader.readNext(task.message); + } + return nullptr; +} + +Message::ptr Reader::readNext(unsigned int timeout_ms) { + reader_read_next_task task = { _reader, timeout_ms }; + rb_thread_call_without_gvl(&reader_read_next_nogvl, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Message::ptr(new Message(task.message)); +} + +} + +using namespace Rice; + +void bind_reader(Module &module) { + define_class_under(module, "Reader") + .define_constructor(Constructor()) + // TODO .define_method("seek", &pulsar_rb::Reader::seek) + .define_method("message_available?", &pulsar_rb::Reader::hasMessageAvailable) + .define_method("next_message", &pulsar_rb::Reader::readNext, (Arg("timeout_ms") = 0)) + ; + + define_class_under(module, "ReaderConfiguration") + .define_constructor(Constructor()) + // TODO .define_method("schema", &ReaderConfiguration::getSchema) + // TODO .define_method("schema=", &ReaderConfiguration::setSchema) + // TODO .define_method("message_listener", &ReaderConfiguration::setMessageListener) + .define_method("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) + .define_method("receiver_queue_size=", &ReaderConfiguration::setReceiverQueueSize) + .define_method("reader_name", &ReaderConfiguration::getReaderName) + .define_method("reader_name=", &ReaderConfiguration::setReaderName) + .define_method("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix) + .define_method("subscription_role_prefix=", &ReaderConfiguration::setSubscriptionRolePrefix) + .define_method("read_compacted?", &ReaderConfiguration::isReadCompacted) + .define_method("read_compacted=", &ReaderConfiguration::setReadCompacted) + ; +} diff --git a/ext/bindings/reader.hpp b/ext/bindings/reader.hpp new file mode 100644 index 0000000..1064409 --- /dev/null +++ b/ext/bindings/reader.hpp @@ -0,0 +1,31 @@ +#ifndef __PULSAR_RUBY_CLIENT_READER_HPP +#define __PULSAR_RUBY_CLIENT_READER_HPP + +#include "rice/Module.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + class Reader { + public: + pulsar::Reader _reader; + Reader() {}; + Reader(const pulsar::Reader& reader) : _reader(reader) {} + + // void seek(const MessageId::ptr msgId, unsigned int timeout_ms=0); + bool hasMessageAvailable(); + Message::ptr readNext(unsigned int timeout_ms=0); + + typedef Rice::Data_Object ptr; + }; + + // direct typedef instead of wrapping because implementations don't need any + // wrapping. but still re-namespaced for consistency + typedef pulsar::ReaderConfiguration ReaderConfiguration; +}; + +void bind_reader(Rice::Module& module); + +#endif diff --git a/ext/bindings/util.cpp b/ext/bindings/util.cpp new file mode 100644 index 0000000..a847ed0 --- /dev/null +++ b/ext/bindings/util.cpp @@ -0,0 +1,23 @@ +#include + +#include "util.hpp" +#include "rice/Exception.hpp" + +using namespace Rice; + +VALUE rb_ePulsarError = Qnil; +VALUE rb_ePulsarError_Timeout = Qnil; + +void bind_errors(Module &module) { + rb_ePulsarError = rb_define_class_under(module.value(), "Error", rb_eStandardError); + rb_ePulsarError_Timeout = rb_define_class_under(rb_ePulsarError, "Timeout", rb_ePulsarError); +} + +void CheckResult(pulsar::Result res) { + if (res == pulsar::ResultTimeout) { + throw Exception(rb_ePulsarError_Timeout, "pulsar timeout"); + } + else if (res != ResultOk) { + throw Exception(rb_ePulsarError, "unexpected pulsar exception: %d", res); + } +} diff --git a/ext/bindings/util.hpp b/ext/bindings/util.hpp new file mode 100644 index 0000000..c3eaf7b --- /dev/null +++ b/ext/bindings/util.hpp @@ -0,0 +1,13 @@ +#ifndef __PULSAR_RUBY_CLIENT_UTIL_HPP +#define __PULSAR_RUBY_CLIENT_UTIL_HPP + +#include "rice/Module.hpp" +#include + +using namespace pulsar; + +void CheckResult(Result res); + +void bind_errors(Rice::Module& module); + +#endif diff --git a/lib/pulsar/client.rb b/lib/pulsar/client.rb new file mode 100644 index 0000000..51ce48a --- /dev/null +++ b/lib/pulsar/client.rb @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/client/version' +require 'pulsar/bindings' +require 'pulsar/client_configuration' +require 'pulsar/consumer' +require 'pulsar/consumer_configuration' +require 'pulsar/producer' +require 'pulsar/reader_configuration' + +module Pulsar + class Client + module RubySideTweaks + def initialize(service_url, config=nil) + config = Pulsar::ClientConfiguration.from(config) + super(service_url, config) + end + + def create_producer(topic, config=nil) + config ||= Pulsar::ProducerConfiguration.new + super(topic, config) + end + + def subscribe(topic, subscription_name, config={}) + unless config.is_a?(Pulsar::ConsumerConfiguration) + config = Pulsar::ConsumerConfiguration.new(config) + end + super(topic, subscription_name, config) + end + + def create_reader(topic, start_message_id, config={}) + unless config.is_a?(Pulsar::ReaderConfiguration) + config = Pulsar::ReaderConfiguration.new(config) + end + super(topic, start_message_id, config) + end + end + + prepend RubySideTweaks + + def self.sufficient_environment?(config={}) + config[:broker_uri] || ENV['PULSAR_BROKER_URI'] + end + + def self.from_environment(config={}) + broker_uri = config[:broker_uri] || ENV['PULSAR_BROKER_URI'] + config = Pulsar::ClientConfiguration.from_environment(config) + new(broker_uri, config) + end + end +end diff --git a/lib/pulsar/client/version.rb b/lib/pulsar/client/version.rb new file mode 100644 index 0000000..5667967 --- /dev/null +++ b/lib/pulsar/client/version.rb @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Pulsar + class Client + VERSION = "2.4.1-beta.4" + end +end diff --git a/lib/pulsar/client_configuration.rb b/lib/pulsar/client_configuration.rb new file mode 100644 index 0000000..b2dee09 --- /dev/null +++ b/lib/pulsar/client_configuration.rb @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class ClientConfiguration + def self.from(config) + case config + when self then config # already a config object + when nil then self.new # empty (all defaults) config object + when Hash then self.new(config) # config object from hash + else raise ArgumentError + end + end + + def self.from_environment(config={}) + env_config = {} + if ENV.key?('PULSAR_CERT_PATH') + env_config[:use_tls] = true + env_config[:tls_allow_insecure_connection] = false + env_config[:tls_validate_hostname] = false + env_config[:tls_trust_certs_file_path] = ENV['PULSAR_CERT_PATH'] + end + if ENV.key?('PULSAR_AUTH_TOKEN') + env_config[:authentication_token] = ENV['PULSAR_AUTH_TOKEN'] + end + self.from(env_config.merge(config)) + end + + module RubySideTweaks + def initialize(config={}) + super() + populate(config) + end + end + + prepend RubySideTweaks + + def populate(config={}) + populate_one(config, :authentication_token) + populate_one(config, :operation_timeout_seconds) + populate_one(config, :io_threads) + populate_one(config, :message_listener_threads) + populate_one(config, :concurrent_lookup_requests) + populate_one(config, :log_conf_file_path) + populate_one(config, :use_tls) + populate_one(config, :tls_trust_certs_file_path) + populate_one(config, :tls_allow_insecure_connection) + populate_one(config, :tls_validate_hostname) + end + + def populate_one(config, key) + if config.key?(key) + self.send(:"#{key}=", config[key]) + elsif config.key?(key.to_s) + self.send(:"#{key}=", config[key.to_s]) + end + end + end +end diff --git a/lib/pulsar/consumer.rb b/lib/pulsar/consumer.rb new file mode 100644 index 0000000..e938444 --- /dev/null +++ b/lib/pulsar/consumer.rb @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class Consumer + class ListenerToken + def initialize + @active = true + end + + def finish + @active = false + end + + def active? + @active + end + end + + def listen + listener = ListenerToken.new + while listener.active? + msg = receive + yield msg.data, msg.message_id, lambda { listener.finish } + acknowledge(msg) + end + end + + def listen_in_thread + Thread.new { listen {|*args| yield *args }} + end + end +end diff --git a/lib/pulsar/consumer_configuration.rb b/lib/pulsar/consumer_configuration.rb new file mode 100644 index 0000000..df9ebe9 --- /dev/null +++ b/lib/pulsar/consumer_configuration.rb @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/client/version' +require 'pulsar/bindings' +require 'pulsar/client_configuration' +require 'pulsar/consumer' +require 'pulsar/producer' + +module Pulsar + class ConsumerConfiguration + # aligns with the pulsar::ConsumerType enum in the C++ library + CONSUMER_TYPES = { + :exclusive => Pulsar::ConsumerType::Exclusive, + :shared => Pulsar::ConsumerType::Shared, + :failover => Pulsar::ConsumerType::Failover, + :key_shared => Pulsar::ConsumerType::KeyShared + } + + module RubySideTweaks + def initialize(config={}) + super() + self.consumer_type = config[:consumer_type] if config.has_key?(:consumer_type) + end + + def consumer_type + enum_value = super + CONSUMER_TYPES.invert[enum_value] + end + + def consumer_type=(type) + unless type.is_a?(Pulsar::ConsumerType) + type = CONSUMER_TYPES[type] + unless type + raise ArgumentError, "unrecognized consumer_type" + end + end + super(type) + end + end + + prepend RubySideTweaks + end +end diff --git a/lib/pulsar/producer.rb b/lib/pulsar/producer.rb new file mode 100644 index 0000000..cfd42de --- /dev/null +++ b/lib/pulsar/producer.rb @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class Producer + module RubySideTweaks + def send(message) + unless message.is_a?(Pulsar::Message) + message = Pulsar::Message.new(message) + end + super(message) + end + end + + prepend RubySideTweaks + end +end diff --git a/lib/pulsar/reader_configuration.rb b/lib/pulsar/reader_configuration.rb new file mode 100644 index 0000000..bf7e625 --- /dev/null +++ b/lib/pulsar/reader_configuration.rb @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/client/version' +require 'pulsar/bindings' + +module Pulsar + class ReaderConfiguration + + module RubySideTweaks + def initialize(config={}) + super() + self.receiver_queue_size = config[:receiver_queue_size] if config.has_key?(:receiver_queue_size) + self.reader_name = config[:reader_name] if config.has_key?(:reader_name) + self.subscription_role_prefix = config[:subscription_role_prefix] if config.has_key?(:subscription_role_prefix) + self.read_compacted = config[:read_compacted] if config.has_key?(:read_compacted) + end + end + + prepend RubySideTweaks + end +end diff --git a/pulsar-client.gemspec b/pulsar-client.gemspec new file mode 100644 index 0000000..2d387df --- /dev/null +++ b/pulsar-client.gemspec @@ -0,0 +1,31 @@ +lib = File.expand_path("../lib", __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require "pulsar/client/version" + +Gem::Specification.new do |spec| + spec.name = "pulsar-client" + spec.version = Pulsar::Client::VERSION + spec.date = "2019-10-03" + spec.summary = "Apache Pulsar Ruby Client" + spec.description = "Wraps the Apache Pulsar C++ Client with Ruby bindings." + spec.authors = ["Jacob Fugal"] + spec.email = ["lukfugl@gmail.com"] + # once merged upstream to apache, we'll rename the homepage + spec.homepage = "https://github.com/instructure/pulsar-client-ruby" + spec.license = "Apache-2.0" + + spec.files = `git ls-files -z`.split("\x0").reject do |f| + f.match(%r{^(test|spec|features)/}) + end + spec.bindir = "exe" + spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } + + spec.extensions = ["ext/bindings/extconf.rb"] + + spec.add_development_dependency "bundler", "~> 1.16" + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rspec", "~> 3.0" + + spec.add_dependency "rake-compiler", "~> 1.0" + spec.add_dependency "rice", "~> 2.1" +end diff --git a/spec/pulsar/client_spec.rb b/spec/pulsar/client_spec.rb new file mode 100644 index 0000000..5ae8c72 --- /dev/null +++ b/spec/pulsar/client_spec.rb @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +RSpec.describe Pulsar::Client do + it "has a version number" do + expect(Pulsar::Client::VERSION).not_to be nil + end + + it "does something useful" do + expect(false).to eq(true) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..a9ba201 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "pulsar/client" + +RSpec.configure do |config| + # Enable flags like --only-failures and --next-failure + config.example_status_persistence_file_path = ".rspec_status" + + # Disable RSpec exposing methods globally on `Module` and `main` + config.disable_monkey_patching! + + config.expect_with :rspec do |c| + c.syntax = :expect + end +end