remove extra async execution

This commit is contained in:
chrislu
2022-01-17 15:50:11 -08:00
parent 7bf7af971b
commit 047446d5ca

View File

@@ -46,8 +46,7 @@ func (pages *MemoryChunkPages) FlushData() error {
if !pages.hasWrites { if !pages.hasWrites {
return nil return nil
} }
pages.saveChunkedFileToStorage() pages.uploadPipeline.FlushAll()
pages.writeWaitGroup.Wait()
if pages.lastErr != nil { if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr) return fmt.Errorf("flush data: %v", pages.lastErr)
} }
@@ -65,41 +64,24 @@ func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication stri
return pages.collection, pages.replication return pages.collection, pages.replication
} }
func (pages *MemoryChunkPages) saveChunkedFileToStorage() {
pages.uploadPipeline.FlushAll()
}
func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
mtime := time.Now().UnixNano() mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1) defer cleanupFn()
writer := func() {
defer pages.writeWaitGroup.Done()
defer cleanupFn()
chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
return
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.chunkAddLock.Lock()
pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
pages.chunkAddLock.Unlock()
chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
return
} }
chunk.Mtime = mtime
if pages.fh.f.wfs.concurrentWriters != nil { pages.collection, pages.replication = collection, replication
pages.fh.f.wfs.concurrentWriters.Execute(writer) pages.chunkAddLock.Lock()
} else { pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
go writer() pages.fh.entryViewCache = nil
} glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
pages.chunkAddLock.Unlock()
} }