Skip to content

Commit

Permalink
feat: support set TOS in curl (#2106)
Browse files Browse the repository at this point in the history
* feat: support set TOS in curl

* fix lint

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
Abingcbc authored Feb 21, 2025
1 parent e36a7c9 commit 256b281
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 15 deletions.
29 changes: 26 additions & 3 deletions core/common/http/Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
#include "common/http/Curl.h"

#include <cstdint>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/socket.h>

#include <map>
#include <string>

#include "app_config/AppConfig.h"
#include "common/DNSCache.h"
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/http/HttpRequest.h"
#include "common/http/HttpResponse.h"
#include "logger/Logger.h"

DECLARE_FLAG_INT32(curl_ip_dscp);

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -99,6 +106,14 @@ static size_t header_write_callback(char* buffer,
return sizes;
}

static size_t socket_write_callback(void* socketData, curl_socket_t fd, curlsocktype purpose) {
auto* socket = static_cast<CurlSocket*>(socketData);
if (socket->mTOS.has_value()) {
setsockopt(fd, IPPROTO_IP, IP_TOS, &socket->mTOS, sizeof(socket->mTOS));
}
return 0;
}

CURL* CreateCurlHandler(const string& method,
bool httpsFlag,
const string& host,
Expand All @@ -113,7 +128,9 @@ CURL* CreateCurlHandler(const string& method,
bool replaceHostWithIp,
const string& intf,
bool followRedirects,
optional<CurlTLS> tls) {
const optional<CurlTLS>& tls,
const optional<CurlSocket>& socket // socket is used async, the lifespan must be longer
) {
static DnsCache* dnsCache = DnsCache::GetInstance();

CURL* curl = curl_easy_init();
Expand Down Expand Up @@ -182,6 +199,10 @@ CURL* CreateCurlHandler(const string& method,
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
curl_easy_setopt(curl, CURLOPT_TCP_NODELAY, 1);
curl_easy_setopt(curl, CURLOPT_NETRC, CURL_NETRC_IGNORED);
if (socket.has_value()) {
curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, &socket.value());
curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, socket_write_callback);
}

return curl;
}
Expand All @@ -202,7 +223,8 @@ bool SendHttpRequest(unique_ptr<HttpRequest>&& request, HttpResponse& response)
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface(),
request->mFollowRedirects,
request->mTls);
request->mTls,
request->mSocket);
if (curl == NULL) {
LOG_ERROR(sLogger,
("failed to init curl handler", "failed to init curl client")("request address", request.get()));
Expand Down Expand Up @@ -262,7 +284,8 @@ bool AddRequestToMultiCurlHandler(CURLM* multiCurl, unique_ptr<AsynHttpRequest>&
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface(),
request->mFollowRedirects,
request->mTls);
request->mTls,
request->mSocket);
if (curl == NULL) {
request->mResponse.SetNetworkStatus(NetworkCode::Other, "failed to init curl handler");
LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")("request address", request.get()));
Expand Down
4 changes: 3 additions & 1 deletion core/common/http/Curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <map>
#include <memory>
#include <optional>
#include <string>

#include "curl/curl.h"
Expand All @@ -45,7 +46,8 @@ CURL* CreateCurlHandler(const std::string& method,
bool replaceHostWithIp = true,
const std::string& intf = "",
bool followRedirects = false,
std::optional<CurlTLS> tls = std::nullopt);
const std::optional<CurlTLS>& tls = std::nullopt,
const std::optional<CurlSocket>& socket = std::nullopt);

bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& response);

Expand Down
25 changes: 21 additions & 4 deletions core/common/http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>

#include <map>
#include <optional>
#include <string>
#include <utility>

Expand All @@ -38,6 +39,17 @@ struct CurlTLS {
bool mInsecureSkipVerify = true;
};

struct CurlSocket {
// TOS 8 bits: first 6 bits are DSCP (user customized), last 2 bits are ECN (auto set by OS)
std::optional<uint32_t> mTOS;

CurlSocket(int32_t dscp) {
if (dscp >= 0 && dscp <= 63) {
mTOS = dscp << 2;
}
}
};

struct HttpRequest {
std::string mMethod;
// TODO: upgrade curl to 7.62, and replace the following 4 members
Expand All @@ -54,6 +66,7 @@ struct HttpRequest {
uint32_t mMaxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt));
bool mFollowRedirects = false;
std::optional<CurlTLS> mTls = std::nullopt;
std::optional<CurlSocket> mSocket = std::nullopt;

uint32_t mTryCnt = 1;
std::chrono::system_clock::time_point mLastSendTime;
Expand All @@ -69,7 +82,8 @@ struct HttpRequest {
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)),
bool followRedirects = false,
std::optional<CurlTLS> tls = std::nullopt)
std::optional<CurlTLS> tls = std::nullopt,
std::optional<CurlSocket> socket = std::nullopt)
: mMethod(method),
mHTTPSFlag(httpsFlag),
mUrl(url),
Expand All @@ -81,7 +95,8 @@ struct HttpRequest {
mTimeout(timeout),
mMaxTryCnt(maxTryCnt),
mFollowRedirects(followRedirects),
mTls(std::move(tls)) {}
mTls(std::move(tls)),
mSocket(std::move(socket)) {}
virtual ~HttpRequest() = default;
};

