forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.h
394 lines (338 loc) · 14.9 KB
/
server.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#pragma once
#include <chrono>
#include <cstdint>
#include <list>
#include <memory>
#include <string>
#include "envoy/server/options.h"
#include "envoy/server/process_context.h"
#include "envoy/stats/stats.h"
#include "common/common/assert.h"
#include "common/common/lock_guard.h"
#include "common/common/logger.h"
#include "common/common/thread.h"
#include "server/listener_hooks.h"
#include "server/options_impl.h"
#include "server/server.h"
#include "test/integration/server_stats.h"
#include "test/integration/tcp_dump.h"
#include "test/test_common/test_time_system.h"
#include "test/test_common/utility.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
namespace Envoy {
namespace Server {
/**
 * Create OptionsImpl structures suitable for tests.
 *
 * Only the declaration is visible in this header; the parameter notes below are inferred from the
 * parameter names and should be confirmed against the definition.
 * @param config_path presumably the path of the config file the options point at.
 * @param config_yaml presumably inline YAML config supplied in addition to (or instead of) the file.
 * @param ip_version IP version handed to the options -- TODO confirm how it is used.
 * @param allow_unknown_static_fields defaults to false.
 * @param reject_unknown_dynamic_fields defaults to false.
 * @param concurrency defaults to 1; presumably the number of worker threads.
 * @return the constructed OptionsImpl, by value.
 */
OptionsImpl createTestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
Network::Address::IpVersion ip_version,
bool allow_unknown_static_fields = false,
bool reject_unknown_dynamic_fields = false,
uint32_t concurrency = 1);
/**
 * DrainManager stub for tests. drainClose() reports whatever is currently stored in draining_,
 * and both sequence hooks do nothing.
 */
class TestDrainManager : public DrainManager {
public:
  // Server::DrainManager
  bool drainClose() const override { return draining_; }
  void startDrainSequence(std::function<void()> /*unused*/) override {}
  void startParentShutdownSequence() override {}

  // Value returned by drainClose(). Public and mutable, so callers can set it directly.
  bool draining_ = false;
};
/**
 * ComponentFactory for tests: every drain manager it creates is a TestDrainManager, and runtime
 * creation is delegated to Server::InstanceUtil::createRuntime().
 */
class TestComponentFactory : public ComponentFactory {
public:
  // Server::ComponentFactory
  Server::DrainManagerPtr createDrainManager(Server::Instance& /*unused*/) override {
    Server::DrainManagerPtr test_drain_manager{new Server::TestDrainManager()};
    return test_drain_manager;
  }

  Runtime::LoaderPtr createRuntime(Server::Instance& server,
                                   Server::Configuration::Initial& config) override {
    return Server::InstanceUtil::createRuntime(server, config);
  }
};
} // namespace Server
namespace Stats {
/**
 * Scope decorator used by TestIsolatedStoreImpl so that scopes created from the store do not
 * interact with it without first grabbing the store's lock.
 *
 * Stat creation, lookup and histogram delivery take the lock before forwarding to the wrapped
 * scope. nullGauge() and the symbol table accessors forward without taking it.
 */
class TestScopeWrapper : public Scope {
public:
  TestScopeWrapper(Thread::MutexBasicLockable& lock, ScopePtr wrapped_scope)
      : lock_(lock), wrapped_scope_(std::move(wrapped_scope)) {}

  // Child scopes are wrapped too and share the same lock as this wrapper.
  ScopePtr createScope(const std::string& name) override {
    Thread::LockGuard guard(lock_);
    auto child_scope = wrapped_scope_->createScope(name);
    return ScopePtr{new TestScopeWrapper(lock_, std::move(child_scope))};
  }

  void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override {
    Thread::LockGuard guard(lock_);
    wrapped_scope_->deliverHistogramToSinks(histogram, value);
  }

  // StatName based accessors: forwarded to the wrapped scope under the lock.
  Counter& counterFromStatName(StatName name) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->counterFromStatName(name);
  }

