Skip to content

Commit 908e931

Browse files
committed
Rename and reorganize output namespace
Simplify naming for classes in the output namespace. Create dedicated namespaces for gprc and log implementations of IClient.
1 parent 7662fa9 commit 908e931

File tree

13 files changed

+166
-144
lines changed

13 files changed

+166
-144
lines changed

collector/lib/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ file(GLOB COLLECTOR_LIB_SRC_FILES
22
${CMAKE_CURRENT_SOURCE_DIR}/*.cpp
33
${CMAKE_CURRENT_SOURCE_DIR}/system-inspector/*.cpp
44
${CMAKE_CURRENT_SOURCE_DIR}/output/*.cpp
5+
${CMAKE_CURRENT_SOURCE_DIR}/output/*/*.cpp
56
)
67

78
add_library(collector_lib ${DRIVER_HEADERS} ${COLLECTOR_LIB_SRC_FILES})

collector/lib/ProcessSignalHandler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include "ProcessSignalFormatter.h"
88
#include "RateLimit.h"
99
#include "SignalHandler.h"
10-
#include "output/SensorClientFormatter.h"
10+
#include "output/Formatter.h"
1111
#include "system-inspector/Service.h"
1212

1313
// forward declarations
@@ -51,7 +51,7 @@ class ProcessSignalHandler : public SignalHandler {
5151

5252
output::Output* client_;
5353
ProcessSignalFormatter signal_formatter_;
54-
output::SensorClientFormatter sensor_formatter_;
54+
output::Formatter sensor_formatter_;
5555
system_inspector::Stats* stats_;
5656
RateLimitCache rate_limiter_;
5757
};

collector/lib/output/SensorClientFormatter.cpp renamed to collector/lib/output/Formatter.cpp

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "SensorClientFormatter.h"
1+
#include "Formatter.h"
22

33
#include <uuid/uuid.h>
44

@@ -16,9 +16,9 @@
1616
namespace collector::output {
1717

1818
using SignalStreamMessage = sensor::SignalStreamMessage;
19-
using Signal = SensorClientFormatter::Signal;
20-
using ProcessSignal = SensorClientFormatter::ProcessSignal;
21-
using LineageInfo = SensorClientFormatter::LineageInfo;
19+
using Signal = Formatter::Signal;
20+
using ProcessSignal = Formatter::ProcessSignal;
21+
using LineageInfo = Formatter::LineageInfo;
2222

2323
using Timestamp = google::protobuf::Timestamp;
2424
using TimeUtil = google::protobuf::util::TimeUtil;
@@ -57,17 +57,17 @@ std::string extract_proc_args(sinsp_threadinfo* tinfo) {
5757

5858
} // namespace
5959

60-
SensorClientFormatter::SensorClientFormatter(sinsp* inspector, const CollectorConfig& config)
60+
Formatter::Formatter(sinsp* inspector, const CollectorConfig& config)
6161
: event_names_(EventNames::GetInstance()),
6262
event_extractor_(std::make_unique<system_inspector::EventExtractor>()),
6363
container_metadata_(inspector),
6464
config_(config) {
6565
event_extractor_->Init(inspector);
6666
}
6767

68-
SensorClientFormatter::~SensorClientFormatter() = default;
68+
Formatter::~Formatter() = default;
6969

70-
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* event) {
70+
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_evt* event) {
7171
if (process_signals[event->get_type()] == ProcessSignalType::UNKNOWN_PROCESS_TYPE) {
7272
return nullptr;
7373
}
@@ -82,7 +82,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_evt* ev
8282
return CreateProcessSignal(event);
8383
}
8484

