s3tables: fix cross-namespace pagination in listTablesInAllNamespaces
This commit is contained in:
@@ -419,17 +419,18 @@ func (h *S3TablesHandler) listTablesInNamespaceWithClient(ctx context.Context, c
|
|||||||
func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix, continuationToken string, maxTables int, tables *[]TableSummary) error {
|
func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix, continuationToken string, maxTables int, tables *[]TableSummary) error {
|
||||||
bucketPath := getTableBucketPath(bucketName)
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
|
||||||
var lastNamespace string
|
var continuationNamespace string
|
||||||
var startTableName string
|
var startTableName string
|
||||||
if continuationToken != "" {
|
if continuationToken != "" {
|
||||||
if parts := strings.SplitN(continuationToken, "/", 2); len(parts) == 2 {
|
if parts := strings.SplitN(continuationToken, "/", 2); len(parts) == 2 {
|
||||||
lastNamespace = parts[0]
|
continuationNamespace = parts[0]
|
||||||
startTableName = parts[1]
|
startTableName = parts[1]
|
||||||
} else {
|
} else {
|
||||||
lastNamespace = continuationToken
|
continuationNamespace = continuationToken
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastNamespace := continuationNamespace
|
||||||
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
for {
|
for {
|
||||||
// List namespaces in batches
|
// List namespaces in batches
|
||||||
@@ -437,7 +438,7 @@ func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerCl
|
|||||||
Directory: bucketPath,
|
Directory: bucketPath,
|
||||||
Limit: 100,
|
Limit: 100,
|
||||||
StartFromFileName: lastNamespace,
|
StartFromFileName: lastNamespace,
|
||||||
InclusiveStartFrom: lastNamespace == "",
|
InclusiveStartFrom: lastNamespace == continuationNamespace && continuationNamespace != "" || lastNamespace == "",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -455,6 +456,13 @@ func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerCl
|
|||||||
if entry.Entry == nil {
|
if entry.Entry == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip the start item if it was the continuation namespace but we already processed it
|
||||||
|
// (handled by the startTableName clearing logic below)
|
||||||
|
if lastNamespace == continuationNamespace && continuationNamespace != "" && entry.Entry.Name == continuationNamespace && startTableName == "" && len(*tables) > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
hasMore = true
|
hasMore = true
|
||||||
lastNamespace = entry.Entry.Name
|
lastNamespace = entry.Entry.Name
|
||||||
|
|
||||||
@@ -471,7 +479,7 @@ func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerCl
|
|||||||
|
|
||||||
// List tables in this namespace
|
// List tables in this namespace
|
||||||
tableNameFilter := ""
|
tableNameFilter := ""
|
||||||
if namespace == lastNamespace {
|
if namespace == continuationNamespace {
|
||||||
tableNameFilter = startTableName
|
tableNameFilter = startTableName
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -480,6 +488,11 @@ func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerCl
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear startTableName after the first matching namespace is processed
|
||||||
|
if namespace == continuationNamespace {
|
||||||
|
startTableName = ""
|
||||||
|
}
|
||||||
|
|
||||||
if len(*tables) >= maxTables {
|
if len(*tables) >= maxTables {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user