scheduler.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package scheduler
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "runtime"
  11. "strings"
  12. "time"
  13. "imuslab.com/arozos/mod/agi"
  14. "imuslab.com/arozos/mod/user"
  15. )
/*
	ArozOS System Scheduler
	author: tobychui

	This module provides the scheduled execution feature for ArozOS.
	Some features were migrated from the v1.113 aecron module.
*/
  22. type Job struct {
  23. Name string //The name of this job
  24. Creator string //The creator of this job. When execute, this user permission will be used
  25. Description string //Job description, can be empty
  26. Admin bool //If the creator has admin permission during the creation of this job. If this doesn't match with the runtime instance, this job wille be skipped
  27. ExecutionInterval int64 //Execuation interval in seconds
  28. BaseTime int64 //Exeuction basetime. The next interval is calculated using (current time - base time ) % execution interval
  29. JobType string //Job type, accept {file/function}. If not set default to file
  30. ScriptFile string //The script file being called. Can be an agi script (.agi / .js) or shell script (.bat or .sh)
  31. ScriptFunc func() `json:"-"` //The target function to execute
  32. }
  33. type Scheduler struct {
  34. jobs []*Job
  35. cronfile string
  36. userHandler *user.UserHandler
  37. gateway *agi.Gateway
  38. ticker chan bool
  39. }
  40. var (
  41. logFolder string = "./system/aecron/"
  42. )
  43. func NewScheduler(userHandler *user.UserHandler, gateway *agi.Gateway, cronfile string) (*Scheduler, error) {
  44. if !fileExists(cronfile) {
  45. //Cronfile not exists. Create it
  46. emptyJobList := []*Job{}
  47. ls, _ := json.Marshal(emptyJobList)
  48. err := ioutil.WriteFile(cronfile, ls, 0755)
  49. if err != nil {
  50. return nil, err
  51. }
  52. }
  53. //Load previous jobs from file
  54. jobs, err := loadJobsFromFile(cronfile)
  55. if err != nil {
  56. return nil, err
  57. }
  58. //Create the ArOZ Emulated Crontask
  59. thisScheduler := Scheduler{
  60. jobs: jobs,
  61. userHandler: userHandler,
  62. gateway: gateway,
  63. cronfile: cronfile,
  64. }
  65. //Create log folder
  66. os.MkdirAll(logFolder, 0755)
  67. //Start the cronjob at 1 minute ticker interval
  68. go func() {
  69. //Delay start: Wait until seconds = 0
  70. for time.Now().Unix()%60 > 0 {
  71. time.Sleep(500 * time.Millisecond)
  72. }
  73. stopChannel := thisScheduler.createTicker(1 * time.Minute)
  74. thisScheduler.ticker = stopChannel
  75. log.Println("Emulated Crontab Started - Scheduling Tasks")
  76. }()
  77. //Return the crontask
  78. return &thisScheduler, nil
  79. }
  80. //Load a list of jobs from file
  81. func loadJobsFromFile(cronfile string) ([]*Job, error) {
  82. //Try to read the cronfile
  83. filecontent, err := ioutil.ReadFile(cronfile)
  84. if err != nil {
  85. return []*Job{}, err
  86. }
  87. //Phrase the cronfile
  88. prevousJobs := []Job{}
  89. err = json.Unmarshal(filecontent, &prevousJobs)
  90. if err != nil {
  91. return []*Job{}, err
  92. }
  93. //Convert the json objets to pointer for easy changing by other process
  94. jobsPointers := []*Job{}
  95. for _, thisJob := range prevousJobs {
  96. thisJob.JobType = "file"
  97. var newJobPointer Job = thisJob
  98. jobsPointers = append(jobsPointers, &newJobPointer)
  99. }
  100. return jobsPointers, nil
  101. }
  102. func (a *Scheduler) createTicker(duration time.Duration) chan bool {
  103. ticker := time.NewTicker(duration)
  104. stop := make(chan bool, 1)
  105. go func() {
  106. defer log.Println("Aecron Stopped")
  107. for {
  108. select {
  109. case <-ticker.C:
  110. //Run jobs
  111. for _, thisJob := range a.jobs {
  112. if (time.Now().Unix()-thisJob.BaseTime)%thisJob.ExecutionInterval == 0 {
  113. //Execute this job
  114. scriptFile := thisJob.ScriptFile
  115. if !fileExists(scriptFile) {
  116. //This job no longer exists in the file system. Remove it
  117. a.RemoveJobFromScheduleList(thisJob.Name)
  118. }
  119. clonedJobStructure := *thisJob
  120. ext := filepath.Ext(scriptFile)
  121. if ext == ".js" || ext == ".agi" {
  122. //Run using AGI interface in go routine
  123. go func(thisJob Job) {
  124. userinfo, err := a.userHandler.GetUserInfoFromUsername(thisJob.Creator)
  125. if err != nil {
  126. //This user not exists. Skip this script
  127. cronlog("[ERROR] User not exists: " + thisJob.Creator + ". Skipping scheduled job: " + thisJob.Name + ".")
  128. return
  129. }
  130. //Run the script with this user scope
  131. resp, err := a.gateway.ExecuteAGIScriptAsUser(thisJob.ScriptFile, userinfo)
  132. if err != nil {
  133. cronlog("[ERROR] " + thisJob.Name + " " + err.Error())
  134. } else {
  135. cronlog(thisJob.Name + " " + resp)
  136. }
  137. }(clonedJobStructure)
  138. } else if ext == ".bat" || ext == ".sh" {
  139. //Run as shell script
  140. go func(thisJob Job) {
  141. scriptPath := thisJob.ScriptFile
  142. if runtime.GOOS == "windows" {
  143. scriptPath = strings.ReplaceAll(filepath.ToSlash(scriptPath), "/", "\\")
  144. }
  145. cmd := exec.Command(scriptPath)
  146. out, err := cmd.CombinedOutput()
  147. if err != nil {
  148. cronlog("[ERROR] " + thisJob.Name + " " + err.Error() + " => " + string(out))
  149. }
  150. cronlog(thisJob.Name + " " + string(out))
  151. }(clonedJobStructure)
  152. } else {
  153. //Unknown script file. Ignore this
  154. log.Println("This extension is not yet supported: ", ext)
  155. }
  156. }
  157. }
  158. case <-stop:
  159. return
  160. }
  161. }
  162. }()
  163. return stop
  164. }
  165. func (a *Scheduler) Close() {
  166. if a.ticker != nil {
  167. //Stop the ticker
  168. a.ticker <- true
  169. }
  170. }
  171. //Add an job object to system scheduler
  172. func (a *Scheduler) AddJobToScheduler(job *Job) error {
  173. if job.JobType == "" {
  174. if job.ScriptFunc == nil && job.ScriptFile == "" {
  175. return errors.New("Invalid job file or function")
  176. }
  177. if job.ScriptFunc != nil {
  178. job.JobType = "function"
  179. } else if job.ScriptFile != "" {
  180. job.JobType = "file"
  181. }
  182. }
  183. a.jobs = append(a.jobs, job)
  184. return nil
  185. }
  186. //Create a new scheduled function job in the scheduler
  187. func (a *Scheduler) CreateNewScheduledFunctionJob(name string, desc string, executionInterval int64, targetFunction func()) error {
  188. if name == "" || desc == "" {
  189. return errors.New("Name or description of a scheduled task cannot be empty")
  190. }
  191. if executionInterval < 60 {
  192. return errors.New("The minimum execution interval is 60 seconds.")
  193. }
  194. //Create a new scehduled job
  195. newJob := Job{
  196. Name: name,
  197. Creator: "system",
  198. Description: desc,
  199. Admin: true,
  200. ExecutionInterval: executionInterval,
  201. BaseTime: time.Now().Unix(),
  202. JobType: "function",
  203. ScriptFunc: targetFunction,
  204. }
  205. //Add the new job to scheduler
  206. a.AddJobToScheduler(&newJob)
  207. return nil
  208. }
  209. func (a *Scheduler) GetScheduledJobByName(name string) *Job {
  210. for _, thisJob := range a.jobs {
  211. if thisJob.Name == name {
  212. return thisJob
  213. }
  214. }
  215. return nil
  216. }
  217. func (a *Scheduler) RemoveJobFromScheduleList(taskName string) {
  218. newJobSlice := []*Job{}
  219. for _, j := range a.jobs {
  220. if j.Name != taskName {
  221. thisJob := j
  222. newJobSlice = append(newJobSlice, thisJob)
  223. }
  224. }
  225. a.jobs = newJobSlice
  226. }
  227. func (a *Scheduler) JobExists(name string) bool {
  228. targetJob := a.GetScheduledJobByName(name)
  229. if targetJob == nil {
  230. return false
  231. } else {
  232. return true
  233. }
  234. }
  235. //Write the output to log file. Default to ./system/aecron/{date}.log
  236. func cronlog(message string) {
  237. currentTime := time.Now()
  238. timestamp := currentTime.Format("2006-01-02 15:04:05")
  239. message = timestamp + " " + message
  240. currentLogFile := filepath.ToSlash(filepath.Clean(logFolder)) + "/" + time.Now().Format("01-02-2006") + ".log"
  241. f, err := os.OpenFile(currentLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  242. if err != nil {
  243. //Unable to write to file. Log to STDOUT instead
  244. log.Println(message)
  245. return
  246. }
  247. if _, err := f.WriteString(message + "\n"); err != nil {
  248. log.Println(message)
  249. return
  250. }
  251. defer f.Close()
  252. }