Skip to content

Commit 0579d49

Browse files
committed
FIX | super critical bug in http configs & create pipeling in send_file
1 parent 4b80854 commit 0579d49

File tree

3 files changed

+113
-132
lines changed

3 files changed

+113
-132
lines changed

main.cpp

Lines changed: 82 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -169,88 +169,88 @@ int main () {
169169
using http = manapi::net::http::server;
170170
// auto db = manapi::ext::pq::connection::create().unwrap();
171171

172-
/**
173-
* grpc
174-
*/
175-
176-
auto thrcntind = thrcnt.fetch_add(1);
177-
178-
auto service = std::make_shared<GreeterServiceImpl>();
179-
180-
auto grpc_server = manapi::net::wgrpc::server::create (grpc_server_ctx).unwrap();
181-
manapi::async::run([grpc_server, service, thrcntind] () mutable -> manapi::future<> {
182-
auto res = co_await grpc_server.config("/home/Timur/Desktop/WorkSpace/ManapiHTTP/cmake-build-debug/grpc.json");
183-
184-
res.log();
185-
186-
res = co_await grpc_server.start([&] (grpc::ServerBuilder &builder) -> manapi::error::status {
187-
builder.RegisterService(service.get());
188-
return manapi::error::status_ok();
189-
});
190-
191-
res.log();
192-
assert(res.ok());
193-
if (res.ok()) {
194-
auto creds = co_await manapi::net::wgrpc::secure_channel_credentials("/home/Timur/Documents/ssl/quic/cert.crt");
195-
if (!creds.ok()) {
196-
creds.err().log();
197-
co_return;
198-
}
199-
auto greeter = std::make_shared<GreeterClient>(grpc::CreateChannel("localhost:8080", creds.unwrap()));
200-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
201-
std::string user = "Xiadnoring Client #1";
202-
auto res = co_await greeter->SayHello(user);
203-
if (res.ok())
204-
std::cout << "Xiadnoring Client#1 =" << res.unwrap() << "\n";
205-
else
206-
res.err().log();
207-
});
208-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
209-
std::string user = "Xiadnoring Client #2";
210-
auto res = co_await greeter->SayHello(user);
211-
if (res.ok())
212-
std::cout << "Xiadnoring Client #2=" << res.unwrap() << "\n";
213-
else
214-
res.err().log();
215-
});
216-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
217-
std::string user = "Xiadnoring Client #3";
218-
auto res = co_await greeter->SayHello(user);
219-
if (res.ok())
220-
std::cout << "Xiadnoring Client #3=" << res.unwrap() << "\n";
221-
else
222-
res.err().log();
223-
});
224-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
225-
std::string user = "Xiadnoring Client #4";
226-
auto res = co_await greeter->SayHello(user);
227-
if (res.ok())
228-
std::cout << "Xiadnoring Client #4=" << res.unwrap() << "\n";
229-
else
230-
res.err().log();
231-
});
232-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
233-
std::string user = "Xiadnoring Client #5";
234-
auto res = co_await greeter->SayHello(user);
235-
if (res.ok())
236-
std::cout << "Xiadnoring Client #5=" << res.unwrap() << "\n";
237-
else
238-
res.err().log();
239-
});
240-
manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
241-
std::string user = "Xiadnoring Client #6";
242-
auto res = co_await greeter->SayHello(user);
243-
if (res.ok())
244-
std::cout << "Xiadnoring Client #6=" << res.unwrap() << "\n";
245-
else
246-
res.err().log();
247-
});
248-
}
249-
250-
}, [] (std::exception_ptr err) -> void {
251-
if (err)
252-
std::rethrow_exception(err);
253-
});
172+
// /**
173+
// * grpc
174+
// */
175+
//
176+
// auto thrcntind = thrcnt.fetch_add(1);
177+
//
178+
// auto service = std::make_shared<GreeterServiceImpl>();
179+
//
180+
// auto grpc_server = manapi::net::wgrpc::server::create (grpc_server_ctx).unwrap();
181+
// manapi::async::run([grpc_server, service, thrcntind] () mutable -> manapi::future<> {
182+
// auto res = co_await grpc_server.config("/home/Timur/Desktop/WorkSpace/ManapiHTTP/cmake-build-debug/grpc.json");
183+
//
184+
// res.log();
185+
//
186+
// res = co_await grpc_server.start([&] (grpc::ServerBuilder &builder) -> manapi::error::status {
187+
// builder.RegisterService(service.get());
188+
// return manapi::error::status_ok();
189+
// });
190+
//
191+
// res.log();
192+
// assert(res.ok());
193+
// if (res.ok()) {
194+
// auto creds = co_await manapi::net::wgrpc::secure_channel_credentials("/home/Timur/Documents/ssl/quic/cert.crt");
195+
// if (!creds.ok()) {
196+
// creds.err().log();
197+
// co_return;
198+
// }
199+
// auto greeter = std::make_shared<GreeterClient>(grpc::CreateChannel("localhost:8080", creds.unwrap()));
200+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
201+
// std::string user = "Xiadnoring Client #1";
202+
// auto res = co_await greeter->SayHello(user);
203+
// if (res.ok())
204+
// std::cout << "Xiadnoring Client#1 =" << res.unwrap() << "\n";
205+
// else
206+
// res.err().log();
207+
// });
208+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
209+
// std::string user = "Xiadnoring Client #2";
210+
// auto res = co_await greeter->SayHello(user);
211+
// if (res.ok())
212+
// std::cout << "Xiadnoring Client #2=" << res.unwrap() << "\n";
213+
// else
214+
// res.err().log();
215+
// });
216+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
217+
// std::string user = "Xiadnoring Client #3";
218+
// auto res = co_await greeter->SayHello(user);
219+
// if (res.ok())
220+
// std::cout << "Xiadnoring Client #3=" << res.unwrap() << "\n";
221+
// else
222+
// res.err().log();
223+
// });
224+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
225+
// std::string user = "Xiadnoring Client #4";
226+
// auto res = co_await greeter->SayHello(user);
227+
// if (res.ok())
228+
// std::cout << "Xiadnoring Client #4=" << res.unwrap() << "\n";
229+
// else
230+
// res.err().log();
231+
// });
232+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
233+
// std::string user = "Xiadnoring Client #5";
234+
// auto res = co_await greeter->SayHello(user);
235+
// if (res.ok())
236+
// std::cout << "Xiadnoring Client #5=" << res.unwrap() << "\n";
237+
// else
238+
// res.err().log();
239+
// });
240+
// manapi::async::current()->timerpool()->append_interval_async(100, [greeter] (const manapi::timer &t) -> manapi::future<> {
241+
// std::string user = "Xiadnoring Client #6";
242+
// auto res = co_await greeter->SayHello(user);
243+
// if (res.ok())
244+
// std::cout << "Xiadnoring Client #6=" << res.unwrap() << "\n";
245+
// else
246+
// res.err().log();
247+
// });
248+
// }
249+
//
250+
// }, [] (std::exception_ptr err) -> void {
251+
// if (err)
252+
// std::rethrow_exception(err);
253+
// });
254254

