This commit is contained in:
chrislu
2024-01-28 12:30:08 -08:00
parent cbf750a31f
commit dedfd31dfb
3 changed files with 28 additions and 22 deletions

View File

@@ -28,9 +28,9 @@ type EachPartitionPublishJob struct {
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *sync.WaitGroup) error {
func (p *TopicPublisher) StartSchedulerThread(wg *sync.WaitGroup) error {
if err := p.doEnsureConfigureTopic(bootstrapBrokers); err != nil {
if err := p.doEnsureConfigureTopic(); err != nil {
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
@@ -40,7 +40,7 @@ func (p *TopicPublisher) StartSchedulerThread(bootstrapBrokers []string, wg *syn
var errChan chan EachPartitionError
for {
glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
if assignments, err := p.doLookupTopicPartitions(bootstrapBrokers); err == nil {
if assignments, err := p.doLookupTopicPartitions(); err == nil {
generation++
glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
@@ -183,12 +183,12 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return nil
}
func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err error) {
if len(bootstrapBrokers) == 0 {
func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
if len(p.config.Brokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
for _, brokerAddress := range p.config.Brokers {
err = pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,
@@ -212,12 +212,12 @@ func (p *TopicPublisher) doEnsureConfigureTopic(bootstrapBrokers []string) (err
return nil
}
func (p *TopicPublisher) doLookupTopicPartitions(bootstrapBrokers []string) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
if len(bootstrapBrokers) == 0 {
func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
if len(p.config.Brokers) == 0 {
return nil, fmt.Errorf("no bootstrap brokers")
}
var lastErr error
for _, brokerAddress := range bootstrapBrokers {
for _, brokerAddress := range p.config.Brokers {
err := pb.WithBrokerGrpcClient(false,
brokerAddress,
p.grpcDialOption,