Skip to content

Commit

Permalink
impl(rest): support LRO operation types without name method (#14924)
Browse files Browse the repository at this point in the history
  • Loading branch information
scotthart authored Jan 10, 2025
1 parent bbb1355 commit 90d72c0
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 5 deletions.
70 changes: 70 additions & 0 deletions google/cloud/internal/async_rest_long_running_operation_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ future<StatusOr<ReturnType>> AsyncRestLongRunningOperation(
});
}

template <typename ReturnType, typename OperationType,
typename GetOperationRequestType, typename CancelOperationRequestType,
typename RequestType, typename StartFunctor, typename RetryPolicyType,
typename CompletionQueue>
future<StatusOr<ReturnType>> AsyncRestLongRunningOperation(
CompletionQueue cq, internal::ImmutableOptions options,
RequestType&& request, StartFunctor&& start,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
LongRunningOperationValueExtractor<ReturnType, OperationType>
value_extractor,
std::unique_ptr<RetryPolicyType> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy, Idempotency idempotent,
std::unique_ptr<PollingPolicy> polling_policy, char const* location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto operation =
AsyncRestRetryLoop(std::move(retry_policy), std::move(backoff_policy),
idempotent, cq, std::forward<StartFunctor>(start),
options, std::forward<RequestType>(request), location);
auto loc = std::string{location};
return AsyncRestPollingLoop<OperationType, GetOperationRequestType,
CancelOperationRequestType>(
std::move(cq), std::move(options), std::move(operation),
std::move(poll), std::move(cancel), std::move(polling_policy),
std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name)
.then([value_extractor, loc](future<StatusOr<OperationType>> g) {
return value_extractor(g.get(), loc);
});
}

