diff --git a/src/share/change_stream/ob_change_stream_dispatcher.cpp b/src/share/change_stream/ob_change_stream_dispatcher.cpp index f1289a744..bb85aac6a 100644 --- a/src/share/change_stream/ob_change_stream_dispatcher.cpp +++ b/src/share/change_stream/ob_change_stream_dispatcher.cpp @@ -498,6 +498,12 @@ int ObCSDispatcher::do_dispatch_() ObMultiVersionSchemaService *schema_service = nullptr; bool trans_started = false; + // Bump worker timeout to 5 minutes so that ObInnerSQLConnection propagates + // the same value to session's ob_query_timeout / ob_trx_timeout when the + // batch's inner-SQL statements are executed. + static const int64_t CS_DISPATCH_TRANS_TIMEOUT_US = 5L * 60L * 1000L * 1000L; + const int64_t saved_worker_timeout_ts = THIS_WORKER.get_timeout_ts(); + THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + CS_DISPATCH_TRANS_TIMEOUT_US); if (OB_FAIL(ret)) { } else if (OB_FAIL(exec_ctx->init_plugins())) { LOG_WARN("init plugins failed", KR(ret)); @@ -509,6 +515,7 @@ int ObCSDispatcher::do_dispatch_() } else { trans_started = true; } + THIS_WORKER.set_timeout_ts(saved_worker_timeout_ts); // ── Phase 2: push subtasks to workers ── // task_count_ is set once and NEVER modified afterwards; this avoids a diff --git a/src/share/change_stream/ob_change_stream_fetcher.h b/src/share/change_stream/ob_change_stream_fetcher.h index 6c98f1e13..8c0882fc8 100644 --- a/src/share/change_stream/ob_change_stream_fetcher.h +++ b/src/share/change_stream/ob_change_stream_fetcher.h @@ -40,7 +40,7 @@ namespace share { /// Interval for advancing min_dep_lsn to global_stat (us). -static constexpr int64_t CS_FETCHER_MIN_DEP_LSN_ADVANCE_INTERVAL_US = 10 * 1000 * 1000; +static constexpr int64_t CS_FETCHER_MIN_DEP_LSN_ADVANCE_INTERVAL_US = 5 * 1000 * 1000; /// Interval for advancing refresh_scn to global_stat (us). static constexpr int64_t CS_FETCHER_REFRESH_SCN_ADVANCE_INTERVAL_US = 200 * 1000; /// Interval for schema version check and mode switching (us). diff --git a/src/share/change_stream/ob_cs_plugin_async_index.cpp b/src/share/change_stream/ob_cs_plugin_async_index.cpp index 21260d3d0..19f9930b5 100644 --- a/src/share/change_stream/ob_cs_plugin_async_index.cpp +++ b/src/share/change_stream/ob_cs_plugin_async_index.cpp @@ -283,8 +283,8 @@ int ObCSAsyncIndexProcessor::resolve_vector_index_info_( common::ObSEArray col_descs; if (OB_FAIL(data_table_schema->get_simple_index_infos(simple_index_infos))) { LOG_WARN("fail to get simple index infos", K(ret), K(table_id)); - } else if (OB_FAIL(data_table_schema->get_column_ids(col_descs))) { - LOG_WARN("fail to get column descs", K(ret), K(table_id)); + } else if (OB_FAIL(data_table_schema->get_store_column_ids(col_descs))) { + LOG_WARN("fail to get store column descs", K(ret), K(table_id)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) { const uint64_t index_table_id = simple_index_infos.at(i).table_id_; @@ -720,7 +720,12 @@ int ObCSAsyncIndexProcessor::build_das_ins_rtdef_(common::ObArenaAllocator &allo LOG_WARN("ins_rtdef is null after allocation", K(ret)); } else { const int64_t current_time = common::ObTimeUtility::current_time(); - const int64_t timeout_us = GCONF.internal_sql_execute_timeout; + // Use at least 5 minutes for change-stream async index DAS insert to + // tolerate large batches; internal_sql_execute_timeout default is 30s + // which is too small here. + static const int64_t CS_ASYNC_INDEX_DAS_TIMEOUT_US = 5L * 60L * 1000L * 1000L; + const int64_t default_timeout_us = GCONF.internal_sql_execute_timeout; + const int64_t timeout_us = MAX(default_timeout_us, CS_ASYNC_INDEX_DAS_TIMEOUT_US); ins_rtdef->timeout_ts_ = current_time + timeout_us; ins_rtdef->tenant_schema_version_ = ctx_.schema_version_; ins_rtdef->prelock_ = false; @@ -1202,7 +1207,7 @@ int ObCSAsyncIndexProcessor::write_to_vsag_( } } } - if (OB_SUCC(ret) && OB_NOT_NULL(adaptor) && REACH_TIME_INTERVAL(1 * 1000 * 1000)) { + if (OB_SUCC(ret) && OB_NOT_NULL(adaptor) && REACH_TIME_INTERVAL(500 * 1000)) { int tmp_ret = adaptor->refresh_bitmap_background(); if (OB_SUCCESS != tmp_ret) { LOG_WARN("background bitmap refresh failed (non-fatal), will retry on next query",