fix(tikv): improve context propagation and refactor batch delete logic (#7558)
* fix(tikv): improve context propagation and refactor batch delete logic Address review comments from PR #7557: 1. Replace context.TODO() with ctx in txn.Get calls - Fixes timeout/cancellation propagation in FindEntry - Fixes timeout/cancellation propagation in KvGet 2. Refactor DeleteFolderChildren to use flush helper - Eliminates code duplication - Cleaner and more maintainable These changes ensure proper context propagation throughout all TiKV operations and improve code maintainability. * error formatting
This commit is contained in:
@@ -112,7 +112,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil
|
|||||||
}
|
}
|
||||||
var value []byte = nil
|
var value []byte = nil
|
||||||
err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
|
err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
|
||||||
val, err := txn.Get(context.TODO(), key)
|
val, err := txn.Get(ctx, key)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
value = val
|
value = val
|
||||||
}
|
}
|
||||||
@@ -180,6 +180,17 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full
|
|||||||
|
|
||||||
var keys [][]byte
|
var keys [][]byte
|
||||||
|
|
||||||
|
flush := func() error {
|
||||||
|
if len(keys) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := store.deleteBatch(ctx, keys); err != nil {
|
||||||
|
return fmt.Errorf("delete batch in %s, error: %w", path, err)
|
||||||
|
}
|
||||||
|
keys = keys[:0]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for iter.Valid() {
|
for iter.Valid() {
|
||||||
key := iter.Key()
|
key := iter.Key()
|
||||||
if !bytes.HasPrefix(key, directoryPrefix) {
|
if !bytes.HasPrefix(key, directoryPrefix) {
|
||||||
@@ -189,10 +200,9 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full
|
|||||||
keys = append(keys, append([]byte(nil), key...))
|
keys = append(keys, append([]byte(nil), key...))
|
||||||
|
|
||||||
if len(keys) >= store.batchCommitSize {
|
if len(keys) >= store.batchCommitSize {
|
||||||
if err := store.deleteBatch(ctx, keys); err != nil {
|
if err := flush(); err != nil {
|
||||||
return fmt.Errorf("delete batch in %s, error: %v", path, err)
|
return err
|
||||||
}
|
}
|
||||||
keys = keys[:0]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := iter.Next(); err != nil {
|
if err := iter.Next(); err != nil {
|
||||||
@@ -200,13 +210,7 @@ func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.Full
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(keys) > 0 {
|
return flush()
|
||||||
if err := store.deleteBatch(ctx, keys); err != nil {
|
|
||||||
return fmt.Errorf("delete batch in %s, error: %v", path, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error {
|
func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error {
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
var data []byte = nil
|
var data []byte = nil
|
||||||
err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
|
err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
|
||||||
val, err := txn.Get(context.TODO(), key)
|
val, err := txn.Get(ctx, key)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data = val
|
data = val
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user