Răsfoiți Sursa

Added storage heartbeat system

Toby Chui 2 ani în urmă
părinte
comite
8ba7e6f7cc

+ 4 - 0
mod/filesystem/abstractions/ftpfs/ftpfs.go

@@ -214,6 +214,10 @@ func (l FTPFSAbstraction) Walk(root string, walkFn filepath.WalkFunc) error {
 	return arozfs.ErrOperationNotSupported
 }
 
+func (l FTPFSAbstraction) Heartbeat() error {
+	return nil
+}
+
 //Utilities
 func filterFilepath(rawpath string) string {
 	rawpath = arozfs.ToSlash(filepath.Clean(strings.TrimSpace(rawpath)))

+ 4 - 0
mod/filesystem/abstractions/localfs/localfs.go

@@ -244,3 +244,7 @@ func (l LocalFileSystemAbstraction) ReadStream(filename string) (io.ReadCloser,
 func (l LocalFileSystemAbstraction) Walk(root string, walkFn filepath.WalkFunc) error {
 	return filepath.Walk(root, walkFn)
 }
+
+func (l LocalFileSystemAbstraction) Heartbeat() error {
+	return nil
+}

+ 11 - 46
mod/filesystem/abstractions/smbfs/smbfs.go

@@ -88,56 +88,12 @@ func NewServerMessageBlockFileSystemAbstraction(uuid string, hierarchy string, i
 				return
 			case <-ticker.C:
 				//Ping the smb server to make sure it knows we are alive
-				_, err := s.share.Stat("")
-				if err != nil {
-					//Disconnect the connection and re-establish connection
-					s.reconnect()
-
-				}
+				s.share.Stat("")
 			}
 		}
 	}(&fsAbstraction)
-	return fsAbstraction, nil
-}
-
-func (a *ServerMessageBlockFileSystemAbstraction) reconnect() error {
-	//Close existing connection
-	a.Close()
-
-	//Restart connection
-	nd := net.Dialer{Timeout: 10 * time.Second}
-	conn, err := nd.Dial("tcp", a.ipaddr)
-	if err != nil {
-		log.Println("[SMB-FS] Unable to connect to remote: ", err.Error())
-		return err
-	}
 
-	d := &smb2.Dialer{
-		Initiator: &smb2.NTLMInitiator{
-			User:     a.user,
-			Password: a.pass,
-		},
-	}
-
-	s, err := d.Dial(conn)
-	if err != nil {
-		log.Println("[SMB-FS] Unable to connect to remote: ", err.Error())
-		return err
-	}
-
-	//Mound remote storage
-	fs, err := s.Mount(a.root)
-	if err != nil {
-		log.Println("[SMB-FS] Unable to connect to remote: ", err.Error())
-		return err
-	}
-
-	a.conn = &conn
-	a.session = s
-	a.share = fs
-
-	fmt.Println("[SMB-FS] Connection restarted")
-	return nil
+	return fsAbstraction, nil
 }
 
 func (a ServerMessageBlockFileSystemAbstraction) Chmod(filename string, mode os.FileMode) error {
@@ -213,11 +169,15 @@ func (a ServerMessageBlockFileSystemAbstraction) Stat(filename string) (os.FileI
 	return a.share.Stat(filename)
 }
 func (a ServerMessageBlockFileSystemAbstraction) Close() error {
+	//Stop connection checker
 	a.tickerChan <- true
+
+	//Unmount the smb folder
 	a.share.Umount()
 	a.session.Logoff()
 	conn := *(a.conn)
 	conn.Close()
+
 	return nil
 }
 
@@ -373,6 +333,11 @@ func (a ServerMessageBlockFileSystemAbstraction) Walk(root string, walkFn filepa
 	return err
 }
 
+func (a ServerMessageBlockFileSystemAbstraction) Heartbeat() error {
+	_, err := a.share.Stat("")
+	return err
+}
+
 /*
 
 	Optional Functions

+ 5 - 0
mod/filesystem/abstractions/webdavfs/webdavfs.go

@@ -242,6 +242,11 @@ func (e WebDAVFileSystem) Close() error {
 	return nil
 }
 
+func (e WebDAVFileSystem) Heartbeat() error {
+	_, err := e.c.ReadDir("/")
+	return err
+}
+
 /*
 	Helper Functions
 */

+ 44 - 4
mod/filesystem/filesystem.go

@@ -83,6 +83,7 @@ type FileSystemAbstraction interface {
 	WriteStream(string, io.Reader, os.FileMode) error
 	ReadStream(string) (io.ReadCloser, error)
 	Walk(string, filepath.WalkFunc) error
+	Heartbeat() error
 }
 
 //System Handler for returing
@@ -99,6 +100,7 @@ type FileSystemHandler struct {
 	FilesystemDatabase    *db.Database
 	FileSystemAbstraction FileSystemAbstraction
 	Filesystem            string
+	StartOptions          FileSystemOption
 	Closed                bool
 }
 
@@ -171,6 +173,7 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 			FilesystemDatabase:    fsdb,
 			FileSystemAbstraction: localfs.NewLocalFileSystemAbstraction(option.Uuid, rootpath, option.Hierarchy, option.Access == arozfs.FsReadOnly),
 			Filesystem:            fstype,
+			StartOptions:          option,
 			Closed:                false,
 		}, nil
 
