Add Iceberg table details view
This commit is contained in:
@@ -7,6 +7,8 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -218,6 +220,7 @@ func (s *AdminServer) GetIcebergNamespacesData(ctx context.Context, catalogName,
|
||||
|
||||
return IcebergNamespacesData{
|
||||
CatalogName: catalogName,
|
||||
BucketARN: bucketArn,
|
||||
Namespaces: namespaces,
|
||||
TotalNamespaces: len(namespaces),
|
||||
LastUpdated: time.Now(),
|
||||
@@ -242,12 +245,288 @@ func (s *AdminServer) GetIcebergTablesData(ctx context.Context, catalogName, buc
|
||||
return IcebergTablesData{
|
||||
CatalogName: catalogName,
|
||||
NamespaceName: namespace,
|
||||
BucketARN: bucketArn,
|
||||
Tables: tables,
|
||||
TotalTables: len(tables),
|
||||
LastUpdated: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetIcebergTableDetailsData returns Iceberg table metadata and snapshot information.
|
||||
func (s *AdminServer) GetIcebergTableDetailsData(ctx context.Context, catalogName, bucketArn, namespace, tableName string) (IcebergTableDetailsData, error) {
|
||||
var resp s3tables.GetTableResponse
|
||||
req := &s3tables.GetTableRequest{
|
||||
TableBucketARN: bucketArn,
|
||||
Namespace: []string{namespace},
|
||||
Name: tableName,
|
||||
}
|
||||
if err := s.executeS3TablesOperation(ctx, "GetTable", req, &resp); err != nil {
|
||||
return IcebergTableDetailsData{}, err
|
||||
}
|
||||
|
||||
details := IcebergTableDetailsData{
|
||||
CatalogName: catalogName,
|
||||
NamespaceName: namespace,
|
||||
TableName: resp.Name,
|
||||
BucketARN: bucketArn,
|
||||
TableARN: resp.TableARN,
|
||||
Format: resp.Format,
|
||||
CreatedAt: resp.CreatedAt,
|
||||
ModifiedAt: resp.ModifiedAt,
|
||||
MetadataLocation: resp.MetadataLocation,
|
||||
}
|
||||
|
||||
applyIcebergMetadata(resp.Metadata, &details)
|
||||
return details, nil
|
||||
}
|
||||
|
||||
type icebergFullMetadata struct {
|
||||
FormatVersion int `json:"format-version"`
|
||||
TableUUID string `json:"table-uuid"`
|
||||
Location string `json:"location"`
|
||||
LastUpdatedMs int64 `json:"last-updated-ms"`
|
||||
Schemas []icebergSchema `json:"schemas"`
|
||||
Schema *icebergSchema `json:"schema"`
|
||||
CurrentSchemaID int `json:"current-schema-id"`
|
||||
PartitionSpecs []icebergPartitionSpec `json:"partition-specs"`
|
||||
PartitionSpec *icebergPartitionSpec `json:"partition-spec"`
|
||||
DefaultSpecID int `json:"default-spec-id"`
|
||||
Properties map[string]string `json:"properties"`
|
||||
Snapshots []icebergSnapshot `json:"snapshots"`
|
||||
CurrentSnapshotID int64 `json:"current-snapshot-id"`
|
||||
}
|
||||
|
||||
type icebergSchema struct {
|
||||
SchemaID int `json:"schema-id"`
|
||||
Fields []icebergSchemaField `json:"fields"`
|
||||
}
|
||||
|
||||
// icebergSchemaField is one field of an Iceberg schema as found in the
// metadata JSON.
//
// Type is always a short display string. For primitive types it is the JSON
// string itself (for example "long"). Nested types are serialized as JSON
// objects; for those, Type holds the object's own "type" member (for example
// "struct", "list" or "map").
type icebergSchemaField struct {
	ID       int    `json:"id"`
	Name     string `json:"name"`
	Type     string `json:"type"`
	Required bool   `json:"required"`
}

// UnmarshalJSON decodes a schema field while tolerating a non-string "type".
//
// Without this method, a single nested column (whose "type" is a JSON object)
// would make decoding of the whole metadata document fail, hiding every other
// detail of the table. An absent or null "type" leaves Type untouched, which
// matches what the default decoder does for a string field.
func (f *icebergSchemaField) UnmarshalJSON(data []byte) error {
	// Seed with the current values so members missing from the input keep
	// whatever the receiver already held, as with default decoding.
	raw := struct {
		ID       int             `json:"id"`
		Name     string          `json:"name"`
		Type     json.RawMessage `json:"type"`
		Required bool            `json:"required"`
	}{ID: f.ID, Name: f.Name, Required: f.Required}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	f.ID, f.Name, f.Required = raw.ID, raw.Name, raw.Required

	if len(raw.Type) == 0 || string(raw.Type) == "null" {
		return nil
	}

	// Primitive type: a plain JSON string.
	var primitive string
	if err := json.Unmarshal(raw.Type, &primitive); err == nil {
		f.Type = primitive
		return nil
	}

	// Nested type: an object that names its kind in its own "type" member.
	var nested struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(raw.Type, &nested); err == nil && nested.Type != "" {
		f.Type = nested.Type
		return nil
	}

	// Unknown shape: show the raw JSON rather than failing the whole parse.
	f.Type = string(raw.Type)
	return nil
}
|
||||
|
||||
type icebergPartitionSpec struct {
|
||||
SpecID int `json:"spec-id"`
|
||||
Fields []icebergPartitionField `json:"fields"`
|
||||
}
|
||||
|
||||
// icebergPartitionField is a single entry of a partition spec, decoded from
// the "source-id", "field-id", "name" and "transform" members of the metadata
// JSON.
type icebergPartitionField struct {
	SourceID  int    `json:"source-id"`
	FieldID   int    `json:"field-id"`
	Name      string `json:"name"`
	Transform string `json:"transform"`
}
|
||||
|
||||
// icebergSnapshot is one snapshot entry of the metadata document.
//
// TimestampMs is kept as the raw millisecond value from "timestamp-ms".
// Summary is the snapshot's string-to-string summary map; the code in this
// file reads its "operation" entry and several file-count and size counters
// from it.
type icebergSnapshot struct {
	SnapshotID   int64             `json:"snapshot-id"`
	TimestampMs  int64             `json:"timestamp-ms"`
	ManifestList string            `json:"manifest-list"`
	Summary      map[string]string `json:"summary"`
}
|
||||
|
||||
func applyIcebergMetadata(metadata *s3tables.TableMetadata, details *IcebergTableDetailsData) {
|
||||
if details == nil || metadata == nil {
|
||||
return
|
||||
}
|
||||
if len(metadata.FullMetadata) == 0 {
|
||||
details.SchemaFields = schemaFieldsFromIceberg(metadata.Iceberg)
|
||||
return
|
||||
}
|
||||
|
||||
var full icebergFullMetadata
|
||||
if err := json.Unmarshal(metadata.FullMetadata, &full); err != nil {
|
||||
glog.V(1).Infof("iceberg metadata parse failed: %v", err)
|
||||
details.MetadataError = fmt.Sprintf("Failed to parse Iceberg metadata: %v", err)
|
||||
details.SchemaFields = schemaFieldsFromIceberg(metadata.Iceberg)
|
||||
return
|
||||
}
|
||||
|
||||
details.TableLocation = full.Location
|
||||
details.SchemaFields = schemaFieldsFromFullMetadata(full, metadata.Iceberg)
|
||||
details.PartitionFields = partitionFieldsFromFullMetadata(full)
|
||||
details.Properties = propertiesFromFullMetadata(full.Properties)
|
||||
details.Snapshots = snapshotsFromFullMetadata(full.Snapshots)
|
||||
details.SnapshotCount = len(full.Snapshots)
|
||||
details.HasSnapshotCount = true
|
||||
if metricsSnapshot := selectSnapshotForMetrics(full); metricsSnapshot != nil {
|
||||
if value, ok := parseSummaryInt(metricsSnapshot.Summary, "total-data-files", "total-data-file-count", "total-files", "total-file-count"); ok {
|
||||
details.DataFileCount = value
|
||||
details.HasDataFileCount = true
|
||||
}
|
||||
if value, ok := parseSummaryInt(metricsSnapshot.Summary, "total-files-size", "total-data-files-size", "total-file-size", "total-data-file-size", "total-data-size", "total-size"); ok {
|
||||
details.TotalSizeBytes = value
|
||||
details.HasTotalSize = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func schemaFieldsFromFullMetadata(full icebergFullMetadata, fallback *s3tables.IcebergMetadata) []IcebergSchemaFieldInfo {
|
||||
if schema := selectSchema(full); schema != nil {
|
||||
fields := make([]IcebergSchemaFieldInfo, 0, len(schema.Fields))
|
||||
for _, field := range schema.Fields {
|
||||
fields = append(fields, IcebergSchemaFieldInfo{
|
||||
ID: field.ID,
|
||||
Name: field.Name,
|
||||
Type: field.Type,
|
||||
Required: field.Required,
|
||||
})
|
||||
}
|
||||
return fields
|
||||
}
|
||||
return schemaFieldsFromIceberg(fallback)
|
||||
}
|
||||
|
||||
func schemaFieldsFromIceberg(metadata *s3tables.IcebergMetadata) []IcebergSchemaFieldInfo {
|
||||
if metadata == nil {
|
||||
return nil
|
||||
}
|
||||
fields := make([]IcebergSchemaFieldInfo, 0, len(metadata.Schema.Fields))
|
||||
for _, field := range metadata.Schema.Fields {
|
||||
fields = append(fields, IcebergSchemaFieldInfo{
|
||||
Name: field.Name,
|
||||
Type: field.Type,
|
||||
Required: field.Required,
|
||||
})
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
func selectSchema(full icebergFullMetadata) *icebergSchema {
|
||||
if len(full.Schemas) == 0 && full.Schema == nil {
|
||||
return nil
|
||||
}
|
||||
if len(full.Schemas) == 0 {
|
||||
return full.Schema
|
||||
}
|
||||
if full.CurrentSchemaID != 0 {
|
||||
for i := range full.Schemas {
|
||||
if full.Schemas[i].SchemaID == full.CurrentSchemaID {
|
||||
return &full.Schemas[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
return &full.Schemas[0]
|
||||
}
|
||||
|
||||
func partitionFieldsFromFullMetadata(full icebergFullMetadata) []IcebergPartitionFieldInfo {
|
||||
var spec *icebergPartitionSpec
|
||||
if len(full.PartitionSpecs) == 0 && full.PartitionSpec == nil {
|
||||
return nil
|
||||
}
|
||||
if len(full.PartitionSpecs) == 0 {
|
||||
spec = full.PartitionSpec
|
||||
} else {
|
||||
if full.DefaultSpecID != 0 {
|
||||
for i := range full.PartitionSpecs {
|
||||
if full.PartitionSpecs[i].SpecID == full.DefaultSpecID {
|
||||
spec = &full.PartitionSpecs[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec == nil {
|
||||
spec = &full.PartitionSpecs[0]
|
||||
}
|
||||
}
|
||||
if spec == nil {
|
||||
return nil
|
||||
}
|
||||
fields := make([]IcebergPartitionFieldInfo, 0, len(spec.Fields))
|
||||
for _, field := range spec.Fields {
|
||||
fields = append(fields, IcebergPartitionFieldInfo{
|
||||
Name: field.Name,
|
||||
Transform: field.Transform,
|
||||
SourceID: field.SourceID,
|
||||
FieldID: field.FieldID,
|
||||
})
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
func propertiesFromFullMetadata(properties map[string]string) []IcebergPropertyInfo {
|
||||
if len(properties) == 0 {
|
||||
return nil
|
||||
}
|
||||
keys := make([]string, 0, len(properties))
|
||||
for key := range properties {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
entries := make([]IcebergPropertyInfo, 0, len(keys))
|
||||
for _, key := range keys {
|
||||
entries = append(entries, IcebergPropertyInfo{Key: key, Value: properties[key]})
|
||||
}
|
||||
return entries
|
||||
}
|
||||
|
||||
func snapshotsFromFullMetadata(snapshots []icebergSnapshot) []IcebergSnapshotInfo {
|
||||
if len(snapshots) == 0 {
|
||||
return nil
|
||||
}
|
||||
sort.Slice(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].TimestampMs > snapshots[j].TimestampMs
|
||||
})
|
||||
info := make([]IcebergSnapshotInfo, 0, len(snapshots))
|
||||
for _, snapshot := range snapshots {
|
||||
operation := ""
|
||||
if snapshot.Summary != nil {
|
||||
operation = snapshot.Summary["operation"]
|
||||
}
|
||||
timestamp := time.Time{}
|
||||
if snapshot.TimestampMs > 0 {
|
||||
timestamp = time.Unix(0, snapshot.TimestampMs*int64(time.Millisecond))
|
||||
}
|
||||
info = append(info, IcebergSnapshotInfo{
|
||||
SnapshotID: snapshot.SnapshotID,
|
||||
Timestamp: timestamp,
|
||||
Operation: operation,
|
||||
ManifestList: snapshot.ManifestList,
|
||||
})
|
||||
}
|
||||
return info
|
||||
}
|
||||
|
||||
func selectSnapshotForMetrics(full icebergFullMetadata) *icebergSnapshot {
|
||||
if len(full.Snapshots) == 0 {
|
||||
return nil
|
||||
}
|
||||
if full.CurrentSnapshotID != 0 {
|
||||
for i := range full.Snapshots {
|
||||
if full.Snapshots[i].SnapshotID == full.CurrentSnapshotID {
|
||||
return &full.Snapshots[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
latest := full.Snapshots[0]
|
||||
for _, snapshot := range full.Snapshots[1:] {
|
||||
if snapshot.TimestampMs > latest.TimestampMs {
|
||||
latest = snapshot
|
||||
}
|
||||
}
|
||||
return &latest
|
||||
}
|
||||
|
||||
// parseSummaryInt looks up keys in summary, in the order given, and returns
// the first value that parses as a base-10 int64 together with true.
// Missing keys, empty values and values that fail to parse are skipped.
// It returns (0, false) when nothing usable is found.
func parseSummaryInt(summary map[string]string, keys ...string) (int64, bool) {
	if len(summary) == 0 {
		return 0, false
	}
	for _, name := range keys {
		// A missing key reads as "", so one check covers both cases.
		raw := summary[name]
		if raw == "" {
			continue
		}
		if number, err := strconv.ParseInt(raw, 10, 64); err == nil {
			return number, true
		}
	}
	return 0, false
}
|
||||
|
||||
// API handlers
|
||||
|
||||
func (s *AdminServer) ListS3TablesBucketsAPI(c *gin.Context) {
|
||||
|
||||
@@ -621,6 +621,7 @@ type IcebergNamespaceInfo struct {
|
||||
type IcebergNamespacesData struct {
|
||||
Username string `json:"username"`
|
||||
CatalogName string `json:"catalog_name"`
|
||||
BucketARN string `json:"bucket_arn"`
|
||||
Namespaces []IcebergNamespaceInfo `json:"namespaces"`
|
||||
TotalNamespaces int `json:"total_namespaces"`
|
||||
LastUpdated time.Time `json:"last_updated"`
|
||||
@@ -635,7 +636,59 @@ type IcebergTablesData struct {
|
||||
Username string `json:"username"`
|
||||
CatalogName string `json:"catalog_name"`
|
||||
NamespaceName string `json:"namespace_name"`
|
||||
BucketARN string `json:"bucket_arn"`
|
||||
Tables []IcebergTableInfo `json:"tables"`
|
||||
TotalTables int `json:"total_tables"`
|
||||
LastUpdated time.Time `json:"last_updated"`
|
||||
}
|
||||
|
||||
// IcebergSchemaFieldInfo is one schema column as shown on the table details
// view: the field ID, its name, its type rendered as a string, and whether the
// column is required.
type IcebergSchemaFieldInfo struct {
	ID       int    `json:"id"`
	Name     string `json:"name"`
	Type     string `json:"type"`
	Required bool   `json:"required"`
}
|
||||
|
||||
// IcebergPartitionFieldInfo is one partition field as shown on the table
// details view: the partition name, the transform applied, the ID of the
// source schema field and the partition field's own ID.
type IcebergPartitionFieldInfo struct {
	Name      string `json:"name"`
	Transform string `json:"transform"`
	SourceID  int    `json:"source_id"`
	FieldID   int    `json:"field_id"`
}
|
||||
|
||||
// IcebergPropertyInfo is a single table property rendered as a key/value row.
type IcebergPropertyInfo struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}
|
||||
|
||||
// IcebergSnapshotInfo is one snapshot row of the table details view: the
// snapshot ID, when it was taken, the operation recorded in its summary and
// the location of its manifest list.
type IcebergSnapshotInfo struct {
	SnapshotID   int64     `json:"snapshot_id"`
	Timestamp    time.Time `json:"timestamp"`
	Operation    string    `json:"operation"`
	ManifestList string    `json:"manifest_list"`
}
|
||||
|
||||
type IcebergTableDetailsData struct {
|
||||
Username string `json:"username"`
|
||||
CatalogName string `json:"catalog_name"`
|
||||
NamespaceName string `json:"namespace_name"`
|
||||
TableName string `json:"table_name"`
|
||||
BucketARN string `json:"bucket_arn"`
|
||||
TableARN string `json:"table_arn"`
|
||||
Format string `json:"format"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
ModifiedAt time.Time `json:"modified_at"`
|
||||
MetadataLocation string `json:"metadata_location"`
|
||||
TableLocation string `json:"table_location"`
|
||||
SchemaFields []IcebergSchemaFieldInfo `json:"schema_fields"`
|
||||
PartitionFields []IcebergPartitionFieldInfo `json:"partition_fields"`
|
||||
Properties []IcebergPropertyInfo `json:"properties"`
|
||||
Snapshots []IcebergSnapshotInfo `json:"snapshots"`
|
||||
SnapshotCount int `json:"snapshot_count"`
|
||||
HasSnapshotCount bool `json:"has_snapshot_count"`
|
||||
DataFileCount int64 `json:"data_file_count"`
|
||||
HasDataFileCount bool `json:"has_data_file_count"`
|
||||
TotalSizeBytes int64 `json:"total_size_bytes"`
|
||||
HasTotalSize bool `json:"has_total_size"`
|
||||
MetadataError string `json:"metadata_error,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user