diff --git a/.github/workflows/pr-test-integration.yml b/.github/workflows/pr-test-integration.yml index 936627f87a08a..ffa6e9f4283bd 100644 --- a/.github/workflows/pr-test-integration.yml +++ b/.github/workflows/pr-test-integration.yml @@ -152,6 +152,48 @@ jobs: set -euo pipefail readarray -t PACKAGES <<< "$(./scripts/ci/backend-tests/pkgs-with-tests-named.sh -b TestIntegration | ./scripts/ci/backend-tests/shard.sh -N"$SHARD" -d-)" CGO_ENABLED=0 go test -p=1 -tags=mysql -timeout=8m -run '^TestIntegration' "${PACKAGES[@]}" + ydb: + needs: detect-changes + if: needs.detect-changes.outputs.changed == 'true' + strategy: + matrix: + shard: [ + 1/16, 2/16, 3/16, 4/16, + 5/16, 6/16, 7/16, 8/16, + 9/16, 10/16, 11/16, 12/16, + 13/16, 14/16, 15/16, 16/16, + ] + fail-fast: false + + name: YDB (${{ matrix.shard }}) + runs-on: ubuntu-x64-large-io + permissions: + contents: read + env: + GRAFANA_TEST_DB: ydb + services: + ydb: + image: ydbplatform/local-ydb:25.2.1.7 + options: --hostname=localhost + ports: + - 2136:2136 + steps: + - name: Checkout code + uses: actions/checkout@v5 + with: + persist-credentials: false + - name: Setup Go + uses: actions/setup-go@v5.5.0 + with: + go-version-file: go.mod + cache: true + - name: Run tests + env: + SHARD: ${{ matrix.shard }} + run: | + set -euo pipefail + readarray -t PACKAGES <<< "$(./scripts/ci/backend-tests/pkgs-with-tests-named.sh -b TestIntegration | ./scripts/ci/backend-tests/shard.sh -N"$SHARD" -d-)" + CGO_ENABLED=0 go test -p=1 -tags=ydb -timeout=8m -run '^TestIntegration' "${PACKAGES[@]}" postgres: needs: detect-changes if: needs.detect-changes.outputs.changed == 'true' @@ -201,7 +243,6 @@ jobs: set -euo pipefail readarray -t PACKAGES <<< "$(./scripts/ci/backend-tests/pkgs-with-tests-named.sh -b TestIntegration | ./scripts/ci/backend-tests/shard.sh -N"$SHARD" -d-)" CGO_ENABLED=0 go test -p=1 -tags=postgres -timeout=8m -run '^TestIntegration' "${PACKAGES[@]}" - # This is the job that is actually required by rulesets. # We want to only require one job instead of all the individual tests and shards. # Future work also allows us to start skipping some tests based on changed files. @@ -210,6 +251,7 @@ jobs: - mysql - postgres - sqlite + - ydb # always() is the best function here. # success() || failure() will skip this function if any need is also skipped. # That means conditional test suites will fail the entire requirement check. diff --git a/Makefile b/Makefile index c2ac23809c57f..e2fd07f0dfcd3 100644 --- a/Makefile +++ b/Makefile @@ -360,6 +360,14 @@ test-go-integration-mysql: devenv-mysql ## Run integration tests for mysql backe $(GO) test $(GO_RACE_FLAG) $(GO_TEST_FLAGS) -p=1 -count=1 -run "^TestIntegration" -covermode=atomic -timeout=10m \ $(shell ./scripts/ci/backend-tests/pkgs-with-tests-named.sh -b TestIntegration | ./scripts/ci/backend-tests/shard.sh -n$(SHARD) -m$(SHARDS) -s) + +.PHONY: test-go-integration-ydb +test-go-integration-ydb: devenv-ydb ## Run integration tests for ydb backend with flags. + @echo "test backend integration ydb tests" + GRAFANA_TEST_DB=ydb \ + $(GO) test $(GO_RACE_FLAG) $(GO_TEST_FLAGS) -p=1 -count=1 -run "^TestIntegration" -covermode=atomic -timeout=10m \ + $(shell ./scripts/ci/backend-tests/pkgs-with-tests-named.sh -b TestIntegration | ./scripts/ci/backend-tests/shard.sh -n$(SHARD) -m$(SHARDS) -s) + .PHONY: test-go-integration-redis test-go-integration-redis: ## Run integration tests for redis cache. 
@echo "test backend integration redis tests" @@ -512,6 +520,11 @@ devenv-mysql: @cd devenv; \ sources=mysql_tests +.PHONY: devenv-ydb +devenv-ydb: + @cd devenv; \ + sources=ydb_tests + ##@ Helpers # We separate the protobuf generation because most development tasks on diff --git a/devenv/docker/blocks/ydb_tests/.env b/devenv/docker/blocks/ydb_tests/.env new file mode 100644 index 0000000000000..e14fbbd35af92 --- /dev/null +++ b/devenv/docker/blocks/ydb_tests/.env @@ -0,0 +1 @@ +ydb_version=25.2.1.7 \ No newline at end of file diff --git a/devenv/docker/blocks/ydb_tests/docker-compose.yaml b/devenv/docker/blocks/ydb_tests/docker-compose.yaml new file mode 100644 index 0000000000000..b4bea58a64c21 --- /dev/null +++ b/devenv/docker/blocks/ydb_tests/docker-compose.yaml @@ -0,0 +1,6 @@ + ydbtest: + image: ydbplatform/local-ydb:${ydb_version} + hostname: localhost + ports: + - "2136:2136" # Simple + - "8765:8765" # Monitor diff --git a/go.mod b/go.mod index 8754c16fc81ea..f6628f8cbf926 100644 --- a/go.mod +++ b/go.mod @@ -258,6 +258,7 @@ require ( // Check go.work file for details github.com/grafana/grafana/pkg/promlib v0.0.8 // @grafana/oss-big-tent github.com/grafana/grafana/pkg/semconv v0.0.0-20250804150913-990f1c69ecc2 // @grafana/grafana-app-platform-squad + github.com/ydb-platform/ydb-go-sdk/v3 v3.118.0 // @grafana/grafana-search-and-storage ) // Replace the workspace versions @@ -653,7 +654,10 @@ require ( sigs.k8s.io/yaml v1.6.0 // indirect ) -require github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect +require ( + github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect + github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 // indirect +) // Use fork of crewjam/saml with fixes for some issues until changes get merged into upstream replace github.com/crewjam/saml => github.com/grafana/saml v0.4.15-0.20240917091248-ae3bbdad8a56 diff --git a/go.sum b/go.sum index acb8cdb6a3790..0a862fcadc1c1 100644 --- a/go.sum +++ b/go.sum @@ -2289,6 +2289,8 @@ github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01j github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y= github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= +github.com/rekby/fixenv v0.6.1 h1:jUFiSPpajT4WY2cYuc++7Y1zWrnCxnovGCIX72PZniM= +github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -2510,6 +2512,10 @@ github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGC github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 h1:SKqSRP6/ocY2Z4twOqKEKxpmawVTHTvQiom7hrU6jt0= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= 
+github.com/ydb-platform/ydb-go-sdk/v3 v3.118.0 h1:wLnWJ29yzYdVds3WFRgkzij/2li9dknyRBO7B+reJfY= +github.com/ydb-platform/ydb-go-sdk/v3 v3.118.0/go.mod h1:UEMMk+JMunUveo2j+zlJEJ5I7ntf2+MbimciVNJYnNs= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= diff --git a/go.work.sum b/go.work.sum index 4ad9d238375d3..e09ec07dfa4a8 100644 --- a/go.work.sum +++ b/go.work.sum @@ -988,7 +988,6 @@ github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9 github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cel-go v0.23.2/go.mod h1:52Pb6QsDbC5kvgxvZhiL9QX1oZEkcUF/ZqaPx1J5Wwo= github.com/google/cel-go v0.26.0/go.mod h1:A9O8OU9rdvrK5MQyrqfIxo1a0u4g3sF8KB6PUIaryMM= -github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= github.com/google/go-jsonnet v0.18.0 h1:/6pTy6g+Jh1a1I2UMoAODkqELFiVIdOxbNwv0DDzoOg= github.com/google/go-jsonnet v0.18.0/go.mod h1:C3fTzyVJDslXdiTqw/bTFk7vSGyCtH3MGRbDfvEwGd0= github.com/google/go-pkcs11 v0.3.0 h1:PVRnTgtArZ3QQqTGtbtjtnIkzl2iY2kt24yqbrf7td8= @@ -1063,7 +1062,6 @@ github.com/grafana/dskit v0.0.0-20250818234656-8ff9c6532e85/go.mod h1:kImsvJ1xnm github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= -github.com/grafana/gomemcache v0.0.0-20250828162811-a96f6acee2fe/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= github.com/grafana/grafana-app-sdk v0.40.1/go.mod h1:4P8h7VB6KcDjX9bAoBQc6IP8iNylxe6bSXLR9gA39gM= github.com/grafana/grafana-app-sdk v0.41.0 h1:SYHN3U7B1myRKY3UZZDkFsue9TDmAOap0UrQVTqtYBU= github.com/grafana/grafana-app-sdk v0.41.0/go.mod h1:Wg/3vEZfok1hhIWiHaaJm+FwkosfO98o8KbeLFEnZpY= @@ -1497,8 +1495,6 @@ github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= -github.com/rekby/fixenv v0.6.1 h1:jUFiSPpajT4WY2cYuc++7Y1zWrnCxnovGCIX72PZniM= -github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c= github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU= github.com/relvacode/iso8601 v1.6.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U= @@ -1675,6 +1671,7 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk/v3 v3.108.1 h1:ixAiqjj2S/dNuJqrz4AxSqgw2P5OBMXp68hB5nNriUk= github.com/ydb-platform/ydb-go-sdk/v3 v3.108.1/go.mod h1:l5sSv153E18VvYcsmr51hok9Sjc16tEC8AXGbwrk+ho= +github.com/ydb-platform/ydb-go-sdk/v3 v3.117.2/go.mod 
h1:UEMMk+JMunUveo2j+zlJEJ5I7ntf2+MbimciVNJYnNs= github.com/yosssi/ace v0.0.5 h1:tUkIP/BLdKqrlrPwcmH0shwEEhTRHoGnc1wFIWmaBUA= github.com/yosssi/ace v0.0.5/go.mod h1:ALfIzm2vT7t5ZE7uoIZqF3TQ7SAOyupFZnkrF5id+K0= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= @@ -1926,6 +1923,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= diff --git a/pkg/infra/db/db.go b/pkg/infra/db/db.go index ede3d64677cfd..2d9cd71af056e 100644 --- a/pkg/infra/db/db.go +++ b/pkg/infra/db/db.go @@ -95,3 +95,11 @@ func IsTestDBMSSQL() bool { return false } + +func IsTestDBYDB() bool { + if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present { + return db == migrator.YDB + } + + return false +} diff --git a/pkg/infra/usagestats/statscollector/concurrent_users.go b/pkg/infra/usagestats/statscollector/concurrent_users.go index f80fad134d754..91a6b7f38ff71 100644 --- a/pkg/infra/usagestats/statscollector/concurrent_users.go +++ b/pkg/infra/usagestats/statscollector/concurrent_users.go @@ -36,11 +36,11 @@ func (s *Service) concurrentUsers(ctx context.Context) (*concurrentUsersStats, e // Retrieves concurrent users stats as a histogram. Buckets are accumulative and upper bound is inclusive. 
rawSQL := ` SELECT - COUNT(CASE WHEN tokens <= 3 THEN 1 END) AS bucket_le_3, - COUNT(CASE WHEN tokens <= 6 THEN 1 END) AS bucket_le_6, - COUNT(CASE WHEN tokens <= 9 THEN 1 END) AS bucket_le_9, - COUNT(CASE WHEN tokens <= 12 THEN 1 END) AS bucket_le_12, - COUNT(CASE WHEN tokens <= 15 THEN 1 END) AS bucket_le_15, + COUNT(CASE WHEN tokens <= 3 THEN 1 ELSE NULL END) AS bucket_le_3, + COUNT(CASE WHEN tokens <= 6 THEN 1 ELSE NULL END) AS bucket_le_6, + COUNT(CASE WHEN tokens <= 9 THEN 1 ELSE NULL END) AS bucket_le_9, + COUNT(CASE WHEN tokens <= 12 THEN 1 ELSE NULL END) AS bucket_le_12, + COUNT(CASE WHEN tokens <= 15 THEN 1 ELSE NULL END) AS bucket_le_15, COUNT(1) AS bucket_le_inf FROM (select count(1) as tokens from user_auth_token group by user_id) uat;` _, err := sess.SQL(rawSQL).Get(s.concurrentUserStatsCache.stats) diff --git a/pkg/services/anonymous/anonimpl/anonstore/database.go b/pkg/services/anonymous/anonimpl/anonstore/database.go index 6fa8766ba4688..ac4b79dcb64b7 100644 --- a/pkg/services/anonymous/anonimpl/anonstore/database.go +++ b/pkg/services/anonymous/anonimpl/anonstore/database.go @@ -176,6 +176,9 @@ func (s *AnonDBStore) CreateOrUpdateDevice(ctx context.Context, device *Device) client_ip = excluded.client_ip, user_agent = excluded.user_agent, updated_at = excluded.updated_at` + case migrator.YDB: + query = `UPSERT INTO anon_device (device_id, client_ip, user_agent, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5)` default: return fmt.Errorf("unsupported database driver: %s", s.sqlStore.GetDBType()) } diff --git a/pkg/services/authz/rbac/store/basic_role_query.sql b/pkg/services/authz/rbac/store/basic_role_query.sql index a6b1c2e82c32a..47f5173b9ca59 100644 --- a/pkg/services/authz/rbac/store/basic_role_query.sql +++ b/pkg/services/authz/rbac/store/basic_role_query.sql @@ -1,4 +1,4 @@ SELECT COALESCE(ou.role, 'None') AS role, u.is_admin FROM {{ .Ident .UserTable }} as u - LEFT JOIN {{ .Ident .OrgUserTable }} as ou ON ou.user_id = u.id AND ou.org_id = {{ .Arg .Query.OrgID }} -WHERE u.id = {{ .Arg .Query.UserID }} + LEFT JOIN {{ .Ident .OrgUserTable }} as ou ON ou.user_id = u.id +WHERE u.id = {{ .Arg .Query.UserID }} AND ou.org_id = {{ .Arg .Query.OrgID }} diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/resource_type_partition_0.json b/pkg/services/cloudmigration/cloudmigrationimpl/resource_type_partition_0.json new file mode 100644 index 0000000000000..b3e3a1edcbfdd --- /dev/null +++ b/pkg/services/cloudmigration/cloudmigrationimpl/resource_type_partition_0.json @@ -0,0 +1 @@ +{"checksum":"edbc5d2098a249296e989868b3e30835228a6ef4d03a1305bb0b8bf6fd5b887f","data":"3xPR6Xuxqqp8MYf1/tE95gu7ogwkxsfJZ+1f26787pqGaUH6Au20lgwVrPazSXe7VlAH4qi/VsUcuUlSrrTZGHSTLQeryyDqjni9ltP1WpKfW7scBccdwReacZXkoIaALViDl6jjAm+su8g1xVgZwCac8aAAWX9QvYSD4gw="} \ No newline at end of file diff --git a/pkg/services/dashboards/database/migrations/folder_uid_mig.go b/pkg/services/dashboards/database/migrations/folder_uid_mig.go index 067948c3e50f1..87279782edc24 100644 --- a/pkg/services/dashboards/database/migrations/folder_uid_mig.go +++ b/pkg/services/dashboards/database/migrations/folder_uid_mig.go @@ -40,6 +40,14 @@ func (m *FolderUIDMigration) Exec(sess *xorm.Session, mgrtr *migrator.Migrator) WHERE d.is_folder = ?` } + if mgrtr.Dialect.DriverName() == migrator.YDB { + q = ` + UPSERT INTO dashboard SELECT folder.uid AS folder_uid, d.created AS created, + d.data as data, d.org_id as org_id, d.slug as slug, + d.title as title, d.updated as updated, d.version as version + FROM dashboard folder 
JOIN dashboard d ON d.folder_id = folder.id WHERE d.is_folder = ?` + } + r, err := sess.Exec(q, mgrtr.Dialect.BooleanValue(false)) if err != nil { mgrtr.Logger.Error("Failed to migrate dashboard folder_uid for dashboards", "error", err) @@ -68,6 +76,15 @@ func (m *FolderUIDMigration) Exec(sess *xorm.Session, mgrtr *migrator.Migrator) ) WHERE is_folder = ?` } + + if mgrtr.Dialect.DriverName() == migrator.YDB { + q = ` + UPSERT INTO dashboard SELECT folder.parent_uid AS folder_uid, d.created AS created, + d.data as data, d.org_id as org_id, d.slug as slug, + d.title as title, d.updated as updated, d.version as version + FROM folder JOIN dashboard d ON d.org_id = folder.org_id WHERE d.is_folder = ?` + } + r, err = sess.Exec(q, mgrtr.Dialect.BooleanValue(true)) if err != nil { mgrtr.Logger.Error("Failed to migrate dashboard folder_uid for folders", "error", err) diff --git a/pkg/services/folder/folderimpl/folder.go b/pkg/services/folder/folderimpl/folder.go index bad681c60a86c..c6bf1c2086b4d 100644 --- a/pkg/services/folder/folderimpl/folder.go +++ b/pkg/services/folder/folderimpl/folder.go @@ -169,6 +169,12 @@ func (s *Service) DBMigration(db db.DB) { SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder = true ON CONFLICT(uid, org_id) DO UPDATE SET title=excluded.title, updated=excluded.updated `) + } else if db.GetDialect().DriverName() == migrator.YDB { + // covered by UQE_folder_org_id_uid + _, err = sess.Exec(` + UPSERT INTO folder (uid, org_id, title, created, updated) + SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder + `) } else { // covered by UQE_folder_org_id_uid _, err = sess.Exec(` @@ -183,10 +189,18 @@ func (s *Service) DBMigration(db db.DB) { if deleteOldFolders { // covered by UQE_folder_org_id_uid - _, err = sess.Exec(` + q := ` DELETE FROM folder WHERE NOT EXISTS (SELECT 1 FROM dashboard WHERE dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id AND dashboard.is_folder = true) - `) + ` + + if db.GetDialect().DriverName() == migrator.YDB { + q = ` + DELETE FROM folder ON + SELECT folder.id AS id FROM folder JOIN dashboard ON dashboard.uid = folder.uid + WHERE dashboard.org_id = folder.org_id AND dashboard.is_folder = true` + } + _, err = sess.Exec(q) } return err }) diff --git a/pkg/services/folder/folderimpl/sqlstore.go b/pkg/services/folder/folderimpl/sqlstore.go index 0644cae2d3dae..f208de55f8a7d 100644 --- a/pkg/services/folder/folderimpl/sqlstore.go +++ b/pkg/services/folder/folderimpl/sqlstore.go @@ -513,7 +513,11 @@ func (ss *FolderStoreImpl) GetFolders(ctx context.Context, q folder.GetFoldersFr if q.Limit > 0 { s.WriteString(` ORDER BY f0.title ASC`) s.WriteString(` LIMIT ? 
OFFSET ?`) - args = append(args, q.Limit, (q.Page-1)*q.Limit) + if ss.db.GetDialect().DriverName() == migrator.YDB { + args = append(args, uint64(q.Limit), uint64((q.Page-1)*q.Limit)) + } else { + args = append(args, q.Limit, uint64((q.Page-1)*q.Limit)) + } } else if q.OrderByTitle { s.WriteString(` ORDER BY f0.title ASC`) } @@ -623,10 +627,14 @@ func getFullpathSQL(dialect migrator.Dialect) string { if dialect.DriverName() == migrator.MySQL { escaped = `\\/` } + replaceExpr := "REPLACE" + if dialect.DriverName() == migrator.YDB { + replaceExpr = "Unicode::ReplaceAll" + } concatCols := make([]string, 0, folder.MaxNestedFolderDepth) - concatCols = append(concatCols, fmt.Sprintf("COALESCE(REPLACE(f0.title, '/', '%s'), '')", escaped)) + concatCols = append(concatCols, fmt.Sprintf("COALESCE(%s(f0.title, '/', '%s'), '')", replaceExpr, escaped)) for i := 1; i <= folder.MaxNestedFolderDepth; i++ { - concatCols = append([]string{fmt.Sprintf("COALESCE(REPLACE(f%d.title, '/', '%s'), '')", i, escaped), "'/'"}, concatCols...) + concatCols = append([]string{fmt.Sprintf("COALESCE(%s(f%d.title, '/', '%s'), '')", replaceExpr, i, escaped), "'/'"}, concatCols...) } return dialect.Concat(concatCols...) } diff --git a/pkg/services/sqlstore/database_config.go b/pkg/services/sqlstore/database_config.go index f48130d5738d8..987f9953453b0 100644 --- a/pkg/services/sqlstore/database_config.go +++ b/pkg/services/sqlstore/database_config.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/go-sql-driver/mysql" + "github.com/ydb-platform/ydb-go-sdk/v3/sugar" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" @@ -146,7 +147,7 @@ func (dbCfg *DatabaseConfig) buildConnectionString(cfg *setting.Cfg, features fe protocol = "unix" } - cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true&clientFoundRows=true&parseTime=true", + cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true&clientFoundRows=true", dbCfg.User, dbCfg.Pwd, protocol, dbCfg.Host, dbCfg.Name) if dbCfg.SslMode == "true" || dbCfg.SslMode == "skip-verify" { @@ -210,6 +211,12 @@ func (dbCfg *DatabaseConfig) buildConnectionString(cfg *setting.Cfg, features fe } cnnstr += buildExtraConnectionString('&', dbCfg.UrlQueryParams) + case migrator.YDB: + cnnstr = sugar.DSN(dbCfg.Host, dbCfg.Name) + + cnnstr += buildExtraConnectionString(' ', dbCfg.UrlQueryParams) + + // cnnstr = "grpc://127.0.0.1:2136/local?go_query_mode=query&go_fake_tx=query&go_query_bind=numeric" // TODO: default: return fmt.Errorf("unknown database type: %s", dbCfg.Type) } diff --git a/pkg/services/sqlstore/migrations/accesscontrol/admin_only.go b/pkg/services/sqlstore/migrations/accesscontrol/admin_only.go index fc9a5c428ac3b..d8a7069ba70f5 100644 --- a/pkg/services/sqlstore/migrations/accesscontrol/admin_only.go +++ b/pkg/services/sqlstore/migrations/accesscontrol/admin_only.go @@ -32,13 +32,15 @@ func (m *adminOnlyMigrator) Exec(sess *xorm.Session, mg *migrator.Migrator) erro // Find all dashboards and folders that should have only admin permission in acl // When a dashboard or folder only has admin permission the acl table should be empty and the has_acl set to true + + // TODO: check in Postgres sql := ` SELECT res.uid, res.is_folder, res.org_id - FROM (SELECT dashboard.id, dashboard.uid, dashboard.is_folder, dashboard.org_id, count(dashboard_acl.id) as count + FROM (SELECT dashboard.id, dashboard.uid AS uid, dashboard.is_folder AS is_folder, 
dashboard.org_id AS org_id, count(dashboard_acl.id) as count FROM dashboard LEFT JOIN dashboard_acl ON dashboard.id = dashboard_acl.dashboard_id - WHERE dashboard.has_acl IS TRUE - GROUP BY dashboard.id) as res + WHERE dashboard.has_acl + GROUP BY dashboard.id, dashboard.uid, dashboard.is_folder, dashboard.org_id) as res WHERE res.count = 0 ` diff --git a/pkg/services/sqlstore/migrations/accesscontrol/migrations.go b/pkg/services/sqlstore/migrations/accesscontrol/migrations.go index 2502caf51dc53..edb4d7a3b5e96 100644 --- a/pkg/services/sqlstore/migrations/accesscontrol/migrations.go +++ b/pkg/services/sqlstore/migrations/accesscontrol/migrations.go @@ -157,15 +157,16 @@ func AddMigration(mg *migrator.Migrator) { {Name: "builtin_role", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, {Name: "role_name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, }, - Indices: []*migrator.Index{ - {Cols: []string{"builtin_role", "role_name"}, Type: migrator.UniqueIndex}, - }, + } + + if mg.DBEngine.DriverName() == migrator.YDB { + seedAssignmentV1.Columns[0].IsPrimaryKey = true + seedAssignmentV1.Columns[1].IsPrimaryKey = true } mg.AddMigration("create seed assignment table", migrator.NewAddTableMigration(seedAssignmentV1)) //------- indexes ------------------ - mg.AddMigration("add unique index builtin_role_role_name", migrator.NewAddIndexMigration(seedAssignmentV1, seedAssignmentV1.Indices[0])) mg.AddMigration("add column hidden to role table", migrator.NewAddColumnMigration(roleV1, &migrator.Column{ Name: "hidden", Type: migrator.DB_Bool, Nullable: false, Default: "0", @@ -210,12 +211,4 @@ func AddMigration(mg *migrator.Migrator) { Type: migrator.UniqueIndex, Cols: []string{"org_id", "user_id", "role_id"}, })) - - mg.AddMigration("add permission role_id action index", migrator.NewAddIndexMigration(permissionV1, &migrator.Index{ - Cols: []string{"role_id", "action"}, - })) - - mg.AddMigration("Remove permission role_id index", migrator.NewDropIndexMigration(permissionV1, &migrator.Index{ - Cols: []string{"role_id"}, - })) } diff --git a/pkg/services/sqlstore/migrations/accesscontrol/orphaned.go b/pkg/services/sqlstore/migrations/accesscontrol/orphaned.go index 2ef0302cde29b..51736d35345e4 100644 --- a/pkg/services/sqlstore/migrations/accesscontrol/orphaned.go +++ b/pkg/services/sqlstore/migrations/accesscontrol/orphaned.go @@ -32,7 +32,7 @@ func (m *orphanedServiceAccountPermissions) Exec(sess *xorm.Session, mg *migrato var idents []string // find all permissions that are scopes directly to a service account - err := sess.SQL("SELECT DISTINCT p.identifier FROM permission AS p WHERE p.kind = 'serviceaccounts' AND NOT p.identifier = '*'").Find(&idents) + err := sess.SQL("SELECT DISTINCT p.identifier FROM permission AS p WHERE p.kind = 'serviceaccounts' AND p.identifier != '*'").Find(&idents) if err != nil { return fmt.Errorf("failed to fetch permissinos scoped to service accounts: %w", err) } diff --git a/pkg/services/sqlstore/migrations/accesscontrol/seed_assignment.go b/pkg/services/sqlstore/migrations/accesscontrol/seed_assignment.go index 4279dc01e6ef2..fb4ea666a880a 100644 --- a/pkg/services/sqlstore/migrations/accesscontrol/seed_assignment.go +++ b/pkg/services/sqlstore/migrations/accesscontrol/seed_assignment.go @@ -34,9 +34,11 @@ func AddSeedAssignmentMigrations(mg *migrator.Migrator) { Postgres("ALTER TABLE `seed_assignment` ALTER COLUMN role_name DROP NOT NULL;"). 
Mysql("ALTER TABLE seed_assignment MODIFY role_name VARCHAR(190) DEFAULT NULL;")) - mg.AddMigration("add unique index builtin_role_name back", - migrator.NewAddIndexMigration(seedAssignmentTable, - &migrator.Index{Cols: []string{"builtin_role", "role_name"}, Type: migrator.UniqueIndex})) + if mg.DBEngine.DriverName() != migrator.YDB { + mg.AddMigration("add unique index builtin_role_name back", + migrator.NewAddIndexMigration(seedAssignmentTable, + &migrator.Index{Cols: []string{"builtin_role", "role_name"}, Type: migrator.UniqueIndex})) + } mg.AddMigration("add unique index builtin_role_action_scope", migrator.NewAddIndexMigration(seedAssignmentTable, @@ -72,11 +74,11 @@ func (m *seedAssignmentPrimaryKeyMigrator) Exec(sess *xorm.Session, mig *migrato return err } - // sqlite does not allow to add constraint after a table is created + // sqlite and YDB do not allow to add constraint after a table is created // We need to create a new table with desired columns, move data to new table, delete old table and rename new table to old // create temp table - _, err := sess.Exec(` + q := ` CREATE TABLE seed_assignment_temp ( id INTEGER PRIMARY KEY AUTOINCREMENT, builtin_role TEXT, @@ -84,25 +86,54 @@ func (m *seedAssignmentPrimaryKeyMigrator) Exec(sess *xorm.Session, mig *migrato scope TEXT, role_name TEXT ); - `) + ` + + if driver == migrator.YDB { + q = ` + CREATE TABLE IF NOT EXISTS seed_assignment_temp ( + id SERIAL, + builtin_role TEXT, + action TEXT, + scope TEXT, + role_name TEXT, + PRIMARY KEY (id) + ); + ` + } + + _, err := sess.Exec(q) if err != nil { return err } + q = "INSERT INTO seed_assignment_temp (builtin_role, action, scope, role_name) SELECT * FROM seed_assignment;" + + if driver == migrator.YDB { + q = `INSERT INTO seed_assignment_temp SELECT * FROM seed_assignment` + } // copy data to temp table - _, err = sess.Exec("INSERT INTO seed_assignment_temp (builtin_role, action, scope, role_name) SELECT * FROM seed_assignment;") + _, err = sess.Exec(q) if err != nil { return err } + q = "DROP INDEX UQE_seed_assignment_builtin_role_action_scope;" + + if driver == migrator.YDB { + q = `ALTER TABLE seed_assignment DROP INDEX UQE_seed_assignment_builtin_role_action_scope` + } + // drop indices on old table - _, err = sess.Exec("DROP INDEX UQE_seed_assignment_builtin_role_action_scope;") + _, err = sess.Exec(q) if err != nil { return err } - _, err = sess.Exec("DROP INDEX UQE_seed_assignment_builtin_role_role_name;") - if err != nil { - return err + + if driver != migrator.YDB { + _, err = sess.Exec("DROP INDEX UQE_seed_assignment_builtin_role_role_name;") + if err != nil { + return err + } } // drop old table @@ -118,14 +149,15 @@ func (m *seedAssignmentPrimaryKeyMigrator) Exec(sess *xorm.Session, mig *migrato } // recreate indexes on new table - _, err = sess.Exec("CREATE UNIQUE INDEX UQE_seed_assignment_builtin_role_action_scope ON seed_assignment (builtin_role, action, scope);") - if err != nil { - return err - } - _, err = sess.Exec("CREATE UNIQUE INDEX UQE_seed_assignment_builtin_role_role_name ON seed_assignment (builtin_role, role_name);") - if err != nil { - return err - } + // TODO: return + // _, err = sess.Exec("CREATE UNIQUE INDEX UQE_seed_assignment_builtin_role_action_scope ON seed_assignment (builtin_role, action, scope);") + // if err != nil { + // return err + // } + // _, err = sess.Exec("CREATE UNIQUE INDEX UQE_seed_assignment_builtin_role_role_name ON seed_assignment (builtin_role, role_name);") + // if err != nil { + // return err + // } return nil } diff --git 
a/pkg/services/sqlstore/migrations/alert_mig.go b/pkg/services/sqlstore/migrations/alert_mig.go index 49159bbf12e4c..acf04eb994152 100644 --- a/pkg/services/sqlstore/migrations/alert_mig.go +++ b/pkg/services/sqlstore/migrations/alert_mig.go @@ -47,7 +47,7 @@ func addAlertMigrations(mg *Migrator) { alertRuleTagTable := Table{ Name: "alert_rule_tag", Columns: []*Column{ - {Name: "alert_id", Type: DB_BigInt, Nullable: false}, + {Name: "alert_id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true}, {Name: "tag_id", Type: DB_BigInt, Nullable: false}, }, Indices: []*Index{ diff --git a/pkg/services/sqlstore/migrations/annotation_mig.go b/pkg/services/sqlstore/migrations/annotation_mig.go index 896ef21aa8d56..90a7b855fa1c6 100644 --- a/pkg/services/sqlstore/migrations/annotation_mig.go +++ b/pkg/services/sqlstore/migrations/annotation_mig.go @@ -74,7 +74,7 @@ func addAnnotationMig(mg *Migrator) { annotationTagTable := Table{ Name: "annotation_tag", Columns: []*Column{ - {Name: "annotation_id", Type: DB_BigInt, Nullable: false}, + {Name: "annotation_id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true}, {Name: "tag_id", Type: DB_BigInt, Nullable: false}, }, Indices: []*Index{ @@ -118,7 +118,7 @@ func addAnnotationMig(mg *Migrator) { // // clear alert text // - updateTextFieldSQL := "UPDATE annotation SET TEXT = '' WHERE alert_id > 0" + updateTextFieldSQL := "UPDATE annotation SET text = '' WHERE alert_id > 0" mg.AddMigration("Update alert annotations and set TEXT to empty", NewRawSQLMigration(updateTextFieldSQL)) // @@ -263,6 +263,8 @@ func RunDashboardUIDMigrations(sess *xorm.Session, driverName string) error { LEFT JOIN dashboard ON annotation.dashboard_id = dashboard.id SET annotation.dashboard_uid = dashboard.uid WHERE annotation.dashboard_uid IS NULL and annotation.dashboard_id != 0;` + case YDB: + return nil } if _, err := sess.Exec(sql); err != nil { return fmt.Errorf("failed to set dashboard_uid for annotation: %w", err) diff --git a/pkg/services/sqlstore/migrations/cache_data_mig.go b/pkg/services/sqlstore/migrations/cache_data_mig.go index 3467b88962b37..559dbffa8065a 100644 --- a/pkg/services/sqlstore/migrations/cache_data_mig.go +++ b/pkg/services/sqlstore/migrations/cache_data_mig.go @@ -11,12 +11,7 @@ func addCacheMigration(mg *migrator.Migrator) { {Name: "expires", Type: migrator.DB_Integer, Length: 255, Nullable: false}, {Name: "created_at", Type: migrator.DB_Integer, Length: 255, Nullable: false}, }, - Indices: []*migrator.Index{ - {Cols: []string{"cache_key"}, Type: migrator.UniqueIndex}, - }, } mg.AddMigration("create cache_data table", migrator.NewAddTableMigration(cacheDataV1)) - - mg.AddMigration("add unique index cache_data.cache_key", migrator.NewAddIndexMigration(cacheDataV1, cacheDataV1.Indices[0])) } diff --git a/pkg/services/sqlstore/migrations/dashboard_acl.go b/pkg/services/sqlstore/migrations/dashboard_acl.go index ec9adf1e63841..cc8f511b8f954 100644 --- a/pkg/services/sqlstore/migrations/dashboard_acl.go +++ b/pkg/services/sqlstore/migrations/dashboard_acl.go @@ -49,8 +49,8 @@ INSERT INTO dashboard_acl updated ) VALUES - (-1,-1, 1,'Viewer','2017-06-20','2017-06-20'), - (-1,-1, 2,'Editor','2017-06-20','2017-06-20') + (-1,-1, 1,'Viewer',Date('2017-06-20'),Date('2017-06-20')), + (-1,-1, 2,'Editor',Date('2017-06-20'),Date('2017-06-20')) ` mg.AddMigration("save default acl rules in dashboard_acl table", NewRawSQLMigration(rawSQL)) diff --git a/pkg/services/sqlstore/migrations/dashboard_mig.go b/pkg/services/sqlstore/migrations/dashboard_mig.go index 
ff3494713abe0..262f1875e630b 100644 --- a/pkg/services/sqlstore/migrations/dashboard_mig.go +++ b/pkg/services/sqlstore/migrations/dashboard_mig.go @@ -303,6 +303,8 @@ func RunDashboardTagMigrations(sess *xorm.Session, driverName string) error { SET dashboard_tag.dashboard_uid = dashboard.uid, dashboard_tag.org_id = dashboard.org_id WHERE dashboard_tag.dashboard_uid IS NULL OR dashboard_tag.org_id IS NULL;` + case YDB: + return nil // TODO: } if _, err := sess.Exec(sql); err != nil { diff --git a/pkg/services/sqlstore/migrations/dashboard_public_mig.go b/pkg/services/sqlstore/migrations/dashboard_public_mig.go index 6af9424e78a64..a8a74e9362761 100644 --- a/pkg/services/sqlstore/migrations/dashboard_public_mig.go +++ b/pkg/services/sqlstore/migrations/dashboard_public_mig.go @@ -52,16 +52,16 @@ func addPublicDashboardMigration(mg *Migrator) { mg.AddMigration("create dashboard public config v1", NewAddTableMigration(dashboardPublicCfgV1)) // recreate table - no dependencies and was created with incorrect pkey type - addDropAllIndicesMigrations(mg, "v1", dashboardPublicCfgV1) + //addDropAllIndicesMigrations(mg, "v1", dashboardPublicCfgV1) mg.AddMigration("Drop old dashboard public config table", NewDropTableMigration("dashboard_public_config")) mg.AddMigration("recreate dashboard public config v1", NewAddTableMigration(dashboardPublicCfgV1)) - addTableIndicesMigrations(mg, "v1", dashboardPublicCfgV1) + //addTableIndicesMigrations(mg, "v1", dashboardPublicCfgV1) // recreate table - schema finalized for public dashboards v1 - addDropAllIndicesMigrations(mg, "v2", dashboardPublicCfgV1) + //addDropAllIndicesMigrations(mg, "v2", dashboardPublicCfgV1) mg.AddMigration("Drop public config table", NewDropTableMigration("dashboard_public_config")) mg.AddMigration("Recreate dashboard public config v2", NewAddTableMigration(dashboardPublicCfgV2)) - addTableIndicesMigrations(mg, "v2", dashboardPublicCfgV2) + //addTableIndicesMigrations(mg, "v2", dashboardPublicCfgV2) // rename table addTableRenameMigration(mg, "dashboard_public_config", "dashboard_public", "v2") diff --git a/pkg/services/sqlstore/migrations/dashboard_version_mig.go b/pkg/services/sqlstore/migrations/dashboard_version_mig.go index 1c9d997e2d006..6af9ef804f3de 100644 --- a/pkg/services/sqlstore/migrations/dashboard_version_mig.go +++ b/pkg/services/sqlstore/migrations/dashboard_version_mig.go @@ -44,11 +44,11 @@ func addDashboardVersionMigration(mg *Migrator) { SELECT dashboard.id, dashboard.version, - dashboard.version, - dashboard.version, + dashboard.version as parent_version, + dashboard.version as restored_from, dashboard.updated, COALESCE(dashboard.updated_by, -1), - '', + ''u, dashboard.data FROM dashboard;` mg.AddMigration("save existing dashboard data in dashboard_version table v1", NewRawSQLMigration(rawSQL)) diff --git a/pkg/services/sqlstore/migrations/datasource_mig.go b/pkg/services/sqlstore/migrations/datasource_mig.go index 3e6f5907c86d9..4d41eb96366ee 100644 --- a/pkg/services/sqlstore/migrations/datasource_mig.go +++ b/pkg/services/sqlstore/migrations/datasource_mig.go @@ -146,8 +146,4 @@ func addDataSourceMigration(mg *Migrator) { mg.AddMigration("Update secure_json_data column to MediumText", NewRawSQLMigration(""). Mysql("ALTER TABLE data_source MODIFY COLUMN secure_json_data MEDIUMTEXT;"), ) - - mg.AddMigration("Update json_data column to MediumText", NewRawSQLMigration(""). 
- Mysql("ALTER TABLE data_source MODIFY COLUMN json_data MEDIUMTEXT;"), - ) } diff --git a/pkg/services/sqlstore/migrations/db_file_storage.go b/pkg/services/sqlstore/migrations/db_file_storage.go index fcc49ae0d5700..a2efd6708319f 100644 --- a/pkg/services/sqlstore/migrations/db_file_storage.go +++ b/pkg/services/sqlstore/migrations/db_file_storage.go @@ -6,7 +6,7 @@ func addDbFileStorageMigration(mg *migrator.Migrator) { filesTable := migrator.Table{ Name: "file", Columns: []*migrator.Column{ - {Name: "path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false}, + {Name: "path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false, IsPrimaryKey: true}, // path_hash is used for indexing. we are using it to circumvent the max length limit of 191 for varchar2 fields in MySQL 5.6 {Name: "path_hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false}, @@ -48,7 +48,7 @@ func addDbFileStorageMigration(mg *migrator.Migrator) { {Name: "path_hash", Type: migrator.DB_NVarchar, Length: 64, Nullable: false}, // 191 is the maximum length of indexable VARCHAR fields in MySQL 5.6 <= with utf8mb4 encoding - {Name: "key", Type: migrator.DB_NVarchar, Length: 191, Nullable: false}, + {Name: "key", Type: migrator.DB_NVarchar, Length: 191, Nullable: false, IsPrimaryKey: true}, {Name: "value", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false}, }, Indices: []*migrator.Index{ diff --git a/pkg/services/sqlstore/migrations/folder_mig.go b/pkg/services/sqlstore/migrations/folder_mig.go index 93cdede517639..e9441e10b5d93 100644 --- a/pkg/services/sqlstore/migrations/folder_mig.go +++ b/pkg/services/sqlstore/migrations/folder_mig.go @@ -49,48 +49,19 @@ func addFolderMigrations(mg *migrator.Migrator) { INSERT INTO folder (uid, org_id, title, created, updated) SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder = 1 ON CONFLICT DO UPDATE SET title=excluded.title, updated=excluded.updated + `).YDB(` + UPSERT INTO folder (uid, org_id, title, created, updated) + SELECT uid, org_id, title, created, updated FROM dashboard WHERE is_folder `)) mg.AddMigration("Remove ghost folders from the folder table", migrator.NewRawSQLMigration(` DELETE FROM folder WHERE NOT EXISTS (SELECT 1 FROM dashboard WHERE dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id AND dashboard.is_folder = true) - `)) - - mg.AddMigration("Remove unique index UQE_folder_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{ - Type: migrator.UniqueIndex, - Cols: []string{"uid", "org_id"}, - })) - - mg.AddMigration("Add unique index UQE_folder_org_id_uid", migrator.NewAddIndexMigration(folderv1(), &migrator.Index{ - Type: migrator.UniqueIndex, - Cols: []string{"org_id", "uid"}, - })) - - mg.AddMigration("Remove unique index UQE_folder_title_parent_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{ - Type: migrator.UniqueIndex, - Cols: []string{"title", "parent_uid", "org_id"}, - })) - - mg.AddMigration("Add unique index UQE_folder_org_id_parent_uid_title", migrator.NewAddIndexMigration(folderv1(), &migrator.Index{ - Type: migrator.UniqueIndex, - Cols: []string{"org_id", "parent_uid", "title"}, - })) - - // No need to introduce IDX_folder_org_id_parent_uid because is covered by UQE_folder_org_id_parent_uid_title - mg.AddMigration("Remove index IDX_folder_parent_uid_org_id", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{ - Cols: []string{"parent_uid", "org_id"}, - })) - - // Remove the unique name constraint - mg.AddMigration("Remove unique index 
UQE_folder_org_id_parent_uid_title", migrator.NewDropIndexMigration(folderv1(), &migrator.Index{ - Type: migrator.UniqueIndex, - Cols: []string{"org_id", "parent_uid", "title"}, - })) - - mg.AddMigration("Add index IDX_folder_org_id_parent_uid", migrator.NewAddIndexMigration(folderv1(), &migrator.Index{ - Name: "IDX_folder_org_id_parent_uid", - Cols: []string{"org_id", "parent_uid"}, - })) + `).YDB(` + DELETE FROM folder ON + SELECT folder.id AS id FROM folder JOIN dashboard ON dashboard.uid = folder.uid AND dashboard.org_id = folder.org_id + WHERE dashboard.is_folder + `)) // TODO: YDB case should be non empty } func folderv1() migrator.Table { @@ -99,7 +70,7 @@ func folderv1() migrator.Table { Name: "folder", Columns: []*migrator.Column{ {Name: "id", Type: migrator.DB_BigInt, IsPrimaryKey: true, IsAutoIncrement: true}, - {Name: "uid", Type: migrator.DB_NVarchar, Length: 40}, + {Name: "uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: true}, {Name: "org_id", Type: migrator.DB_BigInt, Nullable: false}, {Name: "title", Type: migrator.DB_NVarchar, Length: 255, Nullable: false}, {Name: "description", Type: migrator.DB_NVarchar, Length: 255, Nullable: true}, diff --git a/pkg/services/sqlstore/migrations/libraryelements.go b/pkg/services/sqlstore/migrations/libraryelements.go index 903411997ac12..fe3833de028fb 100644 --- a/pkg/services/sqlstore/migrations/libraryelements.go +++ b/pkg/services/sqlstore/migrations/libraryelements.go @@ -76,6 +76,14 @@ func addLibraryElementsMigrations(mg *migrator.Migrator) { )` } + if mg.Dialect.DriverName() == migrator.YDB { + q = `UPDATE library_element ON + SELECT library_element.id AS id, dashboard.uid AS folder_uid + FROM library_element + JOIN dashboard + ON dashboard.id = library_element.folder_id AND library_element.org_id = dashboard.org_id` + } + mg.AddMigration("add library_element folder uid", migrator.NewAddColumnMigration(libraryElementsV1, &migrator.Column{ Name: "folder_uid", Type: migrator.DB_NVarchar, Length: 40, Nullable: true, })) diff --git a/pkg/services/sqlstore/migrations/preferences_mig.go b/pkg/services/sqlstore/migrations/preferences_mig.go index 148d9bb4196f1..e4a59368e2a11 100644 --- a/pkg/services/sqlstore/migrations/preferences_mig.go +++ b/pkg/services/sqlstore/migrations/preferences_mig.go @@ -98,6 +98,8 @@ func RunPreferencesMigration(sess *xorm.Session, driverName string) error { LEFT JOIN dashboard ON preferences.home_dashboard_id = dashboard.id SET preferences.home_dashboard_uid = dashboard.uid WHERE preferences.home_dashboard_uid IS NULL;` + case YDB: + return nil } if _, err := sess.Exec(sql); err != nil { diff --git a/pkg/services/sqlstore/migrations/secrets_mig.go b/pkg/services/sqlstore/migrations/secrets_mig.go index abd4e5aa34881..b191e803133b7 100644 --- a/pkg/services/sqlstore/migrations/secrets_mig.go +++ b/pkg/services/sqlstore/migrations/secrets_mig.go @@ -1,8 +1,6 @@ package migrations import ( - "fmt" - "github.com/grafana/grafana/pkg/services/sqlstore/migrator" ) @@ -17,6 +15,7 @@ func addSecretsMigration(mg *migrator.Migrator) { {Name: "encrypted_data", Type: migrator.DB_Blob, Nullable: false}, {Name: "created", Type: migrator.DB_DateTime, Nullable: false}, {Name: "updated", Type: migrator.DB_DateTime, Nullable: false}, + {Name: "label", Type: migrator.DB_NVarchar, Length: 100, Default: "''", Nullable: false}, }, Indices: []*migrator.Index{}, } @@ -42,35 +41,4 @@ func addSecretsMigration(mg *migrator.Migrator) { } mg.AddMigration("create secrets table", migrator.NewAddTableMigration(secretsV1)) - - 
mg.AddMigration("rename data_keys name column to id", migrator.NewRenameColumnMigration( - dataKeysV1, dataKeysV1.Columns[0], "id", - )) - - mg.AddMigration("add name column into data_keys", migrator.NewAddColumnMigration( - dataKeysV1, - &migrator.Column{ - Name: "name", - Type: migrator.DB_NVarchar, - Length: 100, - Default: "''", - Nullable: false, - }, - )) - - mg.AddMigration("copy data_keys id column values into name", migrator.NewRawSQLMigration( - fmt.Sprintf("UPDATE %s SET %s = %s", dataKeysV1.Name, "name", "id"), - )) - // ------- This is done for backward compatibility with versions > v8.3.x - mg.AddMigration("rename data_keys name column to label", migrator.NewRenameColumnMigration( - dataKeysV1, dataKeysV1.Columns[0], "label", - )) - - mg.AddMigration("rename data_keys id column back to name", migrator.NewRenameColumnMigration( - dataKeysV1, - &migrator.Column{Name: "id", Type: migrator.DB_NVarchar, Length: 100, IsPrimaryKey: true}, - "name", - )) - - // -------------------- } diff --git a/pkg/services/sqlstore/migrations/star_mig.go b/pkg/services/sqlstore/migrations/star_mig.go index 59ea8ed09cb81..25e34db3b5fe5 100644 --- a/pkg/services/sqlstore/migrations/star_mig.go +++ b/pkg/services/sqlstore/migrations/star_mig.go @@ -37,9 +37,6 @@ func addStarMigrations(mg *Migrator) { Cols: []string{"user_id", "dashboard_uid", "org_id"}, Type: UniqueIndex, })) - - // NOTE: in Grafana 12.2 the dashboard_id is no longer used - // However, we will keep the column + index so that rollback is still possible } // relies on the dashboard table existing & must be run after the dashboard migrations are run @@ -85,6 +82,8 @@ func RunStarMigrations(sess *xorm.Session, driverName string) error { star.org_id = dashboard.org_id, star.updated = NOW() WHERE star.dashboard_uid IS NULL OR star.org_id IS NULL;` + case YDB: + return nil // TODO: } if _, err := sess.Exec(sql); err != nil { diff --git a/pkg/services/sqlstore/migrations/temp_user.go b/pkg/services/sqlstore/migrations/temp_user.go index c0fc5bddf4485..4662514999708 100644 --- a/pkg/services/sqlstore/migrations/temp_user.go +++ b/pkg/services/sqlstore/migrations/temp_user.go @@ -105,9 +105,9 @@ func (m *SetCreatedForOutstandingInvites) SQL(dialect Dialect) string { } func (m *SetCreatedForOutstandingInvites) Exec(sess *xorm.Session, mg *Migrator) error { - created := time.Now().Unix() + created := int32(time.Now().Unix()) if _, err := sess.Exec("UPDATE "+mg.Dialect.Quote("temp_user")+ - " SET created = ?, updated = ? WHERE created = '0' AND status in ('SignUpStarted', 'InvitePending')", created, created); err != nil { + " SET created = ?, updated = ? 
WHERE created = 0 AND status in ('SignUpStarted', 'InvitePending')", created, created); err != nil { return err } return nil diff --git a/pkg/services/sqlstore/migrations/ualert/alert_rule_version_guid_mig.go b/pkg/services/sqlstore/migrations/ualert/alert_rule_version_guid_mig.go index 727035e728d64..65882a78b63ad 100644 --- a/pkg/services/sqlstore/migrations/ualert/alert_rule_version_guid_mig.go +++ b/pkg/services/sqlstore/migrations/ualert/alert_rule_version_guid_mig.go @@ -67,6 +67,9 @@ func (c setRuleGuidMigration) SQL(migrator.Dialect) string { } func (c setRuleGuidMigration) Exec(sess *xorm.Session, mg *migrator.Migrator) error { + if mg.DBEngine.DriverName() == migrator.YDB { + return nil + } var lastId *int64 for { var results []int64 diff --git a/pkg/services/sqlstore/migrations/ualert/tables.go b/pkg/services/sqlstore/migrations/ualert/tables.go index d6d98ebfec94c..9d193d3e4e597 100644 --- a/pkg/services/sqlstore/migrations/ualert/tables.go +++ b/pkg/services/sqlstore/migrations/ualert/tables.go @@ -172,10 +172,12 @@ func alertInstanceMigration(mg *migrator.Migrator) { mg.AddMigration("rename def_org_id to rule_org_id in alert_instance", migrator.NewRawSQLMigration(""). Default("ALTER TABLE alert_instance RENAME COLUMN def_org_id TO rule_org_id;"). + YDB("ALTER TABLE alert_instance ADD COLUMN rule_org_id BIGINT;"). Mysql("ALTER TABLE alert_instance CHANGE def_org_id rule_org_id BIGINT;")) mg.AddMigration("rename def_uid to rule_uid in alert_instance", migrator.NewRawSQLMigration(""). Default("ALTER TABLE alert_instance RENAME COLUMN def_uid TO rule_uid;"). + YDB("ALTER TABLE alert_instance ADD COLUMN rule_uid String;"). Mysql("ALTER TABLE alert_instance CHANGE def_uid rule_uid VARCHAR(40);")) mg.AddMigration("add index rule_org_id, rule_uid, current_state on alert_instance", migrator.NewAddIndexMigration(alertInstance, &migrator.Index{ @@ -509,7 +511,6 @@ func extractAlertmanagerConfigurationHistoryMigration(mg *migrator.Migrator) { // Since it's not always consistent as to what state the org ID indexes are in, just drop them all and rebuild from scratch. // This is not expensive since this table is guaranteed to have a small number of rows. 
mg.AddMigration("drop non-unique orgID index on alert_configuration", migrator.NewDropIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Cols: []string{"org_id"}})) - mg.AddMigration("drop unique orgID index on alert_configuration if exists", migrator.NewDropIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Type: migrator.UniqueIndex, Cols: []string{"org_id"}})) mg.AddMigration("extract alertmanager configuration history to separate table", &extractAlertmanagerConfigurationHistory{}) mg.AddMigration("add unique index on orgID to alert_configuration", migrator.NewAddIndexMigration(migrator.Table{Name: "alert_configuration"}, &migrator.Index{Type: migrator.UniqueIndex, Cols: []string{"org_id"}})) } diff --git a/pkg/services/sqlstore/migrations/user_auth_token_mig.go b/pkg/services/sqlstore/migrations/user_auth_token_mig.go index 2be726947ca37..1b0282f03a2c9 100644 --- a/pkg/services/sqlstore/migrations/user_auth_token_mig.go +++ b/pkg/services/sqlstore/migrations/user_auth_token_mig.go @@ -15,10 +15,10 @@ func addUserAuthTokenMigrations(mg *Migrator) { {Name: "user_agent", Type: DB_NVarchar, Length: 255, Nullable: false}, {Name: "client_ip", Type: DB_NVarchar, Length: 255, Nullable: false}, {Name: "auth_token_seen", Type: DB_Bool, Nullable: false}, - {Name: "seen_at", Type: DB_Int, Nullable: true}, - {Name: "rotated_at", Type: DB_Int, Nullable: false}, - {Name: "created_at", Type: DB_Int, Nullable: false}, - {Name: "updated_at", Type: DB_Int, Nullable: false}, + {Name: "seen_at", Type: DB_BigInt, Nullable: true}, + {Name: "rotated_at", Type: DB_BigInt, Nullable: false}, + {Name: "created_at", Type: DB_BigInt, Nullable: false}, + {Name: "updated_at", Type: DB_BigInt, Nullable: false}, }, Indices: []*Index{ {Cols: []string{"auth_token"}, Type: UniqueIndex}, @@ -39,7 +39,7 @@ func addUserAuthTokenMigrations(mg *Migrator) { userAuthTokenV1, &Column{ Name: "revoked_at", - Type: DB_Int, + Type: DB_BigInt, Nullable: true, }, ), diff --git a/pkg/services/sqlstore/migrations/usermig/service_account_multiple_org_login_migrator.go b/pkg/services/sqlstore/migrations/usermig/service_account_multiple_org_login_migrator.go index a690c6d459dc2..6648a48b1366e 100644 --- a/pkg/services/sqlstore/migrations/usermig/service_account_multiple_org_login_migrator.go +++ b/pkg/services/sqlstore/migrations/usermig/service_account_multiple_org_login_migrator.go @@ -72,6 +72,8 @@ func (p *ServiceAccountsSameLoginCrossOrgs) Exec(sess *xorm.Session, mg *migrato AND is_service_account = 1 AND login NOT LIKE 'sa-' || CAST(org_id AS TEXT) || '-%'; `) + case migrator.YDB: + return nil default: return fmt.Errorf("dialect not supported: %s", p.dialect) @@ -129,6 +131,8 @@ func (p *ServiceAccountsDeduplicateOrgInLogin) Exec(sess *xorm.Session, mg *migr WHERE u2.login = 'sa-' || CAST(u.org_id AS TEXT) || SUBSTRING(u.login, LENGTH('sa-'||CAST(u.org_id AS TEXT)||'-'||CAST(u.org_id AS TEXT))+1) );; `) + case migrator.YDB: + return nil default: return fmt.Errorf("dialect not supported: %s", dialect) } diff --git a/pkg/services/sqlstore/migrator/dialect.go b/pkg/services/sqlstore/migrator/dialect.go index bec421e36e25d..4331b6cabd857 100644 --- a/pkg/services/sqlstore/migrator/dialect.go +++ b/pkg/services/sqlstore/migrator/dialect.go @@ -3,10 +3,11 @@ package migrator import ( "context" "fmt" - "slices" "strconv" "strings" + "golang.org/x/exp/slices" + "github.com/grafana/grafana/pkg/services/sqlstore/session" "github.com/grafana/grafana/pkg/util/xorm" ) @@ -122,6 +123,7 @@ var 
supportedDialects = map[string]dialectFunc{ MySQL + "WithHooks": NewMysqlDialect, SQLite + "WithHooks": NewSQLite3Dialect, Postgres + "WithHooks": NewPostgresDialect, + YDB: NewYDBDialect, } func NewDialect(driverName string) Dialect { diff --git a/pkg/services/sqlstore/migrator/migrations.go b/pkg/services/sqlstore/migrator/migrations.go index b785fd867d543..6f3593e0c1f51 100644 --- a/pkg/services/sqlstore/migrator/migrations.go +++ b/pkg/services/sqlstore/migrator/migrations.go @@ -85,6 +85,10 @@ func (m *RawSQLMigration) Mssql(sql string) *RawSQLMigration { return m.Set(MSSQL, sql) } +func (m *RawSQLMigration) YDB(sql string) *RawSQLMigration { + return m.Set(YDB, sql) +} + type AddColumnMigration struct { MigrationBase tableName string diff --git a/pkg/services/sqlstore/migrator/types.go b/pkg/services/sqlstore/migrator/types.go index 15be7996dcc9f..c72bc14c36fce 100644 --- a/pkg/services/sqlstore/migrator/types.go +++ b/pkg/services/sqlstore/migrator/types.go @@ -12,6 +12,7 @@ const ( SQLite = "sqlite3" MySQL = "mysql" MSSQL = "mssql" + YDB = "ydb" ) type Migration interface { diff --git a/pkg/services/sqlstore/migrator/ydb_dialect.go b/pkg/services/sqlstore/migrator/ydb_dialect.go new file mode 100644 index 0000000000000..4522cd86bae19 --- /dev/null +++ b/pkg/services/sqlstore/migrator/ydb_dialect.go @@ -0,0 +1,418 @@ +package migrator + +import ( + "errors" + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/lib/pq" + + "github.com/grafana/grafana/pkg/util/xorm" + "github.com/grafana/grafana/pkg/util/xorm/core" +) + +type YDBDialect struct { + BaseDialect +} + +func NewYDBDialect() Dialect { + d := YDBDialect{} + d.dialect = &d + d.driverName = YDB + return &d +} + +func (db *YDBDialect) IndexCheckSQL(tableName, indexName string) (string, []any) { + return "SELECT Path FROM `.sys/partition_stats` where Path LIKE '%/'" + + " || $1 || '/' || $2 || '/indexImplTable'", []any{tableName, indexName} +} + +func (db *YDBDialect) SupportEngine() bool { + return false +} + +func (db *YDBDialect) Quote(name string) string { + return "`" + name + "`" +} + +func (db *YDBDialect) Concat(strs ...string) string { + return strings.Join(strs, " || ") +} + +func (db *YDBDialect) LikeOperator(column string, wildcardBefore bool, pattern string, wildcardAfter bool) (string, string) { + param := pattern + if wildcardBefore { + param = "%" + param + } + if wildcardAfter { + param = param + "%" + } + return fmt.Sprintf("%s ILIKE ?", column), param +} + +func (db *YDBDialect) AutoIncrStr() string { + return "" +} + +func (db *YDBDialect) BooleanValue(value bool) any { + return value +} + +func (db *YDBDialect) BooleanStr(value bool) string { + return strconv.FormatBool(value) +} + +func (db *YDBDialect) BatchSize() int { + return 1000 +} + +func (db *YDBDialect) SQLType(c *Column) string { + xormDialect := core.QueryDialect(core.YDB) + column := &core.Column{ + SQLType: core.SQLType{ + Name: c.Type, + DefaultLength: c.Length, + DefaultLength2: c.Length2, + }, + IsAutoIncrement: c.IsAutoIncrement, + } + + return xormDialect.SqlType(column) +} + +func (b *YDBDialect) AddColumnSQL(tableName string, col *Column) string { + col.Nullable = true // Cannot add not null column without default value + col.Default = "" // Column addition with default value is not supported now + + return b.BaseDialect.AddColumnSQL(tableName, col) +} + +func (b *YDBDialect) RenameColumn(table Table, column *Column, newName string) string { + oldName := column.Name + column.Name = newName + sql := b.AddColumnSQL(table.Name, 
column) + ";" + if !column.IsPrimaryKey { + column.Name = oldName + sql += b.DropColumn(table, column) + } + + return sql +} + +// TODO: +func (b *YDBDialect) ColumnCheckSQL(tableName, columnName string) (string, []any) { + return "", nil +} + +func (b *YDBDialect) DropColumn(table Table, column *Column) string { + return fmt.Sprintf("alter table %s DROP COLUMN %s", b.dialect.Quote(table.Name), b.dialect.Quote(column.Name)) +} + +func (db *YDBDialect) DropIndexSQL(tableName string, index *Index) string { + return fmt.Sprintf("alter table %s DROP INDEX %s", db.dialect.Quote(tableName), db.dialect.Quote(index.XName(tableName))) +} + +func (db *YDBDialect) UpdateTableSQL(tableName string, columns []*Column) string { + return "" + var statements = []string{} + + for _, col := range columns { + statements = append(statements, "ALTER "+db.Quote(col.Name)+" TYPE "+db.SQLType(col)) + } + + return "ALTER TABLE " + db.Quote(tableName) + " " + strings.Join(statements, ", ") + ";" +} + +func (db *YDBDialect) CleanDB(engine *xorm.Engine) error { + sess := engine.NewSession() + defer sess.Close() + + if _, err := sess.Exec("DROP SCHEMA public CASCADE;"); err != nil { + return fmt.Errorf("%v: %w", "failed to drop schema public", err) + } + + if _, err := sess.Exec("CREATE SCHEMA public;"); err != nil { + return fmt.Errorf("%v: %w", "failed to create schema public", err) + } + + return nil +} + +func (b *YDBDialect) Default(col *Column) string { + if col.Type == DB_Bool { + // Ensure that all dialects support the same literals in the same way. + bl, err := strconv.ParseBool(col.Default) + if err != nil { + panic(fmt.Errorf("failed to create default value for column '%s': invalid boolean default value '%s'", col.Name, col.Default)) + } + return b.dialect.BooleanStr(bl) + } + + if col.Type == DB_NVarchar { + return `"` + col.Default + `"` + } + + return col.Default +} + +func (b *YDBDialect) ColStringNoPk(col *Column) string { + sql := b.dialect.Quote(col.Name) + " " + + sql += b.dialect.SQLType(col) + " NULL " // TODO: remove always NULL when done with add not null columns + + if col.Default != "" { + sql += "DEFAULT " + b.dialect.Default(col) + " " + } + + return sql +} + +func (b *YDBDialect) CreateTableSQL(table *Table) string { + sql := "CREATE TABLE IF NOT EXISTS " + sql += b.dialect.Quote(table.Name) + " (\n" + + pkList := table.PrimaryKeys + + for _, col := range table.Columns { + if len(pkList) == 0 && !col.Nullable { + pkList = []string{col.Name} + } + + sql += col.StringNoPk(b.dialect) + sql = strings.TrimSpace(sql) + sql += "\n, " + } + + quotedCols := []string{} + for _, col := range pkList { + quotedCols = append(quotedCols, b.dialect.Quote(col)) + } + + sql += "PRIMARY KEY ( " + strings.Join(quotedCols, ",") + " ), " + + sql = sql[:len(sql)-2] + ")" + + sql += ";" + return sql +} + +// TruncateDBTables truncates all the tables. +// A special case is the dashboard_acl table where we keep the default permissions. 
+func (db *YDBDialect) TruncateDBTables(engine *xorm.Engine) error { + tables, err := engine.Dialect().GetTables() + if err != nil { + return err + } + sess := engine.NewSession() + defer sess.Close() + + dbName, err := db.GetDBName(engine.DataSourceName()) + if err != nil { + return err + } + + for _, table := range tables { + switch table.Name { + case "": + continue + case "migration_log": + continue + case "dashboard_acl": + // keep default dashboard permissions + if _, err := sess.Exec(fmt.Sprintf("DELETE FROM %v WHERE dashboard_id != -1 AND org_id != -1;", db.Quote(table.Name))); err != nil { + return fmt.Errorf("failed to truncate table %q: %w", table.Name, err) + } + if _, err := sess.Exec(fmt.Sprintf("ALTER SEQUENCE %v RESTART WITH 3;", db.Quote(fmt.Sprintf("%s/%v/_serial_column_id", dbName, table.Name)))); err != nil { + return fmt.Errorf("failed to reset table %q: %w", table.Name, err) + } + default: + if _, err := sess.Exec(fmt.Sprintf("DELETE FROM %v;", db.Quote(table.Name))); err != nil { + if db.isUndefinedTable(err) { + continue + } + return fmt.Errorf("failed to truncate table %q: %w", table.Name, err) + } + + _, tableCols, err := engine.Dialect().GetColumns(table.Name) + if err != nil { + return err + } + + for _, column := range tableCols { + if column.IsAutoIncrement { + sequenceName := fmt.Sprintf("%v/%v/_serial_column_%v", dbName, table.Name, column.Name) + if _, err := sess.Exec(fmt.Sprintf("ALTER SEQUENCE %v RESTART;", db.Quote(sequenceName))); err != nil { + return fmt.Errorf("failed to reset sequence %q: %w", sequenceName, err) + } + } + } + } + } + + return nil +} + +func (db *YDBDialect) isThisError(err error, errcode string) bool { + var driverErr *pq.Error + if errors.As(err, &driverErr) { + if string(driverErr.Code) == errcode { + return true + } + } + + return false +} + +func (db *YDBDialect) ErrorMessage(err error) string { + var driverErr *pq.Error + if errors.As(err, &driverErr) { + return driverErr.Message + } + return "" +} + +func (db *YDBDialect) isUndefinedTable(err error) bool { + return db.isThisError(err, "42P01") +} + +func (db *YDBDialect) IsUniqueConstraintViolation(err error) bool { + return db.isThisError(err, "23505") +} + +func (db *YDBDialect) IsDeadlock(err error) bool { + return db.isThisError(err, "40P01") +} + +func (db *YDBDialect) CreateIndexSQL(tableName string, index *Index) string { + indexName := db.Quote(index.XName(tableName)) + tableName = db.Quote(tableName) + + colsIndex := make([]string, len(index.Cols)) + for i := 0; i < len(index.Cols); i++ { + colsIndex[i] = db.Quote(index.Cols[i]) + } + + indexOn := strings.Join(colsIndex, ",") + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("ALTER TABLE %s ADD INDEX %s GLOBAL ON ( %s );", tableName, indexName, indexOn)) + + return buf.String() +} + +// UpsertSQL returns the upsert sql statement for PostgreSQL dialect +func (db *YDBDialect) UpsertSQL(tableName string, keyCols, updateCols []string) string { + str, _ := db.UpsertMultipleSQL(tableName, keyCols, updateCols, 1) + return str +} + +// UpsertMultipleSQL returns the upsert sql statement for PostgreSQL dialect +func (db *YDBDialect) UpsertMultipleSQL(tableName string, keyCols, updateCols []string, count int) (string, error) { + if count < 1 { + return "", fmt.Errorf("upsert statement must have count >= 1. 
Got %v", count) + } + columnsStr := strings.Builder{} + onConflictStr := strings.Builder{} + setStr := strings.Builder{} + + const separator = ", " + separatorVar := separator + for i, c := range updateCols { + if i == len(updateCols)-1 { + separatorVar = "" + } + + columnsStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar)) + setStr.WriteString(fmt.Sprintf("%s=EXCLUDED.%s%s", db.Quote(c), db.Quote(c), separatorVar)) + } + + separatorVar = separator + for i, c := range keyCols { + if i == len(keyCols)-1 { + separatorVar = "" + } + onConflictStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar)) + } + + valuesStr := strings.Builder{} + separatorVar = separator + nextPlaceHolder := 1 + + for i := 0; i < count; i++ { + if i == count-1 { + separatorVar = "" + } + + colPlaceHoldersStr := strings.Builder{} + placeHolderSep := separator + for j := 1; j <= len(updateCols); j++ { + if j == len(updateCols) { + placeHolderSep = "" + } + placeHolder := fmt.Sprintf("$%v%s", nextPlaceHolder, placeHolderSep) + nextPlaceHolder++ + colPlaceHoldersStr.WriteString(placeHolder) + } + colPlaceHolders := colPlaceHoldersStr.String() + + valuesStr.WriteString(fmt.Sprintf("(%s)%s", colPlaceHolders, separatorVar)) + } + + s := fmt.Sprintf(`UPSERT INTO %s (%s) VALUES %s;`, + tableName, + columnsStr.String(), + valuesStr.String(), + // onConflictStr.String(), + // setStr.String(), + ) + + return s, nil +} + +// func (db *YDBDialect) Lock(cfg LockCfg) error { +// // trying to obtain the lock for a resource identified by a 64-bit or 32-bit key value +// // the lock is exclusive: multiple lock requests stack, so that if the same resource is locked three times +// // it must then be unlocked three times to be released for other sessions' use. +// // it will either obtain the lock immediately and return true, +// // or return false if the lock cannot be acquired immediately. +// query := "SELECT pg_try_advisory_lock(?)" +// var success bool + +// _, err := cfg.Session.SQL(query, cfg.Key).Get(&success) +// if err != nil { +// return err +// } +// if !success { +// return ErrLockDB +// } + +// return nil +// } + +// func (db *YDBDialect) Unlock(cfg LockCfg) error { +// // trying to release a previously-acquired exclusive session level advisory lock. 
+// // it will either return true if the lock is successfully released or +// // false if the lock was not held (in addition an SQL warning will be reported by the server) +// query := "SELECT pg_advisory_unlock(?)" +// var success bool + +// _, err := cfg.Session.SQL(query, cfg.Key).Get(&success) +// if err != nil { +// return err +// } +// if !success { +// return ErrReleaseLockDB +// } +// return nil +// } + +func (db *YDBDialect) GetDBName(dsn string) (string, error) { + uri, err := url.Parse(dsn) + if err != nil { + return "", fmt.Errorf("failed on parse data source %v", dsn) + } + + return uri.Path, nil +} diff --git a/pkg/services/sqlstore/session.go b/pkg/services/sqlstore/session.go index 8269683493f3b..847500600a715 100644 --- a/pkg/services/sqlstore/session.go +++ b/pkg/services/sqlstore/session.go @@ -136,7 +136,7 @@ func (sess *DBSession) InsertId(bean any, dialect migrator.Dialect) error { func (sess *DBSession) WithReturningID(driverName string, query string, args []any) (int64, error) { var id int64 - if driverName == migrator.Postgres { + if driverName == migrator.Postgres || driverName == migrator.YDB { query = fmt.Sprintf("%s RETURNING id", query) if _, err := sess.SQL(query, args...).Get(&id); err != nil { return id, err diff --git a/pkg/services/sqlstore/session/session.go b/pkg/services/sqlstore/session/session.go index 92bd29ff25e37..f244a27b14e0b 100644 --- a/pkg/services/sqlstore/session/session.go +++ b/pkg/services/sqlstore/session/session.go @@ -102,7 +102,7 @@ func (gtx *SessionTx) ExecWithReturningId(ctx context.Context, query string, arg func execWithReturningId(ctx context.Context, driverName string, query string, sess Session, args ...any) (int64, error) { var id int64 - if driverName == "postgres" { + if driverName == "postgres" || driverName == "ydb" { query = fmt.Sprintf("%s RETURNING id", query) err := sess.Get(ctx, &id, query, args...) if err != nil { diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index d4842bee88460..5448e79c06f6a 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -369,6 +369,9 @@ func (ss *SQLStore) GetMigrationLockAttemptTimeout() int { } func (ss *SQLStore) RecursiveQueriesAreSupported() (bool, error) { + if ss.dialect.DriverName() == "ydb" { + return false, nil + } ss.recursiveQueriesMu.Lock() defer ss.recursiveQueriesMu.Unlock() if ss.recursiveQueriesAreSupported != nil { diff --git a/pkg/services/sqlstore/sqlstore_testinfra.go b/pkg/services/sqlstore/sqlstore_testinfra.go index 5ac0b8e828855..200c541261499 100644 --- a/pkg/services/sqlstore/sqlstore_testinfra.go +++ b/pkg/services/sqlstore/sqlstore_testinfra.go @@ -275,6 +275,8 @@ func createTemporaryDatabase(tb TestingTB) (*testDB, error) { driver, connString = newMySQLConnString(env("MYSQL_DB", "grafana_tests")) case "postgres": driver, connString = newPostgresConnString(env("POSTGRES_DB", "grafanatest")) + case "ydb": + driver, connString = newYDBConnString(env("YDB_DB", "grafanatest")) default: return nil, fmt.Errorf("unknown test db type: %s", dbType) } @@ -351,6 +353,10 @@ func newMySQLConnString(dbname string) (driver, connString string) { ) } +func newYDBConnString(_ string) (driver, connString string) { + return "ydb", "grpc://127.0.0.1:2136/local?go_query_mode=query&go_fake_tx=query&go_query_bind=numeric" // TODO: ... 
+}
+
 func newSQLite3DB(tb TestingTB) (*testDB, error) {
 	if os.Getenv("SQLITE_INMEMORY") == "true" {
 		return &testDB{Driver: "sqlite3", Conn: "file::memory:"}, nil
diff --git a/pkg/services/sqlstore/sqlutil/sqlutil.go b/pkg/services/sqlstore/sqlutil/sqlutil.go
index 7d3d09a5ff3f0..ee9522ffa0e6f 100644
--- a/pkg/services/sqlstore/sqlutil/sqlutil.go
+++ b/pkg/services/sqlstore/sqlutil/sqlutil.go
@@ -48,6 +48,8 @@ func GetTestDB(dbType string) (*TestDB, error) {
 		return postgresTestDB()
 	case "sqlite3":
 		return sqLite3TestDB()
+	case "ydb":
+		return ydbTestDB()
 	}
 
 	return nil, fmt.Errorf("unknown test db type: %s", dbType)
@@ -167,3 +169,14 @@ func postgresTestDB() (*TestDB, error) {
 		Cleanup: func() {},
 	}, nil
 }
+
+func ydbTestDB() (*TestDB, error) {
+	return &TestDB{
+		DriverName: "ydb",
+		ConnStr:    "grpc://127.0.0.1:2136/local?go_query_mode=query&go_fake_tx=query&go_query_bind=numeric",
+		Host:       "127.0.0.1",
+		Port:       "2136",
+		Database:   "/local",
+		Cleanup:    func() {},
+	}, nil
+}
diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go
index a76261c465aa0..290d76952ae03 100644
--- a/pkg/services/sqlstore/user.go
+++ b/pkg/services/sqlstore/user.go
@@ -50,6 +50,10 @@ func (ss *SQLStore) createUser(ctx context.Context, sess *DBSession, args user.C
 	}
 
 	where := "LOWER(email)=LOWER(?) OR LOWER(login)=LOWER(?)"
+	if ss.dialect.DriverName() == "ydb" {
+		where = "Unicode::ToLower(email)=Unicode::ToLower(?) OR Unicode::ToLower(login)=Unicode::ToLower(?)"
+	}
+
 	args.Login = strings.ToLower(args.Login)
 	args.Email = strings.ToLower(args.Email)
 
diff --git a/pkg/services/user/userimpl/store.go b/pkg/services/user/userimpl/store.go
index a777314737331..ce270dbf365ce 100644
--- a/pkg/services/user/userimpl/store.go
+++ b/pkg/services/user/userimpl/store.go
@@ -329,18 +329,18 @@ func (ss *sqlStore) GetSignedInUser(ctx context.Context, query *user.GetSignedIn
 			org.id as org_id, u.is_service_account as is_service_account
 		FROM ` + ss.dialect.Quote("user") + ` as u
-		LEFT OUTER JOIN org_user on org_user.org_id = ` + orgId + ` and org_user.user_id = u.id
+		LEFT OUTER JOIN org_user on org_user.user_id = u.id
 		LEFT OUTER JOIN org on org.id = org_user.org_id `
 	sess := dbSess.Table("user")
 	sess = sess.Context(ctx)
 	switch {
 	case query.UserID > 0:
-		sess.SQL(rawSQL+"WHERE u.id=?", query.UserID)
+		sess.SQL(rawSQL+"WHERE org_user.org_id = "+orgId+" and u.id=?", query.UserID)
 	case query.Login != "":
-		sess.SQL(rawSQL+"WHERE LOWER(u.login)=LOWER(?)", query.Login)
+		sess.SQL(rawSQL+"WHERE org_user.org_id = "+orgId+" and LOWER(u.login)=LOWER(?)", query.Login)
 	case query.Email != "":
-		sess.SQL(rawSQL+"WHERE LOWER(u.email)=LOWER(?)", query.Email)
+		sess.SQL(rawSQL+"WHERE org_user.org_id = "+orgId+" and LOWER(u.email)=LOWER(?)", query.Email)
 	default:
 		return user.ErrNoUniqueID
 	}
diff --git a/pkg/storage/secret/metadata/data/secure_value_lease_inactive.sql b/pkg/storage/secret/metadata/data/secure_value_lease_inactive.sql
index 3b95aed3000c6..078b760e47090 100644
--- a/pkg/storage/secret/metadata/data/secure_value_lease_inactive.sql
+++ b/pkg/storage/secret/metadata/data/secure_value_lease_inactive.sql
@@ -1,4 +1,9 @@
-WITH to_update AS (
+UPDATE
+  {{ .Ident "secret_secure_value" }}
+SET
+  {{ .Ident "lease_token" }} = {{ .Arg .LeaseToken }},
+  {{ .Ident "lease_created" }} = {{ .Arg .Now }}
+WHERE guid IN (SELECT guid FROM (
   SELECT guid
   FROM (
     SELECT
       guid,
@@ -10,11 +15,5 @@ WITH to_update AS (
     {{ .Arg .Now }} - {{ .Ident "lease_created" }} > {{ .Arg .LeaseTTL }}
   ) AS sub
   WHERE rn <= {{ .Arg .MaxBatchSize }}
-)
-UPDATE
-  {{ .Ident 
"secret_secure_value" }} -SET - {{ .Ident "lease_token" }} = {{ .Arg .LeaseToken }}, - {{ .Ident "lease_created" }} = {{ .Arg .Now }} -WHERE guid IN (SELECT guid FROM to_update) +)) ; \ No newline at end of file diff --git a/pkg/storage/unified/sql/db/dbimpl/db_engine.go b/pkg/storage/unified/sql/db/dbimpl/db_engine.go index 28fe6f9483c06..fdd4a2f75242f 100644 --- a/pkg/storage/unified/sql/db/dbimpl/db_engine.go +++ b/pkg/storage/unified/sql/db/dbimpl/db_engine.go @@ -11,7 +11,7 @@ import ( func getEngine(config *sqlstore.DatabaseConfig) (*xorm.Engine, error) { switch config.Type { - case dbTypeMySQL, dbTypePostgres, dbTypeSQLite: + case dbTypeMySQL, dbTypePostgres, dbTypeSQLite, dbTypeYDB: engine, err := xorm.NewEngine(config.Type, config.ConnectionString) if err != nil { return nil, fmt.Errorf("open database: %w", err) diff --git a/pkg/storage/unified/sql/db/dbimpl/dbimpl.go b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go index 83d41adb6256b..db442842068b1 100644 --- a/pkg/storage/unified/sql/db/dbimpl/dbimpl.go +++ b/pkg/storage/unified/sql/db/dbimpl/dbimpl.go @@ -26,6 +26,7 @@ const ( dbTypeMySQL = "mysql" dbTypePostgres = "postgres" dbTypeSQLite = "sqlite3" + dbTypeYDB = "ydb" ) const grafanaDBInstrumentQueriesKey = "instrument_queries" diff --git a/pkg/storage/unified/sql/sqltemplate/dialect.go b/pkg/storage/unified/sql/sqltemplate/dialect.go index 2f2e74cf557af..0605a39fd49cc 100644 --- a/pkg/storage/unified/sql/sqltemplate/dialect.go +++ b/pkg/storage/unified/sql/sqltemplate/dialect.go @@ -23,6 +23,8 @@ func DialectForDriver(driverName string) Dialect { return PostgreSQL case "sqlite", "sqlite3": return SQLite + case "ydb": + return YDB default: return nil } diff --git a/pkg/storage/unified/sql/sqltemplate/dialect_ydb.go b/pkg/storage/unified/sql/sqltemplate/dialect_ydb.go new file mode 100644 index 0000000000000..4eea2663917bb --- /dev/null +++ b/pkg/storage/unified/sql/sqltemplate/dialect_ydb.go @@ -0,0 +1,34 @@ +package sqltemplate + +import ( + "fmt" +) + +// YDB is an implementation of Dialect for the YDB DMBS. +var YDB = ydb{} + +// var ( +// ErrydbUnsupportedIdent = errors.New("identifiers in ydb cannot contain the character with code zero") +// ) + +type ydb struct{} + +func (p ydb) DialectName() string { + return "ydb" +} + +func (p ydb) ArgPlaceholder(argNum int) string { + return fmt.Sprintf("$%d", argNum) +} + +func (p ydb) SelectFor(s ...string) (string, error) { + return rowLockingClauseAll.SelectFor(s...) 
+} + +func (p ydb) Ident(s string) (string, error) { + return "`" + s + "`", nil +} + +func (ydb) CurrentEpoch() string { + return "(EXTRACT(EPOCH FROM statement_timestamp()) * 1000000)::BIGINT" +} diff --git a/pkg/util/xorm/core/core.go b/pkg/util/xorm/core/core.go index cc1b494fd4147..469a2c7d4af20 100644 --- a/pkg/util/xorm/core/core.go +++ b/pkg/util/xorm/core/core.go @@ -1868,6 +1868,7 @@ const ( MYSQL = "mysql" MSSQL = "mssql" ORACLE = "oracle" + YDB = "ydb" ) // xorm SQL types diff --git a/pkg/util/xorm/dialect_ydb.go b/pkg/util/xorm/dialect_ydb.go new file mode 100644 index 0000000000000..73a070361ae3d --- /dev/null +++ b/pkg/util/xorm/dialect_ydb.go @@ -0,0 +1,990 @@ +package xorm + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + "strings" + + "github.com/ydb-platform/ydb-go-sdk/v3" + + "github.com/grafana/grafana/pkg/util/xorm/core" +) + +// type ydbDriver struct { +// } + +// func (p *ydbDriver) Parse(driverName, dataSourceName string) (*core.Uri, error) { +// // if strings.Contains(dataSourceName, "?") { +// // dataSourceName = dataSourceName[:strings.Index(dataSourceName, "?")] +// // } + +// return &core.Uri{DbType: core.YDB, DbName: dataSourceName}, nil +// } + +// from https://github.com/ydb-platform/ydb/blob/main/ydb/library/yql/sql/v1/SQLv1.g.in#L1117 +var ( + ydbReservedWords = map[string]bool{ + "ABORT": true, + "ACTION": true, + "ADD": true, + "AFTER": true, + "ALL": true, + "ALTER": true, + "ANALYZE": true, + "AND": true, + "ANSI": true, + "ANY": true, + "ARRAY": true, + "AS": true, + "ASC": true, + "ASSUME": true, + "ASYNC": true, + "ATTACH": true, + "AUTOINCREMENT": true, + "AUTOMAP": true, + "BEFORE": true, + "BEGIN": true, + "BERNOULLI": true, + "BETWEEN": true, + "BITCAST": true, + "BY": true, + "CALLABLE": true, + "CASCADE": true, + "CASE": true, + "CAST": true, + "CHANGEFEED": true, + "CHECK": true, + "COLLATE": true, + "COLUMN": true, + "COLUMNS": true, + "COMMIT": true, + "COMPACT": true, + "CONDITIONAL": true, + "CONFLICT": true, + "CONSTRAINT": true, + "COVER": true, + "CREATE": true, + "CROSS": true, + "CUBE": true, + "CURRENT": true, + "CURRENT_TIME": true, + "CURRENT_DATE": true, + "CURRENT_TIMESTAMP": true, + "DATABASE": true, + "DECIMAL": true, + "DECLARE": true, + "DEFAULT": true, + "DEFERRABLE": true, + "DEFERRED": true, + "DEFINE": true, + "DELETE": true, + "DESC": true, + "DETACH": true, + "DICT": true, + "DISABLE": true, + "DISCARD": true, + "DISTINCT": true, + "DO": true, + "DROP": true, + "EACH": true, + "ELSE": true, + "ERROR": true, + "EMPTY": true, + "EMPTY_ACTION": true, + "ENCRYPTED": true, + "END": true, + "ENUM": true, + "ERASE": true, + "ESCAPE": true, + "EVALUATE": true, + "EXCEPT": true, + "EXCLUDE": true, + "EXCLUSIVE": true, + "EXCLUSION": true, + "EXISTS": true, + "EXPLAIN": true, + "EXPORT": true, + "EXTERNAL": true, + "FAIL": true, + "FAMILY": true, + "FILTER": true, + "FLATTEN": true, + "FLOW": true, + "FOLLOWING": true, + "FOR": true, + "FOREIGN": true, + "FROM": true, + "FULL": true, + "FUNCTION": true, + "GLOB": true, + "GLOBAL": true, + "GROUP": true, + "GROUPING": true, + "GROUPS": true, + "HASH": true, + "HAVING": true, + "HOP": true, + "IF": true, + "IGNORE": true, + "ILIKE": true, + "IMMEDIATE": true, + "IMPORT": true, + "IN": true, + "INDEX": true, + "INDEXED": true, + "INHERITS": true, + "INITIALLY": true, + "INNER": true, + "INSERT": true, + "INSTEAD": true, + "INTERSECT": true, + "INTO": true, + "IS": true, + "ISNULL": true, + "JOIN": true, + "JSON_EXISTS": true, + "JSON_VALUE": true, + 
"JSON_QUERY": true, + "KEY": true, + "LEFT": true, + "LIKE": true, + "LIMIT": true, + "LIST": true, + "LOCAL": true, + "MATCH": true, + "NATURAL": true, + "NO": true, + "NOT": true, + "NOTNULL": true, + "NULL": true, + "NULLS": true, + "OBJECT": true, + "OF": true, + "OFFSET": true, + "ON": true, + "ONLY": true, + "OPTIONAL": true, + "OR": true, + "ORDER": true, + "OTHERS": true, + "OUTER": true, + "OVER": true, + "PARTITION": true, + "PASSING": true, + "PASSWORD": true, + "PLAN": true, + "PRAGMA": true, + "PRECEDING": true, + "PRESORT": true, + "PRIMARY": true, + "PROCESS": true, + "RAISE": true, + "RANGE": true, + "REDUCE": true, + "REFERENCES": true, + "REGEXP": true, + "REINDEX": true, + "RELEASE": true, + "RENAME": true, + "REPEATABLE": true, + "REPLACE": true, + "RESET": true, + "RESOURCE": true, + "RESPECT": true, + "RESTRICT": true, + "RESULT": true, + "RETURN": true, + "RETURNING": true, + "REVERT": true, + "RIGHT": true, + "RLIKE": true, + "ROLLBACK": true, + "ROLLUP": true, + "ROW": true, + "ROWS": true, + "SAMPLE": true, + "SAVEPOINT": true, + "SCHEMA": true, + "SELECT": true, + "SEMI": true, + "SET": true, + "SETS": true, + "STREAM": true, + "STRUCT": true, + "SUBQUERY": true, + "SYMBOLS": true, + "SYNC": true, + "SYSTEM": true, + "TABLE": true, + "TABLESAMPLE": true, + "TABLESTORE": true, + "TAGGED": true, + "TEMP": true, + "TEMPORARY": true, + "THEN": true, + "TIES": true, + "TO": true, + "TRANSACTION": true, + "TRIGGER": true, + "TUPLE": true, + "UNBOUNDED": true, + "UNCONDITIONAL": true, + "UNION": true, + "UNIQUE": true, + "UNKNOWN": true, + "UPDATE": true, + "UPSERT": true, + "USE": true, + "USER": true, + "USING": true, + "VACUUM": true, + "VALUES": true, + "VARIANT": true, + "VIEW": true, + "VIRTUAL": true, + "WHEN": true, + "WHERE": true, + "WINDOW": true, + "WITH": true, + "WITHOUT": true, + "WRAPPER": true, + "XOR": true, + "TRUE": true, + "FALSE": true, + } + + // ydbQuoter = core.Quoter{ + // Prefix: '`', + // Suffix: '`', + // IsReserved: core.AlwaysReserve, + // } +) + +const ( + // numeric types + yql_Bool = "BOOL" + + yql_Int8 = "INT8" + yql_Int16 = "INT16" + yql_Int32 = "INT32" + yql_Int64 = "INT64" + + yql_Uint8 = "UINT8" + yql_Uint16 = "UINT16" + yql_Uint32 = "UINT32" + yql_Uint64 = "UINT64" + + yql_Float = "FLOAT" + yql_Double = "DOUBLE" + yql_Decimal = "DECIMAL" + + // serial types + yql_Serial = "SERIAL" + yql_BigSerial = "BIGSERIAL" + + // string types + yql_String = "STRING" + yql_Utf8 = "UTF8" + yql_Json = "JSON" + yql_JsonDocument = "JSONDOCUMENT" + yql_Yson = "YSON" + + // Data and Time + yql_Date = "DATE" + yql_DateTime = "DATETIME" + yql_Timestamp = "TIMESTAMP" + yql_Interval = "INTERVAL" + + // Containers + yql_List = "LIST" +) + +func toYQLDataType(t string, isAutoIncrement bool) string { + switch t { + case core.Bool, core.Boolean: + return yql_Bool + case core.TinyInt: + return yql_Int8 + case core.SmallInt, core.MediumInt, core.Int, core.Integer, core.BigInt: + // if isAutoIncrement { + // return yql_Serial + // } + // return yql_Int32 + // case core.BigInt: + if isAutoIncrement { + return yql_BigSerial + } + return yql_Int64 + case core.Float: + return yql_Float + case core.Double: + return yql_Double + case core.Blob, core.LongBlob, core.MediumBlob, core.TinyBlob, core.VarBinary, core.Binary: + return yql_String + case core.Json: + return yql_Json + case core.Varchar, core.NVarchar, core.Char, core.NChar, + core.MediumText, core.LongText, core.Text, core.NText, core.TinyText: + return yql_Utf8 + case core.TimeStamp, core.Time, core.Date, 
core.DateTime:
+		return yql_Timestamp
+	case core.Serial:
+		return yql_Serial
+	case core.BigSerial:
+		return yql_BigSerial
+	default:
+		return t
+	}
+}
+
+func yqlToSQLType(yqlType string) core.SQLType {
+	switch yqlType {
+	case yql_Bool:
+		return core.SQLType{Name: core.Bool, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Int8:
+		return core.SQLType{Name: core.TinyInt, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Int16:
+		return core.SQLType{Name: core.SmallInt, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Int32:
+		return core.SQLType{Name: core.MediumInt, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Int64:
+		return core.SQLType{Name: core.BigInt, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Float:
+		return core.SQLType{Name: core.Float, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Double:
+		return core.SQLType{Name: core.Double, DefaultLength: 0, DefaultLength2: 0}
+	case yql_String:
+		return core.SQLType{Name: core.Blob, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Json:
+		return core.SQLType{Name: core.Json, DefaultLength: 0, DefaultLength2: 0}
+	case yql_Utf8:
+		return core.SQLType{Name: core.Varchar, DefaultLength: 255, DefaultLength2: 0}
+	case yql_Timestamp:
+		return core.SQLType{Name: core.TimeStamp, DefaultLength: 0, DefaultLength2: 0}
+	default:
+		return core.SQLType{Name: yqlType}
+	}
+}
+
+func removeOptional(s string) string {
+	if s = strings.ToUpper(s); strings.HasPrefix(s, "OPTIONAL") {
+		s = strings.TrimPrefix(s, "OPTIONAL<")
+		s = strings.TrimSuffix(s, ">")
+	}
+	return s
+}
+
+type ydbDialect struct {
+	core.Base
+
+	tableParams map[string]string // TODO: maybe remove
+}
+
+// TODO:
+// func (w *ydbStmtWrapper) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
+// 	if strings.HasSuffix(w.query, "LIMIT $3;\n") {
+// 		for i, arg := range args {
+// 			if arg.Ordinal == 3 {
+// 				args[i].Value = uint64(arg.Value.(int64))
+// 			}
+// 		}
+// 	}
+
+// 	return w.stmtCtx.QueryContext(ctx, args)
+// }
+
+func (db *ydbDialect) Init(d *core.DB, uri *core.Uri, drivername, dataSource string) error {
+	ydbDriver, err := ydb.Open(context.Background(), dataSource)
+	if err != nil {
+		return fmt.Errorf("failed to connect by data source name '%s': %w", dataSource, err)
+	}
+
+	connector, err := ydb.Connector(ydbDriver,
+		ydb.WithQueryService(true),
+		ydb.WithFakeTx(ydb.QueryExecuteQueryMode),
+		ydb.WithNumericArgs(),
+		ydb.WithAutoDeclare(),
+	)
+	if err != nil {
+		_ = ydbDriver.Close(context.Background())
+		return err
+	}
+
+	sqldb := sql.OpenDB(connector)
+
+	d.DB = sqldb
+
+	return db.Base.Init(core.FromDB(sqldb), db, uri, drivername, dataSource)
+}
+
+func (db *ydbDialect) IndexCheckSql(tableName, idxName string) (string, []interface{}) {
+	return "SELECT Path FROM `.sys/partition_stats` where Path LIKE '%/'" +
+		" || $1 || '/' || $2 || '/indexImplTable'", []any{tableName, idxName}
+}
+
+func (db *ydbDialect) SupportInsertMany() bool {
+	return true // TODO:
+}
+
+func (db *ydbDialect) SupportCharset() bool {
+	return false // TODO:
+}
+
+func (db *ydbDialect) SupportEngine() bool {
+	return false // TODO:
+}
+
+func (db *ydbDialect) WithConn(ctx context.Context, f func(context.Context, *sql.Conn) error) error {
+	cc, err := db.DB().Conn(ctx)
+	if err != nil {
+		return err
+	}
+	defer cc.Close()
+
+	return f(ctx, cc)
+}
+
+func (db *ydbDialect) WithConnRaw(ctx context.Context, f func(d interface{}) error) error {
+	return db.WithConn(ctx, func(ctx context.Context, cc *sql.Conn) error {
+		return cc.Raw(f)
+	})
+}
+
+func (db *ydbDialect) 
SetParams(tableParams map[string]string) { + db.tableParams = tableParams +} + +func (db *ydbDialect) IsTableExist( + ctx context.Context, + tableName string) (_ bool, err error) { + var exists bool + err = db.WithConnRaw(ctx, func(dc interface{}) error { + q, ok := dc.(interface { + IsTableExists(context.Context, string) (bool, error) + }) + if !ok { + return fmt.Errorf("driver hasn't method IsTableExists()") + } + exists, err = q.IsTableExists(ctx, tableName) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return false, err + } + return exists, nil +} + +func (db *ydbDialect) TableCheckSql(tableName string) (string, []any) { + return "SELECT Path FROM `.sys/partition_stats` where Path LIKE '%/' || $1", []any{tableName} +} + +func (db *ydbDialect) AutoIncrStr() string { + return "" +} + +func (db *ydbDialect) IsReserved(name string) bool { + _, ok := ydbReservedWords[strings.ToUpper(name)] + return ok +} + +func (db *ydbDialect) SqlType(column *core.Column) string { + return toYQLDataType(column.SQLType.Name, column.IsAutoIncrement) +} + +// https://pkg.go.dev/database/sql#ColumnType.DatabaseTypeName +func (db *ydbDialect) ColumnTypeKind(t string) int { + switch t { + // case "BOOL": + // return core.BOOL_TYPE + case "INT8", "INT16", "INT32", "INT64", "UINT8", "UINT16", "UINT32", "UINT64": + return core.NUMERIC_TYPE + case "UTF8": + return core.TEXT_TYPE + case "TIMESTAMP": + return core.TIME_TYPE + default: + return core.UNKNOW_TYPE + } +} + +func (db *ydbDialect) Quote(name string) string { + return "`" + name + "`" // TODO: +} + +func (db *ydbDialect) AddColumnSQL(tableName string, col *core.Column) string { + tableName = db.Quote(tableName) + columnName := db.Quote(col.Name) + dataType := db.SqlType(col) + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s;", tableName, columnName, dataType)) + + return buf.String() +} + +// YDB does not support this operation +func (db *ydbDialect) ModifyColumnSQL(tableName string, column *core.Column) string { + return "" +} + +func (db *ydbDialect) DropIndexSql(tableName string, index *core.Index) string { + tableName = db.Quote(tableName) + indexName := db.Quote(index.Name) + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("ALTER TABLE %s DROP INDEX %s;", tableName, indexName)) + + return buf.String() +} + +func (db *ydbDialect) IndexOnTable() bool { + return true // TODO: +} + +// TODO: +func (db *ydbDialect) IsColumnExist( + tableName, + columnName string) (_ bool, err error) { + var exists bool + ctx := context.TODO() + err = db.WithConnRaw(ctx, func(dc interface{}) error { + q, ok := dc.(interface { + IsColumnExists(context.Context, string, string) (bool, error) + }) + if !ok { + return fmt.Errorf("conn hasn't method IsColumnExists()") + } + exists, err = q.IsColumnExists(ctx, tableName, columnName) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return false, err + } + return exists, nil +} + +// TODO: +func (db *ydbDialect) GetColumns(tableName string) ( + _ []string, + _ map[string]*core.Column, + err error) { + + ctx := context.TODO() + + colNames := make([]string, 0) + colMaps := make(map[string]*core.Column) + + // db.nativeDriver + + err = db.WithConnRaw(ctx, func(dc interface{}) error { + q, ok := dc.(interface { + GetColumns(context.Context, string) ([]string, error) + GetColumnType(context.Context, string, string) (string, error) + IsPrimaryKey(context.Context, string, string) (bool, error) + }) + if !ok { + return fmt.Errorf("driver 
does not support method [GetColumns]")
+		}
+
+		colNames, err = q.GetColumns(ctx, tableName)
+		if err != nil {
+			return err
+		}
+
+		for _, colName := range colNames {
+			dataType, err := q.GetColumnType(ctx, tableName, colName)
+			if err != nil {
+				return err
+			}
+			dataType = removeOptional(dataType)
+			isPK, err := q.IsPrimaryKey(ctx, tableName, colName)
+			if err != nil {
+				return err
+			}
+			col := &core.Column{
+				Name:         colName,
+				TableName:    tableName,
+				SQLType:      yqlToSQLType(dataType),
+				IsPrimaryKey: isPK,
+				Nullable:     !isPK,
+				Indexes:      make(map[string]int),
+			}
+			if dataType == "SERIAL" || dataType == "BIGSERIAL" {
+				col.IsAutoIncrement = true
+			}
+			colMaps[colName] = col
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return colNames, colMaps, nil
+}
+
+func (db *ydbDialect) GetTables() (_ []*core.Table, err error) {
+	tables := make([]*core.Table, 0)
+	ctx := context.TODO()
+	err = db.WithConnRaw(ctx, func(dc interface{}) error {
+		q, ok := dc.(interface {
+			GetTables(context.Context, string, bool, bool) ([]string, error)
+		})
+		if !ok {
+			return fmt.Errorf("driver does not support method [GetTables]")
+		}
+		tableNames, err := q.GetTables(ctx, ".", true, true)
+		if err != nil {
+			return err
+		}
+		for _, tableName := range tableNames {
+			table := core.NewEmptyTable()
+			table.Name = tableName
+			tables = append(tables, table)
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return tables, nil
+}
+
+func (db *ydbDialect) GetIndexes(tableName string) (_ map[string]*core.Index, err error) {
+	indexes := make(map[string]*core.Index, 0)
+	ctx := context.TODO()
+	err = db.WithConnRaw(ctx, func(dc interface{}) error {
+		q, ok := dc.(interface {
+			GetIndexes(context.Context, string) ([]string, error)
+			GetIndexColumns(context.Context, string, string) ([]string, error)
+		})
+		if !ok {
+			return fmt.Errorf("driver does not support method [GetIndexes]")
+		}
+		indexNames, err := q.GetIndexes(ctx, tableName)
+		if err != nil {
+			return err
+		}
+		for _, indexName := range indexNames {
+			cols, err := q.GetIndexColumns(ctx, tableName, indexName)
+			if err != nil {
+				return err
+			}
+			indexes[indexName] = &core.Index{
+				Name: indexName,
+				Type: core.IndexType,
+				Cols: cols,
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return indexes, nil
+}
+
+// !datbeohbbh! CreateTableSQL generates the `CREATE TABLE` YQL.
+// It does not generate YQL for creating indexes. 
+func (db *ydbDialect) CreateTableSQL( + ctx context.Context, + _ any, + table *core.Table, + tableName string) (string, bool, error) { + tableName = db.Quote(tableName) + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("CREATE TABLE %s ( ", tableName)) + + // build primary key + if len(table.PrimaryKeys) == 0 { + return "", false, errors.New("table must have at least one primary key") + } + pk := make([]string, len(table.PrimaryKeys)) + pkMap := make(map[string]bool) + for i := 0; i < len(table.PrimaryKeys); i++ { + pk[i] = db.Quote(table.PrimaryKeys[i]) + pkMap[pk[i]] = true + } + primaryKey := fmt.Sprintf("PRIMARY KEY ( %s )", strings.Join(pk, ", ")) + + // build column + columnsList := []string{} + for _, c := range table.Columns() { + columnName := db.Quote(c.Name) + dataType := db.SqlType(c) + + if _, isPk := pkMap[columnName]; isPk { + columnsList = append(columnsList, fmt.Sprintf("%s %s NOT NULL", columnName, dataType)) + } else { + columnsList = append(columnsList, fmt.Sprintf("%s %s", columnName, dataType)) + } + } + joinColumns := strings.Join(columnsList, ", ") + + buf.WriteString(strings.Join([]string{joinColumns, primaryKey}, ", ")) + buf.WriteString(" ) ") + + if len(db.tableParams) > 0 { + params := make([]string, 0) + for param, value := range db.tableParams { + if param == "" || value == "" { + continue + } + params = append(params, fmt.Sprintf("%s = %s", param, value)) + } + if len(params) > 0 { + buf.WriteString(fmt.Sprintf("WITH ( %s ) ", strings.Join(params, ", "))) + } + } + + buf.WriteString("; ") + + return buf.String(), true, nil +} + +func (db *ydbDialect) DropTableSQL(tableName string) (string, bool) { + tableName = db.Quote(tableName) + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("DROP TABLE %s;", tableName)) + + return buf.String(), false +} + +type ydbSeqFilter struct { + Prefix string + Start int +} + +// TODO: +func (db *ydbDialect) Filters() []core.Filter { + return []core.Filter{&core.IdFilter{}, &core.SeqFilter{Prefix: "$", Start: 1}} +} + +const ( + ydb_grpc_Canceled uint32 = 1 + ydb_grpc_Unknown uint32 = 2 + ydb_grpc_InvalidArgument uint32 = 3 + ydb_grpc_DeadlineExceeded uint32 = 4 + ydb_grpc_NotFound uint32 = 5 + ydb_grpc_AlreadyExists uint32 = 6 + ydb_grpc_PermissionDenied uint32 = 7 + ydb_grpc_ResourceExhausted uint32 = 8 + ydb_grpc_FailedPrecondition uint32 = 9 + ydb_grpc_Aborted uint32 = 10 + ydb_grpc_OutOfRange uint32 = 11 + ydb_grpc_Unimplemented uint32 = 12 + ydb_grpc_Internal uint32 = 13 + ydb_grpc_Unavailable uint32 = 14 + ydb_grpc_DataLoss uint32 = 15 + ydb_grpc_Unauthenticated uint32 = 16 +) + +const ( + ydb_STATUS_CODE_UNSPECIFIED int32 = 0 + ydb_SUCCESS int32 = 400000 + ydb_BAD_REQUEST int32 = 400010 + ydb_UNAUTHORIZED int32 = 400020 + ydb_INTERNAL_ERROR int32 = 400030 + ydb_ABORTED int32 = 400040 + ydb_UNAVAILABLE int32 = 400050 + ydb_OVERLOADED int32 = 400060 + ydb_SCHEME_ERROR int32 = 400070 + ydb_GENERIC_ERROR int32 = 400080 + ydb_TIMEOUT int32 = 400090 + ydb_BAD_SESSION int32 = 400100 + ydb_PRECONDITION_FAILED int32 = 400120 + ydb_ALREADY_EXISTS int32 = 400130 + ydb_NOT_FOUND int32 = 400140 + ydb_SESSION_EXPIRED int32 = 400150 + ydb_CANCELLED int32 = 400160 + ydb_UNDETERMINED int32 = 400170 + ydb_UNSUPPORTED int32 = 400180 + ydb_SESSION_BUSY int32 = 400190 +) + +// https://github.com/ydb-platform/ydb-go-sdk/blob/ca13feb3ca560ac7385e79d4365ffe0cd8c23e21/errors.go#L27 +func (db *ydbDialect) IsRetryable(err error) bool { + var target interface { + error + Code() int32 + Name() string + } + if errors.Is(err, 
fmt.Errorf("unknown error")) || + errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, context.Canceled) { + return false + } + if !errors.As(err, &target) { + return false + } + + switch target.Code() { + case + int32(ydb_grpc_Unknown), + int32(ydb_grpc_InvalidArgument), + int32(ydb_grpc_DeadlineExceeded), + int32(ydb_grpc_NotFound), + int32(ydb_grpc_AlreadyExists), + int32(ydb_grpc_PermissionDenied), + int32(ydb_grpc_FailedPrecondition), + int32(ydb_grpc_OutOfRange), + int32(ydb_grpc_Unimplemented), + int32(ydb_grpc_DataLoss), + int32(ydb_grpc_Unauthenticated): + return false + case + int32(ydb_grpc_Canceled), + int32(ydb_grpc_ResourceExhausted), + int32(ydb_grpc_Aborted), + int32(ydb_grpc_Internal), + int32(ydb_grpc_Unavailable): + return true + case + ydb_STATUS_CODE_UNSPECIFIED, + ydb_BAD_REQUEST, + ydb_UNAUTHORIZED, + ydb_INTERNAL_ERROR, + ydb_SCHEME_ERROR, + ydb_GENERIC_ERROR, + ydb_TIMEOUT, + ydb_PRECONDITION_FAILED, + ydb_ALREADY_EXISTS, + ydb_NOT_FOUND, + ydb_SESSION_EXPIRED, + ydb_CANCELLED, + ydb_UNSUPPORTED: + return false + case + ydb_ABORTED, + ydb_UNAVAILABLE, + ydb_OVERLOADED, + ydb_BAD_SESSION, + ydb_UNDETERMINED, + ydb_SESSION_BUSY: + return true + default: + return false + } +} + +type ydbDriver struct { + core.Base +} + +// DSN format: https://github.com/ydb-platform/ydb-go-sdk/blob/a804c31be0d3c44dfd7b21ed49d863619217b11d/connection.go#L339 +func (ydbDrv *ydbDriver) Parse(driverName, dataSourceName string) (*core.Uri, error) { + info := &core.Uri{DbType: core.YDB} + + uri, err := url.Parse(dataSourceName) + if err != nil { + return nil, fmt.Errorf("failed on parse data source %v", dataSourceName) + } + + const ( + secure = "grpcs" + insecure = "grpc" + ) + + if uri.Scheme != secure && uri.Scheme != insecure { + return nil, fmt.Errorf("unsupported scheme %v", uri.Scheme) + } + + info.Host = uri.Host + if spl := strings.Split(uri.Host, ":"); len(spl) > 1 { + info.Host = spl[0] + info.Port = spl[1] + } + + info.DbName = uri.Path + if info.DbName == "" { + return nil, errors.New("database path can not be empty") + } + + if uri.User != nil { + info.Passwd, _ = uri.User.Password() + info.User = uri.User.Username() + } + + return info, nil +} + +// https://pkg.go.dev/database/sql#ColumnType.DatabaseTypeName +func GenScanResult(columnType string) (interface{}, error) { + switch columnType = removeOptional(columnType); columnType { + case yql_Bool: + var ret sql.NullBool + return &ret, nil + case yql_Int16: + var ret sql.NullInt16 + return &ret, nil + case yql_Int32: + var ret sql.NullInt32 + return &ret, nil + case yql_Int64: + var ret sql.NullInt64 + return &ret, nil + case yql_Uint8: + var ret sql.NullByte + return &ret, nil + case yql_Double: + var ret sql.NullFloat64 + return &ret, nil + case yql_Utf8: + var ret sql.NullString + return &ret, nil + case yql_Timestamp: + var ret sql.NullTime + return &ret, nil + default: + var ret sql.RawBytes + return &ret, nil + } +} + +// func (ydbDrv *ydbDriver) Scan(ctx *Scan, rows *core.Rows, types []*sql.ColumnType, v ...interface{}) error { +// if err := rows.Scan(v...); err != nil { +// return err +// } + +// if ctx.UserLocation == nil { +// return nil +// } + +// for i := range v { +// // !datbeohbbh! YDB saves time in UTC. When returned value is time type, then value will be represented in local time. +// // So value in time type must be converted to UserLocation. 
+// switch des := v[i].(type) { +// case *time.Time: +// *des = (*des).In(ctx.UserLocation) +// case *sql.NullTime: +// if des.Valid { +// (*des).Time = (*des).Time.In(ctx.UserLocation) +// } +// case *interface{}: +// switch t := (*des).(type) { +// case time.Time: +// *des = t.In(ctx.UserLocation) +// case sql.NullTime: +// if t.Valid { +// *des = t.Time.In(ctx.UserLocation) +// } +// } +// } +// } + +// return nil +// } diff --git a/pkg/util/xorm/engine.go b/pkg/util/xorm/engine.go index 155a93421c7d2..e0c6e882b672b 100644 --- a/pkg/util/xorm/engine.go +++ b/pkg/util/xorm/engine.go @@ -718,6 +718,14 @@ func (engine *Engine) nowTime(col *core.Column) (any, time.Time) { func (engine *Engine) formatColTime(col *core.Column, t time.Time) (v any) { if t.IsZero() { + if engine.dialect.DBType() == core.YDB { + if col.Nullable { + var tf *time.Time + return tf + } + return t + } + if col.Nullable { return nil } @@ -732,6 +740,10 @@ func (engine *Engine) formatColTime(col *core.Column, t time.Time) (v any) { // formatTime format time as column type func (engine *Engine) formatTime(sqlTypeName string, t time.Time) (v any) { + if engine.dialect.DBType() == core.YDB { + return t + } + switch sqlTypeName { case core.Time: s := t.Format("2006-01-02 15:04:05") // time.RFC3339 diff --git a/pkg/util/xorm/result_wrapper.go b/pkg/util/xorm/result_wrapper.go new file mode 100644 index 0000000000000..4d4ae9079fbeb --- /dev/null +++ b/pkg/util/xorm/result_wrapper.go @@ -0,0 +1,68 @@ +package xorm + +import ( + "database/sql" + "errors" + "strings" +) + +// YDBResultWrapper wraps sql.Result to handle RowsAffected errors for YDB +type YDBResultWrapper struct { + sql.Result +} + +// NewYDBResultWrapper creates a new YDB result wrapper +func NewYDBResultWrapper(result sql.Result) *YDBResultWrapper { + return &YDBResultWrapper{Result: result} +} + +// RowsAffected returns the number of rows affected, or 0 if not implemented +func (w *YDBResultWrapper) RowsAffected() (int64, error) { + if w.Result == nil { + return 0, nil + } + + affected, err := w.Result.RowsAffected() + if err != nil { + // Check if the error indicates "not implemented" + if isNotImplementedError(err) { + return 0, nil // Return 0 instead of error for not implemented + } + return 0, err + } + return affected, nil +} + +// LastInsertId delegates to the wrapped result +func (w *YDBResultWrapper) LastInsertId() (int64, error) { + if w.Result == nil { + return 0, errors.New("LastInsertId is not supported") + } + return w.Result.LastInsertId() +} + +// isNotImplementedError checks if the error indicates "not implemented" +func isNotImplementedError(err error) bool { + if err == nil { + return false + } + + errStr := strings.ToLower(err.Error()) + // Common patterns for "not implemented" errors + notImplementedPatterns := []string{ + "not implemented", + "not supported", + "unimplemented", + "unsupported", + "rowsaffected not implemented", + "rowsaffected is not supported", + } + + for _, pattern := range notImplementedPatterns { + if strings.Contains(errStr, pattern) { + return true + } + } + + return false +} diff --git a/pkg/util/xorm/result_wrapper_test.go b/pkg/util/xorm/result_wrapper_test.go new file mode 100644 index 0000000000000..d11f86a04d4d7 --- /dev/null +++ b/pkg/util/xorm/result_wrapper_test.go @@ -0,0 +1,230 @@ +package xorm + +import ( + "errors" + "testing" +) + +// mockResult implements sql.Result for testing +type mockResult struct { + rowsAffected int64 + lastInsertId int64 + rowsAffectedErr error + lastInsertIdErr error +} + 
+func (m *mockResult) RowsAffected() (int64, error) { + return m.rowsAffected, m.rowsAffectedErr +} + +func (m *mockResult) LastInsertId() (int64, error) { + return m.lastInsertId, m.lastInsertIdErr +} + +func TestYDBResultWrapper_RowsAffected(t *testing.T) { + tests := []struct { + name string + mockResult *mockResult + expectedRows int64 + expectedErr error + }{ + { + name: "normal case - no error", + mockResult: &mockResult{ + rowsAffected: 5, + rowsAffectedErr: nil, + }, + expectedRows: 5, + expectedErr: nil, + }, + { + name: "not implemented error - should return 0", + mockResult: &mockResult{ + rowsAffected: 0, + rowsAffectedErr: errors.New("RowsAffected not implemented"), + }, + expectedRows: 0, + expectedErr: nil, + }, + { + name: "not supported error - should return 0", + mockResult: &mockResult{ + rowsAffected: 0, + rowsAffectedErr: errors.New("RowsAffected is not supported"), + }, + expectedRows: 0, + expectedErr: nil, + }, + { + name: "unimplemented error - should return 0", + mockResult: &mockResult{ + rowsAffected: 0, + rowsAffectedErr: errors.New("unimplemented feature"), + }, + expectedRows: 0, + expectedErr: nil, + }, + { + name: "other error - should return error", + mockResult: &mockResult{ + rowsAffected: 0, + rowsAffectedErr: errors.New("database connection failed"), + }, + expectedRows: 0, + expectedErr: errors.New("database connection failed"), + }, + { + name: "nil result - should return 0", + mockResult: nil, + expectedRows: 0, + expectedErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var wrapper *YDBResultWrapper + if tt.mockResult != nil { + wrapper = NewYDBResultWrapper(tt.mockResult) + } else { + wrapper = NewYDBResultWrapper(nil) + } + + rows, err := wrapper.RowsAffected() + + if rows != tt.expectedRows { + t.Errorf("expected rows affected %d, got %d", tt.expectedRows, rows) + } + + if tt.expectedErr == nil && err != nil { + t.Errorf("expected no error, got %v", err) + } else if tt.expectedErr != nil && err == nil { + t.Errorf("expected error %v, got nil", tt.expectedErr) + } else if tt.expectedErr != nil && err != nil && err.Error() != tt.expectedErr.Error() { + t.Errorf("expected error %v, got %v", tt.expectedErr, err) + } + }) + } +} + +func TestYDBResultWrapper_LastInsertId(t *testing.T) { + tests := []struct { + name string + mockResult *mockResult + expectedId int64 + expectedErr error + }{ + { + name: "normal case - no error", + mockResult: &mockResult{ + lastInsertId: 123, + lastInsertIdErr: nil, + }, + expectedId: 123, + expectedErr: nil, + }, + { + name: "error case", + mockResult: &mockResult{ + lastInsertId: 0, + lastInsertIdErr: errors.New("LastInsertId not supported"), + }, + expectedId: 0, + expectedErr: errors.New("LastInsertId not supported"), + }, + { + name: "nil result - should return error", + mockResult: nil, + expectedId: 0, + expectedErr: errors.New("LastInsertId is not supported"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var wrapper *YDBResultWrapper + if tt.mockResult != nil { + wrapper = NewYDBResultWrapper(tt.mockResult) + } else { + wrapper = NewYDBResultWrapper(nil) + } + + id, err := wrapper.LastInsertId() + + if id != tt.expectedId { + t.Errorf("expected last insert id %d, got %d", tt.expectedId, id) + } + + if tt.expectedErr == nil && err != nil { + t.Errorf("expected no error, got %v", err) + } else if tt.expectedErr != nil && err == nil { + t.Errorf("expected error %v, got nil", tt.expectedErr) + } else if tt.expectedErr != nil && err != nil && 
err.Error() != tt.expectedErr.Error() { + t.Errorf("expected error %v, got %v", tt.expectedErr, err) + } + }) + } +} + +func TestIsNotImplementedError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "not implemented error", + err: errors.New("not implemented"), + expected: true, + }, + { + name: "not supported error", + err: errors.New("not supported"), + expected: true, + }, + { + name: "unimplemented error", + err: errors.New("unimplemented"), + expected: true, + }, + { + name: "unsupported error", + err: errors.New("unsupported"), + expected: true, + }, + { + name: "RowsAffected not implemented", + err: errors.New("RowsAffected not implemented"), + expected: true, + }, + { + name: "RowsAffected is not supported", + err: errors.New("RowsAffected is not supported"), + expected: true, + }, + { + name: "case insensitive - NOT IMPLEMENTED", + err: errors.New("NOT IMPLEMENTED"), + expected: true, + }, + { + name: "other error", + err: errors.New("database connection failed"), + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isNotImplementedError(tt.err) + if result != tt.expected { + t.Errorf("expected %v, got %v for error: %v", tt.expected, result, tt.err) + } + }) + } +} diff --git a/pkg/util/xorm/session_convert.go b/pkg/util/xorm/session_convert.go index 20ef88be2daff..b319e079cef68 100644 --- a/pkg/util/xorm/session_convert.go +++ b/pkg/util/xorm/session_convert.go @@ -500,6 +500,11 @@ func (session *Session) value2Interface(col *core.Column, fieldValue reflect.Val k := fieldType.Kind() if k == reflect.Ptr { if fieldValue.IsNil() { + sqlType := session.engine.dialect.SqlType(col) + if sqlType == yql_Utf8 { + var ret *string + return ret, nil + } return nil, nil } else if !fieldValue.IsValid() { session.engine.logger.Warn("the field[", col.FieldName, "] is invalid") diff --git a/pkg/util/xorm/session_insert.go b/pkg/util/xorm/session_insert.go index 0b2b79e9f216c..bef4353a5fe85 100644 --- a/pkg/util/xorm/session_insert.go +++ b/pkg/util/xorm/session_insert.go @@ -419,7 +419,10 @@ func (session *Session) innerInsert(bean any) (int64, error) { } } - if len(table.AutoIncrement) > 0 && session.engine.dialect.DBType() == core.POSTGRES { + var autoIncReturning = len(table.AutoIncrement) > 0 && + (session.engine.dialect.DBType() == core.POSTGRES || session.engine.dialect.DBType() == core.YDB) + + if autoIncReturning { buf.WriteString(" RETURNING " + session.engine.Quote(table.AutoIncrement)) } @@ -484,7 +487,7 @@ func (session *Session) innerInsert(bean any) (int64, error) { return 1, err } rowsAffected = 1 - } else if len(table.AutoIncrement) > 0 && (session.engine.dialect.DBType() == core.POSTGRES) { + } else if autoIncReturning { res, err := session.queryBytes(sqlStr, args...) 
if err != nil { diff --git a/pkg/util/xorm/statement.go b/pkg/util/xorm/statement.go index 6866463afcf5e..9d6375190a92a 100644 --- a/pkg/util/xorm/statement.go +++ b/pkg/util/xorm/statement.go @@ -350,7 +350,15 @@ func (statement *Statement) buildUpdates(bean any, if fieldType.Kind() == reflect.Ptr { if fieldValue.IsNil() { if includeNil { - args = append(args, nil) + var nilValue any + + sqlType := statement.Engine.dialect.SqlType(col) + if sqlType == yql_Utf8 { + var ret *string + nilValue = ret + } + + args = append(args, nilValue) colNames = append(colNames, fmt.Sprintf("%v=?", engine.Quote(col.Name))) } continue diff --git a/pkg/util/xorm/xorm.go b/pkg/util/xorm/xorm.go index da15e50762d48..fa5ead7b5f0bd 100644 --- a/pkg/util/xorm/xorm.go +++ b/pkg/util/xorm/xorm.go @@ -37,6 +37,7 @@ func regDrvsNDialects() bool { "postgres": {"postgres", func() core.Driver { return &pqDriver{} }, func() core.Dialect { return &postgres{} }}, "pgx": {"postgres", func() core.Driver { return &pqDriverPgx{} }, func() core.Dialect { return &postgres{} }}, "sqlite3": {"sqlite3", func() core.Driver { return &sqlite3Driver{} }, func() core.Dialect { return &sqlite3{} }}, + "ydb": {"ydb", func() core.Driver { return &ydbDriver{} }, func() core.Dialect { return &ydbDialect{} }}, } for driverName, v := range providedDrvsNDialects {