Revert "Merge branch 'master' into sub"

This reverts commit 4d414f54a2, reversing
changes made to 4827425146.
Author: chrislu
Date: 2023-09-18 16:13:20 -07:00
parent 8cb42c39ad
commit 0bb97709d4
21 changed files with 199 additions and 290 deletions

View File

@@ -139,7 +139,7 @@ func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string,
 func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
     lm.locks.Range(func(k string, lock *Lock) bool {
-        if k == key && lock != nil {
+        if k == key {
             owner = lock.Owner
             return false
         }
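
A note on the guard being removed here: Range hands the callback whatever value is stored under each key, and a concurrently inserted placeholder can be a typed nil pointer, so without the lock != nil check, reading lock.Owner panics. A minimal standalone sketch of the pattern, using sync.Map in place of the project's lock map:

package main

import (
    "fmt"
    "sync"
)

type Lock struct{ Owner string }

func getLockOwner(locks *sync.Map, key string) (owner string) {
    locks.Range(func(k, v any) bool {
        // A writer may have stored a nil *Lock placeholder; dereferencing
        // v.(*Lock).Owner without the nil guard would panic here.
        if lock, _ := v.(*Lock); k == key && lock != nil {
            owner = lock.Owner
            return false // stop iterating once the key is found
        }
        return true
    })
    return
}

func main() {
    var locks sync.Map
    locks.Store("a", &Lock{Owner: "client-1"})
    locks.Store("b", (*Lock)(nil)) // placeholder that makes the guard matter
    fmt.Println(getLockOwner(&locks, "a")) // client-1
    fmt.Println(getLockOwner(&locks, "b")) // empty string, but no panic
}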

View File

@@ -2,12 +2,11 @@ package mount
 import (
     "fmt"
-    "io"
-    "sync"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mount/page_writer"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+    "io"
+    "sync"
 )
 
 type ChunkedDirtyPages struct {
@@ -83,7 +82,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reade
 }
 
-func (pages *ChunkedDirtyPages) Destroy() {
+func (pages ChunkedDirtyPages) Destroy() {
     pages.uploadPipeline.Shutdown()
 }
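
The receiver change above is behavior-preserving in this case: Destroy only calls through the uploadPipeline pointer field, so even a value receiver's copy reaches the same underlying pipeline. A small sketch of that distinction, with hypothetical pipeline/pages types standing in for the real ones:

package main

import "fmt"

type pipeline struct{ open bool }

func (p *pipeline) Shutdown() { p.open = false }

type pages struct {
    n        int
    pipeline *pipeline
}

// Value receiver: the method works on a copy of pages, but the copied
// *pipeline field still points at the same object, so Shutdown takes effect.
func (pg pages) DestroyByValue() { pg.pipeline.Shutdown() }

// Pointer receiver: no copy; direct mutation of pg.n would also stick.
func (pg *pages) DestroyByPointer() { pg.pipeline.Shutdown() }

func main() {
    pg := pages{n: 1, pipeline: &pipeline{open: true}}
    pg.DestroyByValue()
    fmt.Println(pg.pipeline.open) // false: the pointed-to pipeline was mutated
}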

View File

@@ -1,20 +0,0 @@
-package balancer
-
-import (
-    cmap "github.com/orcaman/concurrent-map/v2"
-    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-)
-
-func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) {
-    return []*mq_pb.BrokerPartitionAssignment{
-        {
-            LeaderBroker:    "localhost:17777",
-            FollowerBrokers: []string{"localhost:17777"},
-            Partition: &mq_pb.Partition{
-                RingSize:   MaxPartitionCount,
-                RangeStart: 0,
-                RangeStop:  MaxPartitionCount,
-            },
-        },
-    }
-}
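
The allocateTopicPartitions being deleted was still a stub: it ignores both arguments and pins everything to localhost:17777, with the leader repeated as its own follower. One plausible direction, purely an assumption and not the project's eventual implementation, is to pick the least-loaded broker from the stats the function already receives:

package main

import "fmt"

type BrokerStats struct {
    TopicPartitionCount int32
    CpuUsagePercent     int32
}

// pickLeader is a hypothetical replacement for the hard-coded localhost
// assignment: choose the broker currently carrying the fewest partitions.
func pickLeader(brokers map[string]*BrokerStats) (leader string) {
    best := int32(1<<31 - 1)
    for addr, stats := range brokers {
        if stats.TopicPartitionCount < best {
            best = stats.TopicPartitionCount
            leader = addr
        }
    }
    return
}

func main() {
    brokers := map[string]*BrokerStats{
        "broker-1:17777": {TopicPartitionCount: 3},
        "broker-2:17777": {TopicPartitionCount: 1},
    }
    fmt.Println(pickLeader(brokers)) // broker-2:17777
}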

View File

@@ -1,62 +0,0 @@
-package balancer
-
-import (
-    cmap "github.com/orcaman/concurrent-map/v2"
-    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-    "reflect"
-    "testing"
-)
-
-func Test_allocateOneBroker(t *testing.T) {
-    brokers := cmap.New[*BrokerStats]()
-    brokers.SetIfAbsent("localhost:17777", &BrokerStats{
-        TopicPartitionCount: 0,
-        ConsumerCount:       0,
-        CpuUsagePercent:     0,
-    })
-
-    tests := []struct {
-        name            string
-        args            args
-        wantAssignments []*mq_pb.BrokerPartitionAssignment
-    }{
-        {
-            name: "test only one broker",
-            args: args{
-                brokers:        brokers,
-                partitionCount: 6,
-            },
-            wantAssignments: []*mq_pb.BrokerPartitionAssignment{
-                {
-                    LeaderBroker:    "localhost:17777",
-                    FollowerBrokers: []string{"localhost:17777"},
-                    Partition: &mq_pb.Partition{
-                        RingSize:   MaxPartitionCount,
-                        RangeStart: 0,
-                        RangeStop:  MaxPartitionCount,
-                    },
-                },
-            },
-        },
-    }
-    testThem(t, tests)
-}
-
-type args struct {
-    brokers        cmap.ConcurrentMap[string, *BrokerStats]
-    partitionCount int
-}
-
-func testThem(t *testing.T, tests []struct {
-    name            string
-    args            args
-    wantAssignments []*mq_pb.BrokerPartitionAssignment
-}) {
-    for _, tt := range tests {
-        t.Run(tt.name, func(t *testing.T) {
-            if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) {
-                t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments)
-            }
-        })
-    }
-}
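
The deleted test has to spell out the same anonymous struct type twice, once in the table literal and once in testThem's parameter list, because the two must match exactly for the call to compile. Naming the row type once keeps the same table-driven shape with less duplication; a self-contained sketch, where the stand-in assertion replaces the real allocateTopicPartitions call:

package balancer_test

import "testing"

// allocCase names the row type instead of repeating an anonymous struct
// in both the table and the shared helper's signature.
type allocCase struct {
    name           string
    partitionCount int
    wantRanges     int
}

func runAllocCases(t *testing.T, cases []allocCase) {
    t.Helper()
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            // stand-in for allocateTopicPartitions: one range per partition
            if got := tc.partitionCount; got != tc.wantRanges {
                t.Errorf("got %d ranges, want %d", got, tc.wantRanges)
            }
        })
    }
}

func TestAllocateOneBroker(t *testing.T) {
    runAllocCases(t, []allocCase{
        {name: "six partitions", partitionCount: 6, wantRanges: 6},
    })
}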

View File

@@ -1,67 +1,16 @@
 package balancer
 
 import (
-    "fmt"
     cmap "github.com/orcaman/concurrent-map/v2"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 )
 
 const (
     MaxPartitionCount = 8 * 9 * 5 * 7 //2520
 )
 
 type Balancer struct {
     Brokers cmap.ConcurrentMap[string, *BrokerStats]
 }
 
 type BrokerStats struct {
     TopicPartitionCount int32
     ConsumerCount       int32
     CpuUsagePercent     int32
-    Stats               cmap.ConcurrentMap[string, *TopicPartitionStats]
 }
 
-func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
-    bs.TopicPartitionCount = int32(len(stats.Stats))
-    bs.CpuUsagePercent = stats.CpuUsagePercent
-
-    var consumerCount int32
-    currentTopicPartitions := bs.Stats.Items()
-    for _, topicPartitionStats := range stats.Stats {
-        tps := &TopicPartitionStats{
-            TopicPartition: TopicPartition{
-                Namespace:  topicPartitionStats.Topic.Namespace,
-                Topic:      topicPartitionStats.Topic.Name,
-                RangeStart: topicPartitionStats.Partition.RangeStart,
-                RangeStop:  topicPartitionStats.Partition.RangeStop,
-            },
-            ConsumerCount: topicPartitionStats.ConsumerCount,
-            IsLeader:      topicPartitionStats.IsLeader,
-        }
-        consumerCount += topicPartitionStats.ConsumerCount
-        key := tps.TopicPartition.String()
-        bs.Stats.Set(key, tps)
-        delete(currentTopicPartitions, key)
-    }
-    // remove the topic partitions that are not in the stats
-    for key := range currentTopicPartitions {
-        bs.Stats.Remove(key)
-    }
-    bs.ConsumerCount = consumerCount
-}
-
-type TopicPartition struct {
-    Namespace  string
-    Topic      string
-    RangeStart int32
-    RangeStop  int32
-}
-
-type TopicPartitionStats struct {
-    TopicPartition
-    ConsumerCount int32
-    IsLeader      bool
-}
 
 func NewBalancer() *Balancer {
@@ -71,11 +20,5 @@ func NewBalancer() *Balancer {
 }
 
 func NewBrokerStats() *BrokerStats {
-    return &BrokerStats{
-        Stats: cmap.New[*TopicPartitionStats](),
-    }
-}
-
-func (tp *TopicPartition) String() string {
-    return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
+    return &BrokerStats{}
 }
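
The deleted UpdateStats is a snapshot-and-sweep reconciliation: snapshot the currently known partition keys, upsert everything in the incoming report while striking matched keys from the snapshot, then remove whatever was never re-reported. The same pattern with plain maps, as a minimal sketch:

package main

import "fmt"

// syncStats reconciles current (the local view) with incoming (a fresh
// report): reported entries are upserted, unreported entries are removed.
func syncStats(current map[string]int32, incoming map[string]int32) {
    stale := make(map[string]struct{}, len(current))
    for key := range current { // snapshot of keys known before the report
        stale[key] = struct{}{}
    }
    for key, consumers := range incoming {
        current[key] = consumers
        delete(stale, key) // still reported, so not stale
    }
    for key := range stale { // whatever was never re-reported is gone
        delete(current, key)
    }
}

func main() {
    current := map[string]int32{"ns.topic-0000-0630": 2, "ns.topic-0630-1260": 1}
    incoming := map[string]int32{"ns.topic-0000-0630": 3}
    syncStats(current, incoming)
    fmt.Println(current) // map[ns.topic-0000-0630:3]
}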

View File

@@ -1,43 +0,0 @@
-package balancer
-
-import (
-    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-)
-
-func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
-    // find existing topic partition assignments
-    for brokerStatsItem := range b.Brokers.IterBuffered() {
-        broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
-        for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
-            topicPartitionStat := topicPartitionStatsItem.Val
-            if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
-                topicPartitionStat.TopicPartition.Topic == topic.Name {
-                assignment := &mq_pb.BrokerPartitionAssignment{
-                    Partition: &mq_pb.Partition{
-                        RingSize:   MaxPartitionCount,
-                        RangeStart: topicPartitionStat.RangeStart,
-                        RangeStop:  topicPartitionStat.RangeStop,
-                    },
-                }
-                if topicPartitionStat.IsLeader {
-                    assignment.LeaderBroker = broker
-                } else {
-                    assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
-                }
-                assignments = append(assignments, assignment)
-            }
-        }
-    }
-    if len(assignments) > 0 {
-        return assignments, nil
-    }
-
-    // find the topic partitions on the filer
-    // if the topic is not found
-    //   if the request is_for_publish
-    //     create the topic
-    //   if the request is_for_subscribe
-    //     return error not found
-    // t := topic.FromPbTopic(request.Topic)
-    return allocateTopicPartitions(b.Brokers, 6), nil
-}
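
The trailing comment block in the deleted file spells out the intended fallback: consult the filer, let publishers create a missing topic, and fail subscribers with not-found. A sketch of that control flow; lookupTopic and createTopic are hypothetical stand-ins, not functions from this package:

package main

import "fmt"

// Hypothetical stand-ins for filer-backed lookup and topic creation.
func lookupTopic(name string) []string          { return nil }
func createTopic(name string) ([]string, error) { return []string{"broker-1:17777"}, nil }

func lookupOrCreate(name string, forPublish bool) ([]string, error) {
    if assignments := lookupTopic(name); len(assignments) > 0 {
        return assignments, nil // an existing assignment always wins
    }
    if forPublish {
        return createTopic(name) // publish may create the topic on demand
    }
    return nil, fmt.Errorf("topic %s not found", name) // subscribe must not
}

func main() {
    fmt.Println(lookupOrCreate("ns.events", true))
    fmt.Println(lookupOrCreate("ns.events", false))
}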

View File

@@ -12,6 +12,10 @@ import (
     "sync"
 )
 
+const (
+    MaxPartitionCount = 1024
+)
+
 func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
     ret := &mq_pb.FindBrokerLeaderResponse{}
     err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
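
Worth noting as the constant moves: this broker-side MaxPartitionCount is a plain 1024, while the balancer's is 8 * 9 * 5 * 7 = 2520, presumably chosen because 2520 divides evenly by every integer from 1 through 10, so equal-size partition ranges exist for any small partition count. A quick check:

package main

import "fmt"

func main() {
    const ringSize = 8 * 9 * 5 * 7 // 2520, the balancer's constant
    for n := 1; n <= 10; n++ {
        // every small partition count divides the ring with no remainder
        fmt.Println(n, ringSize%n == 0)
    }
}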

View File

@@ -0,0 +1,81 @@
+package topic_allocation
+
+import (
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+    "github.com/seaweedfs/seaweedfs/weed/pb"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "modernc.org/mathutil"
+)
+
+const (
+    DefaultBrokerCount = 4
+)
+
+// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
+func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
+    // create a previous assignment if not exists
+    if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
+        prevAssignment = &mq_pb.TopicPartitionsAssignment{
+            PartitionCount: topic.PartitionCount,
+        }
+        partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
+        for i := 0; i < DefaultBrokerCount; i++ {
+            prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+                PartitionStart: int32(i * partitionCountForEachBroker),
+                PartitionStop:  mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
+            })
+        }
+    }
+
+    // create a new assignment
+    assignment = &mq_pb.TopicPartitionsAssignment{
+        PartitionCount: prevAssignment.PartitionCount,
+    }
+
+    // allocate partitions for each partition range
+    for _, brokerPartition := range prevAssignment.BrokerPartitions {
+        // allocate partitions for each partition range
+        leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+        if err != nil {
+            return nil, err
+        }
+        followerBrokers := make([]string, len(followers))
+        for i, follower := range followers {
+            followerBrokers[i] = string(follower)
+        }
+        assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+            PartitionStart:  brokerPartition.PartitionStart,
+            PartitionStop:   brokerPartition.PartitionStop,
+            LeaderBroker:    string(leader),
+            FollowerBrokers: followerBrokers,
+        })
+    }
+
+    return
+}
+
+func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
+    // allocate leader
+    leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
+    if err != nil {
+        return
+    }
+
+    // allocate followers
+    followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+    if err != nil {
+        return
+    }
+
+    return
+}
+
+func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
+    return
+}
+
+func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
+    return
+}
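
One detail of the default assignment above deserves a close look: PartitionStop is computed with mathutil.MaxInt32 against PartitionCount, and since (i+1)*partitionCountForEachBroker never exceeds PartitionCount, every broker's stop comes out as PartitionCount. If the intent is contiguous ranges that only clamp at the end, a min-style split does that; a sketch under that assumption, not a claim about the author's plan:

package main

import "fmt"

// splitRanges divides partitionCount partitions across brokerCount brokers
// into contiguous ranges, letting only the last range absorb the remainder.
func splitRanges(partitionCount, brokerCount int32) (starts, stops []int32) {
    per := partitionCount / brokerCount
    for i := int32(0); i < brokerCount; i++ {
        stop := (i + 1) * per
        if i == brokerCount-1 {
            stop = partitionCount // last broker takes the leftover partitions
        }
        starts = append(starts, i*per)
        stops = append(stops, stop)
    }
    return
}

func main() {
    starts, stops := splitRanges(35, 4)
    fmt.Println(starts) // [0 8 16 24]
    fmt.Println(stops)  // [8 16 24 35]
}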

View File

@@ -94,6 +94,9 @@ type FilerServer struct {
     // track known metadata listeners
     knownListenersLock sync.Mutex
     knownListeners     map[int32]int32
+
+    // client to assign file id
+    assignProxy *operation.AssignProxy
 }
 
 func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
@@ -132,6 +135,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
     go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
     go fs.filer.KeepMasterClientConnected()
 
+    fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16)
+
     if !util.LoadConfiguration("filer", false) {
         v.SetDefault("leveldb2.enabled", true)
         v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
@@ -184,7 +189,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
     fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
 
-    return fs, nil
+    return fs, err
 }
 
 func (fs *FilerServer) checkWithMaster() {
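
Two restored pieces work together here: NewAssignProxy's error is captured into the named return err, and the final return fs, err propagates it instead of the fixed nil it replaces. A toy illustration of the swallowed-error pitfall the second change avoids, with hypothetical newProxy/newServer:

package main

import (
    "errors"
    "fmt"
)

type server struct{}

func newProxy() (*struct{}, error) { return nil, errors.New("masters unreachable") }

// Returning a fixed nil would silently drop the proxy setup failure;
// returning err propagates it, even though it was set many lines earlier.
func newServer() (s *server, err error) {
    s = &server{}
    _, err = newProxy() // err set here, long before the return
    return s, err
}

func main() {
    _, err := newServer()
    fmt.Println(err) // masters unreachable
}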

View File

@@ -30,6 +30,12 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
         return
     }
 
+    isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
+    if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
+        writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+        return
+    }
+
     // proxy to volume servers
     var fileId string
     if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
@@ -42,12 +48,6 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
         return
     }
 
-    isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
-    if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
-        writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
-        return
-    }
-
     stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
     defer func() {
         stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
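
The effect of this move is ordering: the JWT check now runs before the proxyChunkId branch, so proxied chunk reads are authorized too instead of returning early unchecked. The same idea expressed as generic middleware (requireJwt is a hypothetical helper, not the filer's API):

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
)

// requireJwt runs the authorization check before any branch of the wrapped
// handler, including proxy paths that would otherwise return early unchecked.
func requireJwt(check func(*http.Request) bool, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if !check(r) {
            http.Error(w, "wrong jwt", http.StatusUnauthorized)
            return
        }
        next.ServeHTTP(w, r)
    })
}

func main() {
    hasAuth := func(r *http.Request) bool { return r.Header.Get("Authorization") != "" }
    h := requireJwt(hasAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "proxied")
    }))

    rec := httptest.NewRecorder()
    h.ServeHTTP(rec, httptest.NewRequest("GET", "/?proxyChunkId=x", nil))
    fmt.Println(rec.Code) // 401: the proxy path no longer bypasses the check
}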

View File

@@ -43,7 +43,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
     ar, altRequest := so.ToAssignRequests(1)
 
-    assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
+    assignResult, ae := fs.assignProxy.Assign(ar, altRequest)
     if ae != nil {
         glog.Errorf("failing to assign a file id: %v", ae)
         err = ae

View File

@@ -36,9 +36,7 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle
func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
if err != nil {
return err
}
if len(intervals) == 0 {
return erasure_coding.NotFoundError
}