1package github
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/shurcooL/githubv4"
11)
12
// Page sizes and channel capacity used by the import mediator.
// These values influence how fast the github graphql rate limit is exhausted.
const (
	NUM_ISSUES         = 40  // issues requested per page ("issueFirst")
	NUM_ISSUE_EDITS    = 100 // issue edits requested per page ("issueEditLast")
	NUM_TIMELINE_ITEMS = 100 // timeline items requested per page ("timelineFirst")
	NUM_COMMENT_EDITS  = 100 // comment edits requested per page ("commentEditLast")

	// CHAN_CAPACITY is the buffer size of the Issues channel and of the
	// per-issue edit, timeline and comment-edit channels.
	CHAN_CAPACITY = 128
)
21
22// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
23type importMediator struct {
24 // Github graphql client
25 gc *githubv4.Client
26
27 // name of the repository owner on Github
28 owner string
29
30 // name of the Github repository
31 project string
32
33 // since specifies which issues to import. Issues that have been updated at or after the
34 // given date should be imported.
35 since time.Time
36
37 // Issues is a channel holding bundles of Issues to be imported. Each issueEvent
38 // is either a message (type messageEvent) or a struct holding all the data associated with
39 // one issue (type issueData).
40 Issues chan issueEvent
41
42 // Sticky error
43 err error
44
45 // errMut is a mutex to synchronize access to the sticky error variable err.
46 errMut sync.Mutex
47}
48
// Marker interfaces for the three kinds of event channels. Each one has a
// single unexported method whose only purpose is to restrict which types may
// be sent on the corresponding channel.
type (
	// issueEvent values travel on importMediator.Issues.
	issueEvent interface {
		isIssueEvent()
	}

	// timelineEvent values travel on the timeline channel of an issue.
	timelineEvent interface {
		isTimelineEvent()
	}

	// userContentEditEvent values travel on the edit channels of issues and
	// issue comments.
	userContentEditEvent interface {
		isUserContentEditEvent()
	}
)
58
// messageEvent carries a free-form, human-readable notice (for instance the
// rate-limit warning emitted by queryOnce). It may be sent on the issue, the
// timeline and the user-content-edit channels alike.
type messageEvent struct {
	msg string
}

