@@ -1,9 +1,10 @@
package etcd
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/transport"
"strings"
"time"
@@ -26,49 +27,78 @@ func init() {
type EtcdStore struct {
client * clientv3 . Client
etcdKeyPrefix string
timeout time . Duration
}
func ( store * EtcdStore ) GetName ( ) string {
return "etcd"
}
func ( store * EtcdStore ) Initialize ( configuration weed_util . Configuration , prefix string ) ( err error ) {
func ( store * EtcdStore ) Initialize ( configuration weed_util . Configuration , prefix string ) error {
configuration . SetDefault ( prefix + "servers" , "localhost:2379" )
configuration . SetDefault ( prefix + "timeout" , "3s" )
servers := configuration . GetString ( prefix + "servers" )
if servers == "" {
servers = "localhost:2379"
}
username := configuration . GetString ( prefix + "username" )
password := configuration . GetString ( prefix + "password" )
store . etcdKeyPrefix = configuration . GetString ( prefix + "key_prefix" )
timeout := configuration . GetString ( prefix + "timeout" )
if timeout == "" {
timeout = "3s"
timeoutStr := configuration . GetString ( prefix + "timeout" )
timeout , err := time . ParseDuration ( timeoutStr )
if err ! = nil {
return fmt . Errorf ( "parse etcd store timeout: %v" , err )
}
store . timeout = timeout
certFile := configuration . GetString ( prefix + "tls_client_crt_file" )
keyFile := configuration . GetString ( prefix + "tls_client_key_file" )
caFile := configuration . GetString ( prefix + "tls_ca_file" )
var tlsConfig * tls . Config
if caFile != "" {
tlsInfo := transport . TLSInfo {
CertFile : certFile ,
KeyFile : keyFile ,
TrustedCAFile : caFile ,
}
var err error
tlsConfig , err = tlsInfo . ClientConfig ( )
if err != nil {
return fmt . Errorf ( "TLS client configuration error: %v" , err )
}
}
return store . initialize ( servers , username , password , timeout )
return store . initialize ( servers , username , password , store . timeout , tlsConfig )
}
func ( store * EtcdStore ) initialize ( servers string , username string , password string , timeout string ) ( err error ) {
func ( store * EtcdStore ) initialize ( servers , username , password string , timeout time . Duration , tlsConfig * tls . Config ) error {
glog . Infof ( "filer store etcd: %s" , servers )
to , err := time . ParseDuration ( timeout )
if err != nil {
return fmt . Errorf ( "parse timeout %s: %s" , timeout , err )
}
store . client , err = clientv3 . New ( clientv3 . Config {
clien t, err := clientv3 . New ( clientv3 . Config {
Endpoints : strings . Split ( servers , "," ) ,
Username : username ,
Password : password ,
DialTimeout : to ,
DialTimeout : timeout ,
TLS : tlsConfig ,
} )
if err != nil {
return fmt . Errorf ( "connect to etcd %s: %s" , servers , err )
}
return
ctx , cancel := context . WithTimeout ( context . Background ( ) , store . timeout )
defer cancel ( )
resp , err := client . Status ( ctx , client . Endpoints ( ) [ 0 ] )
if err != nil {
client . Close ( )
return fmt . Errorf ( "error checking etcd connection: %s" , err )
}
glog . V ( 0 ) . Infof ( "с onnection to etcd has been successfully verified. etcd version: %s" , resp . Version )
store . client = client
return nil
}
func ( store * EtcdStore ) BeginTransaction ( ctx context . Context ) ( context . Context , error ) {
@@ -159,15 +189,13 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u
}
resp , err := store . client . Get ( ctx , store . etcdKeyPrefix + string ( lastFileStart ) ,
clientv3 . WithFromKey ( ) , clientv3 . WithLimit ( limit + 1 ) )
clientv3 . WithRange ( clientv3 . GetPrefixRangeEnd ( store . etcdKeyPrefix + string ( directoryPrefix ) ) ) ,
clientv3 . WithLimit ( limit + 1 ) )
if err != nil {
return lastFileName , fmt . Errorf ( "list %s : %v" , dirPath , err )
}
for _ , kv := range resp . Kvs {
if ! bytes . HasPrefix ( kv . Key , directoryPrefix ) {
break
}
fileName := getNameFromKey ( kv . Key )
if fileName == "" {
continue