support compacting a volume

This commit is contained in:
Chris Lu
2012-11-07 01:51:43 -08:00
parent 9630825576
commit 86c8f248bd
16 changed files with 709 additions and 199 deletions

View File

@@ -47,17 +47,19 @@ func runFix(cmd *Command, args []string) bool {
//skip the volume super block
dataFile.Seek(storage.SuperBlockSize, 0)
n, length := storage.ReadNeedle(dataFile)
n, rest := storage.ReadNeedle(dataFile)
dataFile.Seek(int64(rest), 1)
nm := storage.NewNeedleMap(indexFile)
offset := uint32(storage.SuperBlockSize)
for n != nil {
debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "length", length)
debug("key", n.Id, "volume offset", offset, "data_size", n.Size, "rest", rest)
if n.Size > 0 {
count, pe := nm.Put(n.Id, offset/8, n.Size)
debug("saved", count, "with error", pe)
}
offset += length
n, length = storage.ReadNeedle(dataFile)
offset += rest+16
n, rest = storage.ReadNeedle(dataFile)
dataFile.Seek(int64(rest), 1)
}
return true
}

View File

@@ -122,6 +122,27 @@ func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, m)
}
func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
count := 0
rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else {
count, err = vg.GrowByCountAndType(count, rt, topo)
}
}
}
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
writeJson(w, r, map[string]string{"error": err.Error()})
} else {
w.WriteHeader(http.StatusNotAcceptable)
writeJson(w, r, map[string]interface{}{"count": count})
}
}
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0
rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))

View File

