Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/share/change_stream/ob_change_stream_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ int ObCSDispatcher::do_dispatch_()

ObMultiVersionSchemaService *schema_service = nullptr;
bool trans_started = false;
// Bump worker timeout to 5 minutes so that ObInnerSQLConnection propagates
// the same value to session's ob_query_timeout / ob_trx_timeout when the
// batch's inner-SQL statements are executed.
static const int64_t CS_DISPATCH_TRANS_TIMEOUT_US = 5L * 60L * 1000L * 1000L;
const int64_t saved_worker_timeout_ts = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtil::current_time() + CS_DISPATCH_TRANS_TIMEOUT_US);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(exec_ctx->init_plugins())) {
LOG_WARN("init plugins failed", KR(ret));
Expand All @@ -509,6 +515,7 @@ int ObCSDispatcher::do_dispatch_()
} else {
trans_started = true;
}
THIS_WORKER.set_timeout_ts(saved_worker_timeout_ts);

// ── Phase 2: push subtasks to workers ──
// task_count_ is set once and NEVER modified afterwards; this avoids a
Expand Down
2 changes: 1 addition & 1 deletion src/share/change_stream/ob_change_stream_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace share
{

/// Interval for advancing min_dep_lsn to global_stat (us).
static constexpr int64_t CS_FETCHER_MIN_DEP_LSN_ADVANCE_INTERVAL_US = 10 * 1000 * 1000;
static constexpr int64_t CS_FETCHER_MIN_DEP_LSN_ADVANCE_INTERVAL_US = 5 * 1000 * 1000;
/// Interval for advancing refresh_scn to global_stat (us).
static constexpr int64_t CS_FETCHER_REFRESH_SCN_ADVANCE_INTERVAL_US = 200 * 1000;
/// Interval for schema version check and mode switching (us).
Expand Down
13 changes: 9 additions & 4 deletions src/share/change_stream/ob_cs_plugin_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ int ObCSAsyncIndexProcessor::resolve_vector_index_info_(
common::ObSEArray<schema::ObColDesc, 16> col_descs;
if (OB_FAIL(data_table_schema->get_simple_index_infos(simple_index_infos))) {
LOG_WARN("fail to get simple index infos", K(ret), K(table_id));
} else if (OB_FAIL(data_table_schema->get_column_ids(col_descs))) {
LOG_WARN("fail to get column descs", K(ret), K(table_id));
} else if (OB_FAIL(data_table_schema->get_store_column_ids(col_descs))) {
LOG_WARN("fail to get store column descs", K(ret), K(table_id));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < simple_index_infos.count(); ++i) {
const uint64_t index_table_id = simple_index_infos.at(i).table_id_;
Expand Down Expand Up @@ -720,7 +720,12 @@ int ObCSAsyncIndexProcessor::build_das_ins_rtdef_(common::ObArenaAllocator &allo
LOG_WARN("ins_rtdef is null after allocation", K(ret));
} else {
const int64_t current_time = common::ObTimeUtility::current_time();
const int64_t timeout_us = GCONF.internal_sql_execute_timeout;
// Use at least 5 minutes for change-stream async index DAS insert to
// tolerate large batches; internal_sql_execute_timeout default is 30s
// which is too small here.
static const int64_t CS_ASYNC_INDEX_DAS_TIMEOUT_US = 5L * 60L * 1000L * 1000L;
const int64_t default_timeout_us = GCONF.internal_sql_execute_timeout;
const int64_t timeout_us = MAX(default_timeout_us, CS_ASYNC_INDEX_DAS_TIMEOUT_US);
ins_rtdef->timeout_ts_ = current_time + timeout_us;
ins_rtdef->tenant_schema_version_ = ctx_.schema_version_;
ins_rtdef->prelock_ = false;
Expand Down Expand Up @@ -1202,7 +1207,7 @@ int ObCSAsyncIndexProcessor::write_to_vsag_(
}
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(adaptor) && REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
if (OB_SUCC(ret) && OB_NOT_NULL(adaptor) && REACH_TIME_INTERVAL(500 * 1000)) {
int tmp_ret = adaptor->refresh_bitmap_background();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("background bitmap refresh failed (non-fatal), will retry on next query",
Expand Down
Loading