scheduler.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package scheduler
  2. import (
  3. "encoding/json"
  4. "io/ioutil"
  5. "log"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "runtime"
  10. "strings"
  11. "time"
  12. "imuslab.com/arozos/mod/agi"
  13. "imuslab.com/arozos/mod/user"
  14. )
/*
	ArOZ Emulated Crontab
	author: tobychui

	This is not an actual crontab but something similar: it provides
	timed operations that execute agi or shell scripts at a fixed
	interval.
*/
  22. type Job struct {
  23. Name string //The name of this job
  24. Creator string //The creator of this job. When execute, this user permission will be used
  25. Description string //Job description, can be empty
  26. Admin bool //If the creator has admin permission during the creation of this job. If this doesn't match with the runtime instance, this job wille be skipped
  27. ExecutionInterval int64 //Execuation interval in seconds
  28. BaseTime int64 //Exeuction basetime. The next interval is calculated using (current time - base time ) % execution interval
  29. ScriptFile string //The script file being called. Can be an agi script (.agi / .js) or shell script (.bat or .sh)
  30. }
  31. type Scheduler struct {
  32. jobs []*Job
  33. cronfile string
  34. userHandler *user.UserHandler
  35. gateway *agi.Gateway
  36. ticker chan bool
  37. }
  38. var (
  39. logFolder string = "./system/aecron/"
  40. )
  41. func NewScheduler(userHandler *user.UserHandler, gateway *agi.Gateway, cronfile string) (*Scheduler, error) {
  42. if !fileExists(cronfile) {
  43. //Cronfile not exists. Create it
  44. emptyJobList := []*Job{}
  45. ls, _ := json.Marshal(emptyJobList)
  46. err := ioutil.WriteFile(cronfile, ls, 0755)
  47. if err != nil {
  48. return nil, err
  49. }
  50. }
  51. //Load previous jobs from file
  52. jobs, err := loadJobsFromFile(cronfile)
  53. if err != nil {
  54. return nil, err
  55. }
  56. //Create the ArOZ Emulated Crontask
  57. aecron := Scheduler{
  58. jobs: jobs,
  59. userHandler: userHandler,
  60. gateway: gateway,
  61. cronfile: cronfile,
  62. }
  63. //Create log folder
  64. os.MkdirAll(logFolder, 0755)
  65. //Start the cronjob at 1 minute ticker interval
  66. go func() {
  67. //Delay start: Wait until seconds = 0
  68. for time.Now().Unix()%60 > 0 {
  69. time.Sleep(500 * time.Millisecond)
  70. }
  71. stopChannel := aecron.createTicker(1 * time.Minute)
  72. aecron.ticker = stopChannel
  73. log.Println("Emulated Crontab Started - Scheduling Tasks")
  74. }()
  75. //Return the crontask
  76. return &aecron, nil
  77. }
  78. //Load a list of jobs from file
  79. func loadJobsFromFile(cronfile string) ([]*Job, error) {
  80. //Try to read the cronfile
  81. filecontent, err := ioutil.ReadFile(cronfile)
  82. if err != nil {
  83. return []*Job{}, err
  84. }
  85. //Phrase the cronfile
  86. prevousJobs := []Job{}
  87. err = json.Unmarshal(filecontent, &prevousJobs)
  88. if err != nil {
  89. return []*Job{}, err
  90. }
  91. //Convert the json objets to pointer for easy changing by other process
  92. jobsPointers := []*Job{}
  93. for _, thisJob := range prevousJobs {
  94. var newJobPointer Job = thisJob
  95. jobsPointers = append(jobsPointers, &newJobPointer)
  96. }
  97. return jobsPointers, nil
  98. }
  99. func (a *Scheduler) createTicker(duration time.Duration) chan bool {
  100. ticker := time.NewTicker(duration)
  101. stop := make(chan bool, 1)
  102. go func() {
  103. defer log.Println("Aecron Stopped")
  104. for {
  105. select {
  106. case <-ticker.C:
  107. //Run jobs
  108. for _, thisJob := range a.jobs {
  109. if (time.Now().Unix()-thisJob.BaseTime)%thisJob.ExecutionInterval == 0 {
  110. //Execute this job
  111. scriptFile := thisJob.ScriptFile
  112. if !fileExists(scriptFile) {
  113. //This job no longer exists in the file system. Remove it
  114. a.RemoveJobFromScheduleList(thisJob.Name)
  115. }
  116. clonedJobStructure := *thisJob
  117. ext := filepath.Ext(scriptFile)
  118. if ext == ".js" || ext == ".agi" {
  119. //Run using AGI interface in go routine
  120. go func(thisJob Job) {
  121. userinfo, err := a.userHandler.GetUserInfoFromUsername(thisJob.Creator)
  122. if err != nil {
  123. //This user not exists. Skip this script
  124. cronlog("[ERROR] User not exists: " + thisJob.Creator + ". Skipping scheduled job: " + thisJob.Name + ".")
  125. return
  126. }
  127. //Run the script with this user scope
  128. resp, err := a.gateway.ExecuteAGIScriptAsUser(thisJob.ScriptFile, userinfo)
  129. if err != nil {
  130. cronlog("[ERROR] " + thisJob.Name + " " + err.Error())
  131. } else {
  132. cronlog(thisJob.Name + " " + resp)
  133. }
  134. }(clonedJobStructure)
  135. } else if ext == ".bat" || ext == ".sh" {
  136. //Run as shell script
  137. go func(thisJob Job) {
  138. scriptPath := thisJob.ScriptFile
  139. if runtime.GOOS == "windows" {
  140. scriptPath = strings.ReplaceAll(filepath.ToSlash(scriptPath), "/", "\\")
  141. }
  142. cmd := exec.Command(scriptPath)
  143. out, err := cmd.CombinedOutput()
  144. if err != nil {
  145. cronlog("[ERROR] " + thisJob.Name + " " + err.Error() + " => " + string(out))
  146. }
  147. cronlog(thisJob.Name + " " + string(out))
  148. }(clonedJobStructure)
  149. } else {
  150. //Unknown script file. Ignore this
  151. log.Println("This extension is not yet supported: ", ext)
  152. }
  153. }
  154. }
  155. case <-stop:
  156. return
  157. }
  158. }
  159. }()
  160. return stop
  161. }
  162. func (a *Scheduler) Close() {
  163. if a.ticker != nil {
  164. //Stop the ticker
  165. a.ticker <- true
  166. }
  167. }
  168. func (a *Scheduler) GetScheduledJobByName(name string) *Job {
  169. for _, thisJob := range a.jobs {
  170. if thisJob.Name == name {
  171. return thisJob
  172. }
  173. }
  174. return nil
  175. }
  176. func (a *Scheduler) RemoveJobFromScheduleList(taskName string) {
  177. newJobSlice := []*Job{}
  178. for _, j := range a.jobs {
  179. if j.Name != taskName {
  180. thisJob := j
  181. newJobSlice = append(newJobSlice, thisJob)
  182. }
  183. }
  184. a.jobs = newJobSlice
  185. }
  186. func (a *Scheduler) JobExists(name string) bool {
  187. targetJob := a.GetScheduledJobByName(name)
  188. if targetJob == nil {
  189. return false
  190. } else {
  191. return true
  192. }
  193. }
  194. //Write the output to log file. Default to ./system/aecron/{date}.log
  195. func cronlog(message string) {
  196. currentTime := time.Now()
  197. timestamp := currentTime.Format("2006-01-02 15:04:05")
  198. message = timestamp + " " + message
  199. currentLogFile := filepath.ToSlash(filepath.Clean(logFolder)) + "/" + time.Now().Format("01-02-2006") + ".log"
  200. f, err := os.OpenFile(currentLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  201. if err != nil {
  202. //Unable to write to file. Log to STDOUT instead
  203. log.Println(message)
  204. return
  205. }
  206. if _, err := f.WriteString(message + "\n"); err != nil {
  207. log.Println(message)
  208. return
  209. }
  210. defer f.Close()
  211. }