  Gauge& gaugeFromStatName(StatName name, Gauge::ImportMode import_mode) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->gaugeFromStatName(name, import_mode);
  }

  Histogram& histogramFromStatName(StatName name, Histogram::Unit unit) override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->histogramFromStatName(name, unit);
  }

  // Forwarded without taking the lock.
  NullGaugeImpl& nullGauge(const std::string& str) override {
    return wrapped_scope_->nullGauge(str);
  }

  // String based accessors: build a StatName from the string, then go through the locked
  // *FromStatName() overloads above. The storage object lives until the call returns.
  Counter& counter(const std::string& name) override {
    StatNameManagedStorage name_storage(name, symbolTable());
    return counterFromStatName(name_storage.statName());
  }

  Gauge& gauge(const std::string& name, Gauge::ImportMode import_mode) override {
    StatNameManagedStorage name_storage(name, symbolTable());
    return gaugeFromStatName(name_storage.statName(), import_mode);
  }

  Histogram& histogram(const std::string& name, Histogram::Unit unit) override {
    StatNameManagedStorage name_storage(name, symbolTable());
    return histogramFromStatName(name_storage.statName(), unit);
  }

  // Lookups: forwarded to the wrapped scope under the lock.
  OptionalCounter findCounter(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findCounter(name);
  }

  OptionalGauge findGauge(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findGauge(name);
  }

  OptionalHistogram findHistogram(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return wrapped_scope_->findHistogram(name);
  }

  // Symbol table accessors: forwarded without taking the lock.
  const SymbolTable& constSymbolTable() const override {
    return wrapped_scope_->constSymbolTable();
  }

  SymbolTable& symbolTable() override { return wrapped_scope_->symbolTable(); }

private:
  // Lock supplied at construction; referenced, not owned, by this wrapper.
  Thread::MutexBasicLockable& lock_;
  // The scope that does the real work.
  ScopePtr wrapped_scope_;
};
/**
 * Variant of the isolated store that takes a lock around its stat operations so that it can be
 * used during the integration tests.
 *
 * Every stat accessor, lookup and listing forwards to an internal IsolatedStoreImpl while holding
 * lock_. nullGauge() and the symbol table accessors forward without the lock, and the
 * StoreRoot hooks as well as deliverHistogramToSinks() are intentionally empty.
 */
class TestIsolatedStoreImpl : public StoreRoot {
public:
  // Stats::Scope
  Counter& counterFromStatName(StatName name) override {
    Thread::LockGuard guard(lock_);
    return store_.counterFromStatName(name);
  }

  Counter& counter(const std::string& name) override {
    Thread::LockGuard guard(lock_);
    return store_.counter(name);
  }

  // Scopes are handed out wrapped in a TestScopeWrapper that shares this store's lock.
  ScopePtr createScope(const std::string& name) override {
    Thread::LockGuard guard(lock_);
    auto isolated_scope = store_.createScope(name);
    return ScopePtr{new TestScopeWrapper(lock_, std::move(isolated_scope))};
  }

  // No-op: histogram values are dropped here.
  void deliverHistogramToSinks(const Histogram&, uint64_t) override {}

  Gauge& gaugeFromStatName(StatName name, Gauge::ImportMode import_mode) override {
    Thread::LockGuard guard(lock_);
    return store_.gaugeFromStatName(name, import_mode);
  }

  Gauge& gauge(const std::string& name, Gauge::ImportMode import_mode) override {
    Thread::LockGuard guard(lock_);
    return store_.gauge(name, import_mode);
  }

  Histogram& histogramFromStatName(StatName name, Histogram::Unit unit) override {
    Thread::LockGuard guard(lock_);
    return store_.histogramFromStatName(name, unit);
  }

  // Forwarded without taking the lock.
  NullGaugeImpl& nullGauge(const std::string& name) override { return store_.nullGauge(name); }

