import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"fmt"
  6	"strings"
  7	"time"
  8
  9	"github.com/shurcooL/githubv4"
 10)
 11
const (
	// These values influence how fast the github graphql rate limit is exhausted.
	// They are the page sizes passed as query variables: NUM_ISSUES issues per
	// page ("issueFirst"), NUM_ISSUE_EDITS issue edits ("issueEditLast"),
	// NUM_TIMELINE_ITEMS timeline items ("timelineFirst") and NUM_COMMENT_EDITS
	// comment edits ("commentEditLast").
	NUM_ISSUES         = 40
	NUM_ISSUE_EDITS    = 100
	NUM_TIMELINE_ITEMS = 100
	NUM_COMMENT_EDITS  = 100

	// CHAN_CAPACITY is the buffer size of the importEvents channel created by
	// NewImportMediator.
	CHAN_CAPACITY = 128
)
 21
// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
// A goroutine started by NewImportMediator runs the queries and publishes the results on
// importEvents; consumers read them one at a time with NextImportEvent and can inspect the
// last query failure with Error.
type importMediator struct {
	// Github graphql client
	gc *githubv4.Client

	// name of the repository owner on Github
	owner string

	// name of the Github repository
	project string

	// since specifies which issues to import. Issues that have been updated at or after the
	// given date should be imported.
	since time.Time

	// importEvents holds events representing issues, comments, edits, ...
	// In this channel issues are immediately followed by their issue edits and comments are
	// immediately followed by their comment edits.
	// The channel is buffered with CHAN_CAPACITY slots and is closed by the producing
	// goroutine once it has finished.
	importEvents chan ImportEvent

	// Sticky error: assigned whenever one of the paginated queries fails and never reset to
	// nil. A later failure overwrites an earlier one.
	err error
}
 45
// ImportEvent is the common type of everything delivered through the mediator's event channel.
// The unexported marker method restricts implementations to this package: MessageEvent,
// IssueEvent, IssueEditEvent, TimelineEvent and CommentEditEvent.
type ImportEvent interface {
	isImportEvent()
}
 49
// MessageEvent carries a human-readable status message instead of imported data. The mediator
// emits one before it sleeps because the Github rate limit is exhausted.
type MessageEvent struct {
	msg string
}

// isImportEvent marks MessageEvent as an ImportEvent.
func (MessageEvent) isImportEvent() {}
 55
// IssueEvent announces one issue returned by the issue query. It is sent before the issue's
// edit events and timeline events.
type IssueEvent struct {
	issue
}

// isImportEvent marks IssueEvent as an ImportEvent.
func (IssueEvent) isImportEvent() {}
 61
// IssueEditEvent describes one edit of an issue's content. issueId identifies the issue the
// edit belongs to. Edits without a diff are never sent.
type IssueEditEvent struct {
	issueId githubv4.ID
	userContentEdit
}

// isImportEvent marks IssueEditEvent as an ImportEvent.
func (IssueEditEvent) isImportEvent() {}
 68
// TimelineEvent describes one item of an issue's timeline. issueId identifies the issue the
// item belongs to. Items of type "IssueComment" are followed by their CommentEditEvents.
type TimelineEvent struct {
	issueId githubv4.ID
	timelineItem
}

// isImportEvent marks TimelineEvent as an ImportEvent.
func (TimelineEvent) isImportEvent() {}
 75
// CommentEditEvent describes one edit of an issue comment. commentId identifies the comment
// the edit belongs to. Edits without a diff are never sent.
type CommentEditEvent struct {
	commentId githubv4.ID
	userContentEdit
}

// isImportEvent marks CommentEditEvent as an ImportEvent.
func (CommentEditEvent) isImportEvent() {}
 82
// NextImportEvent blocks until the next event is available and returns it.
// After the event channel has been closed and drained it returns nil, which marks the end of
// the event stream.
func (mm *importMediator) NextImportEvent() ImportEvent {
	return <-mm.importEvents
}
 86
 87func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
 88	mm := importMediator{
 89		gc:           client,
 90		owner:        owner,
 91		project:      project,
 92		since:        since,
 93		importEvents: make(chan ImportEvent, CHAN_CAPACITY),
 94		err:          nil,
 95	}
 96	go func() {
 97		mm.fillImportEvents(ctx)
 98		close(mm.importEvents)
 99	}()
