api for sub

This commit is contained in:
chrislu
2023-09-04 21:43:50 -07:00
parent 9e4f985698
commit ba67e6ca29
7 changed files with 189 additions and 136 deletions

View File

@@ -3,28 +3,37 @@ package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type SubscriberConfiguration struct {
ConsumerGroup string
ConsumerId string
ClientId string
GroupId string
GroupInstanceId string
BootstrapServers []string
GrpcDialOption grpc.DialOption
}
type ContentConfiguration struct {
Namespace string
Topic string
Filter string
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
type OnCompletionFunc func()
type TopicSubscriber struct {
config *SubscriberConfiguration
namespace string
topic string
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
grpcDialOption grpc.DialOption
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
}
func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
config: config,
namespace: namespace,
topic: topic,
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
SubscriberConfig: subscriber,
ContentConfig: content,
}
}
@@ -34,3 +43,11 @@ func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
}
return nil
}
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
sub.OnEachMessageFunc = onEachMessageFn
}
func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
sub.OnCompletionFunc = onCompeletionFn
}