  Histogram& histogram(const std::string& name, Histogram::Unit unit) override {
    Thread::LockGuard guard(lock_);
    return store_.histogram(name, unit);
  }

  OptionalCounter findCounter(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return store_.findCounter(name);
  }

  OptionalGauge findGauge(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return store_.findGauge(name);
  }

  OptionalHistogram findHistogram(StatName name) const override {
    Thread::LockGuard guard(lock_);
    return store_.findHistogram(name);
  }

  // Symbol table accessors: forwarded without taking the lock.
  const SymbolTable& constSymbolTable() const override { return store_.constSymbolTable(); }
  SymbolTable& symbolTable() override { return store_.symbolTable(); }

  // Stats::Store
  std::vector<CounterSharedPtr> counters() const override {
    Thread::LockGuard guard(lock_);
    return store_.counters();
  }

  std::vector<GaugeSharedPtr> gauges() const override {
    Thread::LockGuard guard(lock_);
    return store_.gauges();
  }

  std::vector<ParentHistogramSharedPtr> histograms() const override {
    Thread::LockGuard guard(lock_);
    return store_.histograms();
  }

  // Stats::StoreRoot -- all of these are deliberately empty in this test store.
  void addSink(Sink&) override {}
  void setTagProducer(TagProducerPtr&&) override {}
  void setStatsMatcher(StatsMatcherPtr&&) override {}
  void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {}
  void shutdownThreading() override {}
  void mergeHistograms(PostMergeCb) override {}

private:
  // Mutable so the const lookups and listings above can lock it.
  mutable Thread::MutexBasicLockable lock_;
  // The store that actually holds the stats.
  IsolatedStoreImpl store_;
};
} // namespace Stats
// Forward declaration: lets the pointer alias below be declared ahead of the class definition.
class IntegrationTestServer;
// Owning std::unique_ptr handle to an IntegrationTestServer; this is the type returned by
// IntegrationTestServer::create().
using IntegrationTestServerPtr = std::unique_ptr<IntegrationTestServer>;
/**
 * Wrapper for running the real server for the purpose of integration tests.
 * This class is an Abstract Base Class and delegates ownership and management
 * of the actual envoy server to a derived class. See the documentation for
 * createAndRunEnvoyServer().
 *
 * Besides being the base for concrete test servers, the class implements ListenerHooks,
 * IntegrationTestServerStats and Server::ComponentFactory.
 */
