add publisher name for debugging

This commit is contained in:
chrislu
2024-03-31 00:19:16 -07:00
parent ca4f89a6f6
commit c9df613b6b
7 changed files with 238 additions and 213 deletions

View File

@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
CreateTopic bool
CreateTopicPartitionCount int32
Brokers []string
PublisherName string // for debugging
}
type PublishClient struct {

View File

@@ -146,6 +146,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Partition: job.Partition,
AckInterval: 128,
FollowerBrokers: job.FollowerBrokers,
PublisherName: p.config.PublisherName,
},
},
}); err != nil {
@@ -184,9 +185,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return
}
if ackResp.AckSequence > 0 {
log.Printf("ack %d", ackResp.AckSequence)
log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
return
}
}