1package github
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/shurcooL/githubv4"
11)
12
// Page sizes used for the Github GraphQL connections queried by the
// importMediator. These values influence how fast the github graphql rate
// limit is exhausted.
const (
	// NUM_ISSUES is the number of issues requested per page.
	NUM_ISSUES = 40
	// NUM_ISSUE_EDITS is the number of issue edits requested per page.
	NUM_ISSUE_EDITS = 100
	// NUM_TIMELINE_ITEMS is the number of timeline items requested per page.
	NUM_TIMELINE_ITEMS = 100
	// NUM_COMMENT_EDITS is the number of comment edits requested per page.
	NUM_COMMENT_EDITS = 100
)

// CHAN_CAPACITY is the buffer size of the issue, issue-edit, timeline-item and
// comment-edit channels handed out by the importMediator.
const CHAN_CAPACITY = 128

// varmap holds the variables of a GraphQL query, keyed by variable name.
type varmap map[string]interface{}
23
24// importMediator provides an interface to retrieve Github issues.
25type importMediator struct {
26 // Github graphql client
27 gc *githubv4.Client
28
29 // name of the repository owner on Github
30 owner string
31
32 // name of the Github repository
33 project string
34
35 // The importMediator will only query issues updated or created after the date given in
36 // the variable since.
37 since time.Time
38
39 // channel for the issues
40 issues chan issue
41
42 // channel for issue edits
43 issueEdits map[githubv4.ID]chan userContentEdit
44 issueEditsMut sync.Mutex
45
46 // channel for timeline items
47 timelineItems map[githubv4.ID]chan timelineItem
48 timelineItemsMut sync.Mutex
49
50 // channel for comment edits
51 commentEdits map[githubv4.ID]chan userContentEdit
52 commentEditsMut sync.Mutex
53
54 // Sticky error
55 err error
56 errMut sync.Mutex
57}
58
59func (mm *importMediator) setError(err error) {
60 mm.errMut.Lock()
61 mm.err = err
62 mm.errMut.Unlock()
63}
64
65func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
66 mm := importMediator{
67 gc: client,
68 owner: owner,
69 project: project,
70 since: since,
71 issues: make(chan issue, CHAN_CAPACITY),
72 issueEdits: make(map[githubv4.ID]chan userContentEdit),
73 issueEditsMut: sync.Mutex{},
74 timelineItems: make(map[githubv4.ID]chan timelineItem),
75 timelineItemsMut: sync.Mutex{},
76 commentEdits: make(map[githubv4.ID]chan userContentEdit),
77 commentEditsMut: sync.Mutex{},
78 err: nil,
79 }
80 go func() {
81 mm.fillIssues(ctx)
82 close(mm.issues)
83 }()
84 return &mm
85}
86
87func newIssueVars(owner, project string, since time.Time) varmap {
88 return varmap{
89 "owner": githubv4.String(owner),
90 "name": githubv4.String(project),
91 "issueSince": githubv4.DateTime{Time: since},
92 "issueFirst": githubv4.Int(NUM_ISSUES),
93 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
94 "issueEditBefore": (*githubv4.String)(nil),
95 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
96 "timelineAfter": (*githubv4.String)(nil),
97 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
98 "commentEditBefore": (*githubv4.String)(nil),
99 }
100}
101
102func newIssueEditVars() varmap {
103 return varmap{
104 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
105 }
106}
107
108func newTimelineVars() varmap {
109 return varmap{
110 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
111 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
112 "commentEditBefore": (*githubv4.String)(nil),
113 }
114}
115
116func newCommentEditVars() varmap {
117 return varmap{
118 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
119 }
120}
121
122func (mm *importMediator) Issues() <-chan issue {
123 return mm.issues
124}
125
126func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
127 mm.issueEditsMut.Lock()
128 channel := mm.issueEdits[issue.Id]
129 mm.issueEditsMut.Unlock()
130 return channel
131}
132
133func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
134 mm.timelineItemsMut.Lock()
135 channel := mm.timelineItems[issue.Id]
136 mm.timelineItemsMut.Unlock()
137 return channel
138}
139
140func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
141 mm.commentEditsMut.Lock()
142 channel := mm.commentEdits[comment.Id]
143 mm.commentEditsMut.Unlock()
144 return channel
145}
146
147func (mm *importMediator) Error() error {
148 mm.errMut.Lock()
149 err := mm.err
150 mm.errMut.Unlock()
151 return err
152}
153
154func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
155 query := userQuery{}
156 vars := varmap{"login": githubv4.String(loginName)}
157 if err := mm.mQuery(ctx, &query, vars); err != nil {
158 return nil, err
159 }
160 return &query.User, nil
161}
162
163func (mm *importMediator) fillIssues(ctx context.Context) {
164 initialCursor := githubv4.String("")
165 issues, hasIssues := mm.queryIssue(ctx, initialCursor)
166 for hasIssues {
167 for _, node := range issues.Nodes {
168 // The order of statements in this loop is crucial for the correct concurrent
169 // execution.
170 //
171 // The issue edit channel and the timeline channel need to be added to the
172 // corresponding maps before the issue is sent in the issue channel.
173 // Otherwise, the client could try to retrieve issue edits and timeline itmes
174 // before these channels are even created. In this case the client would
175 // receive a nil channel.
176 issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
177 timelineChan := make(chan timelineItem, CHAN_CAPACITY)
178 mm.issueEditsMut.Lock()
179 mm.issueEdits[node.issue.Id] = issueEditChan
180 mm.issueEditsMut.Unlock()
181 mm.timelineItemsMut.Lock()
182 mm.timelineItems[node.issue.Id] = timelineChan
183 mm.timelineItemsMut.Unlock()
184 select {
185 case <-ctx.Done():
186 return
187 case mm.issues <- node.issue:
188 }
189
190 // We do not know whether the client reads from the issue edit channel
191 // or the timeline channel first. Since the capacity of any channel is limited
192 // any send operation may block. Hence, in order to avoid deadlocks we need
193 // to send over both these channels concurrently.
194 go func(node issueNode) {
195 mm.fillIssueEdits(ctx, &node, issueEditChan)
196 close(issueEditChan)
197 }(node)
198 go func(node issueNode) {
199 mm.fillTimeline(ctx, &node, timelineChan)
200 close(timelineChan)
201 }(node)
202 }
203 if !issues.PageInfo.HasNextPage {
204 break
205 }
206 issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
207 }
208}
209
210func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
211 edits := &issueNode.UserContentEdits
212 hasEdits := true
213 for hasEdits {
214 for edit := range reverse(edits.Nodes) {
215 if edit.Diff == nil || string(*edit.Diff) == "" {
216 // issueEdit.Diff == nil happen if the event is older than early
217 // 2018, Github doesn't have the data before that. Best we can do is
218 // to ignore the event.
219 continue
220 }
221 select {
222 case <-ctx.Done():
223 return
224 case channel <- edit:
225 }
226 }
227 if !edits.PageInfo.HasPreviousPage {
228 break
229 }
230 edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
231 }
232}
233
234func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) {
235 items := &issueNode.TimelineItems
236 hasItems := true
237 for hasItems {
238 for _, item := range items.Nodes {
239 if item.Typename == "IssueComment" {
240 // Here the order of statements is crucial for correct concurrency.
241 commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
242 mm.commentEditsMut.Lock()
243 mm.commentEdits[item.IssueComment.Id] = commentEditChan
244 mm.commentEditsMut.Unlock()
245 select {
246 case <-ctx.Done():
247 return
248 case channel <- item:
249 }
250 // We need to create a new goroutine for filling the comment edit
251 // channel.
252 go func(item timelineItem) {
253 mm.fillCommentEdits(ctx, &item, commentEditChan)
254 close(commentEditChan)
255 }(item)
256 } else {
257 select {
258 case <-ctx.Done():
259 return
260 case channel <- item:
261 }
262 }
263 }
264 if !items.PageInfo.HasNextPage {
265 break
266 }
267 items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
268 }
269}
270
271func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
272 // Here we are only concerned with timeline items of type issueComment.
273 if item.Typename != "IssueComment" {
274 return
275 }
276 comment := &item.IssueComment
277 edits := &comment.UserContentEdits
278 hasEdits := true
279 for hasEdits {
280 for edit := range reverse(edits.Nodes) {
281 if edit.Diff == nil || string(*edit.Diff) == "" {
282 // issueEdit.Diff == nil happen if the event is older than early
283 // 2018, Github doesn't have the data before that. Best we can do is
284 // to ignore the event.
285 continue
286 }
287 select {
288 case <-ctx.Done():
289 return
290 case channel <- edit:
291 }
292 }
293 if !edits.PageInfo.HasPreviousPage {
294 break
295 }
296 edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
297 }
298}
299
300func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
301 vars := newCommentEditVars()
302 vars["gqlNodeId"] = nid
303 if cursor == "" {
304 vars["commentEditBefore"] = (*githubv4.String)(nil)
305 } else {
306 vars["commentEditBefore"] = cursor
307 }
308 query := commentEditQuery{}
309 if err := mm.mQuery(ctx, &query, vars); err != nil {
310 mm.setError(err)
311 return nil, false
312 }
313 connection := &query.Node.IssueComment.UserContentEdits
314 if len(connection.Nodes) <= 0 {
315 return nil, false
316 }
317 return connection, true
318}
319
320func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
321 vars := newTimelineVars()
322 vars["gqlNodeId"] = nid
323 if cursor == "" {
324 vars["timelineAfter"] = (*githubv4.String)(nil)
325 } else {
326 vars["timelineAfter"] = cursor
327 }
328 query := timelineQuery{}
329 if err := mm.mQuery(ctx, &query, vars); err != nil {
330 mm.setError(err)
331 return nil, false
332 }
333 connection := &query.Node.Issue.TimelineItems
334 if len(connection.Nodes) <= 0 {
335 return nil, false
336 }
337 return connection, true
338}
339
340func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
341 vars := newIssueEditVars()
342 vars["gqlNodeId"] = nid
343 if cursor == "" {
344 vars["issueEditBefore"] = (*githubv4.String)(nil)
345 } else {
346 vars["issueEditBefore"] = cursor
347 }
348 query := issueEditQuery{}
349 if err := mm.mQuery(ctx, &query, vars); err != nil {
350 mm.setError(err)
351 return nil, false
352 }
353 connection := &query.Node.Issue.UserContentEdits
354 if len(connection.Nodes) <= 0 {
355 return nil, false
356 }
357 return connection, true
358}
359
360func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
361 vars := newIssueVars(mm.owner, mm.project, mm.since)
362 if cursor == "" {
363 vars["issueAfter"] = (*githubv4.String)(nil)
364 } else {
365 vars["issueAfter"] = githubv4.String(cursor)
366 }
367 query := issueQuery{}
368 if err := mm.mQuery(ctx, &query, vars); err != nil {
369 mm.setError(err)
370 return nil, false
371 }
372 connection := &query.Repository.Issues
373 if len(connection.Nodes) <= 0 {
374 return nil, false
375 }
376 return connection, true
377}
378
379func reverse(eds []userContentEdit) chan userContentEdit {
380 ret := make(chan userContentEdit)
381 go func() {
382 for i := range eds {
383 ret <- eds[len(eds)-1-i]
384 }
385 close(ret)
386 }()
387 return ret
388}
389
390type rateLimiter interface {
391 rateLimit() rateLimit
392}
393
394// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
395// and it is used to populate the response into it. It should be a pointer to a struct that
396// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
397// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
398// is expired.
399func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
400 // first: just send the query to the graphql api
401 vars["dryRun"] = githubv4.Boolean(false)
402 qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
403 defer cancel()
404 err := mm.gc.Query(qctx, query, vars)
405 if err == nil {
406 // no error: done
407 return nil
408 }
409 // matching the error string
410 if !strings.Contains(err.Error(), "API rate limit exceeded") {
411 // an error, but not the API rate limit error: done
412 return err
413 }
414 // a rate limit error
415 // ask the graphql api for rate limiting information
416 vars["dryRun"] = githubv4.Boolean(true)
417 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
418 defer cancel()
419 if err := mm.gc.Query(qctx, query, vars); err != nil {
420 return err
421 }
422 rateLimit := query.rateLimit()
423 if rateLimit.Cost > rateLimit.Remaining {
424 // sleep
425 resetTime := rateLimit.ResetAt.Time
426 // Add a few seconds (8) for good measure
427 resetTime = resetTime.Add(8 * time.Second)
428 fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
429 timer := time.NewTimer(time.Until(resetTime))
430 select {
431 case <-ctx.Done():
432 stop(timer)
433 return ctx.Err()
434 case <-timer.C:
435 }
436 }
437 // run the original query again
438 vars["dryRun"] = githubv4.Boolean(false)
439 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
440 defer cancel()
441 err = mm.gc.Query(qctx, query, vars)
442 return err // might be nil
443}
444
// stop stops t and, if the timer had already fired, discards any value left in
// t.C without blocking. This way a later receive from t.C cannot observe a stale
// expiry.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}