diff --git a/server/cron/cron.go b/server/cron/cron.go index aae506ed54d09edff27558ba2664079e8eafab5f..6fed17b6a15ae6a954e9d6aa8d2418521231b4be 100644 --- a/server/cron/cron.go +++ b/server/cron/cron.go @@ -48,3 +48,14 @@ func (s *Scheduler) Shutdown() { func (s *Scheduler) Start() { s.Cron.Start() } + +// AddFunc adds a job to the Scheduler. +func (s *Scheduler) AddFunc(spec string, fn func()) (int, error) { + id, err := s.Cron.AddFunc(spec, fn) + return int(id), err +} + +// Remove removes a job from the Scheduler. +func (s *Scheduler) Remove(id int) { + s.Cron.Remove(cron.EntryID(id)) +} diff --git a/server/jobs/jobs.go b/server/jobs/jobs.go new file mode 100644 index 0000000000000000000000000000000000000000..ba39af85c6ae9e632d64d9512f96a9ddccbf27b8 --- /dev/null +++ b/server/jobs/jobs.go @@ -0,0 +1,32 @@ +package jobs + +import ( + "context" + "sync" +) + +// Job is a job that can be registered with the scheduler. +type Job struct { + ID int + Spec string + Func func(context.Context) func() +} + +var ( + mtx sync.Mutex + jobs = make(map[string]*Job, 0) +) + +// Register registers a job. +func Register(name, spec string, fn func(context.Context) func()) { + mtx.Lock() + defer mtx.Unlock() + jobs[name] = &Job{Spec: spec, Func: fn} +} + +// List returns a map of registered jobs. +func List() map[string]*Job { + mtx.Lock() + defer mtx.Unlock() + return jobs +} diff --git a/server/jobs.go b/server/jobs/mirror.go similarity index 66% rename from server/jobs.go rename to server/jobs/mirror.go index 2cfbd09844200eb92f4596b8f6eb32911775d315..cf6a620a082760ce0bbc4ca64e441c8f611fdce5 100644 --- a/server/jobs.go +++ b/server/jobs/mirror.go @@ -1,32 +1,36 @@ -package server +package jobs import ( + "context" "fmt" "path/filepath" "runtime" + "github.com/charmbracelet/log" "github.com/charmbracelet/soft-serve/git" "github.com/charmbracelet/soft-serve/server/backend" + "github.com/charmbracelet/soft-serve/server/config" "github.com/charmbracelet/soft-serve/server/sync" ) -var jobSpecs = map[string]string{ - "mirror": "@every 10m", +func init() { + Register("mirror-pull", "@every 10m", mirrorPull) } -// mirrorJob runs the (pull) mirror job task. -func (s *Server) mirrorJob(b *backend.Backend) func() { - cfg := s.Config - logger := s.logger +// mirrorPull runs the (pull) mirror job task. +func mirrorPull(ctx context.Context) func() { + cfg := config.FromContext(ctx) + logger := log.FromContext(ctx).WithPrefix("jobs.mirror") + b := backend.FromContext(ctx) return func() { - repos, err := b.Repositories(s.ctx) + repos, err := b.Repositories(ctx) if err != nil { logger.Error("error getting repositories", "err", err) return } // Divide the work up among the number of CPUs. - wq := sync.NewWorkPool(s.ctx, runtime.GOMAXPROCS(0), + wq := sync.NewWorkPool(ctx, runtime.GOMAXPROCS(0), sync.WithWorkPoolLogger(logger.Errorf), ) @@ -41,7 +45,7 @@ func (s *Server) mirrorJob(b *backend.Backend) func() { name := repo.Name() wq.Add(name, func() { - cmd := git.NewCommand("remote", "update", "--prune") + cmd := git.NewCommand("remote", "update", "--prune").WithContext(ctx) cmd.AddEnvs( fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`, filepath.Join(cfg.DataPath, "ssh", "known_hosts"), diff --git a/server/server.go b/server/server.go index 0b3f23b0e2b3055597d32e4568442218f2a84f92..8c73ad213dd6d68a51f7463604e56d851ff70d47 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/charmbracelet/soft-serve/server/cron" "github.com/charmbracelet/soft-serve/server/daemon" "github.com/charmbracelet/soft-serve/server/db" + "github.com/charmbracelet/soft-serve/server/jobs" sshsrv "github.com/charmbracelet/soft-serve/server/ssh" "github.com/charmbracelet/soft-serve/server/stats" "github.com/charmbracelet/soft-serve/server/web" @@ -44,8 +45,8 @@ func NewServer(ctx context.Context) (*Server, error) { cfg := config.FromContext(ctx) be := backend.FromContext(ctx) db := db.FromContext(ctx) + logger := log.FromContext(ctx).WithPrefix("server") srv := &Server{ - Cron: cron.NewScheduler(ctx), Config: cfg, Backend: be, DB: db, @@ -54,7 +55,17 @@ func NewServer(ctx context.Context) (*Server, error) { } // Add cron jobs. - _, _ = srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob(be)) + sched := cron.NewScheduler(ctx) + for n, j := range jobs.List() { + id, err := sched.AddFunc(j.Spec, j.Func(ctx)) + if err != nil { + logger.Warn("error adding cron job", "job", n, "err", err) + } + + j.ID = id + } + + srv.Cron = sched srv.SSHServer, err = sshsrv.NewSSHServer(ctx) if err != nil { @@ -133,7 +144,10 @@ func (s *Server) Shutdown(ctx context.Context) error { return s.StatsServer.Shutdown(ctx) }) errg.Go(func() error { - s.Cron.Stop() + for _, j := range jobs.List() { + s.Cron.Remove(j.ID) + } + s.Cron.Shutdown() return nil }) // defer s.DB.Close() // nolint: errcheck