s3tables: propagate request context to filer operations
This commit is contained in:
@@ -61,7 +61,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
namespacePath := getNamespacePath(bucketName, req.Namespace)
|
||||
var namespaceExists bool
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata)
|
||||
_, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
||||
namespaceExists = err == nil
|
||||
return nil
|
||||
})
|
||||
@@ -76,7 +76,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
// Check if table already exists
|
||||
exists := false
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
_, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
||||
exists = err == nil
|
||||
return nil
|
||||
})
|
||||
@@ -109,18 +109,18 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// Create table directory
|
||||
if err := h.createDirectory(client, tablePath); err != nil {
|
||||
if err := h.createDirectory(r.Context(), client, tablePath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create data subdirectory for Iceberg files
|
||||
dataPath := tablePath + "/data"
|
||||
if err := h.createDirectory(client, dataPath); err != nil {
|
||||
if err := h.createDirectory(r.Context(), client, dataPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set metadata as extended attribute
|
||||
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
||||
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -130,7 +130,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal tags: %w", err)
|
||||
}
|
||||
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
|
||||
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -189,7 +189,7 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request,
|
||||
|
||||
var metadata tableMetadataInternal
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -247,10 +247,10 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
|
||||
|
||||
// If namespace is specified, list tables in that namespace only
|
||||
if req.Namespace != "" {
|
||||
err = h.listTablesInNamespace(filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables)
|
||||
err = h.listTablesInNamespace(r.Context(), filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables)
|
||||
} else {
|
||||
// List tables in all namespaces
|
||||
err = h.listTablesInAllNamespaces(filerClient, bucketName, req.Prefix, maxTables, &tables)
|
||||
err = h.listTablesInAllNamespaces(r.Context(), filerClient, bucketName, req.Prefix, maxTables, &tables)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -265,11 +265,11 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) listTablesInNamespace(filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
func (h *S3TablesHandler) listTablesInNamespace(ctx context.Context, filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
namespacePath := getNamespacePath(bucketName, namespace)
|
||||
|
||||
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: namespacePath,
|
||||
Limit: uint32(maxTables),
|
||||
})
|
||||
@@ -323,12 +323,12 @@ func (h *S3TablesHandler) listTablesInNamespace(filerClient FilerClient, bucketN
|
||||
})
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) listTablesInAllNamespaces(filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
func (h *S3TablesHandler) listTablesInAllNamespaces(ctx context.Context, filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
|
||||
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// List all namespaces first
|
||||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
||||
Directory: bucketPath,
|
||||
Limit: 1000,
|
||||
})
|
||||
@@ -354,7 +354,7 @@ func (h *S3TablesHandler) listTablesInAllNamespaces(filerClient FilerClient, buc
|
||||
namespace := entry.Entry.Name
|
||||
|
||||
// List tables in this namespace
|
||||
if err := h.listTablesInNamespace(filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
|
||||
if err := h.listTablesInNamespace(ctx, filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -391,7 +391,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
// Check if table exists
|
||||
var tableExists bool
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
_, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
||||
tableExists = err == nil
|
||||
return nil
|
||||
})
|
||||
@@ -403,7 +403,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
// Delete the table
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return h.deleteDirectory(client, tablePath)
|
||||
return h.deleteDirectory(r.Context(), client, tablePath)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user