Github bridge: send message to user when waiting

Alexander Scharinger created

When the Github GraphQL API rate limit is exhausted print a message at
the bottom of the terminal so the user knows why the import has been
paused.

Change summary

bridge/github/import.go          |  81 +++++++++++--
bridge/github/import_mediator.go | 191 +++++++++++++++++++++++++--------
2 files changed, 209 insertions(+), 63 deletions(-)

Detailed changes

bridge/github/import.go 🔗

@@ -56,10 +56,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 		defer close(gi.out)
 
 		// Loop over all matching issues
-		for bundle := range gi.mediator.Issues() {
-			issue := bundle.issue
-			issueEdits := bundle.issueEdits
-			timelineBundles := bundle.timelineBundles
+		for event := range gi.mediator.Issues {
+			var issue issue
+			var issueEdits <-chan userContentEditEvent
+			var timelineItems <-chan timelineEvent
+			switch e := event.(type) {
+			case messageEvent:
+				fmt.Println(e.msg)
+				continue
+			case issueData:
+				issue = e.issue
+				issueEdits = e.issueEdits
+				timelineItems = e.timelineItems
+			default:
+				panic(fmt.Sprint("Unknown event type"))
+			}
 			// create issue
 			b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
 			if err != nil {
@@ -69,9 +80,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 			}
 
 			// loop over timeline items
-			for bundle := range timelineBundles {
-				item := bundle.timelineItem
-				edits := bundle.userContentEdits
+			for event := range timelineItems {
+				var item timelineItem
+				var edits <-chan userContentEditEvent
+				switch e := event.(type) {
+				case messageEvent:
+					fmt.Println(e.msg)
+					continue
+				case timelineData:
+					item = e.timelineItem
+					edits = e.userContentEdits
+				default:
+					panic(fmt.Sprint("Unknown event type"))
+				}
 				err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
 				if err != nil {
 					err = fmt.Errorf("timeline item creation: %v", err)
@@ -98,7 +119,27 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
 	return out, nil
 }
 
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) {
+// getNextUserContentEdit reads the input channel, handles messages, and returns the next
+// userContentEditData.
+func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) {
+	for {
+		event, hasEvent := <-in
+		if !hasEvent {
+			return nil, false
+		}
+		switch e := event.(type) {
+		case messageEvent:
+			fmt.Println(e.msg)
+			continue
+		case userContentEditData:
+			return &e, true
+		default:
+			panic(fmt.Sprint("Unknown event type"))
+		}
+	}
+}
+
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) {
 	author, err := gi.ensurePerson(ctx, repo, issue.Author)
 	if err != nil {
 		return nil, err
@@ -115,7 +156,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 
 	// get first issue edit
 	// if it exists, then it holds the bug creation
-	firstEdit, hasEdit := <-issueEdits
+	firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents)
 
 	// At Github there exist issues with seemingly empty titles. An example is
 	// https://github.com/NixOS/nixpkgs/issues/72730 .
@@ -162,7 +203,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 		return nil, fmt.Errorf("finding or creating issue")
 	}
 	// process remaining issue edits, if they exist
-	for edit := range issueEdits {
+	for {
+		edit, hasEdit := getNextUserContentEdit(issueEditEvents)
+		if !hasEdit {
+			break
+		}
 		// other edits will be added as CommentEdit operations
 		target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
 		if err == cache.ErrNoMatchingOp {
@@ -174,7 +219,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 			return nil, err
 		}
 
-		err = gi.ensureCommentEdit(ctx, repo, b, target, &edit)
+		err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit)
 		if err != nil {
 			return nil, err
 		}
@@ -182,7 +227,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
 	return b, nil
 }
 
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error {
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error {
 
 	switch item.Typename {
 	case "IssueComment":
@@ -345,7 +390,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
 	return nil
 }
 
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error {
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error {
 	author, err := gi.ensurePerson(ctx, repo, comment.Author)
 	if err != nil {
 		return err
@@ -356,7 +401,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
 		// real error
 		return err
 	}
-	firstEdit, hasEdit := <-commentEdits
+	firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents)
 	if err == cache.ErrNoMatchingOp {
 		var textInput string
 		if hasEdit {
@@ -393,14 +438,18 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
 		return fmt.Errorf("finding or creating issue comment")
 	}
 	// process remaining comment edits, if they exist
-	for edit := range commentEdits {
+	for {
+		edit, hasEdit := getNextUserContentEdit(commentEditEvents)
+		if !hasEdit {
+			break
+		}
 		// ensure editor identity
 		_, err := gi.ensurePerson(ctx, repo, edit.Editor)
 		if err != nil {
 			return err
 		}
 
-		err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit)
+		err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit)
 		if err != nil {
 			return err
 		}

bridge/github/import_mediator.go 🔗

@@ -34,9 +34,10 @@ type importMediator struct {
 	// given date should be imported.
 	since time.Time
 
-	// issues is a channel holding bundles of issues to be imported. Each bundle holds the data
-	// associated with one issue.
-	issues chan issueBundle
+	// Issues is a channel holding bundles of Issues to be imported. Each issueEvent
+	// is either a message (type messageEvent) or a struct holding all the data associated with
+	// one issue (type issueData).
+	Issues chan issueEvent
 
 	// Sticky error
 	err error
@@ -45,17 +46,46 @@ type importMediator struct {
 	errMut sync.Mutex
 }
 
-type issueBundle struct {
-	issue           issue
-	issueEdits      <-chan userContentEdit
-	timelineBundles <-chan timelineBundle
+type issueEvent interface {
+	isIssueEvent()
+}
+type timelineEvent interface {
+	isTimelineEvent()
+}
+type userContentEditEvent interface {
+	isUserContentEditEvent()
 }
 
-type timelineBundle struct {
-	timelineItem     timelineItem
-	userContentEdits <-chan userContentEdit
+type messageEvent struct {
+	msg string
 }
 
+func (messageEvent) isIssueEvent()           {}
+func (messageEvent) isUserContentEditEvent() {}
+func (messageEvent) isTimelineEvent()        {}
+
+type issueData struct {
+	issue
+	issueEdits    <-chan userContentEditEvent
+	timelineItems <-chan timelineEvent
+}
+
+func (issueData) isIssueEvent() {}
+
+type timelineData struct {
+	timelineItem
+	userContentEdits <-chan userContentEditEvent
+}
+
+func (timelineData) isTimelineEvent() {}
+
+type userContentEditData struct {
+	userContentEdit
+}
+
+// func (userContentEditData) isEvent()
+func (userContentEditData) isUserContentEditEvent() {}
+
 func (mm *importMediator) setError(err error) {
 	mm.errMut.Lock()
 	mm.err = err
@@ -68,12 +98,12 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
 		owner:   owner,
 		project: project,
 		since:   since,
-		issues:  make(chan issueBundle, CHAN_CAPACITY),
+		Issues:  make(chan issueEvent, CHAN_CAPACITY),
 		err:     nil,
 	}
 	go func() {
 		mm.fillIssues(ctx)
-		close(mm.issues)
+		close(mm.Issues)
 	}()
 	return &mm
 }
@@ -115,10 +145,6 @@ func newCommentEditVars() varmap {
 	}
 }
 
-func (mm *importMediator) Issues() <-chan issueBundle {
-	return mm.issues
-}
-
 func (mm *importMediator) Error() error {
 	mm.errMut.Lock()
 	err := mm.err
@@ -129,25 +155,49 @@ func (mm *importMediator) Error() error {
 func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
 	query := userQuery{}
 	vars := varmap{"login": githubv4.String(loginName)}
-	if err := mm.mQuery(ctx, &query, vars); err != nil {
+	// handle message events localy
+	channel := make(chan messageEvent)
+	defer close(channel)
+	// print all messages immediately
+	go func() {
+		for event := range channel {
+			fmt.Println(event.msg)
+		}
+	}()
+	if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
 		return nil, err
 	}
 	return &query.User, nil
 }
 
 func (mm *importMediator) fillIssues(ctx context.Context) {
+	// First: setup message handling while submitting GraphQL queries.
+	msgs := make(chan messageEvent)
+	defer close(msgs)
+	// forward all the messages to the issue channel. The message will be queued after
+	// all the issues which has been completed.
+	go func() {
+		for msg := range msgs {
+			select {
+			case <-ctx.Done():
+				return
+			case mm.Issues <- msg:
+			}
+		}
+	}()
+	// start processing the actual issues
 	initialCursor := githubv4.String("")
-	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
+	issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
 	for hasIssues {
 		for _, node := range issues.Nodes {
 			// We need to send an issue-bundle over the issue channel before we start
 			// filling the issue edits and the timeline items to avoid deadlocks.
-			issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
-			timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
+			issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
+			timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
 			select {
 			case <-ctx.Done():
 				return
-			case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
+			case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
 			}
 
 			// We do not know whether the client reads from the issue edit channel
@@ -166,11 +216,25 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
 		if !issues.PageInfo.HasNextPage {
 			break
 		}
-		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
+		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
 	}
 }
 
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
+func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
+	// First: setup message handling while submitting GraphQL queries.
+	msgs := make(chan messageEvent)
+	defer close(msgs)
+	// forward all the messages to the issue-edit channel. The message will be queued after
+	// all the issue edits which have been completed.
+	go func() {
+		for msg := range msgs {
+			select {
+			case <-ctx.Done():
+				return
+			case channel <- msg:
+			}
+		}
+	}()
 	edits := &issueNode.UserContentEdits
 	hasEdits := true
 	for hasEdits {
@@ -184,17 +248,31 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
 			select {
 			case <-ctx.Done():
 				return
-			case channel <- edit:
+			case channel <- userContentEditData{edit}:
 			}
 		}
 		if !edits.PageInfo.HasPreviousPage {
 			break
 		}
-		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
+		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
 	}
 }
 
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
+	// First: setup message handling while submitting GraphQL queries.
+	msgs := make(chan messageEvent)
+	defer close(msgs)
+	// forward all the messages to the timeline channel. The message will be queued after
+	// all the timeline items which have been completed.
+	go func() {
+		for msg := range msgs {
+			select {
+			case <-ctx.Done():
+				return
+			case channel <- msg:
+			}
+		}
+	}()
 	items := &issueNode.TimelineItems
 	hasItems := true
 	for hasItems {
@@ -205,11 +283,11 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
 				//
 				// Send over the timeline-channel before starting to fill the comment
 				// edits.
-				commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
+				commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
 				select {
 				case <-ctx.Done():
 					return
-				case channel <- timelineBundle{item, commentEditChan}:
+				case channel <- timelineData{item, commentEditChan}:
 				}
 				// We need to create a new goroutine for filling the comment edit
 				// channel.
@@ -221,22 +299,36 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
 				select {
 				case <-ctx.Done():
 					return
-				case channel <- timelineBundle{item, nil}:
+				case channel <- timelineData{item, nil}:
 				}
 			}
 		}
 		if !items.PageInfo.HasNextPage {
 			break
 		}
-		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
 	}
 }
 
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
 	// Here we are only concerned with timeline items of type issueComment.
 	if item.Typename != "IssueComment" {
 		return
 	}
+	// First: setup message handling while submitting GraphQL queries.
+	msgs := make(chan messageEvent)
+	defer close(msgs)
+	// forward all the messages to the user content edit channel. The message will be queued after
+	// all the user content edits which have been completed already.
+	go func() {
+		for msg := range msgs {
+			select {
+			case <-ctx.Done():
+				return
+			case channel <- msg:
+			}
+		}
+	}()
 	comment := &item.IssueComment
 	edits := &comment.UserContentEdits
 	hasEdits := true
@@ -251,17 +343,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
 			select {
 			case <-ctx.Done():
 				return
-			case channel <- edit:
+			case channel <- userContentEditData{edit}:
 			}
 		}
 		if !edits.PageInfo.HasPreviousPage {
 			break
 		}
-		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
+		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
 	}
 }
 
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
 	vars := newCommentEditVars()
 	vars["gqlNodeId"] = nid
 	if cursor == "" {
@@ -270,7 +362,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
 		vars["commentEditBefore"] = cursor
 	}
 	query := commentEditQuery{}
-	if err := mm.mQuery(ctx, &query, vars); err != nil {
+	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
 		mm.setError(err)
 		return nil, false
 	}
@@ -281,7 +373,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
 	return connection, true
 }
 
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
 	vars := newTimelineVars()
 	vars["gqlNodeId"] = nid
 	if cursor == "" {
@@ -290,7 +382,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
 		vars["timelineAfter"] = cursor
 	}
 	query := timelineQuery{}
