diff --git a/ci/cloudbuild/dockerfiles/fedora-latest-cmake.Dockerfile b/ci/cloudbuild/dockerfiles/fedora-latest-cmake.Dockerfile index 88ade56..d3ad9ea 100644 --- a/ci/cloudbuild/dockerfiles/fedora-latest-cmake.Dockerfile +++ b/ci/cloudbuild/dockerfiles/fedora-latest-cmake.Dockerfile @@ -67,15 +67,20 @@ RUN curl -fsSL https://distfiles.ariadne.space/pkgconf/pkgconf-2.2.0.tar.gz | \ # set the search path. ENV PKG_CONFIG_PATH=/usr/local/lib64/pkgconfig:/usr/local/lib/pkgconfig:/usr/lib64/pkgconfig +# In order to work around https://github.com/llvm/llvm-project/issues/102443 we set +# these compiler env vars so that all our dependencies are built the same way. +ENV CC="clang" +ENV CXX="clang++" +ENV CXXFLAGS="-fclang-abi-compat=17" + # We disable the inline namespace because otherwise Abseil LTS updates break our # `check-api` build. WORKDIR /var/tmp/build RUN curl -fsSL https://github.com/abseil/abseil-cpp/archive/20240722.0.tar.gz | \ tar -xzf - --strip-components=1 && \ - sed -i 's/^#define ABSL_OPTION_USE_\(.*\) 2/#define ABSL_OPTION_USE_\1 0/' "absl/base/options.h" && \ - sed -i 's/^#define ABSL_OPTION_USE_INLINE_NAMESPACE 1$/#define ABSL_OPTION_USE_INLINE_NAMESPACE 0/' "absl/base/options.h" && \ cmake \ -DCMAKE_BUILD_TYPE="Release" \ + -DCMAKE_CXX_STANDARD=17 \ -DABSL_BUILD_TESTING=OFF \ -DBUILD_SHARED_LIBS=yes \ -GNinja -S . -B cmake-out && \ @@ -129,10 +134,11 @@ RUN curl -fsSL https://github.com/nlohmann/json/archive/v3.11.3.tar.gz | \ ldconfig && cd /var/tmp && rm -fr build WORKDIR /var/tmp/build/protobuf -RUN curl -fsSL https://github.com/protocolbuffers/protobuf/archive/v29.0.tar.gz | \ +RUN curl -fsSL https://github.com/protocolbuffers/protobuf/archive/v29.3.tar.gz | \ tar -xzf - --strip-components=1 && \ cmake \ -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_STANDARD=17 \ -DBUILD_SHARED_LIBS=yes \ -Dprotobuf_BUILD_TESTS=OFF \ -Dprotobuf_ABSL_PROVIDER=package \ @@ -144,7 +150,7 @@ WORKDIR /var/tmp/build/ RUN curl -fsSL https://github.com/open-telemetry/opentelemetry-cpp/archive/v1.18.0.tar.gz | \ tar -xzf - --strip-components=1 && \ cmake \ - -DCMAKE_CXX_STANDARD=14 \ + -DCMAKE_CXX_STANDARD=17 \ -DCMAKE_BUILD_TYPE=Release \ -DCMAKE_POSITION_INDEPENDENT_CODE=TRUE \ -DBUILD_SHARED_LIBS=ON \ @@ -159,10 +165,11 @@ RUN curl -fsSL https://github.com/open-telemetry/opentelemetry-cpp/archive/v1.18 WORKDIR /var/tmp/build/grpc RUN dnf makecache && dnf install -y c-ares-devel re2-devel -RUN curl -fsSL https://github.com/grpc/grpc/archive/v1.67.0.tar.gz | \ +RUN curl -fsSL https://github.com/grpc/grpc/archive/v1.69.0.tar.gz | \ tar -xzf - --strip-components=1 && \ cmake \ -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_STANDARD=17 \ -DBUILD_SHARED_LIBS=ON \ -DgRPC_INSTALL=ON \ -DgRPC_BUILD_TESTS=OFF \ @@ -184,10 +191,12 @@ RUN curl -fsSL https://github.com/grpc/grpc/archive/v1.67.0.tar.gz | \ # files. ENV LD_LIBRARY_PATH=/usr/local/lib:/usr/local/lib64:${LD_LIBRARY_PATH} WORKDIR /var/tmp/build/google-cloud-cpp -RUN curl -fsSL https://github.com/googleapis/google-cloud-cpp/archive/v2.31.0.tar.gz | \ +RUN curl -fsSL https://github.com/googleapis/google-cloud-cpp/archive/v2.33.0.tar.gz | \ tar -xzf - --strip-components=1 && \ cmake \ -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_STANDARD=17 \ + -DGOOGLE_CLOUD_CPP_ENABLE_CLANG_ABI_COMPAT_17=ON \ -DBUILD_SHARED_LIBS=yes \ -DGOOGLE_CLOUD_CPP_ENABLE="bigquerycontrol,bigquery" \ -GNinja -S . -B cmake-out && \ @@ -198,6 +207,7 @@ WORKDIR /var/tmp/build/arrow RUN curl -fsSL https://github.com/apache/arrow/archive/apache-arrow-18.1.0.tar.gz | \ tar -xzf - --strip-components=1 && \ cmake \ + -DCMAKE_CXX_STANDARD=17 \ -GNinja -S cpp -B cmake-out \ --preset ninja-release-minimal \ -DARROW_BUILD_STATIC=ON && \ diff --git a/google/cloud/bigquery_unified/CMakeLists.txt b/google/cloud/bigquery_unified/CMakeLists.txt index ae1e88f..aaf28f2 100644 --- a/google/cloud/bigquery_unified/CMakeLists.txt +++ b/google/cloud/bigquery_unified/CMakeLists.txt @@ -135,6 +135,7 @@ set(bigquery_unified_library_files connection.h idempotency_policy.cc idempotency_policy.h + internal/async_rest_long_running_operation_custom.h internal/connection_impl.cc internal/connection_impl.h internal/default_options.cc diff --git a/google/cloud/bigquery_unified/client.cc b/google/cloud/bigquery_unified/client.cc index 5abc202..9281679 100644 --- a/google/cloud/bigquery_unified/client.cc +++ b/google/cloud/bigquery_unified/client.cc @@ -68,15 +68,13 @@ StreamRange Client::ListJobs( } future> Client::InsertJob( - google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts) { - return make_ready_future>( - internal::UnimplementedError("not implemented")); + google::cloud::bigquery::v2::Job const& job, Options opts) { + return connection_->InsertJob( + job, internal::MergeOptions(std::move(opts), options_)); } StatusOr Client::InsertJob( - google::cloud::NoAwaitTag, - google::cloud::bigquery::v2::InsertJobRequest const& request, + google::cloud::NoAwaitTag, google::cloud::bigquery::v2::Job const& job, Options opts) { return internal::UnimplementedError("not implemented"); } diff --git a/google/cloud/bigquery_unified/client.h b/google/cloud/bigquery_unified/client.h index ea75a72..728dcf4 100644 --- a/google/cloud/bigquery_unified/client.h +++ b/google/cloud/bigquery_unified/client.h @@ -92,27 +92,25 @@ class Client { google::cloud::bigquery::v2::JobReference const& job_reference, Options opts = {}); + // DeleteJob + Status DeleteJob(google::cloud::bigquery::v2::DeleteJobRequest const& request, + Options opts = {}); + // GetJob StatusOr GetJob( google::cloud::bigquery::v2::GetJobRequest const& request, Options opts = {}); - // DeleteJob - Status DeleteJob(google::cloud::bigquery::v2::DeleteJobRequest const& request, - Options opts = {}); - // ListJobs StreamRange ListJobs( google::cloud::bigquery::v2::ListJobsRequest request, Options opts = {}); // InsertJob future> InsertJob( - google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts = {}); + google::cloud::bigquery::v2::Job const& job, Options opts = {}); StatusOr InsertJob( - google::cloud::NoAwaitTag, - google::cloud::bigquery::v2::InsertJobRequest const& request, + google::cloud::NoAwaitTag, google::cloud::bigquery::v2::Job const& job, Options opts = {}); future> InsertJob( diff --git a/google/cloud/bigquery_unified/connection.cc b/google/cloud/bigquery_unified/connection.cc index e0a8015..1ac643c 100644 --- a/google/cloud/bigquery_unified/connection.cc +++ b/google/cloud/bigquery_unified/connection.cc @@ -71,16 +71,14 @@ StreamRange Connection::ListJobs( // InsertJob future> Connection::InsertJob( - google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts) { + google::cloud::bigquery::v2::Job const& job, Options opts) { return google::cloud::make_ready_future< StatusOr>( Status(StatusCode::kUnimplemented, "not implemented")); } StatusOr Connection::InsertJob( - google::cloud::NoAwaitTag, - google::cloud::bigquery::v2::InsertJobRequest const& request, + google::cloud::NoAwaitTag, google::cloud::bigquery::v2::Job const& job, Options opts) { return internal::UnimplementedError("not implemented"); } diff --git a/google/cloud/bigquery_unified/connection.h b/google/cloud/bigquery_unified/connection.h index 4a81972..b538658 100644 --- a/google/cloud/bigquery_unified/connection.h +++ b/google/cloud/bigquery_unified/connection.h @@ -77,12 +77,10 @@ class Connection { // InsertJob virtual future> InsertJob( - google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts); + google::cloud::bigquery::v2::Job const& job, Options opts); virtual StatusOr InsertJob( - google::cloud::NoAwaitTag, - google::cloud::bigquery::v2::InsertJobRequest const& request, + google::cloud::NoAwaitTag, google::cloud::bigquery::v2::Job const& job, Options opts); virtual future> InsertJob( diff --git a/google/cloud/bigquery_unified/google_cloud_cpp_bigquery_bigquery_unified.bzl b/google/cloud/bigquery_unified/google_cloud_cpp_bigquery_bigquery_unified.bzl index af49ced..3245fa6 100644 --- a/google/cloud/bigquery_unified/google_cloud_cpp_bigquery_bigquery_unified.bzl +++ b/google/cloud/bigquery_unified/google_cloud_cpp_bigquery_bigquery_unified.bzl @@ -20,6 +20,7 @@ google_cloud_cpp_bigquery_bigquery_unified_hdrs = [ "client.h", "connection.h", "idempotency_policy.h", + "internal/async_rest_long_running_operation_custom.h", "internal/connection_impl.h", "internal/default_options.h", "job_options.h", diff --git a/google/cloud/bigquery_unified/integration_tests/job_integration_test.cc b/google/cloud/bigquery_unified/integration_tests/job_integration_test.cc index 66066e8..66cb16f 100644 --- a/google/cloud/bigquery_unified/integration_tests/job_integration_test.cc +++ b/google/cloud/bigquery_unified/integration_tests/job_integration_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/bigquery_unified/client.h" +#include "google/cloud/bigquery_unified/job_options.h" #include "google/cloud/bigquery_unified/testing_util/status_matchers.h" #include "google/cloud/bigquery_unified/version.h" #include "google/cloud/internal/getenv.h" @@ -22,6 +23,7 @@ namespace google::cloud::bigquery_unified { GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::bigquery_unified::testing_util::IsOk; using ::testing::Eq; class JobIntegrationTest : public ::testing::Test { @@ -34,6 +36,35 @@ class JobIntegrationTest : public ::testing::Test { std::string project_id_; }; +TEST_F(JobIntegrationTest, InsertJob) { + namespace bigquery_proto = google::cloud::bigquery::v2; + + bigquery_proto::JobConfigurationQuery query; + query.mutable_use_legacy_sql()->set_value(false); + query.set_query( + "SELECT name, state, year, sum(number) as total " + "FROM `bigquery-public-data.usa_names.usa_1910_2013` " + "WHERE year >= 2000 " + "GROUP BY name, state, year " + "LIMIT 100"); + + bigquery_proto::JobConfiguration config; + *config.mutable_query() = query; + config.mutable_labels()->insert({"test_suite", "job_integration_test"}); + config.mutable_labels()->insert({"test_case", "insert_job"}); + + bigquery_proto::Job query_job_request; + *query_job_request.mutable_configuration() = config; + std::shared_ptr connection = + google::cloud::bigquery_unified::MakeConnection(); + auto client = google::cloud::bigquery_unified::Client(connection); + + auto options = + google::cloud::Options{}.set(project_id_); + auto query_job = client.InsertJob(query_job_request, options).get(); + EXPECT_THAT(query_job, IsOk()); +} + TEST_F(JobIntegrationTest, GetJob) { namespace bigquery_proto = google::cloud::bigquery::v2; std::shared_ptr connection = @@ -41,7 +72,7 @@ TEST_F(JobIntegrationTest, GetJob) { auto client = google::cloud::bigquery_unified::Client(connection); // TODO: hard coding this id is brittle but currently necessary. - std::string const job_id = "job_TyRhPS6z-5_e9JSwtT8ieuwDOdLD"; + std::string const job_id = "job_XORZAqWx6R3xcDQCL9K_-2peocI7"; bigquery_proto::GetJobRequest get_request; get_request.set_project_id(project_id_); get_request.set_job_id(job_id); diff --git a/google/cloud/bigquery_unified/internal/async_rest_long_running_operation_custom.h b/google/cloud/bigquery_unified/internal/async_rest_long_running_operation_custom.h new file mode 100644 index 0000000..eddf1aa --- /dev/null +++ b/google/cloud/bigquery_unified/internal/async_rest_long_running_operation_custom.h @@ -0,0 +1,295 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_BIGQUERY_GOOGLE_CLOUD_BIGQUERY_UNIFIED_INTERNAL_ASYNC_REST_LONG_RUNNING_OPERATION_CUSTOM_H +#define GOOGLE_CLOUD_CPP_BIGQUERY_GOOGLE_CLOUD_BIGQUERY_UNIFIED_INTERNAL_ASYNC_REST_LONG_RUNNING_OPERATION_CUSTOM_H + +#include "google/cloud/bigquery_unified/version.h" +#include "google/cloud/future.h" +#include "google/cloud/internal/async_rest_long_running_operation.h" +#include "google/cloud/polling_policy.h" +#include "google/cloud/status_or.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigquery_unified_internal { +GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_BEGIN + +// TODO: These types are mostly copy/paste from +// google/cloud/internal/async_rest_long_running_operation_custom.h, +// google/cloud/internal/async_rest_polling_loop.h, and +// google/cloud/internal/async_rest_polling_loop_impl.h. +// However, we need to provide a mechanism to get the rpc name from an +// OperationType that does not have a `name` method. We do so here by providing +// a std::function that takes the OperationTypes as a parameter and returns a +// std::string. +// Once we update the code in google-cloud-cpp to accept a operation_name +// function and default it to OperationType::name, all this code can go away. + +template +using AsyncRestPollLongRunningOperation = + std::function>( + google::cloud::CompletionQueue&, + std::unique_ptr, internal::ImmutableOptions, + GetOperationRequestType const&)>; + +template +using AsyncRestCancelLongRunningOperation = std::function( + google::cloud::CompletionQueue&, + std::unique_ptr, internal::ImmutableOptions, + CancelOperationRequestType const&)>; + +template +class AsyncRestPollingLoopImpl + : public std::enable_shared_from_this> { + public: + AsyncRestPollingLoopImpl( + google::cloud::CompletionQueue cq, internal::ImmutableOptions options, + AsyncRestPollLongRunningOperation + poll, + AsyncRestCancelLongRunningOperation cancel, + std::unique_ptr polling_policy, std::string location, + std::function is_operation_done, + std::function + get_request_set_operation_name, + std::function + cancel_request_set_operation_name, + std::function const&)> operation_name) + : cq_(std::move(cq)), + options_(std::move(options)), + poll_(std::move(poll)), + cancel_(std::move(cancel)), + polling_policy_(std::move(polling_policy)), + location_(std::move(location)), + promise_(null_promise_t{}), + is_operation_done_(std::move(is_operation_done)), + get_request_set_operation_name_( + std::move(get_request_set_operation_name)), + cancel_request_set_operation_name_( + std::move(cancel_request_set_operation_name)), + operation_name_(std::move(operation_name)) {} + + future> Start(future> op) { + auto self = this->shared_from_this(); + auto w = WeakFromThis(); + promise_ = promise>( + [w, c = internal::CallContext{options_}]() mutable { + if (auto self = w.lock()) { + internal::ScopedCallContext scope(std::move(c)); + self->DoCancel(); + } + }); + op.then( + [self](future> f) { self->OnStart(f.get()); }); + return promise_.get_future(); + } + + private: + using TimerResult = future>; + + std::weak_ptr WeakFromThis() { + return this->shared_from_this(); + } + + void DoCancel() { + CancelOperationRequestType request; + { + std::unique_lock lk(mu_); + if (op_name_.empty()) { + delayed_cancel_ = true; // Wait for OnStart() to set `op_name_`. + return; + } + cancel_request_set_operation_name_(op_name_, request); + } + // Cancels are best effort, so we use weak pointers. + auto w = WeakFromThis(); + cancel_(cq_, std::make_unique(), options_, + request) + .then([w](future f) { + if (auto self = w.lock()) self->OnCancel(f.get()); + }); + } + + void OnCancel(Status const& status) { + GCP_LOG(DEBUG) << location_ << "() cancelled: " << status; + } + + void OnStart(StatusOr op) { + if (!op) return promise_.set_value(std::move(op)); + auto operation_name = operation_name_(op); + internal::AddSpanAttribute(*options_, "gl-cpp.LRO_name", operation_name); + if (is_operation_done_(*op)) return promise_.set_value(std::move(op)); + GCP_LOG(DEBUG) << location_ << "() polling loop starting for " + << operation_name; + bool do_cancel = false; + { + std::unique_lock lk(mu_); + std::swap(delayed_cancel_, do_cancel); + op_name_ = operation_name; + } + if (do_cancel) DoCancel(); + return Wait(); + } + + void Wait() { + std::chrono::milliseconds duration = polling_policy_->WaitPeriod(); + GCP_LOG(DEBUG) << location_ << "() polling loop waiting " + << duration.count() << "ms"; + auto self = this->shared_from_this(); + internal::TracedAsyncBackoff(cq_, *options_, duration, "Async Backoff") + .then([self](TimerResult f) { self->OnTimer(std::move(f)); }); + } + + void OnTimer(TimerResult f) { + GCP_LOG(DEBUG) << location_ << "() polling loop awakened"; + auto t = f.get(); + if (!t) return promise_.set_value(std::move(t).status()); + GetOperationRequestType request; + { + std::unique_lock lk(mu_); + get_request_set_operation_name_(op_name_, request); + } + auto self = this->shared_from_this(); + poll_(cq_, std::make_unique(), options_, + request) + .then([self](future> g) { + self->OnPoll(std::move(g)); + }); + } + + void OnPoll(future> f) { + GCP_LOG(DEBUG) << location_ << "() polling loop result"; + auto op = f.get(); + if (op && is_operation_done_(*op)) { + return promise_.set_value(*std::move(op)); + } + // Update the polling policy even on successful requests, so we can stop + // after too many polling attempts. + if (!polling_policy_->OnFailure(op.status())) { + if (op) { + // We should not be fabricating a `Status` value here. Rather, we + // should cancel the operation and wait for the next poll to return + // an accurate status to the user, otherwise they will have no idea + // how to react. But for now, we leave the operation running. It + // may eventually complete. + return promise_.set_value(internal::DeadlineExceededError( + location_ + "() - polling loop terminated by " + "polling policy", + GCP_ERROR_INFO())); + } + // This could be a transient error if the policy is exhausted. + return promise_.set_value(std::move(op).status()); + } + return Wait(); + } + + // These member variables are initialized in the constructor or from + // `Start()`, and then only used from the `On*()` callbacks, which are + // serialized, so they need no external synchronization. + google::cloud::CompletionQueue cq_; + internal::ImmutableOptions options_; + AsyncRestPollLongRunningOperation + poll_; + AsyncRestCancelLongRunningOperation cancel_; + std::unique_ptr polling_policy_; + std::string location_; + promise> promise_; + std::function is_operation_done_; + std::function + get_request_set_operation_name_; + std::function + cancel_request_set_operation_name_; + std::function const&)> operation_name_; + + // `delayed_cancel_` and `op_name_`, in contrast, are also used from + // `DoCancel()`, which is called asynchronously, so they need locking. + std::mutex mu_; + bool delayed_cancel_ = false; // GUARDED_BY(mu_) + std::string op_name_; // GUARDED_BY(mu_) +}; + +/** + * Customizable polling loop for services that do not conform to AIP-151. + */ +template +future> AsyncRestPollingLoop( + google::cloud::CompletionQueue cq, internal::ImmutableOptions options, + future> op, + AsyncRestPollLongRunningOperation + poll, + AsyncRestCancelLongRunningOperation cancel, + std::unique_ptr polling_policy, std::string location, + std::function is_operation_done, + std::function + get_request_set_operation_name, + std::function + cancel_request_set_operation_name, + std::function const&)> operation_name) { + auto loop = std::make_shared>( + std::move(cq), options, std::move(poll), std::move(cancel), + std::move(polling_policy), std::move(location), is_operation_done, + get_request_set_operation_name, cancel_request_set_operation_name, + operation_name); + return loop->Start(std::move(op)); +} + +/* + * AsyncAwaitRestLongRunningOperation for services that do not conform to + * AIP-151. + */ +template +future> AsyncRestAwaitLongRunningOperation( + CompletionQueue cq, internal::ImmutableOptions options, + OperationType operation, + AsyncRestPollLongRunningOperation + poll, + AsyncRestCancelLongRunningOperation cancel, + rest_internal::LongRunningOperationValueExtractor + value_extractor, + std::unique_ptr polling_policy, char const* location, + std::function is_operation_done, + std::function + get_request_set_operation_name, + std::function + cancel_request_set_operation_name, + std::function const&)> operation_name) { + auto loc = std::string{location}; + return AsyncRestPollingLoop( + std::move(cq), std::move(options), + make_ready_future(StatusOr(operation)), + std::move(poll), std::move(cancel), std::move(polling_policy), + std::move(location), is_operation_done, + get_request_set_operation_name, cancel_request_set_operation_name, + operation_name) + .then([value_extractor, loc](future> g) { + return value_extractor(g.get(), loc); + }); +} + +GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_END +} // namespace bigquery_unified_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_BIGQUERY_GOOGLE_CLOUD_BIGQUERY_UNIFIED_INTERNAL_ASYNC_REST_LONG_RUNNING_OPERATION_CUSTOM_H diff --git a/google/cloud/bigquery_unified/internal/connection_impl.cc b/google/cloud/bigquery_unified/internal/connection_impl.cc index 5a6dbef..eb1dab0 100644 --- a/google/cloud/bigquery_unified/internal/connection_impl.cc +++ b/google/cloud/bigquery_unified/internal/connection_impl.cc @@ -13,25 +13,54 @@ // limitations under the License. #include "google/cloud/bigquery_unified/internal/connection_impl.h" +#include "google/cloud/bigquery_unified/idempotency_policy.h" +#include "google/cloud/bigquery_unified/internal/async_rest_long_running_operation_custom.h" +#include "google/cloud/bigquery_unified/internal/default_options.h" +#include "google/cloud/bigquery_unified/job_options.h" +#include "google/cloud/bigquery_unified/retry_policy.h" #include "google/cloud/bigquerycontrol/v2/internal/job_option_defaults.h" #include "google/cloud/bigquerycontrol/v2/internal/job_rest_connection_impl.h" #include "google/cloud/bigquerycontrol/v2/internal/job_rest_stub_factory.h" #include "google/cloud/bigquerycontrol/v2/internal/job_tracing_connection.h" #include "google/cloud/background_threads.h" #include "google/cloud/internal/rest_background_threads_impl.h" +#include "google/cloud/internal/rest_retry_loop.h" namespace google::cloud::bigquery_unified_internal { GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_BEGIN +namespace { + +std::unique_ptr retry_policy( + Options const& options) { + return options.get()->clone(); +} + +std::unique_ptr backoff_policy(Options const& options) { + return options.get()->clone(); +} + +std::unique_ptr idempotency_policy( + Options const& options) { + return options.get()->clone(); +} + +std::unique_ptr polling_policy(Options const& options) { + return options.get()->clone(); +} + +} // namespace ConnectionImpl::ConnectionImpl( std::shared_ptr job_connection, google::cloud::Options job_options, std::shared_ptr job_stub, + std::unique_ptr background, google::cloud::Options options) : job_connection_(std::move(job_connection)), job_stub_(std::move(job_stub)), job_options_(std::move(job_options)), + background_(std::move(background)), options_(std::move(options)) {} StatusOr ConnectionImpl::GetJob( @@ -43,10 +72,94 @@ StatusOr ConnectionImpl::GetJob( return job_connection_->GetJob(request); } +future> ConnectionImpl::InsertJob( + google::cloud::bigquery::v2::Job const& job, Options opts) { + // TODO: Instead of creating an OptionsSpan, pass opts when job_connection_ + // supports it. + internal::OptionsSpan span(internal::MergeOptions( + std::move(opts), internal::MergeOptions(options_, job_options_))); + auto current_options = google::cloud::internal::SaveCurrentOptions(); + google::cloud::bigquery::v2::InsertJobRequest insert_request; + auto const billing_project = + current_options->has() + ? current_options->get() + : ""; + + insert_request.set_project_id(billing_project); + *insert_request.mutable_job() = job; + auto idempotency = idempotency_policy(*current_options) + ->InsertJob(insert_request, *current_options); + auto insert_response = rest_internal::RestRetryLoop( + retry_policy(*current_options), backoff_policy(*current_options), + std::move(idempotency), + [stub = job_stub_]( + rest_internal::RestContext& context, google::cloud::Options options, + google::cloud::bigquery::v2::InsertJobRequest const& request) + -> StatusOr { + return stub->InsertJob(context, options, request); + }, + *current_options, insert_request, __func__); + + return bigquery_unified_internal::AsyncRestAwaitLongRunningOperation< + google::cloud::bigquery::v2::Job, google::cloud::bigquery::v2::Job, + google::cloud::bigquery::v2::GetJobRequest, + google::cloud::bigquery::v2::CancelJobRequest>( + background_->cq(), current_options, *insert_response, + [stub = job_stub_]( + CompletionQueue& cq, + std::unique_ptr context, + google::cloud::internal::ImmutableOptions options, + google::cloud::bigquery::v2::GetJobRequest const& request) + -> future> { + return make_ready_future( + stub->GetJob(*std::move(context), *std::move(options), request)); + }, + [stub = job_stub_]( + CompletionQueue& cq, + std::unique_ptr context, + google::cloud::internal::ImmutableOptions options, + google::cloud::bigquery::v2::CancelJobRequest const& request) + -> future { + auto cancel_response = + stub->CancelJob(*std::move(context), *std::move(options), request); + if (!cancel_response) { + return make_ready_future(std::move(cancel_response).status()); + } + return make_ready_future(Status{}); + }, + [](StatusOr op, std::string const&) { + return op; + }, + polling_policy(*current_options), __func__, + [](google::cloud::bigquery::v2::Job const& op) { + return op.status().state() == "DONE"; + }, + [ref = insert_response->job_reference()]( + std::string const&, google::cloud::bigquery::v2::GetJobRequest& r) { + r.set_project_id(ref.project_id()); + r.set_job_id(ref.job_id()); + r.set_location(ref.location().value()); + }, + [ref = insert_response->job_reference()]( + std::string const&, + google::cloud::bigquery::v2::CancelJobRequest& r) { + r.set_project_id(ref.project_id()); + r.set_job_id(ref.job_id()); + r.set_location(ref.location().value()); + }, + [](StatusOr const&) { + return std::string{"InsertJob"}; + }); +} + std::shared_ptr MakeDefaultConnectionImpl( Options options) { + auto background = std::make_unique< + rest_internal::AutomaticallyCreatedRestBackgroundThreads>(); auto job_options = bigquerycontrol_v2_internal::JobServiceDefaultOptions(options); + // TODO: JobServiceRestConnectionImpl requires background threads, but does it + // actually use them? Needs further investigation. auto job_background = std::make_unique< rest_internal::AutomaticallyCreatedRestBackgroundThreads>(); auto job_stub = @@ -62,7 +175,7 @@ std::shared_ptr MakeDefaultConnectionImpl( // operations. return std::make_shared( std::move(job_connection), std::move(job_options), std::move(job_stub), - std::move(options)); + std::move(background), std::move(options)); } GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_END diff --git a/google/cloud/bigquery_unified/internal/connection_impl.h b/google/cloud/bigquery_unified/internal/connection_impl.h index 2b98260..73e5e8d 100644 --- a/google/cloud/bigquery_unified/internal/connection_impl.h +++ b/google/cloud/bigquery_unified/internal/connection_impl.h @@ -19,6 +19,7 @@ #include "google/cloud/bigquery_unified/version.h" #include "google/cloud/bigquerycontrol/v2/internal/job_rest_stub.h" #include "google/cloud/bigquerycontrol/v2/job_connection.h" +#include "google/cloud/background_threads.h" namespace google::cloud::bigquery_unified_internal { GOOGLE_CLOUD_CPP_BIGQUERY_INLINE_NAMESPACE_BEGIN @@ -30,6 +31,7 @@ class ConnectionImpl : public bigquery_unified::Connection { job_connection, google::cloud::Options job_options, std::shared_ptr job_stub, + std::unique_ptr background, google::cloud::Options options); ~ConnectionImpl() override = default; @@ -40,10 +42,14 @@ class ConnectionImpl : public bigquery_unified::Connection { google::cloud::bigquery::v2::GetJobRequest const& request, Options opts) override; + future> InsertJob( + google::cloud::bigquery::v2::Job const& job, Options opts) override; + private: std::shared_ptr job_connection_; std::shared_ptr job_stub_; Options job_options_; + std::unique_ptr background_; Options options_; }; diff --git a/google/cloud/bigquery_unified/mocks/mock_connection.h b/google/cloud/bigquery_unified/mocks/mock_connection.h index 167b93e..b84e086 100644 --- a/google/cloud/bigquery_unified/mocks/mock_connection.h +++ b/google/cloud/bigquery_unified/mocks/mock_connection.h @@ -73,14 +73,12 @@ class MockConnection : public Connection { // InsertJob MOCK_METHOD(future>, InsertJob, - (google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts), + (google::cloud::bigquery::v2::Job const& job, Options opts), (override)); MOCK_METHOD(StatusOr, InsertJob, (google::cloud::NoAwaitTag, - google::cloud::bigquery::v2::InsertJobRequest const& request, - Options opts), + google::cloud::bigquery::v2::Job const& job, Options opts), (override)); MOCK_METHOD(future>, InsertJob,