convert error fromating to %w everywhere (#6995)

This commit is contained in:
Chris Lu
2025-07-16 23:39:27 -07:00
committed by GitHub
parent a524b4f485
commit 69553e5ba6
174 changed files with 524 additions and 524 deletions

View File

@@ -25,7 +25,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
return pb.WithBrokerGrpcClient(true, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.PublisherToPubBalancer(context.Background())
if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
return fmt.Errorf("connect to balancer %v: %w", brokerBalancer, err)
}
defer stream.CloseSend()
err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
@@ -36,7 +36,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
},
})
if err != nil {
return fmt.Errorf("send init message: %v", err)
return fmt.Errorf("send init message: %w", err)
}
for {

View File

@@ -66,7 +66,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
// save the topic configuration on filer
if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil {
return nil, fmt.Errorf("configure topic: %v", err)
return nil, fmt.Errorf("configure topic: %w", err)
}
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)

View File

@@ -164,14 +164,14 @@ func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request
if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
return nil, fmt.Errorf("failed to read topic configuration: %w", err)
}
// Ensure topic assignments are active
err = b.ensureTopicActiveAssignments(t, conf)
if err != nil {
glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to ensure topic assignments: %v", err)
return nil, fmt.Errorf("failed to ensure topic assignments: %w", err)
}
// Build the response with complete configuration including metadata
@@ -208,7 +208,7 @@ func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq
var conf *mq_pb.ConfigureTopicResponse
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
return nil, fmt.Errorf("failed to read topic configuration: %w", err)
}
// Collect publishers from each partition that is hosted on this broker
@@ -262,7 +262,7 @@ func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *m
var conf *mq_pb.ConfigureTopicResponse
if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
return nil, fmt.Errorf("failed to read topic configuration: %v", err)
return nil, fmt.Errorf("failed to read topic configuration: %w", err)
}
// Collect subscribers from each partition that is hosted on this broker

View File

@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// send to the local partition
if err = localTopicPartition.Publish(dataMessage); err != nil {
return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
}
// Update published offset and last seen time for this publisher

View File

@@ -15,7 +15,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
}
req, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive init message: %v", err)
return fmt.Errorf("receive init message: %w", err)
}
// process init message

View File

@@ -14,12 +14,12 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
glog.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
}
localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
if getOrGenError != nil {
glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
return nil, fmt.Errorf("topic %v partition %v not setup: %w", t, partition, getOrGenError)
}
return localTopicPartition, nil
}

View File

@@ -44,13 +44,13 @@ func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitio
stream, err := agentClient.PublishRecord(context.Background())
if err != nil {
return nil, fmt.Errorf("publish record: %v", err)
return nil, fmt.Errorf("publish record: %w", err)
}
if err = stream.Send(&mq_agent_pb.PublishRecordRequest{
SessionId: resp.SessionId,
}); err != nil {
return nil, fmt.Errorf("send session id: %v", err)
return nil, fmt.Errorf("send session id: %w", err)
}
return &PublishSession{
@@ -67,7 +67,7 @@ func (a *PublishSession) CloseSession() error {
}
err := a.stream.CloseSend()
if err != nil {
return fmt.Errorf("close send: %v", err)
return fmt.Errorf("close send: %w", err)
}
a.schema = nil
return err

View File

@@ -50,13 +50,13 @@ func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*Subscri
stream, err := agentClient.SubscribeRecord(context.Background())
if err != nil {
return nil, fmt.Errorf("subscribe record: %v", err)
return nil, fmt.Errorf("subscribe record: %w", err)
}
if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{
Init: initRequest,
}); err != nil {
return nil, fmt.Errorf("send session id: %v", err)
return nil, fmt.Errorf("send session id: %w", err)
}
return &SubscribeSession{

View File

@@ -38,7 +38,7 @@ func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.Record
// serialize record value
value, err := proto.Marshal(recordValue)
if err != nil {
return fmt.Errorf("failed to marshal record value: %v", err)
return fmt.Errorf("failed to marshal record value: %w", err)
}
return p.doPublish(key, value)

View File

@@ -137,7 +137,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
stream, err := brokerClient.PublishMessage(context.Background())
if err != nil {
return fmt.Errorf("create publish client: %v", err)
return fmt.Errorf("create publish client: %w", err)
}
publishClient := &PublishClient{
SeaweedMessaging_PublishMessageClient: stream,
@@ -154,12 +154,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
},
},
}); err != nil {
return fmt.Errorf("send init message: %v", err)
return fmt.Errorf("send init message: %w", err)
}
// process the hello message
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
return fmt.Errorf("recv init response: %w", err)
}
if resp.Error != "" {
return fmt.Errorf("init response error: %v", resp.Error)
@@ -208,7 +208,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Data: data,
},
}); err != nil {
return fmt.Errorf("send publish data: %v", err)
return fmt.Errorf("send publish data: %w", err)
}
publishCounter++
atomic.StoreInt64(&publishedTsNs, data.TsNs)
@@ -218,7 +218,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
} else {
// CloseSend would cancel the context on the server side
if err := publishClient.CloseSend(); err != nil {
return fmt.Errorf("close send: %v", err)
return fmt.Errorf("close send: %w", err)
}
}

