Github bridge: Refactor

Alexander Scharinger created

Change summary

bridge/github/import.go          | 385 +++++++++++++++------------------
bridge/github/import_mediator.go | 331 +++++++++-------------------
bridge/github/import_query.go    |  22 +
3 files changed, 303 insertions(+), 435 deletions(-)

Detailed changes

bridge/github/import.go 🔗

@@ -54,64 +54,80 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 
 	go func() {
 		defer close(gi.out)
-
-		// Loop over all matching issues
-		for event := range gi.mediator.Issues {
-			var issue issue
-			var issueEdits <-chan userContentEditEvent
-			var timelineItems <-chan timelineEvent
-			switch e := event.(type) {
-			case messageEvent:
-				fmt.Println(e.msg)
-				continue
-			case issueData:
-				issue = e.issue
-				issueEdits = e.issueEdits
-				timelineItems = e.timelineItems
-			default:
-				panic(fmt.Sprint("Unknown event type"))
+		var currBug *cache.BugCache
+		var currEvent ImportEvent
+		var nextEvent ImportEvent
+		var err error
+		for {
+			// We need the current event and one look-ahead event: the edit that directly follows an issue or comment is consumed together with it.
+			currEvent = nextEvent
+			if currEvent == nil {
+				currEvent = gi.mediator.NextImportEvent()
 			}
-			// create issue
-			b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
-			if err != nil {
-				err := fmt.Errorf("issue creation: %v", err)
-				out <- core.NewImportError(err, "")
-				return
+			if currEvent == nil {
+				break
 			}
-
-			// loop over timeline items
-			for event := range timelineItems {
-				var item timelineItem
-				var edits <-chan userContentEditEvent
-				switch e := event.(type) {
-				case messageEvent:
-					fmt.Println(e.msg)
-					continue
-				case timelineData:
-					item = e.timelineItem
-					edits = e.userContentEdits
+			nextEvent = gi.mediator.NextImportEvent()
+
+			switch event := currEvent.(type) {
+			case MessageEvent:
+				fmt.Println(event.msg)
+			case IssueEvent:
+				// first: commit what is being held in currBug
+				if err = gi.commit(currBug, out); err != nil {
+					out <- core.NewImportError(err, "")
+					return
+				}
+				// second: create new issue
+				switch next := nextEvent.(type) {
+				case IssueEditEvent:
+					// consuming and using next event
+					nextEvent = nil
+					currBug, err = gi.ensureIssue(ctx, repo, &event.issue, &next.userContentEdit)
 				default:
-					panic(fmt.Sprint("Unknown event type"))
+					currBug, err = gi.ensureIssue(ctx, repo, &event.issue, nil)
+				}
+				if err != nil {
+					err := fmt.Errorf("issue creation: %v", err)
+					out <- core.NewImportError(err, "")
+					return
+				}
+			case IssueEditEvent:
+				err = gi.ensureIssueEdit(ctx, repo, currBug, event.issueId, &event.userContentEdit)
+				if err != nil {
+					err = fmt.Errorf("issue edit: %v", err)
+					out <- core.NewImportError(err, "")
+					return
+				}
+			case TimelineEvent:
+				if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" {
+					// consuming and using next event
+					nextEvent = nil
+					err = gi.ensureComment(ctx, repo, currBug, &event.timelineItem.IssueComment, &next.userContentEdit)
+				} else {
+					err = gi.ensureTimelineItem(ctx, repo, currBug, &event.timelineItem)
 				}
-				err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
 				if err != nil {
 					err = fmt.Errorf("timeline item creation: %v", err)
 					out <- core.NewImportError(err, "")
 					return
 				}
-			}
-
-			if !b.NeedCommit() {
-				out <- core.NewImportNothing(b.Id(), "no imported operation")
-			} else if err := b.Commit(); err != nil {
-				// commit bug state
-				err = fmt.Errorf("bug commit: %v", err)
-				out <- core.NewImportError(err, "")
-				return
+			case CommentEditEvent:
+				err = gi.ensureCommentEdit(ctx, repo, currBug, event.commentId, &event.userContentEdit)
+				if err != nil {
+					err = fmt.Errorf("comment edit: %v", err)
+					out <- core.NewImportError(err, "")
+					return
+				}
+			default:
+				panic("Unknown event type")
 			}
 		}
-
-		if err := gi.mediator.Error(); err != nil {
+		// commit what is being held in currBug before returning
+		if err = gi.commit(currBug, out); err != nil {
+			out <- core.NewImportError(err, "")
+		}
+		if err = gi.mediator.Error(); err != nil {
 			gi.out <- core.NewImportError(err, "")
 		}
 	}()
@@ -119,27 +135,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 	return out, nil
 }
 
-// getNextUserContentEdit reads the input channel, handles messages, and returns the next
-// userContentEditData.
-func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) {
-	for {
-		event, hasEvent := <-in
-		if !hasEvent {
-			return nil, false
-		}
-		switch e := event.(type) {
-		case messageEvent:
-			fmt.Println(e.msg)
-			continue
-		case userContentEditData:
-			return &e, true
-		default:
-			panic(fmt.Sprint("Unknown event type"))
-		}
+func (gi *githubImporter) commit(b *cache.BugCache, out chan<- core.ImportResult) error {
+	if b == nil {
+		return nil
 	}
+	if !b.NeedCommit() {
+		out <- core.NewImportNothing(b.Id(), "no imported operation")
+		return nil
+	} else if err := b.Commit(); err != nil {
+		// commit bug state
+		return fmt.Errorf("bug commit: %v", err)
+	}
+	return nil
 }
 
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) {
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdit *userContentEdit) (*cache.BugCache, error) {
 	author, err := gi.ensurePerson(ctx, repo, issue.Author)
 	if err != nil {
 		return nil, err
@@ -150,14 +160,13 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 		return excerpt.CreateMetadata[core.MetaKeyOrigin] == target &&
 			excerpt.CreateMetadata[metaKeyGithubId] == parseId(issue.Id)
 	})
-	if err != nil && err != bug.ErrBugNotExist {
+	if err == nil {
+		return b, nil
+	}
+	if err != bug.ErrBugNotExist {
 		return nil, err
 	}
 
-	// get first issue edit
-	// if it exists, then it holds the bug creation
-	firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents)
-
 	// At Github there exist issues with seemingly empty titles. An example is
 	// https://github.com/NixOS/nixpkgs/issues/72730 .
 	// The title provided by the GraphQL API actually consists of a space followed by a
@@ -168,70 +177,49 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 		title = EMPTY_TITLE_PLACEHOLDER
 	}
 
-	if err == bug.ErrBugNotExist {
-		var textInput string
-		if hasEdit {
-			// use the first issue edit: it represents the bug creation itself
-			textInput = string(*firstEdit.Diff)
-		} else {
-			// if there are no issue edits then the issue struct holds the bug creation
-			textInput = string(issue.Body)
-		}
-		cleanText, err := text.Cleanup(textInput)
-		if err != nil {
-			return nil, err
-		}
-		// create bug
-		b, _, err = repo.NewBugRaw(
-			author,
-			issue.CreatedAt.Unix(),
-			title, // TODO: this is the *current* title, not the original one
-			cleanText,
-			nil,
-			map[string]string{
-				core.MetaKeyOrigin: target,
-				metaKeyGithubId:    parseId(issue.Id),
-				metaKeyGithubUrl:   issue.Url.String(),
-			})
-		if err != nil {
-			return nil, err
-		}
-		// importing a new bug
-		gi.out <- core.NewImportBug(b.Id())
+	var textInput string
+	if issueEdit != nil {
+		// use the first issue edit: it represents the bug creation itself
+		textInput = string(*issueEdit.Diff)
+	} else {
+		// if there are no issue edits then the issue struct holds the bug creation
+		textInput = string(issue.Body)
 	}
-	if b == nil {
-		return nil, fmt.Errorf("finding or creating issue")
+	cleanText, err := text.Cleanup(textInput)
+	if err != nil {
+		return nil, err
 	}
-	// process remaining issue edits, if they exist
-	for {
-		edit, hasEdit := getNextUserContentEdit(issueEditEvents)
-		if !hasEdit {
-			break
-		}
-		// other edits will be added as CommentEdit operations
-		target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
-		if err == cache.ErrNoMatchingOp {
-			// original comment is missing somehow, issuing a warning
-			gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id())
-			continue
-		}
-		if err != nil {
-			return nil, err
-		}
 
-		err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit)
-		if err != nil {
-			return nil, err
-		}
+	// create bug
+	b, _, err = repo.NewBugRaw(
+		author,
+		issue.CreatedAt.Unix(),
+		title, // TODO: this is the *current* title, not the original one
+		cleanText,
+		nil,
+		map[string]string{
+			core.MetaKeyOrigin: target,
+			metaKeyGithubId:    parseId(issue.Id),
+			metaKeyGithubUrl:   issue.Url.String(),
+		})
+	if err != nil {
+		return nil, err
 	}
+	// importing a new bug
+	gi.out <- core.NewImportBug(b.Id())
+
 	return b, nil
 }
 
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error {
+func (gi *githubImporter) ensureIssueEdit(ctx context.Context, repo *cache.RepoCache, bug *cache.BugCache, ghIssueId githubv4.ID, edit *userContentEdit) error {
+	return gi.ensureCommentEdit(ctx, repo, bug, ghIssueId, edit)
+}
+
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem) error {
 
 	switch item.Typename {
 	case "IssueComment":
-		err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits)
+		err := gi.ensureComment(ctx, repo, b, &item.IssueComment, nil)
 		if err != nil {
 			return fmt.Errorf("timeline comment creation: %v", err)
 		}
@@ -390,75 +378,62 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
 	return nil
 }
 
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error {
-	author, err := gi.ensurePerson(ctx, repo, comment.Author)
+func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, ghTargetId githubv4.ID, edit *userContentEdit) error {
+	// find comment
+	target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(ghTargetId))
 	if err != nil {
 		return err
 	}
-
-	targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id))
-	if err != nil && err != cache.ErrNoMatchingOp {
+	_, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
+	if err == nil {
+		return nil
+	}
+	if err != cache.ErrNoMatchingOp {
 		// real error
 		return err
 	}
-	firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents)
-	if err == cache.ErrNoMatchingOp {
-		var textInput string
-		if hasEdit {
-			// use the first comment edit: it represents the comment creation itself
-			textInput = string(*firstEdit.Diff)
-		} else {
-			// if there are not comment edits, then the comment struct holds the comment creation
-			textInput = string(comment.Body)
-		}
-		cleanText, err := text.Cleanup(textInput)
-		if err != nil {
-			return err
-		}
 
-		// add comment operation
-		op, err := b.AddCommentRaw(
-			author,
-			comment.CreatedAt.Unix(),
-			cleanText,
-			nil,
-			map[string]string{
-				metaKeyGithubId:  parseId(comment.Id),
-				metaKeyGithubUrl: comment.Url.String(),
-			},
-		)
-		if err != nil {
-			return err
-		}
+	editor, err := gi.ensurePerson(ctx, repo, edit.Editor)
+	if err != nil {
+		return err
+	}
 
-		gi.out <- core.NewImportComment(op.Id())
-		targetOpID = op.Id()
+	if edit.DeletedAt != nil {
+		// comment deletion, not supported yet
+		return nil
 	}
-	if targetOpID == "" {
-		return fmt.Errorf("finding or creating issue comment")
+
+	cleanText, err := text.Cleanup(string(*edit.Diff))
+	if err != nil {
+		return err
 	}
-	// process remaining comment edits, if they exist
-	for {
-		edit, hasEdit := getNextUserContentEdit(commentEditEvents)
-		if !hasEdit {
-			break
-		}
-		// ensure editor identity
-		_, err := gi.ensurePerson(ctx, repo, edit.Editor)
-		if err != nil {
-			return err
-		}
 
-		err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit)
-		if err != nil {
-			return err
-		}
+	// comment edition
+	op, err := b.EditCommentRaw(
+		editor,
+		edit.CreatedAt.Unix(),
+		target,
+		cleanText,
+		map[string]string{
+			metaKeyGithubId: parseId(edit.Id),
+		},
+	)
+
+	if err != nil {
+		return err
 	}
+
+	gi.out <- core.NewImportCommentEdition(op.Id())
 	return nil
 }
 
-func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error {
-	_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, firstEdit *userContentEdit) error {
+	author, err := gi.ensurePerson(ctx, repo, comment.Author)
+	if err != nil {
+		return err
+	}
+
+	_, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id))
 	if err == nil {
 		return nil
 	}
@@ -467,41 +442,35 @@ func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.Rep
 		return err
 	}
 