@@ -6,7 +6,7 @@ import (
)
const (
VERSION = "0.23"
VERSION = "0.24"
)
var cmdVersion = &Command{

View File

@@ -55,7 +55,25 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
debug("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
err := store.CompactVolume(r.FormValue("volume"))
if err == nil {
writeJson(w, r, map[string]string{"error": ""})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
debug("compacted volume =", r.FormValue("volume"), ", error =", err)
}
func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
count, err := store.CommitCompactVolume(r.FormValue("volume"))
if err == nil {
writeJson(w, r, map[string]interface{}{"error": "", "size":count})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
}
func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
@@ -250,9 +268,9 @@ func distributedOperation(volumeId storage.VolumeId, op func(location operation.
}
func runVolume(cmd *Command, args []string) bool {
if *vMaxCpu < 1 {
*vMaxCpu = runtime.NumCPU()
}
if *vMaxCpu < 1 {
*vMaxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*vMaxCpu)
fileInfo, err := os.Stat(*volumeFolder)
if err != nil {
@@ -273,6 +291,8 @@ func runVolume(cmd *Command, args []string) bool {
http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler)
http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
go func() {
for {

View File

@@ -116,8 +116,7 @@ func ReadNeedle(r *os.File) (*Needle, uint32) {
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:16])
rest := 8 - ((n.Size + 16 + 4) % 8)
r.Seek(int64(n.Size+4+rest), 1)
return n, 16 + n.Size + 4 + rest
return n, n.Size + 4 + rest
}
func ParseKeyHash(key_hash_string string) (uint64, uint32) {
key_hash_bytes, khe := hex.DecodeString(key_hash_string)

View File

@@ -44,9 +44,11 @@ func LoadNeedleMap(file *os.File) *NeedleMap {
size := util.BytesToUint32(bytes[i+12 : i+16])
if offset > 0 {
nm.m.Set(Key(key), offset, size)
log.Println("reading key", key, "offset", offset, "size", size)
nm.fileCounter++
} else {
nm.m.Delete(Key(key))
log.Println("removing key", key)
nm.deletionCounter++
}
}

View File

@@ -0,0 +1,123 @@
package storage
import (
"errors"
)
type ReplicationType string
const (
Copy000 = ReplicationType("000") // single copy
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
Copy100 = ReplicationType("100") // 2 copies, each on different data center
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value
)
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
switch t {
case "000":
return Copy000, nil
case "001":
return Copy001, nil
case "010":
return Copy010, nil
case "100":
return Copy100, nil
case "110":
return Copy110, nil
case "200":
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+t)
}
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
switch b {
case byte(000):
return Copy000, nil
case byte(001):
return Copy001, nil
case byte(010):
return Copy010, nil
case byte(100):
return Copy100, nil
case byte(110):
return Copy110, nil
case byte(200):
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+string(b))
}
func (r *ReplicationType) String() string {
switch *r {
case Copy000:
return "000"
case Copy001:
return "001"
case Copy010:
return "010"
case Copy100:
return "100"
case Copy110:
return "110"
case Copy200:
return "200"
}
return "000"
}
func (r *ReplicationType) Byte() byte {
switch *r {
case Copy000:
return byte(000)
case Copy001:
return byte(001)
case Copy010:
return byte(010)
case Copy100:
return byte(100)
case Copy110:
return byte(110)
case Copy200:
return byte(200)
}
return byte(000)
}
func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType {
case Copy000:
return 0
case Copy001:
return 1
case Copy010:
return 2
case Copy100:
return 3
case Copy110:
return 4
case Copy200:
return 5
}
return -1
}
func (repType ReplicationType)GetCopyCount() int {
switch repType {
case Copy000:
return 1
case Copy001:
return 2
case Copy010:
return 2
case Copy100:
return 2
case Copy110:
return 3
case Copy200:
return 3
}
return 0
}

View File

@@ -36,7 +36,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
id, err := strconv.ParseUint(id_string, 10, 64)
id, err := NewVolumeId(id_string)
if err != nil {
return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
}
@@ -68,6 +68,21 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil
}
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
}
return s.volumes[vid].compact()
}
func (s *Store) CommitCompactVolume(volumeIdString string) (int,error) {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
return 0, errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
}
return s.volumes[vid].commitCompact()
}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {

View File

@@ -1,11 +1,11 @@
package storage
import (
"errors"
"log"
"os"
"path"
"sync"
"errors"
)
const (
@@ -21,29 +21,30 @@ type Volume struct {
replicaType ReplicationType
accessLock sync.Mutex
}
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
var e error
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
fileName := id.String()
v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
v.load()
return
}
func (v *Volume) load() {
var e error
fileName := path.Join(v.dir, v.Id.String())
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e)
}
if replicationType == CopyNil {
if v.replicaType == CopyNil {
v.readSuperBlock()
} else {
v.maybeWriteSuperBlock()
}
indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
log.Fatalf("Write Volume Index [ERROR] %s\n", ie)
}
v.nm = LoadNeedleMap(indexFile)
return
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -107,3 +108,75 @@ func (v *Volume) read(n *Needle) (int, error) {
}
return -1, errors.New("Not Found")
}
func (v *Volume) compact() error {
v.accessLock.Lock()
defer v.accessLock.Unlock()
filePath := path.Join(v.dir, v.Id.String())
return v.copyDataAndGenerateIndexFile(filePath+".dat", filePath+".cpd", filePath+".cpx")
}
func (v *Volume) commitCompact() (int, error) {
v.accessLock.Lock()
defer v.accessLock.Unlock()
v.dataFile.Close()
os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat"))
os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx"))
v.load()
return 0, nil
}
func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) (err error) {
src, err := os.OpenFile(srcName, os.O_RDONLY, 0644)
if err != nil {
return err
}
defer src.Close()
dst, err := os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
defer dst.Close()
idx, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
defer idx.Close()
src.Seek(0, 0)
header := make([]byte, SuperBlockSize)
if _, error := src.Read(header); error == nil {
dst.Write(header)
}
n, rest := ReadNeedle(src)
nm := NewNeedleMap(idx)
old_offset := uint32(SuperBlockSize)
new_offset := uint32(SuperBlockSize)
for n != nil {
nv, ok := v.nm.Get(n.Id)
//log.Println("file size is", n.Size, "rest", rest)
if !ok || nv.Offset*8 != old_offset {
log.Println("expected offset should be", nv.Offset*8, "skipping", (rest - 16), "key", n.Id, "volume offset", old_offset, "data_size", n.Size, "rest", rest)
src.Seek(int64(rest), 1)
} else {
if nv.Size > 0 {
nm.Put(n.Id, new_offset/8, n.Size)
bytes := make([]byte, n.Size+4)
src.Read(bytes)
n.Data = bytes[:n.Size]
n.Checksum = NewCRC(n.Data)
n.Append(dst)
new_offset += rest+16
log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
}
src.Seek(int64(rest-n.Size-4), 1)
}
old_offset += rest+16
n, rest = ReadNeedle(src)
}
return nil
}

View File

