import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"fmt"
  6	"strings"
  7	"sync"
  8	"time"
  9
 10	"github.com/shurcooL/githubv4"
 11)
 12
 // Page sizes of the GraphQL queries. Larger values fetch more per request, but they also
// influence how fast the Github GraphQL rate limit is exhausted.
const (
	NUM_ISSUES         = 40  // issues per page (issueFirst)
	NUM_ISSUE_EDITS    = 100 // edits per issue-edit page (issueEditLast)
	NUM_TIMELINE_ITEMS = 100 // items per timeline page (timelineFirst)
	NUM_COMMENT_EDITS  = 100 // edits per comment-edit page (commentEditLast)
)

// CHAN_CAPACITY is the buffer size of every event channel created by the mediator.
const CHAN_CAPACITY = 128
 21
 22// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
 23type importMediator struct {
 24	// Github graphql client
 25	gc *githubv4.Client
 26
 27	// name of the repository owner on Github
 28	owner string
 29
 30	// name of the Github repository
 31	project string
 32
 33	// since specifies which issues to import. Issues that have been updated at or after the
 34	// given date should be imported.
 35	since time.Time
 36
 37	// Issues is a channel holding bundles of Issues to be imported. Each issueEvent
 38	// is either a message (type messageEvent) or a struct holding all the data associated with
 39	// one issue (type issueData).
 40	Issues chan issueEvent
 41
 42	// Sticky error
 43	err error
 44
 45	// errMut is a mutex to synchronize access to the sticky error variable err.
 46	errMut sync.Mutex
 47}
 48
 // Marker interfaces for the values sent over the mediator's channels. Each consists of a
// single unexported method, so only types declared in this package can satisfy them.
type (
	// issueEvent is implemented by every value sent on importMediator.Issues.
	issueEvent interface {
		isIssueEvent()
	}

	// timelineEvent is implemented by every value sent on an issue's timeline channel.
	timelineEvent interface {
		isTimelineEvent()
	}

	// userContentEditEvent is implemented by every value sent on an edit channel.
	userContentEditEvent interface {
		isUserContentEditEvent()
	}
)
 58
 // messageEvent carries a human-readable status message (for example a rate-limit notice).
// It satisfies all three event interfaces, so it can travel on any of the mediator's channels.
type messageEvent struct {
	msg string
}

func (messageEvent) isUserContentEditEvent() {}
func (messageEvent) isTimelineEvent()        {}
func (messageEvent) isIssueEvent()           {}
 66
 67type issueData struct {
 68	issue
 69	issueEdits    <-chan userContentEditEvent
 70	timelineItems <-chan timelineEvent
 71}
 72
 73func (issueData) isIssueEvent() {}
 74
 75type timelineData struct {
 76	timelineItem
 77	userContentEdits <-chan userContentEditEvent
 78}
 79
 80func (timelineData) isTimelineEvent() {}
 81
 82type userContentEditData struct {
 83	userContentEdit
 84}
 85
 86// func (userContentEditData) isEvent()
 87func (userContentEditData) isUserContentEditEvent() {}
 88
 89func (mm *importMediator) setError(err error) {
 90	mm.errMut.Lock()
 91	mm.err = err
 92	mm.errMut.Unlock()
 93}
 94
 // NewImportMediator creates an importMediator for the Github repository owner/project and
