gitlab: order events on the fly

Michael Muré created

Change summary

bridge/gitlab/event.go      | 75 +++++++++++++++++++++++++++-----------
bridge/gitlab/event_test.go | 39 ++++++++++++++++++++
bridge/gitlab/gitlab_api.go | 44 ++--------------------
bridge/gitlab/import.go     |  8 +++
4 files changed, 102 insertions(+), 64 deletions(-)

Detailed changes

bridge/gitlab/event.go 🔗

@@ -2,7 +2,6 @@ package gitlab
 
 import (
 	"fmt"
-	"sort"
 	"strings"
 	"time"
 
@@ -42,6 +41,8 @@ const (
 	EventMentionedInMergeRequest
 )
 
+var _ Event = &NoteEvent{}
+
 type NoteEvent struct{ gitlab.Note }
 
 func (n NoteEvent) ID() string           { return fmt.Sprintf("%d", n.Note.ID) }
@@ -108,6 +109,8 @@ func (n NoteEvent) Title() string {
 	return text.CleanupOneLine(n.Body)
 }
 
+var _ Event = &LabelEvent{}
+
 type LabelEvent struct{ gitlab.LabelEvent }
 
 func (l LabelEvent) ID() string           { return fmt.Sprintf("%d", l.LabelEvent.ID) }
@@ -124,6 +127,8 @@ func (l LabelEvent) Kind() EventKind {
 	}
 }
 
+var _ Event = &StateEvent{}
+
 type StateEvent struct{ gitlab.StateEvent }
 
 func (s StateEvent) ID() string           { return fmt.Sprintf("%d", s.StateEvent.ID) }
@@ -140,6 +145,8 @@ func (s StateEvent) Kind() EventKind {
 	}
 }
 
+var _ Event = &ErrorEvent{}
+
 type ErrorEvent struct {
 	Err  error
 	Time time.Time
@@ -150,28 +157,50 @@ func (e ErrorEvent) UserID() int          { return -1 }
 func (e ErrorEvent) CreatedAt() time.Time { return e.Time }
 func (e ErrorEvent) Kind() EventKind      { return EventError }
 
-// SortedEvents consumes an Event-channel and returns an event slice, sorted by creation date, using CreatedAt-method.
-func SortedEvents(c <-chan Event) []Event {
-	var events []Event
-	for e := range c {
-		events = append(events, e)
-	}
-	sort.Sort(eventsByCreation(events))
-	return events
-}
-
-type eventsByCreation []Event
-
-func (e eventsByCreation) Len() int {
-	return len(e)
-}
-
-func (e eventsByCreation) Less(i, j int) bool {
-	return e[i].CreatedAt().Before(e[j].CreatedAt())
-}
-
-func (e eventsByCreation) Swap(i, j int) {
-	e[i], e[j] = e[j], e[i]
+// SortedEvents fan-in some Event-channels into one, sorted by creation date, using CreatedAt-method.
+// This function assume that each channel is pre-ordered.
+func SortedEvents(inputs ...<-chan Event) chan Event {
+	out := make(chan Event)
+
+	go func() {
+		defer close(out)
+
+		heads := make([]Event, len(inputs))
+
+		// pre-fill the head view
+		for i, input := range inputs {
+			if event, ok := <-input; ok {
+				heads[i] = event
+			}
+		}
+
+		for {
+			var earliestEvent Event
+			var originChannel int
+
+			// pick the earliest event of the heads
+			for i, head := range heads {
+				if head != nil && (earliestEvent == nil || head.CreatedAt().Before(earliestEvent.CreatedAt())) {
+					earliestEvent = head
+					originChannel = i
+				}
+			}
+
+			if earliestEvent == nil {
+				// no event anymore, we are done
+				return
+			}
+
+			// we have an event: consume it and replace it if possible
+			heads[originChannel] = nil
+			if event, ok := <-inputs[originChannel]; ok {
+				heads[originChannel] = event
+			}
+			out <- earliestEvent
+		}
+	}()
+
+	return out
 }
 
 // getNewTitle parses body diff given by gitlab api and return it final form

bridge/gitlab/import_notes_test.go → bridge/gitlab/event_test.go 🔗

@@ -2,8 +2,10 @@ package gitlab
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestGetNewTitle(t *testing.T) {
@@ -54,3 +56,40 @@ func TestGetNewTitle(t *testing.T) {
 		})
 	}
 }
+
+var _ Event = mockEvent(0)
+
+type mockEvent int64
+
+func (m mockEvent) ID() string           { panic("implement me") }
+func (m mockEvent) UserID() int          { panic("implement me") }
+func (m mockEvent) Kind() EventKind      { panic("implement me") }
+func (m mockEvent) CreatedAt() time.Time { return time.Unix(int64(m), 0) }
+
+func TestSortedEvents(t *testing.T) {
+	makeInput := func(times ...int64) chan Event {
+		out := make(chan Event)
+		go func() {
+			for _, t := range times {
+				out <- mockEvent(t)
+			}
+			close(out)
+		}()
+		return out
+	}
+
+	sorted := SortedEvents(
+		makeInput(),
+		makeInput(1, 7, 9, 19),
+		makeInput(2, 8, 23),
+		makeInput(35, 48, 59, 64, 721),
+	)
+
+	var previous Event
+	for event := range sorted {
+		if previous != nil {
+			require.True(t, previous.CreatedAt().Before(event.CreatedAt()))
+		}
+		previous = event
+	}
+}