-	editor, err := gi.ensurePerson(ctx, repo, edit.Editor)
+	var textInput string
+	if firstEdit != nil {
+		// use the first comment edit: it represents the comment creation itself
+		textInput = string(*firstEdit.Diff)
+	} else {
+		// if there are no comment edits, then the comment struct holds the comment creation
+		textInput = string(comment.Body)
+	}
+	cleanText, err := text.Cleanup(textInput)
 	if err != nil {
 		return err
 	}
 
-	switch {
-	case edit.DeletedAt != nil:
-		// comment deletion, not supported yet
-		return nil
-
-	case edit.DeletedAt == nil:
-
-		cleanText, err := text.Cleanup(string(*edit.Diff))
-		if err != nil {
-			return err
-		}
-
-		// comment edition
-		op, err := b.EditCommentRaw(
-			editor,
-			edit.CreatedAt.Unix(),
-			target,
-			cleanText,
-			map[string]string{
-				metaKeyGithubId: parseId(edit.Id),
-			},
-		)
-
-		if err != nil {
-			return err
-		}
-
-		gi.out <- core.NewImportCommentEdition(op.Id())
-		return nil
+	// add comment operation
+	op, err := b.AddCommentRaw(
+		author,
+		comment.CreatedAt.Unix(),
+		cleanText,
+		nil,
+		map[string]string{
+			metaKeyGithubId:  parseId(comment.Id),
+			metaKeyGithubUrl: comment.Url.String(),
+		},
+	)
+	if err != nil {
+		return err
 	}
+
+	gi.out <- core.NewImportComment(op.Id())
 	return nil
 }
 