Expand All @@ -102,7 +117,8 @@ struct AsynHttpRequest : public HttpRequest {
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)),
bool followRedirects = false,
std::optional<CurlTLS> tls = std::nullopt)
std::optional<CurlTLS> tls = std::nullopt,
std::optional<CurlSocket> socket = std::nullopt)
: HttpRequest(method,
httpsFlag,
host,
Expand All @@ -114,7 +130,8 @@ struct AsynHttpRequest : public HttpRequest {
timeout,
maxTryCnt,
followRedirects,
std::move(tls)),
std::move(tls),
std::move(socket)),
mResponse(std::move(response)) {}

virtual bool IsContextValid() const = 0;
Expand Down
4 changes: 3 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class curl_slist;
namespace logtail {

struct CurlTLS;
struct CurlSocket;

enum NetworkCode {
Ok = 0,
Expand Down Expand Up @@ -69,7 +70,8 @@ class HttpResponse {
bool replaceHostWithIp,
const std::string& intf,
bool followRedirects,
std::optional<CurlTLS> tls);
const std::optional<CurlTLS>& tls,
const std::optional<CurlSocket>& socket);

public:
HttpResponse()
Expand Down
12 changes: 9 additions & 3 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
#include "collection_pipeline/queue/SLSSenderQueueItem.h"
#include "collection_pipeline/queue/SenderQueueManager.h"
#include "common/EndpointUtil.h"
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/ParamExtractor.h"
#include "common/TimeUtil.h"
#include "common/compression/CompressorFactory.h"
#include "common/http/Constant.h"
#include "common/http/HttpRequest.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -58,6 +60,7 @@ DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this valu
DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true);
DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);
DEFINE_FLAG_DOUBLE(sls_serialize_size_expansion_ratio, "", 1.2);
DEFINE_FLAG_INT32(sls_request_dscp, "set dscp for sls request, from 0 to 63", -1);

DECLARE_FLAG_BOOL(send_prefer_real_ip);

Expand Down Expand Up @@ -1207,7 +1210,8 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostLogStoreLogsRequest(const stri
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1);
1,
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const string& accessKeyId,
Expand Down Expand Up @@ -1239,7 +1243,8 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const s
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1);
1,
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId,
Expand Down Expand Up @@ -1274,7 +1279,8 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1);
1,
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

sls_logs::SlsCompressType ConvertCompressType(CompressType type) {
Expand Down
7 changes: 6 additions & 1 deletion core/runner/sink/http/HttpSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "runner/sink/http/HttpSink.h"

#include <optional>

#include "app_config/AppConfig.h"
#include "collection_pipeline/plugin/interface/HttpFlusher.h"
#include "collection_pipeline/queue/QueueKeyManager.h"
Expand Down Expand Up @@ -131,7 +133,10 @@ bool HttpSink::AddRequestToClient(unique_ptr<HttpSinkRequest>&& request) {
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
AppConfig::GetInstance()->GetBindInterface(),
false,
std::nullopt,
std::move(request->mSocket));
if (curl == nullptr) {
request->mItem->mStatus = SendingStatus::IDLE;
request->mResponse.SetNetworkStatus(NetworkCode::Other, "failed to init curl handler");
Expand Down
20 changes: 18 additions & 2 deletions core/runner/sink/http/HttpSinkRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <optional>

#include "collection_pipeline/queue/SenderQueueItem.h"
#include "common/http/HttpRequest.h"

Expand All @@ -34,8 +36,22 @@ struct HttpSinkRequest : public AsynHttpRequest {
const std::string& body,
SenderQueueItem* item,
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_sec)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)))
: AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, HttpResponse(), timeout, maxTryCnt),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt)),
std::optional<CurlSocket> socket = std::nullopt)
: AsynHttpRequest(method,
httpsFlag,
host,
port,
url,
query,
header,
body,
HttpResponse(),
timeout,
maxTryCnt,
false,
std::nullopt,
std::move(socket)),
mItem(item) {}

bool IsContextValid() const override { return true; }
Expand Down
1 change: 1 addition & 0 deletions docs/cn/configuration/system-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
| `ebpf.receive_event_chan_cap` | Int | 用于接收内核事件的队列大小,默认为 4096 |
| `ebpf.admin_config.debug_mode` | Bool | 是否开启 ebpf debug 模式,默认为 false |
| `ebpf.admin_config.log_level` | String | ebpf 相关的日志级别,包括 info warn 和 debug,默认为 warn |
| `curl_ip_dscp` | Int | 设置C++部分发送网络请求中的 DSCP 字段,取值范围为0到63。默认不设置。 |

### 典型配置

Expand Down

0 comments on commit 256b281

Please sign in to comment.