bridge/gitlab/gitlab_api.go 🔗

@@ -2,7 +2,6 @@ package gitlab
 
 import (
 	"context"
-	"sync"
 	"time"
 
 	"github.com/MichaelMure/git-bug/util/text"
@@ -11,7 +10,6 @@ import (
 
 // Issues returns a channel with gitlab project issues, ascending order.
 func Issues(ctx context.Context, client *gitlab.Client, pid string, since time.Time) <-chan *gitlab.Issue {
-
 	out := make(chan *gitlab.Issue)
 
 	go func() {
@@ -24,7 +22,7 @@ func Issues(ctx context.Context, client *gitlab.Client, pid string, since time.T
 		}
 
 		for {
-			issues, resp, err := client.Issues.ListProjectIssues(pid, &opts)
+			issues, resp, err := client.Issues.ListProjectIssues(pid, &opts, gitlab.WithContext(ctx))
 			if err != nil {
 				return
 			}
@@ -44,40 +42,8 @@ func Issues(ctx context.Context, client *gitlab.Client, pid string, since time.T
 	return out
 }
 
-// Issues returns a channel with merged, but unsorted gitlab note, label and state change events.
-func IssueEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-chan Event {
-	cs := []<-chan Event{
-		Notes(ctx, client, issue),
-		LabelEvents(ctx, client, issue),
-		StateEvents(ctx, client, issue),
-	}
-
-	var wg sync.WaitGroup
-	out := make(chan Event)
-
-	output := func(c <-chan Event) {
-		for n := range c {
-			out <- n
-		}
-		wg.Done()
-	}
-
-	wg.Add(len(cs))
-	for _, c := range cs {
-		go output(c)
-	}
-
-	go func() {
-		wg.Wait()
-		close(out)
-	}()
-
-	return out
-}
-
 // Notes returns a channel with note events
 func Notes(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-chan Event {
-
 	out := make(chan Event)
 
 	go func() {
@@ -89,7 +55,7 @@ func Notes(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-ch
 		}
 
 		for {
-			notes, resp, err := client.Notes.ListIssueNotes(issue.ProjectID, issue.IID, &opts)
+			notes, resp, err := client.Notes.ListIssueNotes(issue.ProjectID, issue.IID, &opts, gitlab.WithContext(ctx))
 
 			if err != nil {
 				out <- ErrorEvent{Err: err, Time: time.Now()}
@@ -112,7 +78,6 @@ func Notes(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-ch
 
 // LabelEvents returns a channel with label events.
 func LabelEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-chan Event {
-
 	out := make(chan Event)
 
 	go func() {
@@ -121,7 +86,7 @@ func LabelEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue
 		opts := gitlab.ListLabelEventsOptions{}
 
 		for {
-			events, resp, err := client.ResourceLabelEvents.ListIssueLabelEvents(issue.ProjectID, issue.IID, &opts)
+			events, resp, err := client.ResourceLabelEvents.ListIssueLabelEvents(issue.ProjectID, issue.IID, &opts, gitlab.WithContext(ctx))
 
 			if err != nil {
 				out <- ErrorEvent{Err: err, Time: time.Now()}
@@ -146,7 +111,6 @@ func LabelEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue
 
 // StateEvents returns a channel with state change events.
 func StateEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue) <-chan Event {
-
 	out := make(chan Event)
 
 	go func() {
@@ -155,7 +119,7 @@ func StateEvents(ctx context.Context, client *gitlab.Client, issue *gitlab.Issue
 		opts := gitlab.ListStateEventsOptions{}
 
 		for {
-			events, resp, err := client.ResourceStateEvents.ListIssueStateEvents(issue.ProjectID, issue.IID, &opts)
+			events, resp, err := client.ResourceStateEvents.ListIssueStateEvents(issue.ProjectID, issue.IID, &opts, gitlab.WithContext(ctx))
 			if err != nil {
 				out <- ErrorEvent{Err: err, Time: time.Now()}
 			}

bridge/gitlab/import.go 🔗

@@ -71,7 +71,13 @@ func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 				return
 			}
 
-			for _, e := range SortedEvents(IssueEvents(ctx, gi.client, issue)) {
+			issueEvents := SortedEvents(
+				Notes(ctx, gi.client, issue),
+				LabelEvents(ctx, gi.client, issue),
+				StateEvents(ctx, gi.client, issue),
+			)
+
+			for e := range issueEvents {
 				if e, ok := e.(ErrorEvent); ok {
 					out <- core.NewImportError(e.Err, "")
 					continue