Skip to content

Commit

Permalink
refactor(pubsub-open-telemetry): move publisher functions into a help…
Browse files Browse the repository at this point in the history
…er library (#278)

Co-authored-by: Carlos O'Ryan <[email protected]>
  • Loading branch information
alevenberg and coryan authored Dec 7, 2023
1 parent f5896eb commit b3d47cb
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 125 deletions.
12 changes: 12 additions & 0 deletions pubsub-open-telemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,23 @@ cc_library(
],
)

cc_library(
name = "publisher_helper",
srcs = ["publisher_helper.cc"],
hdrs = ["publisher_helper.h"],
deps = [
":parse_args",
"@google_cloud_cpp//:opentelemetry",
"@google_cloud_cpp//:pubsub",
],
)

cc_binary(
name = "publisher",
srcs = ["publisher.cc"],
deps = [
":parse_args",
":publisher_helper",
"@google_cloud_cpp//:opentelemetry",
"@google_cloud_cpp//:pubsub",
],
Expand Down
12 changes: 8 additions & 4 deletions pubsub-open-telemetry/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@ project(pubsub-open-telemetry CXX)
find_package(google_cloud_cpp_pubsub CONFIG REQUIRED)
find_package(google_cloud_cpp_opentelemetry CONFIG REQUIRED)
find_package(Boost 1.66 REQUIRED COMPONENTS program_options)
find_package(opentelemetry-cpp CONFIG REQUIRED)

add_library(parse_args STATIC parse_args.cc parse_args.h)
add_library(parse_args parse_args.cc parse_args.h)
target_compile_features(parse_args PUBLIC cxx_std_14)
target_link_libraries(
parse_args PUBLIC Boost::program_options google-cloud-cpp::pubsub
google-cloud-cpp::opentelemetry)

add_library(publisher_helper publisher_helper.cc publisher_helper.h)
target_compile_features(publisher_helper PUBLIC cxx_std_14)
target_link_libraries(publisher_helper PUBLIC parse_args opentelemetry-cpp::api
opentelemetry-cpp::sdk)

add_executable(publisher publisher.cc)
target_compile_features(publisher PRIVATE cxx_std_14)
target_link_libraries(
publisher PRIVATE google-cloud-cpp::pubsub google-cloud-cpp::opentelemetry
parse_args)
target_link_libraries(publisher PRIVATE publisher_helper parse_args)

add_executable(quickstart quickstart.cc)
target_compile_features(quickstart PRIVATE cxx_std_14)
Expand Down
78 changes: 0 additions & 78 deletions pubsub-open-telemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,6 @@ To find the traces, navigate to the Cloud Trace UI.

For an overview of the Cloud Trace UI, see: [View traces overview].

### Publisher

The publisher application lets the user configure a tracing enabled Pub/Sub
Publisher client to see how different configuration settings change the produced
telemetry data.

#### Example traces

To find the traces, navigate to the Cloud Trace UI.

##### Publish trace

![Screenshot of the publish span in the Cloud Trace UI running publisher.](assets/publish_span.png)

##### Create trace

![Screenshot of the create span in the Cloud Trace UI running publisher.](assets/create_span.png)

## Prerequisites

### 1. Create a project in the Google Cloud Platform Console
Expand Down Expand Up @@ -137,57 +119,6 @@ cmake --build .build
.build/quickstart [project-name] [topic-id]
```

#### Run basic publisher examples

```shell
.build/publisher [project-name] [topic-id]
.build/publisher [project-name] [topic-id] -n 1000
.build/publisher [project-name] [topic-id] --message-size 0
.build/publisher [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Flow control example

```shell
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action reject
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action block
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action ignore
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --publisher-action block
```

#### Batching example

```shell
.build/publisher [project-name] [topic-id] -n 5 --max-batch-messages 2 --max-hold-time 100
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --max-hold-time 1000
```

#### To see all options

```shell
.build/publisher --help
Usage: .build/publisher <project-id> <topic-id>
A simple publisher application with Open Telemetery enabled:
-h [ --help ] produce help message
--project-id arg the name of the Google Cloud project
--topic-id arg the name of the Google Cloud topic
--tracing-rate arg (=1) otel::BasicTracingRateOption value
--max-queue-size arg (=0) If set to 0, uses the default tracing
configuration.
-n [ --message-count ] arg (=1) the number of messages to publish
--message-size arg (=1) the desired message payload size
--enable-ordering-keys arg (=0) If set to true, the messages will be sent
with ordering keys. There will be 3 possible
ordering keys and they will be set randomly
--max-pending-messages arg pubsub::MaxPendingMessagesOption value
--max-pending-bytes arg pubsub::MaxPendingBytesOption value
--publisher-action arg pubsub::FullPublisherAction value
(block|ignore|reject)
--max-hold-time arg pubsub::MaxHoldTimeOption value in us
--max-batch-bytes arg pubsub::MaxBatchBytesOption value
--max-batch-messages arg pubsub::MaxBatchMessagesOption value
```

## Build and run using Bazel

### 1. Download or clone this repo
Expand All @@ -211,15 +142,6 @@ bazel build //:quickstart
bazel run //:quickstart [project-name] [topic-id]
```

#### Run basic publisher examples

```shell
bazel run //:publisher [project-name] [topic-id]
bazel run //:publisher -- [project-name] [topic-id] -n 1000
bazel run //:publisher -- [project-name] [topic-id] --message_size 0
bazel run //:publisher -- [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Run with a local version of google-cloud-cpp

```shell
Expand Down
62 changes: 19 additions & 43 deletions pubsub-open-telemetry/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,73 +13,49 @@
// limitations under the License.

#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry/trace_exporter.h"
#include "google/cloud/opentelemetry_options.h"
#include "parse_args.h"
#include "publisher_helper.h"
#include <opentelemetry/sdk/trace/batch_span_processor_factory.h>
#include <opentelemetry/sdk/trace/batch_span_processor_options.h>
#include <opentelemetry/sdk/trace/processor.h>
#include <opentelemetry/sdk/trace/tracer_provider_factory.h>
#include <opentelemetry/trace/provider.h>
#include <iostream>
#include <string_view>

// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace pubsub = gc::pubsub;
namespace otel = gc::otel;
namespace trace_sdk = ::opentelemetry::sdk::trace;
namespace trace = ::opentelemetry::trace;

std::string GeneratePayload(int payload_size) {
auto gen = google::cloud::internal::DefaultPRNG(std::random_device{}());
const std::string charset = "abcdefghijklmnopqrstuvwxyz";
std::uniform_int_distribution<std::size_t> rd(0, charset.size() - 1);

std::string result(payload_size, '0');
std::generate(result.begin(), result.end(),
[&rd, &gen, &charset]() { return charset[rd(gen)]; });
return result;
void ConfigureCloudTraceTracer(ParseResult const& args) {
auto exporter = otel::MakeTraceExporter(gc::Project(args.project_id));
trace_sdk::BatchSpanProcessorOptions span_options;
span_options.max_queue_size = args.max_queue_size;
auto processor = trace_sdk::BatchSpanProcessorFactory::Create(
std::move(exporter), span_options);
auto provider =
trace_sdk::TracerProviderFactory::Create(std::move(processor));
trace::Provider::SetTracerProvider(std::move(provider));
}

int main(int argc, char* argv[]) try {
auto args = ParseArguments(argc, argv);
if (args.project_id.empty() && args.topic_id.empty()) {
return 1;
}

std::cout << "Using project `" << args.project_id << "` and topic `"
<< args.topic_id << "`\n";
auto exporter = otel::MakeTraceExporter(gc::Project(args.project_id));
opentelemetry::sdk::trace::BatchSpanProcessorOptions span_options;
span_options.max_queue_size = args.max_queue_size;
auto processor = opentelemetry::sdk::trace::BatchSpanProcessorFactory::Create(
std::move(exporter), span_options);
auto provider = opentelemetry::sdk::trace::TracerProviderFactory::Create(
std::move(processor));
opentelemetry::trace::Provider::SetTracerProvider(std::move(provider));

auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
pubsub::Topic(args.project_id, args.topic_id), args.publisher_options));
// Automatically call `Cleanup()` before returning from `main()`.
std::shared_ptr<void> cleanup(nullptr, [](void*) { Cleanup(); });