View File

@@ -22,7 +22,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient, err := client.SubscribeMessage(context.Background())
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
return fmt.Errorf("create subscribe client: %w", err)
}
slidingWindowSize := sub.SubscriberConfig.SlidingWindowSize
@@ -94,7 +94,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("subscribe recv: %v", err)
return fmt.Errorf("subscribe recv: %w", err)
}
if resp.Message == nil {
glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)

View File

@@ -31,7 +31,7 @@ func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, tim
// list the topic partition versions
topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
if err != nil {
return fmt.Errorf("list topic files: %v", err)
return fmt.Errorf("list topic files: %w", err)
}
// compact the partitions
@@ -120,7 +120,7 @@ func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, parti
// create a parquet schema
parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
if err != nil {
return fmt.Errorf("ToParquetSchema failed: %v", err)
return fmt.Errorf("ToParquetSchema failed: %w", err)
}
// TODO parallelize the writing
@@ -210,7 +210,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
tempFile, err := os.CreateTemp(".", "t*.parquet")
if err != nil {
return fmt.Errorf("create temp file: %v", err)
return fmt.Errorf("create temp file: %w", err)
}
defer func() {
tempFile.Close()
@@ -241,7 +241,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
record := &schema_pb.RecordValue{}
if err := proto.Unmarshal(entry.Data, record); err != nil {
return fmt.Errorf("unmarshal record value: %v", err)
return fmt.Errorf("unmarshal record value: %w", err)
}
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
@@ -256,7 +256,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
return fmt.Errorf("add record value: %v", err)
return fmt.Errorf("add record value: %w", err)
}
rows = append(rows, rowBuilder.Row())
@@ -264,18 +264,18 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
return nil
}); err != nil {
return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
return fmt.Errorf("iterate log entry %v/%v: %w", partitionDir, logFile.Name, err)
}
fmt.Printf("processed %d rows\n", len(rows))
if _, err := writer.WriteRows(rows); err != nil {
return fmt.Errorf("write rows: %v", err)
return fmt.Errorf("write rows: %w", err)
}
}
if err := writer.Close(); err != nil {
return fmt.Errorf("close writer: %v", err)
return fmt.Errorf("close writer: %w", err)
}
// write to parquet file to partitionDir
@@ -291,13 +291,13 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %v", err)
return fmt.Errorf("new uploader: %w", err)
}
// get file size
fileInfo, err := sourceFile.Stat()
if err != nil {
return fmt.Errorf("stat source file: %v", err)
return fmt.Errorf("stat source file: %w", err)
}
// upload file in chunks
@@ -360,7 +360,7 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
Entry: entry,
})
}); err != nil {
return fmt.Errorf("create entry: %v", err)
return fmt.Errorf("create entry: %w", err)
}
fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
@@ -436,12 +436,12 @@ func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (proc
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
pos += 4 + int(size)
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %w", err)
return
}
if _, err = eachLogEntryFn(logEntry); err != nil {
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
err = fmt.Errorf("process log entry %v: %w", logEntry, err)
return
}

View File

@@ -34,7 +34,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
pos += 4 + int(size)
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %w", err)
return
}
if logEntry.TsNs <= starTsNs {
@@ -48,7 +48,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
// fmt.Printf(" read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
if _, err = eachLogEntryFn(logEntry); err != nil {
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
err = fmt.Errorf("process log entry %v: %w", logEntry, err)
return
}

View File

@@ -69,7 +69,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
// convert parquet row to schema_pb.RecordValue
recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row)
if err != nil {
return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err)
return processedTsNs, fmt.Errorf("ToRecordValue failed: %w", err)
}
processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
if processedTsNs <= starTsNs {
@@ -81,7 +81,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
data, marshalErr := proto.Marshal(recordValue)
if marshalErr != nil {
return processedTsNs, fmt.Errorf("marshal record value: %v", marshalErr)
return processedTsNs, fmt.Errorf("marshal record value: %w", marshalErr)
}
logEntry := &filer_pb.LogEntry{
@@ -93,7 +93,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
// fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
if _, err = eachLogEntryFn(logEntry); err != nil {
return processedTsNs, fmt.Errorf("process log entry %v: %v", logEntry, err)
return processedTsNs, fmt.Errorf("process log entry %v: %w", logEntry, err)
}
}

View File

@@ -9,7 +9,7 @@ import (
func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
rootNode, err := toParquetFieldTypeRecord(recordType)
if err != nil {
return nil, fmt.Errorf("failed to convert record type to parquet schema: %v", err)
return nil, fmt.Errorf("failed to convert record type to parquet schema: %w", err)
}
// Fields are sorted by name, so the value should be sorted also

View File

@@ -155,7 +155,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
return fmt.Errorf("fail to create publish client: %v", err)
return fmt.Errorf("fail to create publish client: %w", err)
}
if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{

View File

@@ -52,12 +52,12 @@ func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.Configur
return nil, err
}
if err != nil {
return nil, fmt.Errorf("read topic.conf of %v: %v", t, err)
return nil, fmt.Errorf("read topic.conf of %v: %w", t, err)
}
// parse into filer conf object
conf := &mq_pb.ConfigureTopicResponse{}
if err = jsonpb.Unmarshal(data, conf); err != nil {
return nil, fmt.Errorf("unmarshal topic %v conf: %v", t, err)
return nil, fmt.Errorf("unmarshal topic %v conf: %w", t, err)
}
return conf, nil
}
@@ -75,7 +75,7 @@ func (t Topic) ReadConfFileWithMetadata(client filer_pb.SeaweedFilerClient) (*mq
if errors.Is(err, filer_pb.ErrNotFound) {
return nil, 0, 0, err
}
return nil, 0, 0, fmt.Errorf("lookup topic.conf of %v: %v", t, err)
return nil, 0, 0, fmt.Errorf("lookup topic.conf of %v: %w", t, err)
}
// Get file metadata
@@ -88,7 +88,7 @@ func (t Topic) ReadConfFileWithMetadata(client filer_pb.SeaweedFilerClient) (*mq
// Parse the configuration
conf := &mq_pb.ConfigureTopicResponse{}
if err = jsonpb.Unmarshal(resp.Entry.Content, conf); err != nil {
return nil, 0, 0, fmt.Errorf("unmarshal topic %v conf: %v", t, err)
return nil, 0, 0, fmt.Errorf("unmarshal topic %v conf: %w", t, err)
}
return conf, createdAtNs, modifiedAtNs, nil
@@ -98,7 +98,7 @@ func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.Con
var buf bytes.Buffer
filer.ProtoToText(&buf, conf)
if err := filer.SaveInsideFiler(client, t.Dir(), filer.TopicConfFile, buf.Bytes()); err != nil {
return fmt.Errorf("save topic %v conf: %v", t, err)
return fmt.Errorf("save topic %v conf: %w", t, err)
}
return nil
}