Browse Source

Finalized buffer fs local buffer design

Toby Chui 3 years ago
parent
commit
22399d3076

+ 1 - 1
.gitignore

@@ -6,7 +6,7 @@
 #Testing Folders
 test/deviceA/*
 test/deviceB/*
-test/
+_test/
 tmp/*
 files/users*
 __debug_bin

+ 2 - 0
go.mod

@@ -26,6 +26,7 @@ require (
 	github.com/kevinburke/ssh_config v1.2.0 // indirect
 	github.com/klauspost/compress v1.15.4 // indirect
 	github.com/koron/go-ssdp v0.0.3
+	github.com/kr/pretty v0.3.0 // indirect
 	github.com/mholt/archiver/v3 v3.5.1
 	github.com/miekg/dns v1.1.49 // indirect
 	github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
@@ -34,6 +35,7 @@ require (
 	github.com/oov/psd v0.0.0-20220121172623-5db5eafcecbb
 	github.com/pierrec/lz4/v4 v4.1.14 // indirect
 	github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f
+	github.com/rogpeppe/go-internal v1.8.0 // indirect
 	github.com/satori/go.uuid v1.2.0
 	github.com/sergi/go-diff v1.2.0 // indirect
 	github.com/spf13/afero v1.8.2

+ 6 - 1
go.sum

@@ -310,8 +310,9 @@ github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8=
 github.com/koron/go-ssdp v0.0.3/go.mod h1:b2MxI6yh02pKrsyNoQUsk4+YNikaGhe4894J+Q5lDvA=
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
-github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -359,6 +360,7 @@ github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
 github.com/pierrec/lz4/v4 v4.1.2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
 github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -372,6 +374,9 @@ github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f h1:a7clxaGmmqtdN
 github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f/go.mod h1:/mK7FZ3mFYEn9zvNPhpngTyatyehSwte5bJZ4ehL5Xw=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
+github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
+github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
 github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=

+ 5 - 0
main.flags.go

@@ -80,6 +80,11 @@ var file_opr_buff = flag.Int("iobuf", 1024, "Amount of buffer memory for IO oper
 var enable_dir_listing = flag.Bool("dir_list", true, "Enable directory listing")
 var enable_asyncFileUpload = flag.Bool("upload_async", false, "Enable file upload buffering to run in async mode (Faster upload, require RAM >= 8GB)")
 
+//Flags related to file system abstractions
+var bufferPoolSize = flag.Int("buffpool_size", 1024, "Maxmium buffer pool size (in MB) for buffer required file system abstractions")
+var bufferFileMaxSize = flag.Int("bufffile_size", 25, "Maxmium buffer file size (in MB) for buffer required file system abstractions")
+var enable_buffering = flag.Bool("enable_buffpool", true, "Enable buffer pool for buffer required file system abstractions")
+
 //Flags related to compatibility or testing
 var enable_beta_scanning_support = flag.Bool("beta_scan", false, "Allow compatibility to ArOZ Online Beta Clusters")
 var enable_console = flag.Bool("console", false, "Enable the debugging console.")

+ 128 - 15
mediaServer.go

@@ -1,14 +1,18 @@
 package main
 
 import (
+	"crypto/md5"
+	"encoding/hex"
 	"errors"
 	"io"
 	"log"
 	"net/http"
 	"net/url"
+	"os"
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
 	"imuslab.com/arozos/mod/common"
 	"imuslab.com/arozos/mod/filesystem"
@@ -42,41 +46,41 @@ func mediaServer_init() {
 	http.HandleFunc("/media/download/", serverMedia)
 }
 
-//This function validate the incoming media request and return the real path for the targed file
-func media_server_validateSourceFile(w http.ResponseWriter, r *http.Request) (*filesystem.FileSystemHandler, string, error) {
+//This function validate the incoming media request and return fsh, vpath, rpath and err if any
+func media_server_validateSourceFile(w http.ResponseWriter, r *http.Request) (*filesystem.FileSystemHandler, string, string, error) {
 	username, err := authAgent.GetUserName(w, r)
 	if err != nil {
-		return nil, "", errors.New("User not logged in")
+		return nil, "", "", errors.New("User not logged in")
 	}
 
 	userinfo, _ := userHandler.GetUserInfoFromUsername(username)
 
 	//Validate url valid
 	if strings.Count(r.URL.String(), "?") > 1 {
-		return nil, "", errors.New("Invalid paramters. Multiple ? found")
+		return nil, "", "", errors.New("Invalid paramters. Multiple ? found")
 	}
 
 	targetfile, _ := common.Mv(r, "file", false)
 	targetfile, err = url.QueryUnescape(targetfile)
 	if err != nil {
-		return nil, "", err
+		return nil, "", "", err
 	}
 	if targetfile == "" {
-		return nil, "", errors.New("Missing paramter 'file'")
+		return nil, "", "", errors.New("Missing paramter 'file'")
 	}
 
 	//Translate the virtual directory to realpath
 	fsh, subpath, err := GetFSHandlerSubpathFromVpath(targetfile)
 	if err != nil {
-		return nil, "", errors.New("Unable to load from target file system")
+		return nil, "", "", errors.New("Unable to load from target file system")
 	}
 	fshAbs := fsh.FileSystemAbstraction
 	realFilepath, err := fshAbs.VirtualPathToRealPath(subpath, userinfo.Username)
 	if fshAbs.FileExists(realFilepath) && fshAbs.IsDir(realFilepath) {
-		return nil, "", errors.New("Given path is not a file")
+		return nil, "", "", errors.New("Given path is not a file")
 	}
 	if err != nil {
-		return nil, "", errors.New("Unable to translate the given filepath")
+		return nil, "", "", errors.New("Unable to translate the given filepath")
 	}
 
 	if !fshAbs.FileExists(realFilepath) {
@@ -100,22 +104,22 @@ func media_server_validateSourceFile(w http.ResponseWriter, r *http.Request) (*f
 		possibleRealpath, err := fshAbs.VirtualPathToRealPath(possibleVirtualFilePath, userinfo.Username)
 		if err != nil {
 			log.Println("Error when trying to serve file in compatibility mode", err.Error())
-			return nil, "", errors.New("Error when trying to serve file in compatibility mode")
+			return nil, "", "", errors.New("Error when trying to serve file in compatibility mode")
 		}
 		if fshAbs.FileExists(possibleRealpath) {
 			realFilepath = possibleRealpath
 			log.Println("[Media Server] Serving file " + filepath.Base(possibleRealpath) + " in compatibility mode. Do not to use '&' or '+' sign in filename! ")
-			return fsh, realFilepath, nil
+			return fsh, targetfile, realFilepath, nil
 		} else {
-			return nil, "", errors.New("File not exists")
+			return nil, "", "", errors.New("File not exists")
 		}
 	}
 
-	return fsh, realFilepath, nil
+	return fsh, targetfile, realFilepath, nil
 }
 
 func serveMediaMime(w http.ResponseWriter, r *http.Request) {
-	targetFsh, realFilepath, err := media_server_validateSourceFile(w, r)
+	targetFsh, _, realFilepath, err := media_server_validateSourceFile(w, r)
 	if err != nil {
 		common.SendErrorResponse(w, err.Error())
 		return
@@ -140,8 +144,9 @@ func serveMediaMime(w http.ResponseWriter, r *http.Request) {
 }
 
 func serverMedia(w http.ResponseWriter, r *http.Request) {
+	userinfo, _ := userHandler.GetUserInfoFromRequest(w, r)
 	//Serve normal media files
-	targetFsh, realFilepath, err := media_server_validateSourceFile(w, r)
+	targetFsh, vpath, realFilepath, err := media_server_validateSourceFile(w, r)
 	if err != nil {
 		common.SendErrorResponse(w, err.Error())
 		return
@@ -201,6 +206,26 @@ func serverMedia(w http.ResponseWriter, r *http.Request) {
 	} else {
 		if targetFsh.RequireBuffer || !filesystem.FileExists(realFilepath) {
 			w.Header().Set("Content-Length", strconv.Itoa(int(targetFshAbs.GetFileSize(realFilepath))))
+			//Check buffer exists
+			ps, _ := targetFsh.GetUniquePathHash(vpath, userinfo.Username)
+			buffpool := filepath.Join(*tmp_directory, "fsbuffpool")
+			buffFile := filepath.Join(buffpool, ps)
+			if fs.FileExists(buffFile) {
+				//Stream the buff file if hash matches
+				remoteFileHash, err := getHashFromRemoteFile(targetFsh.FileSystemAbstraction, realFilepath)
+				if err == nil {
+					localFileHash, err := os.ReadFile(buffFile + ".hash")
+					if err == nil {
+						if string(localFileHash) == remoteFileHash {
+							//Hash matches. Serve local buffered file
+							http.ServeFile(w, r, buffFile)
+							return
+						}
+					}
+				}
+
+			}
+
 			remoteStream, err := targetFshAbs.ReadStream(realFilepath)
 			if err != nil {
 				common.SendErrorResponse(w, err.Error())
@@ -208,6 +233,13 @@ func serverMedia(w http.ResponseWriter, r *http.Request) {
 			}
 			io.Copy(w, remoteStream)
 			remoteStream.Close()
+
+			if *enable_buffering {
+				os.MkdirAll(buffpool, 0775)
+				go func() {
+					BufferRemoteFileToTmp(buffFile, targetFsh, realFilepath)
+				}()
+			}
 		} else {
 			http.ServeFile(w, r, realFilepath)
 		}
@@ -215,3 +247,84 @@ func serverMedia(w http.ResponseWriter, r *http.Request) {
 	}
 
 }
+
+func BufferRemoteFileToTmp(buffFile string, fsh *filesystem.FileSystemHandler, rpath string) error {
+	if fs.FileExists(buffFile + ".download") {
+		return errors.New("another buffer process running")
+	}
+
+	//Generate a stat file for the buffer
+	hash, err := getHashFromRemoteFile(fsh.FileSystemAbstraction, rpath)
+	if err != nil {
+		//Do not buffer
+		return err
+	}
+	os.WriteFile(buffFile+".hash", []byte(hash), 0775)
+
+	//Buffer the file from remote to local
+	f, err := fsh.FileSystemAbstraction.ReadStream(rpath)
+	if err != nil {
+		os.Remove(buffFile + ".hash")
+		return err
+	}
+	defer f.Close()
+
+	dest, err := os.OpenFile(buffFile+".download", os.O_CREATE|os.O_WRONLY, 0775)
+	if err != nil {
+		os.Remove(buffFile + ".hash")
+		return err
+	}
+	defer dest.Close()
+
+	io.Copy(dest, f)
+	f.Close()
+	dest.Close()
+
+	os.Rename(buffFile+".download", buffFile)
+
+	//Clean the oldest buffpool item if size too large
+	dirsize, _ := fs.GetDirctorySize(filepath.Dir(buffFile), false)
+	oldestModtime := time.Now().Unix()
+	oldestFile := ""
+	for int(dirsize) > *bufferPoolSize<<20 {
+		//fmt.Println("CLEARNING BUFF", dirsize)
+		files, _ := filepath.Glob(filepath.ToSlash(filepath.Dir(buffFile)) + "/*")
+		for _, file := range files {
+			if filepath.Ext(file) == ".hash" {
+				continue
+			}
+			thisModTime, _ := fs.GetModTime(file)
+			if thisModTime < oldestModtime {
+				oldestModtime = thisModTime
+				oldestFile = file
+			}
+		}
+
+		os.Remove(oldestFile)
+		os.Remove(oldestFile + ".hash")
+
+		dirsize, _ = fs.GetDirctorySize(filepath.Dir(buffFile), false)
+		oldestModtime = time.Now().Unix()
+	}
+	return nil
+}
+
+func getHashFromRemoteFile(fshAbs filesystem.FileSystemAbstraction, rpath string) (string, error) {
+	filestat, err := fshAbs.Stat(rpath)
+	if err != nil {
+		//Always pull from remote
+		return "", err
+	}
+
+	if filestat.Size() >= int64(*bufferPoolSize<<20) {
+		return "", errors.New("Unable to buffer: file larger than buffpool size")
+	}
+
+	if filestat.Size() >= int64(*bufferFileMaxSize<<20) {
+		return "", errors.New("File larger than max buffer file size")
+	}
+
+	statHash := strconv.Itoa(int(filestat.ModTime().Unix() + filestat.Size()))
+	hash := md5.Sum([]byte(statHash))
+	return hex.EncodeToString(hash[:]), nil
+}

+ 1 - 12
mod/filesystem/abstractions/webdavfs/webdavfs.go

@@ -28,16 +28,10 @@ type WebDAVFileSystem struct {
 	Hierarchy string
 	root      string
 	user      string
-	tmp       string
 	c         *gowebdav.Client
 }
 
-type myFileType struct {
-	io.Reader
-	io.Closer
-}
-
-func NewWebDAVMount(UUID string, Hierarchy string, root string, user string, password string, tmpBuff string) (*WebDAVFileSystem, error) {
+func NewWebDAVMount(UUID string, Hierarchy string, root string, user string, password string) (*WebDAVFileSystem, error) {
 	//Connect to webdav server
 	c := gowebdav.NewClient(root, user, password)
 	err := c.Connect()
@@ -47,17 +41,12 @@ func NewWebDAVMount(UUID string, Hierarchy string, root string, user string, pas
 	} else {
 		log.Println("[WebDAV FS] Connected to remote: " + root)
 	}
-
-	//Create tmp buff folder if not exists
-	os.MkdirAll(tmpBuff, 0775)
-
 	return &WebDAVFileSystem{
 		UUID:      UUID,
 		Hierarchy: Hierarchy,
 		c:         c,
 		root:      root,
 		user:      user,
-		tmp:       tmpBuff,
 	}, nil
 }
 

+ 1 - 1
mod/filesystem/filesystem.go

@@ -176,7 +176,7 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 		user := option.Username
 		password := option.Password
 
-		webdavfs, err := webdavfs.NewWebDAVMount(option.Uuid, option.Hierarchy, root, user, password, "./tmp/webdavBuff")
+		webdavfs, err := webdavfs.NewWebDAVMount(option.Uuid, option.Hierarchy, root, user, password)
 		if err != nil {
 			return nil, err
 		}

+ 1 - 3
mod/storage/webdav/fshAdapter.go

@@ -14,14 +14,12 @@ import (
 type FshWebDAVAdapter struct {
 	fsh      *filesystem.FileSystemHandler
 	username string
-	buffDir  string //The folder to put buffer files
 }
 
-func NewFshWebDAVAdapter(fsh *filesystem.FileSystemHandler, username string, buffDir string) *FshWebDAVAdapter {
+func NewFshWebDAVAdapter(fsh *filesystem.FileSystemHandler, username string) *FshWebDAVAdapter {
 	return &FshWebDAVAdapter{
 		fsh,
 		username,
-		buffDir,
 	}
 }
 

+ 1 - 1
mod/storage/webdav/webdav.go

@@ -301,7 +301,7 @@ func (s *Server) serveReadOnlyWebDav(w http.ResponseWriter, r *http.Request) {
 
 func (s *Server) getFsFromRealRoot(fsh *filesystem.FileSystemHandler, username string, prefix string) *webdav.Handler {
 	//Create a webdav adapter from the fsh
-	fshadapter := NewFshWebDAVAdapter(fsh, username, "./tmp/webdavBuff")
+	fshadapter := NewFshWebDAVAdapter(fsh, username)
 	fs := &webdav.Handler{
 		Prefix:     prefix,
 		FileSystem: fshadapter,