merge current message queue code changes (#6201)
* listing files to convert to parquet * write parquet files * save logs into parquet files * pass by value * compact logs into parquet format * can skip existing files * refactor * refactor * fix compilation * when no partition found * refactor * add untested parquet file read * rename package * refactor * rename files * remove unused * add merged log read func * parquet wants to know the file size * rewind by time * pass in stop ts * add stop ts * adjust log * minor * adjust log * skip .parquet files when reading message logs * skip non message files * Update subscriber_record.go * send messages * skip message data with only ts * skip non log files * update parquet-go package * ensure a valid record type * add new field to a record type * Update read_parquet_to_log.go * fix parquet file name generation * separating reading parquet and logs * add key field * add skipped logs * use in memory cache * refactor * refactor * refactor * refactor, and change compact log * refactor * rename * refactor * fix format * prefix v to version directory
This commit is contained in:
454
weed/mq/logstore/log_to_parquet.go
Normal file
454
weed/mq/logstore/log_to_parquet.go
Normal file
@@ -0,0 +1,454 @@
|
||||
package logstore
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/parquet-go/parquet-go"
|
||||
"github.com/parquet-go/parquet-go/compress/zstd"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
SW_COLUMN_NAME_TS = "_ts_ns"
|
||||
SW_COLUMN_NAME_KEY = "_key"
|
||||
)
|
||||
|
||||
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
|
||||
// list the topic partition versions
|
||||
topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list topic files: %v", err)
|
||||
}
|
||||
|
||||
// compact the partitions
|
||||
for _, topicVersion := range topicVersions {
|
||||
partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
|
||||
}
|
||||
for _, partition := range partitions {
|
||||
err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
|
||||
if err != nil {
|
||||
return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
t, err := topic.ParseTopicVersion(entry.Name)
|
||||
if err != nil {
|
||||
// skip non-partition directories
|
||||
return nil
|
||||
}
|
||||
if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
|
||||
partitionVersions = append(partitionVersions, t)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
|
||||
version := topicVersion.Format(topic.PartitionGenerationFormat)
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if !entry.IsDirectory {
|
||||
return nil
|
||||
}
|
||||
start, stop := topic.ParsePartitionBoundary(entry.Name)
|
||||
if start != stop {
|
||||
partitions = append(partitions, topic.Partition{
|
||||
RangeStart: start,
|
||||
RangeStop: stop,
|
||||
RingSize: topic.PartitionCount,
|
||||
UnixTimeNs: topicVersion.UnixNano(),
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
|
||||
partitionDir := topic.PartitionDir(t, partition)
|
||||
|
||||
// compact the partition directory
|
||||
return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
|
||||
}
|
||||
|
||||
func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
|
||||
// read all existing parquet files
|
||||
minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// read all log files
|
||||
logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(logFiles) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// divide log files into groups of 128MB
|
||||
logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)
|
||||
|
||||
// write to parquet file
|
||||
parquetLevels, err := schema.ToParquetLevels(recordType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
|
||||
}
|
||||
|
||||
// create a parquet schema
|
||||
parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ToParquetSchema failed: %v", err)
|
||||
}
|
||||
|
||||
// TODO parallelize the writing
|
||||
for _, logFileGroup := range logFileGroups {
|
||||
if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
|
||||
var logFileGroup []*filer_pb.Entry
|
||||
var groupSize int64
|
||||
for _, logFile := range logFiles {
|
||||
if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
|
||||
logFileGroups = append(logFileGroups, logFileGroup)
|
||||
logFileGroup = nil
|
||||
groupSize = 0
|
||||
}
|
||||
logFileGroup = append(logFileGroup, logFile)
|
||||
groupSize += int64(logFile.Attributes.FileSize)
|
||||
}
|
||||
if len(logFileGroup) > 0 {
|
||||
logFileGroups = append(logFileGroups, logFileGroup)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if strings.HasSuffix(entry.Name, ".parquet") {
|
||||
return nil
|
||||
}
|
||||
if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
|
||||
return nil
|
||||
}
|
||||
logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
|
||||
if err != nil {
|
||||
// glog.Warningf("parse log time %s: %v", entry.Name, err)
|
||||
return nil
|
||||
}
|
||||
if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
|
||||
return nil
|
||||
}
|
||||
logFiles = append(logFiles, entry)
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
|
||||
err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
if !strings.HasSuffix(entry.Name, ".parquet") {
|
||||
return nil
|
||||
}
|
||||
if len(entry.Extended) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// read min ts
|
||||
minTsBytes := entry.Extended["min"]
|
||||
if len(minTsBytes) != 8 {
|
||||
return nil
|
||||
}
|
||||
minTs := int64(binary.BigEndian.Uint64(minTsBytes))
|
||||
if minTsNs == 0 || minTs < minTsNs {
|
||||
minTsNs = minTs
|
||||
}
|
||||
|
||||
// read max ts
|
||||
maxTsBytes := entry.Extended["max"]
|
||||
if len(maxTsBytes) != 8 {
|
||||
return nil
|
||||
}
|
||||
maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
|
||||
if maxTsNs == 0 || maxTs > maxTsNs {
|
||||
maxTsNs = maxTs
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
|
||||
|
||||
tempFile, err := os.CreateTemp(".", "t*.parquet")
|
||||
if err != nil {
|
||||
return fmt.Errorf("create temp file: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
tempFile.Close()
|
||||
os.Remove(tempFile.Name())
|
||||
}()
|
||||
|
||||
writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
|
||||
rowBuilder := parquet.NewRowBuilder(parquetSchema)
|
||||
|
||||
var startTsNs, stopTsNs int64
|
||||
|
||||
for _, logFile := range logFileGroups {
|
||||
fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
|
||||
var rows []parquet.Row
|
||||
if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
|
||||
|
||||
if startTsNs == 0 {
|
||||
startTsNs = entry.TsNs
|
||||
}
|
||||
stopTsNs = entry.TsNs
|
||||
|
||||
if len(entry.Key) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// write to parquet file
|
||||
rowBuilder.Reset()
|
||||
|
||||
record := &schema_pb.RecordValue{}
|
||||
if err := proto.Unmarshal(entry.Data, record); err != nil {
|
||||
return fmt.Errorf("unmarshal record value: %v", err)
|
||||
}
|
||||
|
||||
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
|
||||
Kind: &schema_pb.Value_Int64Value{
|
||||
Int64Value: entry.TsNs,
|
||||
},
|
||||
}
|
||||
record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
|
||||
Kind: &schema_pb.Value_BytesValue{
|
||||
BytesValue: entry.Key,
|
||||
},
|
||||
}
|
||||
|
||||
if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
|
||||
return fmt.Errorf("add record value: %v", err)
|
||||
}
|
||||
|
||||
rows = append(rows, rowBuilder.Row())
|
||||
|
||||
return nil
|
||||
|
||||
}); err != nil {
|
||||
return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
|
||||
}
|
||||
|
||||
fmt.Printf("processed %d rows\n", len(rows))
|
||||
|
||||
if _, err := writer.WriteRows(rows); err != nil {
|
||||
return fmt.Errorf("write rows: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return fmt.Errorf("close writer: %v", err)
|
||||
}
|
||||
|
||||
// write to parquet file to partitionDir
|
||||
parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
|
||||
if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
|
||||
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// saveParquetFileToPartitionDir uploads sourceFile into the filer under
// partitionDir/parquetFileName, splitting it into 4MB chunks, and creates the
// filer entry with "min"/"max" extended attributes recording the
// [startTsNs, stopTsNs] timestamp range covered by the parquet file.
// preference supplies replication/collection/disk-type for volume assignment.
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
	uploader, err := operation.NewUploader()
	if err != nil {
		return fmt.Errorf("new uploader: %v", err)
	}

	// get file size
	fileInfo, err := sourceFile.Stat()
	if err != nil {
		return fmt.Errorf("stat source file: %v", err)
	}

	// upload file in chunks
	chunkSize := int64(4 * 1024 * 1024)
	// ceiling division so a trailing partial chunk is still uploaded
	chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
	entry := &filer_pb.Entry{
		Name: parquetFileName,
		Attributes: &filer_pb.FuseAttributes{
			Crtime:   time.Now().Unix(),
			Mtime:    time.Now().Unix(),
			FileMode: uint32(os.FileMode(0644)),
			FileSize: uint64(fileInfo.Size()),
			Mime:     "application/vnd.apache.parquet",
		},
	}
	// record the covered timestamp range as 8-byte big-endian extended
	// attributes; readAllParquetFiles relies on this exact encoding
	entry.Extended = make(map[string][]byte)
	minTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
	entry.Extended["min"] = minTsBytes
	maxTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
	entry.Extended["max"] = maxTsBytes

	for i := int64(0); i < chunkCount; i++ {
		// upload one chunk; UploadWithRetry assigns a volume and retries
		// internally (fourth return value is intentionally ignored)
		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
			filerClient,
			&filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: preference.Replication,
				Collection:  preference.Collection,
				TtlSec:      0, // TODO set ttl
				DiskType:    preference.DiskType,
				Path:        partitionDir + "/" + parquetFileName,
			},
			&operation.UploadOption{
				Filename:          parquetFileName,
				Cipher:            false,
				IsInputCompressed: false,
				MimeType:          "application/vnd.apache.parquet",
				PairMap:           nil,
			},
			func(host, fileId string) string {
				return fmt.Sprintf("http://%s/%s", host, fileId)
			},
			// SectionReader bounds the read to this chunk's byte range; the
			// final section may extend past EOF, presumably truncated by the
			// uploader reading until EOF — TODO confirm
			io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
		)
		if err != nil {
			return fmt.Errorf("upload chunk %d: %v", i, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload result: %v", uploadResult.Error)
		}
		// append the uploaded chunk at its offset within the file
		entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
	}

	// write the entry to partitionDir
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
			Directory: partitionDir,
			Entry:     entry,
		})
	}); err != nil {
		return fmt.Errorf("create entry: %v", err)
	}
	fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)

	return nil
}
|
||||
|
||||
func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
|
||||
lookupFn := filer.LookupFn(filerClient)
|
||||
_, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
|
||||
if err := eachLogEntryFn(logEntry); err != nil {
|
||||
return true, err
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// eachFile downloads every data chunk of entry, decodes the log entries in
// each chunk, and feeds them to eachLogEntryFn. It returns the timestamp of
// the last entry processed. Entries with inline Content are skipped entirely.
// Chunk data is fetched over HTTP from the volume servers resolved by
// lookupFileIdFn, trying each candidate URL until one succeeds.
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
	if len(entry.Content) > 0 {
		// skip .offset files
		return
	}
	var urlStrings []string
	for _, chunk := range entry.Chunks {
		if chunk.Size == 0 {
			// nothing to read from an empty chunk
			continue
		}
		if chunk.IsChunkManifest {
			// NOTE(review): manifest chunks are not expected for log files;
			// this returns with a nil error, silently truncating the
			// iteration — confirm whether an error should be returned instead
			fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
			return
		}
		// resolve the chunk's file id to one or more volume server URLs
		urlStrings, err = lookupFileIdFn(chunk.FileId)
		if err != nil {
			err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
			return
		}
		if len(urlStrings) == 0 {
			err = fmt.Errorf("no url found for %s", chunk.FileId)
			return
		}

		// try one of the urlString until util.Get(urlString) succeeds
		var processed bool
		for _, urlString := range urlStrings {
			var data []byte
			if data, _, err = util_http.Get(urlString); err == nil {
				processed = true
				// decode and dispatch the log entries in this chunk
				if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
					return
				}
				break
			}
		}
		if !processed {
			// every candidate URL failed; report which chunk could not be read
			err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
			return
		}

	}
	return
}
|
||||
|
||||
func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
|
||||
for pos := 0; pos+4 < len(buf); {
|
||||
|
||||
size := util.BytesToUint32(buf[pos : pos+4])
|
||||
if pos+4+int(size) > len(buf) {
|
||||
err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
|
||||
return
|
||||
}
|
||||
entryData := buf[pos+4 : pos+4+int(size)]
|
||||
|
||||
logEntry := &filer_pb.LogEntry{}
|
||||
if err = proto.Unmarshal(entryData, logEntry); err != nil {
|
||||
pos += 4 + int(size)
|
||||
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err = eachLogEntryFn(logEntry); err != nil {
|
||||
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
|
||||
return
|
||||
}
|
||||
|
||||
processedTsNs = logEntry.TsNs
|
||||
|
||||
pos += 4 + int(size)
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user