Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/features/http_sse/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
#
# Tencent is pleased to support the open source community by making tRPC available.
#
# Copyright (C) 2023 Tencent.
# All rights reserved.
#
# If you have downloaded a copy of the tRPC source code from Tencent,
# please note that tRPC source code is licensed under the Apache 2.0 License,
# A copy of the Apache 2.0 License is included in this file.
#
#

cmake_minimum_required(VERSION 3.14)

include(../cmake/common.cmake)

#---------------------------------------------------------------------------------------
# Compile project
#---------------------------------------------------------------------------------------
project(features_http_sse)

# compile server
file(GLOB SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/server/*.cc)
add_executable(http_sse_server ${SRC_FILES})
target_link_libraries(http_sse_server ${LIBRARY})

# compile client
file(GLOB SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/client/*.cc)
add_executable(sse_client ${SRC_FILES})
target_link_libraries(sse_client ${LIBRARY})
110 changes: 110 additions & 0 deletions examples/features/http_sse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# HTTP SSE (Server-Sent Events) Example

Server-Sent Events (SSE) is a standard allowing a server to send updates to a client over a single HTTP connection. Unlike traditional HTTP requests where the client sends a request and waits for a single response, SSE enables the server to push multiple updates to the client in real-time.

## Usage

We can use the following command to view the directory tree.
```shell
$ tree examples/features/http_sse/
examples/features/http_sse/
├── client
│ ├── BUILD
│ ├── sse_client.cc
│ └── trpc_cpp_fiber.yaml
├── CMakeLists.txt
├── README.md
├── run_cmake.sh
├── run.sh
└── server
├── BUILD
├── http_sse_server.cc
└── trpc_cpp_fiber.yaml
```

We can use the following script to quickly compile and run a program.
```shell
sh examples/features/http_sse/run.sh
```

* Compilation

We can run the following command to compile the client and server programs.

```shell
bazel build //examples/features/http_sse/server:http_sse_server
bazel build //examples/features/http_sse/client:sse_client
```

Alternatively, you can use cmake.
```shell
# build trpc-cpp libs first, if already build, just skip this build process.
$ mkdir -p build && cd build && cmake -DCMAKE_BUILD_TYPE=Release .. && make -j8 && cd -
# build examples/http_sse
$ mkdir -p examples/features/http_sse/build && cd examples/features/http_sse/build && cmake -DCMAKE_BUILD_TYPE=Release .. && make -j8 && cd -
```

* Run the server program

We can run the following command to start the server program.

*CMake build targets can be found at `build` of this directory, you can replace below server&client binary path when you use cmake to compile.*

```shell
bazel-bin/examples/features/http_sse/server/http_sse_server --config=examples/features/http_sse/server/trpc_cpp_fiber.yaml
```
* Use the curl command to test the server

```shell
curl -i -N http://127.0.0.1:24856/sse/test
```
* The curl test results are as follows

``` text
HTTP/1.1 200 OK
Connection: keep-alive
Content-Type: text/event-stream
Cache-Control: no-cache
Transfer-Encoding: chunked
Access-Control-Allow-Origin: *
Access-Control-Allow-Headers: Cache-Control

event: message
data: {"msg": "hello", "idx": 0}
id: 0

event: message
data: {"msg": "hello", "idx": 1}
id: 1

event: message
data: {"msg": "hello", "idx": 2}
id: 2
......


```
* Run the client program

We can run the following command to start the client program.

```shell
bazel-bin/examples/features/http_sse/client/sse_client --client_config=examples/features/http_sse/client/trpc_cpp_fiber.yaml
```

The content of the output from the client program is as follows:
``` text
Received SSE event - id: 0, event: message, data: {"msg": "hello", "idx": 0}
Received SSE event - id: 1, event: message, data: {"msg": "hello", "idx": 1}
Received SSE event - id: 2, event: message, data: {"msg": "hello", "idx": 2}
Received SSE event - id: 3, event: message, data: {"msg": "hello", "idx": 3}
Received SSE event - id: 4, event: message, data: {"msg": "hello", "idx": 4}
Received SSE event - id: 5, event: message, data: {"msg": "hello", "idx": 5}
Received SSE event - id: 6, event: message, data: {"msg": "hello", "idx": 6}
Received SSE event - id: 7, event: message, data: {"msg": "hello", "idx": 7}
Received SSE event - id: 8, event: message, data: {"msg": "hello", "idx": 8}
Received SSE event - id: 9, event: message, data: {"msg": "hello", "idx": 9}
SSE stream finished

```

18 changes: 18 additions & 0 deletions examples/features/http_sse/client/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package(default_visibility = ["//visibility:public"])

cc_binary(
name = "sse_client",
srcs = ["sse_client.cc"],
deps = [
"@trpc_cpp//trpc/client:make_client_context",
"@trpc_cpp//trpc/client:trpc_client",
"@trpc_cpp//trpc/client/sse:http_sse_proxy",
"@trpc_cpp//trpc/common:runtime_manager",
"@trpc_cpp//trpc/common:status",
"@trpc_cpp//trpc/common:trpc_plugin",
"@trpc_cpp//trpc/common/config:trpc_config",
"@trpc_cpp//trpc/coroutine:fiber",
"@trpc_cpp//trpc/util/log:logging",
"@com_github_gflags_gflags//:gflags",
],
)
14 changes: 14 additions & 0 deletions examples/features/http_sse/client/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd $DIR/../../../.. && pwd)"

echo "Building SSE client..."
cd $ROOT_DIR
bazel build //examples/features/http_sse/client:sse_client

echo "Running SSE client..."
$ROOT_DIR/bazel-bin/examples/features/http_sse/client/sse_client \
--client_config=$DIR/trpc_cpp_fiber.yaml \
--addr=127.0.0.1:24856 \
--path=/sse/test

182 changes: 182 additions & 0 deletions examples/features/http_sse/client/sse_client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 Tencent.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//#include <iostream>
#include <string>
#include <memory>
#include <chrono>

#include "gflags/gflags.h"
#include "trpc/client/make_client_context.h"
#include "trpc/client/trpc_client.h"
#include "trpc/client/sse/http_sse_proxy.h"
#include "trpc/common/config/trpc_config.h"
#include "trpc/common/runtime_manager.h"
#include "trpc/common/status.h"
#include "trpc/coroutine/fiber.h"
#include "trpc/util/log/logging.h"
#include "trpc/util/http/sse/sse_event.h"

DEFINE_string(service_name, "sse_client", "callee service name");
DEFINE_string(client_config, "trpc_cpp_fiber.yaml", "path to client config");
DEFINE_string(addr, "127.0.0.1:24856", "server ip:port");
DEFINE_string(path, "/sse/test", "SSE URL path");

namespace http::sse_demo {

using HttpSseProxyPtr = std::shared_ptr<::trpc::HttpSseProxy>;

// Callback-based SSE client using streaming approach
bool StartSseClient(const HttpSseProxyPtr& proxy) {
std::string url = "http://" + FLAGS_addr + FLAGS_path;
TRPC_FMT_INFO("StartSseClient connecting to {}", url);

auto ctx = ::trpc::MakeClientContext(proxy);

// Set very long timeout for the context
ctx->SetTimeout(120000); // 120 seconds

// Create the reader
auto reader = proxy->Get(ctx, url);
if (!reader.IsValid()) {
TRPC_FMT_ERROR("Failed to create SSE stream reader");
return false;
}

// Event callback
auto event_callback = [](const ::trpc::http::sse::SseEvent& event) {
std::string id_str = event.id.has_value() ? event.id.value() : "";
TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}",
id_str, event.event_type, event.data);
};

// Start streaming with the new non-blocking approach
::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads
if (!status.OK()) {
TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString());
return false;
}

TRPC_FMT_INFO("SSE client started successfully with streaming (callback-based)");

// Wait for events to be received
::trpc::FiberSleepFor(std::chrono::seconds(15)); // Wait for callback to print events

return true;
}

// Manual read SSE client - using streaming approach
bool GetSseClient(const HttpSseProxyPtr& proxy) {
std::string url = "http://" + FLAGS_addr + FLAGS_path;
TRPC_FMT_INFO("GetSseClient connecting to {}", url);
TRPC_FMT_DEBUG("Fiber Scheduler running: {}", ::trpc::IsRunningInFiberWorker());

auto ctx = ::trpc::MakeClientContext(proxy);

// Set very long timeout for the context
ctx->SetTimeout(120000); // 120 seconds

// Create the reader
::trpc::HttpSseStreamReader reader = proxy->Get(ctx, url);
if (!reader.IsValid()) {
TRPC_FMT_ERROR("Failed to create SSE stream reader");
return false;
}

// For manual reading, we'll use the streaming approach but with a different pattern
TRPC_FMT_INFO("Using streaming approach for manual SSE reading");

// We'll use a flag to control the reading loop
bool should_continue = true;
int event_count = 0;
const int max_events = 10;

// Start streaming with a callback that stores events
auto event_callback = [&should_continue, &event_count, max_events](const ::trpc::http::sse::SseEvent& event) {
std::string id_str = event.id.has_value() ? event.id.value() : "";
TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}",
id_str, event.event_type, event.data);

event_count++;
if (event_count >= max_events) {
should_continue = false;
}
};

// Start streaming
::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads
if (!status.OK()) {
TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString());
return false;
}

TRPC_FMT_INFO("SSE streaming started successfully (manual reading)");

// Wait for events to be received
for (int i = 0; i < 15 && should_continue; i++) {
::trpc::FiberSleepFor(std::chrono::seconds(1));
}

return true;
}

int Run() {
bool final_ok = true;

::trpc::ServiceProxyOption option;
option.name = FLAGS_service_name;
option.codec_name = "http";
option.network = "tcp";
option.conn_type = "long"; // Long connection
option.timeout = 180000; // 180 seconds timeout
option.selector_name = "direct";
option.target = FLAGS_addr;

auto sse_client = ::trpc::GetTrpcClient()->GetProxy<::trpc::HttpSseProxy>(FLAGS_service_name, option);

TRPC_FMT_INFO("Testing SSE client with Start API (callback-based)");
if (!StartSseClient(sse_client)) final_ok = false;

::trpc::FiberSleepFor(std::chrono::seconds(3));

TRPC_FMT_INFO("Testing SSE client with Get API (manual reading)");
if (!GetSseClient(sse_client)) final_ok = false;

std::cout << "Final SSE result: " << final_ok << std::endl;
return final_ok ? 0 : -1;
}

} // namespace http::sse_demo

void ParseClientConfig(int argc, char* argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
google::CommandLineFlagInfo info;
if (GetCommandLineFlagInfo("client_config", &info) && info.is_default) {
std::cerr << "start client with client_config, for example: " << argv[0]
<< " --client_config=/client/client_config/filepath" << std::endl;
exit(-1);
}
std::cout << "FLAGS_service_name: " << FLAGS_service_name << std::endl;
std::cout << "FLAGS_client_config: " << FLAGS_client_config << std::endl;
std::cout << "FLAGS_addr: " << FLAGS_addr << std::endl;
std::cout << "FLAGS_path: " << FLAGS_path << std::endl;
}

int main(int argc, char* argv[]) {
ParseClientConfig(argc, argv);

if (::trpc::TrpcConfig::GetInstance()->Init(FLAGS_client_config) != 0) {
std::cerr << "load client_config failed." << std::endl;
return -1;
}

return ::trpc::RunInTrpcRuntime([]() { return http::sse_demo::Run(); });
}
21 changes: 21 additions & 0 deletions examples/features/http_sse/client/trpc_cpp_fiber.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
global:
threadmodel:
fiber:
- instance_name: fiber_instance
concurrency_hint: 4
scheduling_group_size: 4
reactor_num_per_scheduling_group: 1

plugins:
log:
default:
- name: default
min_level: 1 # 0-trace, 1-debug, 2-info, 3-warn, 4-error, 5-critical
format: "[%Y-%m-%d %H:%M:%S.%e] [thread %t] [%l] [%@] %v"
mode: 1 # 1-sync 2-async, 3-fast
sinks:
local_file:
eol: true
filename: sse_client.log
stdout:
eol: true
Loading