Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking β€œSign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Mysql Concurrency Perf Test #55907

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.ZoneOffset

/**
@@ -39,7 +40,6 @@ sealed class FeedBootstrap<T : Feed>(
/** [Feed] to emit records for. */
val feed: T
) {

/** Delegates to [StateManager.feeds]. */
val feeds: List<Feed>
get() = stateManager.feeds
@@ -76,18 +76,19 @@ sealed class FeedBootstrap<T : Feed>(
StreamRecordConsumer {

override fun accept(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?) {
if (changes.isNullOrEmpty()) {
acceptWithoutChanges(recordData)
} else {
val protocolChanges: List<AirbyteRecordMessageMetaChange> =
changes.map { (field: Field, fieldValueChange: FieldValueChange) ->
AirbyteRecordMessageMetaChange()
.withField(field.id)
.withChange(fieldValueChange.protocolChange())
.withReason(fieldValueChange.protocolReason())
}
acceptWithChanges(recordData, protocolChanges)
}
return // dev-null
// if (changes.isNullOrEmpty()) {
// acceptWithoutChanges(recordData)
// } else {
// val protocolChanges: List<AirbyteRecordMessageMetaChange> =
// changes.map { (field: Field, fieldValueChange: FieldValueChange) ->
// AirbyteRecordMessageMetaChange()
// .withField(field.id)
// .withChange(fieldValueChange.protocolChange())
// .withReason(fieldValueChange.protocolReason())
// }
// acceptWithChanges(recordData, protocolChanges)
// }
}

private fun acceptWithoutChanges(recordData: ObjectNode) {
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.command.OpaqueStateValue
@@ -79,10 +80,12 @@ class JdbcNonResumablePartitionReader<P : JdbcPartition<*>>(
),
)
.use { result: SelectQuerier.Result ->
var count = 0
for (row in result) {
out(row)
numRecords.incrementAndGet()
count ++
}
numRecords.addAndGet(count.toLong())
}
runComplete.set(true)
}
@@ -126,14 +129,18 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
SelectQuerier.Parameters(reuseResultObject = true, fetchSize = fetchSize),
)
.use { result: SelectQuerier.Result ->
var lastRecordLocal: ObjectNode? = null
var count = 0
for (row in result) {
out(row)
lastRecord.set(row.data)
lastRecordLocal = row.data
// Check activity periodically to handle timeout.
if (numRecords.incrementAndGet() % fetchSize == 0L) {
if (++count % fetchSize == 0) {
coroutineContext.ensureActive()
}
}
numRecords.addAndGet(count.toLong())
lastRecordLocal?.let { lastRecord.set(it) }
}
runComplete.set(true)
}
@@ -157,7 +164,9 @@ class JdbcResumablePartitionReader<P : JdbcSplittablePartition<*>>(
streamState.updateLimitState { it.down }
}
}
val checkpointState: OpaqueStateValue = partition.incompleteState(lastRecord.get()!!)
val checkpointState: OpaqueStateValue = partition.incompleteState(
lastRecord.get() ?: JsonNodeFactory.instance.objectNode()
)
return PartitionReadCheckpoint(checkpointState, numRecords.get())
}
}
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.342'
cdk = 'local'
}

dependencies {
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.11.9
dockerImageTag: 3.11.10
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ class MySqlSourceSelectQuerier(
val mySqlParameters: SelectQuerier.Parameters =
// MySQL requires this fetchSize setting on JDBC Statements to enable adaptive fetching.
// The ResultSet fetchSize value is what's used as an actual hint by the JDBC driver.
parameters.copy(statementFetchSize = Int.MIN_VALUE)
parameters.copy(reuseResultObject = false)
return wrapped.executeQuery(q, mySqlParameters)
}
}
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ airbyte:
connector:
extract:
jdbc:
mode: sequential
mode: concurrent
with-sampling: true
table-sample-size: 1024
throughput-bytes-per-second: 10000000
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.SourceConfigurationFactory
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
@@ -14,15 +15,19 @@ import java.time.Duration
@Primary
class MySqlSourceTestConfigurationFactory(val featureFlags: Set<FeatureFlag>) :
SourceConfigurationFactory<MySqlSourceConfigurationSpecification, MySqlSourceConfiguration> {
override fun makeWithoutExceptionHandling(
private val log = KotlinLogging.logger {}

override fun makeWithoutExceptionHandling(
pojo: MySqlSourceConfigurationSpecification,
): MySqlSourceConfiguration =
MySqlSourceConfigurationFactory(featureFlags)
): MySqlSourceConfiguration {
log.info { "Concurrency: ${pojo.concurrency} => ${pojo.concurrency ?: 1 }" }
return MySqlSourceConfigurationFactory(featureFlags)
.makeWithoutExceptionHandling(pojo)
.copy(
maxConcurrency = 1,
maxConcurrency = pojo.concurrency ?: 1,
checkpointTargetInterval = Duration.ofSeconds(3),
debeziumHeartbeatInterval = Duration.ofMillis(100),
debeziumKeepAliveInterval = Duration.ofSeconds(1),
)
}
}