Skip to content

Commit cfd5eca

Browse files
committed
http sse implementation
1 parent 9eae12f commit cfd5eca

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+8067
-170
lines changed

-remote --heads origin

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
2+
SSUUMMMMAARRYY OOFF LLEESSSS CCOOMMMMAANNDDSS
3+
4+
Commands marked with * may be preceded by a number, _N.
5+
Notes in parentheses indicate the behavior if _N is given.
6+
A key preceded by a caret indicates the Ctrl key; thus ^K is ctrl-K.
7+
8+
h H Display this help.
9+
q :q Q :Q ZZ Exit.
10+
---------------------------------------------------------------------------
11+
12+
MMOOVVIINNGG
13+
14+
e ^E j ^N CR * Forward one line (or _N lines).
15+
y ^Y k ^K ^P * Backward one line (or _N lines).
16+
ESC-j * Forward one file line (or _N file lines).
17+
ESC-k * Backward one file line (or _N file lines).
18+
f ^F ^V SPACE * Forward one window (or _N lines).
19+
b ^B ESC-v * Backward one window (or _N lines).
20+
z * Forward one window (and set window to _N).
21+
w * Backward one window (and set window to _N).
22+
ESC-SPACE * Forward one window, but don't stop at end-of-file.
23+
ESC-b * Backward one window, but don't stop at beginning-of-file.
24+
d ^D * Forward one half-window (and set half-window to _N).
25+
u ^U * Backward one half-window (and set half-window to _N).
26+
ESC-) RightArrow * Right one half screen width (or _N positions).
27+
ESC-( LeftArrow * Left one half screen width (or _N positions).
28+
ESC-} ^RightArrow Right to last column displayed.
29+
ESC-{ ^LeftArrow Left to first column.
30+
F Forward forever; like "tail -f".
31+
ESC-F Like F but stop when search pattern is found.
32+
r ^R ^L Repaint screen.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
#
3+
# Tencent is pleased to support the open source community by making tRPC available.
4+
#
5+
# Copyright (C) 2023 Tencent.
6+
# All rights reserved.
7+
#
8+
# If you have downloaded a copy of the tRPC source code from Tencent,
9+
# please note that tRPC source code is licensed under the Apache 2.0 License,
10+
# A copy of the Apache 2.0 License is included in this file.
11+
#
12+
#
13+
14+
cmake_minimum_required(VERSION 3.14)
15+
16+
include(../cmake/common.cmake)
17+
18+
#---------------------------------------------------------------------------------------
19+
# Compile project
20+
#---------------------------------------------------------------------------------------
21+
project(features_http_sse)
22+
23+
# compile server
24+
file(GLOB SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/server/*.cc)
25+
add_executable(http_sse_server ${SRC_FILES})
26+
target_link_libraries(http_sse_server ${LIBRARY})
27+
28+
# compile client
29+
file(GLOB SRC_FILES ${CMAKE_CURRENT_SOURCE_DIR}/client/*.cc)
30+
add_executable(sse_client ${SRC_FILES})
31+
target_link_libraries(sse_client ${LIBRARY})
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# HTTP SSE (Server-Sent Events) Example
2+
3+
Server-Sent Events (SSE) is a standard allowing a server to send updates to a client over a single HTTP connection. Unlike traditional HTTP requests where the client sends a request and waits for a single response, SSE enables the server to push multiple updates to the client in real-time.
4+
5+
## Usage
6+
7+
We can use the following command to view the directory tree.
8+
```shell
9+
$ tree examples/features/http_sse/
10+
examples/features/http_sse/
11+
├── client
12+
│ ├── BUILD
13+
│ ├── sse_client.cc
14+
│ └── trpc_cpp_fiber.yaml
15+
├── CMakeLists.txt
16+
├── README.md
17+
├── run_cmake.sh
18+
├── run.sh
19+
└── server
20+
├── BUILD
21+
├── http_sse_server.cc
22+
└── trpc_cpp_fiber.yaml
23+
```
24+
25+
We can use the following script to quickly compile and run a program.
26+
```shell
27+
sh examples/features/http_sse/run.sh
28+
```
29+
30+
* Compilation
31+
32+
We can run the following command to compile the client and server programs.
33+
34+
```shell
35+
bazel build //examples/features/http_sse/server:http_sse_server
36+
bazel build //examples/features/http_sse/client:sse_client
37+
```
38+
39+
Alternatively, you can use cmake.
40+
```shell
41+
# build trpc-cpp libs first, if already build, just skip this build process.
42+
$ mkdir -p build && cd build && cmake -DCMAKE_BUILD_TYPE=Release .. && make -j8 && cd -
43+
# build examples/http_sse
44+
$ mkdir -p examples/features/http_sse/build && cd examples/features/http_sse/build && cmake -DCMAKE_BUILD_TYPE=Release .. && make -j8 && cd -
45+
```
46+
47+
* Run the server program
48+
49+
We can run the following command to start the server program.
50+
51+
*CMake build targets can be found at `build` of this directory, you can replace below server&client binary path when you use cmake to compile.*
52+
53+
```shell
54+
bazel-bin/examples/features/http_sse/server/http_sse_server --config=examples/features/http_sse/server/trpc_cpp_fiber.yaml
55+
```
56+
* Use the curl command to test the server
57+
58+
```shell
59+
curl -i -N http://127.0.0.1:24856/sse/test
60+
```
61+
* The curl test results are as follows
62+
63+
``` text
64+
HTTP/1.1 200 OK
65+
Connection: keep-alive
66+
Content-Type: text/event-stream
67+
Cache-Control: no-cache
68+
Transfer-Encoding: chunked
69+
Access-Control-Allow-Origin: *
70+
Access-Control-Allow-Headers: Cache-Control
71+
72+
event: message
73+
data: {"msg": "hello", "idx": 0}
74+
id: 0
75+
76+
event: message
77+
data: {"msg": "hello", "idx": 1}
78+
id: 1
79+
80+
event: message
81+
data: {"msg": "hello", "idx": 2}
82+
id: 2
83+
......
84+
85+
86+
```
87+
* Run the client program
88+
89+
We can run the following command to start the client program.
90+
91+
```shell
92+
bazel-bin/examples/features/http_sse/client/sse_client --client_config=examples/features/http_sse/client/trpc_cpp_fiber.yaml
93+
```
94+
95+
The content of the output from the client program is as follows:
96+
``` text
97+
Received SSE event - id: 0, event: message, data: {"msg": "hello", "idx": 0}
98+
Received SSE event - id: 1, event: message, data: {"msg": "hello", "idx": 1}
99+
Received SSE event - id: 2, event: message, data: {"msg": "hello", "idx": 2}
100+
Received SSE event - id: 3, event: message, data: {"msg": "hello", "idx": 3}
101+
Received SSE event - id: 4, event: message, data: {"msg": "hello", "idx": 4}
102+
Received SSE event - id: 5, event: message, data: {"msg": "hello", "idx": 5}
103+
Received SSE event - id: 6, event: message, data: {"msg": "hello", "idx": 6}
104+
Received SSE event - id: 7, event: message, data: {"msg": "hello", "idx": 7}
105+
Received SSE event - id: 8, event: message, data: {"msg": "hello", "idx": 8}
106+
Received SSE event - id: 9, event: message, data: {"msg": "hello", "idx": 9}
107+
SSE stream finished
108+
109+
```
110+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package(default_visibility = ["//visibility:public"])
2+
3+
cc_binary(
4+
name = "sse_client",
5+
srcs = ["sse_client.cc"],
6+
deps = [
7+
"@trpc_cpp//trpc/client:make_client_context",
8+
"@trpc_cpp//trpc/client:trpc_client",
9+
"@trpc_cpp//trpc/client/sse:http_sse_proxy",
10+
"@trpc_cpp//trpc/common:runtime_manager",
11+
"@trpc_cpp//trpc/common:status",
12+
"@trpc_cpp//trpc/common:trpc_plugin",
13+
"@trpc_cpp//trpc/common/config:trpc_config",
14+
"@trpc_cpp//trpc/coroutine:fiber",
15+
"@trpc_cpp//trpc/util/log:logging",
16+
"@com_github_gflags_gflags//:gflags",
17+
],
18+
)
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
3+
ROOT_DIR="$(cd $DIR/../../../.. && pwd)"
4+
5+
echo "Building SSE client..."
6+
cd $ROOT_DIR
7+
bazel build //examples/features/http_sse/client:sse_client
8+
9+
echo "Running SSE client..."
10+
$ROOT_DIR/bazel-bin/examples/features/http_sse/client/sse_client \
11+
--client_config=$DIR/trpc_cpp_fiber.yaml \
12+
--addr=127.0.0.1:24856 \
13+
--path=/sse/test
14+
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
//
2+
//
3+
// Tencent is pleased to support the open source community by making tRPC available.
4+
//
5+
// Copyright (C) 2023 Tencent.
6+
// All rights reserved.
7+
//
8+
// If you have downloaded a copy of the tRPC source code from Tencent,
9+
// please note that tRPC source code is licensed under the Apache 2.0 License,
10+
// A copy of the Apache 2.0 License is included in this file.
11+
//
12+
//#include <iostream>
13+
#include <string>
14+
#include <memory>
15+
#include <chrono>
16+
17+
#include "gflags/gflags.h"
18+
#include "trpc/client/make_client_context.h"
19+
#include "trpc/client/trpc_client.h"
20+
#include "trpc/client/sse/http_sse_proxy.h"
21+
#include "trpc/common/config/trpc_config.h"
22+
#include "trpc/common/runtime_manager.h"
23+
#include "trpc/common/status.h"
24+
#include "trpc/coroutine/fiber.h"
25+
#include "trpc/util/log/logging.h"
26+
#include "trpc/util/http/sse/sse_event.h"
27+
28+
DEFINE_string(service_name, "sse_client", "callee service name");
29+
DEFINE_string(client_config, "trpc_cpp_fiber.yaml", "path to client config");
30+
DEFINE_string(addr, "127.0.0.1:24856", "server ip:port");
31+
DEFINE_string(path, "/sse/test", "SSE URL path");
32+
33+
namespace http::sse_demo {
34+
35+
using HttpSseProxyPtr = std::shared_ptr<::trpc::HttpSseProxy>;
36+
37+
// Callback-based SSE client using streaming approach
38+
bool StartSseClient(const HttpSseProxyPtr& proxy) {
39+
std::string url = "http://" + FLAGS_addr + FLAGS_path;
40+
TRPC_FMT_INFO("StartSseClient connecting to {}", url);
41+
42+
auto ctx = ::trpc::MakeClientContext(proxy);
43+
44+
// Set very long timeout for the context
45+
ctx->SetTimeout(120000); // 120 seconds
46+
47+
// Create the reader
48+
auto reader = proxy->Get(ctx, url);
49+
if (!reader.IsValid()) {
50+
TRPC_FMT_ERROR("Failed to create SSE stream reader");
51+
return false;
52+
}
53+
54+
// Event callback
55+
auto event_callback = [](const ::trpc::http::sse::SseEvent& event) {
56+
std::string id_str = event.id.has_value() ? event.id.value() : "";
57+
TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}",
58+
id_str, event.event_type, event.data);
59+
};
60+
61+
// Start streaming with the new non-blocking approach
62+
::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads
63+
if (!status.OK()) {
64+
TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString());
65+
return false;
66+
}
67+
68+
TRPC_FMT_INFO("SSE client started successfully with streaming (callback-based)");
69+
70+
// Wait for events to be received
71+
::trpc::FiberSleepFor(std::chrono::seconds(15)); // Wait for callback to print events
72+
73+
return true;
74+
}
75+
76+
// Manual read SSE client - using streaming approach
77+
bool GetSseClient(const HttpSseProxyPtr& proxy) {
78+
std::string url = "http://" + FLAGS_addr + FLAGS_path;
79+
TRPC_FMT_INFO("GetSseClient connecting to {}", url);
80+
TRPC_FMT_DEBUG("Fiber Scheduler running: {}", ::trpc::IsRunningInFiberWorker());
81+
82+
auto ctx = ::trpc::MakeClientContext(proxy);
83+
84+
// Set very long timeout for the context
85+
ctx->SetTimeout(120000); // 120 seconds
86+
87+
// Create the reader
88+
::trpc::HttpSseStreamReader reader = proxy->Get(ctx, url);
89+
if (!reader.IsValid()) {
90+
TRPC_FMT_ERROR("Failed to create SSE stream reader");
91+
return false;
92+
}
93+
94+
// For manual reading, we'll use the streaming approach but with a different pattern
95+
TRPC_FMT_INFO("Using streaming approach for manual SSE reading");
96+
97+
// We'll use a flag to control the reading loop
98+
bool should_continue = true;
99+
int event_count = 0;
100+
const int max_events = 10;
101+
102+
// Start streaming with a callback that stores events
103+
auto event_callback = [&should_continue, &event_count, max_events](const ::trpc::http::sse::SseEvent& event) {
104+
std::string id_str = event.id.has_value() ? event.id.value() : "";
105+
TRPC_FMT_INFO("Received SSE event - id: {}, event: {}, data: {}",
106+
id_str, event.event_type, event.data);
107+
108+
event_count++;
109+
if (event_count >= max_events) {
110+
should_continue = false;
111+
}
112+
};
113+
114+
// Start streaming
115+
::trpc::Status status = reader.StartStreaming(event_callback, 30000); // 30 second timeout for reads
116+
if (!status.OK()) {
117+
TRPC_FMT_ERROR("Failed to start SSE streaming: {}", status.ToString());
118+
return false;
119+
}
120+
121+
TRPC_FMT_INFO("SSE streaming started successfully (manual reading)");
122+
123+
// Wait for events to be received
124+
for (int i = 0; i < 15 && should_continue; i++) {
125+
::trpc::FiberSleepFor(std::chrono::seconds(1));
126+
}
127+
128+
return true;
129+
}
130+
131+
int Run() {
132+
bool final_ok = true;
133+
134+
::trpc::ServiceProxyOption option;
135+
option.name = FLAGS_service_name;
136+
option.codec_name = "http";
137+
option.network = "tcp";
138+
option.conn_type = "long"; // Long connection
139+
option.timeout = 180000; // 180 seconds timeout
140+
option.selector_name = "direct";
141+
option.target = FLAGS_addr;
142+
143+
auto sse_client = ::trpc::GetTrpcClient()->GetProxy<::trpc::HttpSseProxy>(FLAGS_service_name, option);
144+
145+
TRPC_FMT_INFO("Testing SSE client with Start API (callback-based)");
146+
if (!StartSseClient(sse_client)) final_ok = false;
147+
148+
::trpc::FiberSleepFor(std::chrono::seconds(3));
149+
150+
TRPC_FMT_INFO("Testing SSE client with Get API (manual reading)");
151+
if (!GetSseClient(sse_client)) final_ok = false;
152+
153+
std::cout << "Final SSE result: " << final_ok << std::endl;
154+
return final_ok ? 0 : -1;
155+
}
156+
157+
} // namespace http::sse_demo
158+
159+
void ParseClientConfig(int argc, char* argv[]) {
160+
google::ParseCommandLineFlags(&argc, &argv, true);
161+
google::CommandLineFlagInfo info;
162+
if (GetCommandLineFlagInfo("client_config", &info) && info.is_default) {
163+
std::cerr << "start client with client_config, for example: " << argv[0]
164+
<< " --client_config=/client/client_config/filepath" << std::endl;
165+
exit(-1);
166+
}
167+
std::cout << "FLAGS_service_name: " << FLAGS_service_name << std::endl;
168+
std::cout << "FLAGS_client_config: " << FLAGS_client_config << std::endl;
169+
std::cout << "FLAGS_addr: " << FLAGS_addr << std::endl;
170+
std::cout << "FLAGS_path: " << FLAGS_path << std::endl;
171+
}
172+
173+
int main(int argc, char* argv[]) {
174+
ParseClientConfig(argc, argv);
175+
176+
if (::trpc::TrpcConfig::GetInstance()->Init(FLAGS_client_config) != 0) {
177+
std::cerr << "load client_config failed." << std::endl;
178+
return -1;
179+
}
180+
181+
return ::trpc::RunInTrpcRuntime([]() { return http::sse_demo::Run(); });
182+
}

0 commit comments

Comments
 (0)