diff --git a/cmd/client-s3.go b/cmd/client-s3.go index ff95a79300..777600b16b 100644 --- a/cmd/client-s3.go +++ b/cmd/client-s3.go @@ -739,6 +739,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") { eventsInfo[i] = EventInfo{ @@ -750,6 +751,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") { eventsInfo[i] = EventInfo{ @@ -761,6 +763,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else { eventsInfo[i] = EventInfo{ @@ -772,6 +775,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } } else { @@ -784,6 +788,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } } @@ -1089,6 +1094,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre disableSha256 := putOpts.checksum.IsSet() // pre-emptively disable sha256 payload, if checksum is set. opts := minio.PutObjectOptions{ + Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionID}, UserMetadata: metadata, UserTags: tagsMap, Progress: progress, diff --git a/cmd/client.go b/cmd/client.go index cb81f57348..19b0572926 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -76,6 +76,7 @@ type PutOptions struct { concurrentStream bool ifNotExists bool checksum minio.ChecksumType + versionID string } // StatOptions holds options of the HEAD operation diff --git a/cmd/common-methods.go b/cmd/common-methods.go index 5472b4b46c..2775f89556 100644 --- a/cmd/common-methods.go +++ b/cmd/common-methods.go @@ -506,6 +506,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge multipartThreads: uint(multipartThreads), ifNotExists: uploadOpts.ifNotExists, checksum: uploadOpts.urls.checksum, + versionID: uploadOpts.urls.TargetContent.VersionID, } if isReadAt(reader) || length == 0 { diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 41b99cbf95..9a289f83e4 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -100,6 +100,10 @@ var ( Name: "disable-multipart", Usage: "disable multipart upload feature", }, + cli.BoolFlag{ + Name: "with-versioning", + Usage: "enable mirroring of version history, including deletions, for versioned buckets", + }, cli.StringSliceFlag{ Name: "exclude", Usage: "exclude object(s) that match specified object name pattern", @@ -227,6 +231,9 @@ EXAMPLES: 16. Cross mirror between sites in a active-active deployment. Site-A: {{.Prompt}} {{.HelpName}} --active-active siteA siteB Site-B: {{.Prompt}} {{.HelpName}} --active-active siteB siteA + + 17. Mirror a bucket from MinIO cloud storage to other minIO cloud storage with versioning enabled. + {{.Prompt}} {{.HelpName}} --with-versioning myminio/bucket1 otherminio/bucket2 `, } @@ -376,7 +383,18 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) if mj.opts.isFake { return sURLs.WithError(nil) } - + // remote S3 to local file system + if sURLs.SourceContent != nil && sURLs.TargetAlias == "" { + // Construct proper path with alias. + sourceWithAlias := filepath.Join(sURLs.SourceAlias, sURLs.SourceContent.URL.Path) + sourceCient, pErr := newClient(sourceWithAlias) + if pErr != nil { + return sURLs.WithError(pErr) + } + if _, err := sourceCient.Stat(ctx, StatOptions{headOnly: true}); err == nil { + return sURLs.WithError(nil) + } + } // Construct proper path with alias. targetWithAlias := filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path) clnt, pErr := newClient(targetWithAlias) @@ -389,7 +407,7 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) clnt.AddUserAgent(uaMirrorAppName, ReleaseTag) } contentCh := make(chan *ClientContent, 1) - contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path)} + contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path), VersionID: sURLs.TargetContent.VersionID} close(contentCh) isRemoveBucket := false resultCh := clnt.Remove(ctx, false, isRemoveBucket, false, false, contentCh) @@ -702,6 +720,10 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) if strings.HasPrefix(string(event.Type), "s3:ObjectCreated:") { sourceModTime, _ := time.Parse(time.RFC3339Nano, event.Time) + targetContent := &ClientContent{URL: *targetURL} + if mj.opts.enableVersion { + targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID} + } mirrorURL := URLs{ SourceAlias: sourceAlias, SourceContent: &ClientContent{ @@ -713,7 +735,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) Metadata: event.UserMetadata, }, TargetAlias: targetAlias, - TargetContent: &ClientContent{URL: *targetURL}, + TargetContent: targetContent, MD5: mj.opts.md5, checksum: mj.opts.checksum, DisableMultipart: mj.opts.disableMultipart, @@ -739,11 +761,15 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) // Ignore delete cascading delete events if cyclical. continue } + targetContent := &ClientContent{URL: *targetURL} + if mj.opts.enableVersion { + targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID} + } mirrorURL := URLs{ SourceAlias: sourceAlias, - SourceContent: nil, + SourceContent: &ClientContent{URL: *sourceURL}, TargetAlias: targetAlias, - TargetContent: &ClientContent{URL: *targetURL}, + TargetContent: targetContent, MD5: mj.opts.md5, checksum: mj.opts.checksum, DisableMultipart: mj.opts.disableMultipart, @@ -1024,6 +1050,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc md5: md5, checksum: checksum, disableMultipart: cli.Bool("disable-multipart"), + enableVersion: cli.Bool("with-versioning"), skipErrors: cli.Bool("skip-errors"), excludeOptions: cli.StringSlice("exclude"), excludeBuckets: cli.StringSlice("exclude-bucket"), @@ -1053,6 +1080,10 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc mirrorSrcBuckets := srcClt.GetURL().Type == objectStorage && srcClt.GetURL().Path == string(srcClt.GetURL().Separator) mirrorBucketsToBuckets := mirrorSrcBuckets && createDstBuckets + if cli.Bool("with-versioning") && (!checkIfBucketIsVersioned(ctx, srcURL) || !checkIfBucketIsVersioned(ctx, dstURL)) { + fatalIf(errInvalidArgument().Trace(cli.Command.Name), "You cannot specify --with-versioning for buckets where versioning is not enabled.") + } + if mirrorSrcBuckets || createDstBuckets { // Synchronize buckets using dirDifference function for d := range bucketDifference(ctx, srcClt, dstClt, mj.opts) { diff --git a/cmd/mirror-url.go b/cmd/mirror-url.go index 4f15a6a069..2ef41564d0 100644 --- a/cmd/mirror-url.go +++ b/cmd/mirror-url.go @@ -272,7 +272,7 @@ type mirrorOptions struct { skipErrors bool excludeOptions, excludeStorageClasses, excludeBuckets []string encKeyDB map[string][]prefixSSEPair - md5, disableMultipart bool + md5, disableMultipart, enableVersion bool olderThan, newerThan string storageClass string userMetadata map[string]string diff --git a/cmd/watch.go b/cmd/watch.go index 1da8cd6662..32e676ca40 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -37,6 +37,7 @@ type EventInfo struct { Port string UserAgent string Type notification.EventType + VersionID string } // WatchOptions contains watch configuration options