Skip to content

Commit

Permalink
Merge pull request #5075 from 2403905/issue-7075
Browse files Browse the repository at this point in the history
ocm share notification handling added
  • Loading branch information
2403905 authored Feb 17, 2025
2 parents 730ca9c + d6f630e commit 8aefc9e
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 24 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-ocm-notificatin-handler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add the ocm notification handler

Added the ocm notification handler that allows receiving a notification from a remote party about changes to a previously known entity.

https://github.com/cs3org/reva/pull/5075
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571
github.com/gdexlab/go-render v1.0.1
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/render v1.0.3
github.com/go-ldap/ldap/v3 v3.4.8
github.com/go-micro/plugins/v4/events/natsjs v1.2.2
github.com/go-micro/plugins/v4/server/http v1.2.2
Expand Down Expand Up @@ -109,6 +110,7 @@ require (
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 h1:kkhsdkhsCvIsutKu5zLMgWtgh9YxGCNAw8Ad8hjwfYg=
github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -181,6 +183,8 @@ github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD
github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/render v1.0.3 h1:AsXqd2a1/INaIfUSKq3G5uA8weYx20FOsM7uSoCyyt4=
github.com/go-chi/render v1.0.3/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI=
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic=
github.com/go-git/go-billy/v5 v5.5.0 h1:yEY4yhzCDuMGSv83oGxiBotRzhwhNr8VZyphhiu+mTU=
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/services/gateway/ocmcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *svc) DeleteOCMCoreShare(ctx context.Context, req *ocmcore.DeleteOCMCore

res, err := c.DeleteOCMCoreShare(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling UpdateOCMCoreShare")
return nil, errors.Wrap(err, "gateway: error calling DeleteOCMCoreShare")
}

return res, nil
Expand Down
37 changes: 35 additions & 2 deletions internal/grpc/services/ocmcore/ocmcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ package ocmcore

import (
"context"
"errors"
"fmt"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
ocmcore "github.com/cs3org/go-cs3apis/cs3/ocm/core/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
providerpb "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/ocm/share"
"github.com/cs3org/reva/v2/pkg/ocm/share/repository/registry"
ocmuser "github.com/cs3org/reva/v2/pkg/ocm/user"
"github.com/cs3org/reva/v2/pkg/rgrpc"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/cs3org/reva/v2/pkg/utils/cfg"
"github.com/rs/zerolog"
"google.golang.org/grpc"
Expand Down Expand Up @@ -93,7 +97,11 @@ func (s *service) Close() error {
}

func (s *service) UnprotectedEndpoints() []string {
return []string{"/cs3.ocm.core.v1beta1.OcmCoreAPI/CreateOCMCoreShare"}
return []string{
ocmcore.OcmCoreAPI_CreateOCMCoreShare_FullMethodName,
ocmcore.OcmCoreAPI_UpdateOCMCoreShare_FullMethodName,
ocmcore.OcmCoreAPI_DeleteOCMCoreShare_FullMethodName,
}
}

// CreateOCMCoreShare is called when an OCM request comes into this reva instance from.
Expand Down Expand Up @@ -144,5 +152,30 @@ func (s *service) UpdateOCMCoreShare(ctx context.Context, req *ocmcore.UpdateOCM
}

func (s *service) DeleteOCMCoreShare(ctx context.Context, req *ocmcore.DeleteOCMCoreShareRequest) (*ocmcore.DeleteOCMCoreShareResponse, error) {
return nil, errtypes.NotSupported("not implemented")
grantee := utils.ReadPlainFromOpaque(req.GetOpaque(), "grantee")
if grantee == "" {
return nil, errtypes.UserRequired("missing remote user id in a metadata")
}

user := &userpb.User{Id: ocmuser.RemoteID(&userpb.UserId{OpaqueId: grantee})}

err := s.repo.DeleteReceivedShare(ctx, user, &ocm.ShareReference{
Spec: &ocm.ShareReference_Id{
Id: &ocm.ShareId{
OpaqueId: req.GetId(),
},
},
})
res := &ocmcore.DeleteOCMCoreShareResponse{}
if err == nil {
res.Status = status.NewOK(ctx)
} else {
var notFound errtypes.NotFound
if errors.As(err, &notFound) {
res.Status = status.NewNotFound(ctx, "remote ocm share not found")
} else {
res.Status = status.NewInternal(ctx, "error deleting remote ocm share")
}
}
return res, nil
}
64 changes: 61 additions & 3 deletions internal/grpc/services/ocmshareprovider/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/ocm/client"
"github.com/cs3org/reva/v2/pkg/ocm/payload"
"github.com/cs3org/reva/v2/pkg/ocm/share"
"github.com/cs3org/reva/v2/pkg/ocm/share/repository/registry"
ocmuser "github.com/cs3org/reva/v2/pkg/ocm/user"
Expand Down Expand Up @@ -379,20 +380,77 @@ func (s *service) CreateOCMShare(ctx context.Context, req *ocm.CreateOCMShareReq
}

func (s *service) RemoveOCMShare(ctx context.Context, req *ocm.RemoveOCMShareRequest) (*ocm.RemoveOCMShareResponse, error) {
// TODO (gdelmont): notify the remote provider using the /notification ocm endpoint
// https://cs3org.github.io/OCM-API/docs.html?branch=develop&repo=OCM-API&user=cs3org#/paths/~1notifications/post
user := ctxpkg.ContextMustGetUser(ctx)
getShareRes, err := s.GetOCMShare(ctx, &ocm.GetOCMShareRequest{
Ref: req.Ref,
})
if err != nil {
return &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, "error getting ocm share"),
}, nil
}
if getShareRes.Status.Code != rpc.Code_CODE_OK {
res := &ocm.RemoveOCMShareResponse{
Status: getShareRes.GetStatus(),
}
return res, nil
}

if err := s.repo.DeleteShare(ctx, user, req.Ref); err != nil {
if errors.Is(err, share.ErrShareNotFound) {
return &ocm.RemoveOCMShareResponse{
Status: status.NewNotFound(ctx, "share does not exist"),
}, nil
}
return &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, "error removing share"),
Status: status.NewInternal(ctx, "error deleting share"),
}, nil
}

