Merge pull request #813 from MichaelMure/fix-data-race2

Michael Muré created

Github: fix data race

Change summary

bridge/github/client.go          | 106 +++++++++++--------------
bridge/github/export.go          |  15 ---
bridge/github/import_events.go   |  40 ++++++++++
bridge/github/import_mediator.go | 134 ++++++++++++++-------------------
4 files changed, 144 insertions(+), 151 deletions(-)

Detailed changes

bridge/github/client.go 🔗

@@ -7,8 +7,9 @@ import (
 	"strings"
 	"time"
 
-	"github.com/MichaelMure/git-bug/bridge/core"
 	"github.com/shurcooL/githubv4"
+
+	"github.com/MichaelMure/git-bug/bridge/core"
 )
 
 var _ Client = &githubv4.Client{}
@@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient
 	return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
 }
 
-type RateLimitingEvent struct {
-	msg string
-}
-
-// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an
-// export result.
+// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event
 func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
 	// prepare a closure for the mutation
 	mutFun := func(ctx context.Context) error {
 		return c.sc.Mutate(ctx, m, input, vars)
 	}
-	limitEvents := make(chan RateLimitingEvent)
-	defer close(limitEvents)
-	go func() {
-		for e := range limitEvents {
-			select {
-			case <-ctx.Done():
-				return
-			case out <- core.NewExportRateLimiting(e.msg):
-			}
+	callback := func(msg string) {
+		select {
+		case <-ctx.Done():
+		case out <- core.NewExportRateLimiting(msg):
 		}
-	}()
-	return c.callAPIAndRetry(mutFun, ctx, limitEvents)
+	}
+	return c.callAPIAndRetry(ctx, mutFun, callback)
 }
 
-// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type RateLimitingEvent.
-func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error {
-	// prepare a closure fot the query
+// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event
+func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
+	// prepare a closure for the query
 	queryFun := func(ctx context.Context) error {
 		return c.sc.Query(ctx, query, vars)
 	}
-	return c.callAPIAndRetry(queryFun, ctx, limitEvents)
+	callback := func(msg string) {
+		select {
+		case <-ctx.Done():
+		case importEvents <- RateLimitingEvent{msg}:
+		}
+	}
+	return c.callAPIAndRetry(ctx, queryFun, callback)
 }
 
-// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type ImportEvent.
-func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
-	// forward rate limiting events to channel of import events
-	limitEvents := make(chan RateLimitingEvent)
-	defer close(limitEvents)
-	go func() {
-		for e := range limitEvents {
-			select {
-			case <-ctx.Done():
-				return
-			case importEvents <- e:
-			}
+// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event
+func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error {
+	// prepare a closure for the query
+	queryFun := func(ctx context.Context) error {
+		return c.sc.Query(ctx, query, vars)
+	}
+	callback := func(msg string) {
+		select {
+		case <-ctx.Done():
+		case out <- core.NewExportRateLimiting(msg):
 		}
-	}()
-	return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+	}
+	return c.callAPIAndRetry(ctx, queryFun, callback)
 }
 
-// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting
-// event a message to stdout.
+// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event .
 func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
-	// print rate limiting events directly to stdout.
-	limitEvents := make(chan RateLimitingEvent)
-	defer close(limitEvents)
-	go func() {
-		for e := range limitEvents {
-			fmt.Println(e.msg)
-		}
-	}()
-	return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+	// prepare a closure for the query
+	queryFun := func(ctx context.Context) error {
+		return c.sc.Query(ctx, query, vars)
+	}
+	callback := func(msg string) {
+		fmt.Println(msg)
+	}
+	return c.callAPIAndRetry(ctx, queryFun, callback)
 }
 
 // callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
 // case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
 // a closure containing a query or a mutation to the Github GraphQL API.
-func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error {
 	var err error
-	if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil {
+	if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil {
 		return nil
 	}
 	// failure; the reason may be temporary network problems or internal errors
@@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
 			stop(timer)
 			return ctx.Err()
 		case <-timer.C:
-			err = c.callAPIDealWithLimit(apiCall, ctx, events)
+			err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent)
 			if err == nil {
 				return nil
 			}
@@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
 }
 
 // callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