class IntegrationTestServer : public Logger::Loggable<Logger::Id::testing>,
                              public ListenerHooks,
                              public IntegrationTestServerStats,
                              public Server::ComponentFactory {
public:
  /**
   * Factory for a test server. Only the declaration is visible in this header; presumably it
   * constructs a concrete server and start()s it with the remaining arguments -- confirm in the
   * implementation file.
   * @return an owning pointer to the created server.
   */
  static IntegrationTestServerPtr
  create(const std::string& config_path, const Network::Address::IpVersion version,
         std::function<void()> on_server_init_function, bool deterministic,
         Event::TestTimeSystem& time_system, Api::Api& api,
         bool defer_listener_finalization = false,
         absl::optional<std::reference_wrapper<ProcessObject>> process_object = absl::nullopt,
         bool allow_unknown_static_fields = false, bool reject_unknown_dynamic_fields = false,
         uint32_t concurrency = 1);

  // Note that the derived class is responsible for tearing down the server in its
  // destructor.
  ~IntegrationTestServer() override;

  // Defined out of line; by its name, blocks until the listeners are ready -- see the
  // implementation file for the exact condition.
  void waitUntilListenersReady();

  // Returns the drain manager made by createDrainManager(). drain_manager_ is null until
  // createDrainManager() has been called, so this must not be used before then.
  Server::TestDrainManager& drainManager() { return *drain_manager_; }

  // Store the callbacks associated with worker listener add/remove events. They are only stored
  // here; the code invoking them is outside this header.
  void setOnWorkerListenerAddedCb(std::function<void()> on_worker_listener_added) {
    on_worker_listener_added_cb_ = std::move(on_worker_listener_added);
  }
  void setOnWorkerListenerRemovedCb(std::function<void()> on_worker_listener_removed) {
    on_worker_listener_removed_cb_ = std::move(on_worker_listener_removed);
  }

  void onRuntimeCreated() override;

  // Defined out of line. Takes the same arguments as create(), minus the ones already captured
  // by the constructor (config path, time system, api), and without defaults.
  void start(const Network::Address::IpVersion version,
             std::function<void()> on_server_init_function, bool deterministic,
             bool defer_listener_finalization,
             absl::optional<std::reference_wrapper<ProcessObject>> process_object,
             bool allow_unknown_static_fields, bool reject_unknown_dynamic_fields,
             uint32_t concurrency);

  // IntegrationTestServerStats waiters: each one forwards to the TestUtility helper of the same
  // name, passing stat_store() and time_system_.
  void waitForCounterEq(const std::string& name, uint64_t value) override {
    TestUtility::waitForCounterEq(stat_store(), name, value, time_system_);
  }
  void waitForCounterGe(const std::string& name, uint64_t value) override {
    TestUtility::waitForCounterGe(stat_store(), name, value, time_system_);
  }
  void waitForGaugeGe(const std::string& name, uint64_t value) override {
    TestUtility::waitForGaugeGe(stat_store(), name, value, time_system_);
  }
  void waitForGaugeEq(const std::string& name, uint64_t value) override {
    TestUtility::waitForGaugeEq(stat_store(), name, value, time_system_);
  }

  Stats::CounterSharedPtr counter(const std::string& name) override {
    // When using the thread local store, only counters() is thread safe. This also allows us
    // to test if a counter exists at all versus just defaulting to zero.
    return TestUtility::findCounter(stat_store(), name);
  }
  Stats::GaugeSharedPtr gauge(const std::string& name) override {
    // When using the thread local store, only gauges() is thread safe. This also allows us
    // to test if a gauge exists at all versus just defaulting to zero.
    return TestUtility::findGauge(stat_store(), name);
  }
  std::vector<Stats::CounterSharedPtr> counters() override { return stat_store().counters(); }
  std::vector<Stats::GaugeSharedPtr> gauges() override { return stat_store().gauges(); }

  // ListenerHooks
  void onWorkerListenerAdded() override;
  void onWorkerListenerRemoved() override;

  // Server::ComponentFactory
  Server::DrainManagerPtr createDrainManager(Server::Instance&) override {
    // The returned DrainManagerPtr owns the object; drain_manager_ only keeps a raw pointer to
    // it so that drainManager() can reach the test drain manager later.
    drain_manager_ = new Server::TestDrainManager();
    return Server::DrainManagerPtr{drain_manager_};
  }
  Runtime::LoaderPtr createRuntime(Server::Instance& server,
                                   Server::Configuration::Initial& config) override {
    return Server::InstanceUtil::createRuntime(server, config);
  }

  // Should not be called until createAndRunEnvoyServer() is called.
  virtual Server::Instance& server() PURE;
  virtual Stats::Store& stat_store() PURE;
  virtual Network::Address::InstanceConstSharedPtr admin_address() PURE;

protected:
  // Only stores its arguments; nothing is started here.
  IntegrationTestServer(Event::TestTimeSystem& time_system, Api::Api& api,
                        const std::string& config_path)
      : time_system_(time_system), api_(api), config_path_(config_path) {}

  // Create the running envoy server. This function will call serverReady() when the virtual
  // functions server(), stat_store(), and admin_address() may be called, but before the server
  // has been started.
  // The subclass is also responsible for tearing down this server in its destructor.
  virtual void createAndRunEnvoyServer(
      OptionsImpl& options, Event::TimeSystem& time_system,
      Network::Address::InstanceConstSharedPtr local_address, ListenerHooks& hooks,
      Thread::BasicLockable& access_log_lock, Server::ComponentFactory& component_factory,
      Runtime::RandomGeneratorPtr&& random_generator,
      absl::optional<std::reference_wrapper<ProcessObject>> process_object) PURE;

  // Will be called by subclass on server thread when the server is ready to be accessed. The
  // server may not have been run yet, but all server access methods (server(), stat_store(),
  // admin_address()) will be available.
  void serverReady();

private:
  /**
   * Runs the real server on a thread.
   */
  void threadRoutine(const Network::Address::IpVersion version, bool deterministic,
                     absl::optional<std::reference_wrapper<ProcessObject>> process_object,
                     bool allow_unknown_static_fields, bool reject_unknown_dynamic_fields,
                     uint32_t concurrency);

  // References and config path captured by the constructor.
  Event::TestTimeSystem& time_system_;
  Api::Api& api_;
  const std::string config_path_;
  // Presumably the thread that executes threadRoutine() -- confirm in the implementation file.
  Thread::ThreadPtr thread_;
  // Listener bookkeeping. NOTE(review): these three look like they are used together (counter
  // guarded by the mutex, signalled through the condvar) -- confirm in the implementation file.
  Thread::CondVar listeners_cv_;
  Thread::MutexBasicLockable listeners_mutex_;
  // Explicitly zero-initialized: the constructor above does not set it, so without the
  // initializer it would hold an indeterminate value until first assigned.
  uint64_t pending_listeners_{0};
  ConditionalInitializer server_set_;
  // Non-owning; set by createDrainManager(), null before that.
  Server::TestDrainManager* drain_manager_{};
  // Callbacks stored by setOnWorkerListenerAddedCb() / setOnWorkerListenerRemovedCb(); empty
  // until set.
  std::function<void()> on_worker_listener_added_cb_;
  std::function<void()> on_worker_listener_removed_cb_;
  TcpDumpPtr tcp_dump_;
};
/**
 * Default implementation of IntegrationTestServer. It exposes the server, stats store and admin
 * address through the raw pointers / shared pointer held in its private members.
 */