bridge/github/import_mediator.go 🔗

@@ -4,13 +4,13 @@ import (
 	"context"
 	"fmt"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/shurcooL/githubv4"
 )
 
-const ( // These values influence how fast the github graphql rate limit is exhausted.
+const (
+	// These values influence how fast the github graphql rate limit is exhausted.
 	NUM_ISSUES         = 40
 	NUM_ISSUE_EDITS    = 100
 	NUM_TIMELINE_ITEMS = 100
@@ -34,76 +34,68 @@ type importMediator struct {
 	// given date should be imported.
 	since time.Time
 
-	// Issues is a channel holding bundles of Issues to be imported. Each issueEvent
-	// is either a message (type messageEvent) or a struct holding all the data associated with
-	// one issue (type issueData).
-	Issues chan issueEvent
+	// importEvents holds events representing issues, comments, edits, ...
+	// In this channel issues are immediately followed by their issue edits and comments are
+	// immediately followed by their comment edits.
+	importEvents chan ImportEvent
 
 	// Sticky error
 	err error
-
-	// errMut is a mutex to synchronize access to the sticky error variable err.
-	errMut sync.Mutex
 }
 
-type issueEvent interface {
-	isIssueEvent()
-}
-type timelineEvent interface {
-	isTimelineEvent()
-}
-type userContentEditEvent interface {
-	isUserContentEditEvent()
+type ImportEvent interface {
+	isImportEvent()
 }
 
-type messageEvent struct {
+type MessageEvent struct {
 	msg string
 }
 
-func (messageEvent) isIssueEvent()           {}
-func (messageEvent) isUserContentEditEvent() {}
-func (messageEvent) isTimelineEvent()        {}
+func (MessageEvent) isImportEvent() {}
 
-type issueData struct {
+type IssueEvent struct {
 	issue
-	issueEdits    <-chan userContentEditEvent
-	timelineItems <-chan timelineEvent
 }
 
-func (issueData) isIssueEvent() {}
+func (IssueEvent) isImportEvent() {}
+
+type IssueEditEvent struct {
+	issueId githubv4.ID
+	userContentEdit
+}
+
+func (IssueEditEvent) isImportEvent() {}
 
-type timelineData struct {
+type TimelineEvent struct {
+	issueId githubv4.ID
 	timelineItem
-	userContentEdits <-chan userContentEditEvent
 }
 
-func (timelineData) isTimelineEvent() {}
+func (TimelineEvent) isImportEvent() {}
 
-type userContentEditData struct {
+type CommentEditEvent struct {
+	commentId githubv4.ID
 	userContentEdit
 }
 
-// func (userContentEditData) isEvent()
-func (userContentEditData) isUserContentEditEvent() {}
+func (CommentEditEvent) isImportEvent() {}
 
-func (mm *importMediator) setError(err error) {
-	mm.errMut.Lock()
-	mm.err = err
-	mm.errMut.Unlock()
+func (mm *importMediator) NextImportEvent() ImportEvent {
+	return <-mm.importEvents
 }
 
 func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
 	mm := importMediator{
-		gc:      client,
-		owner:   owner,
-		project: project,
-		since:   since,
-		Issues:  make(chan issueEvent, CHAN_CAPACITY),
-		err:     nil,
+		gc:           client,
+		owner:        owner,
+		project:      project,
+		since:        since,
+		importEvents: make(chan ImportEvent, CHAN_CAPACITY),
+		err:          nil,
 	}
 	go func() {
-		mm.fillIssues(ctx)
-		close(mm.Issues)
+		mm.fillImportEvents(ctx)
+		close(mm.importEvents)
 	}()
 	return &mm
 }
@@ -146,95 +138,42 @@ func newCommentEditVars() varmap {
 }
 
 func (mm *importMediator) Error() error {
-	mm.errMut.Lock()
-	err := mm.err
-	mm.errMut.Unlock()
-	return err
+	return mm.err
 }
 
 func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
 	query := userQuery{}
 	vars := varmap{"login": githubv4.String(loginName)}
-	// handle message events localy
-	channel := make(chan messageEvent)
-	defer close(channel)
-	// print all messages immediately
-	go func() {
-		for event := range channel {
-			fmt.Println(event.msg)
-		}
-	}()
-	if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
+	if err := mm.mQuery(ctx, &query, vars); err != nil {
 		return nil, err
 	}
 	return &query.User, nil
 }
 
-func (mm *importMediator) fillIssues(ctx context.Context) {
-	// First: setup message handling while submitting GraphQL queries.
-	msgs := make(chan messageEvent)
-	defer close(msgs)
-	// forward all the messages to the issue channel. The message will be queued after
-	// all the issues which has been completed.
-	go func() {
-		for msg := range msgs {
-			select {
-			case <-ctx.Done():
-				return
-			case mm.Issues <- msg:
-			}
-		}
-	}()
-	// start processing the actual issues
+func (mm *importMediator) fillImportEvents(ctx context.Context) {
 	initialCursor := githubv4.String("")
-	issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
+	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
 	for hasIssues {
 		for _, node := range issues.Nodes {
-			// We need to send an issue-bundle over the issue channel before we start
-			// filling the issue edits and the timeline items to avoid deadlocks.
-			issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
-			timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
 			select {
 			case <-ctx.Done():
 				return
-			case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
+			case mm.importEvents <- IssueEvent{node.issue}:
 			}
 
-			// We do not know whether the client reads from the issue edit channel
-			// or the timeline channel first. Since the capacity of any channel is limited
-			// any send operation may block. Hence, in order to avoid deadlocks we need
-			// to send over both these channels concurrently.
-			go func(node issueNode) {
-				mm.fillIssueEdits(ctx, &node, issueEditChan)
-				close(issueEditChan)
-			}(node)
-			go func(node issueNode) {
-				mm.fillTimeline(ctx, &node, timelineBundleChan)
-				close(timelineBundleChan)
-			}(node)
+			// issue edit events follow the issue event
+			mm.fillIssueEditEvents(ctx, &node)
+			// last come the timeline events
+			mm.fillTimelineEvents(ctx, &node)
 		}
 		if !issues.PageInfo.HasNextPage {
 			break
 		}
-		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
+		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
 	}
 }
 
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
-	// First: setup message handling while submitting GraphQL queries.
-	msgs := make(chan messageEvent)
-	defer close(msgs)
-	// forward all the messages to the issue-edit channel. The message will be queued after
-	// all the issue edits which have been completed.
-	go func() {
-		for msg := range msgs {
-			select {
-			case <-ctx.Done():
-				return
-			case channel <- msg:
-			}
-		}
-	}()
+func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
 	edits := &issueNode.UserContentEdits
 	hasEdits := true
 	for hasEdits {
@@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
 			select {
 			case <-ctx.Done():
 				return
-			case channel <- userContentEditData{edit}:
+			case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
 			}
 		}
 		if !edits.PageInfo.HasPreviousPage {
 			break
 		}
-		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
+		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
 	}
 }
 
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
-	// First: setup message handling while submitting GraphQL queries.
-	msgs := make(chan messageEvent)
-	defer close(msgs)
-	// forward all the messages to the timeline channel. The message will be queued after
-	// all the timeline items which have been completed.
-	go func() {
-		for msg := range msgs {
-			select {
-			case <-ctx.Done():
-				return
-			case channel <- msg:
-			}
-		}
-	}()
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+	vars := newIssueEditVars()
+	vars["gqlNodeId"] = nid
+	if cursor == "" {
+		vars["issueEditBefore"] = (*githubv4.String)(nil)
+	} else {
+		vars["issueEditBefore"] = cursor
+	}
+	query := issueEditQuery{}
+	if err := mm.mQuery(ctx, &query, vars); err != nil {
+		mm.err = err
+		return nil, false
+	}
+	connection := &query.Node.Issue.UserContentEdits
+	if len(connection.Nodes) <= 0 {
+		return nil, false
+	}
+	return connection, true
+}
+
+func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
 	items := &issueNode.TimelineItems
 	hasItems := true
 	for hasItems {
 		for _, item := range items.Nodes {
+			select {
+			case <-ctx.Done():
+				return
+			case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}:
+			}
 			if item.Typename == "IssueComment" {
 				// Issue comments are different than other timeline items in that
 				// they may have associated user content edits.
-				//
-				// Send over the timeline-channel before starting to fill the comment
-				// edits.
-				commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
-				select {
-				case <-ctx.Done():
-					return
-				case channel <- timelineData{item, commentEditChan}:
-				}
-				// We need to create a new goroutine for filling the comment edit
-				// channel.
-				go func(item timelineItem) {
-					mm.fillCommentEdits(ctx, &item, commentEditChan)
-					close(commentEditChan)
-				}(item)
-			} else {
-				select {
-				case <-ctx.Done():
-					return
-				case channel <- timelineData{item, nil}:
-				}
+				// Right after the comment we send the comment edits.
+				mm.fillCommentEdits(ctx, &item)
 			}
 		}
 		if !items.PageInfo.HasNextPage {
 			break
 		}
-		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
+		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+	}
+}
+
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+	vars := newTimelineVars()
+	vars["gqlNodeId"] = nid
+	if cursor == "" {
+		vars["timelineAfter"] = (*githubv4.String)(nil)
+	} else {
+		vars["timelineAfter"] = cursor
 	}
+	query := timelineQuery{}
+	if err := mm.mQuery(ctx, &query, vars); err != nil {
+		mm.err = err
+		return nil, false
+	}
+	connection := &query.Node.Issue.TimelineItems
+	if len(connection.Nodes) <= 0 {
+		return nil, false
+	}
+	return connection, true
 }
 
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
 	// Here we are only concerned with timeline items of type issueComment.
 	if item.Typename != "IssueComment" {
 		return
 	}
 	// First: setup message handling while submitting GraphQL queries.
-	msgs := make(chan messageEvent)
-	defer close(msgs)
-	// forward all the messages to the user content edit channel. The message will be queued after
-	// all the user content edits which have been completed already.
-	go func() {
-		for msg := range msgs {
-			select {
-			case <-ctx.Done():
-				return
-			case channel <- msg:
-			}
-		}
-	}()
 	comment := &item.IssueComment
 	edits := &comment.UserContentEdits
 	hasEdits := true
@@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
 			select {
 			case <-ctx.Done():
 				return
-			case channel <- userContentEditData{edit}:
+			case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
 			}
 		}
 		if !edits.PageInfo.HasPreviousPage {
 			break
 		}