-// error, then it waits until the rate limit is reset and it repeats the request to the API. The
+// error, then it waits until the rate limit is reset, and it repeats the request to the API. The
 // parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
 // GraphQL API.
-func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error {
 	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 	// call the function fun()
@@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte
 			resetTime.String(),
 		)
 		// Send message about rate limiting event.
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case events <- RateLimitingEvent{msg}:
-		}
+		rateLimitCallback(msg)
+
 		// Pause current goroutine
 		timer := time.NewTimer(time.Until(resetTime))
 		select {

bridge/github/export.go 🔗

@@ -486,23 +486,10 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHa
 	}
 
 	q := labelsQuery{}
-	// When performing the queries we have to forward rate limiting events to the
-	// current channel of export results.
-	events := make(chan RateLimitingEvent)
-	defer close(events)
-	go func() {
-		for e := range events {
-			select {
-			case <-ctx.Done():
-				return
-			case ge.out <- core.NewExportRateLimiting(e.msg):
-			}
-		}
-	}()
 
 	hasNextPage := true
 	for hasNextPage {
-		if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil {
+		if err := gc.queryExport(ctx, &q, variables, ge.out); err != nil {
 			return err
 		}
 

bridge/github/import_events.go 🔗

@@ -0,0 +1,40 @@
+package github
+
+import "github.com/shurcooL/githubv4"
+
+type ImportEvent interface {
+	isImportEvent()
+}
+
+type RateLimitingEvent struct {
+	msg string
+}
+
+func (RateLimitingEvent) isImportEvent() {}
+
+type IssueEvent struct {
+	issue
+}
+
+func (IssueEvent) isImportEvent() {}
+
+type IssueEditEvent struct {
+	issueId githubv4.ID
+	userContentEdit
+}
+
+func (IssueEditEvent) isImportEvent() {}
+
+type TimelineEvent struct {
+	issueId githubv4.ID
+	timelineItem
+}
+
+func (TimelineEvent) isImportEvent() {}
+
+type CommentEditEvent struct {
+	commentId githubv4.ID
+	userContentEdit
+}
+
+func (CommentEditEvent) isImportEvent() {}

bridge/github/import_mediator.go 🔗

@@ -9,6 +9,7 @@ import (
 
 const (
 	// These values influence how fast the github graphql rate limit is exhausted.
+
 	NumIssues        = 40
 	NumIssueEdits    = 100
 	NumTimelineItems = 100
@@ -41,43 +42,6 @@ type importMediator struct {
 	err error
 }
 
-type ImportEvent interface {
-	isImportEvent()
-}
-
-func (RateLimitingEvent) isImportEvent() {}
-
-type IssueEvent struct {
-	issue
-}
-
-func (IssueEvent) isImportEvent() {}
-
-type IssueEditEvent struct {
-	issueId githubv4.ID
-	userContentEdit
-}
-
-func (IssueEditEvent) isImportEvent() {}
-
-type TimelineEvent struct {
-	issueId githubv4.ID
-	timelineItem
-}
-
-func (TimelineEvent) isImportEvent() {}
-
-type CommentEditEvent struct {
-	commentId githubv4.ID
-	userContentEdit
-}
-
-func (CommentEditEvent) isImportEvent() {}
-
-func (mm *importMediator) NextImportEvent() ImportEvent {
-	return <-mm.importEvents
-}
-
 func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
 	mm := importMediator{
 		gh:           client,
@@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne
 		importEvents: make(chan ImportEvent, ChanCapacity),
 		err:          nil,
 	}
-	go func() {
-		mm.fillImportEvents(ctx)
-		close(mm.importEvents)
-	}()
-	return &mm
-}
 
-type varmap map[string]interface{}
+	go mm.start(ctx)
 
-func newIssueVars(owner, project string, since time.Time) varmap {
-	return varmap{
-		"owner":             githubv4.String(owner),
-		"name":              githubv4.String(project),
-		"issueSince":        githubv4.DateTime{Time: since},
-		"issueFirst":        githubv4.Int(NumIssues),
-		"issueEditLast":     githubv4.Int(NumIssueEdits),
-		"issueEditBefore":   (*githubv4.String)(nil),
-		"timelineFirst":     githubv4.Int(NumTimelineItems),
-		"timelineAfter":     (*githubv4.String)(nil),
-		"commentEditLast":   githubv4.Int(NumCommentEdits),
-		"commentEditBefore": (*githubv4.String)(nil),
-	}
-}
-
-func newIssueEditVars() varmap {
-	return varmap{
-		"issueEditLast": githubv4.Int(NumIssueEdits),
-	}
+	return &mm
 }
 
-func newTimelineVars() varmap {
-	return varmap{
-		"timelineFirst":     githubv4.Int(NumTimelineItems),
-		"commentEditLast":   githubv4.Int(NumCommentEdits),
-		"commentEditBefore": (*githubv4.String)(nil),
-	}
+func (mm *importMediator) start(ctx context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	mm.fillImportEvents(ctx)
+	// Make sure we cancel everything when we are done, instead of relying on the parent context
+	// This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing.
+	cancel()
+	close(mm.importEvents)
 }
 
-func newCommentEditVars() varmap {
-	return varmap{
-		"commentEditLast": githubv4.Int(NumCommentEdits),
-	}
+// NextImportEvent returns the next ImportEvent, or nil if done.
+func (mm *importMediator) NextImportEvent() ImportEvent {
+	return <-mm.importEvents
 }
 
 func (mm *importMediator) Error() error {
@@ -138,7 +78,7 @@ func (mm *importMediator) Error() error {
 func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
 	query := userQuery{}
 	vars := varmap{"login": githubv4.String(loginName)}
-	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+	if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
 		return nil, err
 	}
 	return &query.User, nil
@@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
 		vars["issueEditBefore"] = cursor
 	}
 	query := issueEditQuery{}
-	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+	if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
 		mm.err = err
 		return nil, false
 	}
@@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
 		vars["timelineAfter"] = cursor
 	}
 	query := timelineQuery{}
-	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+	if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
 		mm.err = err
 		return nil, false
 	}
@@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
 		vars["commentEditBefore"] = cursor
 	}
 	query := commentEditQuery{}
-	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+	if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
 		mm.err = err
 		return nil, false
 	}
