Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(rest): support LRO operation types without name method #14924

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ future<StatusOr<ReturnType>> AsyncRestLongRunningOperation(
});
}

template <typename ReturnType, typename OperationType,
typename GetOperationRequestType, typename CancelOperationRequestType,
typename RequestType, typename StartFunctor, typename RetryPolicyType,
typename CompletionQueue>
future<StatusOr<ReturnType>> AsyncRestLongRunningOperation(
CompletionQueue cq, internal::ImmutableOptions options,
RequestType&& request, StartFunctor&& start,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
LongRunningOperationValueExtractor<ReturnType, OperationType>
value_extractor,
std::unique_ptr<RetryPolicyType> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotent,
std::unique_ptr<PollingPolicy> polling_policy, char const* location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto operation =
AsyncRestRetryLoop(std::move(retry_policy), std::move(backoff_policy),
idempotent, cq, std::forward<StartFunctor>(start),
options, std::forward<RequestType>(request), location);
auto loc = std::string{location};
return AsyncRestPollingLoop<OperationType, GetOperationRequestType,
CancelOperationRequestType>(
std::move(cq), std::move(options), std::move(operation),
std::move(poll), std::move(cancel), std::move(polling_policy),
std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name)
.then([value_extractor, loc](future<StatusOr<OperationType>> g) {
return value_extractor(g.get(), loc);
});
}

