lfs.go

 1package backend
 2
 3import (
 4	"context"
 5	"errors"
 6	"io"
 7	"path"
 8	"path/filepath"
 9	"strconv"
10
11	"github.com/charmbracelet/soft-serve/pkg/config"
12	"github.com/charmbracelet/soft-serve/pkg/db"
13	"github.com/charmbracelet/soft-serve/pkg/lfs"
14	"github.com/charmbracelet/soft-serve/pkg/proto"
15	"github.com/charmbracelet/soft-serve/pkg/storage"
16	"github.com/charmbracelet/soft-serve/pkg/store"
17)
18
19// StoreRepoMissingLFSObjects stores missing LFS objects for a repository.
20func StoreRepoMissingLFSObjects(ctx context.Context, repo proto.Repository, dbx *db.DB, store store.Store, lfsClient lfs.Client) error {
21	cfg := config.FromContext(ctx)
22	repoID := strconv.FormatInt(repo.ID(), 10)
23	lfsRoot := filepath.Join(cfg.DataPath, "lfs", repoID)
24
25	// TODO: support S3 storage
26	strg := storage.NewLocalStorage(lfsRoot)
27	pointerChan := make(chan lfs.PointerBlob)
28	errChan := make(chan error, 1)
29	r, err := repo.Open()
30	if err != nil {
31		return err
32	}
33
34	go lfs.SearchPointerBlobs(ctx, r, pointerChan, errChan)
35
36	download := func(pointers []lfs.Pointer) error {
37		return lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
38			if objectError != nil {
39				return objectError
40			}
41
42			defer content.Close() //nolint: errcheck
43			return dbx.TransactionContext(ctx, func(tx *db.Tx) error {
44				if err := store.CreateLFSObject(ctx, tx, repo.ID(), p.Oid, p.Size); err != nil {
45					return db.WrapError(err)
46				}
47
48				_, err := strg.Put(path.Join("objects", p.RelativePath()), content)
49				return err
50			})
51		})
52	}
53
54	var batch []lfs.Pointer
55	for pointer := range pointerChan {
56		obj, err := store.GetLFSObjectByOid(ctx, dbx, repo.ID(), pointer.Oid)
57		if err != nil && !errors.Is(err, db.ErrRecordNotFound) {
58			return db.WrapError(err)
59		}
60
61		exist, err := strg.Exists(path.Join("objects", pointer.RelativePath()))
62		if err != nil {
63			return err
64		}
65
66		if exist && obj.ID == 0 {
67			if err := store.CreateLFSObject(ctx, dbx, repo.ID(), pointer.Oid, pointer.Size); err != nil {
68				return db.WrapError(err)
69			}
70		} else {
71			batch = append(batch, pointer.Pointer)
72			// Limit batch requests to 20 objects
73			if len(batch) >= 20 {
74				if err := download(batch); err != nil {
75					return err
76				}
77
78				batch = nil
79			}
80		}
81	}
82
83	if err, ok := <-errChan; ok {
84		return err
85	}
86
87	return nil
88}