// starts a goroutine that sends every issue updated at or after since on the Issues channel.
// Issues is closed once that goroutine has finished. Failures are recorded and can be read
// through Error. Cancelling ctx makes the goroutine stop early.
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
	mm := &importMediator{
		gc:      client,
		owner:   owner,
		project: project,
		since:   since,
		Issues:  make(chan issueEvent, CHAN_CAPACITY),
	}
	go func() {
		defer close(mm.Issues)
		mm.fillIssues(ctx)
	}()
	return mm
}
110
// varmap holds the variables of a GraphQL request, keyed by variable name (without the
// leading '$'). Its underlying type is map[string]interface{}, so it can be passed wherever
// such a map is expected.
type varmap map[string]interface{}
112
// newIssueVars returns the GraphQL variables for the issue query of owner/project. The query
// selects issues updated at or after since, and the variables set the page sizes of the issues
// and of their nested edits, timeline items and comment edits. All nested cursors start out
// as nil. queryIssue adds the "issueAfter" cursor on top of these.
func newIssueVars(owner, project string, since time.Time) varmap {
	noCursor := (*githubv4.String)(nil)
	return varmap{
		// repository and issue filter
		"owner":      githubv4.String(owner),
		"name":       githubv4.String(project),
		"issueSince": githubv4.DateTime{Time: since},
		"issueFirst": githubv4.Int(NUM_ISSUES),

		// issue edits (last/before)
		"issueEditLast":   githubv4.Int(NUM_ISSUE_EDITS),
		"issueEditBefore": noCursor,

		// timeline items (first/after)
		"timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
		"timelineAfter": noCursor,

		// edits of issue comments (last/before)
		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
		"commentEditBefore": noCursor,
	}
}
127
128func newIssueEditVars() varmap {
129	return varmap{
130		"issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
131	}
132}
133
134func newTimelineVars() varmap {
135	return varmap{
136		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
137		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
138		"commentEditBefore": (*githubv4.String)(nil),
139	}
140}
141
142func newCommentEditVars() varmap {
143	return varmap{
144		"commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
145	}
146}
147
148func (mm *importMediator) Error() error {
149	mm.errMut.Lock()
150	err := mm.err
151	mm.errMut.Unlock()
152	return err
153}
154
155func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
156	query := userQuery{}
157	vars := varmap{"login": githubv4.String(loginName)}
158	// handle message events localy
159	channel := make(chan messageEvent)
160	defer close(channel)
161	// print all messages immediately
162	go func() {
163		for event := range channel {
164			fmt.Println(event.msg)
165		}
166	}()
167	if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
168		return nil, err
169	}
170	return &query.User, nil
171}
172
// fillIssues pages through the repository's issues (see queryIssue) and sends one issueData
// bundle per issue on mm.Issues. For every bundle it starts two goroutines: one fills and
// closes the bundle's issue-edit channel, the other fills and closes its timeline channel.
// Status messages produced while querying are forwarded to mm.Issues as messageEvents.
// It returns when there are no more pages, when a query fails (the error is recorded with
// setError), or when ctx is cancelled.
func (mm *importMediator) fillIssues(ctx context.Context) {
	// First: set up message handling while submitting GraphQL queries. Messages are forwarded
	// to the issue channel, so each one is queued after all issues sent before it.
	msgs := make(chan messageEvent)
	forwarderDone := make(chan struct{})
	go func() {
		defer close(forwarderDone)
		for msg := range msgs {
			select {
			case <-ctx.Done():
				return
			case mm.Issues <- msg:
			}
		}
	}()
	// Stop the forwarder and wait for it before returning. NewImportMediator closes mm.Issues
	// as soon as this function returns, and a forwarder still blocked in a send at that point
	// would panic with "send on closed channel".
	defer func() {
		close(msgs)
		<-forwarderDone
	}()

	issues, hasIssues := mm.queryIssue(ctx, githubv4.String(""), msgs)
	for hasIssues {
		for _, node := range issues.Nodes {
			// Send the issue bundle before filling its channels. Filling them first could
			// deadlock, because a full channel only drains once the consumer has the bundle.
			issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
			timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
			select {
			case <-ctx.Done():
				return
			case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
			}

			// The consumer may drain either channel first, and any send blocks once a
			// channel is full. Both channels are therefore filled concurrently.
			go func(node issueNode) {
				mm.fillIssueEdits(ctx, &node, issueEditChan)
				close(issueEditChan)
			}(node)
			go func(node issueNode) {
				mm.fillTimeline(ctx, &node, timelineBundleChan)
				close(timelineBundleChan)
			}(node)
		}
		if !issues.PageInfo.HasNextPage {
			break
		}
		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
	}
}
222
// fillIssueEdits sends the user content edits of issueNode on channel. It starts with the page
// already embedded in issueNode and then requests earlier pages, walking each page from its
// last element to its first. It stops when there are no earlier pages, when a query fails (the
// error is recorded with setError), or when ctx is cancelled. Edits without diff data are
// skipped. Status messages produced while querying are forwarded on channel as messageEvents.
// The caller closes channel after this function returns.
func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
	// First: set up message handling while submitting GraphQL queries. Messages are queued
	// after all issue edits sent before them.
	msgs := make(chan messageEvent)
	forwarderDone := make(chan struct{})
	go func() {
		defer close(forwarderDone)
		for msg := range msgs {
			select {
			case <-ctx.Done():
				return
			case channel <- msg:
			}
		}
	}()
	// Wait for the forwarder before returning. The caller closes channel right after this
	// function returns, and a send still in flight would then panic.
	defer func() {
		close(msgs)
		<-forwarderDone
	}()

	edits := &issueNode.UserContentEdits
	hasEdits := true
	for hasEdits {
		// Walk the page backwards with an index loop. A helper goroutine feeding an
		// unbuffered channel would leak if we return early on cancellation.
		for i := len(edits.Nodes) - 1; i >= 0; i-- {
			edit := edits.Nodes[i]
			if edit.Diff == nil || string(*edit.Diff) == "" {
				// issueEdit.Diff == nil happens if the event is older than early
				// 2018; Github doesn't have the data before that. Best we can do is
				// to ignore the event.
				continue
			}
			select {
			case <-ctx.Done():
				return
			case channel <- userContentEditData{edit}:
			}
		}
		if !edits.PageInfo.HasPreviousPage {
			break
		}
		// Edits are paged backwards (last/before). The previous page ends just before the
		// first element of the current one, so its cursor is StartCursor. EndCursor would
		// step back by only one element and re-send nearly the same page.
		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.StartCursor, msgs)
	}
}
260
// fillTimeline sends the timeline items of issueNode on channel, one timelineData per item. It
// starts with the page already embedded in issueNode and then requests the following pages.
// It stops when there are no more pages, when a query fails (the error is recorded with
// setError), or when ctx is cancelled.
//
// Each IssueComment item carries a channel of comment edits, filled and closed by a separate
// goroutine. Every other item carries a nil edit channel. Status messages produced while
// querying are forwarded on channel as messageEvents. The caller closes channel after this
// function returns.
func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
	// First: set up message handling while submitting GraphQL queries. Messages are queued
	// after all timeline items sent before them.
	msgs := make(chan messageEvent)
	forwarderDone := make(chan struct{})
	go func() {
		defer close(forwarderDone)
		for msg := range msgs {
			select {
			case <-ctx.Done():
				return
			case channel <- msg:
			}
		}
	}()
	// Wait for the forwarder before returning. The caller closes channel right after this
	// function returns, and a send still in flight would then panic.
	defer func() {
		close(msgs)
		<-forwarderDone
	}()

	items := &issueNode.TimelineItems
	hasItems := true
	for hasItems {
		for _, item := range items.Nodes {
			if item.Typename == "IssueComment" {
				// Issue comments differ from other timeline items in that they may have
				// associated user content edits. Send the item before filling its edit
				// channel, so the consumer can start draining it.
				commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
				select {
				case <-ctx.Done():
					return
				case channel <- timelineData{item, commentEditChan}:
				}
				// Fill the comment edit channel concurrently, so a full channel cannot
				// stall the rest of the timeline.
				go func(item timelineItem) {
					mm.fillCommentEdits(ctx, &item, commentEditChan)
					close(commentEditChan)
				}(item)
			} else {
				select {
				case <-ctx.Done():
					return
				case channel <- timelineData{item, nil}:
				}
			}
		}
		if !items.PageInfo.HasNextPage {
			break
		}
		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
	}
}
312
// fillCommentEdits sends the user content edits of an IssueComment timeline item on channel.
// Other item types are ignored. It starts with the page embedded in item and then requests
// earlier pages, walking each page from its last element to its first. It stops when there are
// no earlier pages, when a query fails (the error is recorded with setError), or when ctx is
// cancelled. Edits without diff data are skipped. Status messages produced while querying are
// forwarded on channel as messageEvents. The caller closes channel after this function returns.
func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
	// Here we are only concerned with timeline items of type IssueComment.
	if item.Typename != "IssueComment" {
		return
	}
	// First: set up message handling while submitting GraphQL queries. Messages are queued
	// after all comment edits sent before them.
	msgs := make(chan messageEvent)
	forwarderDone := make(chan struct{})
	go func() {
		defer close(forwarderDone)
		for msg := range msgs {
			select {
			case <-ctx.Done():
				return
			case channel <- msg:
			}
		}
	}()
	// Wait for the forwarder before returning. The caller closes channel right after this
	// function returns, and a send still in flight would then panic.
	defer func() {
		close(msgs)
		<-forwarderDone
	}()

	comment := &item.IssueComment
	edits := &comment.UserContentEdits
	hasEdits := true
	for hasEdits {
		// Walk the page backwards with an index loop. A helper goroutine feeding an
		// unbuffered channel would leak if we return early on cancellation.
		for i := len(edits.Nodes) - 1; i >= 0; i-- {
			edit := edits.Nodes[i]
			if edit.Diff == nil || string(*edit.Diff) == "" {
				// issueEdit.Diff == nil happens if the event is older than early
				// 2018; Github doesn't have the data before that. Best we can do is
				// to ignore the event.
				continue
			}
			select {
			case <-ctx.Done():
				return
			case channel <- userContentEditData{edit}:
			}
		}
		if !edits.PageInfo.HasPreviousPage {
			break
		}
		// Edits are paged backwards (last/before). The previous page ends just before the
		// first element of the current one, so its cursor is StartCursor, not EndCursor.
		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.StartCursor, msgs)
	}
}
355
356func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
357	vars := newCommentEditVars()
358	vars["gqlNodeId"] = nid
359	if cursor == "" {
360		vars["commentEditBefore"] = (*githubv4.String)(nil)
361	} else {
362		vars["commentEditBefore"] = cursor
363	}
364	query := commentEditQuery{}
365	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
366		mm.setError(err)
367		return nil, false
368	}
369	connection := &query.Node.IssueComment.UserContentEdits
370	if len(connection.Nodes) <= 0 {
371		return nil, false
372	}
373	return connection, true
374}
375
376func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
377	vars := newTimelineVars()
378	vars["gqlNodeId"] = nid
379	if cursor == "" {
380		vars["timelineAfter"] = (*githubv4.String)(nil)
381	} else {
382		vars["timelineAfter"] = cursor
383	}
384	query := timelineQuery{}
385	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
386		mm.setError(err)
387		return nil, false
388	}
389	connection := &query.Node.Issue.TimelineItems
390	if len(connection.Nodes) <= 0 {
391		return nil, false
392	}
393	return connection, true
394}
395
396func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
397	vars := newIssueEditVars()
398	vars["gqlNodeId"] = nid
399	if cursor == "" {
400		vars["issueEditBefore"] = (*githubv4.String)(nil)
401	} else {
402		vars["issueEditBefore"] = cursor
403	}
404	query := issueEditQuery{}
405	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
406		mm.setError(err)
407		return nil, false
408	}
409	connection := &query.Node.Issue.UserContentEdits
410	if len(connection.Nodes) <= 0 {
411		return nil, false
412	}
413	return connection, true
414}
415
416func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
417	vars := newIssueVars(mm.owner, mm.project, mm.since)
418	if cursor == "" {
419		vars["issueAfter"] = (*githubv4.String)(nil)
420	} else {
421		vars["issueAfter"] = githubv4.String(cursor)
422	}
423	query := issueQuery{}
424	if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
425		mm.setError(err)
426		return nil, false
427	}
428	connection := &query.Repository.Issues
429	if len(connection.Nodes) <= 0 {
430		return nil, false
431	}
432	return connection, true
433}
434
435func reverse(eds []userContentEdit) chan userContentEdit {
436	ret := make(chan userContentEdit)
437	go func() {
438		for i := range eds {
439			ret <- eds[len(eds)-1-i]
440		}
441		close(ret)
442	}()
443	return ret
444}
445
446type rateLimiter interface {
447	rateLimit() rateLimit
448}
449
// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL
// query, and the response is decoded into it. It should be a pointer to a struct that
// corresponds to the Github graphql schema, and it has to implement the rateLimiter interface.
//
// Github rate limiting is handled by queryOnce, which waits for the limit to reset. If the
// query still fails, it is retried up to three more times with growing delays (8s, 16s, 24s).
// The delays are cut short when ctx is cancelled, in which case ctx.Err() is returned.
// Otherwise the error of the last attempt is returned.
func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
	err := mm.queryOnce(ctx, query, vars, msgs)
	if err == nil {
		return nil
	}
	// Retrying is important for importing projects with a big number of issues, where
	// transient failures become likely.
	const retries = 3
	for i := 0; i < retries; i++ {
		// Wait a few seconds before retrying, but give up immediately on cancellation
		// instead of sleeping through the whole delay.
		timer := time.NewTimer(time.Duration(8*(i+1)) * time.Second)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		err = mm.queryOnce(ctx, query, vars, msgs)
		if err == nil {
			return nil
		}
	}
	return err
}
476
// queryOnce runs query once with vars["dryRun"] set to false. Any error other than Github's
// "API rate limit exceeded" is returned as is.
//
// On a rate-limit error it runs a dry run of the same query to read the current rate limit.
// If the query costs more than what remains, it sends a messageEvent on msgs and sleeps until
// the reported reset time plus 8 seconds. It then runs the query one final time and returns
// that result.
//
// Every request gets its own defaultTimeout deadline, and cancelling ctx aborts the wait with
// ctx.Err(). Note that vars is modified in place (the "dryRun" entry).
func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
	// run sets the dryRun flag and sends the query with a fresh timeout.
	run := func(dryRun bool) error {
		vars["dryRun"] = githubv4.Boolean(dryRun)
		qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
		defer cancel()
		return mm.gc.Query(qctx, query, vars)
	}

	// The error string is the only signal of a rate-limit error that is inspected here.
	err := run(false)
	if err == nil || !strings.Contains(err.Error(), "API rate limit exceeded") {
		return err
	}

	// Rate limited: ask the GraphQL API for the rate-limit status with a dry run.
	if err := run(true); err != nil {
		return err
	}
	if limit := query.rateLimit(); limit.Cost > limit.Remaining {
		// Add a few seconds (8) to the reset time for good measure.
		wakeAt := limit.ResetAt.Time.Add(8 * time.Second)
		msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", wakeAt.String())
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msgs <- messageEvent{msg}:
		}
		timer := time.NewTimer(time.Until(wakeAt))
		select {
		case <-ctx.Done():
			stop(timer)
			return ctx.Err()
		case <-timer.C:
		}
	}
	// Run the original query again; its error (possibly nil) is the result.
	return run(false)
}
527
// stop stops t. If t has already fired, any value still waiting in t.C is drained without
// blocking, so nothing stale can be received from the channel afterwards.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}