fix(log): respect log config (#280)

Ayman Bagabas and Carlos Alexandro Becker created

* fix(log): respect log config

Parse log config

fix(git): ensure os envs are present

feat: set the default time format to dateTime

fix(log): change update mirror log message into debug

fix(config): rename log config struct

fix(config): always return cfg

* perf: update mirrors in a workpool (#285)

* perf: update mirrors in a workqueue

Implement a simple chunked workqueue to queue updating mirrors. We use
the number of cpus to calculate the number of workers to distribute the
work to.

* fix: use automaxprocs

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

* fix: set maxprocs in main

* feat(wp): use a workpool impl

Use semaphores to implement a workpool of n workers
and use that to run the mirroring job

---------

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>
Co-authored-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

---------

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>
Co-authored-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

Change summary

cmd/soft/root.go           |  8 +++
go.mod                     |  1 
go.sum                     |  3 +
internal/log/log.go        | 18 +++++-
internal/sync/workqueue.go | 98 ++++++++++++++++++++++++++++++++++++++++
server/config/config.go    | 34 ++++++++-----
server/config/file.go      | 10 ++-
server/git/git.go          |  6 +-
server/jobs.go             | 43 +++++++++++------
server/server.go           |  2 
10 files changed, 183 insertions(+), 40 deletions(-)

Detailed changes

cmd/soft/root.go 🔗

@@ -8,6 +8,7 @@ import (
 	"github.com/charmbracelet/log"
 	. "github.com/charmbracelet/soft-serve/internal/log"
 	"github.com/spf13/cobra"
+	"go.uber.org/automaxprocs/maxprocs"
 )
 
 var (
@@ -52,6 +53,13 @@ func init() {
 
 func main() {
 	logger := NewDefaultLogger()
+
+	// Set the max number of processes to the number of CPUs
+	// This is useful when running soft serve in a container
+	if _, err := maxprocs.Set(maxprocs.Logger(logger.Debugf)); err != nil {
+		logger.Warn("couldn't set automaxprocs", "error", err)
+	}
+
 	ctx := log.WithContext(context.Background(), logger)
 	if err := rootCmd.ExecuteContext(ctx); err != nil {
 		os.Exit(1)

go.mod 🔗

@@ -32,6 +32,7 @@ require (
 	github.com/prometheus/client_golang v1.15.1
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/spf13/cobra v1.7.0
+	go.uber.org/automaxprocs v1.5.2
 	goji.io v2.0.2+incompatible
 	golang.org/x/crypto v0.9.0
 	golang.org/x/sync v0.2.0

go.sum 🔗

@@ -161,6 +161,7 @@ github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFz
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
 github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
 github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
 github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
@@ -207,6 +208,8 @@ github.com/yuin/goldmark v1.5.2 h1:ALmeCk/px5FSm1MAcFBAsVKZjDuMVj8Tm7FFIlMJnqU=
 github.com/yuin/goldmark v1.5.2/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 github.com/yuin/goldmark-emoji v1.0.1 h1:ctuWEyzGBwiucEqxzwe0SOYDXPAucOrE9NQC18Wa1os=
 github.com/yuin/goldmark-emoji v1.0.1/go.mod h1:2w1E6FEWLcDQkoTE+7HU6QF1F6SLlNGjRIBbIZQFqkQ=
+go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
+go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
 goji.io v2.0.2+incompatible h1:uIssv/elbKRLznFUy3Xj4+2Mz/qKhek/9aZQDUMae7c=
 goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk=
 golang.org/x/arch v0.1.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=

internal/log/log.go 🔗

@@ -2,17 +2,29 @@ package log
 
 import (
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/charmbracelet/log"
+	"github.com/charmbracelet/soft-serve/server/config"
 )
 
 var contextKey = &struct{ string }{"logger"}
 
 // NewDefaultLogger returns a new logger with default settings.
 func NewDefaultLogger() *log.Logger {
+	dp := os.Getenv("SOFT_SERVE_DATA_PATH")
+	if dp == "" {
+		dp = "data"
+	}
+
+	cfg, err := config.ParseConfig(filepath.Join(dp, "config.yaml"))
+	if err != nil {
+		log.Errorf("failed to parse config: %v", err)
+	}
+
 	logger := log.NewWithOptions(os.Stderr, log.Options{
 		ReportTimestamp: true,
 		TimeFormat:      time.DateOnly,
@@ -22,11 +34,9 @@ func NewDefaultLogger() *log.Logger {
 		logger.SetLevel(log.DebugLevel)
 	}
 
-	if tsfmt := os.Getenv("SOFT_SERVE_LOG_TIME_FORMAT"); tsfmt != "" {
-		logger.SetTimeFormat(tsfmt)
-	}
+	logger.SetTimeFormat(cfg.Log.TimeFormat)
 
-	switch strings.ToLower(os.Getenv("SOFT_SERVE_LOG_FORMAT")) {
+	switch strings.ToLower(cfg.Log.Format) {
 	case "json":
 		logger.SetFormatter(log.JSONFormatter)
 	case "logfmt":

internal/sync/workqueue.go 🔗

@@ -0,0 +1,98 @@
+package sync
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/sync/semaphore"
+)
+
+// WorkPool is a pool of work to be done.
+type WorkPool struct {
+	workers int
+	work    map[string]func()
+	mu      sync.RWMutex
+	sem     *semaphore.Weighted
+	ctx     context.Context
+	logger  func(string, ...interface{})
+}
+
+// WorkPoolOption is a function that configures a WorkPool.
+type WorkPoolOption func(*WorkPool)
+
+// WithWorkPoolLogger sets the logger to use.
+func WithWorkPoolLogger(logger func(string, ...interface{})) WorkPoolOption {
+	return func(wq *WorkPool) {
+		wq.logger = logger
+	}
+}
+
+// NewWorkPool creates a new work pool. The workers argument specifies the
+// number of concurrent workers to run the work.
+// The queue will chunk the work into batches of workers size.
+func NewWorkPool(ctx context.Context, workers int, opts ...WorkPoolOption) *WorkPool {
+	wq := &WorkPool{
+		workers: workers,
+		work:    make(map[string]func()),
+		ctx:     ctx,
+	}
+
+	for _, opt := range opts {
+		opt(wq)
+	}
+
+	if wq.workers <= 0 {
+		wq.workers = 1
+	}
+
+	wq.sem = semaphore.NewWeighted(int64(wq.workers))
+
+	return wq
+}
+
+// Run starts the workers and waits for them to finish.
+func (wq *WorkPool) Run() {
+	for id, fn := range wq.work {
+		if err := wq.sem.Acquire(wq.ctx, 1); err != nil {
+			wq.logf("workpool: %v", err)
+			return
+		}
+
+		go func(id string, fn func()) {
+			defer wq.sem.Release(1)
+			fn()
+			wq.mu.Lock()
+			delete(wq.work, id)
+			wq.mu.Unlock()
+		}(id, fn)
+	}
+
+	if err := wq.sem.Acquire(wq.ctx, int64(wq.workers)); err != nil {
+		wq.logf("workpool: %v", err)
+	}
+}
+
+// Add adds a new job to the pool.
+// If the job already exists, it is a no-op.
+func (wq *WorkPool) Add(id string, fn func()) {
+	wq.mu.Lock()
+	defer wq.mu.Unlock()
+	if _, ok := wq.work[id]; ok {
+		return
+	}
+	wq.work[id] = fn
+}
+
+// Status checks if a job is in the queue.
+func (wq *WorkPool) Status(id string) bool {
+	wq.mu.RLock()
+	defer wq.mu.RUnlock()
+	_, ok := wq.work[id]
+	return ok
+}
+
+func (wq *WorkPool) logf(format string, args ...interface{}) {
+	if wq.logger != nil {
+		wq.logger(format, args...)
+	}
+}

server/config/config.go 🔗

@@ -73,6 +73,17 @@ type StatsConfig struct {
 	ListenAddr string `env:"LISTEN_ADDR" yaml:"listen_addr"`
 }
 
+// LogConfig is the logger configuration.
+type LogConfig struct {
+	// Format is the format of the logs.
+	// Valid values are "json", "logfmt", and "text".
+	Format string `env:"FORMAT" yaml:"format"`
+
+	// Time format for the log `ts` field.
+	// Format must be described in Golang's time format.
+	TimeFormat string `env:"TIME_FORMAT" yaml:"time_format"`
+}
+
 // Config is the configuration for Soft Serve.
 type Config struct {
 	// Name is the name of the server.
@@ -90,13 +101,8 @@ type Config struct {
 	// Stats is the configuration for the stats server.
 	Stats StatsConfig `envPrefix:"STATS_" yaml:"stats"`
 
-	// LogFormat is the format of the logs.
-	// Valid values are "json", "logfmt", and "text".
-	LogFormat string `env:"LOG_FORMAT" yaml:"log_format"`
-
-	// Time format for the log `ts` field.
-	// Format must be described in Golang's time format.
-	LogTimeFormat string `env:"LOG_TIME_FORMAT" yaml:"log_time_format"`
+	// Log is the logger configuration.
+	Log LogConfig `envPrefix:"LOG_" yaml:"log"`
 
 	// InitialAdminKeys is a list of public keys that will be added to the list of admins.
 	InitialAdminKeys []string `env:"INITIAL_ADMIN_KEYS" envSeparator:"\n" yaml:"initial_admin_keys"`
@@ -111,10 +117,8 @@ type Config struct {
 func parseConfig(path string) (*Config, error) {
 	dataPath := filepath.Dir(path)
 	cfg := &Config{
-		Name:          "Soft Serve",
-		LogFormat:     "text",
-		LogTimeFormat: time.DateOnly,
-		DataPath:      dataPath,
+		Name:     "Soft Serve",
+		DataPath: dataPath,
 		SSH: SSHConfig{
 			ListenAddr:    ":23231",
 			PublicURL:     "ssh://localhost:23231",
@@ -136,6 +140,10 @@ func parseConfig(path string) (*Config, error) {
 		Stats: StatsConfig{
 			ListenAddr: "localhost:23233",
 		},
+		Log: LogConfig{
+			Format:     "text",
+			TimeFormat: time.DateTime,
+		},
 	}
 
 	f, err := os.Open(path)
@@ -182,11 +190,11 @@ func parseConfig(path string) (*Config, error) {
 func ParseConfig(path string) (*Config, error) {
 	cfg, err := parseConfig(path)
 	if err != nil {
-		return nil, err
+		return cfg, err
 	}
 
 	if err := cfg.validate(); err != nil {
-		return nil, err
+		return cfg, err
 	}
 
 	return cfg, nil

server/config/file.go 🔗

@@ -11,9 +11,13 @@ var configFileTmpl = template.Must(template.New("config").Parse(`# Soft Serve Se
 # This is the name that will be displayed in the UI.
 name: "{{ .Name }}"
 
-# Log format to use. Valid values are "json", "logfmt", and "text".
-log_format: "{{ .LogFormat }}"
-log_time_format: "{{ .LogTimeFormat }}"
+# Logging configuration.
+log:
+  # Log format to use. Valid values are "json", "logfmt", and "text".
+  format: "{{ .Log.Format }}"
+  # Time format for the log "timestamp" field.
+  # Should be described in Golang's time format.
+  time_format: "{{ .Log.TimeFormat }}"
 
 # The SSH server configuration.
 ssh:

server/git/git.go 🔗

@@ -83,12 +83,12 @@ func RunGit(ctx context.Context, in io.Reader, out io.Writer, er io.Writer, dir
 	logger := log.FromContext(ctx).WithPrefix("rungit")
 	c := exec.CommandContext(ctx, "git", args...)
 	c.Dir = dir
-	c.Env = append(c.Env, envs...)
+	c.Env = append(os.Environ(), envs...)
 	c.Env = append(c.Env, "PATH="+os.Getenv("PATH"))
 	c.Env = append(c.Env, "SOFT_SERVE_DEBUG="+os.Getenv("SOFT_SERVE_DEBUG"))
 	if cfg != nil {
-		c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.LogFormat)
-		c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.LogTimeFormat)
+		c.Env = append(c.Env, "SOFT_SERVE_LOG_FORMAT="+cfg.Log.Format)
+		c.Env = append(c.Env, "SOFT_SERVE_LOG_TIME_FORMAT="+cfg.Log.TimeFormat)
 	}
 
 	stdin, err := c.StdinPipe()

server/jobs.go 🔗

@@ -3,15 +3,15 @@ package server
 import (
 	"fmt"
 	"path/filepath"
+	"runtime"
 
 	"github.com/charmbracelet/soft-serve/git"
+	"github.com/charmbracelet/soft-serve/internal/sync"
 )
 
-var (
-	jobSpecs = map[string]string{
-		"mirror": "@every 10m",
-	}
-)
+var jobSpecs = map[string]string{
+	"mirror": "@every 10m",
+}
 
 // mirrorJob runs the (pull) mirror job task.
 func (s *Server) mirrorJob() func() {
@@ -25,26 +25,37 @@ func (s *Server) mirrorJob() func() {
 			return
 		}
 
+		// Divide the work up among the number of CPUs.
+		wq := sync.NewWorkPool(s.ctx, runtime.GOMAXPROCS(0),
+			sync.WithWorkPoolLogger(logger.Errorf),
+		)
+
+		logger.Debug("updating mirror repos")
 		for _, repo := range repos {
 			if repo.IsMirror() {
-				logger.Info("updating mirror", "repo", repo.Name())
 				r, err := repo.Open()
 				if err != nil {
 					logger.Error("error opening repository", "repo", repo.Name(), "err", err)
 					continue
 				}
 
-				cmd := git.NewCommand("remote", "update", "--prune")
-				cmd.AddEnvs(
-					fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
-						filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
-						cfg.SSH.ClientKeyPath,
-					),
-				)
-				if _, err := cmd.RunInDir(r.Path); err != nil {
-					logger.Error("error running git remote update", "repo", repo.Name(), "err", err)
-				}
+				name := repo.Name()
+				wq.Add(name, func() {
+					cmd := git.NewCommand("remote", "update", "--prune")
+					cmd.AddEnvs(
+						fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
+							filepath.Join(cfg.DataPath, "ssh", "known_hosts"),
+							cfg.SSH.ClientKeyPath,
+						),
+					)
+					if _, err := cmd.RunInDir(r.Path); err != nil {
+						logger.Error("error running git remote update", "repo", name, "err", err)
+					}
+
+				})
 			}
 		}
+
+		wq.Run()
 	}
 }

server/server.go 🔗

@@ -62,7 +62,7 @@ func NewServer(ctx context.Context) (*Server, error) {
 	}
 
 	// Add cron jobs.
-	srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob())
+	_, _ = srv.Cron.AddFunc(jobSpecs["mirror"], srv.mirrorJob())
 
 	srv.SSHServer, err = sshsrv.NewSSHServer(ctx)
 	if err != nil {