import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"fmt"
  6	"strings"
  7	"sync"
  8	"time"
  9
 10	"github.com/shurcooL/githubv4"
 11)
 12
 13const ( // These values influence how fast the github graphql rate limit is exhausted.
 14	NUM_ISSUES         = 40
 15	NUM_ISSUE_EDITS    = 100
 16	NUM_TIMELINE_ITEMS = 100
 17	NUM_COMMENT_EDITS  = 100
 18
 19	CHAN_CAPACITY = 128
 20)
 21
 22// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
 23type importMediator struct {
 24	// Github graphql client
 25	gc *githubv4.Client
 26
 27	// name of the repository owner on Github
 28	owner string
 29
 30	// name of the Github repository
 31	project string
 32
 33	// since specifies which issues to import. Issues that have been updated at or after the
 34	// given date should be imported.
 35	since time.Time
 36
 37	// issues is a channel holding bundles of issues to be imported. Each bundle holds the data
 38	// associated with one issue.
 39	issues chan issueBundle
 40
 41	// Sticky error
 42	err error
 43
 44	// errMut is a mutex to synchronize access to the sticky error variable err.
 45	errMut sync.Mutex
 46}
 47
 48type issueBundle struct {
 49	issue           issue
 50	issueEdits      <-chan userContentEdit
 51	timelineBundles <-chan timelineBundle
 52}
 53
 54type timelineBundle struct {
 55	timelineItem     timelineItem
 56	userContentEdits <-chan userContentEdit
 57}
 58
 59func (mm *importMediator) setError(err error) {
 60	mm.errMut.Lock()
 61	mm.err = err
 62	mm.errMut.Unlock()
 63}
 64
 65func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
 66	mm := importMediator{
 67		gc:      client,
 68		owner:   owner,
 69		project: project,
 70		since:   since,
 71		issues:  make(chan issueBundle, CHAN_CAPACITY),
 72		err:     nil,
 73	}
 74	go func() {
 75		mm.fillIssues(ctx)
 76		close(mm.issues)
 77	}()
 78	return &mm
 79}
 80
 81type varmap map[string]interface{}
 82
 83func newIssueVars(owner, project string, since time.Time) varmap {
 84	return varmap{
 85		"owner":             githubv4.String(owner),
 86		"name":              githubv4.String(project),
 87		"issueSince":        githubv4.DateTime{Time: since},
 88		"issueFirst":        githubv4.Int(NUM_ISSUES),
 89		"issueEditLast":     githubv4.Int(NUM_ISSUE_EDITS),
 90		"issueEditBefore":   (*githubv4.String)(nil),
 91		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
 92		"timelineAfter":     (*githubv4.String)(nil),
 93		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
 94		"commentEditBefore": (*githubv4.String)(nil),
 95	}
 96}
 97
 98func newIssueEditVars() varmap {
 99	return varmap{
100		"issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
101	}
102}
103
104func newTimelineVars() varmap {
105	return varmap{
106		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
107		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
108		"commentEditBefore": (*githubv4.String)(nil),
109	}
110}
111
112func newCommentEditVars() varmap {
113	return varmap{
114		"commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
115	}
116}
117
118func (mm *importMediator) Issues() <-chan issueBundle {
119	return mm.issues
120}
121
122func (mm *importMediator) Error() error {
123	mm.errMut.Lock()
124	err := mm.err
125	mm.errMut.Unlock()
126	return err
127}
128
129func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
130	query := userQuery{}
131	vars := varmap{"login": githubv4.String(loginName)}
132	if err := mm.mQuery(ctx, &query, vars); err != nil {
133		return nil, err
134	}
135	return &query.User, nil
136}
137
138func (mm *importMediator) fillIssues(ctx context.Context) {
139	initialCursor := githubv4.String("")
140	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
141	for hasIssues {
142		for _, node := range issues.Nodes {
143			// We need to send an issue-bundle over the issue channel before we start
144			// filling the issue edits and the timeline items to avoid deadlocks.
145			issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
146			timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
147			select {
148			case <-ctx.Done():
149				return
150			case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
151			}
152
153			// We do not know whether the client reads from the issue edit channel
154			// or the timeline channel first. Since the capacity of any channel is limited
155			// any send operation may block. Hence, in order to avoid deadlocks we need
156			// to send over both these channels concurrently.
157			go func(node issueNode) {
158				mm.fillIssueEdits(ctx, &node, issueEditChan)
159				close(issueEditChan)
160			}(node)
161			go func(node issueNode) {
162				mm.fillTimeline(ctx, &node, timelineBundleChan)
163				close(timelineBundleChan)
164			}(node)
165		}
166		if !issues.PageInfo.HasNextPage {
167			break
168		}
169		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
170	}
171}
172
173func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
174	edits := &issueNode.UserContentEdits
175	hasEdits := true
176	for hasEdits {
177		for edit := range reverse(edits.Nodes) {
178			if edit.Diff == nil || string(*edit.Diff) == "" {
179				// issueEdit.Diff == nil happen if the event is older than early
180				// 2018, Github doesn't have the data before that. Best we can do is
181				// to ignore the event.
182				continue
183			}
184			select {
185			case <-ctx.Done():
186				return
187			case channel <- edit:
188			}
189		}
190		if !edits.PageInfo.HasPreviousPage {
191			break
192		}
193		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
194	}
195}
196
197func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
198	items := &issueNode.TimelineItems
199	hasItems := true
200	for hasItems {
201		for _, item := range items.Nodes {
202			if item.Typename == "IssueComment" {
203				// Issue comments are different than other timeline items in that
204				// they may have associated user content edits.
205				//
206				// Send over the timeline-channel before starting to fill the comment
207				// edits.
208				commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
209				select {
210				case <-ctx.Done():
211					return
212				case channel <- timelineBundle{item, commentEditChan}:
213				}
214				// We need to create a new goroutine for filling the comment edit
215				// channel.
216				go func(item timelineItem) {
217					mm.fillCommentEdits(ctx, &item, commentEditChan)
218					close(commentEditChan)
219				}(item)
220			} else {
221				select {
222				case <-ctx.Done():
223					return
224				case channel <- timelineBundle{item, nil}:
225				}
226			}
227		}
228		if !items.PageInfo.HasNextPage {
229			break
230		}
231		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
232	}
233}
234
235func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
236	// Here we are only concerned with timeline items of type issueComment.
237	if item.Typename != "IssueComment" {
238		return
239	}
240	comment := &item.IssueComment
241	edits := &comment.UserContentEdits
242	hasEdits := true
243	for hasEdits {
244		for edit := range reverse(edits.Nodes) {
245			if edit.Diff == nil || string(*edit.Diff) == "" {
246				// issueEdit.Diff == nil happen if the event is older than early
247				// 2018, Github doesn't have the data before that. Best we can do is
248				// to ignore the event.
249				continue
250			}
251			select {
252			case <-ctx.Done():
253				return
254			case channel <- edit:
255			}
256		}
257		if !edits.PageInfo.HasPreviousPage {
258			break
259		}
260		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
261	}
262}
263
264func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
265	vars := newCommentEditVars()
266	vars["gqlNodeId"] = nid
267	if cursor == "" {
268		vars["commentEditBefore"] = (*githubv4.String)(nil)
269	} else {
270		vars["commentEditBefore"] = cursor
271	}
272	query := commentEditQuery{}
273	if err := mm.mQuery(ctx, &query, vars); err != nil {
274		mm.setError(err)
275		return nil, false
276	}
277	connection := &query.Node.IssueComment.UserContentEdits
278	if len(connection.Nodes) <= 0 {
279		return nil, false
280	}
281	return connection, true
282}
283
284func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
285	vars := newTimelineVars()
286	vars["gqlNodeId"] = nid
287	if cursor == "" {
288		vars["timelineAfter"] = (*githubv4.String)(nil)
289	} else {
290		vars["timelineAfter"] = cursor
291	}
292	query := timelineQuery{}
293	if err := mm.mQuery(ctx, &query, vars); err != nil {
294		mm.setError(err)
295		return nil, false
296	}
297	connection := &query.Node.Issue.TimelineItems
298	if len(connection.Nodes) <= 0 {
299		return nil, false
300	}
301	return connection, true
302}
303
304func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
305	vars := newIssueEditVars()
306	vars["gqlNodeId"] = nid
307	if cursor == "" {
308		vars["issueEditBefore"] = (*githubv4.String)(nil)
309	} else {
310		vars["issueEditBefore"] = cursor
311	}
312	query := issueEditQuery{}
313	if err := mm.mQuery(ctx, &query, vars); err != nil {
314		mm.setError(err)
315		return nil, false
316	}
317	connection := &query.Node.Issue.UserContentEdits
318	if len(connection.Nodes) <= 0 {
319		return nil, false
320	}
321	return connection, true
322}
323
324func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
325	vars := newIssueVars(mm.owner, mm.project, mm.since)
326	if cursor == "" {
327		vars["issueAfter"] = (*githubv4.String)(nil)
328	} else {
329		vars["issueAfter"] = githubv4.String(cursor)
330	}
331	query := issueQuery{}
332	if err := mm.mQuery(ctx, &query, vars); err != nil {
333		mm.setError(err)
334		return nil, false
335	}
336	connection := &query.Repository.Issues
337	if len(connection.Nodes) <= 0 {
338		return nil, false
339	}
340	return connection, true
341}
342
343func reverse(eds []userContentEdit) chan userContentEdit {
344	ret := make(chan userContentEdit)
345	go func() {
346		for i := range eds {
347			ret <- eds[len(eds)-1-i]
348		}
349		close(ret)
350	}()
351	return ret
352}
353
354type rateLimiter interface {
355	rateLimit() rateLimit
356}
357
358// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
359// and it is used to populate the response into it. It should be a pointer to a struct that
360// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
361// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
362// is expired.
363func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
364	// first: just send the query to the graphql api
365	vars["dryRun"] = githubv4.Boolean(false)
366	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
367	defer cancel()
368	err := mm.gc.Query(qctx, query, vars)
369	if err == nil {
370		// no error: done
371		return nil
372	}
373	// matching the error string
374	if !strings.Contains(err.Error(), "API rate limit exceeded") {
375		// an error, but not the API rate limit error: done
376		return err
377	}
378	// a rate limit error
379	// ask the graphql api for rate limiting information
380	vars["dryRun"] = githubv4.Boolean(true)
381	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
382	defer cancel()
383	if err := mm.gc.Query(qctx, query, vars); err != nil {
384		return err
385	}
386	rateLimit := query.rateLimit()
387	if rateLimit.Cost > rateLimit.Remaining {
388		// sleep
389		resetTime := rateLimit.ResetAt.Time
390		// Add a few seconds (8) for good measure
391		resetTime = resetTime.Add(8 * time.Second)
392		fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
393		timer := time.NewTimer(time.Until(resetTime))
394		select {
395		case <-ctx.Done():
396			stop(timer)
397			return ctx.Err()
398		case <-timer.C:
399		}
400	}
401	// run the original query again
402	vars["dryRun"] = githubv4.Boolean(false)
403	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
404	defer cancel()
405	err = mm.gc.Query(qctx, query, vars)
406	return err // might be nil
407}
408
// stop stops the timer t. If the timer could not be stopped because it has already
// expired or been stopped, a value possibly left in t.C is removed without blocking.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}