@@ -196,6 +199,7 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 			FilesystemDatabase:    nil,
 			FileSystemAbstraction: webdavfs,
 			Filesystem:            fstype,
+			StartOptions:          option,
 			Closed:                false,
 		}, nil
 	} else if fstype == "smb" {
@@ -208,12 +212,19 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 		rootShare := pathChunks[1]
 		user := option.Username
 		password := option.Password
-
-		smbfs, err := smbfs.NewServerMessageBlockFileSystemAbstraction(option.Uuid, option.Hierarchy, ipAddr, rootShare, user, password)
+		smbfs, err := smbfs.NewServerMessageBlockFileSystemAbstraction(
+			option.Uuid,
+			option.Hierarchy,
+			ipAddr,
+			rootShare,
+			user,
+			password,
+		)
 		if err != nil {
 			return nil, err
 		}
-		return &FileSystemHandler{
+
+		thisFsh := FileSystemHandler{
 			Name:                  option.Name,
 			UUID:                  option.Uuid,
 			Path:                  option.Path,
@@ -225,8 +236,11 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 			FilesystemDatabase:    nil,
 			FileSystemAbstraction: smbfs,
 			Filesystem:            fstype,
+			StartOptions:          option,
 			Closed:                false,
-		}, nil
+		}
+
+		return &thisFsh, nil
 	} else if fstype == "ftp" {
 
 		ftpfs, err := ftpfs.NewFTPFSAbstraction(option.Uuid, option.Hierarchy, option.Path, option.Username, option.Password)
@@ -245,6 +259,7 @@ func NewFileSystemHandler(option FileSystemOption) (*FileSystemHandler, error) {
 			FilesystemDatabase:    nil,
 			FileSystemAbstraction: ftpfs,
 			Filesystem:            fstype,
+			StartOptions:          option,
 			Closed:                false,
 		}, nil
 
@@ -382,6 +397,31 @@ func (fsh *FileSystemHandler) DeleteFileRecord(rpath string) error {
 	return nil
 }
 
+//Reload the target file system abstraction
+func (fsh *FileSystemHandler) ReloadFileSystelAbstraction() error {
+	log.Println("[File System] Reloading File System Abstraction for " + fsh.Name)
+	//Load the start option for this fsh
+	originalStartOption := fsh.StartOptions
+
+	//Close the file system handler
+	fsh.Close()
+
+	//Give it a few ms to do physical disk stuffs
+	time.Sleep(100 * time.Millisecond)
+
+	//Generate a new fsh from original start option
+	reloadedFsh, err := NewFileSystemHandler(originalStartOption)
+	if err != nil {
+		return err
+	}
+
+	//Overwrite the pointers to target fsa
+	fsh.FileSystemAbstraction = reloadedFsh.FileSystemAbstraction
+	fsh.FilesystemDatabase = reloadedFsh.FilesystemDatabase
+	fsh.Closed = false
+	return nil
+}
+
 //Close an openeded File System
 func (fsh *FileSystemHandler) Close() {
 	//Set the close flag to true so others function wont access it

+ 1 - 1
startup.go

@@ -111,7 +111,7 @@ func RunStartup() {
 	system_resetpw_init()
 	mediaServer_init()
 	security_init()
-	//backup_init()
+	storageHeartbeatTickerInit()
 	OAuthInit()        //Oauth system init
 	ldapInit()         //LDAP system init
 	notificationInit() //Notification system init

+ 33 - 0
storage.go

@@ -3,10 +3,12 @@ package main
 import (
 	"errors"
 	"io/ioutil"
+	"log"
 	"os"
 	"path/filepath"
 	"runtime"
 	"strings"
+	"time"
 
 	"imuslab.com/arozos/mod/filesystem/arozfs"
 	"imuslab.com/arozos/mod/permission"
@@ -114,6 +116,37 @@ func LoadBaseStoragePool() error {
 	return nil
 }
 
+//Initialize the storage connection health check for all fsh.
+func storageHeartbeatTickerInit() {
+	ticker := time.NewTicker(60 * time.Second)
+	done := make(chan bool)
+	go func() {
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				StoragePerformFileSystemAbstractionConnectionHeartbeat()
+			}
+		}
+	}()
+
+}
+
+//Perform heartbeat to all connected file system abstraction.
+//Blocking function, use with go routine if needed
+func StoragePerformFileSystemAbstractionConnectionHeartbeat() {
+	allFsh := GetAllLoadedFsh()
+	for _, thisFsh := range allFsh {
+		err := thisFsh.FileSystemAbstraction.Heartbeat()
+		if err != nil {
+			log.Println("[Storage] File System Abstraction from " + thisFsh.Name + " report an error: " + err.Error())
+			//Reload this storage pool abstraction
+			thisFsh.ReloadFileSystelAbstraction()
+		}
+	}
+}
+
 //Initialize group storage pool
 func GroupStoragePoolInit() {
 	//Mount permission groups