scheduler.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package scheduler
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "runtime"
  11. "strings"
  12. "time"
  13. "imuslab.com/arozos/mod/agi"
  14. "imuslab.com/arozos/mod/user"
  15. )
/*
ArozOS System Scheduler
author: tobychui

This module provides the scheduled-execution feature for ArozOS.
Some features were migrated from the v1.113 aecron module.
*/
  22. type Job struct {
  23. Name string //The name of this job
  24. Creator string //The creator of this job. When execute, this user permission will be used
  25. Description string //Job description, can be empty
  26. Admin bool //If the creator has admin permission during the creation of this job. If this doesn't match with the runtime instance, this job wille be skipped
  27. ExecutionInterval int64 //Execuation interval in seconds
  28. BaseTime int64 //Exeuction basetime. The next interval is calculated using (current time - base time ) % execution interval
  29. JobType string //Job type, accept {file/function}. If not set default to file
  30. FshID string
  31. ScriptFile string //The script file being called. Can be an agi script (.agi / .js) or shell script (.bat or .sh)
  32. ScriptFunc func() (string, error) `json:"-"` //The target function to execute
  33. }
  34. type Scheduler struct {
  35. jobs []*Job
  36. cronfile string
  37. userHandler *user.UserHandler
  38. gateway *agi.Gateway
  39. ticker chan bool
  40. }
  41. var (
  42. logFolder string = "./system/aecron/"
  43. )
  44. func NewScheduler(userHandler *user.UserHandler, gateway *agi.Gateway, cronfile string) (*Scheduler, error) {
  45. if !fileExists(cronfile) {
  46. //Cronfile not exists. Create it
  47. emptyJobList := []*Job{}
  48. ls, _ := json.Marshal(emptyJobList)
  49. err := ioutil.WriteFile(cronfile, ls, 0755)
  50. if err != nil {
  51. return nil, err
  52. }
  53. }
  54. //Load previous jobs from file
  55. jobs, err := loadJobsFromFile(cronfile)
  56. if err != nil {
  57. return nil, err
  58. }
  59. //Create the ArOZ Emulated Crontask
  60. thisScheduler := Scheduler{
  61. jobs: jobs,
  62. userHandler: userHandler,
  63. gateway: gateway,
  64. cronfile: cronfile,
  65. }
  66. //Create log folder
  67. os.MkdirAll(logFolder, 0755)
  68. //Start the cronjob at 1 minute ticker interval
  69. go func() {
  70. //Delay start: Wait until seconds = 0
  71. for time.Now().Unix()%60 > 0 {
  72. time.Sleep(500 * time.Millisecond)
  73. }
  74. stopChannel := thisScheduler.createTicker(1 * time.Minute)
  75. thisScheduler.ticker = stopChannel
  76. log.Println("ArozOS System Scheduler Started")
  77. }()
  78. //Return the crontask
  79. return &thisScheduler, nil
  80. }
  81. //Load a list of jobs from file
  82. func loadJobsFromFile(cronfile string) ([]*Job, error) {
  83. //Try to read the cronfile
  84. filecontent, err := ioutil.ReadFile(cronfile)
  85. if err != nil {
  86. return []*Job{}, err
  87. }
  88. //Phrase the cronfile
  89. prevousJobs := []Job{}
  90. err = json.Unmarshal(filecontent, &prevousJobs)
  91. if err != nil {
  92. return []*Job{}, err
  93. }
  94. //Convert the json objets to pointer for easy changing by other process
  95. jobsPointers := []*Job{}
  96. for _, thisJob := range prevousJobs {
  97. thisJob.JobType = "file"
  98. var newJobPointer Job = thisJob
  99. jobsPointers = append(jobsPointers, &newJobPointer)
  100. }
  101. return jobsPointers, nil
  102. }
  103. func (a *Scheduler) createTicker(duration time.Duration) chan bool {
  104. ticker := time.NewTicker(duration)
  105. stop := make(chan bool, 1)
  106. go func() {
  107. defer log.Println("Scheduler Stopped")
  108. for {
  109. select {
  110. case <-ticker.C:
  111. //Run jobs
  112. for _, thisJob := range a.jobs {
  113. if (time.Now().Unix()-thisJob.BaseTime)%thisJob.ExecutionInterval == 0 {
  114. //Execute this job
  115. if thisJob.JobType == "function" {
  116. //Execute the script function
  117. returnvalue, err := thisJob.ScriptFunc()
  118. if err != nil {
  119. //Execution error. Kill this scheule
  120. log.Println(`*Scheduler* Error occured when running task ` + thisJob.Name + ": " + err.Error())
  121. a.RemoveJobFromScheduleList(thisJob.Name)
  122. cronlog("[ERROR]: " + err.Error())
  123. }
  124. //Execution suceed. Log the return value
  125. if len(returnvalue) > 0 {
  126. cronlog(returnvalue)
  127. }
  128. } else {
  129. //This is requesting to execute a script file
  130. scriptFile := thisJob.ScriptFile
  131. if !fileExists(scriptFile) {
  132. //This job no longer exists in the file system. Remove it
  133. a.RemoveJobFromScheduleList(thisJob.Name)
  134. }
  135. clonedJobStructure := *thisJob
  136. ext := filepath.Ext(scriptFile)
  137. if ext == ".js" || ext == ".agi" {
  138. //Run using AGI interface in go routine
  139. go func(thisJob Job) {
  140. userinfo, err := a.userHandler.GetUserInfoFromUsername(thisJob.Creator)
  141. if err != nil {
  142. //This user not exists. Skip this script
  143. cronlog("[ERROR] User not exists: " + thisJob.Creator + ". Skipping scheduled job: " + thisJob.Name + ".")
  144. return
  145. }
  146. //Run the script with this user scope
  147. resp, err := a.gateway.ExecuteAGIScriptAsUser(thisJob.ScriptFile, userinfo, nil)
  148. if err != nil {
  149. cronlog("[ERROR] " + thisJob.Name + " " + err.Error())
  150. } else {
  151. cronlog(thisJob.Name + " " + resp)
  152. }
  153. }(clonedJobStructure)
  154. } else if ext == ".bat" || ext == ".sh" {
  155. //Run as shell script
  156. go func(thisJob Job) {
  157. scriptPath := thisJob.ScriptFile
  158. if runtime.GOOS == "windows" {
  159. scriptPath = strings.ReplaceAll(filepath.ToSlash(scriptPath), "/", "\\")
  160. }
  161. cmd := exec.Command(scriptPath)
  162. out, err := cmd.CombinedOutput()
  163. if err != nil {
  164. cronlog("[ERROR] " + thisJob.Name + " " + err.Error() + " => " + string(out))
  165. }
  166. cronlog(thisJob.Name + " " + string(out))
  167. }(clonedJobStructure)
  168. } else {
  169. //Unknown script file. Ignore this
  170. log.Println("This extension is not yet supported: ", ext)
  171. }
  172. }
  173. }
  174. }
  175. case <-stop:
  176. return
  177. }
  178. }
  179. }()
  180. return stop
  181. }
  182. func (a *Scheduler) Close() {
  183. if a.ticker != nil {
  184. //Stop the ticker
  185. a.ticker <- true
  186. }
  187. }
  188. //Add an job object to system scheduler
  189. func (a *Scheduler) AddJobToScheduler(job *Job) error {
  190. if job.JobType == "" {
  191. if job.ScriptFunc == nil && job.ScriptFile == "" {
  192. return errors.New("Invalid job file or function")
  193. }
  194. if job.ScriptFunc != nil {
  195. job.JobType = "function"
  196. } else if job.ScriptFile != "" {
  197. job.JobType = "file"
  198. }
  199. }
  200. a.jobs = append(a.jobs, job)
  201. return nil
  202. }
  203. //Create a new scheduled function job in the scheduler
  204. func (a *Scheduler) CreateNewScheduledFunctionJob(name string, desc string, executionInterval int64, targetFunction func() (string, error)) error {
  205. if name == "" || desc == "" {
  206. return errors.New("Name or description of a scheduled task cannot be empty")
  207. }
  208. if executionInterval < 60 {
  209. return errors.New("The minimum execution interval is 60 seconds.")
  210. }
  211. //Get the cloest minute
  212. baseTime := time.Now().Unix() - (time.Now().Unix() % 60)
  213. //Create a new scehduled job
  214. newJob := Job{
  215. Name: name,
  216. Creator: "system",
  217. Description: desc,
  218. Admin: true,
  219. ExecutionInterval: executionInterval,
  220. BaseTime: baseTime,
  221. JobType: "function",
  222. ScriptFunc: targetFunction,
  223. }
  224. //Add the new job to scheduler
  225. a.AddJobToScheduler(&newJob)
  226. return nil
  227. }
  228. func (a *Scheduler) GetScheduledJobByName(name string) *Job {
  229. for _, thisJob := range a.jobs {
  230. if thisJob.Name == name {
  231. return thisJob
  232. }
  233. }
  234. return nil
  235. }
  236. func (a *Scheduler) RemoveJobFromScheduleList(taskName string) {
  237. newJobSlice := []*Job{}
  238. for _, j := range a.jobs {
  239. if j.Name != taskName {
  240. thisJob := j
  241. newJobSlice = append(newJobSlice, thisJob)
  242. }
  243. }
  244. a.jobs = newJobSlice
  245. }
  246. func (a *Scheduler) JobExists(name string) bool {
  247. targetJob := a.GetScheduledJobByName(name)
  248. if targetJob == nil {
  249. return false
  250. } else {
  251. return true
  252. }
  253. }
  254. //Write the output to log file. Default to ./system/aecron/{date}.log
  255. func cronlog(message string) {
  256. currentTime := time.Now()
  257. timestamp := currentTime.Format("2006-01-02 15:04:05")
  258. message = timestamp + " " + message
  259. currentLogFile := filepath.ToSlash(filepath.Clean(logFolder)) + "/" + time.Now().Format("2006-02-01") + ".log"
  260. f, err := os.OpenFile(currentLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  261. if err != nil {
  262. //Unable to write to file. Log to STDOUT instead
  263. log.Println(message)
  264. return
  265. }
  266. if _, err := f.WriteString(message + "\n"); err != nil {
  267. log.Println(message)
  268. return
  269. }
  270. defer f.Close()
  271. }