add retries when writing to remote s3
fix https://github.com/chrislusf/seaweedfs/issues/2465
This commit is contained in:
@@ -199,8 +199,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
|||||||
return client.WriteDirectory(dest, message.NewEntry)
|
return client.WriteDirectory(dest, message.NewEntry)
|
||||||
}
|
}
|
||||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
return writeErr
|
return writeErr
|
||||||
}
|
}
|
||||||
@@ -264,9 +263,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
|||||||
return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
|
return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
|
||||||
} else {
|
} else {
|
||||||
newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
|
newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
|
||||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
|
||||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
|
|
||||||
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
return writeErr
|
return writeErr
|
||||||
}
|
}
|
||||||
@@ -303,9 +300,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
|
|||||||
if message.NewEntry.IsDirectory {
|
if message.NewEntry.IsDirectory {
|
||||||
return client.WriteDirectory(newDest, message.NewEntry)
|
return client.WriteDirectory(newDest, message.NewEntry)
|
||||||
}
|
}
|
||||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
|
||||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
|
|
||||||
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
return writeErr
|
return writeErr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,8 +108,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
|||||||
return client.WriteDirectory(dest, message.NewEntry)
|
return client.WriteDirectory(dest, message.NewEntry)
|
||||||
}
|
}
|
||||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
return writeErr
|
return writeErr
|
||||||
}
|
}
|
||||||
@@ -146,9 +145,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
|||||||
if err := client.DeleteFile(oldDest); err != nil {
|
if err := client.DeleteFile(oldDest); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
reader := filer.NewFileReader(filerSource, message.NewEntry)
|
remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
|
||||||
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
|
||||||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
|
||||||
if writeErr != nil {
|
if writeErr != nil {
|
||||||
return writeErr
|
return writeErr
|
||||||
}
|
}
|
||||||
@@ -160,6 +157,20 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
|
|||||||
return eachEntryFunc, nil
|
return eachEntryFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
|
||||||
|
var writeErr error
|
||||||
|
err = util.Retry("writeFile", func() error {
|
||||||
|
reader := filer.NewFileReader(filerSource, newEntry)
|
||||||
|
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
|
||||||
|
remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader)
|
||||||
|
if writeErr != nil {
|
||||||
|
return writeErr
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
|
func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
|
||||||
// 1. specified by timeAgo
|
// 1. specified by timeAgo
|
||||||
// 2. last offset timestamp for this directory
|
// 2. last offset timestamp for this directory
|
||||||
|
|||||||
Reference in New Issue
Block a user