scheduler.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package scheduler
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "runtime"
  11. "strings"
  12. "time"
  13. "imuslab.com/arozos/mod/agi"
  14. "imuslab.com/arozos/mod/user"
  15. )
  16. /*
  17. ArozOS System Scheduler
  18. author: tobychui
  19. This module provides a feature for scheduling executable jobs in ArozOS.
  20. Some features were migrated from the v1.113 aecron module.
  21. */
  // Job describes a single scheduled task.
  type Job struct {
  	// Name is the name of this job.
  	Name string
  	// Creator is the user who created this job. When the job executes, this
  	// user's permission is used.
  	Creator string
  	// Description is the job description. It can be empty.
  	Description string
  	// Admin records whether the creator had admin permission when this job was
  	// created. The intent is that a job is skipped when this does not match the
  	// runtime instance.
  	// NOTE(review): no check of this flag is visible in this file - confirm
  	// where it is enforced.
  	Admin bool
  	// ExecutionInterval is the execution interval in seconds.
  	ExecutionInterval int64
  	// BaseTime is the execution base time. A job is due when
  	// (current time - base time) % execution interval is zero.
  	BaseTime int64
  	// JobType is the job type, one of {file/function}. If it is not set it
  	// defaults to file.
  	JobType string
  	// ScriptFile is the script file being called. It can be an agi script
  	// (.agi / .js) or a shell script (.bat or .sh).
  	ScriptFile string
  	// ScriptFunc is the target function to execute. It is never written to or
  	// read from JSON.
  	ScriptFunc func() (string, error) `json:"-"`
  }
  33. type Scheduler struct {
  34. jobs []*Job
  35. cronfile string
  36. userHandler *user.UserHandler
  37. gateway *agi.Gateway
  38. ticker chan bool
  39. }
  // logFolder is the folder in which cronlog creates the scheduler log files.
  var logFolder = "./system/aecron/"
  43. func NewScheduler(userHandler *user.UserHandler, gateway *agi.Gateway, cronfile string) (*Scheduler, error) {
  44. if !fileExists(cronfile) {
  45. //Cronfile not exists. Create it
  46. emptyJobList := []*Job{}
  47. ls, _ := json.Marshal(emptyJobList)
  48. err := ioutil.WriteFile(cronfile, ls, 0755)
  49. if err != nil {
  50. return nil, err
  51. }
  52. }
  53. //Load previous jobs from file
  54. jobs, err := loadJobsFromFile(cronfile)
  55. if err != nil {
  56. return nil, err
  57. }
  58. //Create the ArOZ Emulated Crontask
  59. thisScheduler := Scheduler{
  60. jobs: jobs,
  61. userHandler: userHandler,
  62. gateway: gateway,
  63. cronfile: cronfile,
  64. }
  65. //Create log folder
  66. os.MkdirAll(logFolder, 0755)
  67. //Start the cronjob at 1 minute ticker interval
  68. go func() {
  69. //Delay start: Wait until seconds = 0
  70. for time.Now().Unix()%60 > 0 {
  71. time.Sleep(500 * time.Millisecond)
  72. }
  73. stopChannel := thisScheduler.createTicker(1 * time.Minute)
  74. thisScheduler.ticker = stopChannel
  75. log.Println("Emulated Crontab Started - Scheduling Tasks")
  76. }()
  77. //Return the crontask
  78. return &thisScheduler, nil
  79. }
  80. //Load a list of jobs from file
  81. func loadJobsFromFile(cronfile string) ([]*Job, error) {
  82. //Try to read the cronfile
  83. filecontent, err := ioutil.ReadFile(cronfile)
  84. if err != nil {
  85. return []*Job{}, err
  86. }
  87. //Phrase the cronfile
  88. prevousJobs := []Job{}
  89. err = json.Unmarshal(filecontent, &prevousJobs)
  90. if err != nil {
  91. return []*Job{}, err
  92. }
  93. //Convert the json objets to pointer for easy changing by other process
  94. jobsPointers := []*Job{}
  95. for _, thisJob := range prevousJobs {
  96. thisJob.JobType = "file"
  97. var newJobPointer Job = thisJob
  98. jobsPointers = append(jobsPointers, &newJobPointer)
  99. }
  100. return jobsPointers, nil
  101. }
  102. func (a *Scheduler) createTicker(duration time.Duration) chan bool {
  103. ticker := time.NewTicker(duration)
  104. stop := make(chan bool, 1)
  105. go func() {
  106. defer log.Println("Scheduler Stopped")
  107. for {
  108. select {
  109. case <-ticker.C:
  110. //Run jobs
  111. for _, thisJob := range a.jobs {
  112. if (time.Now().Unix()-thisJob.BaseTime)%thisJob.ExecutionInterval == 0 {
  113. //Execute this job
  114. if thisJob.JobType == "function" {
  115. //Execute the script function
  116. returnvalue, err := thisJob.ScriptFunc()
  117. if err != nil {
  118. //Execution error. Kill this scheule
  119. log.Println(`*Scheduler* Error occured when running task ` + thisJob.Name + ": " + err.Error())
  120. a.RemoveJobFromScheduleList(thisJob.Name)
  121. cronlog("[ERROR]: " + err.Error())
  122. }
  123. //Execution suceed. Log the return value
  124. if len(returnvalue) > 0 {
  125. cronlog(returnvalue)
  126. }
  127. } else {
  128. //This is requesting to execute a script file
  129. scriptFile := thisJob.ScriptFile
  130. if !fileExists(scriptFile) {
  131. //This job no longer exists in the file system. Remove it
  132. a.RemoveJobFromScheduleList(thisJob.Name)
  133. }
  134. clonedJobStructure := *thisJob
  135. ext := filepath.Ext(scriptFile)
  136. if ext == ".js" || ext == ".agi" {
  137. //Run using AGI interface in go routine
  138. go func(thisJob Job) {
  139. userinfo, err := a.userHandler.GetUserInfoFromUsername(thisJob.Creator)
  140. if err != nil {
  141. //This user not exists. Skip this script
  142. cronlog("[ERROR] User not exists: " + thisJob.Creator + ". Skipping scheduled job: " + thisJob.Name + ".")
  143. return
  144. }
  145. //Run the script with this user scope
  146. resp, err := a.gateway.ExecuteAGIScriptAsUser(thisJob.ScriptFile, userinfo)
  147. if err != nil {
  148. cronlog("[ERROR] " + thisJob.Name + " " + err.Error())
  149. } else {
  150. cronlog(thisJob.Name + " " + resp)
  151. }
  152. }(clonedJobStructure)
  153. } else if ext == ".bat" || ext == ".sh" {
  154. //Run as shell script
  155. go func(thisJob Job) {
  156. scriptPath := thisJob.ScriptFile
  157. if runtime.GOOS == "windows" {
  158. scriptPath = strings.ReplaceAll(filepath.ToSlash(scriptPath), "/", "\\")
  159. }
  160. cmd := exec.Command(scriptPath)
  161. out, err := cmd.CombinedOutput()
  162. if err != nil {
  163. cronlog("[ERROR] " + thisJob.Name + " " + err.Error() + " => " + string(out))
  164. }
  165. cronlog(thisJob.Name + " " + string(out))
  166. }(clonedJobStructure)
  167. } else {
  168. //Unknown script file. Ignore this
  169. log.Println("This extension is not yet supported: ", ext)
  170. }
  171. }
  172. }
  173. }
  174. case <-stop:
  175. return
  176. }
  177. }
  178. }()
  179. return stop
  180. }
  181. func (a *Scheduler) Close() {
  182. if a.ticker != nil {
  183. //Stop the ticker
  184. a.ticker <- true
  185. }
  186. }
  187. //Add an job object to system scheduler
  188. func (a *Scheduler) AddJobToScheduler(job *Job) error {
  189. if job.JobType == "" {
  190. if job.ScriptFunc == nil && job.ScriptFile == "" {
  191. return errors.New("Invalid job file or function")
  192. }
  193. if job.ScriptFunc != nil {
  194. job.JobType = "function"
  195. } else if job.ScriptFile != "" {
  196. job.JobType = "file"
  197. }
  198. }
  199. a.jobs = append(a.jobs, job)
  200. return nil
  201. }
  202. //Create a new scheduled function job in the scheduler
  203. func (a *Scheduler) CreateNewScheduledFunctionJob(name string, desc string, executionInterval int64, targetFunction func() (string, error)) error {
  204. if name == "" || desc == "" {
  205. return errors.New("Name or description of a scheduled task cannot be empty")
  206. }
  207. if executionInterval < 60 {
  208. return errors.New("The minimum execution interval is 60 seconds.")
  209. }
  210. //Get the cloest minute
  211. baseTime := time.Now().Unix() - (time.Now().Unix() % 60)
  212. //Create a new scehduled job
  213. newJob := Job{
  214. Name: name,
  215. Creator: "system",
  216. Description: desc,
  217. Admin: true,
  218. ExecutionInterval: executionInterval,
  219. BaseTime: baseTime,
  220. JobType: "function",
  221. ScriptFunc: targetFunction,
  222. }
  223. //Add the new job to scheduler
  224. a.AddJobToScheduler(&newJob)
  225. return nil
  226. }
  227. func (a *Scheduler) GetScheduledJobByName(name string) *Job {
  228. for _, thisJob := range a.jobs {
  229. if thisJob.Name == name {
  230. return thisJob
  231. }
  232. }
  233. return nil
  234. }
  235. func (a *Scheduler) RemoveJobFromScheduleList(taskName string) {
  236. newJobSlice := []*Job{}
  237. for _, j := range a.jobs {
  238. if j.Name != taskName {
  239. thisJob := j
  240. newJobSlice = append(newJobSlice, thisJob)
  241. }
  242. }
  243. a.jobs = newJobSlice
  244. }
  245. func (a *Scheduler) JobExists(name string) bool {
  246. targetJob := a.GetScheduledJobByName(name)
  247. if targetJob == nil {
  248. return false
  249. } else {
  250. return true
  251. }
  252. }
  253. //Write the output to log file. Default to ./system/aecron/{date}.log
  254. func cronlog(message string) {
  255. currentTime := time.Now()
  256. timestamp := currentTime.Format("2006-01-02 15:04:05")
  257. message = timestamp + " " + message
  258. currentLogFile := filepath.ToSlash(filepath.Clean(logFolder)) + "/" + time.Now().Format("01-02-2006") + ".log"
  259. f, err := os.OpenFile(currentLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  260. if err != nil {
  261. //Unable to write to file. Log to STDOUT instead
  262. log.Println(message)
  263. return
  264. }
  265. if _, err := f.WriteString(message + "\n"); err != nil {
  266. log.Println(message)
  267. return
  268. }
  269. defer f.Close()
  270. }