/*
* AsyncAwaitRestLongRunningOperation for services that do not conform to
* AIP-151.
Expand Down Expand Up @@ -105,6 +143,38 @@ future<StatusOr<ReturnType>> AsyncRestAwaitLongRunningOperation(
});
}

template <typename ReturnType, typename OperationType,
typename GetOperationRequestType, typename CancelOperationRequestType,
typename CompletionQueue>
future<StatusOr<ReturnType>> AsyncRestAwaitLongRunningOperation(
CompletionQueue cq, internal::ImmutableOptions options,
OperationType operation,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
LongRunningOperationValueExtractor<ReturnType, OperationType>
value_extractor,
std::unique_ptr<PollingPolicy> polling_policy, char const* location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto loc = std::string{location};
return AsyncRestPollingLoop<OperationType, GetOperationRequestType,
CancelOperationRequestType>(
std::move(cq), std::move(options),
make_ready_future(StatusOr<OperationType>(operation)),
std::move(poll), std::move(cancel), std::move(polling_policy),
std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name)
.then([value_extractor, loc](future<StatusOr<OperationType>> g) {
return value_extractor(g.get(), loc);
});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
} // namespace cloud
Expand Down
24 changes: 24 additions & 0 deletions google/cloud/internal/async_rest_polling_loop_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ future<StatusOr<OperationType>> AsyncRestPollingLoop(
return loop->Start(std::move(op));
}

template <typename OperationType, typename GetOperationRequestType,
typename CancelOperationRequestType>
future<StatusOr<OperationType>> AsyncRestPollingLoop(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
future<StatusOr<OperationType>> op,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name) {
auto loop = std::make_shared<AsyncRestPollingLoopImpl<
OperationType, GetOperationRequestType, CancelOperationRequestType>>(
std::move(cq), options, std::move(poll), std::move(cancel),
std::move(polling_policy), std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
operation_name);
return loop->Start(std::move(op));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
} // namespace cloud
Expand Down
97 changes: 97 additions & 0 deletions google/cloud/internal/async_rest_polling_loop_custom_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ class BespokeOperationType {
std::string name_;
};

class BespokeOperationTypeNoNameMethod {
public:
bool is_done() const { return is_done_; }
void set_is_done(bool is_done) { is_done_ = is_done; }
std::string const& pseudo_name_function() const { return name_; }
void set_name(std::string name) { name_ = std::move(name); }
std::string* mutable_name() { return &name_; }
bool operator==(BespokeOperationTypeNoNameMethod const& other) const {
return is_done_ == other.is_done_ && name_ == other.name_;
}

private:
bool is_done_;
std::string name_;
};

class BespokeGetOperationRequestType {
public:
std::string const& name() const { return name_; }
Expand Down Expand Up @@ -176,6 +192,87 @@ TEST(AsyncRestPollingLoopTest, PollThenSuccessWithBespokeOperationTypes) {
EXPECT_THAT(*actual, Eq(expected));
}

class MockBespokeOperationNoNameMethodStub {
public:
MOCK_METHOD(future<StatusOr<BespokeOperationTypeNoNameMethod>>,
AsyncGetOperation,
(CompletionQueue&, std::unique_ptr<RestContext>, ImmutableOptions,
BespokeGetOperationRequestType const&),
());

MOCK_METHOD(future<Status>, AsyncCancelOperation,
(CompletionQueue&, std::unique_ptr<RestContext>, ImmutableOptions,
BespokeCancelOperationRequestType const&),
());
};

TEST(AsyncRestPollingLoopTest,
PollThenSuccessWithBespokeOperationTypeNoNameMethod) {
Response response;
response.set_seconds(123456);
BespokeOperationTypeNoNameMethod starting_op;
starting_op.set_name("test-op-name");
starting_op.set_is_done(false);
BespokeOperationTypeNoNameMethod expected = starting_op;
expected.set_is_done(true);

auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
.WillOnce([](std::chrono::nanoseconds) {
return make_ready_future(
make_status_or(std::chrono::system_clock::now()));
});
CompletionQueue cq(mock_cq);

auto mock = std::make_shared<MockBespokeOperationNoNameMethodStub>();
EXPECT_CALL(*mock, AsyncGetOperation)
.WillOnce([&](CompletionQueue&, std::unique_ptr<RestContext>,
ImmutableOptions const& options,
BespokeGetOperationRequestType const&) {
EXPECT_EQ(options->get<StringOption>(), CurrentTestName());
return make_ready_future(make_status_or(expected));
});
auto policy = std::make_unique<MockPollingPolicy>();
EXPECT_CALL(*policy, clone()).Times(0);
EXPECT_CALL(*policy, OnFailure).Times(0);
EXPECT_CALL(*policy, WaitPeriod)
.WillRepeatedly(Return(std::chrono::milliseconds(1)));

auto current = internal::MakeImmutableOptions(
Options{}.set<StringOption>(CurrentTestName()));
auto pending = AsyncRestPollingLoop<BespokeOperationTypeNoNameMethod,
BespokeGetOperationRequestType,
BespokeCancelOperationRequestType>(
std::move(cq), current, make_ready_future(make_status_or(starting_op)),
[mock](CompletionQueue& cq, std::unique_ptr<RestContext> context,
ImmutableOptions options,
BespokeGetOperationRequestType const& request) {
return mock->AsyncGetOperation(cq, std::move(context),
std::move(options), request);
},
[mock](CompletionQueue& cq, std::unique_ptr<RestContext> context,
ImmutableOptions options,
BespokeCancelOperationRequestType const& request) {
return mock->AsyncCancelOperation(cq, std::move(context),
std::move(options), request);
},
std::move(policy), "test-function",
[](BespokeOperationTypeNoNameMethod const& op) { return op.is_done(); },
[](std::string const& s, BespokeGetOperationRequestType& op) {
op.set_name(s);
},
[](std::string const& s, BespokeCancelOperationRequestType& op) {
op.set_name(s);
},
[](StatusOr<BespokeOperationTypeNoNameMethod> const& op) {
return op->pseudo_name_function();
});
internal::OptionsSpan overlay(Options{}.set<StringOption>("uh-oh"));
StatusOr<BespokeOperationTypeNoNameMethod> actual = pending.get();
ASSERT_THAT(actual, IsOk());
EXPECT_THAT(*actual, Eq(expected));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace rest_internal
Expand Down
36 changes: 31 additions & 5 deletions google/cloud/internal/async_rest_polling_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ namespace cloud {
namespace rest_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

template <typename OperationType>
std::string DefaultOperationName(StatusOr<OperationType> const& operation) {
return operation->name();
}

template <typename OperationType, typename GetOperationRequestType,
typename CancelOperationRequestType>
class AsyncRestPollingLoopImpl
Expand All @@ -52,7 +57,8 @@ class AsyncRestPollingLoopImpl
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name)
cancel_request_set_operation_name,
std::function<std::string(StatusOr<OperationType> const&)> operation_name)
: cq_(std::move(cq)),
options_(std::move(options)),
poll_(std::move(poll)),
Expand All @@ -64,7 +70,8 @@ class AsyncRestPollingLoopImpl
get_request_set_operation_name_(
std::move(get_request_set_operation_name)),
cancel_request_set_operation_name_(
std::move(cancel_request_set_operation_name)) {}
std::move(cancel_request_set_operation_name)),
operation_name_(std::move(operation_name)) {}

AsyncRestPollingLoopImpl(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
Expand All @@ -85,6 +92,23 @@ class AsyncRestPollingLoopImpl
r.set_name(name);
}) {}

AsyncRestPollingLoopImpl(
google::cloud::CompletionQueue cq, internal::ImmutableOptions options,
AsyncRestPollLongRunningOperation<OperationType, GetOperationRequestType>
poll,
AsyncRestCancelLongRunningOperation<CancelOperationRequestType> cancel,
std::unique_ptr<PollingPolicy> polling_policy, std::string location,
std::function<bool(OperationType const&)> is_operation_done,
std::function<void(std::string const&, GetOperationRequestType&)>
get_request_set_operation_name,
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name)
: AsyncRestPollingLoopImpl(
std::move(cq), std::move(options), poll, cancel,
std::move(polling_policy), std::move(location), is_operation_done,
get_request_set_operation_name, cancel_request_set_operation_name,
DefaultOperationName<OperationType>) {}

future<StatusOr<OperationType>> Start(future<StatusOr<OperationType>> op) {
auto self = this->shared_from_this();
auto w = WeakFromThis();
Expand Down Expand Up @@ -131,15 +155,16 @@ class AsyncRestPollingLoopImpl

void OnStart(StatusOr<OperationType> op) {
if (!op) return promise_.set_value(std::move(op));
internal::AddSpanAttribute(*options_, "gl-cpp.LRO_name", op->name());
auto operation_name = operation_name_(op);
internal::AddSpanAttribute(*options_, "gl-cpp.LRO_name", operation_name);
if (is_operation_done_(*op)) return promise_.set_value(std::move(op));
GCP_LOG(DEBUG) << location_ << "() polling loop starting for "
<< op->name();
<< operation_name;
bool do_cancel = false;
{
std::unique_lock<std::mutex> lk(mu_);
std::swap(delayed_cancel_, do_cancel);
op_name_ = std::move(*op->mutable_name());
op_name_ = std::move(operation_name);
}
if (do_cancel) DoCancel();
return Wait();
Expand Down Expand Up @@ -212,6 +237,7 @@ class AsyncRestPollingLoopImpl
get_request_set_operation_name_;
std::function<void(std::string const&, CancelOperationRequestType&)>
cancel_request_set_operation_name_;
std::function<std::string(StatusOr<OperationType> const&)> operation_name_;

// `delayed_cancel_` and `op_name_`, in contrast, are also used from
// `DoCancel()`, which is called asynchronously, so they need locking.
Expand Down

0 comments on commit 90d72c0

Please sign in to comment.