@@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
 		vars["issueAfter"] = cursor
 	}
 	query := issueQuery{}
-	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+	if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
 		mm.err = err
 		return nil, false
 	}
@@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit {
 	}()
 	return ret
 }
+
+// varmap is a container for Github API's pagination variables
+type varmap map[string]interface{}
+
+func newIssueVars(owner, project string, since time.Time) varmap {
+	return varmap{
+		"owner":             githubv4.String(owner),
+		"name":              githubv4.String(project),
+		"issueSince":        githubv4.DateTime{Time: since},
+		"issueFirst":        githubv4.Int(NumIssues),
+		"issueEditLast":     githubv4.Int(NumIssueEdits),
+		"issueEditBefore":   (*githubv4.String)(nil),
+		"timelineFirst":     githubv4.Int(NumTimelineItems),
+		"timelineAfter":     (*githubv4.String)(nil),
+		"commentEditLast":   githubv4.Int(NumCommentEdits),
+		"commentEditBefore": (*githubv4.String)(nil),
+	}
+}
+
+func newIssueEditVars() varmap {
+	return varmap{
+		"issueEditLast": githubv4.Int(NumIssueEdits),
+	}
+}
+
+func newTimelineVars() varmap {
+	return varmap{
+		"timelineFirst":     githubv4.Int(NumTimelineItems),
+		"commentEditLast":   githubv4.Int(NumCommentEdits),
+		"commentEditBefore": (*githubv4.String)(nil),
+	}
+}
+
+func newCommentEditVars() varmap {
+	return varmap{
+		"commentEditLast": githubv4.Int(NumCommentEdits),
+	}
+}