/*
* AsyncAwaitRestLongRunningOperation for services that do not conform to
* AIP-151.
Expand Down Expand Up @@ -105,6 +143,38 @@ future<StatusOr<ReturnType>> AsyncRestAwaitLongRunningOperation(
});
}

template <typename ReturnType, typename OperationType,
typename GetOperationRequestType, typename CancelOperationRequestType,
typename CompletionQueue>
future<StatusOr<ReturnType>> AsyncRestAwaitLongRunningOperation(
CompletionQueue cq, internal::ImmutableOptions options,
OperationType operation,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
LongRunningOperationValueExtractor<ReturnType, OperationType>
value_extractor,
std::unique_ptr<PollingPolicy> polling_policy, char const* location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto loc = std::string{location};
return AsyncRestPollingLoop<OperationType, GetOperationRequestType,
CancelOperationRequestType>(
std::move(cq), std::move(options),
make_ready_future(StatusOr<OperationType>(operation)),
std::move(poll), std::move(cancel), std::move(polling_policy),
std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name)
.then([value_extractor, loc](future<StatusOr<OperationType>> g) {
return value_extractor(g.get(), loc);
});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
} // namespace cloud
Expand Down
24 changes: 24 additions & 0 deletions google/cloud/internal/async_rest_polling_loop_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ future<StatusOr<OperationType>> AsyncRestPollingLoop(
return loop->Start(std::move(op));
}

template <typename OperationType, typename GetOperationRequestType,
typename CancelOperationRequestType>
future<StatusOr<OperationType>> AsyncRestPollingLoop(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
future<StatusOr<OperationType>> op,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto loop = std::make_shared<AsyncRestPollingLoopImpl<
OperationType, GetOperationRequestType, CancelOperationRequestType>>(
std::move(cq), options, std::move(poll), std::move(cancel),
std::move(polling_policy), std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name);
return loop->Start(std::move(op));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
} // namespace cloud
Expand Down
97 changes: 97 additions & 0 deletions google/cloud/internal/async_rest_polling_loop_custom_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ class BespokeOperationType {
std::string name_;
};

class BespokeOperationTypeNoNameMethod {
public:
bool is_done() const { return is_done_; }
void set_is_done(bool is_done) { is_done_ = is_done; }
std::string const& pseudo_name_function() const { return name_; }
void set_name(std::string name) { name_ = std::move(name); }
std::string* mutable_name() { return &name_; }
bool operator==(BespokeOperationTypeNoNameMethod const& other) const {
return is_done_ == other.is_done_ && name_ == other.name_;
}

private:
bool is_done_;
std::string name_;
};

class BespokeGetOperationRequestType {
public:
std::string const& name() const { return name_; }
Expand Down Expand Up @@ -176,6 +192,87 @@ TEST(AsyncRestPollingLoopTest, PollThenSuccessWithBespokeOperationTypes) {
EXPECT_THAT(*actual, Eq(expected));
}

class MockBespokeOperationNoNameMethodStub {
public:
MOCK_METHOD(future<StatusOr<BespokeOperationTypeNoNameMethod>>,
AsyncGetOperation,
(CompletionQueue&, std::unique_ptr<RestContext>, ImmutableOptions,
BespokeGetOperationRequestType const&),
());

MOCK_METHOD(future<Status>, AsyncCancelOperation,
(CompletionQueue&, std::unique_ptr<RestContext>, ImmutableOptions,
BespokeCancelOperationRequestType const&),
());
};

TEST(AsyncRestPollingLoopTest,
PollThenSuccessWithBespokeOperationTypeNoNameMethod) {
Response response;
response.set_seconds(123456);
BespokeOperationTypeNoNameMethod starting_op;
starting_op.set_name("test-op-name");
starting_op.set_is_done(false);
BespokeOperationTypeNoNameMethod expected = starting_op;
expected.set_is_done(true);

auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
.WillOnce([](std::chrono::nanoseconds) {
return make_ready_future(
make_status_or(std::chrono::system_clock::now()));
});
CompletionQueue cq(mock_cq);

auto mock = std::make_shared<MockBespokeOperationNoNameMethodStub>();
EXPECT_CALL(*mock, AsyncGetOperation)
.WillOnce([&](CompletionQueue&, std::unique_ptr<RestContext>,
ImmutableOptions const& options,
BespokeGetOperationRequestType const&) {
EXPECT_EQ(options->get<StringOption>(), CurrentTestName());
return make_ready_future(make_status_or(expected));
});
auto policy = std::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).Times(0);
EXPECT_CALL(*policy, WaitPeriod)
.WillRepeatedly(Return(std::chrono::milliseconds(1)));

auto current = internal::MakeImmutableOptions(
Options{}.set<StringOption>(CurrentTestName()));
auto pending = AsyncRestPollingLoop<BespokeOperationTypeNoNameMethod,
BespokeGetOperationRequestType,
BespokeCancelOperationRequestType>(
std::move(cq), current, make_ready_future(make_status_or(starting_op)),
[mock](CompletionQueue& cq, std::unique_ptr<RestContext> context,
ImmutableOptions options,
BespokeGetOperationRequestType const& request) {
return mock->AsyncGetOperation(cq, std::move(context),
std::move(options), request);
},
[mock](CompletionQueue& cq, std::unique_ptr<RestContext> context,
ImmutableOptions options,
BespokeCancelOperationRequestType const& request) {
return mock->AsyncCancelOperation(cq, std::move(context),
std::move(options), request);
},
std::move(policy), "test-function",
[](BespokeOperationTypeNoNameMethod const& op) { return op.is_done(); },
[](std::string const& s, BespokeGetOperationRequestType& op) {
op.set_name(s);
},
[](std::string const& s, BespokeCancelOperationRequestType& op) {
op.set_name(s);
},
[](StatusOr<BespokeOperationTypeNoNameMethod> const& op) {
return op->pseudo_name_function();
});
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
StatusOr<BespokeOperationTypeNoNameMethod> actual = pending.get();
ASSERT_THAT(actual, IsOk());
EXPECT_THAT(*actual, Eq(expected));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
Expand Down
36 changes: 31 additions & 5 deletions google/cloud/internal/async_rest_polling_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ namespace cloud {
namespace rest_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

template <typename OperationType>
std::string DefaultOperationName(StatusOr<OperationType> const& operation) {
return operation->name();
}

template <typename OperationType, typename GetOperationRequestType,
typename CancelOperationRequestType>
class AsyncRestPollingLoopImpl
Expand All @@ -52,7 +57,8 @@ class AsyncRestPollingLoopImpl
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name)
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name)
: cq_(std::move(cq)),
options_(std::move(options)),
poll_(std::move(poll)),
Expand All @@ -64,7 +70,8 @@ class AsyncRestPollingLoopImpl
get_request_set_operation_name_(
std::move(get_request_set_operation_name)),
cancel_request_set_operation_name_(
std::move(cancel_request_set_operation_name)) {}
std::move(cancel_request_set_operation_name)),
operation_name_(std::move(operation_name)) {}

AsyncRestPollingLoopImpl(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
Expand All @@ -85,6 +92,23 @@ class AsyncRestPollingLoopImpl
r.set_name(name);
}) {}

AsyncRestPollingLoopImpl(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name)
: AsyncRestPollingLoopImpl(
std::move(cq), std::move(options), poll, cancel,
std::move(polling_policy), std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
DefaultOperationName<OperationType>) {}

future<StatusOr<OperationType>> Start(future<StatusOr<OperationType>> op) {
auto self = this->shared_from_this();
auto w = WeakFromThis();
Expand Down Expand Up @@ -131,15 +155,16 @@ class AsyncRestPollingLoopImpl

void OnStart(StatusOr<OperationType> op) {
if (!op) return promise_.set_value(std::move(op));
internal::AddSpanAttribute(*options_, "gl-cpp.LRO_name", op->name());
auto operation_name = operation_name_(op);
internal::AddSpanAttribute(*options_, "gl-cpp.LRO_name", operation_name);
if (is_operation_done_(*op)) return promise_.set_value(std::move(op));
GCP_LOG(DEBUG) << location_ << "() polling loop starting for "
<< op->name();
<< operation_name;
bool do_cancel = false;
{
std::unique_lock<std::mutex> lk(mu_);
std::swap(delayed_cancel_, do_cancel);
op_name_ = std::move(*op->mutable_name());
op_name_ = std::move(operation_name);
}
if (do_cancel) DoCancel();
return Wait();
Expand Down Expand Up @@ -212,6 +237,7 @@ class AsyncRestPollingLoopImpl
get_request_set_operation_name_;
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name_;
std::function<std::string(StatusOr<OperationType> const&)> operation_name_;

// `delayed_cancel_` and `op_name_`, in contrast, are also used from
// `DoCancel()`, which is called asynchronously, so they need locking.
Expand Down
Loading