255255
/**
256256
* http

src/http/ManapiBaseHttp.cpp

Lines changed: 30 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -738,8 +738,11 @@ void manapi::net::http::internal::send_response_sync_cb(std::unique_ptr<response
738738
cursor = 0;
739739
}
740740

741+
auto slice_vw = slices.subslice(0, total).unwrap();
742+
manapi_log_trace_hard("send size=%d fin=%d", slice_vw.size(), finish);
741743
auto const rhs = co_await cdata->worker->fwrite(
742-
cdata->conn, slices.subslice(0, total).unwrap(), finish);
744+
cdata->conn, slice_vw, finish);
745+
manapi_log_trace_hard("fin send");
743746

744747
if (rhs <= 0)
745748
co_return;
@@ -1199,77 +1202,55 @@ manapi::future<void> manapi::net::http::internal::send_file(std::unique_ptr<resp
11991202
auto const cdata = res->connection_data();
12001203

12011204
auto write_block = cdata->worker->bufferpool().slice(block_size).unwrap();
1202-
//auto read_block = cdata->worker->bufferpool().slice(block_size).unwrap();
1205+
auto read_block = cdata->worker->bufferpool().slice(block_size).unwrap();
12031206

12041207
ssize_t current = f.tellg();
12051208

12061209
size += current;
12071210

1208-
//async::parallel_run<ssize_t> parallel;
1209-
//auto parallel_status = async::parallel_run<ssize_t>::create();
1210-
//if (!parallel_status)
1211-
// co_return;
1211+
async::parallel_run<ssize_t> parallel;
1212+
auto parallel_status = async::parallel_run<ssize_t>::create();
1213+
if (!parallel_status)
1214+
co_return;
12121215

1213-
//parallel = parallel_status.unwrap();
1216+
parallel = parallel_status.unwrap();
12141217

12151218
ssize_t rhs;
12161219

1217-
// if ((rhs = co_await f.read(write_block.subslice(0,
1218-
// std::min(block_size, size - current)).unwrap())) <= 0) {
1219-
// co_return;
1220-
// }
1220+
if ((rhs = co_await f.read(write_block.subslice(0,
1221+
std::min(block_size, size - current)).unwrap())) <= 0) {
1222+
co_return;
1223+
}
12211224

12221225
try {
12231226
while (size > current) {
12241227
/* add count of the chars which will be sent at this iterration */
1225-
//current += rhs;
1226-
// bool readsome = size > current;
1227-
// if (readsome) {
1228-
// auto status = parallel.run(f.read(read_block.subslice(0,
1229-
// std::min(block_size, size - current)).unwrap()));
1230-
// if (!status) {
1231-
// manapi_log_trace(manapi::debug::LOG_TRACE_HIGH,
1232-
// "%s failed due to %.*s", "send_file", status.msg().size(), status.msg().data());
1233-
// break;
1234-
// }
1235-
1236-
rhs = co_await f.read(write_block.subslice(0,
1237-
std::min(block_size, size - current)).unwrap());
1238-
1239-
if (rhs <= 0) {
1240-
// manapi_log_trace(manapi::debug::LOG_TRACE_HIGH,
1241-
// "%s failed due to %s", "send_file", "read failed");
1228+
current += rhs;
1229+
bool readsome = size > current;
1230+
if (readsome) {
1231+
auto status = parallel.run(f.read(read_block.subslice(0,
1232+
std::min(block_size, size - current)).unwrap()));
1233+
if (!status) {
1234+
manapi_log_trace(manapi::debug::LOG_TRACE_HIGH,
1235+
"%s failed due to %.*s", "send_file", status.msg().size(), status.msg().data());
12421236
break;
12431237
}
1244-
//}
1238+
}
12451239

1246-
current += rhs;
1247-
//auto sv = write_block.subslice(0, rhs).unwrap();
1248-
// for (auto it = sv.begin(); it != sv.end(); it++) {
1249-
// if ((co_await cdata->worker->fwrite (cdata->conn, it.buffer(), it.size(), !readsome && it.is_last())) <= 0)
1250-
// /* failed to send */
1251-
// goto err;
1252-
// }
1253-
1254-
//assert(sv.size() == rhs);
1255-
if ((co_await cdata->worker->fwrite (cdata->conn, write_block, rhs, /*!readsome*/ size==current)) <= 0) {
1240+
auto sv = write_block.subslice(0, rhs).unwrap();
1241+
1242+
assert(sv.size() == rhs);
1243+
if ((co_await cdata->worker->fwrite (cdata->conn, sv, !readsome)) <= 0) {
12561244
manapi_log_trace(debug::LOG_TRACE_MEDIUM, "send_file() %p failed due to %s", cdata->conn.get(), "fwrite() <= 0");
12571245
/* failed to send */
12581246
goto err;
12591247
}
12601248

1261-
if (write_block.size() != block_size) {
1262-
write_block.remove_shift();
1263-
write_block.resize(block_size).unwrap();
1249+
if ((rhs = co_await parallel.get_or(0)) <= 0) {
1250+
break;
12641251
}
12651252

1266-
// if ((rhs = co_await parallel.get_or(0)) <= 0) {
1267-
// break;
1268-
// }
1269-
//
1270-
// std::swap(write_block, read_block);
1271-
1272-
//printf("%s STEP: %zi LEFT: %zi NEED: %zi CURRENT: %zi\n", res.get_file().data(), sent, left, size, current);
1253+
std::swap(write_block, read_block);
12731254
}
12741255

12751256

src/http/ManapiHttpConfig.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ manapi::net::http::config::config(const json &config) {
3333
this->http1_implementation = get_config_param<std::string>(config, "http1_implementation", "default");
3434
this->http2_implementation = get_config_param<std::string>(config, "http2_implementation", "default");
3535
this->http3_implementation = get_config_param<std::string>(config, "http3_implementation", "default");
36-
this->max_merge_buffer_stack = get_config_param<ssize_t>(config, "max_merge_buffer_stack", 16);
36+
this->max_merge_buffer_stack = get_config_param<ssize_t>(config, "max_merge_buffer_stack", 2);
3737
this->partial_data_min_size = get_config_param<ssize_t>(config, "partial_data_min_size", 0);
3838
this->max_buffer_stack = get_config_param<ssize_t>(config, "max_buffer_stack", 5);
3939
this->port = get_config_param<std::string>(config, "port", "8888");

0 commit comments

Comments
 (0)