|
|
|
|
@@ -4,9 +4,11 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"os"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/gocql/gocql"
|
|
|
|
|
gocql "github.com/apache/cassandra-gocql-driver/v2"
|
|
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
|
|
|
@@ -29,14 +31,23 @@ func (store *Cassandra2Store) GetName() string {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (store *Cassandra2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
|
|
|
|
|
enableHostVerification := true
|
|
|
|
|
if val := configuration.GetString(prefix + "ssl_enable_host_verification"); val != "" {
|
|
|
|
|
enableHostVerification = configuration.GetBool(prefix + "ssl_enable_host_verification")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return store.initialize(
|
|
|
|
|
configuration.GetString(prefix+"keyspace"),
|
|
|
|
|
configuration.GetStringSlice(prefix+"hosts"),
|
|
|
|
|
configuration.GetString(prefix+"username"),
|
|
|
|
|
configuration.GetString(prefix+"password"),
|
|
|
|
|
configuration.GetString(prefix+"ssl_ca_path"),
|
|
|
|
|
configuration.GetString(prefix+"ssl_cert_path"),
|
|
|
|
|
configuration.GetString(prefix+"ssl_key_path"),
|
|
|
|
|
configuration.GetStringSlice(prefix+"superLargeDirectories"),
|
|
|
|
|
configuration.GetString(prefix+"localDC"),
|
|
|
|
|
configuration.GetInt(prefix+"connection_timeout_millisecond"),
|
|
|
|
|
enableHostVerification,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -45,11 +56,50 @@ func (store *Cassandra2Store) isSuperLargeDirectory(dir string) (dirHash string,
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (store *Cassandra2Store) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string, timeout int) (err error) {
|
|
|
|
|
func (store *Cassandra2Store) initialize(keyspace string, hosts []string, username string, password string, sslCaPath string, sslCertPath string, sslKeyPath string, superLargeDirectories []string, localDC string, timeout int, enableHostVerification bool) (err error) {
|
|
|
|
|
store.cluster = gocql.NewCluster(hosts...)
|
|
|
|
|
if username != "" && password != "" {
|
|
|
|
|
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
|
|
|
|
|
}
|
|
|
|
|
if sslCaPath != "" || sslCertPath != "" || sslKeyPath != "" {
|
|
|
|
|
if (sslCertPath != "" && sslKeyPath == "") || (sslCertPath == "" && sslKeyPath != "") {
|
|
|
|
|
return fmt.Errorf("both ssl_cert_path and ssl_key_path must be provided for mTLS, or neither")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, path := range []string{sslCaPath, sslCertPath, sslKeyPath} {
|
|
|
|
|
if path != "" {
|
|
|
|
|
if _, err := os.Stat(path); err != nil {
|
|
|
|
|
return fmt.Errorf("ssl file %s not found: %v", path, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
store.cluster.SslOpts = &gocql.SslOptions{
|
|
|
|
|
CaPath: sslCaPath,
|
|
|
|
|
CertPath: sslCertPath,
|
|
|
|
|
KeyPath: sslKeyPath,
|
|
|
|
|
EnableHostVerification: enableHostVerification,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check if port is already specified in hosts
|
|
|
|
|
hasPort := false
|
|
|
|
|
for _, host := range hosts {
|
|
|
|
|
if strings.Contains(host, ":") {
|
|
|
|
|
hasPort = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !hasPort {
|
|
|
|
|
// standard cassandra port is 9042, but AWS keyspaces uses 9142
|
|
|
|
|
store.cluster.Port = 9142
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if sslCertPath != "" {
|
|
|
|
|
glog.V(0).Infof("TLS enabled: mTLS with cert %s", sslCertPath)
|
|
|
|
|
} else {
|
|
|
|
|
glog.V(0).Infof("TLS enabled: server-verification with ca %s", sslCaPath)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
store.cluster.Keyspace = keyspace
|
|
|
|
|
store.cluster.Timeout = time.Duration(timeout) * time.Millisecond
|
|
|
|
|
glog.V(0).Infof("timeout = %d", timeout)
|
|
|
|
|
|