Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import (
"sync"
"time"

innerMD "github.com/apache/rocketmq-clients/golang/v5/metadata"
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"github.com/google/uuid"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"

innerMD "github.com/apache/rocketmq-clients/golang/v5/metadata"
"github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
)

type Client interface {
Expand All @@ -59,7 +60,7 @@ type defaultClientSession struct {
cli *defaultClient
timeout time.Duration
recovering bool
recoveryWaitTime time.Duration `default:"5s"`
recoveryWaitTime time.Duration
}

func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error) {
Expand All @@ -68,10 +69,11 @@ func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientS
return nil, err
}
cs := &defaultClientSession{
endpoints: endpoints,
cli: cli,
timeout: 365 * 24 * time.Hour,
recovering: false,
endpoints: endpoints,
cli: cli,
timeout: 365 * 24 * time.Hour,
recovering: false,
recoveryWaitTime: 5 * time.Second,
}
cs.startUp()
return cs, nil
Expand Down Expand Up @@ -352,13 +354,15 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([
// telemeter to all messageQueues
endpointsSet := make(map[string]bool)
for _, messageQueue := range route {
target := utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
if err = cli.mustSyncSettingsToTargert(target); err != nil {
return nil, err
for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
if err = cli.mustSyncSettingsToTargert(target); err != nil {
return nil, err
}
}
}

Expand Down Expand Up @@ -403,12 +407,14 @@ func (cli *defaultClient) getTotalTargets() []string {
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
target := utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
if _, ok := endpointsSet[target]; ok {
continue
for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
target := utils.ParseAddress(address)
if _, ok := endpointsSet[target]; ok {
continue
}
endpointsSet[target] = true
endpoints = append(endpoints, target)
}
endpointsSet[target] = true
endpoints = append(endpoints, target)
}
return true
})
Expand Down
10 changes: 2 additions & 8 deletions golang/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package golang

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -184,13 +183,8 @@ func (cm *defaultClientManager) cleanRpcClient() {
}
}
func (cm *defaultClientManager) getRpcClient(endpoints *v2.Endpoints) (RpcClient, error) {
var target string
if endpoints.GetScheme() == v2.AddressScheme_IPv4 || endpoints.GetScheme() == v2.AddressScheme_IPv6 {
serviceName := utils.EndpointsToString(endpoints)
target = fmt.Sprintf("%s:///%s", DefaultScheme, serviceName)
} else {
target = utils.ParseAddress(utils.SelectAnAddress(endpoints))
}
target := utils.ParseAddress(utils.SelectAnAddress(endpoints))

cm.rpcClientTableLock.RLock()
item, ok := cm.rpcClientTable[target]
cm.rpcClientTableLock.RUnlock()
Expand Down
4 changes: 1 addition & 3 deletions golang/client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,11 @@ func TestCMUnRegisterClient(t *testing.T) {
var (
fakeHost = "127.0.0.1"
fakePort int32 = 80
fakeScheme = "ip"
fakeAddress = fmt.Sprintf("%s:///%s:%d", fakeScheme, fakeHost, fakePort)
fakeAddress = fmt.Sprintf("%s:%d", fakeHost, fakePort)
)

func fakeEndpoints() *v2.Endpoints {
return &v2.Endpoints{
Scheme: v2.AddressScheme_IPv4,
Addresses: []*v2.Address{
{
Host: fakeHost,
Expand Down
102 changes: 67 additions & 35 deletions golang/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ package golang

import (
"context"
"errors"
"sync"
"time"

"go.uber.org/atomic"

"contrib.go.opencensus.io/exporter/ocagent"
"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
)

type InvocationStatus string
Expand Down Expand Up @@ -93,43 +93,75 @@ func init() {
}

type defaultClientMeter struct {
enabled atomic.Bool
enabled bool
endpoints *v2.Endpoints
ocaExporter view.Exporter
mutex sync.Mutex
rwMutex sync.RWMutex
}

func (dcm *defaultClientMeter) shutdown() {
if !dcm.enabled.Load() {
dcm.rwMutex.RLock()
if !dcm.enabled {
dcm.rwMutex.RUnlock()
return
}
dcm.mutex.Lock()
defer dcm.mutex.Unlock()
dcm.rwMutex.RUnlock()

dcm.rwMutex.Lock()
defer dcm.rwMutex.Unlock()
if !dcm.enabled { // Double check
return
}
dcm.enabled = false
dcm.endpoints = nil
view.UnregisterExporter(dcm.ocaExporter)
if dcm.ocaExporter != nil {
exporter, ok := dcm.ocaExporter.(*ocagent.Exporter)
if ok {
err := exporter.Stop()
if err != nil {
sugarBaseLogger.Errorf("ocExporter stop failed, err=%w", err)
}
exporter, ok := dcm.ocaExporter.(*ocagent.Exporter)
if ok {
err := exporter.Stop()
if err != nil {
sugarBaseLogger.Errorf("ocExporter stop failed, err=%w", err)
}
}
dcm.ocaExporter = nil
}

func (dcm *defaultClientMeter) start() {
if !dcm.enabled.Load() {
return
func (dcm *defaultClientMeter) start(endpoints *v2.Endpoints, exporter view.Exporter) error {
if endpoints == nil || exporter == nil {
return errors.New("endpoints or exporter cannot be nil")
}

dcm.rwMutex.RLock()
if dcm.enabled {
dcm.rwMutex.RUnlock()
return errors.New("client meter is already enabled and cannot be started again")
}
dcm.rwMutex.RUnlock()

dcm.rwMutex.Lock()
defer dcm.rwMutex.Unlock()
if dcm.enabled { // Double check
return errors.New("client meter is already enabled and cannot be started again")
}
dcm.enabled = true
dcm.endpoints = endpoints
dcm.ocaExporter = exporter
view.RegisterExporter(dcm.ocaExporter)
return nil
}

var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
return &defaultClientMeter{
enabled: *atomic.NewBool(on),
endpoints: endpoints,
ocaExporter: exporter,
func (dcm *defaultClientMeter) isEnabled() bool {
dcm.rwMutex.RLock()
defer dcm.rwMutex.RUnlock()
return dcm.enabled
}

func (dcm *defaultClientMeter) compareEndpoints(endpoints *v2.Endpoints) bool {
dcm.rwMutex.RLock()
defer dcm.rwMutex.RUnlock()
if !dcm.enabled {
return false
}
return utils.CompareEndpoints(dcm.endpoints, endpoints)
}

type MessageMeterInterceptor interface {
Expand All @@ -152,7 +184,6 @@ var _ = ClientMeterProvider(&defaultClientMeterProvider{})
type defaultClientMeterProvider struct {
client Client
clientMeter *defaultClientMeter
globalMutex sync.Mutex
}

func (dcmp *defaultClientMeterProvider) getClientImpl() isClient {
Expand Down Expand Up @@ -318,26 +349,24 @@ func (dmmi *defaultMessageMeterInterceptor) doAfter(messageHookPoints MessageHoo
return nil
}
func (dcmp *defaultClientMeterProvider) isEnabled() bool {
return dcmp.clientMeter.enabled.Load()
return dcmp.clientMeter.isEnabled()
}
func (dcmp *defaultClientMeterProvider) getClientID() string {
return dcmp.client.GetClientID()
}
func (dcmp *defaultClientMeterProvider) Reset(metric *v2.Metric) {
dcmp.globalMutex.Lock()
defer dcmp.globalMutex.Unlock()
endpoints := metric.GetEndpoints()
if dcmp.clientMeter.enabled.Load() && metric.GetOn() && utils.CompareEndpoints(dcmp.clientMeter.endpoints, endpoints) {
if metric.GetOn() && dcmp.clientMeter.compareEndpoints(endpoints) {
sugarBaseLogger.Infof("metric settings is satisfied by the current message meter, clientId=%s", dcmp.client.GetClientID())
return
}

dcmp.clientMeter.shutdown()
if !metric.GetOn() {
dcmp.clientMeter.shutdown()
sugarBaseLogger.Infof("metric is off, clientId=%s", dcmp.client.GetClientID())
dcmp.clientMeter = NewDefaultClientMeter(nil, false, nil, dcmp.client.GetClientID())
return
}

agentAddr := utils.ParseAddress(utils.SelectAnAddress(endpoints))
exporter, err := ocagent.NewExporter(
ocagent.WithInsecure(),
Expand All @@ -349,17 +378,20 @@ func (dcmp *defaultClientMeterProvider) Reset(metric *v2.Metric) {
sugarBaseLogger.Errorf("exception raised when resetting message meter, clientId=%s", dcmp.client.GetClientID())
return
}

// Reset message meter.
dcmp.clientMeter.shutdown()
dcmp.clientMeter = NewDefaultClientMeter(exporter, true, endpoints, dcmp.client.GetClientID())
dcmp.clientMeter.start()
err = dcmp.clientMeter.start(endpoints, exporter)
if err != nil {
sugarBaseLogger.Errorf("exception raised when resetting message meter, err=%v, clientId=%s", err, dcmp.client.GetClientID())
return
}
sugarBaseLogger.Infof("metrics is on, endpoints=%v, clientId=%s", endpoints, dcmp.client.GetClientID())
}

var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider {
cmp := &defaultClientMeterProvider{
client: client,
clientMeter: NewDefaultClientMeter(nil, false, nil, "nil"),
clientMeter: &defaultClientMeter{},
}
client.registerMessageInterceptor(NewDefaultMessageMeterInterceptor(cmp))
return cmp
Expand Down
82 changes: 0 additions & 82 deletions golang/name_resolver.go

This file was deleted.

Loading