// TODO: We should not fail the whole operation if the notification fails
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
return &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, "error getting gateway client"),
}, nil
}

providerInfoResp, err := gatewayClient.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{
Domain: getShareRes.GetShare().GetGrantee().GetUserId().GetIdp(),
})
if err != nil {
return &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, "error getting provider info"),
}, nil
}

if providerInfoResp.Status.Code != rpc.Code_CODE_OK {
return &ocm.RemoveOCMShareResponse{
Status: providerInfoResp.Status,
}, nil
}

ocmEndpoint, err := getOCMEndpoint(providerInfoResp.GetProviderInfo())
if err != nil {
return &ocm.RemoveOCMShareResponse{
Status: status.NewInternal(ctx, "the selected provider does not have an OCM endpoint"),
}, nil
}
newShareReq := &payload.NotificationRequest{
NotificationType: payload.SHARE_UNSHARED,
ResourceType: "file", // use type "file" for shared files or folders
ProviderId: getShareRes.GetShare().GetId().GetOpaqueId(),
Notification: &payload.Notification{
Grantee: getShareRes.GetShare().GetGrantee().GetUserId().GetOpaqueId(),
},
}
// https://cs3org.github.io/OCM-API/docs.html?branch=develop&repo=OCM-API&user=cs3org#/paths/~1notifications/post
err = s.client.NotifyRemote(ctx, ocmEndpoint, newShareReq)
if err != nil {
// Continue even if the notification fails
appctx.GetLogger(ctx).Err(err).Msg("error notifying ocm remote provider")
}

return &ocm.RemoveOCMShareResponse{
Status: status.NewOK(ctx),
}, nil
Expand Down
113 changes: 102 additions & 11 deletions internal/http/services/ocmd/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,36 @@
package ocmd

import (
"io"
"context"
"encoding/json"
"fmt"
"mime"
"net/http"

"github.com/cs3org/reva/v2/internal/http/services/reqres"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
ocmcore "github.com/cs3org/go-cs3apis/cs3/ocm/core/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/ocm/payload"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/render"
)

// var validate = validator.New()

type notifHandler struct {
gatewaySelector *pool.Selector[gateway.GatewayAPIClient]
}

func (h *notifHandler) init(c *config) error {
gatewaySelector, err := pool.GatewaySelector(c.GatewaySvc)
if err != nil {
return err
}
h.gatewaySelector = gatewaySelector

return nil
}