func (messageEvent) isTimelineEvent()        {}
func (messageEvent) isUserContentEditEvent() {}
func (messageEvent) isIssueEvent()           {}
66
67type issueData struct {
68 issue
69 issueEdits <-chan userContentEditEvent
70 timelineItems <-chan timelineEvent
71}
72
73func (issueData) isIssueEvent() {}
74
75type timelineData struct {
76 timelineItem
77 userContentEdits <-chan userContentEditEvent
78}
79
80func (timelineData) isTimelineEvent() {}
81
82type userContentEditData struct {
83 userContentEdit
84}
85
86// func (userContentEditData) isEvent()
87func (userContentEditData) isUserContentEditEvent() {}
88
89func (mm *importMediator) setError(err error) {
90 mm.errMut.Lock()
91 mm.err = err
92 mm.errMut.Unlock()
93}
94
95func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
96 mm := importMediator{
97 gc: client,
98 owner: owner,
99 project: project,
100 since: since,
101 Issues: make(chan issueEvent, CHAN_CAPACITY),
102 err: nil,
103 }
104 go func() {
105 mm.fillIssues(ctx)
106 close(mm.Issues)
107 }()
108 return &mm
109}
110
// varmap holds the GraphQL variables sent along with a query, keyed by
// variable name (for example "owner", "issueFirst" or "gqlNodeId").
type varmap map[string]interface{}
112
113func newIssueVars(owner, project string, since time.Time) varmap {
114 return varmap{
115 "owner": githubv4.String(owner),
116 "name": githubv4.String(project),
117 "issueSince": githubv4.DateTime{Time: since},
118 "issueFirst": githubv4.Int(NUM_ISSUES),
119 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
120 "issueEditBefore": (*githubv4.String)(nil),
121 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
122 "timelineAfter": (*githubv4.String)(nil),
123 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
124 "commentEditBefore": (*githubv4.String)(nil),
125 }
126}
127
128func newIssueEditVars() varmap {
129 return varmap{
130 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
131 }
132}
133
134func newTimelineVars() varmap {
135 return varmap{
136 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
137 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
138 "commentEditBefore": (*githubv4.String)(nil),
139 }
140}
141
142func newCommentEditVars() varmap {
143 return varmap{
144 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
145 }
146}
147
148func (mm *importMediator) Error() error {
149 mm.errMut.Lock()
150 err := mm.err
151 mm.errMut.Unlock()
152 return err
153}
154
155func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
156 query := userQuery{}
157 vars := varmap{"login": githubv4.String(loginName)}
158 // handle message events localy
159 channel := make(chan messageEvent)
160 defer close(channel)
161 // print all messages immediately
162 go func() {
163 for event := range channel {
164 fmt.Println(event.msg)
165 }
166 }()
167 if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
168 return nil, err
169 }
170 return &query.User, nil
171}
172
173func (mm *importMediator) fillIssues(ctx context.Context) {
174 // First: setup message handling while submitting GraphQL queries.
175 msgs := make(chan messageEvent)
176 defer close(msgs)
177 // forward all the messages to the issue channel. The message will be queued after
178 // all the issues which has been completed.
179 go func() {
180 for msg := range msgs {
181 select {
182 case <-ctx.Done():
183 return
184 case mm.Issues <- msg:
185 }
186 }
187 }()
188 // start processing the actual issues
189 initialCursor := githubv4.String("")
190 issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
191 for hasIssues {
192 for _, node := range issues.Nodes {
193 // We need to send an issue-bundle over the issue channel before we start
194 // filling the issue edits and the timeline items to avoid deadlocks.
195 issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
196 timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
197 select {
198 case <-ctx.Done():
199 return
200 case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
201 }
202
203 // We do not know whether the client reads from the issue edit channel
204 // or the timeline channel first. Since the capacity of any channel is limited
205 // any send operation may block. Hence, in order to avoid deadlocks we need
206 // to send over both these channels concurrently.
207 go func(node issueNode) {
208 mm.fillIssueEdits(ctx, &node, issueEditChan)
209 close(issueEditChan)
210 }(node)
211 go func(node issueNode) {
212 mm.fillTimeline(ctx, &node, timelineBundleChan)
213 close(timelineBundleChan)
214 }(node)
215 }
216 if !issues.PageInfo.HasNextPage {
217 break
218 }
219 issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
220 }
221}
222
223func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
224 // First: setup message handling while submitting GraphQL queries.
225 msgs := make(chan messageEvent)
226 defer close(msgs)
227 // forward all the messages to the issue-edit channel. The message will be queued after
228 // all the issue edits which have been completed.
229 go func() {
230 for msg := range msgs {
231 select {
232 case <-ctx.Done():
233 return
234 case channel <- msg:
235 }
236 }
237 }()
238 edits := &issueNode.UserContentEdits
239 hasEdits := true
240 for hasEdits {
241 for edit := range reverse(edits.Nodes) {
242 if edit.Diff == nil || string(*edit.Diff) == "" {
243 // issueEdit.Diff == nil happen if the event is older than early
244 // 2018, Github doesn't have the data before that. Best we can do is
245 // to ignore the event.
246 continue
247 }
248 select {
249 case <-ctx.Done():
250 return
251 case channel <- userContentEditData{edit}:
252 }
253 }
254 if !edits.PageInfo.HasPreviousPage {
255 break
256 }
257 edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
258 }
259}
260
261func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
262 // First: setup message handling while submitting GraphQL queries.
263 msgs := make(chan messageEvent)
264 defer close(msgs)
265 // forward all the messages to the timeline channel. The message will be queued after
266 // all the timeline items which have been completed.
267 go func() {
268 for msg := range msgs {
269 select {
270 case <-ctx.Done():
271 return
272 case channel <- msg:
273 }
274 }
275 }()
276 items := &issueNode.TimelineItems
277 hasItems := true
278 for hasItems {
279 for _, item := range items.Nodes {
280 if item.Typename == "IssueComment" {
281 // Issue comments are different than other timeline items in that
282 // they may have associated user content edits.
283 //
284 // Send over the timeline-channel before starting to fill the comment
285 // edits.
286 commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
287 select {
288 case <-ctx.Done():
289 return
290 case channel <- timelineData{item, commentEditChan}:
291 }
292 // We need to create a new goroutine for filling the comment edit
293 // channel.
294 go func(item timelineItem) {
295 mm.fillCommentEdits(ctx, &item, commentEditChan)
296 close(commentEditChan)
297 }(item)
298 } else {
299 select {
300 case <-ctx.Done():
301 return
302 case channel <- timelineData{item, nil}:
303 }
304 }
305 }
306 if !items.PageInfo.HasNextPage {
307 break
308 }
309 items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
310 }
311}
312
313func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
314 // Here we are only concerned with timeline items of type issueComment.
315 if item.Typename != "IssueComment" {
316 return
317 }
318 // First: setup message handling while submitting GraphQL queries.
319 msgs := make(chan messageEvent)
320 defer close(msgs)
321 // forward all the messages to the user content edit channel. The message will be queued after
322 // all the user content edits which have been completed already.
323 go func() {
324 for msg := range msgs {
325 select {
326 case <-ctx.Done():
327 return
328 case channel <- msg:
329 }
330 }
331 }()
332 comment := &item.IssueComment
333 edits := &comment.UserContentEdits
334 hasEdits := true
335 for hasEdits {
336 for edit := range reverse(edits.Nodes) {
337 if edit.Diff == nil || string(*edit.Diff) == "" {
338 // issueEdit.Diff == nil happen if the event is older than early
339 // 2018, Github doesn't have the data before that. Best we can do is
340 // to ignore the event.
341 continue
342 }
343 select {
344 case <-ctx.Done():
345 return
346 case channel <- userContentEditData{edit}:
347 }
348 }
349 if !edits.PageInfo.HasPreviousPage {
350 break
351 }
352 edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
353 }
354}
355
356func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
357 vars := newCommentEditVars()
358 vars["gqlNodeId"] = nid
359 if cursor == "" {
360 vars["commentEditBefore"] = (*githubv4.String)(nil)
361 } else {
362 vars["commentEditBefore"] = cursor
363 }
364 query := commentEditQuery{}
365 if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
366 mm.setError(err)
367 return nil, false
368 }
369 connection := &query.Node.IssueComment.UserContentEdits
370 if len(connection.Nodes) <= 0 {
371 return nil, false
372 }
373 return connection, true
374}
375
376func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
377 vars := newTimelineVars()
378 vars["gqlNodeId"] = nid
379 if cursor == "" {
380 vars["timelineAfter"] = (*githubv4.String)(nil)
381 } else {
382 vars["timelineAfter"] = cursor
383 }
384 query := timelineQuery{}
385 if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
386 mm.setError(err)
387 return nil, false
388 }
389 connection := &query.Node.Issue.TimelineItems
390 if len(connection.Nodes) <= 0 {
391 return nil, false
392 }
393 return connection, true
394}
395
396func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
397 vars := newIssueEditVars()
398 vars["gqlNodeId"] = nid
399 if cursor == "" {
400 vars["issueEditBefore"] = (*githubv4.String)(nil)
401 } else {
402 vars["issueEditBefore"] = cursor
403 }
404 query := issueEditQuery{}
405 if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
406 mm.setError(err)
407 return nil, false
408 }
409 connection := &query.Node.Issue.UserContentEdits
410 if len(connection.Nodes) <= 0 {
411 return nil, false
412 }
413 return connection, true
414}
415
416func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
417 vars := newIssueVars(mm.owner, mm.project, mm.since)
418 if cursor == "" {
419 vars["issueAfter"] = (*githubv4.String)(nil)
420 } else {
421 vars["issueAfter"] = githubv4.String(cursor)
422 }
423 query := issueQuery{}
424 if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
425 mm.setError(err)
426 return nil, false
427 }
428 connection := &query.Repository.Issues
429 if len(connection.Nodes) <= 0 {
430 return nil, false
431 }
432 return connection, true
433}
434
435func reverse(eds []userContentEdit) chan userContentEdit {
436 ret := make(chan userContentEdit)
437 go func() {
438 for i := range eds {
439 ret <- eds[len(eds)-1-i]
440 }
441 close(ret)
442 }()
443 return ret
444}
445
// rateLimiter is implemented by the GraphQL query structs passed to mQuery
// and queryOnce. rateLimit returns the rate-limit information (queryOnce reads
// Cost, Remaining and ResetAt) contained in the struct after a query has
// populated it.
type rateLimiter interface {
	rateLimit() rateLimit
}
449
450// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
451// and it is used to populate the response into it. It should be a pointer to a struct that
452// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
453// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
454// is expired. If there is another error, then the method will retry before giving up.
455func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
456 if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
457 // success: done
458 return nil
459 }
460 // failure: we will retry
461 // To retry is important for importing projects with a big number of issues.
462 retries := 3
463 var err error
464 for i := 0; i < retries; i++ {
465 // wait a few seconds before retry
466 sleepTime := 8 * (i + 1)
467 time.Sleep(time.Duration(sleepTime) * time.Second)
468 err = mm.queryOnce(ctx, query, vars, msgs)
469 if err == nil {
470 // success: done
471 return nil
472 }
473 }
474 return err
475}
476
477func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
478 // first: just send the query to the graphql api
479 vars["dryRun"] = githubv4.Boolean(false)
480 qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
481 defer cancel()
482 err := mm.gc.Query(qctx, query, vars)
483 if err == nil {
484 // no error: done
485 return nil
486 }
487 // matching the error string
488 if !strings.Contains(err.Error(), "API rate limit exceeded") {
489 // an error, but not the API rate limit error: done
490 return err
491 }
492 // a rate limit error
493 // ask the graphql api for rate limiting information
494 vars["dryRun"] = githubv4.Boolean(true)
495 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
496 defer cancel()
497 if err := mm.gc.Query(qctx, query, vars); err != nil {
498 return err
499 }
500 rateLimit := query.rateLimit()
501 if rateLimit.Cost > rateLimit.Remaining {
502 // sleep
503 resetTime := rateLimit.ResetAt.Time
504 // Add a few seconds (8) for good measure
505 resetTime = resetTime.Add(8 * time.Second)
506 msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String())
507 select {
508 case <-ctx.Done():
509 return ctx.Err()
510 case msgs <- messageEvent{msg}:
511 }
512 timer := time.NewTimer(time.Until(resetTime))
513 select {
514 case <-ctx.Done():
515 stop(timer)
516 return ctx.Err()
517 case <-timer.C:
518 }
519 }
520 // run the original query again
521 vars["dryRun"] = githubv4.Boolean(false)
522 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
523 defer cancel()
524 err = mm.gc.Query(qctx, query, vars)
525 return err // might be nil
526}
527
// stop halts t. If t had already fired, any tick still pending on t.C is
// drained without blocking, leaving the channel empty.
func stop(t *time.Timer) {
	if stopped := t.Stop(); stopped {
		return
	}
	select {
	case <-t.C:
	default:
	}
}