std::cout << "Publishing " << std::to_string(args.message_count)
<< " message(s) with payload size "
<< std::to_string(args.message_size) << "...\n";
std::vector<gc::future<std::string>> ids;
for (int i = 0; i < args.message_count; i++) {
auto id = publisher
.Publish(pubsub::MessageBuilder()
.SetData(GeneratePayload(args.message_size))
.Build())
.then([](gc::future<gc::StatusOr<std::string>> f) {
return f.get().value();
});
ids.push_back(std::move(id));
}
for (auto& id : ids) try {
std::cout << "Sent message with id: " << id.get() << "\n";
} catch (std::exception const& ex) {
std::cout << "Error in publish: " << ex.what() << "\n";
}
ConfigureCloudTraceTracer(args);

auto publisher = CreatePublisher(args);

Publish(publisher, args);

return 0;
} catch (google::cloud::Status const& status) {
Expand Down
83 changes: 83 additions & 0 deletions pubsub-open-telemetry/publisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Publisher

The publisher application lets the user configure a tracing enabled Pub/Sub
Publisher client to see how different configuration settings change the produced
telemetry data.

## Example traces

### Cloud Trace

To find the traces, navigate to the Cloud Trace UI.

#### Publish trace

![Screenshot of the publish span in the Cloud Trace UI running publisher.](assets/publish_span.png)

#### Create trace

![Screenshot of the create span in the Cloud Trace UI running publisher.](assets/create_span.png)

## Build and run

For setup instructions, refer to the [README.md](README.md).

### Using CMake and Vcpkg

#### Run basic publisher examples

```shell
.build/publisher [project-name] [topic-id]
.build/publisher [project-name] [topic-id] -n 1000
.build/publisher [project-name] [topic-id] --message-size 0
.build/publisher [project-name] [topic-id] --tracing-rate 0.01 -n 10
```

#### Flow control example

```shell
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action reject
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action block
.build/publisher [project-name] [topic-id] -n 5 --max-pending-messages 2 --publisher-action ignore
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --publisher-action block
```

#### Batching example

```shell
.build/publisher [project-name] [topic-id] -n 5 --max-batch-messages 2 --max-hold-time 100
.build/publisher [project-name] [topic-id] -n 5 --message-size 10 --max-batch-bytes 60 --max-hold-time 1000
```

#### To see all options

```shell
.build/publisher --help
Usage: .build/publisher <project-id> <topic-id>
A simple publisher application with Open Telemetery enabled:
-h [ --help ] produce help message
--project-id arg the name of the Google Cloud project
--topic-id arg the name of the Google Cloud topic
--tracing-rate arg (=1) otel::BasicTracingRateOption value
--max-queue-size arg (=2048) set the max queue size for open telemetery
-n [ --message-count ] arg (=1) the number of messages to publish
--message-size arg (=1) the desired message payload size
--max-pending-messages arg pubsub::MaxPendingMessagesOption value
--max-pending-bytes arg pubsub::MaxPendingBytesOption value
--publisher-action arg pubsub::FullPublisherAction value
(block|ignore|reject)
--max-hold-time arg pubsub::MaxHoldTimeOption value in us
--max-batch-bytes arg pubsub::MaxBatchBytesOption value
--max-batch-messages arg pubsub::MaxBatchMessagesOption value
```

### Using Bazel

#### Run basic publisher examples

```shell
bazel run //:publisher [project-name] [topic-id]
bazel run //:publisher -- [project-name] [topic-id] -n 1000
bazel run //:publisher -- [project-name] [topic-id] --message_size 0
bazel run //:publisher -- [project-name] [topic-id] --tracing-rate 0.01 -n 10
```
Loading

0 comments on commit b3d47cb

Please sign in to comment.