Skip to content

Commit

Permalink
Merge pull request #234 from olegKoshmeliuk/caching-srclient
Browse files Browse the repository at this point in the history
feat: added caching to arvo client when producing messages
  • Loading branch information
d-rk authored Feb 10, 2025
2 parents 08c783a + d9ac4d4 commit a1cee5e
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- [#234](https://github.com/deviceinsight/kafkactl/pull/234) caching to arvo client when producing messages

### Removed
- [#231](https://github.com/deviceinsight/kafkactl/issues/231) Remove support for installing kafkactl via snap.
Expand Down
98 changes: 98 additions & 0 deletions internal/CachingSchemaRegistry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package internal

import (
"net/http"
"time"

"github.com/deviceinsight/kafkactl/v5/internal/helpers/avro"
"github.com/deviceinsight/kafkactl/v5/internal/output"
"github.com/pkg/errors"
"github.com/riferrei/srclient"
)

type CachingSchemaRegistry struct {
subjects []string
schemas map[int]string
latestSchemas map[string]*srclient.Schema
client srclient.ISchemaRegistryClient
}

func CreateCachingSchemaRegistry(context *ClientContext) (*CachingSchemaRegistry, error) {

timeout := context.Avro.RequestTimeout

if context.Avro.RequestTimeout <= 0 {
timeout = 5 * time.Second
}

httpClient := &http.Client{Timeout: timeout}

if context.Avro.TLS.Enabled {
output.Debugf("avro TLS is enabled.")

tlsConfig, err := setupTLSConfig(context.Avro.TLS)
if err != nil {
return nil, errors.Wrap(err, "failed to setup avro tls config")
}

httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}

baseURL := avro.FormatBaseURL(context.Avro.SchemaRegistry)
client := srclient.CreateSchemaRegistryClientWithOptions(baseURL, httpClient, 16)

if context.Avro.Username != "" {
output.Debugf("avro BasicAuth is enabled.")
client.SetCredentials(context.Avro.Username, context.Avro.Password)
}
return &CachingSchemaRegistry{
client: client,
schemas: make(map[int]string),
latestSchemas: make(map[string]*srclient.Schema),
}, nil

}

func (registry *CachingSchemaRegistry) GetSchemaByVersion(subject string, schemaVersion int) (*srclient.Schema, error) {
return registry.client.GetSchemaByVersion(subject, schemaVersion)
}

func (registry *CachingSchemaRegistry) GetLatestSchema(subject string) (*srclient.Schema, error) {
var err error

if _, ok := registry.latestSchemas[subject]; !ok {
var schema *srclient.Schema
schema, err = registry.client.GetLatestSchema(subject)
if err == nil {
registry.latestSchemas[subject] = schema
}
}

return registry.latestSchemas[subject], err
}

func (registry *CachingSchemaRegistry) Subjects() ([]string, error) {
var err error

if len(registry.subjects) == 0 {
registry.subjects, err = registry.client.GetSubjects()
}

return registry.subjects, err
}

func (registry *CachingSchemaRegistry) GetSchemaByID(id int) (string, error) {
var err error

if _, ok := registry.schemas[id]; !ok {
var schema *srclient.Schema
schema, err = registry.client.GetSchema(id)
if err == nil {
registry.schemas[id] = schema.Schema()
}
}

return registry.schemas[id], err
}
37 changes: 0 additions & 37 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"os"
"os/user"
"regexp"
"strings"
"time"

"github.com/riferrei/srclient"

"github.com/deviceinsight/kafkactl/v5/internal/auth"

"github.com/deviceinsight/kafkactl/v5/internal/global"
Expand Down Expand Up @@ -274,40 +271,6 @@ func CreateClientConfig(context *ClientContext) (*sarama.Config, error) {
return config, nil
}

func CreateAvroSchemaRegistryClient(context *ClientContext) (srclient.ISchemaRegistryClient, error) {

timeout := context.Avro.RequestTimeout

if context.Avro.RequestTimeout <= 0 {
timeout = 5 * time.Second
}

httpClient := &http.Client{Timeout: timeout}

if context.Avro.TLS.Enabled {
output.Debugf("avro TLS is enabled.")

tlsConfig, err := setupTLSConfig(context.Avro.TLS)
if err != nil {
return nil, errors.Wrap(err, "failed to setup avro tls config")
}

httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}

baseURL := avro.FormatBaseURL(context.Avro.SchemaRegistry)
client := srclient.CreateSchemaRegistryClientWithOptions(baseURL, httpClient, 16)

if context.Avro.Username != "" {
output.Debugf("avro BasicAuth is enabled.")
client.SetCredentials(context.Avro.Username, context.Avro.Password)
}

return client, nil
}

func GetClientID(context *ClientContext, defaultPrefix string) string {

var (
Expand Down
3 changes: 2 additions & 1 deletion internal/consume/AvroMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/deviceinsight/kafkactl/v5/internal/helpers/avro"

"github.com/IBM/sarama"
"github.com/deviceinsight/kafkactl/v5/internal"
"github.com/deviceinsight/kafkactl/v5/internal/output"
"github.com/deviceinsight/kafkactl/v5/internal/util"
"github.com/linkedin/goavro/v2"
Expand All @@ -18,7 +19,7 @@ import (
type AvroMessageDeserializer struct {
topic string
jsonCodec avro.JSONCodec
registry *CachingSchemaRegistry
registry *internal.CachingSchemaRegistry
}

type avroMessage struct {
Expand Down
39 changes: 0 additions & 39 deletions internal/consume/CachingSchemaRegistry.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/consume/ProtobufMessageDeserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func CreateProtobufMessageDeserializer(context protobuf.SearchContext, keyType,
}

keyDescriptor := protobuf.ResolveMessageType(context, keyType)
if valueDescriptor == nil && keyType != "" {
if keyType != "" {
return nil, errors.Errorf("key message type %q not found in provided files", valueType)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (operation *Operation) Consume(topic string, flags Flags) error {
var deserializers MessageDeserializerChain

if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
client, err := internal.CreateCachingSchemaRegistry(&clientContext)
if err != nil {
return err
}

deserializer := AvroMessageDeserializer{topic: topic, registry: CreateCachingSchemaRegistry(client),
deserializer := AvroMessageDeserializer{topic: topic, registry: client,
jsonCodec: clientContext.Avro.JSONCodec}

deserializers = append(deserializers, deserializer)
Expand Down
7 changes: 4 additions & 3 deletions internal/producer/AvroMessageSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package producer
import (
"encoding/binary"

"github.com/deviceinsight/kafkactl/v5/internal"
"github.com/deviceinsight/kafkactl/v5/internal/helpers/avro"
"github.com/riferrei/srclient"

Expand All @@ -15,14 +16,14 @@ import (
type AvroMessageSerializer struct {
topic string
jsonCodec avro.JSONCodec
client srclient.ISchemaRegistryClient
client *internal.CachingSchemaRegistry
}

func (serializer AvroMessageSerializer) encode(rawData []byte, schemaVersion int, avroSchemaType string) ([]byte, error) {

subject := serializer.topic + "-" + avroSchemaType

subjects, err := serializer.client.GetSubjects()
subjects, err := serializer.client.Subjects()

if err != nil {
return nil, errors.Wrap(err, "failed to list available avro schemas")
Expand Down Expand Up @@ -80,7 +81,7 @@ func (serializer AvroMessageSerializer) encode(rawData []byte, schemaVersion int

func (serializer AvroMessageSerializer) CanSerialize(topic string) (bool, error) {

subjects, err := serializer.client.GetSubjects()
subjects, err := serializer.client.Subjects()

if err != nil {
return false, errors.Wrap(err, "failed to list available avro schemas")
Expand Down
2 changes: 1 addition & 1 deletion internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (operation *Operation) Produce(topic string, flags Flags) error {
serializers := MessageSerializerChain{topic: topic}

if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
client, err := internal.CreateCachingSchemaRegistry(&clientContext)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions internal/testutil/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"testing"
"time"

"github.com/riferrei/srclient"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/deviceinsight/kafkactl/v5/internal/util"
"github.com/riferrei/srclient"
)

func CreateTopic(t *testing.T, topicPrefix string, flags ...string) string {
Expand Down

0 comments on commit a1cee5e

Please sign in to comment.