Files
seaweedFS/weed/filer/hbase/hbase_store.go
tam-i13 b669607fcd Add error list each entry func (#7485)
* added error return in type ListEachEntryFunc

* return error if errClose

* fix fmt.Errorf

* fix return errClose

* use %w fmt.Errorf

* added entry to the error message

* add callbackErr in ListDirectoryEntries

* fix error

* add log

* clear err when the scanner stops on io.EOF, so returning err doesn’t surface EOF as a failure.

* more info in error

* add ctx to logs, error handling

* fix return eachEntryFunc

* fix

* fix log

* fix return

* fix foundationdb tests

* fix eachEntryFunc

* fix return resEachEntryFuncErr

* Update weed/filer/filer.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/elastic/v7/elastic_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/hbase/hbase_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/foundationdb/foundationdb_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/filer/ydb/ydb_store.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix

* add scanErr

---------

Co-authored-by: Roman Tamarov <r.tamarov@kryptonite.ru>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-11-25 19:35:19 -08:00

239 lines
5.9 KiB
Go

package hbase
import (
"bytes"
"context"
"fmt"
"io"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/hrpc"
)
func init() {
filer.Stores = append(filer.Stores, &HbaseStore{})
}
// HbaseStore is a filer store that keeps its data in a single HBase table
// accessed through a gohbase client.
type HbaseStore struct {
	// Client is the gohbase client; initialize creates it from the
	// ZooKeeper quorum string.
	Client gohbase.Client
	// table is the HBase table name as raw bytes.
	table []byte
	// cfKv and cfMetaDir are column family names ("kv" and "meta" after
	// initialize); cfMetaDir is the family used for entry rows. column is
	// set to "a" by initialize.
	cfKv, cfMetaDir, column string
}
// GetName returns the name of this store, "hbase".
func (store *HbaseStore) GetName() string {
	const name = "hbase"
	return name
}
// Initialize reads the "zkquorum" and "table" settings found under prefix in
// configuration and hands them to initialize.
func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
	zkquorum := configuration.GetString(prefix + "zkquorum")
	table := configuration.GetString(prefix + "table")
	return store.initialize(zkquorum, table)
}
// initialize connects to HBase through the given ZooKeeper quorum, records
// the table and column family names on the store, and creates the table with
// the "kv" and "meta" column families when a probe Get reports
// gohbase.TableNotFound.
//
// Any other outcome of the probe (success or a different error) is treated
// as "table exists" and initialize returns nil.
func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
	store.Client = gohbase.NewClient(zkquorum)
	store.table = []byte(table)
	store.cfKv, store.cfMetaDir, store.column = "kv", "meta", "a"

	// Probe the table with a Get on an arbitrary row key to learn whether
	// the table exists.
	probeFamilies := map[string][]string{store.cfMetaDir: nil}
	probe, err := hrpc.NewGet(context.Background(), store.table, []byte("whatever"), hrpc.Families(probeFamilies))
	if err != nil {
		return fmt.Errorf("NewGet returned an error: %w", err)
	}
	if _, getErr := store.Client.Get(probe); getErr != gohbase.TableNotFound {
		return nil
	}

	// The table is missing: create it with both column families and
	// default (nil) attributes.
	adminClient := gohbase.NewAdminClient(zkquorum)
	families := map[string]map[string]string{
		store.cfKv:      nil,
		store.cfMetaDir: nil,
	}
	createReq := hrpc.NewCreateTable(context.Background(), []byte(table), families)
	return adminClient.CreateTable(createReq)
}
// InsertEntry encodes the entry's attributes and chunks and writes them to
// the metadata column family under the row key entry.FullPath, passing
// entry.TtlSec through to doPut.
//
// When the entry has more than filer.CountEntryChunksForGzip chunks the
// encoded value is first passed through util.MaybeGzipData.
//
// An encoding failure is returned wrapped with %w so callers can inspect the
// underlying cause with errors.Is / errors.As.
func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	value, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err)
	}
	if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
		value = util.MaybeGzipData(value)
	}
	return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
}
// UpdateEntry writes the entry by delegating to InsertEntry; this store makes
// no distinction between inserting and updating.
func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
	err = store.InsertEntry(ctx, entry)
	return err
}
// FindEntry loads the entry stored under path from the metadata column
// family and decodes it.
//
// It returns filer_pb.ErrNotFound when doGet reports filer.ErrKvNotFound, and
// any other doGet error unchanged. A decode failure is wrapped with %w so the
// underlying cause stays inspectable; as before, the partially populated
// entry is returned alongside that error.
func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
	value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
	if err != nil {
		if err == filer.ErrKvNotFound {
			return nil, filer_pb.ErrNotFound
		}
		return nil, err
	}

	entry = &filer.Entry{
		FullPath: path,
	}
	if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); err != nil {
		return entry, fmt.Errorf("decode %s : %w", entry.FullPath, err)
	}
	return entry, nil
}
// DeleteEntry removes the row keyed by path from the metadata column family.
func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
	rowKey := []byte(path)
	err = store.doDelete(ctx, store.cfMetaDir, rowKey)
	return err
}
// DeleteFolderChildren deletes the direct children of the directory path.
//
// It scans rows starting at path.Child("") and stops at the first row whose
// key no longer carries that prefix. Rows whose parent directory is not
// exactly path (deeper descendants) are skipped rather than deleted.
//
// The scan ending with io.EOF is the normal termination and yields nil. Any
// other scan error, and any failure to delete a row, aborts the operation and
// is returned wrapped. Earlier versions shadowed the named result inside the
// loop, so these errors were silently dropped.
func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
	family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
	expectedPrefix := []byte(path.Child(""))
	scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
	if err != nil {
		return err
	}
	scanner := store.Client.Scan(scan)
	defer scanner.Close()

	for {
		// Use distinct error names inside the loop so nothing shadows the
		// function's result.
		res, scanErr := scanner.Next()
		if scanErr == io.EOF {
			break
		}
		if scanErr != nil {
			return fmt.Errorf("scan children of %s: %w", path, scanErr)
		}
		if len(res.Cells) == 0 {
			continue
		}
		cell := res.Cells[0]
		if !bytes.HasPrefix(cell.Row, expectedPrefix) {
			// The scan is open-ended; leaving the prefix means we are past
			// this directory.
			break
		}
		fullpath := util.FullPath(cell.Row)
		dir, _ := fullpath.DirAndName()
		if dir != string(path) {
			continue
		}
		if deleteErr := store.doDelete(ctx, store.cfMetaDir, cell.Row); deleteErr != nil {
			return fmt.Errorf("delete %s: %w", fullpath, deleteErr)
		}
	}
	return nil
}
// ListDirectoryEntries lists the entries of dirPath without any name prefix
// filter by delegating to ListDirectoryPrefixedEntries with an empty prefix.
func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
	const noPrefix = ""
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, noPrefix, eachEntryFunc)
}
// ListDirectoryPrefixedEntries scans rows starting at dirPath.Child(prefix)
// and calls eachEntryFunc for every decoded entry whose parent directory is
// exactly dirPath.
//
// The entry named startFileName is skipped unless includeStartFile is true.
// At most limit entries are passed to eachEntryFunc. Iteration stops when the
// scan is exhausted, a row no longer has the expected prefix, the limit is
// used up, decoding fails, or eachEntryFunc returns false or an error.
//
// It returns the name of the last entry it reached together with the scan
// error, the decode error, or the wrapped callback error, if any.
func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	columns := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
	rowPrefix := []byte(dirPath.Child(prefix))

	scanReq, err := hrpc.NewScanRange(ctx, store.table, rowPrefix, nil, hrpc.Families(columns))
	if err != nil {
		return lastFileName, err
	}
	rows := store.Client.Scan(scanReq)
	defer rows.Close()

	for {
		row, scanErr := rows.Next()
		if scanErr != nil {
			// io.EOF marks the end of the scan and is not a failure.
			if scanErr == io.EOF {
				break
			}
			return lastFileName, scanErr
		}
		if len(row.Cells) == 0 {
			continue
		}

		first := row.Cells[0]
		if !bytes.HasPrefix(first.Row, rowPrefix) {
			break
		}

		entryPath := util.FullPath(first.Row)
		parent, name := entryPath.DirAndName()
		if parent != string(dirPath) {
			// Not a direct child of dirPath.
			continue
		}
		if !includeStartFile && name == startFileName {
			continue
		}

		limit--
		if limit < 0 {
			break
		}
		lastFileName = name

		entry := &filer.Entry{FullPath: entryPath}
		if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(first.Value)); decodeErr != nil {
			err = decodeErr
			glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
			break
		}

		keepGoing, callbackErr := eachEntryFunc(entry)
		if callbackErr != nil {
			return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, callbackErr)
		}
		if !keepGoing {
			break
		}
	}
	return lastFileName, err
}
// BeginTransaction is a no-op for this store: it returns ctx unchanged and a
// nil error.
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	var noErr error
	return ctx, noErr
}
// CommitTransaction is a no-op for this store and always returns nil.
func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
	var noErr error
	return noErr
}
// RollbackTransaction is a no-op for this store and always returns nil.
func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
	var noErr error
	return noErr
}
// Shutdown closes the store's gohbase client.
func (store *HbaseStore) Shutdown() {
	client := store.Client
	client.Close()
}