1package github
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/shurcooL/githubv4"
11)
12
// Counts passed as the first/last variables of the GraphQL queries below
// (issueFirst, issueEditLast, timelineFirst, commentEditLast). These values
// influence how fast the Github GraphQL rate limit is exhausted.
const (
	NUM_ISSUES         = 40  // issueFirst: issues requested per query
	NUM_ISSUE_EDITS    = 100 // issueEditLast: issue edits requested per query
	NUM_TIMELINE_ITEMS = 100 // timelineFirst: timeline items requested per query
	NUM_COMMENT_EDITS  = 100 // commentEditLast: comment edits requested per query
)

// CHAN_CAPACITY is the buffer size of the issue, issue-edit, timeline and
// comment-edit channels created by the import mediator.
const CHAN_CAPACITY = 128
21
// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
// Issues are published as issueBundle values on the channel returned by Issues; the associated
// issue edits, timeline items and comment edits are fetched by background goroutines and
// delivered over per-bundle channels. Query failures are recorded in a sticky error that can be
// read through Error.
type importMediator struct {
	// gc is the Github GraphQL client used for every query.
	gc *githubv4.Client

	// owner is the name of the repository owner on Github.
	owner string

	// project is the name of the Github repository.
	project string

	// since specifies which issues to import. Issues that have been updated at or after the
	// given date should be imported.
	since time.Time

	// issues is a channel holding bundles of issues to be imported. Each bundle holds the data
	// associated with one issue. It is closed by the goroutine started in NewImportMediator
	// once fillIssues returns.
	issues chan issueBundle

	// err is the sticky error: written via setError, read via Error.
	err error

	// errMut is a mutex to synchronize access to the sticky error variable err.
	errMut sync.Mutex
}
47
// issueBundle holds one issue together with the channels over which its user content
// edits and its timeline items are delivered. Both channels are filled by background
// goroutines started in fillIssues and are closed when those goroutines finish.
//
// NOTE: fillIssues builds this struct with an unkeyed literal, so the field order matters.
type issueBundle struct {
	issue           issue
	issueEdits      <-chan userContentEdit
	timelineBundles <-chan timelineBundle
}
53
// timelineBundle holds one timeline item of an issue. For items of type "IssueComment",
// userContentEdits delivers the comment's edits and is closed when they have all been sent;
// for every other item type it is nil.
//
// NOTE: fillTimeline builds this struct with unkeyed literals, so the field order matters.
type timelineBundle struct {
	timelineItem     timelineItem
	userContentEdits <-chan userContentEdit
}
58
59func (mm *importMediator) setError(err error) {
60 mm.errMut.Lock()
61 mm.err = err
62 mm.errMut.Unlock()
63}
64
65func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
66 mm := importMediator{
67 gc: client,
68 owner: owner,
69 project: project,
70 since: since,
71 issues: make(chan issueBundle, CHAN_CAPACITY),
72 err: nil,
73 }
74 go func() {
75 mm.fillIssues(ctx)
76 close(mm.issues)
77 }()
78 return &mm
79}
80
// varmap holds the variables passed to a GraphQL query, keyed by variable name.
type varmap map[string]interface{}
82
83func newIssueVars(owner, project string, since time.Time) varmap {
84 return varmap{
85 "owner": githubv4.String(owner),
86 "name": githubv4.String(project),
87 "issueSince": githubv4.DateTime{Time: since},
88 "issueFirst": githubv4.Int(NUM_ISSUES),
89 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
90 "issueEditBefore": (*githubv4.String)(nil),
91 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
92 "timelineAfter": (*githubv4.String)(nil),
93 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
94 "commentEditBefore": (*githubv4.String)(nil),
95 }
96}
97
98func newIssueEditVars() varmap {
99 return varmap{
100 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
101 }
102}
103
104func newTimelineVars() varmap {
105 return varmap{
106 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
107 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
108 "commentEditBefore": (*githubv4.String)(nil),
109 }
110}
111
112func newCommentEditVars() varmap {
113 return varmap{
114 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
115 }
116}
117
118func (mm *importMediator) Issues() <-chan issueBundle {
119 return mm.issues
120}
121
122func (mm *importMediator) Error() error {
123 mm.errMut.Lock()
124 err := mm.err
125 mm.errMut.Unlock()
126 return err
127}
128
129func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
130 query := userQuery{}
131 vars := varmap{"login": githubv4.String(loginName)}
132 if err := mm.mQuery(ctx, &query, vars); err != nil {
133 return nil, err
134 }
135 return &query.User, nil
136}
137
138func (mm *importMediator) fillIssues(ctx context.Context) {
139 initialCursor := githubv4.String("")
140 issues, hasIssues := mm.queryIssue(ctx, initialCursor)
141 for hasIssues {
142 for _, node := range issues.Nodes {
143 // We need to send an issue-bundle over the issue channel before we start
144 // filling the issue edits and the timeline items to avoid deadlocks.
145 issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
146 timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
147 select {
148 case <-ctx.Done():
149 return
150 case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
151 }
152
153 // We do not know whether the client reads from the issue edit channel
154 // or the timeline channel first. Since the capacity of any channel is limited
155 // any send operation may block. Hence, in order to avoid deadlocks we need
156 // to send over both these channels concurrently.
157 go func(node issueNode) {
158 mm.fillIssueEdits(ctx, &node, issueEditChan)
159 close(issueEditChan)
160 }(node)
161 go func(node issueNode) {
162 mm.fillTimeline(ctx, &node, timelineBundleChan)
163 close(timelineBundleChan)
164 }(node)
165 }
166 if !issues.PageInfo.HasNextPage {
167 break
168 }
169 issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
170 }
171}
172
173func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
174 edits := &issueNode.UserContentEdits
175 hasEdits := true
176 for hasEdits {
177 for edit := range reverse(edits.Nodes) {
178 if edit.Diff == nil || string(*edit.Diff) == "" {
179 // issueEdit.Diff == nil happen if the event is older than early
180 // 2018, Github doesn't have the data before that. Best we can do is
181 // to ignore the event.
182 continue
183 }
184 select {
185 case <-ctx.Done():
186 return
187 case channel <- edit:
188 }
189 }
190 if !edits.PageInfo.HasPreviousPage {
191 break
192 }
193 edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
194 }
195}
196
197func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
198 items := &issueNode.TimelineItems
199 hasItems := true
200 for hasItems {
201 for _, item := range items.Nodes {
202 if item.Typename == "IssueComment" {
203 // Issue comments are different than other timeline items in that
204 // they may have associated user content edits.
205 //
206 // Send over the timeline-channel before starting to fill the comment
207 // edits.
208 commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
209 select {
210 case <-ctx.Done():
211 return
212 case channel <- timelineBundle{item, commentEditChan}:
213 }
214 // We need to create a new goroutine for filling the comment edit
215 // channel.
216 go func(item timelineItem) {
217 mm.fillCommentEdits(ctx, &item, commentEditChan)
218 close(commentEditChan)
219 }(item)
220 } else {
221 select {
222 case <-ctx.Done():
223 return
224 case channel <- timelineBundle{item, nil}:
225 }
226 }
227 }
228 if !items.PageInfo.HasNextPage {
229 break
230 }
231 items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
232 }
233}
234
235func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
236 // Here we are only concerned with timeline items of type issueComment.
237 if item.Typename != "IssueComment" {
238 return
239 }
240 comment := &item.IssueComment
241 edits := &comment.UserContentEdits
242 hasEdits := true
243 for hasEdits {
244 for edit := range reverse(edits.Nodes) {
245 if edit.Diff == nil || string(*edit.Diff) == "" {
246 // issueEdit.Diff == nil happen if the event is older than early
247 // 2018, Github doesn't have the data before that. Best we can do is
248 // to ignore the event.
249 continue
250 }
251 select {
252 case <-ctx.Done():
253 return
254 case channel <- edit:
255 }
256 }
257 if !edits.PageInfo.HasPreviousPage {
258 break
259 }
260 edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
261 }
262}
263
264func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
265 vars := newCommentEditVars()
266 vars["gqlNodeId"] = nid
267 if cursor == "" {
268 vars["commentEditBefore"] = (*githubv4.String)(nil)
269 } else {
270 vars["commentEditBefore"] = cursor
271 }
272 query := commentEditQuery{}
273 if err := mm.mQuery(ctx, &query, vars); err != nil {
274 mm.setError(err)
275 return nil, false
276 }
277 connection := &query.Node.IssueComment.UserContentEdits
278 if len(connection.Nodes) <= 0 {
279 return nil, false
280 }
281 return connection, true
282}
283
284func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
285 vars := newTimelineVars()
286 vars["gqlNodeId"] = nid
287 if cursor == "" {
288 vars["timelineAfter"] = (*githubv4.String)(nil)
289 } else {
290 vars["timelineAfter"] = cursor
291 }
292 query := timelineQuery{}
293 if err := mm.mQuery(ctx, &query, vars); err != nil {
294 mm.setError(err)
295 return nil, false
296 }
297 connection := &query.Node.Issue.TimelineItems
298 if len(connection.Nodes) <= 0 {
299 return nil, false
300 }
301 return connection, true
302}
303
304func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
305 vars := newIssueEditVars()
306 vars["gqlNodeId"] = nid
307 if cursor == "" {
308 vars["issueEditBefore"] = (*githubv4.String)(nil)
309 } else {
310 vars["issueEditBefore"] = cursor
311 }
312 query := issueEditQuery{}
313 if err := mm.mQuery(ctx, &query, vars); err != nil {
314 mm.setError(err)
315 return nil, false
316 }
317 connection := &query.Node.Issue.UserContentEdits
318 if len(connection.Nodes) <= 0 {
319 return nil, false
320 }
321 return connection, true
322}
323
324func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
325 vars := newIssueVars(mm.owner, mm.project, mm.since)
326 if cursor == "" {
327 vars["issueAfter"] = (*githubv4.String)(nil)
328 } else {
329 vars["issueAfter"] = githubv4.String(cursor)
330 }
331 query := issueQuery{}
332 if err := mm.mQuery(ctx, &query, vars); err != nil {
333 mm.setError(err)
334 return nil, false
335 }
336 connection := &query.Repository.Issues
337 if len(connection.Nodes) <= 0 {
338 return nil, false
339 }
340 return connection, true
341}
342
343func reverse(eds []userContentEdit) chan userContentEdit {
344 ret := make(chan userContentEdit)
345 go func() {
346 for i := range eds {
347 ret <- eds[len(eds)-1-i]
348 }
349 close(ret)
350 }()
351 return ret
352}
353
// rateLimiter is implemented by the GraphQL query structs passed to mQuery. After a
// (dry-run) query has populated the struct, rateLimit reports the rate limit
// information it received; mQuery reads its Cost, Remaining and ResetAt fields.
type rateLimiter interface {
	rateLimit() rateLimit
}
357
358// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
359// and it is used to populate the response into it. It should be a pointer to a struct that
360// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
361// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
362// is expired.
363func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
364 // first: just send the query to the graphql api
365 vars["dryRun"] = githubv4.Boolean(false)
366 qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
367 defer cancel()
368 err := mm.gc.Query(qctx, query, vars)
369 if err == nil {
370 // no error: done
371 return nil
372 }
373 // matching the error string
374 if !strings.Contains(err.Error(), "API rate limit exceeded") {
375 // an error, but not the API rate limit error: done
376 return err
377 }
378 // a rate limit error
379 // ask the graphql api for rate limiting information
380 vars["dryRun"] = githubv4.Boolean(true)
381 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
382 defer cancel()
383 if err := mm.gc.Query(qctx, query, vars); err != nil {
384 return err
385 }
386 rateLimit := query.rateLimit()
387 if rateLimit.Cost > rateLimit.Remaining {
388 // sleep
389 resetTime := rateLimit.ResetAt.Time
390 // Add a few seconds (8) for good measure
391 resetTime = resetTime.Add(8 * time.Second)
392 fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
393 timer := time.NewTimer(time.Until(resetTime))
394 select {
395 case <-ctx.Done():
396 stop(timer)
397 return ctx.Err()
398 case <-timer.C:
399 }
400 }
401 // run the original query again
402 vars["dryRun"] = githubv4.Boolean(false)
403 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
404 defer cancel()
405 err = mm.gc.Query(qctx, query, vars)
406 return err // might be nil
407}
408
// stop stops timer t and, if it had already fired, drains a pending value from
// t.C without blocking, so no stale tick is left in the channel.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	// The timer already fired (or was stopped before); discard any buffered tick.
	select {
	case <-t.C:
	default:
	}
}