|
@@ -13,6 +13,7 @@ import (
|
|
"net/http"
|
|
"net/http"
|
|
"net/url"
|
|
"net/url"
|
|
"os"
|
|
"os"
|
|
|
|
+ "sync"
|
|
|
|
|
|
"path/filepath"
|
|
"path/filepath"
|
|
"runtime"
|
|
"runtime"
|
|
@@ -46,6 +47,7 @@ var (
|
|
thumbRenderHandler *metadata.RenderHandler
|
|
thumbRenderHandler *metadata.RenderHandler
|
|
shareEntryTable *shareEntry.ShareEntryTable
|
|
shareEntryTable *shareEntry.ShareEntryTable
|
|
shareManager *share.Manager
|
|
shareManager *share.Manager
|
|
|
|
+ wsConnectionStore sync.Map
|
|
)
|
|
)
|
|
|
|
|
|
type trashedFile struct {
|
|
type trashedFile struct {
|
|
@@ -60,6 +62,16 @@ type trashedFile struct {
|
|
OriginalFilename string
|
|
OriginalFilename string
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+type fileOperationTask struct {
|
|
|
|
+ ID string //Unique id for the task operation
|
|
|
|
+ Owner string //Owner of the file opr
|
|
|
|
+ Src string //Source folder for opr
|
|
|
|
+ Dest string //Destination folder for opr
|
|
|
|
+ Progress float64 //Progress for the operation
|
|
|
|
+ LatestFile string //Latest file that is current transfering
|
|
|
|
+ FileOperationSignal int //Current control signal of the file opr
|
|
|
|
+}
|
|
|
|
+
|
|
func FileSystemInit() {
|
|
func FileSystemInit() {
|
|
router := prout.NewModuleRouter(prout.RouterOption{
|
|
router := prout.NewModuleRouter(prout.RouterOption{
|
|
ModuleName: "File Manager",
|
|
ModuleName: "File Manager",
|
|
@@ -195,6 +207,13 @@ func FileSystemInit() {
|
|
//Share function is now routed by the main router
|
|
//Share function is now routed by the main router
|
|
//http.HandleFunc("/share", shareManager.HandleShareAccess)
|
|
//http.HandleFunc("/share", shareManager.HandleShareAccess)
|
|
|
|
|
|
|
|
+ /*
|
|
|
|
+ File Operation Resume Functions
|
|
|
|
+ */
|
|
|
|
+ //Create a sync map for file operation opened connections
|
|
|
|
+ wsConnectionStore = sync.Map{}
|
|
|
|
+ router.HandleFunc("/system/file_system/ongoing", system_fs_HandleOnGoingTasks)
|
|
|
|
+
|
|
/*
|
|
/*
|
|
Nighly Tasks
|
|
Nighly Tasks
|
|
|
|
|
|
@@ -1372,7 +1391,7 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
if operation == "move" || operation == "copy" || operation == "zip" || operation == "unzip" {
|
|
if operation == "move" || operation == "copy" || operation == "zip" || operation == "unzip" {
|
|
|
|
|
|
} else {
|
|
} else {
|
|
- systemWideLogger.PrintAndLog("File System", "This file operation is not supported on WebSocket file operations endpoint. Please use the legacy endpoint instead. Received: "+operation, errors.New("operaiton not supported on websocket endpoint"))
|
|
|
|
|
|
+ systemWideLogger.PrintAndLog("File System", "This file operation is not supported on WebSocket file operations endpoint. Please use the POST request endpoint instead. Received: "+operation, errors.New("operaiton not supported on websocket endpoint"))
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte("500 - Not supported operation"))
|
|
w.Write([]byte("500 - Not supported operation"))
|
|
return
|
|
return
|
|
@@ -1389,9 +1408,26 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //Create the file operation task and remember it
|
|
|
|
+ oprId := strconv.Itoa(int(time.Now().Unix())) + "_" + uuid.NewV4().String()
|
|
|
|
+ thisFileOperationTask := fileOperationTask{
|
|
|
|
+ ID: oprId,
|
|
|
|
+ Owner: userinfo.Username,
|
|
|
|
+ Src: arozfs.ToSlash(filepath.Dir(sourceFiles[0])),
|
|
|
|
+ Dest: arozfs.ToSlash(vdestFile),
|
|
|
|
+ Progress: 0.0,
|
|
|
|
+ LatestFile: arozfs.ToSlash(filepath.Base(sourceFiles[0])),
|
|
|
|
+ }
|
|
|
|
+ wsConnectionStore.Store(oprId, &thisFileOperationTask)
|
|
|
|
+
|
|
|
|
+ //Send over the oprId for this file operation for tracking
|
|
|
|
+ time.Sleep(300 * time.Millisecond)
|
|
|
|
+ c.WriteMessage(1, []byte("{\"oprid\":\""+oprId+"\"}"))
|
|
|
|
+
|
|
type ProgressUpdate struct {
|
|
type ProgressUpdate struct {
|
|
LatestFile string
|
|
LatestFile string
|
|
Progress int
|
|
Progress int
|
|
|
|
+ StatusFlag int
|
|
Error string
|
|
Error string
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1413,10 +1449,13 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(vsrcs),
|
|
LatestFile: filepath.Base(vsrcs),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
rsrc, err := thisSrcFsh.FileSystemAbstraction.VirtualPathToRealPath(subpath, userinfo.Username)
|
|
rsrc, err := thisSrcFsh.FileSystemAbstraction.VirtualPathToRealPath(subpath, userinfo.Username)
|
|
@@ -1425,10 +1464,13 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrc),
|
|
LatestFile: filepath.Base(rsrc),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1444,15 +1486,18 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
|
|
|
|
//Create the zip file
|
|
//Create the zip file
|
|
- err = filesystem.ArozZipFileWithProgress(sourceFileFsh, realSourceFiles, zipDestFsh, zipDestPath, false, func(currentFilename string, _ int, _ int, progress float64) {
|
|
|
|
|
|
+ err = filesystem.ArozZipFileWithProgress(sourceFileFsh, realSourceFiles, zipDestFsh, zipDestPath, false, func(currentFilename string, _ int, _ int, progress float64) int {
|
|
|
|
+ sig, _ := UpdateOngoingFileOperation(oprId, currentFilename, math.Ceil(progress))
|
|
currentStatus := ProgressUpdate{
|
|
currentStatus := ProgressUpdate{
|
|
LatestFile: currentFilename,
|
|
LatestFile: currentFilename,
|
|
Progress: int(math.Ceil(progress)),
|
|
Progress: int(math.Ceil(progress)),
|
|
Error: "",
|
|
Error: "",
|
|
|
|
+ StatusFlag: sig,
|
|
}
|
|
}
|
|
|
|
|
|
js, _ := json.Marshal(currentStatus)
|
|
js, _ := json.Marshal(currentStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
|
|
+ return sig
|
|
})
|
|
})
|
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -1479,10 +1524,14 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(vdestFile),
|
|
LatestFile: filepath.Base(vdestFile),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "Access Denied: No Write Permission",
|
|
Error: "Access Denied: No Write Permission",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
|
|
|
|
//Create the destination folder
|
|
//Create the destination folder
|
|
@@ -1497,10 +1546,14 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(vsrcs),
|
|
LatestFile: filepath.Base(vsrcs),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
thisSrcFshAbs := thisSrcFsh.FileSystemAbstraction
|
|
thisSrcFshAbs := thisSrcFsh.FileSystemAbstraction
|
|
rsrc, err := thisSrcFshAbs.VirtualPathToRealPath(subpath, userinfo.Username)
|
|
rsrc, err := thisSrcFshAbs.VirtualPathToRealPath(subpath, userinfo.Username)
|
|
@@ -1509,10 +1562,14 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrc),
|
|
LatestFile: filepath.Base(rsrc),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
if thisSrcFsh.RequireBuffer {
|
|
if thisSrcFsh.RequireBuffer {
|
|
localBufferFilepath, err := bufferRemoteFileToLocal(thisSrcFsh, rsrc, false)
|
|
localBufferFilepath, err := bufferRemoteFileToLocal(thisSrcFsh, rsrc, false)
|
|
@@ -1521,10 +1578,14 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrc),
|
|
LatestFile: filepath.Base(rsrc),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "Failed to buffer file to local disk",
|
|
Error: "Failed to buffer file to local disk",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
|
|
+ return
|
|
}
|
|
}
|
|
realSourceFiles = append(realSourceFiles, localBufferFilepath)
|
|
realSourceFiles = append(realSourceFiles, localBufferFilepath)
|
|
} else {
|
|
} else {
|
|
@@ -1539,16 +1600,19 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
}
|
|
}
|
|
|
|
|
|
//Unzip the files
|
|
//Unzip the files
|
|
- filesystem.ArozUnzipFileWithProgress(realSourceFiles, unzipDest, func(currentFile string, filecount int, totalfile int, progress float64) {
|
|
|
|
|
|
+ filesystem.ArozUnzipFileWithProgress(realSourceFiles, unzipDest, func(currentFile string, filecount int, totalfile int, progress float64) int {
|
|
//Generate the status update struct
|
|
//Generate the status update struct
|
|
|
|
+ sig, _ := UpdateOngoingFileOperation(oprId, filepath.Base(currentFile), math.Ceil(progress))
|
|
currentStatus := ProgressUpdate{
|
|
currentStatus := ProgressUpdate{
|
|
LatestFile: filepath.Base(currentFile),
|
|
LatestFile: filepath.Base(currentFile),
|
|
Progress: int(math.Ceil(progress)),
|
|
Progress: int(math.Ceil(progress)),
|
|
Error: "",
|
|
Error: "",
|
|
|
|
+ StatusFlag: sig,
|
|
}
|
|
}
|
|
-
|
|
|
|
js, _ := json.Marshal(currentStatus)
|
|
js, _ := json.Marshal(currentStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
|
|
+
|
|
|
|
+ return sig
|
|
})
|
|
})
|
|
|
|
|
|
if destFsh.RequireBuffer {
|
|
if destFsh.RequireBuffer {
|
|
@@ -1574,6 +1638,8 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
} else {
|
|
} else {
|
|
//Other operations that allow multiple source files to handle one by one
|
|
//Other operations that allow multiple source files to handle one by one
|
|
for i := 0; i < len(sourceFiles); i++ {
|
|
for i := 0; i < len(sourceFiles); i++ {
|
|
|
|
+ //TODO: REMOVE DEBUG
|
|
|
|
+ time.Sleep(3 * time.Second)
|
|
vsrcFile := sourceFiles[i]
|
|
vsrcFile := sourceFiles[i]
|
|
thisSrcFsh, subpath, err := GetFSHandlerSubpathFromVpath(vsrcFile)
|
|
thisSrcFsh, subpath, err := GetFSHandlerSubpathFromVpath(vsrcFile)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -1581,10 +1647,13 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(vsrcFile),
|
|
LatestFile: filepath.Base(vsrcFile),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
thisSrcFshAbs := thisSrcFsh.FileSystemAbstraction
|
|
thisSrcFshAbs := thisSrcFsh.FileSystemAbstraction
|
|
@@ -1596,28 +1665,34 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: "File not exists",
|
|
Error: "File not exists",
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
if operation == "move" {
|
|
if operation == "move" {
|
|
- err := filesystem.FileMove(thisSrcFsh, rsrcFile, destFsh, rdestFile, existsOpr, true, func(progress int, currentFile string) {
|
|
|
|
|
|
+ err := filesystem.FileMove(thisSrcFsh, rsrcFile, destFsh, rdestFile, existsOpr, true, func(progress int, currentFile string) int {
|
|
//Multply child progress to parent progress
|
|
//Multply child progress to parent progress
|
|
blockRatio := float64(100) / float64(len(sourceFiles))
|
|
blockRatio := float64(100) / float64(len(sourceFiles))
|
|
overallRatio := blockRatio*float64(i) + blockRatio*(float64(progress)/float64(100))
|
|
overallRatio := blockRatio*float64(i) + blockRatio*(float64(progress)/float64(100))
|
|
|
|
|
|
//Construct return struct
|
|
//Construct return struct
|
|
|
|
+ sig, _ := UpdateOngoingFileOperation(oprId, filepath.Base(currentFile), math.Ceil(overallRatio))
|
|
currentStatus := ProgressUpdate{
|
|
currentStatus := ProgressUpdate{
|
|
LatestFile: filepath.Base(currentFile),
|
|
LatestFile: filepath.Base(currentFile),
|
|
Progress: int(overallRatio),
|
|
Progress: int(overallRatio),
|
|
Error: "",
|
|
Error: "",
|
|
|
|
+ StatusFlag: sig,
|
|
}
|
|
}
|
|
|
|
|
|
js, _ := json.Marshal(currentStatus)
|
|
js, _ := json.Marshal(currentStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
|
|
+ return sig
|
|
})
|
|
})
|
|
|
|
|
|
//Handle move starting error
|
|
//Handle move starting error
|
|
@@ -1626,10 +1701,13 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: err.Error(),
|
|
Error: err.Error(),
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1637,20 +1715,22 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
metadata.RemoveCache(thisSrcFsh, rsrcFile)
|
|
metadata.RemoveCache(thisSrcFsh, rsrcFile)
|
|
|
|
|
|
} else if operation == "copy" {
|
|
} else if operation == "copy" {
|
|
- err := filesystem.FileCopy(thisSrcFsh, rsrcFile, destFsh, rdestFile, existsOpr, func(progress int, currentFile string) {
|
|
|
|
|
|
+ err := filesystem.FileCopy(thisSrcFsh, rsrcFile, destFsh, rdestFile, existsOpr, func(progress int, currentFile string) int {
|
|
//Multply child progress to parent progress
|
|
//Multply child progress to parent progress
|
|
blockRatio := float64(100) / float64(len(sourceFiles))
|
|
blockRatio := float64(100) / float64(len(sourceFiles))
|
|
overallRatio := blockRatio*float64(i) + blockRatio*(float64(progress)/float64(100))
|
|
overallRatio := blockRatio*float64(i) + blockRatio*(float64(progress)/float64(100))
|
|
|
|
|
|
//Construct return struct
|
|
//Construct return struct
|
|
|
|
+ sig, _ := UpdateOngoingFileOperation(oprId, filepath.Base(currentFile), math.Ceil(overallRatio))
|
|
currentStatus := ProgressUpdate{
|
|
currentStatus := ProgressUpdate{
|
|
LatestFile: filepath.Base(currentFile),
|
|
LatestFile: filepath.Base(currentFile),
|
|
Progress: int(overallRatio),
|
|
Progress: int(overallRatio),
|
|
Error: "",
|
|
Error: "",
|
|
|
|
+ StatusFlag: sig,
|
|
}
|
|
}
|
|
-
|
|
|
|
js, _ := json.Marshal(currentStatus)
|
|
js, _ := json.Marshal(currentStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
|
|
+ return sig
|
|
})
|
|
})
|
|
|
|
|
|
//Handle Copy starting error
|
|
//Handle Copy starting error
|
|
@@ -1659,16 +1739,23 @@ func system_fs_handleWebSocketOpr(w http.ResponseWriter, r *http.Request) {
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
LatestFile: filepath.Base(rsrcFile),
|
|
Progress: -1,
|
|
Progress: -1,
|
|
Error: err.Error(),
|
|
Error: err.Error(),
|
|
|
|
+ StatusFlag: filesystem.FsOpr_Error,
|
|
}
|
|
}
|
|
js, _ := json.Marshal(stopStatus)
|
|
js, _ := json.Marshal(stopStatus)
|
|
c.WriteMessage(1, js)
|
|
c.WriteMessage(1, js)
|
|
c.Close()
|
|
c.Close()
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //Remove the task from ongoing tasks list
|
|
|
|
+ //TODO: REMOVE DEBUG
|
|
|
|
+ wsConnectionStore.Delete(oprId)
|
|
|
|
+
|
|
//Close WebSocket connection after finished
|
|
//Close WebSocket connection after finished
|
|
time.Sleep(1 * time.Second)
|
|
time.Sleep(1 * time.Second)
|
|
c.WriteControl(8, []byte{}, time.Now().Add(time.Second))
|
|
c.WriteControl(8, []byte{}, time.Now().Add(time.Second))
|
|
@@ -3239,3 +3326,113 @@ func cleanFsBufferFileFromList(filelist []string) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+/*
|
|
|
|
+ File operation load and resume features
|
|
|
|
+*/
|
|
|
|
+
|
|
|
|
+// Handle all the on going task requests.
|
|
|
|
+// Accept parameter: flag={continue / pause / stop}
|
|
|
|
+func system_fs_HandleOnGoingTasks(w http.ResponseWriter, r *http.Request) {
|
|
|
|
+ //Get the user information
|
|
|
|
+ userinfo, err := userHandler.GetUserInfoFromRequest(w, r)
|
|
|
|
+ if err != nil {
|
|
|
|
+ utils.SendErrorResponse(w, "User not logged in")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ statusFlag, _ := utils.PostPara(r, "flag")
|
|
|
|
+ oprid, _ := utils.PostPara(r, "oprid")
|
|
|
|
+
|
|
|
|
+ if statusFlag == "" {
|
|
|
|
+ //No flag defined. Print all operations
|
|
|
|
+ ongoingTasks := GetAllOngoingFileOperationForUser(userinfo.Username)
|
|
|
|
+ js, _ := json.Marshal(ongoingTasks)
|
|
|
|
+ utils.SendJSONResponse(w, string(js))
|
|
|
|
+ } else if statusFlag != "" {
|
|
|
|
+ if oprid == "" {
|
|
|
|
+ utils.SendErrorResponse(w, "oprid is empty or not set")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //Get the operation record
|
|
|
|
+ oprRecord, err := GetOngoingFileOperationByOprID(oprid)
|
|
|
|
+ if err != nil {
|
|
|
|
+ utils.SendErrorResponse(w, err.Error())
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if statusFlag == "continue" {
|
|
|
|
+ //Continue the file operation
|
|
|
|
+ oprRecord.FileOperationSignal = filesystem.FsOpr_Continue
|
|
|
|
+ } else if statusFlag == "pause" {
|
|
|
|
+ //Pause the file operation until the flag is set to other status
|
|
|
|
+ oprRecord.FileOperationSignal = filesystem.FsOpr_Pause
|
|
|
|
+ } else if statusFlag == "cancel" {
|
|
|
|
+ //Cancel and stop the operation
|
|
|
|
+ oprRecord.FileOperationSignal = filesystem.FsOpr_Cancel
|
|
|
|
+ } else {
|
|
|
|
+ utils.SendErrorResponse(w, "unsupported operation")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ SetOngoingFileOperation(oprRecord)
|
|
|
|
+
|
|
|
|
+ utils.SendOK(w)
|
|
|
|
+ } else if oprid != "" && statusFlag == "" {
|
|
|
|
+ //Get the operation record
|
|
|
|
+ oprRecord, err := GetOngoingFileOperationByOprID(oprid)
|
|
|
|
+ if err != nil {
|
|
|
|
+ utils.SendErrorResponse(w, err.Error())
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ js, _ := json.Marshal(oprRecord)
|
|
|
|
+ utils.SendJSONResponse(w, string(js))
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func GetAllOngoingFileOperationForUser(username string) []*fileOperationTask {
|
|
|
|
+ results := []*fileOperationTask{}
|
|
|
|
+ wsConnectionStore.Range(func(key, value interface{}) bool {
|
|
|
|
+ //oprid := key.(string)
|
|
|
|
+ taskInfo := value.(*fileOperationTask)
|
|
|
|
+ if taskInfo.Owner == username {
|
|
|
|
+ results = append(results, taskInfo)
|
|
|
|
+ }
|
|
|
|
+ return true
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return results
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Get an ongoing task record
|
|
|
|
+func GetOngoingFileOperationByOprID(oprid string) (*fileOperationTask, error) {
|
|
|
|
+ object, ok := wsConnectionStore.Load(oprid)
|
|
|
|
+ if !ok {
|
|
|
|
+ return nil, errors.New("task not exists")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return object.(*fileOperationTask), nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Set or update an ongoing task record
|
|
|
|
+func SetOngoingFileOperation(opr *fileOperationTask) {
|
|
|
|
+ wsConnectionStore.Store(opr.ID, opr)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Update the status of an onging task record, return latest status code and error if any
|
|
|
|
+func UpdateOngoingFileOperation(oprid string, currentFile string, progress float64) (int, error) {
|
|
|
|
+ t, err := GetOngoingFileOperationByOprID(oprid)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return 0, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ t.LatestFile = currentFile
|
|
|
|
+ t.Progress = progress
|
|
|
|
+
|
|
|
|
+ SetOngoingFileOperation(t)
|
|
|
|
+ return t.FileOperationSignal, nil
|
|
|
|
+}
|