added context to filer_client method calls (#6808)
Co-authored-by: akosov <a.kosov@kryptonite.ru>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package logstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/parquet-go/parquet-go"
|
||||
@@ -50,7 +51,7 @@ func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, tim
|
||||
}
|
||||
|
||||
func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
t, err := topic.ParseTopicVersion(entry.Name)
|
||||
if err != nil {
|
||||
// skip non-partition directories
|
||||
@@ -66,7 +67,7 @@ func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeA
|
||||
|
||||
func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
|
||||
version := topicVersion.Format(topic.PartitionGenerationFormat)
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if !entry.IsDirectory {
|
||||
return nil
|
||||
}
|
||||
@@ -151,7 +152,7 @@ func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGr
|
||||
}
|
||||
|
||||
func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if strings.HasSuffix(entry.Name, ".parquet") {
|
||||
return nil
|
||||
}
|
||||
@@ -173,7 +174,7 @@ func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, time
|
||||
}
|
||||
|
||||
func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if !strings.HasSuffix(entry.Name, ".parquet") {
|
||||
return nil
|
||||
}
|
||||
@@ -354,7 +355,7 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
|
||||
|
||||
// write the entry to partitionDir
|
||||
if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
|
||||
return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
|
||||
Directory: partitionDir,
|
||||
Entry: entry,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user