add TTL support

The volume TTL and file TTL are not necessarily the same. as long as
file TTL is smaller than volume TTL, it'll be fine.

volume TTL is used when assigning file id, e.g.
http://.../dir/assign?ttl=3h

file TTL is used when uploading
This commit is contained in:
Chris Lu
2014-09-20 12:38:59 -07:00
parent a092794804
commit b9aee2defb
31 changed files with 416 additions and 105 deletions

View File

@@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)

View File

@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}

View File

@@ -12,6 +12,7 @@ var (
uploadReplication *string
uploadCollection *string
uploadDir *string
uploadTtl *string
include *string
maxMB *int
)
@@ -24,6 +25,7 @@ func init() {
include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type")
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
}
@@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}

View File

@@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, pe)
return
}
debug("assigning file id for", fname)
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"))
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
if ae != nil {
writeJsonError(w, r, ae)
return

View File

@@ -109,7 +109,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection)
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, ae)
@@ -131,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
writeJsonError(w, r, do_err)
return
}
defer resp.Body.Close()
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
glog.V(0).Infoln("failing to upload to volume server", ra_err.Error())
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
writeJsonError(w, r, ra_err)
return
}
@@ -146,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
var ret operation.UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload resonse", string(resp_body))
glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
writeJsonError(w, r, unmarshal_err)
return
}
if ret.Error != "" {
glog.V(0).Infoln("failing to post to volume server", ret.Error)
glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
writeJsonError(w, r, errors.New(ret.Error))
return
}
@@ -169,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
glog.V(0).Infoln("failing to write to filer server", db_err.Error())
glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
writeJsonError(w, r, db_err)
return
}

View File

@@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
}
ms.Topo.RegisterVolumes(joinMessage)
ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
@@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
ttl, err := storage.ReadTTL(r.FormValue("ttl"))
if err != nil {
return nil, err
}
volumeGrowOption := &topology.VolumeGrowOption{
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
DataNode: r.FormValue("dataNode"),

View File

@@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"))
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {