diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index d835d7671..ccb978a68 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -66,14 +66,13 @@ func (ac *AllocationChangeCollector) BeforeSave(tx *gorm.DB) error { } type AllocationChange struct { - ChangeID int64 `gorm:"column:id;primaryKey"` Size int64 `gorm:"column:size;not null;default:0"` Operation string `gorm:"column:operation;size:20;not null"` ConnectionID string `gorm:"column:connection_id;size:64;not null"` Connection AllocationChangeCollector `gorm:"foreignKey:ConnectionID"` // References allocation_connections(id) Input string `gorm:"column:input"` FilePath string `gorm:"-"` - LookupHash string `gorm:"column:lookup_hash;size:64"` + LookupHash string `gorm:"column:lookup_hash;primaryKey;size:64"` datastore.ModelWithTS } @@ -95,7 +94,10 @@ func (ac *AllocationChange) BeforeSave(tx *gorm.DB) error { func (change *AllocationChange) Save(ctx context.Context) error { db := datastore.GetStore().GetTransaction(ctx) - return db.Save(change).Error + return db.Table(change.TableName()).Where("lookup_hash = ?", change.ConnectionID, change.LookupHash).Updates(map[string]interface{}{ + "size": change.Size, + "input": change.Input, + }).Error } func (change *AllocationChange) Create(ctx context.Context) error { @@ -218,10 +220,12 @@ func (cc *AllocationChangeCollector) ComputeProperties() { func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocationRoot, prevAllocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) { + now := time.Now() rootRef, err := cc.GetRootRef(ctx) if err != nil { return rootRef, err } + elapsedRootRef := time.Since(now) if rootRef.Hash != prevAllocationRoot { return rootRef, common.NewError("invalid_prev_root", "Invalid prev root") } @@ -232,12 +236,16 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio return rootRef, err } } + elapsedApplyChanges := time.Since(now) - elapsedRootRef collector := reference.NewCollector(len(cc.Changes)) _, err = rootRef.CalculateHash(ctx, true, collector) if err != nil { return rootRef, err } + elapsedCalculateHash := time.Since(now) - elapsedApplyChanges - elapsedRootRef err = collector.Finalize(ctx) + elapsedFinalize := time.Since(now) - elapsedCalculateHash - elapsedApplyChanges - elapsedRootRef + logging.Logger.Info("applyChanges: ", zap.String("allocation_id", cc.AllocationID), zap.Duration("elapsedRootRef", elapsedRootRef), zap.Duration("elapsedApplyChanges", elapsedApplyChanges), zap.Duration("elapsedCalculateHash", elapsedCalculateHash), zap.Duration("elapsedFinalize", elapsedFinalize)) return rootRef, err } diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go index 3bc327820..337ee9be4 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go @@ -94,7 +94,7 @@ type FileCommand interface { ProcessThumbnail(allocationObj *Allocation) error // UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation - UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error + UpdateChange(ctx context.Context) error // AddChange add Allocation change to db AddChange(ctx context.Context) error diff --git a/code/go/0chain.net/blobbercore/handler/file_command_delete.go b/code/go/0chain.net/blobbercore/handler/file_command_delete.go index b8b9ae0f2..22234e54d 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_delete.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_delete.go @@ -11,6 +11,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" ) // DeleteFileCommand command for deleting file @@ -62,10 +63,12 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request } // UpdateChange add DeleteFileChange in db -func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error { - connectionObj.AddChange(cmd.allocationChange, cmd.changeProcessor) - - return connectionObj.Save(ctx) +func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context) error { + err := cmd.AddChange(ctx) + if err == gorm.ErrDuplicatedKey { + return nil + } + return err } func (cmd *DeleteFileCommand) AddChange(ctx context.Context) error { @@ -93,7 +96,7 @@ func (cmd *DeleteFileCommand) ProcessContent(_ context.Context, allocationObj *a cmd.allocationChange.ConnectionID = connectionID cmd.allocationChange.Size = 0 - deleteSize cmd.allocationChange.Operation = constants.FileOperationDelete - cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash + cmd.allocationChange.LookupHash = encryption.Hash(connectionID + cmd.path) allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index ef2458141..b670cca3d 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -205,35 +205,16 @@ func (cmd *UpdateFileCommand) reloadChange() { } // UpdateChange add UpdateFileChanger in db -func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error { - cmd.fileChanger.AllocationID = connectionObj.AllocationID - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != sdkConst.FileOperationUpdate || cmd.fileChanger.Path != filePath { - continue - } - - c.Size = connectionObj.Size - c.Input, _ = cmd.fileChanger.Marshal() - - //c.ModelWithTS.UpdatedAt = time.Now() - err := connectionObj.Save(ctx) - if err != nil { - return err - } - - return c.Save(ctx) - } - - //NOT FOUND - connectionObj.AddChange(cmd.allocationChange, cmd.fileChanger) - - return connectionObj.Save(ctx) +func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = encryption.Hash(cmd.fileChanger.ConnectionID + cmd.fileChanger.Path) + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Save(ctx) } func (cmd *UpdateFileCommand) AddChange(ctx context.Context) error { connectionInput, _ := cmd.fileChanger.Marshal() - cmd.allocationChange.LookupHash = cmd.existingFileRef.LookupHash + cmd.allocationChange.LookupHash = encryption.Hash(cmd.fileChanger.ConnectionID + cmd.fileChanger.Path) cmd.allocationChange.Input = connectionInput return cmd.allocationChange.Create(ctx) } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index ded271b1e..31babffc5 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -223,34 +223,16 @@ func (cmd *UploadFileCommand) reloadChange() { } // UpdateChange replace AddFileChange in db -func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error { - cmd.fileChanger.AllocationID = connectionObj.AllocationID - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath { - continue - } - c.Size = cmd.fileChanger.Size - c.Input, _ = cmd.fileChanger.Marshal() - - //c.ModelWithTS.UpdatedAt = time.Now() - err := connectionObj.Save(ctx) - if err != nil { - return err - } - - return c.Save(ctx) - } - - //NOT FOUND - connectionObj.AddChange(cmd.allocationChange, cmd.fileChanger) - - return connectionObj.Save(ctx) +func (cmd *UploadFileCommand) UpdateChange(ctx context.Context) error { + connectionInput, _ := cmd.fileChanger.Marshal() + cmd.allocationChange.LookupHash = encryption.Hash(cmd.fileChanger.ConnectionID + cmd.fileChanger.Path) + cmd.allocationChange.Input = connectionInput + return cmd.allocationChange.Save(ctx) } func (cmd *UploadFileCommand) AddChange(ctx context.Context) error { connectionInput, _ := cmd.fileChanger.Marshal() - cmd.allocationChange.LookupHash = reference.GetReferenceLookup(cmd.fileChanger.AllocationID, cmd.fileChanger.Path) + cmd.allocationChange.LookupHash = encryption.Hash(cmd.fileChanger.ConnectionID + cmd.fileChanger.Path) cmd.allocationChange.Input = connectionInput return cmd.allocationChange.Create(ctx) } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 3144d1054..968326e11 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -601,6 +601,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b } if latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size != writeMarker.ChainSize { + logging.Logger.Error("latestWMCommit: ", zap.String("allocationRoot", latestWriteMarkerEntity.WM.AllocationRoot), zap.String("allocationID", allocationID), zap.Int64("latestWMChainLength", latestWriteMarkerEntity.WM.ChainSize), zap.Int64("connectionObjSize", connectionObj.Size), zap.Int64("writeMarkerChainSize", writeMarker.ChainSize)) return nil, common.NewErrorf("invalid_chain_size", "Invalid chain size. expected:%v got %v", latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size, writeMarker.ChainSize) } @@ -671,7 +672,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b } elapsedApplyChanges := time.Since(startTime) - elapsedAllocation - elapsedGetLock - - elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem + elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem - elapsedMoveToFilestore allocationRoot := rootRef.Hash fileMetaRoot := rootRef.FileMetaHash @@ -1297,11 +1298,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*all } // Update/Save the change if res.UpdateChange { - dbConnectionObj, err := allocation.GetAllocationChanges(ctx, connectionID, allocationID, clientID) - if err != nil { - return nil, err - } - err = cmd.UpdateChange(ctx, dbConnectionObj) + err = cmd.UpdateChange(ctx) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/handler/storage_handler.go b/code/go/0chain.net/blobbercore/handler/storage_handler.go index 18bc75e4d..930266e80 100644 --- a/code/go/0chain.net/blobbercore/handler/storage_handler.go +++ b/code/go/0chain.net/blobbercore/handler/storage_handler.go @@ -21,6 +21,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/encryption" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" . "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/node" ) @@ -639,6 +640,7 @@ func (fsh *StorageHandler) GetObjectTree(ctx context.Context, r *http.Request) ( if latestWM.Status == writemarker.Committed { latestWM.WM.ChainLength = 0 // start a new chain } + logging.Logger.Info("latestWMrefPath: ", zap.String("allocationRoot", latestWM.WM.AllocationRoot), zap.String("allocationID", allocationID), zap.Int64("latestWMChainLength", latestWM.WM.ChainSize)) refPathResult.LatestWM = &latestWM.WM } return &refPathResult, nil diff --git a/goose/migrations/1717416291_change_lookuphash.sql b/goose/migrations/1717416291_change_lookuphash.sql index e2d8e71dd..7481b5e0b 100644 --- a/goose/migrations/1717416291_change_lookuphash.sql +++ b/goose/migrations/1717416291_change_lookuphash.sql @@ -1,6 +1,5 @@ -- +goose Up -- +goose StatementBegin -ALTER TABLE allocation_changes ADD COLUMN lookup_hash character varying(64); +ALTER TABLE allocation_changes ADD COLUMN lookup_hash character varying(64); --- CREATE UNIQUE INDEX idx_allocation_changes_lookup_hash ON allocation_changes USING HASH(lookup_hash,connection_id); -- +goose StatementEnd \ No newline at end of file diff --git a/goose/migrations/1717845128_connection_index.sql b/goose/migrations/1717845128_connection_index.sql new file mode 100644 index 000000000..812249c48 --- /dev/null +++ b/goose/migrations/1717845128_connection_index.sql @@ -0,0 +1,7 @@ +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE allocation_changes DROP CONSTRAINT allocation_changes_pkey CASCADE, +ADD CONSTRAINT allocation_changes_pkey PRIMARY KEY (lookup_hash); +ALTER TABLE allocation_changes DROP COLUMN id; +-- +goose StatementEnd \ No newline at end of file