Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)
Expand All @@ -29,8 +29,8 @@ func SingleConn() *balancerConfig.Config {

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
func (filterLocalDC) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return e.Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
Expand Down Expand Up @@ -59,8 +59,8 @@ func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
func (locations filterLocations) Allow(_ balancerConfig.Info, e endpoint.Info) bool {
location := strings.ToUpper(e.Location())
for _, l := range locations {
if location == l {
return true
Expand Down Expand Up @@ -127,10 +127,10 @@ type Endpoint interface {
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
type filterFunc func(info balancerConfig.Info, e endpoint.Info) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
func (p filterFunc) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return p(info, e)
}

func (p filterFunc) String() string {
Expand All @@ -140,8 +140,8 @@ func (p filterFunc) String() string {
// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
balancer.Filter = filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
return filter(e)
})

return balancer
Expand Down
7 changes: 0 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ func (c *Config) Meta() *meta.Meta {
return c.meta
}

// ConnectionTTL defines interval for parking grpc connections.
//
// If ConnectionTTL is zero - connections are not park.
func (c *Config) ConnectionTTL() time.Duration {
return c.connectionTTL
}

// Secure is a flag for secure connection
func (c *Config) Secure() bool {
return c.secure
Expand Down
79 changes: 34 additions & 45 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
internalDiscovery "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery"
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter"
Expand Down Expand Up @@ -51,8 +50,6 @@ var _ Connection = (*Driver)(nil)

// Driver type provide access to YDB service clients
type Driver struct {
ctxCancel context.CancelFunc

userInfo *dsn.UserInfo

logger log.Logger
Expand Down Expand Up @@ -90,7 +87,7 @@ type Driver struct {

databaseSQLOptions []xsql.ConnectorOption

pool *conn.Pool
// pool *conn.Pool

mtx sync.Mutex
balancer *balancer.Balancer
Expand Down Expand Up @@ -120,13 +117,10 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
defer func() {
onDone(finalErr)
}()
d.ctxCancel()

d.mtx.Lock()
defer d.mtx.Unlock()

d.ctxCancel()

defer func() {
for _, f := range d.onClose {
f(d)
Expand All @@ -151,7 +145,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
d.query.Close,
d.topic.Close,
d.balancer.Close,
d.pool.Release,
// d.pool.Release,
)

var issues []error
Expand Down Expand Up @@ -185,44 +179,44 @@ func (d *Driver) Secure() bool {

// Table returns table client
func (d *Driver) Table() table.Client {
return d.table.Get()
return d.table.Must()
}

// Query returns query client
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func (d *Driver) Query() *internalQuery.Client {
return d.query.Get()
return d.query.Must()
}

// Scheme returns scheme client
func (d *Driver) Scheme() scheme.Client {
return d.scheme.Get()
return d.scheme.Must()
}

// Coordination returns coordination client
func (d *Driver) Coordination() coordination.Client {
return d.coordination.Get()
return d.coordination.Must()
}

// Ratelimiter returns ratelimiter client
func (d *Driver) Ratelimiter() ratelimiter.Client {
return d.ratelimiter.Get()
return d.ratelimiter.Must()
}

// Discovery returns discovery client
func (d *Driver) Discovery() discovery.Client {
return d.discovery.Get()
return d.discovery.Must()
}

// Scripting returns scripting client
func (d *Driver) Scripting() scripting.Client {
return d.scripting.Get()
return d.scripting.Must()
}

// Topic returns topic client
func (d *Driver) Topic() topic.Client {
return d.topic.Get()
return d.topic.Must()
}

// Open connects to database by DSN and return driver runtime holder
Expand Down Expand Up @@ -308,16 +302,11 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint:

//nolint:cyclop, nonamedreturns, funlen
func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer func() {
if err != nil {
driverCtxCancel()
}
}()
ctx, cancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer cancel()

d := &Driver{
children: make(map[uint64]*Driver),
ctxCancel: driverCtxCancel,
children: make(map[uint64]*Driver),
}

if caFile, has := os.LookupEnv("YDB_SSL_ROOT_CERTIFICATES_FILE"); has {
Expand Down Expand Up @@ -398,16 +387,16 @@ func (d *Driver) connect(ctx context.Context) (err error) {
))
}

if d.pool == nil {
d.pool = conn.NewPool(ctx, d.config)
}
//if d.pool == nil {
// d.pool = conn.NewPool(ctx, d.config)
//}

d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
d.balancer, err = balancer.New(ctx, d.config /*d.pool,*/, d.discoveryOptions...)
if err != nil {
return xerrors.WithStackTrace(err)
}

d.table = xsync.OnceValue(func() *internalTable.Client {
d.table = xsync.OnceValue(func() (*internalTable.Client, error) {
return internalTable.New(xcontext.ValueOnly(ctx),
d.balancer,
tableConfig.New(
Expand All @@ -419,10 +408,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.tableOptions...,
)...,
),
)
), nil
})

d.query = xsync.OnceValue(func() *internalQuery.Client {
d.query = xsync.OnceValue(func() (*internalQuery.Client, error) {
return internalQuery.New(xcontext.ValueOnly(ctx),
d.balancer,
queryConfig.New(
Expand All @@ -434,13 +423,13 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.queryOptions...,
)...,
),
)
), nil
})
if err != nil {
return xerrors.WithStackTrace(err)
}

d.scheme = xsync.OnceValue(func() *internalScheme.Client {
d.scheme = xsync.OnceValue(func() (*internalScheme.Client, error) {
return internalScheme.New(xcontext.ValueOnly(ctx),
d.balancer,
schemeConfig.New(
Expand All @@ -453,10 +442,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.schemeOptions...,
)...,
),
)
), nil
})

d.coordination = xsync.OnceValue(func() *internalCoordination.Client {
d.coordination = xsync.OnceValue(func() (*internalCoordination.Client, error) {
return internalCoordination.New(xcontext.ValueOnly(ctx),
d.balancer,
coordinationConfig.New(
Expand All @@ -468,10 +457,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.coordinationOptions...,
)...,
),
)
), nil
})

d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client {
d.ratelimiter = xsync.OnceValue(func() (*internalRatelimiter.Client, error) {
return internalRatelimiter.New(xcontext.ValueOnly(ctx),
d.balancer,
ratelimiterConfig.New(
Expand All @@ -483,12 +472,12 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.ratelimiterOptions...,
)...,
),
)
), nil
})

d.discovery = xsync.OnceValue(func() *internalDiscovery.Client {
d.discovery = xsync.OnceValue(func() (*internalDiscovery.Client, error) {
return internalDiscovery.New(xcontext.ValueOnly(ctx),
d.pool.Get(endpoint.New(d.config.Endpoint())),
d.balancer,
discoveryConfig.New(
append(
// prepend common params from root config
Expand All @@ -502,10 +491,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.discoveryOptions...,
)...,
),
)
), nil
})

d.scripting = xsync.OnceValue(func() *internalScripting.Client {
d.scripting = xsync.OnceValue(func() (*internalScripting.Client, error) {
return internalScripting.New(xcontext.ValueOnly(ctx),
d.balancer,
scriptingConfig.New(
Expand All @@ -517,10 +506,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.scriptingOptions...,
)...,
),
)
), nil
})

d.topic = xsync.OnceValue(func() *topicclientinternal.Client {
d.topic = xsync.OnceValue(func() (*topicclientinternal.Client, error) {
return topicclientinternal.New(xcontext.ValueOnly(ctx),
d.balancer,
d.config.Credentials(),
Expand All @@ -532,7 +521,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
},
d.topicOptions...,
)...,
)
), nil
})

return nil
Expand Down
Loading