-	if err := mm.mQuery(ctx, &query, vars); err != nil {
+	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
 		mm.setError(err)
 		return nil, false
 	}
@@ -301,7 +393,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
 	return connection, true
 }
 
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
 	vars := newIssueEditVars()
 	vars["gqlNodeId"] = nid
 	if cursor == "" {
@@ -310,7 +402,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
 		vars["issueEditBefore"] = cursor
 	}
 	query := issueEditQuery{}
-	if err := mm.mQuery(ctx, &query, vars); err != nil {
+	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
 		mm.setError(err)
 		return nil, false
 	}
@@ -321,7 +413,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
 	return connection, true
 }
 
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
 	vars := newIssueVars(mm.owner, mm.project, mm.since)
 	if cursor == "" {
 		vars["issueAfter"] = (*githubv4.String)(nil)
@@ -329,7 +421,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
 		vars["issueAfter"] = githubv4.String(cursor)
 	}
 	query := issueQuery{}
-	if err := mm.mQuery(ctx, &query, vars); err != nil {
+	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
 		mm.setError(err)
 		return nil, false
 	}
@@ -360,20 +452,20 @@ type rateLimiter interface {
 // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
 // there is a Github rate limiting error, then the function sleeps and retries after the rate limit
 // is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
-	if err := mm.queryOnce(ctx, query, vars); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+	if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
 		// success: done
 		return nil
 	}
 	// failure: we will retry
-	// This is important for importing projects with a big number of issues.
+	// To retry is important for importing projects with a big number of issues.
 	retries := 3
 	var err error
 	for i := 0; i < retries; i++ {
 		// wait a few seconds before retry
 		sleepTime := 8 * (i + 1)
 		time.Sleep(time.Duration(sleepTime) * time.Second)
-		err = mm.queryOnce(ctx, query, vars)
+		err = mm.queryOnce(ctx, query, vars, msgs)
 		if err == nil {
 			// success: done
 			return nil
@@ -382,7 +474,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
 	return err
 }
 
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
 	// first: just send the query to the graphql api
 	vars["dryRun"] = githubv4.Boolean(false)
 	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -411,7 +503,12 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
 		resetTime := rateLimit.ResetAt.Time
 		// Add a few seconds (8) for good measure
 		resetTime = resetTime.Add(8 * time.Second)
-		fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
+		msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String())
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case msgs <- messageEvent{msg}:
+		}
 		timer := time.NewTimer(time.Until(resetTime))
 		select {
 		case <-ctx.Done():