class IntegrationTestServerImpl : public IntegrationTestServer {
public:
  IntegrationTestServerImpl(Event::TestTimeSystem& time_system, Api::Api& api,
                            const std::string& config_path)
      : IntegrationTestServer(time_system, api, config_path) {}

  ~IntegrationTestServerImpl() override;

  // RELEASE_ASSERTs that a server pointer is present before dereferencing it.
  Server::Instance& server() override {
    RELEASE_ASSERT(server_ != nullptr, "");
    return *server_;
  }

  // RELEASE_ASSERTs that a stats store pointer is present before dereferencing it.
  Stats::Store& stat_store() override {
    RELEASE_ASSERT(stat_store_ != nullptr, "");
    return *stat_store_;
  }

  // Returns the stored admin address as-is; no null check is performed here.
  Network::Address::InstanceConstSharedPtr admin_address() override { return admin_address_; }

private:
  void createAndRunEnvoyServer(
      OptionsImpl& options, Event::TimeSystem& time_system,
      Network::Address::InstanceConstSharedPtr local_address, ListenerHooks& hooks,
      Thread::BasicLockable& access_log_lock, Server::ComponentFactory& component_factory,
      Runtime::RandomGeneratorPtr&& random_generator,
      absl::optional<std::reference_wrapper<ProcessObject>> process_object) override;

  // Owned by this class. Raw pointers are used instead of owning pointers because the actual
  // allocation is done on a stack in a non-main thread.
  Server::Instance* server_ = nullptr;
  Stats::Store* stat_store_ = nullptr;
  Network::Address::InstanceConstSharedPtr admin_address_;
  // NOTE(review): by its name, signalled once the server has gone away -- confirm in the
  // implementation file.
  absl::Notification server_gone_;
};
} // namespace Envoy