85-
const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
85+
const sensor::ProcessSignal* Formatter::ToProtoMessage(sinsp_threadinfo* tinfo) {
8686
Reset();
8787
if (!ValidateProcessDetails(tinfo)) {
8888
CLOG(INFO) << "Dropping process event: " << tinfo;
@@ -92,7 +92,7 @@ const sensor::ProcessSignal* SensorClientFormatter::ToProtoMessage(sinsp_threadi
9292
return CreateProcessSignal(tinfo);
9393
}
9494

95-
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
95+
ProcessSignal* Formatter::CreateProcessSignal(sinsp_evt* event) {
9696
auto signal = AllocateRoot();
9797

9898
// set id
@@ -174,7 +174,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_evt* event) {
174174
return signal;
175175
}
176176

177-
ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
177+
ProcessSignal* Formatter::CreateProcessSignal(sinsp_threadinfo* tinfo) {
178178
auto signal = AllocateRoot();
179179

180180
// set id
@@ -237,7 +237,7 @@ ProcessSignal* SensorClientFormatter::CreateProcessSignal(sinsp_threadinfo* tinf
237237
return signal;
238238
}
239239

240-
std::string SensorClientFormatter::ToString(sinsp_evt* event) {
240+
std::string Formatter::ToString(sinsp_evt* event) {
241241
std::stringstream ss;
242242
const std::string* path = event_extractor_->get_exepath(event);
243243
const std::string* name = event_extractor_->get_comm(event);
@@ -254,7 +254,7 @@ std::string SensorClientFormatter::ToString(sinsp_evt* event) {
254254
return ss.str();
255255
}
256256

257-
bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
257+
bool Formatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo) {
258258
if (tinfo == nullptr) {
259259
return false;
260260
}
@@ -266,11 +266,11 @@ bool SensorClientFormatter::ValidateProcessDetails(const sinsp_threadinfo* tinfo
266266
return true;
267267
}
268268

269-
bool SensorClientFormatter::ValidateProcessDetails(sinsp_evt* event) {
269+
bool Formatter::ValidateProcessDetails(sinsp_evt* event) {
270270
return ValidateProcessDetails(event->get_thread_info());
271271
}
272272

273-
void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
273+
void Formatter::UpdateLineageStats(const std::vector<LineageInfo>& lineage) {
274274
int string_total = std::accumulate(lineage.cbegin(), lineage.cend(), 0, [](int acc, const auto& l) {
275275
return acc + l.parent_exec_file_path().size();
276276
});
@@ -281,7 +281,7 @@ void SensorClientFormatter::UpdateLineageStats(const std::vector<LineageInfo>& l
281281
COUNTER_ADD(CollectorStats::process_lineage_string_total, string_total);
282282
}
283283

284-
std::vector<LineageInfo> SensorClientFormatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
284+
std::vector<LineageInfo> Formatter::GetProcessLineage(sinsp_threadinfo* tinfo) {
285285
std::vector<LineageInfo> lineage;
286286
if (tinfo == nullptr) {
287287
return lineage;

collector/lib/output/SensorClientFormatter.h renamed to collector/lib/output/Formatter.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
#ifndef SENSOR_CLIENT_FORMATTER_H
2-
#define SENSOR_CLIENT_FORMATTER_H
1+
#ifndef OUTPUT_FORMATTER_H
2+
#define OUTPUT_FORMATTER_H
33

44
#include <memory>
55

@@ -22,15 +22,15 @@ class EventExtractor;
2222

2323
namespace collector::output {
2424

25-
class SensorClientFormatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
25+
class Formatter : public ProtoSignalFormatter<sensor::ProcessSignal> {
2626
public:
27-
SensorClientFormatter(const SensorClientFormatter&) = delete;
28-
SensorClientFormatter(SensorClientFormatter&&) = delete;
29-
SensorClientFormatter& operator=(const SensorClientFormatter&) = delete;
30-
SensorClientFormatter& operator=(SensorClientFormatter&&) = delete;
31-
virtual ~SensorClientFormatter();
27+
Formatter(const Formatter&) = delete;
28+
Formatter(Formatter&&) = delete;
29+
Formatter& operator=(const Formatter&) = delete;
30+
Formatter& operator=(Formatter&&) = delete;
31+
virtual ~Formatter();
3232

33-
SensorClientFormatter(sinsp* inspector, const CollectorConfig& config);
33+
Formatter(sinsp* inspector, const CollectorConfig& config);
3434

3535
using Signal = v1::Signal;
3636
using ProcessSignal = sensor::ProcessSignal;

collector/lib/output/IClient.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#ifndef OUTPUT_ICLIENT_H
2+
#define OUTPUT_ICLIENT_H
3+
4+
#include "internalapi/sensor/collector_iservice.grpc.pb.h"
5+
6+
#include "SignalHandler.h"
7+
8+
namespace collector::output {
9+
10+
class IClient {
11+
public:
12+
using Service = sensor::CollectorService;
13+
14+
IClient() = default;
15+
IClient(const IClient&) = default;
16+
IClient(IClient&&) = delete;
17+
IClient& operator=(const IClient&) = default;
18+
IClient& operator=(IClient&&) = delete;
19+
virtual ~IClient() = default;
20+
21+
/**
22+
* Recreate the internal state of the object to allow communication.
23+
*
24+
* Mostly useful for handling gRPC reconnections.
25+
*
26+
* @returns true if the refresh was succesful, false otherwise.
27+
*/
28+
virtual bool Recreate() = 0;
29+
30+
/**
31+
* Send a message to sensor through the iservice.
32+
*
33+
* @param msg The message to be sent to sensor.
34+
* @returns A SignalHandler::Result with the outcome of the send
35+
* operation.
36+
*/
37+
virtual SignalHandler::Result SendMsg(const sensor::ProcessSignal& msg) = 0;
38+
};
39+
40+
} // namespace collector::output
41+
42+
#endif // OUTPUT_ICLIENT_H

collector/lib/output/Output.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "Output.h"
22

33
#include "GRPCUtil.h"
4+
#include "output/grpc/Client.h"
5+
#include "output/log/Client.h"
46

57
namespace collector::output {
68

@@ -10,7 +12,7 @@ Output::Output(const CollectorConfig& config)
1012
channel_ = config.grpc_channel;
1113

1214
if (use_sensor_client_) {
13-
auto sensor_client = std::make_unique<SensorClient>(channel_);
15+
auto sensor_client = std::make_unique<grpc::Client>(channel_);
1416
sensor_clients_.emplace_back(std::move(sensor_client));
1517
} else {
1618
auto signal_client = std::make_unique<SignalServiceClient>(channel_);
@@ -20,7 +22,7 @@ Output::Output(const CollectorConfig& config)
2022

2123
if (config.grpc_channel == nullptr || config.UseStdout()) {
2224
if (use_sensor_client_) {
23-
auto sensor_client = std::make_unique<SensorClientStdout>();
25+
auto sensor_client = std::make_unique<log::Client>();
2426
sensor_clients_.emplace_back(std::move(sensor_client));
2527
} else {
2628
auto signal_client = std::make_unique<StdoutSignalServiceClient>();

collector/lib/output/Output.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
#ifndef COLLECTOR_OUTPUT_H
2-
#define COLLECTOR_OUTPUT_H
1+
#ifndef OUTPUT_H
2+
#define OUTPUT_H
33

44
#include <variant>
55

66
#include "internalapi/sensor/signal_iservice.pb.h"
77

88
#include "CollectorConfig.h"
9-
#include "SensorClient.h"
9+
#include "IClient.h"
1010
#include "SignalHandler.h"
1111
#include "SignalServiceClient.h"
1212
#include "StoppableThread.h"
@@ -32,7 +32,7 @@ class Output {
3232
}
3333

3434
// Constructor for tests
35-
Output(std::unique_ptr<ISensorClient>&& sensor_client,
35+
Output(std::unique_ptr<IClient>&& sensor_client,
3636
std::unique_ptr<ISignalServiceClient>&& signal_client) {
3737
sensor_clients_.emplace_back(std::move(sensor_client));
3838
signal_clients_.emplace_back(std::move(signal_client));
@@ -65,7 +65,7 @@ class Output {
6565
SignalHandler::Result SensorOutput(const sensor::ProcessSignal& msg);
6666
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);
6767

68-
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
68+
std::vector<std::unique_ptr<IClient>> sensor_clients_;
6969
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;
7070

7171
bool use_sensor_client_ = true;

collector/lib/output/SensorClient.h

Lines changed: 0 additions & 88 deletions
This file was deleted.

collector/lib/output/SensorClient.cpp renamed to collector/lib/output/grpc/Client.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
#include "SensorClient.h"
1+
#include "Client.h"
22

33
#include "Logging.h"
44

5-
namespace collector::output {
6-
bool SensorClient::Recreate() {
7-
context_ = std::make_unique<grpc::ClientContext>();
5+
namespace collector::output::grpc {
6+
bool Client::Recreate() {
7+
context_ = std::make_unique<::grpc::ClientContext>();
88
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncPushProcesses, channel_, context_.get());
99
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
1010
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
@@ -18,7 +18,7 @@ bool SensorClient::Recreate() {
1818
return true;
1919
}
2020

21-
SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
21+
SignalHandler::Result Client::SendMsg(const sensor::ProcessSignal& msg) {
2222
if (!stream_active_.load(std::memory_order_acquire)) {
2323
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
2424
<< "GRPC stream is not established";
@@ -44,4 +44,4 @@ SignalHandler::Result SensorClient::SendMsg(const sensor::ProcessSignal& msg) {
4444

4545
return SignalHandler::PROCESSED;
4646
}
47-
} // namespace collector::output
47+
} // namespace collector::output::grpc

0 commit comments

Comments
 (0)