@@ -1,7 +1,6 @@
package storage
import (
"errors"
)
type VolumeInfo struct {
@@ -11,120 +10,3 @@ type VolumeInfo struct {
FileCount int
DeleteCount int
}
type ReplicationType string
const (
Copy000 = ReplicationType("000") // single copy
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
Copy100 = ReplicationType("100") // 2 copies, each on different data center
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value
)
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
switch t {
case "000":
return Copy000, nil
case "001":
return Copy001, nil
case "010":
return Copy010, nil
case "100":
return Copy100, nil
case "110":
return Copy110, nil
case "200":
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+t)
}
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
switch b {
case byte(000):
return Copy000, nil
case byte(001):
return Copy001, nil
case byte(010):
return Copy010, nil
case byte(100):
return Copy100, nil
case byte(110):
return Copy110, nil
case byte(200):
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+string(b))
}
func (r *ReplicationType) String() string {
switch *r {
case Copy000:
return "000"
case Copy001:
return "001"
case Copy010:
return "010"
case Copy100:
return "100"
case Copy110:
return "110"
case Copy200:
return "200"
}
return "000"
}
func (r *ReplicationType) Byte() byte {
switch *r {
case Copy000:
return byte(000)
case Copy001:
return byte(001)
case Copy010:
return byte(010)
case Copy100:
return byte(100)
case Copy110:
return byte(110)
case Copy200:
return byte(200)
}
return byte(000)
}
func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType {
case Copy000:
return 0
case Copy001:
return 1
case Copy010:
return 2
case Copy100:
return 3
case Copy110:
return 4
case Copy200:
return 5
}
return -1
}
func (repType ReplicationType)GetCopyCount() int {
switch repType {
case Copy000:
return 1
case Copy001:
return 2
case Copy010:
return 2
case Copy100:
return 2
case Copy110:
return 3
case Copy200:
return 3
}
return 0
}

View File

@@ -143,49 +143,3 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
return dc
}
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
var dcs []interface{}
for _, c := range t.Children() {
dc := c.(*DataCenter)
dcs = append(dcs, dc.ToMap())
}
m["DataCenters"] = dcs
var layouts []interface{}
for _, layout := range t.replicaType2VolumeLayout {
if layout != nil {
layouts = append(layouts, layout.ToMap())
}
}
m["layouts"] = layouts
return m
}
func (t *Topology) ToVolumeMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
dcs := make(map[NodeId]interface{})
for _, c := range t.Children() {
dc := c.(*DataCenter)
racks := make(map[NodeId]interface{})
for _, r := range dc.Children() {
rack := r.(*Rack)
dataNodes := make(map[NodeId]interface{})
for _, d := range rack.Children() {
dn := d.(*DataNode)
var volumes []interface{}
for _, v := range dn.volumes {
volumes = append(volumes, v)
}
dataNodes[d.Id()] = volumes
}
racks[r.Id()] = dataNodes
}
dcs[dc.Id()] = racks
}
m["DataCenters"] = dcs
return m
}

View File

@@ -0,0 +1,87 @@
package topology
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"pkg/storage"
"pkg/util"
"time"
)
func (t *Topology) Vacuum() int {
total_counter := 0
for _, vl := range t.replicaType2VolumeLayout {
if vl != nil {
for vid, locationlist := range vl.vid2location {
each_volume_counter := 0
vl.removeFromWritable(vid)
ch := make(chan int, locationlist.Length())
for _, dn := range locationlist.list {
go func(url string, vid storage.VolumeId) {
vacuumVolume_Compact(url, vid)
}(dn.Url(), vid)
}
for _ = range locationlist.list {
select {
case count := <-ch:
each_volume_counter += count
case <-time.After(30 * time.Minute):
each_volume_counter = 0
break
}
}
if each_volume_counter > 0 {
for _, dn := range locationlist.list {
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
fmt.Println("Error when committing on", dn.Url(), e)
panic(e)
}
}
vl.setVolumeWritable(vid)
total_counter += each_volume_counter
}
}
}
}
return 0
}
type VacuumVolumeResult struct {
Bytes int
Error string
}
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) (error, int) {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
if err != nil {
return err, 0
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err, 0
}
if ret.Error != "" {
return errors.New(ret.Error), 0
}
return nil, ret.Bytes
}
func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}

View File

@@ -0,0 +1,51 @@
package topology
import (
)
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
var dcs []interface{}
for _, c := range t.Children() {
dc := c.(*DataCenter)
dcs = append(dcs, dc.ToMap())
}
m["DataCenters"] = dcs
var layouts []interface{}
for _, layout := range t.replicaType2VolumeLayout {
if layout != nil {
layouts = append(layouts, layout.ToMap())
}
}
m["layouts"] = layouts
return m
}
func (t *Topology) ToVolumeMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
dcs := make(map[NodeId]interface{})
for _, c := range t.Children() {
dc := c.(*DataCenter)
racks := make(map[NodeId]interface{})
for _, r := range dc.Children() {
rack := r.(*Rack)
dataNodes := make(map[NodeId]interface{})
for _, d := range rack.Children() {
dn := d.(*DataNode)
var volumes []interface{}
for _, v := range dn.volumes {
volumes = append(volumes, v)
}
dataNodes[d.Id()] = volumes
}
racks[r.Id()] = dataNodes
}
dcs[dc.Id()] = racks
}
m["DataCenters"] = dcs
return m
}

View File

@@ -27,6 +27,7 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
dnll.list = append(dnll.list, loc)
return true
}
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {