fix: comprehensive go vet error fixes and add CI enforcement (#7861)

* fix: use keyed fields in struct literals

- Replace unsafe reflect.StringHeader/SliceHeader with safe unsafe.String/Slice (weed/query/sqltypes/unsafe.go)
- Add field names to Type_ScalarType struct literals (weed/mq/schema/schema_builder.go)
- Add Duration field name to FlexibleDuration struct literals across test files
- Add field names to bson.D struct literals (weed/filer/mongodb/mongodb_store_kv.go)

Fixes go vet warnings about unkeyed struct literals.

* fix: remove unreachable code

- Remove unreachable return statements after infinite for loops
- Remove unreachable code after if/else blocks where all paths return
- Simplify recursive logic by removing unnecessary for loop (inode_to_path.go)
- Fix Type_ScalarType literal to use enum value directly (schema_builder.go)
- Call onCompletionFn on stream error (subscribe_session.go)

Files fixed:
- weed/query/sqltypes/unsafe.go
- weed/mq/schema/schema_builder.go
- weed/mq/client/sub_client/connect_to_sub_coordinator.go
- weed/filer/redis3/ItemList.go
- weed/mq/client/agent_client/subscribe_session.go
- weed/mq/broker/broker_grpc_pub_balancer.go
- weed/mount/inode_to_path.go
- weed/util/skiplist/name_list.go

* fix: avoid copying lock values in protobuf messages

- Use proto.Merge() instead of direct assignment to avoid copying sync.Mutex in S3ApiConfiguration (iamapi_server.go)
- Add explicit comments noting that channel-received values are already copies before taking addresses (volume_grpc_client_to_master.go)

The protobuf messages contain sync.Mutex fields from the message state, which should not be copied.
Using proto.Merge() properly merges messages without copying the embedded mutex.

* fix: correct byte array size for uint32 bit shift operations

The generateAccountId() function only needs 4 bytes to create a uint32 value.
Changed from allocating 8 bytes to 4 bytes to match the actual usage.

This fixes go vet warning about shifting 8-bit values (bytes) by more than 8 bits.

* fix: ensure context cancellation on all error paths

In broker_client_subscribe.go, ensure subscriberCancel() is called on all error return paths:
- When stream creation fails
- When partition assignment fails
- When sending initialization message fails

This prevents context leaks when an error occurs during subscriber creation.

* fix: ensure subscriberCancel called for CreateFreshSubscriber stream.Send error

Ensure subscriberCancel() is called when stream.Send fails in CreateFreshSubscriber.

* ci: add go vet step to prevent future lint regressions

- Add go vet step to GitHub Actions workflow
- Filter known protobuf lock warnings (MessageState sync.Mutex)
  These are expected in generated protobuf code and are safe
- Prevents accumulation of go vet errors in future PRs
- Step runs before build to catch issues early

* fix: resolve remaining syntax and logic errors in vet fixes

- Fixed syntax errors in filer_sync.go caused by missing closing braces
- Added missing closing brace for if block and function
- Synchronized fixes to match previous commits on branch

* fix: add missing return statements to daemon functions

- Add 'return false' after infinite loops in filer_backup.go and filer_meta_backup.go
- Satisfies declared bool return type signatures
- Maintains consistency with other daemon functions (runMaster, runFilerSynchronize, runWorker)
- While unreachable, explicitly declares the return satisfies function signature contract

* fix: add nil check for onCompletionFn in SubscribeMessageRecord

- Check if onCompletionFn is not nil before calling it
- Prevents potential panic if nil function is passed
- Matches pattern used in other callback functions

* docs: clarify unreachable return statements in daemon functions

- Add comments documenting that return statements satisfy function signature
- Explains that these returns follow infinite loops and are unreachable
- Improves code clarity for future maintainers
This commit is contained in:
Chris Lu
2025-12-23 14:48:50 -08:00
committed by GitHub
parent 88ed187c27
commit 1261e93ef2
25 changed files with 100 additions and 93 deletions

View File

@@ -2,6 +2,7 @@ package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
@@ -44,6 +45,4 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
// glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
}
}
return nil
}

View File

@@ -3,6 +3,7 @@ package agent_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -76,12 +77,11 @@ func (a *SubscribeSession) SubscribeMessageRecord(
for {
resp, err := a.stream.Recv()
if err != nil {
if onCompletionFn != nil {
onCompletionFn()
}
return err
}
onEachMessageFn(resp.Key, resp.Value)
}
if onCompletionFn != nil {
onCompletionFn()
}
return nil
}

View File

@@ -1,10 +1,11 @@
package sub_client
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
)
func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
@@ -94,8 +95,6 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
sub.brokerPartitionAssignmentChan <- resp
glog.V(0).Infof("Received assignment: %+v", resp)
}
return nil
})
}
glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)

View File

@@ -44,12 +44,14 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
stream, err := bc.client.SubscribeMessage(subscriberCtx)
if err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
}
// Get the actual partition assignment from the broker
actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
if err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
}
@@ -63,6 +65,7 @@ func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, sta
topic, partition, startOffset, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
}
@@ -163,12 +166,14 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
stream, err := bc.client.SubscribeMessage(subscriberCtx)
if err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
}
// Get the actual partition assignment from the broker instead of using Kafka partition mapping
// Get the actual partition assignment from the broker
actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
if err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
}
@@ -198,6 +203,7 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
// Send init message using the actual partition structure that the broker allocated
initReq := createSubscribeInitMessage(topic, actualPartition, offsetValue, offsetType, consumerGroup, consumerID)
if err := stream.Send(initReq); err != nil {
subscriberCancel()
return nil, fmt.Errorf("failed to send subscribe init: %v", err)
}

View File

@@ -8,19 +8,19 @@ import (
var (
// Basic scalar types
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_STRING}}
TypeBoolean = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BOOL}}
TypeInt32 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}
TypeInt64 = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}
TypeFloat = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_FLOAT}}
TypeDouble = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}
TypeBytes = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}
TypeString = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}
// Parquet logical types
TypeTimestamp = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_TIMESTAMP}}
TypeDate = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DATE}}
TypeDecimal = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_DECIMAL}}
TypeTime = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{schema_pb.ScalarType_TIME}}
TypeTimestamp = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP}}
TypeDate = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DATE}}
TypeDecimal = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DECIMAL}}
TypeTime = &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIME}}
)
type RecordTypeBuilder struct {