segment serde
This commit is contained in:
@@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
// good if the segment is still on the brokers
|
||||
isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
if isActive {
|
||||
for _, broker := range existingBrokers {
|
||||
ret.Brokers = append(ret.Brokers, string(broker))
|
||||
|
||||
if len(existingBrokers) > 0 {
|
||||
// good if the segment is still on the brokers
|
||||
isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
if isActive {
|
||||
for _, broker := range existingBrokers {
|
||||
ret.Brokers = append(ret.Brokers, string(broker))
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// randomly pick up to 10 brokers, and find the ones with the lightest load
|
||||
@@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
|
||||
}
|
||||
|
||||
// save the allocated brokers info for this segment on the filer
|
||||
if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil {
|
||||
if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
|
||||
return ret, err
|
||||
}
|
||||
|
||||
@@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user