1package jobs
2
3import (
4 "context"
5 "fmt"
6 "path/filepath"
7 "runtime"
8 "strings"
9
10 "github.com/charmbracelet/log"
11 "github.com/charmbracelet/soft-serve/git"
12 "github.com/charmbracelet/soft-serve/pkg/backend"
13 "github.com/charmbracelet/soft-serve/pkg/config"
14 "github.com/charmbracelet/soft-serve/pkg/db"
15 "github.com/charmbracelet/soft-serve/pkg/lfs"
16 "github.com/charmbracelet/soft-serve/pkg/store"
17 "github.com/charmbracelet/soft-serve/pkg/sync"
18)
19
20func init() {
21 Register("mirror-pull", mirrorPull{})
22}
23
24type mirrorPull struct{}
25
26// Spec derives the spec used for pull mirrors and implements Runner.
27func (m mirrorPull) Spec(ctx context.Context) string {
28 cfg := config.FromContext(ctx)
29 if cfg.Jobs.MirrorPull != "" {
30 return cfg.Jobs.MirrorPull
31 }
32 return "@every 10m"
33}
34
35// Func runs the (pull) mirror job task and implements Runner.
36func (m mirrorPull) Func(ctx context.Context) func() {
37 cfg := config.FromContext(ctx)
38 logger := log.FromContext(ctx).WithPrefix("jobs.mirror")
39 b := backend.FromContext(ctx)
40 dbx := db.FromContext(ctx)
41 datastore := store.FromContext(ctx)
42 return func() {
43 repos, err := b.Repositories(ctx)
44 if err != nil {
45 logger.Error("error getting repositories", "err", err)
46 return
47 }
48
49 // Divide the work up among the number of CPUs.
50 wq := sync.NewWorkPool(ctx, runtime.GOMAXPROCS(0),
51 sync.WithWorkPoolLogger(logger.Errorf),
52 )
53
54 logger.Debug("updating mirror repos")
55 for _, repo := range repos {
56 if repo.IsMirror() {
57 r, err := repo.Open()
58 if err != nil {
59 logger.Error("error opening repository", "repo", repo.Name(), "err", err)
60 continue
61 }
62
63 name := repo.Name()
64 wq.Add(name, func() {
65 repo := repo
66
67 cmds := []string{
68 "fetch --prune", // fetch prune before updating remote
69 "gc --aggressive --prune=now", // aggressive garbage collection
70 "remote update --prune", // update remote and prune remote refs
71 }
72
73 for _, c := range cmds {
74 args := strings.Split(c, " ")
75 cmd := git.NewCommand(args...).WithContext(ctx)
76 cmd.AddEnvs(
77 fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
78 filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
79 cfg.SSH.ClientKeyPath,
80 ),
81 )
82
83 if _, err := cmd.RunInDir(r.Path); err != nil {
84 logger.Error("error running git remote update", "repo", name, "err", err)
85 }
86 }
87
88 if cfg.LFS.Enabled {
89 rcfg, err := r.Config()
90 if err != nil {
91 logger.Error("error getting git config", "repo", name, "err", err)
92 return
93 }
94
95 lfsEndpoint := rcfg.Section("lfs").Option("url")
96 if lfsEndpoint == "" {
97 // If there is no LFS url defined, means the repo
98 // doesn't use LFS and we can skip it.
99 return
100 }
101
102 ep, err := lfs.NewEndpoint(lfsEndpoint)
103 if err != nil {
104 logger.Error("error creating LFS endpoint", "repo", name, "err", err)
105 return
106 }
107
108 client := lfs.NewClient(ep)
109 if client == nil {
110 logger.Errorf("failed to create lfs client: unsupported endpoint %s", lfsEndpoint)
111 return
112 }
113
114 if err := backend.StoreRepoMissingLFSObjects(ctx, repo, dbx, datastore, client); err != nil {
115 logger.Error("failed to store missing lfs objects", "err", err, "path", r.Path)
116 return
117 }
118 }
119 })
120 }
121 }
122
123 wq.Run()
124 }
125}