-		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
+		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
 	}
 }
 
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
 	vars := newCommentEditVars()
 	vars["gqlNodeId"] = nid
 	if cursor == "" {
@@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
 		vars["commentEditBefore"] = cursor
 	}
 	query := commentEditQuery{}
-	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
-		mm.setError(err)
+	if err := mm.mQuery(ctx, &query, vars); err != nil {
+		mm.err = err
 		return nil, false
 	}
 	connection := &query.Node.IssueComment.UserContentEdits
@@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
 	return connection, true
 }
 
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
-	vars := newTimelineVars()
-	vars["gqlNodeId"] = nid
-	if cursor == "" {
-		vars["timelineAfter"] = (*githubv4.String)(nil)
-	} else {
-		vars["timelineAfter"] = cursor
-	}
-	query := timelineQuery{}
-	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
-		mm.setError(err)
-		return nil, false
-	}
-	connection := &query.Node.Issue.TimelineItems
-	if len(connection.Nodes) <= 0 {
-		return nil, false
-	}
-	return connection, true
-}
-
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
-	vars := newIssueEditVars()
-	vars["gqlNodeId"] = nid
-	if cursor == "" {
-		vars["issueEditBefore"] = (*githubv4.String)(nil)
-	} else {
-		vars["issueEditBefore"] = cursor
-	}
-	query := issueEditQuery{}
-	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
-		mm.setError(err)
-		return nil, false
-	}
-	connection := &query.Node.Issue.UserContentEdits
-	if len(connection.Nodes) <= 0 {
-		return nil, false
-	}
-	return connection, true
-}
-
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
 	vars := newIssueVars(mm.owner, mm.project, mm.since)
 	if cursor == "" {
 		vars["issueAfter"] = (*githubv4.String)(nil)
@@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
 		vars["issueAfter"] = githubv4.String(cursor)
 	}
 	query := issueQuery{}
