Skip to content

Commit

Permalink
feat(pubsub-avro): add a pubsub avro example (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
alevenberg authored Dec 19, 2023
1 parent 954b9d5 commit 01a18f8
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pubsub-avro/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# ~~~
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ~~~

cmake_minimum_required(VERSION 3.20)

# Define the project name and where to report bugs.
set(PACKAGE_BUGREPORT
"https://github.com/GoogleCloudPlatform/cpp-samples/issues")
project(pubsub-avro CXX)

find_package(google_cloud_cpp_pubsub CONFIG REQUIRED)
find_package(unofficial-avro-cpp CONFIG REQUIRED)

# Generate the avro C++ files using the avro compiler.
find_program(AVROGENCPP NAMES avrogencpp)
macro (run_avro_compiler file namespace)
add_custom_command(
OUTPUT ${file}.h
COMMAND
${AVROGENCPP} ARGS --input
"${CMAKE_CURRENT_SOURCE_DIR}/${file}.avro" --output ${file}.h
--namespace ${namespace}
DEPENDS ${AVROGENCPP} "${CMAKE_CURRENT_SOURCE_DIR}/${file}.avro"
COMMENT "Executing Avro compiler")
set_source_files_properties(${file}.h PROPERTIES GENERATED TRUE)
endmacro (run_avro_compiler)

run_avro_compiler(schema1 v1)
run_avro_compiler(schema2 v2)

add_executable(quickstart quickstart.cc schema1.h schema2.h)
target_compile_features(quickstart PRIVATE cxx_std_14)
target_include_directories(quickstart PRIVATE SYSTEM
"${CMAKE_CURRENT_BINARY_DIR}")
target_link_libraries(quickstart PRIVATE google-cloud-cpp::pubsub
unofficial::avro-cpp::avrocpp)
171 changes: 171 additions & 0 deletions pubsub-avro/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# Subscribe to avro records

## Overview

The quickstart shows how to subscribe to receive avro messages that could be for
different schema revisions. This example uses the [Avro C++] library and
[C++ Cloud Pub/Sub] library to use the [Cloud Pub/Sub] service. The setup
involves:

1. Creating an initial schema (Schema 1)
1. Creating a topic with Schema 1
1. Creating a subscription to the topic
1. Publishing a message to the topic with Schema 1
1. Commiting a revision schema (Schema 2)
1. Publishing a message to the topic with Schema 2
1. Recieve both messages using a subscriber

## Prerequisites

### 1. Create a project in the Google Cloud Platform Console

If you haven't already created a project, create one now.

Projects enable you to manage all Google Cloud Platform resources for your app,
including deployment, access control, billing, and services.

1. Open the [Cloud Platform Console](https://console.cloud.google.com/).
1. In the drop-down menu at the top, select Create a project.
1. Give your project a name.
1. Make a note of the project ID, which might be different from the project
name. The project ID is used in commands and in configurations.

### 2. Enable billing for your project

If you haven't already enabled billing for your project,
[enable billing now](https://console.cloud.google.com/project/_/settings).
Enabling billing allows the application to consume billable resources such as
Pub/Sub API calls.

See
[Cloud Platform Console Help](https://support.google.com/cloud/answer/6288653)
for more information about billing settings.

### 3. Enable APIs for your project

[Click here](https://console.cloud.google.com/flows/enableapi?apiid=speech&showconfirmation=true)
to visit Cloud Platform Console and enable the Pub/Sub and Trace API via the UI.

Or use the CLI:

```
gcloud services enable pubsub.googleapis.com
```

## Build using CMake and Vcpkg

To build and run the sample, [setup a C++ development environment].

### 1. Install vcpkg

This project uses [`vcpkg`](https://github.com/microsoft/vcpkg) for dependency
management. Clone the vcpkg repository to your preferred location. In these
instructions we use`$HOME`:

```shell
git clone -C $HOME https://github.com/microsoft/vcpkg.git
cd $HOME/vcpkg
./vcpkg install google-cloud-cpp
```

### 2. Download or clone this repo

```shell
git clone https://github.com/GoogleCloudPlatform/cpp-samples
```

### 3. Compile these examples

Use the `vcpkg` toolchain file to download and compile dependencies. This file
would be in the directory you cloned `vcpkg` into, `$HOME/vcpkg` if you are
following the instructions to the letter. Note that building all the
dependencies can take up to an hour, depending on the performance of your
workstation. These dependencies are cached, so a second build should be
substantially faster.

```sh
cd cpp-samples/pubsub-open-telemetry
cmake -S . -B .build -DCMAKE_TOOLCHAIN_FILE=$HOME/vcpkg/scripts/buildsystems/vcpkg.cmake -G Ninja
cmake --build .build
```

## Setup the Pub/Sub messages

Export the following environment variables to run the setup scripts:

```shell
export GOOGLE_CLOUD_PROJECT=[PROJECT ID] # Use your project ID here
export GOOGLE_CLOUD_TOPIC=avro-topic
export GOOGLE_CLOUD_SUBSCRIPTION=avro-sub
export GOOGLE_CLOUD_SCHEMA_NAME=state
export GOOGLE_CLOUD_SCHEMA_FILE1=schema1.avro
export GOOGLE_CLOUD_SCHEMA_FILE2=schema2.avro
```

```shell
./setup.sh
```

## Run the example

This will resolve the schemas when recieving the message and return data in the
format of schema2, even if it was sent in the format of schema1.

```sh
.build/quickstart ${GOOGLE_CLOUD_PROJECT} avro-sub schema2.avro
```

If you want to send more message to test, you can use the following commands to
send a message in schema1

```sh
gcloud pubsub topics publish ${GOOGLE_CLOUD_TOPIC} \
--project ${GOOGLE_CLOUD_PROJECT} \
--message '{"name": "New York", "post_abbr": "NY"}'
```

Or in schema2

```sh
gcloud pubsub topics publish ${GOOGLE_CLOUD_TOPIC} \
--project ${GOOGLE_CLOUD_PROJECT} \
--message '{"name": "New York", "post_abbr": "NY", "population": 10000}'
```

## Cleanup

To delete the created resources (topic, subscription, schema), run:

```shell
./cleanup.sh
```

## Platform Specific Notes

### macOS

gRPC [requires][grpc-roots-pem-bug] an environment variable to configure the
trust store for SSL certificates, you can download and configure this using:

```bash
curl -Lo roots.pem https://pki.google.com/roots.pem
export GRPC_DEFAULT_SSL_ROOTS_FILE_PATH="$PWD/roots.pem"
```

### Windows

gRPC [requires][grpc-roots-pem-bug] an environment variable to configure the
trust store for SSL certificates, you can download and configure this using:

```console
@powershell -NoProfile -ExecutionPolicy unrestricted -Command ^
(new-object System.Net.WebClient).Downloadfile( ^
'https://pki.google.com/roots.pem', 'roots.pem')
set GRPC_DEFAULT_SSL_ROOTS_FILE_PATH=%cd%\roots.pem
```

[avro c++]: https://avro.apache.org/docs/1.11.1/api/cpp/html/
[c++ cloud pub/sub]: https://cloud.google.com/cpp/docs/reference/pubsub/latest
[cloud pub/sub]: https://cloud.google.com/pubsub/docs
[grpc-roots-pem-bug]: https://github.com/grpc/grpc/issues/16571
[setup a c++ development environment]: cloud.google.com/cpp/docs/setup
20 changes: 20 additions & 0 deletions pubsub-avro/cleanup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Delete the topic, subscription, and schema.
gcloud pubsub topics delete ${GOOGLE_CLOUD_TOPIC} "--project=${GOOGLE_CLOUD_PROJECT}"
gcloud pubsub subscriptions delete ${GOOGLE_CLOUD_SUBSCRIPTION} "--project=${GOOGLE_CLOUD_PROJECT}"
gcloud pubsub schemas delete ${GOOGLE_CLOUD_SCHEMA_NAME} "--project=${GOOGLE_CLOUD_PROJECT}"
123 changes: 123 additions & 0 deletions pubsub-avro/quickstart.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "avro/Compiler.hh"
#include "avro/DataFile.hh"
#include "avro/Decoder.hh"
#include "avro/Stream.hh"
#include "avro/ValidSchema.hh"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/schema_client.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include "schema1.h"
#include "schema2.h"
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <subscription-id> <avro-file>\n";
return 1;
}

std::string const project_id = argv[1];
std::string const subscription_id = argv[2];
std::string const avro_file = argv[3];

auto constexpr kWaitTimeout = std::chrono::seconds(30);

// Create a namespace alias to make the code easier to read.
namespace pubsub = ::google::cloud::pubsub;

//! [START pubsub_subscribe_avro_records_with_revisions]
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;
auto session = subscriber.Subscribe(
[&](pubsub::Message const& message, pubsub::AckHandler h) {
// Get the reader schema revision for the message.
auto schema_name = message.attributes()["googclient_schemaname"];
auto schema_revision_id =
message.attributes()["googclient_schemarevisionid"];
// If we haven't received a message with this schema, look it up.
if (revisions_to_schemas.find(schema_revision_id) ==
revisions_to_schemas.end()) {
auto schema_path = schema_name + "@" + schema_revision_id;
// Use the schema client to get the path.
auto schema = schema_client.GetSchema(schema_path);
if (!schema) {
std::cout << "Schema not found:" << schema_path << "\n";
return;
}
avro::ValidSchema writer_schema;
std::stringstream in;
in << schema.value().definition();
avro::compileJsonSchema(in, writer_schema);
revisions_to_schemas[schema_revision_id] = writer_schema;
}
auto writer_schema = revisions_to_schemas[schema_revision_id];

auto encoding = message.attributes()["googclient_schemaencoding"];
if (encoding == "JSON") {
std::stringstream in;
in << message.data();
auto avro_in = avro::istreamInputStream(in);
avro::DecoderPtr decoder = avro::resolvingDecoder(
writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
decoder->init(*avro_in);

v2::State state;
avro::decode(*decoder, state);
std::cout << "Name: " << state.name << "\n";
std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
std::cout << "Population: " << state.population << "\n";
} else {
std::cout << "Unable to decode. Received message using encoding"
<< encoding << "\n";
}
std::move(h).ack();
});
// [END pubsub_subscribe_avro_records_with_revisions]

std::cout << "Waiting for messages on " + subscription_id + "...\n";

// Blocks until the timeout is reached.
auto result = session.wait_for(kWaitTimeout);
if (result == std::future_status::timeout) {
std::cout << "timeout reached, ending session\n";
session.cancel();
}

return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
} catch (const std::exception& error) {
std::cout << error.what() << std::endl;
}
18 changes: 18 additions & 0 deletions pubsub-avro/schema1.avro
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"type": "record",
"name": "State",
"namespace": "utilities",
"doc": "A list of states in the United States of America.",
"fields": [
{
"name": "name",
"type": "string",
"doc": "The common name of the state."
},
{
"name": "post_abbr",
"type": "string",
"doc": "The postal code abbreviation of the state."
}
]
}
Loading

0 comments on commit 01a18f8

Please sign in to comment.