Files
seaweedFS/weed/mq/kafka/integration/seaweedmq_handler_topics.go
Chris Lu 995dfc4d5d chore: remove ~50k lines of unreachable dead code (#8913)
* chore: remove unreachable dead code across the codebase

Remove ~50,000 lines of unreachable code identified by static analysis.

Major removals:
- weed/filer/redis_lua: entire unused Redis Lua filer store implementation
- weed/wdclient/net2, resource_pool: unused connection/resource pool packages
- weed/plugin/worker/lifecycle: unused lifecycle plugin worker
- weed/s3api: unused S3 policy templates, presigned URL IAM, streaming copy,
  multipart IAM, key rotation, and various SSE helper functions
- weed/mq/kafka: unused partition mapping, compression, schema, and protocol functions
- weed/mq/offset: unused SQL storage and migration code
- weed/worker: unused registry, task, and monitoring functions
- weed/query: unused SQL engine, parquet scanner, and type functions
- weed/shell: unused EC proportional rebalance functions
- weed/storage/erasure_coding/distribution: unused distribution analysis functions
- Individual unreachable functions removed from 150+ files across admin,
  credential, filer, iam, kms, mount, mq, operation, pb, s3api, server,
  shell, storage, topology, and util packages

* fix(s3): reset shared memory store in IAM test to prevent flaky failure

TestLoadIAMManagerFromConfig_EmptyConfigWithFallbackKey was flaky because
the MemoryStore credential backend is a singleton registered via init().
Earlier tests that create anonymous identities pollute the shared store,
causing LookupAnonymous() to unexpectedly return true.

Fix by calling Reset() on the memory store before the test runs.

* style: run gofmt on changed files

* fix: restore KMS functions used by integration tests

* fix(plugin): prevent panic on send to closed worker session channel

The Plugin.sendToWorker method could panic with "send on closed channel"
when a worker disconnected while a message was being sent. The race was
between streamSession.close() closing the outgoing channel and sendToWorker
writing to it concurrently.

Add a done channel to streamSession that is closed before the outgoing
channel, and check it in sendToWorker's select to safely detect closed
sessions without panicking.
2026-04-03 16:04:27 -07:00

281 lines
8.8 KiB
Go

package integration
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// CreateTopic creates a topic with the given name and partition count and no
// value schema. It delegates to CreateTopicWithSchema and returns its error
// unchanged.
func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error {
	// A nil record type means "no value schema".
	var noValueSchema *schema_pb.RecordType
	return h.CreateTopicWithSchema(name, partitions, noValueSchema)
}
// CreateTopicWithSchema creates a topic whose values may optionally be
// described by recordType (nil for none). No key schema is supplied; the call
// is forwarded to CreateTopicWithSchemas and its error is returned unchanged.
func (h *SeaweedMQHandler) CreateTopicWithSchema(name string, partitions int32, recordType *schema_pb.RecordType) error {
	// A nil key record type means "no key schema".
	var noKeySchema *schema_pb.RecordType
	return h.CreateTopicWithSchemas(name, partitions, noKeySchema, recordType)
}
// CreateTopicWithSchemas creates a topic with optional key and value schemas.
//
// It fails if the topic is already present in the filer. When at least one
// broker address is configured, the first address is used to send a
// ConfigureTopic request under the "kafka" namespace; the key and value record
// types (if either is non-nil) are first merged into a flat schema plus key
// column list. When no broker address is configured, only a warning is logged.
// In both cases the topic-existence cache entry for name is dropped before
// returning nil.
func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
	// Refuse to re-create a topic the filer already knows about.
	if h.checkTopicInFiler(name) {
		return fmt.Errorf("topic %s already exists", name)
	}

	// SeaweedMQ reference for the topic being configured.
	topicRef := &schema_pb.Topic{
		Namespace: "kafka",
		Name:      name,
	}

	if len(h.brokerAddresses) == 0 {
		glog.Warningf("No brokers available - creating topic %s in gateway memory only (testing mode)", name)
	} else {
		// Always talk to the first configured broker.
		brokerAddress := h.brokerAddresses[0]
		glog.V(1).Infof("Configuring topic %s with broker %s", name, brokerAddress)

		// Security configuration is loaded before building the dial option.
		util.LoadSecurityConfiguration()
		grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq")

		configure := func(client mq_pb.SeaweedMessagingClient) error {
			// Merge the key/value schemas into the flat form the broker expects;
			// both stay nil when no schema was supplied at all.
			var flatSchema *schema_pb.RecordType
			var keyColumns []string
			if keyRecordType != nil || valueRecordType != nil {
				flatSchema, keyColumns = schema.CombineFlatSchemaFromKeyValue(keyRecordType, valueRecordType)
			}

			request := &mq_pb.ConfigureTopicRequest{
				Topic:             topicRef,
				PartitionCount:    partitions,
				MessageRecordType: flatSchema,
				KeyColumns:        keyColumns,
			}
			if _, err := client.ConfigureTopic(context.Background(), request); err != nil {
				return fmt.Errorf("configure topic with broker: %w", err)
			}

			glog.V(1).Infof("successfully configured topic %s with broker", name)
			return nil
		}

		if err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, configure); err != nil {
			return fmt.Errorf("failed to configure topic %s with broker %s: %w", name, brokerAddress, err)
		}
	}

	// Nothing is kept in gateway memory for the topic; only the cached
	// existence answer has to be dropped so the next TopicExists call re-queries.
	h.InvalidateTopicExistsCache(name)

	glog.V(1).Infof("Topic %s created successfully with %d partitions", name, partitions)
	return nil
}
// CreateTopicWithRecordType creates a topic from an already-flattened schema
// and its key column list.
//
// It fails if the topic is already present in the filer. When at least one
// broker address is configured, the first address is used to send a
// ConfigureTopic request under the "kafka" namespace, and any error from that
// call is returned. When no broker address is configured, only a warning is
// logged and nil is returned.
//
// On success the topic-existence cache entry for name is invalidated, matching
// CreateTopicWithSchemas, so a previously cached negative TopicExists answer
// does not outlive the creation.
func (h *SeaweedMQHandler) CreateTopicWithRecordType(name string, partitions int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
	// Check if topic already exists in filer
	if h.checkTopicInFiler(name) {
		return fmt.Errorf("topic %s already exists", name)
	}

	// Create SeaweedMQ topic reference
	seaweedTopic := &schema_pb.Topic{
		Namespace: "kafka",
		Name:      name,
	}

	// Configure topic with SeaweedMQ broker via gRPC
	if len(h.brokerAddresses) > 0 {
		brokerAddress := h.brokerAddresses[0] // Use first available broker
		glog.V(1).Infof("Configuring topic %s with broker %s", name, brokerAddress)

		// Load security configuration for broker connection
		util.LoadSecurityConfiguration()
		grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq")

		err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
			_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
				Topic:             seaweedTopic,
				PartitionCount:    partitions,
				MessageRecordType: flatSchema,
				KeyColumns:        keyColumns,
			})
			if err != nil {
				return fmt.Errorf("failed to configure topic: %w", err)
			}

			glog.V(1).Infof("successfully configured topic %s with broker", name)
			return nil
		})
		if err != nil {
			return err
		}
	} else {
		glog.Warningf("No broker addresses configured, topic %s not created in SeaweedMQ", name)
	}

	// Topic is now stored in filer only via SeaweedMQ broker
	// No need to create in-memory topic info structure

	// Invalidate cache after successful topic creation; without this a cached
	// "does not exist" answer would keep being served until its TTL expires.
	h.InvalidateTopicExistsCache(name)

	glog.V(1).Infof("Topic %s created successfully with %d partitions using flat schema", name, partitions)
	return nil
}
// DeleteTopic tears down the gateway-side state for a topic.
//
// It returns an error if the topic is not present in the filer or if its info
// cannot be resolved. Otherwise it closes the publisher session of every
// partition (when a broker client is configured) and invalidates the
// topic-existence cache entry so later TopicExists calls re-query the broker.
//
// This method does not itself remove the topic from the filer; per the
// original notes that is left to the SeaweedMQ broker.
func (h *SeaweedMQHandler) DeleteTopic(name string) error {
	// Check if topic exists in filer
	if !h.checkTopicInFiler(name) {
		return fmt.Errorf("topic %s does not exist", name)
	}

	// Get topic info to determine partition count for cleanup
	topicInfo, exists := h.GetTopicInfo(name)
	if !exists {
		return fmt.Errorf("topic %s info not found", name)
	}

	// Close all publisher sessions for this topic. The nil check does not
	// depend on the partition, so it is evaluated once outside the loop.
	if h.brokerClient != nil {
		for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {
			h.brokerClient.ClosePublisher(name, partitionID)
		}
	}

	// Topic removal from filer would be handled by SeaweedMQ broker
	// Offset management handled by SMQ broker - no cleanup needed

	// Drop the cached existence answer, as InvalidateTopicExistsCache's
	// contract requires after a delete.
	h.InvalidateTopicExistsCache(name)

	return nil
}
// TopicExists reports whether the broker client considers the topic to exist.
//
// Answers are cached per topic name for h.topicExistsCacheTTL. An unexpired
// cache entry is returned directly; otherwise the broker client is queried and
// the result is stored. If no broker client is configured, or the query fails,
// false is returned and nothing is cached.
func (h *SeaweedMQHandler) TopicExists(name string) bool {
	// Fast path: consult the cache under the read lock.
	h.topicExistsCacheMu.RLock()
	cached, hit := h.topicExistsCache[name]
	fresh := hit && time.Now().Before(cached.expiresAt)
	h.topicExistsCacheMu.RUnlock()
	if fresh {
		return cached.exists
	}

	// Cache miss or expired entry: the broker is the only source of truth.
	if h.brokerClient == nil {
		return false
	}
	present, err := h.brokerClient.TopicExists(name)
	if err != nil {
		// Errors are deliberately not cached.
		return false
	}

	// Remember the answer until the TTL elapses.
	h.topicExistsCacheMu.Lock()
	h.topicExistsCache[name] = &topicExistsCacheEntry{
		exists:    present,
		expiresAt: time.Now().Add(h.topicExistsCacheTTL),
	}
	h.topicExistsCacheMu.Unlock()

	return present
}
// InvalidateTopicExistsCache drops the cached existence answer for name, if
// any, while holding the cache's write lock. Call it after creating or
// deleting a topic so the next TopicExists call re-queries the broker.
func (h *SeaweedMQHandler) InvalidateTopicExistsCache(name string) {
	h.topicExistsCacheMu.Lock()
	defer h.topicExistsCacheMu.Unlock()

	delete(h.topicExistsCache, name)
}
// GetTopicInfo returns information about a topic and whether it was found.
//
// When a broker client is configured and returns a non-nil configuration
// without error, the partition count and creation timestamp come from that
// configuration. Otherwise the filer is consulted: if the topic is present
// there, a default record with one partition and a zero creation time is
// returned; if not, the result is (nil, false).
func (h *SeaweedMQHandler) GetTopicInfo(name string) (*KafkaTopicInfo, bool) {
	// Preferred source: the broker's topic configuration.
	if h.brokerClient != nil {
		if cfg, err := h.brokerClient.GetTopicConfiguration(name); err != nil || cfg == nil {
			glog.V(2).Infof("Failed to get topic configuration for %s from broker: %v", name, err)
		} else {
			return &KafkaTopicInfo{
				Name:       name,
				Partitions: cfg.PartitionCount,
				CreatedAt:  cfg.CreatedAtNs,
			}, true
		}
	}

	// Fallback (backward compatibility): the topic may still be in the filer
	// even though the broker could not describe it.
	if h.checkTopicInFiler(name) {
		return &KafkaTopicInfo{
			Name:       name,
			Partitions: 1, // default when the broker query did not succeed
			CreatedAt:  0,
		}, true
	}

	return nil, false
}
// ListTopics returns the topic names reported by the broker client. If no
// broker client is configured, or the listing fails, it returns an empty,
// non-nil slice.
func (h *SeaweedMQHandler) ListTopics() []string {
	if h.brokerClient == nil {
		return []string{}
	}

	names, err := h.brokerClient.ListTopics()
	if err != nil {
		// The error is not surfaced; callers just see no topics.
		return []string{}
	}
	return names
}
// checkTopicInFiler reports whether an entry named topicName can be looked up
// under "/topics/kafka" in the filer.
//
// It returns false when no filer client accessor is configured. Any lookup
// error is treated as "not found", and the result stays false if the lookup
// callback is never invoked.
func (h *SeaweedMQHandler) checkTopicInFiler(topicName string) bool {
	if h.filerClientAccessor == nil {
		return false
	}

	found := false
	lookup := func(client filer_pb.SeaweedFilerClient) error {
		_, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: "/topics/kafka",
			Name:      topicName,
		})
		found = lookupErr == nil
		// The error is intentionally not propagated; only existence matters.
		return nil
	}
	h.filerClientAccessor.WithFilerClient(false, lookup)

	return found
}