-	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
-		mm.setError(err)
+	if err := mm.mQuery(ctx, &query, vars); err != nil {
+		mm.err = err
 		return nil, false
 	}
 	connection := &query.Repository.Issues
@@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit {
 	return ret
 }
 
-type rateLimiter interface {
-	rateLimit() rateLimit
-}
-
 // mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
 // and it is used to populate the response into it. It should be a pointer to a struct that
 // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
 // there is a Github rate limiting error, then the function sleeps and retries after the rate limit
 // is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
-	if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+	if err := mm.queryOnce(ctx, query, vars); err == nil {
 		// success: done
 		return nil
 	}
 	// failure: we will retry
-	// To retry is important for importing projects with a big number of issues.
+	// To retry is important for importing projects with a big number of issues, because
+	// there may be temporary network errors or momentary internal errors of the github servers.
 	retries := 3
 	var err error
 	for i := 0; i < retries; i++ {
 		// wait a few seconds before retry
 		sleepTime := 8 * (i + 1)
 		time.Sleep(time.Duration(sleepTime) * time.Second)
-		err = mm.queryOnce(ctx, query, vars, msgs)
+		err = mm.queryOnce(ctx, query, vars)
 		if err == nil {
 			// success: done
 			return nil
@@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
 	return err
 }
 
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
 	// first: just send the query to the graphql api
 	vars["dryRun"] = githubv4.Boolean(false)
 	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case msgs <- messageEvent{msg}:
+		case mm.importEvents <- MessageEvent{msg}:
 		}
 		timer := time.NewTimer(time.Until(resetTime))
 		select {

bridge/github/import_query.go 🔗

@@ -2,6 +2,19 @@ package github
 
 import "github.com/shurcooL/githubv4"
 
+type rateLimit struct {
+	Cost      githubv4.Int
+	Limit     githubv4.Int
+	NodeCount githubv4.Int
+	Remaining githubv4.Int
+	ResetAt   githubv4.DateTime
+	Used      githubv4.Int
+}
+
+type rateLimiter interface {
+	rateLimit() rateLimit
+}
+
 type userQuery struct {
 	RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
 	User      user      `graphql:"user(login: $login)"`
@@ -211,12 +224,3 @@ type pageInfo struct {
 	StartCursor     githubv4.String
 	HasPreviousPage bool
 }
-
-type rateLimit struct {
-	Cost      githubv4.Int
-	Limit     githubv4.Int
-	NodeCount githubv4.Int
-	Remaining githubv4.Int
-	ResetAt   githubv4.DateTime
-	Used      githubv4.Int
-}