scheduler.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package scheduler
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "path/filepath"
  8. "time"
  9. "imuslab.com/arozos/mod/agi"
  10. "imuslab.com/arozos/mod/common"
  11. "imuslab.com/arozos/mod/info/logger"
  12. "imuslab.com/arozos/mod/user"
  13. )
  14. /*
  15. ArozOS System Scheduler
  16. author: tobychui
  17. This module provide scheduling executable feature for ArozOS
  18. Some feature was migrated from the v1.113 aecron module
  19. */
  20. type Job struct {
  21. Name string //The name of this job
  22. Creator string //The creator of this job. When execute, this user permission will be used
  23. Description string //Job description, can be empty
  24. ExecutionInterval int64 //Execuation interval in seconds
  25. BaseTime int64 //Exeuction basetime. The next interval is calculated using (current time - base time ) % execution interval
  26. FshID string //The target FSH ID that this script file is stored
  27. ScriptVpath string //The agi script file being called, require Vpath
  28. lastExecutionTime int64 //Last time this job being executed
  29. lastExecutionOutput string //The output of last execution
  30. }
  31. type ScheudlerOption struct {
  32. UserHandler *user.UserHandler
  33. Gateway *agi.Gateway
  34. Logger *logger.Logger
  35. CronFile string //The location of the cronfile which store the jobs registry in file format
  36. }
  37. type Scheduler struct {
  38. jobs []*Job
  39. options *ScheudlerOption
  40. ticker chan bool
  41. }
  42. var ()
  43. func NewScheduler(option *ScheudlerOption) (*Scheduler, error) {
  44. if !common.FileExists(option.CronFile) {
  45. //Cronfile not exists. Create it
  46. emptyJobList := []*Job{}
  47. ls, _ := json.Marshal(emptyJobList)
  48. err := ioutil.WriteFile(option.CronFile, ls, 0755)
  49. if err != nil {
  50. return nil, err
  51. }
  52. }
  53. //Load previous jobs from file
  54. jobs, err := loadJobsFromFile(option.CronFile)
  55. if err != nil {
  56. return nil, err
  57. }
  58. //Create the ArOZ Emulated Crontask
  59. thisScheduler := Scheduler{
  60. jobs: jobs,
  61. options: option,
  62. }
  63. option.Logger.PrintAndLog("Scheduler", "Scheduler started", nil)
  64. //Start the cronjob at 1 minute ticker interval
  65. go func() {
  66. //Delay start: Wait until seconds = 0
  67. for time.Now().Unix()%60 > 0 {
  68. time.Sleep(500 * time.Millisecond)
  69. }
  70. stopChannel := thisScheduler.createTicker(1 * time.Minute)
  71. thisScheduler.ticker = stopChannel
  72. option.Logger.PrintAndLog("Scheduler", "ArozOS System Scheduler Started", nil)
  73. }()
  74. //Return the crontask
  75. return &thisScheduler, nil
  76. }
  77. func (a *Scheduler) createTicker(duration time.Duration) chan bool {
  78. ticker := time.NewTicker(duration)
  79. stop := make(chan bool, 1)
  80. go func() {
  81. defer log.Println("Scheduler Stopped")
  82. for {
  83. select {
  84. case <-ticker.C:
  85. //Run jobs
  86. for _, thisJob := range a.jobs {
  87. if (time.Now().Unix()-thisJob.BaseTime)%thisJob.ExecutionInterval == 0 {
  88. //Execute this job
  89. //Get the creator userinfo
  90. targetUser, err := a.options.UserHandler.GetUserInfoFromUsername(thisJob.Creator)
  91. if err != nil {
  92. a.cronlogError("User "+thisJob.Creator+" no longer exists", err)
  93. return
  94. }
  95. //Check if the script exists
  96. fsh, err := targetUser.GetFileSystemHandlerFromVirtualPath(thisJob.ScriptVpath)
  97. if err != nil {
  98. a.cronlogError("Unable to resolve required vpath for job: "+thisJob.Name+" for user "+thisJob.Creator, err)
  99. return
  100. }
  101. rpath, err := fsh.FileSystemAbstraction.VirtualPathToRealPath(thisJob.ScriptVpath, targetUser.Username)
  102. if err != nil {
  103. a.cronlogError("Unable to resolve file real path for job: "+thisJob.Name+" for user "+thisJob.Creator, err)
  104. return
  105. }
  106. if !fsh.FileSystemAbstraction.FileExists(rpath) {
  107. //This job no longer exists in the file system. Remove it
  108. a.cronlog("Removing job " + thisJob.Name + " by " + thisJob.Creator + " as job file no longer exists")
  109. a.RemoveJobFromScheduleList(thisJob.Name)
  110. return
  111. }
  112. clonedJobStructure := *thisJob
  113. ext := filepath.Ext(rpath)
  114. if ext == ".js" || ext == ".agi" {
  115. //Run using AGI interface in go routine
  116. go func(thisJob Job) {
  117. //Resolve the sript path to realpath
  118. //Run the script with this user scope
  119. thisJob.lastExecutionTime = time.Now().Unix()
  120. resp, err := a.options.Gateway.ExecuteAGIScriptAsUser(fsh, rpath, targetUser, nil)
  121. if err != nil {
  122. a.cronlogError(thisJob.Name+" execution error: "+err.Error(), err)
  123. thisJob.lastExecutionOutput = err.Error()
  124. } else {
  125. a.cronlog(thisJob.Name + " executed: " + resp)
  126. thisJob.lastExecutionOutput = resp
  127. }
  128. }(clonedJobStructure)
  129. } else {
  130. //Unknown script file. Ignore this
  131. a.cronlogError("This extension is not yet supported: "+ext, errors.New("unsupported AGI interface script extension"))
  132. }
  133. }
  134. }
  135. case <-stop:
  136. return
  137. }
  138. }
  139. }()
  140. return stop
  141. }
  142. func (a *Scheduler) Close() {
  143. if a.ticker != nil {
  144. //Stop the ticker
  145. a.ticker <- true
  146. }
  147. }
  148. //Add an job object to system scheduler
  149. func (a *Scheduler) AddJobToScheduler(job *Job) error {
  150. a.jobs = append(a.jobs, job)
  151. return nil
  152. }
  153. func (a *Scheduler) GetScheduledJobByName(name string) *Job {
  154. for _, thisJob := range a.jobs {
  155. if thisJob.Name == name {
  156. return thisJob
  157. }
  158. }
  159. return nil
  160. }
  161. func (a *Scheduler) RemoveJobFromScheduleList(taskName string) {
  162. newJobSlice := []*Job{}
  163. for _, j := range a.jobs {
  164. if j.Name != taskName {
  165. thisJob := j
  166. newJobSlice = append(newJobSlice, thisJob)
  167. }
  168. }
  169. a.jobs = newJobSlice
  170. }
  171. func (a *Scheduler) JobExists(name string) bool {
  172. targetJob := a.GetScheduledJobByName(name)
  173. if targetJob == nil {
  174. return false
  175. } else {
  176. return true
  177. }
  178. }
  179. //Write the output to log file. Default to ./system/aecron/{date}.log
  180. /*
  181. func cronlog(message string) {
  182. currentTime := time.Now()
  183. timestamp := currentTime.Format("2006-01-02 15:04:05")
  184. message = timestamp + " " + message
  185. currentLogFile := filepath.ToSlash(filepath.Clean(logFolder)) + "/" + time.Now().Format("2006-02-01") + ".log"
  186. f, err := os.OpenFile(currentLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  187. if err != nil {
  188. //Unable to write to file. Log to STDOUT instead
  189. log.Println(message)
  190. return
  191. }
  192. if _, err := f.WriteString(message + "\n"); err != nil {
  193. log.Println(message)
  194. return
  195. }
  196. defer f.Close()
  197. }
  198. */