Expand All @@ -42,25 +58,100 @@ func (h *notifHandler) init(c *config) error {
func (h *notifHandler) Notifications(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := appctx.GetLogger(ctx)
req, err := getNotification(r)
req, err := getNotification(w, r)
if err != nil {
reqres.WriteError(w, r, reqres.APIErrorInvalidParameter, err.Error(), nil)
renderErrorBadRequest(w, r, http.StatusBadRequest, err.Error())
return
}

// TODO(lopresti) this is all to be implemented. For now we just log what we got
log.Debug().Msgf("Received OCM notification: %+v", req)

// this is to please Nextcloud
w.WriteHeader(http.StatusCreated)
var status *rpc.Status
switch req.NotificationType {
case payload.SHARE_UNSHARED:
if req.Notification.Grantee == "" {
renderErrorBadRequest(w, r, http.StatusBadRequest, "grantee is required")
}
status, err = h.handleShareUnshared(ctx, req)
if err != nil {
log.Err(err).Any("NotificationRequest", req).Msg("error getting gateway client")
renderErrorBadRequest(w, r, http.StatusInternalServerError, status.GetMessage())
}
case payload.SHARE_CHANGE_PERMISSION:
// TODO implement the SHARE_CHANGE_PERMISSION
w.WriteHeader(http.StatusNotImplemented)
return
default:
renderErrorBadRequest(w, r, http.StatusBadRequest, "NotificationType "+req.NotificationType+" is not supported")
return
}
// parse the response status
switch status.GetCode() {
case rpc.Code_CODE_OK:
w.WriteHeader(http.StatusCreated)
return
case rpc.Code_CODE_INVALID_ARGUMENT:
renderErrorBadRequest(w, r, http.StatusBadRequest, status.GetMessage())
return
case rpc.Code_CODE_UNAUTHENTICATED:
w.WriteHeader(http.StatusUnauthorized)
return
case rpc.Code_CODE_PERMISSION_DENIED:
w.WriteHeader(http.StatusForbidden)
return
default:
log.Error().Str("code", status.GetCode().String()).Str("message", status.GetMessage()).Str("NotificationType", req.NotificationType).Msg("error handling notification")
w.WriteHeader(http.StatusInternalServerError)
}
}

func (h *notifHandler) handleShareUnshared(ctx context.Context, req *payload.NotificationRequest) (*rpc.Status, error) {
gatewayClient, err := h.gatewaySelector.Next()
if err != nil {
return nil, fmt.Errorf("error getting gateway client: %w", err)
}

o := &typesv1beta1.Opaque{}
utils.AppendPlainToOpaque(o, "grantee", req.Notification.Grantee)

res, err := gatewayClient.DeleteOCMCoreShare(ctx, &ocmcore.DeleteOCMCoreShareRequest{
Id: req.ProviderId,
Opaque: o,
})
if err != nil {
return nil, fmt.Errorf("error calling DeleteOCMCoreShare: %w", err)
}
return res.GetStatus(), nil
}

func getNotification(r *http.Request) (string, error) {
// var req notificationRequest
func getNotification(w http.ResponseWriter, r *http.Request) (*payload.NotificationRequest, error) {
contentType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err == nil && contentType == "application/json" {
bytes, _ := io.ReadAll(r.Body)
return string(bytes), nil
n := &payload.NotificationRequest{}
err := json.NewDecoder(r.Body).Decode(&n)
if err != nil {
return nil, err
}
return n, nil
}
return nil, err
}

func renderJSON(w http.ResponseWriter, r *http.Request, statusCode int, resp any) {
render.Status(r, statusCode)
render.JSON(w, r, resp)
}

func renderErrorBadRequest(w http.ResponseWriter, r *http.Request, statusCode int, message string) {
resp := &payload.ErrorMessageResponse{
Message: "BAD_REQUEST",
ValidationErrors: []*payload.ValidationError{
{
Name: "Notification",
Message: message,
},
},
}
return "", nil
renderJSON(w, r, http.StatusBadRequest, resp)
}
12 changes: 5 additions & 7 deletions internal/http/services/ocmd/shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ import (
"strings"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
providerpb "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
ocmcore "github.com/cs3org/go-cs3apis/cs3/ocm/core/v1beta1"
ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
providerpb "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/reqres"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
Expand Down Expand Up @@ -78,7 +76,7 @@ type createShareRequest struct {
Protocols Protocols `json:"protocol" validate:"required"`
}

// CreateShare sends all the informations to the consumer needed to start
// CreateShare sends all the information to the consumer needed to start
// synchronization between the two services.
func (h *sharesHandler) CreateShare(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down Expand Up @@ -180,7 +178,7 @@ func (h *sharesHandler) CreateShare(w http.ResponseWriter, r *http.Request) {
return
}

if userRes.Status.Code != rpc.Code_CODE_OK {
if createShareResp.Status.Code != rpc.Code_CODE_OK {
// TODO: define errors in the cs3apis
reqres.WriteError(w, r, reqres.APIErrorServerError, "error creating ocm share", errors.New(createShareResp.Status.Message))
return
Expand Down
Loading

0 comments on commit 8aefc9e

Please sign in to comment.