import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"fmt"
  6	"strings"
  7	"sync"
  8	"time"
  9
 10	"github.com/shurcooL/githubv4"
 11)
 12
 // Page sizes used as the "first"/"last" variables of the GraphQL queries. These
// values influence how fast the Github GraphQL rate limit is exhausted.
const (
	// NUM_ISSUES is the number of issues requested per page.
	NUM_ISSUES = 40
	// NUM_ISSUE_EDITS is the number of issue edits requested per page.
	NUM_ISSUE_EDITS = 100
	// NUM_TIMELINE_ITEMS is the number of timeline items requested per page.
	NUM_TIMELINE_ITEMS = 100
	// NUM_COMMENT_EDITS is the number of comment edits requested per page.
	NUM_COMMENT_EDITS = 100
)

// CHAN_CAPACITY is the buffer size of every channel created by the mediator.
const CHAN_CAPACITY = 128
 21
 // varmap holds the named variables that are sent along with a GraphQL query.
type varmap map[string]interface{}
 23
 24// importMediator provides an interface to retrieve Github issues.
 25type importMediator struct {
 26	// Github graphql client
 27	gc *githubv4.Client
 28
 29	// name of the repository owner on Github
 30	owner string
 31
 32	// name of the Github repository
 33	project string
 34
 35	// The importMediator will only query issues updated or created after the date given in
 36	// the variable since.
 37	since time.Time
 38
 39	// channel for the issues
 40	issues chan issue
 41
 42	// channel for issue edits
 43	issueEdits    map[githubv4.ID]chan userContentEdit
 44	issueEditsMut sync.Mutex
 45
 46	// channel for timeline items
 47	timelineItems    map[githubv4.ID]chan timelineItem
 48	timelineItemsMut sync.Mutex
 49
 50	// channel for comment edits
 51	commentEdits    map[githubv4.ID]chan userContentEdit
 52	commentEditsMut sync.Mutex
 53
 54	// Sticky error
 55	err    error
 56	errMut sync.Mutex
 57}
 58
 59func (mm *importMediator) setError(err error) {
 60	mm.errMut.Lock()
 61	mm.err = err
 62	mm.errMut.Unlock()
 63}
 64
 65func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
 66	mm := importMediator{
 67		gc:               client,
 68		owner:            owner,
 69		project:          project,
 70		since:            since,
 71		issues:           make(chan issue, CHAN_CAPACITY),
 72		issueEdits:       make(map[githubv4.ID]chan userContentEdit),
 73		issueEditsMut:    sync.Mutex{},
 74		timelineItems:    make(map[githubv4.ID]chan timelineItem),
 75		timelineItemsMut: sync.Mutex{},
 76		commentEdits:     make(map[githubv4.ID]chan userContentEdit),
 77		commentEditsMut:  sync.Mutex{},
 78		err:              nil,
 79	}
 80	go func() {
 81		mm.fillIssues(ctx)
 82		close(mm.issues)
 83	}()
 84	return &mm
 85}
 86
 87func newIssueVars(owner, project string, since time.Time) varmap {
 88	return varmap{
 89		"owner":             githubv4.String(owner),
 90		"name":              githubv4.String(project),
 91		"issueSince":        githubv4.DateTime{Time: since},
 92		"issueFirst":        githubv4.Int(NUM_ISSUES),
 93		"issueEditLast":     githubv4.Int(NUM_ISSUE_EDITS),
 94		"issueEditBefore":   (*githubv4.String)(nil),
 95		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
 96		"timelineAfter":     (*githubv4.String)(nil),
 97		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
 98		"commentEditBefore": (*githubv4.String)(nil),
 99	}
100}
101
102func newIssueEditVars() varmap {
103	return varmap{
104		"issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
105	}
106}
107
108func newTimelineVars() varmap {
109	return varmap{
110		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
111		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
112		"commentEditBefore": (*githubv4.String)(nil),
113	}
114}
115
116func newCommentEditVars() varmap {
117	return varmap{
118		"commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
119	}
120}
121
122func (mm *importMediator) Issues() <-chan issue {
123	return mm.issues
124}
125
126func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
127	mm.issueEditsMut.Lock()
128	channel := mm.issueEdits[issue.Id]
129	mm.issueEditsMut.Unlock()
130	return channel
131}
132
133func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
134	mm.timelineItemsMut.Lock()
135	channel := mm.timelineItems[issue.Id]
136	mm.timelineItemsMut.Unlock()
137	return channel
138}
139
140func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
141	mm.commentEditsMut.Lock()
142	channel := mm.commentEdits[comment.Id]
143	mm.commentEditsMut.Unlock()
144	return channel
145}
146
147func (mm *importMediator) Error() error {
148	mm.errMut.Lock()
149	err := mm.err
150	mm.errMut.Unlock()
151	return err
152}
153
154func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
155	query := userQuery{}
156	vars := varmap{"login": githubv4.String(loginName)}
157	if err := mm.mQuery(ctx, &query, vars); err != nil {
158		return nil, err
159	}
160	return &query.User, nil
161}
162
163func (mm *importMediator) fillIssues(ctx context.Context) {
164	initialCursor := githubv4.String("")
165	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
166	for hasIssues {
167		for _, node := range issues.Nodes {
168			// The order of statements in this loop is crucial for the correct concurrent
169			// execution.
170			//
171			// The issue edit channel and the timeline channel need to be added to the
172			// corresponding maps before the issue is sent in the issue channel.
173			// Otherwise, the client could try to retrieve issue edits and timeline itmes
174			// before these channels are even created. In this case the client would
175			// receive a nil channel.
176			issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
177			timelineChan := make(chan timelineItem, CHAN_CAPACITY)
178			mm.issueEditsMut.Lock()
179			mm.issueEdits[node.issue.Id] = issueEditChan
180			mm.issueEditsMut.Unlock()
181			mm.timelineItemsMut.Lock()
182			mm.timelineItems[node.issue.Id] = timelineChan
183			mm.timelineItemsMut.Unlock()
184			select {
185			case <-ctx.Done():
186				return
187			case mm.issues <- node.issue:
188			}
189
190			// We do not know whether the client reads from the issue edit channel
191			// or the timeline channel first. Since the capacity of any channel is limited
192			// any send operation may block. Hence, in order to avoid deadlocks we need
193			// to send over both these channels concurrently.
194			go func(node issueNode) {
195				mm.fillIssueEdits(ctx, &node, issueEditChan)
196				close(issueEditChan)
197			}(node)
198			go func(node issueNode) {
199				mm.fillTimeline(ctx, &node, timelineChan)
200				close(timelineChan)
201			}(node)
202		}
203		if !issues.PageInfo.HasNextPage {
204			break
205		}
206		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
207	}
208}
209
210func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
211	edits := &issueNode.UserContentEdits
212	hasEdits := true
213	for hasEdits {
214		for edit := range reverse(edits.Nodes) {
215			if edit.Diff == nil || string(*edit.Diff) == "" {
216				// issueEdit.Diff == nil happen if the event is older than early
217				// 2018, Github doesn't have the data before that. Best we can do is
218				// to ignore the event.
219				continue
220			}
221			select {
222			case <-ctx.Done():
223				return
224			case channel <- edit:
225			}
226		}
227		if !edits.PageInfo.HasPreviousPage {
228			break
229		}
230		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
231	}
232}
233
234func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) {
235	items := &issueNode.TimelineItems
236	hasItems := true
237	for hasItems {
238		for _, item := range items.Nodes {
239			if item.Typename == "IssueComment" {
240				// Here the order of statements is crucial for correct concurrency.
241				commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
242				mm.commentEditsMut.Lock()
243				mm.commentEdits[item.IssueComment.Id] = commentEditChan
244				mm.commentEditsMut.Unlock()
245				select {
246				case <-ctx.Done():
247					return
248				case channel <- item:
249				}
250				// We need to create a new goroutine for filling the comment edit
251				// channel.
252				go func(item timelineItem) {
253					mm.fillCommentEdits(ctx, &item, commentEditChan)
254					close(commentEditChan)
255				}(item)
256			} else {
257				select {
258				case <-ctx.Done():
259					return
260				case channel <- item:
261				}
262			}
263		}
264		if !items.PageInfo.HasNextPage {
265			break
266		}
267		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
268	}
269}
270
271func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
272	// Here we are only concerned with timeline items of type issueComment.
273	if item.Typename != "IssueComment" {
274		return
275	}
276	comment := &item.IssueComment
277	edits := &comment.UserContentEdits
278	hasEdits := true
279	for hasEdits {
280		for edit := range reverse(edits.Nodes) {
281			if edit.Diff == nil || string(*edit.Diff) == "" {
282				// issueEdit.Diff == nil happen if the event is older than early
283				// 2018, Github doesn't have the data before that. Best we can do is
284				// to ignore the event.
285				continue
286			}
287			select {
288			case <-ctx.Done():
289				return
290			case channel <- edit:
291			}
292		}
293		if !edits.PageInfo.HasPreviousPage {
294			break
295		}
296		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
297	}
298}
299
300func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
301	vars := newCommentEditVars()
302	vars["gqlNodeId"] = nid
303	if cursor == "" {
304		vars["commentEditBefore"] = (*githubv4.String)(nil)
305	} else {
306		vars["commentEditBefore"] = cursor
307	}
308	query := commentEditQuery{}
309	if err := mm.mQuery(ctx, &query, vars); err != nil {
310		mm.setError(err)
311		return nil, false
312	}
313	connection := &query.Node.IssueComment.UserContentEdits
314	if len(connection.Nodes) <= 0 {
315		return nil, false
316	}
317	return connection, true
318}
319
320func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
321	vars := newTimelineVars()
322	vars["gqlNodeId"] = nid
323	if cursor == "" {
324		vars["timelineAfter"] = (*githubv4.String)(nil)
325	} else {
326		vars["timelineAfter"] = cursor
327	}
328	query := timelineQuery{}
329	if err := mm.mQuery(ctx, &query, vars); err != nil {
330		mm.setError(err)
331		return nil, false
332	}
333	connection := &query.Node.Issue.TimelineItems
334	if len(connection.Nodes) <= 0 {
335		return nil, false
336	}
337	return connection, true
338}
339
340func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
341	vars := newIssueEditVars()
342	vars["gqlNodeId"] = nid
343	if cursor == "" {
344		vars["issueEditBefore"] = (*githubv4.String)(nil)
345	} else {
346		vars["issueEditBefore"] = cursor
347	}
348	query := issueEditQuery{}
349	if err := mm.mQuery(ctx, &query, vars); err != nil {
350		mm.setError(err)
351		return nil, false
352	}
353	connection := &query.Node.Issue.UserContentEdits
354	if len(connection.Nodes) <= 0 {
355		return nil, false
356	}
357	return connection, true
358}
359
360func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
361	vars := newIssueVars(mm.owner, mm.project, mm.since)
362	if cursor == "" {
363		vars["issueAfter"] = (*githubv4.String)(nil)
364	} else {
365		vars["issueAfter"] = githubv4.String(cursor)
366	}
367	query := issueQuery{}
368	if err := mm.mQuery(ctx, &query, vars); err != nil {
369		mm.setError(err)
370		return nil, false
371	}
372	connection := &query.Repository.Issues
373	if len(connection.Nodes) <= 0 {
374		return nil, false
375	}
376	return connection, true
377}
378
379func reverse(eds []userContentEdit) chan userContentEdit {
380	ret := make(chan userContentEdit)
381	go func() {
382		for i := range eds {
383			ret <- eds[len(eds)-1-i]
384		}
385		close(ret)
386	}()
387	return ret
388}
389
390type rateLimiter interface {
391	rateLimit() rateLimit
392}
393
394// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
395// and it is used to populate the response into it. It should be a pointer to a struct that
396// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
397// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
398// is expired.
399func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
400	// first: just send the query to the graphql api
401	vars["dryRun"] = githubv4.Boolean(false)
402	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
403	defer cancel()
404	err := mm.gc.Query(qctx, query, vars)
405	if err == nil {
406		// no error: done
407		return nil
408	}
409	// matching the error string
410	if !strings.Contains(err.Error(), "API rate limit exceeded") {
411		// an error, but not the API rate limit error: done
412		return err
413	}
414	// a rate limit error
415	// ask the graphql api for rate limiting information
416	vars["dryRun"] = githubv4.Boolean(true)
417	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
418	defer cancel()
419	if err := mm.gc.Query(qctx, query, vars); err != nil {
420		return err
421	}
422	rateLimit := query.rateLimit()
423	if rateLimit.Cost > rateLimit.Remaining {
424		// sleep
425		resetTime := rateLimit.ResetAt.Time
426		// Add a few seconds (8) for good measure
427		resetTime = resetTime.Add(8 * time.Second)
428		fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
429		timer := time.NewTimer(time.Until(resetTime))
430		select {
431		case <-ctx.Done():
432			stop(timer)
433			return ctx.Err()
434		case <-timer.C:
435		}
436	}
437	// run the original query again
438	vars["dryRun"] = githubv4.Boolean(false)
439	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
440	defer cancel()
441	err = mm.gc.Query(qctx, query, vars)
442	return err // might be nil
443}
444
// stop stops the timer t. If Stop reports that the timer was not active
// anymore, a value possibly pending on t.C is discarded without blocking.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}