100	return &mm
101}
102
// varmap holds the variables of a GraphQL query, keyed by variable name.
type varmap map[string]interface{}
104
105func newIssueVars(owner, project string, since time.Time) varmap {
106	return varmap{
107		"owner":             githubv4.String(owner),
108		"name":              githubv4.String(project),
109		"issueSince":        githubv4.DateTime{Time: since},
110		"issueFirst":        githubv4.Int(NUM_ISSUES),
111		"issueEditLast":     githubv4.Int(NUM_ISSUE_EDITS),
112		"issueEditBefore":   (*githubv4.String)(nil),
113		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
114		"timelineAfter":     (*githubv4.String)(nil),
115		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
116		"commentEditBefore": (*githubv4.String)(nil),
117	}
118}
119
120func newIssueEditVars() varmap {
121	return varmap{
122		"issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
123	}
124}
125
126func newTimelineVars() varmap {
127	return varmap{
128		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
129		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
130		"commentEditBefore": (*githubv4.String)(nil),
131	}
132}
133
134func newCommentEditVars() varmap {
135	return varmap{
136		"commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
137	}
138}
139
// Error returns the sticky error recorded by the last failed paginated query, or nil if none
// failed. The field is written by the goroutine that produces the events.
// NOTE(review): no lock guards the field, so this is presumably meant to be called after
// NextImportEvent has returned nil — confirm against callers.
func (mm *importMediator) Error() error {
	return mm.err
}
143
144func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
145	query := userQuery{}
146	vars := varmap{"login": githubv4.String(loginName)}
147	if err := mm.mQuery(ctx, &query, vars); err != nil {
148		return nil, err
149	}
150	return &query.User, nil
151}
152
153func (mm *importMediator) fillImportEvents(ctx context.Context) {
154	initialCursor := githubv4.String("")
155	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
156	for hasIssues {
157		for _, node := range issues.Nodes {
158			select {
159			case <-ctx.Done():
160				return
161			case mm.importEvents <- IssueEvent{node.issue}:
162			}
163
164			// issue edit events follow the issue event
165			mm.fillIssueEditEvents(ctx, &node)
166			// last come the timeline events
167			mm.fillTimelineEvents(ctx, &node)
168		}
169		if !issues.PageInfo.HasNextPage {
170			break
171		}
172		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
173	}
174}
175
176func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
177	edits := &issueNode.UserContentEdits
178	hasEdits := true
179	for hasEdits {
180		for edit := range reverse(edits.Nodes) {
181			if edit.Diff == nil || string(*edit.Diff) == "" {
182				// issueEdit.Diff == nil happen if the event is older than early
183				// 2018, Github doesn't have the data before that. Best we can do is
184				// to ignore the event.
185				continue
186			}
187			select {
188			case <-ctx.Done():
189				return
190			case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
191			}
192		}
193		if !edits.PageInfo.HasPreviousPage {
194			break
195		}
196		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
197	}
198}
199
200func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
201	vars := newIssueEditVars()
202	vars["gqlNodeId"] = nid
203	if cursor == "" {
204		vars["issueEditBefore"] = (*githubv4.String)(nil)
205	} else {
206		vars["issueEditBefore"] = cursor
207	}
208	query := issueEditQuery{}
209	if err := mm.mQuery(ctx, &query, vars); err != nil {
210		mm.err = err
211		return nil, false
212	}
213	connection := &query.Node.Issue.UserContentEdits
214	if len(connection.Nodes) <= 0 {
215		return nil, false
216	}
217	return connection, true
218}
219
220func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
221	items := &issueNode.TimelineItems
222	hasItems := true
223	for hasItems {
224		for _, item := range items.Nodes {
225			select {
226			case <-ctx.Done():
227				return
228			case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}:
229			}
230			if item.Typename == "IssueComment" {
231				// Issue comments are different than other timeline items in that
232				// they may have associated user content edits.
233				// Right after the comment we send the comment edits.
234				mm.fillCommentEdits(ctx, &item)
235			}
236		}
237		if !items.PageInfo.HasNextPage {
238			break
239		}
240		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
241	}
242}
243
244func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
245	vars := newTimelineVars()
246	vars["gqlNodeId"] = nid
247	if cursor == "" {
248		vars["timelineAfter"] = (*githubv4.String)(nil)
249	} else {
250		vars["timelineAfter"] = cursor
251	}
252	query := timelineQuery{}
253	if err := mm.mQuery(ctx, &query, vars); err != nil {
254		mm.err = err
255		return nil, false
256	}
257	connection := &query.Node.Issue.TimelineItems
258	if len(connection.Nodes) <= 0 {
259		return nil, false
260	}
261	return connection, true
262}
263
264func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
265	// Here we are only concerned with timeline items of type issueComment.
266	if item.Typename != "IssueComment" {
267		return
268	}
269	// First: setup message handling while submitting GraphQL queries.
270	comment := &item.IssueComment
271	edits := &comment.UserContentEdits
272	hasEdits := true
273	for hasEdits {
274		for edit := range reverse(edits.Nodes) {
275			if edit.Diff == nil || string(*edit.Diff) == "" {
276				// issueEdit.Diff == nil happen if the event is older than early
277				// 2018, Github doesn't have the data before that. Best we can do is
278				// to ignore the event.
279				continue
280			}
281			select {
282			case <-ctx.Done():
283				return
284			case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
285			}
286		}
287		if !edits.PageInfo.HasPreviousPage {
288			break
289		}
290		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
291	}
292}
293
294func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
295	vars := newCommentEditVars()
296	vars["gqlNodeId"] = nid
297	if cursor == "" {
298		vars["commentEditBefore"] = (*githubv4.String)(nil)
299	} else {
300		vars["commentEditBefore"] = cursor
301	}
302	query := commentEditQuery{}
303	if err := mm.mQuery(ctx, &query, vars); err != nil {
304		mm.err = err
305		return nil, false
306	}
307	connection := &query.Node.IssueComment.UserContentEdits
308	if len(connection.Nodes) <= 0 {
309		return nil, false
310	}
311	return connection, true
312}
313
314func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
315	vars := newIssueVars(mm.owner, mm.project, mm.since)
316	if cursor == "" {
317		vars["issueAfter"] = (*githubv4.String)(nil)
318	} else {
319		vars["issueAfter"] = githubv4.String(cursor)
320	}
321	query := issueQuery{}
322	if err := mm.mQuery(ctx, &query, vars); err != nil {
323		mm.err = err
324		return nil, false
325	}
326	connection := &query.Repository.Issues
327	if len(connection.Nodes) <= 0 {
328		return nil, false
329	}
330	return connection, true
331}
332
333func reverse(eds []userContentEdit) chan userContentEdit {
334	ret := make(chan userContentEdit)
335	go func() {
336		for i := range eds {
337			ret <- eds[len(eds)-1-i]
338		}
339		close(ret)
340	}()
341	return ret
342}
343
344// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
345// and it is used to populate the response into it. It should be a pointer to a struct that
346// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
347// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
348// is expired. If there is another error, then the method will retry before giving up.
349func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
350	if err := mm.queryOnce(ctx, query, vars); err == nil {
351		// success: done
352		return nil
353	}
354	// failure: we will retry
355	// To retry is important for importing projects with a big number of issues, because
356	// there may be temporary network errors or momentary internal errors of the github servers.
357	retries := 3
358	var err error
359	for i := 0; i < retries; i++ {
360		// wait a few seconds before retry
361		sleepTime := 8 * (i + 1)
362		time.Sleep(time.Duration(sleepTime) * time.Second)
363		err = mm.queryOnce(ctx, query, vars)
364		if err == nil {
365			// success: done
366			return nil
367		}
368	}
369	return err
370}
371
372func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
373	// first: just send the query to the graphql api
374	vars["dryRun"] = githubv4.Boolean(false)
375	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
376	defer cancel()
377	err := mm.gc.Query(qctx, query, vars)
378	if err == nil {
379		// no error: done
380		return nil
381	}
382	// matching the error string
383	if !strings.Contains(err.Error(), "API rate limit exceeded") {
384		// an error, but not the API rate limit error: done
385		return err
386	}
387	// a rate limit error
388	// ask the graphql api for rate limiting information
389	vars["dryRun"] = githubv4.Boolean(true)
390	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
391	defer cancel()
392	if err := mm.gc.Query(qctx, query, vars); err != nil {
393		return err
394	}
395	rateLimit := query.rateLimit()
396	if rateLimit.Cost > rateLimit.Remaining {
397		// sleep
398		resetTime := rateLimit.ResetAt.Time
399		// Add a few seconds (8) for good measure
400		resetTime = resetTime.Add(8 * time.Second)
401		msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String())
402		select {
403		case <-ctx.Done():
404			return ctx.Err()
405		case mm.importEvents <- MessageEvent{msg}:
406		}
407		timer := time.NewTimer(time.Until(resetTime))
408		select {
409		case <-ctx.Done():
410			stop(timer)
411			return ctx.Err()
412		case <-timer.C:
413		}
414	}
415	// run the original query again
416	vars["dryRun"] = githubv4.Boolean(false)
417	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
418	defer cancel()
419	err = mm.gc.Query(qctx, query, vars)
420	return err // might be nil
421}
422
// stop stops the timer t. If the timer could not be stopped because it had already fired or
// been stopped, a value possibly pending in t.C is discarded without blocking.
func stop(t *time.Timer) {
	if t.Stop() {
		// Stopped before firing: nothing to drain.
		return
	}
	select {
	case <-t.C:
	default:
	}
}