@@ -18,11 +18,17 @@ package ffdx
1818
1919import (
2020 "context"
21+ "crypto/x509"
2122 "encoding/json"
23+ "encoding/pem"
24+ "errors"
2225 "fmt"
2326 "io"
2427 "strings"
2528 "sync"
29+ "time"
30+
31+ "github.com/hyperledger/firefly/internal/metrics"
2632
2733 "github.com/go-resty/resty/v2"
2834 "github.com/hyperledger/firefly-common/pkg/config"
@@ -54,6 +60,8 @@ type FFDX struct {
5460 retry * retry.Retry
5561 backgroundStart bool
5662 backgroundRetry * retry.Retry
63+
64+ metrics metrics.Manager // optional
5765}
5866
5967type dxNode struct {
@@ -168,7 +176,7 @@ func (h *FFDX) Name() string {
168176 return "ffdx"
169177}
170178
171- func (h * FFDX ) Init (ctx context.Context , cancelCtx context.CancelFunc , config config.Section ) (err error ) {
179+ func (h * FFDX ) Init (ctx context.Context , cancelCtx context.CancelFunc , config config.Section , metrics metrics. Manager ) (err error ) {
172180 h .ctx = log .WithLogField (ctx , "dx" , "https" )
173181 h .cancelCtx = cancelCtx
174182 h .ackChannel = make (chan * ack )
@@ -179,6 +187,7 @@ func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config co
179187 }
180188 h .needsInit = config .GetBool (DataExchangeInitEnabled )
181189 h .nodes = make (map [string ]* dxNode )
190+ h .metrics = metrics
182191
183192 if config .GetString (ffresty .HTTPConfigURL ) == "" {
184193 return i18n .NewError (ctx , coremsgs .MsgMissingPluginConfig , "url" , "dataexchange.ffdx" )
@@ -295,7 +304,13 @@ func (h *FFDX) beforeConnect(ctx context.Context, w wsclient.WSClient) error {
295304 return fmt .Errorf ("DX returned non-ready status: %s" , status .Status )
296305 }
297306 }
307+
298308 h .initialized = true
309+
310+ for _ , cb := range h .callbacks .handlers {
311+ cb .DXConnect (h )
312+ }
313+
299314 return nil
300315}
301316
@@ -448,6 +463,91 @@ func (h *FFDX) TransferBlob(ctx context.Context, nsOpID string, peer, sender fft
448463 return nil
449464}
450465
466+ func (h * FFDX ) CheckNodeIdentityStatus (ctx context.Context , node * core.Identity ) error {
467+ if node == nil {
468+ return i18n .NewError (ctx , coremsgs .MsgNodeNotProvidedForCheck )
469+ }
470+
471+ var mismatchState = metrics .NodeIdentityDXCertMismatchStatusUnknown
472+ defer func () {
473+ if h .metrics != nil && h .metrics .IsMetricsEnabled () {
474+ h .metrics .NodeIdentityDXCertMismatch (node .Namespace , mismatchState )
475+ }
476+ log .L (ctx ).Debugf ("Identity status checked against DX node='%s' mismatch_state='%s'" , node .Name , mismatchState )
477+ }()
478+
479+ dxPeer , err := h .GetEndpointInfo (ctx , node .Name ) // should be the same as the local node
480+ if err != nil {
481+ return err
482+ }
483+
484+ dxPeerCert := dxPeer .GetString ("cert" )
485+ // if this occurs, it is either a misconfigured / broken DX or likely a DX that is compatible from an API perspective
486+ // but does not have the same peer info as the HTTPS mTLS DX
487+ if dxPeerCert == "" {
488+ log .L (ctx ).Debugf ("DX peer does not have a 'cert', DX plugin may be unsupported" )
489+ return nil
490+ }
491+
492+ expiry , err := extractSoonestExpiryFromCertBundle (strings .ReplaceAll (dxPeerCert , `\n` , "\n " ))
493+ if err == nil {
494+ if expiry .Before (time .Now ()) {
495+ log .L (ctx ).Warnf ("DX certificate for node '%s' has expired" , node .Name )
496+ }
497+
498+ if h .metrics != nil && h .metrics .IsMetricsEnabled () {
499+ h .metrics .NodeIdentityDXCertExpiry (node .Namespace , expiry )
500+ }
501+ } else {
502+ log .L (ctx ).Errorf ("Failed to find x509 cert within DX cert bundle node='%s' namespace='%s'" , node .Name , node .Namespace )
503+ }
504+
505+ if node .Profile == nil {
506+ return i18n .NewError (ctx , coremsgs .MsgNodeNotProvidedForCheck )
507+ }
508+
509+ nodeCert := node .Profile .GetString ("cert" )
510+ if nodeCert != "" {
511+ mismatchState = metrics .NodeIdentityDXCertMismatchStatusHealthy
512+ if dxPeerCert != nodeCert {
513+ log .L (ctx ).Warnf ("DX certificate for node '%s' is out-of-sync with on-chain identity" , node .Name )
514+ mismatchState = metrics .NodeIdentityDXCertMismatchStatusMismatched
515+ }
516+ }
517+
518+ return nil
519+ }
520+
521+ // We assume the cert with the soonest expiry is the leaf cert, but even if its the CA,
522+ // that's what will invalidate the leaf anyways, so really we only care about the soonest expiry.
523+ // So we loop through the bundle finding the soonest expiry, not necessarily the leaf.
524+ func extractSoonestExpiryFromCertBundle (certBundle string ) (time.Time , error ) {
525+ var expiringCert * x509.Certificate
526+ var block * pem.Block
527+ var rest = []byte (certBundle )
528+
529+ for {
530+ block , rest = pem .Decode (rest )
531+ if block == nil {
532+ break
533+ }
534+
535+ cert , err := x509 .ParseCertificate (block .Bytes )
536+ if err != nil {
537+ return time.Time {}, fmt .Errorf ("failed to parse non-certificate within bundle: %v" , err )
538+ }
539+ if expiringCert == nil || cert .NotAfter .Before (expiringCert .NotAfter ) {
540+ expiringCert = cert
541+ }
542+ }
543+
544+ if expiringCert == nil {
545+ return time.Time {}, errors .New ("no valid certificate found" )
546+ }
547+
548+ return expiringCert .NotAfter .UTC (), nil
549+ }
550+
451551func (h * FFDX ) ackLoop () {
452552 for {
453553 select {
0 commit comments