1package github
2
3import (
4 "context"
5 "fmt"
6 "strings"
7 "time"
8
9 "github.com/shurcooL/githubv4"
10)
11
// Page sizes requested from the Github GraphQL API. These values influence how
// fast the Github GraphQL rate limit is exhausted.
const (
	NumIssues        = 40  // issues per page of the issue query
	NumIssueEdits    = 100 // user content edits per page, for issues
	NumTimelineItems = 100 // timeline items per page
	NumCommentEdits  = 100 // user content edits per page, for comments
)

// ChanCapacity is the buffer size of the channel that carries import events
// from the producer goroutine to the consumer.
const ChanCapacity = 128
21
22// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
23type importMediator struct {
24 // Github graphql client
25 gc *githubv4.Client
26
27 // name of the repository owner on Github
28 owner string
29
30 // name of the Github repository
31 project string
32
33 // since specifies which issues to import. Issues that have been updated at or after the
34 // given date should be imported.
35 since time.Time
36
37 // importEvents holds events representing issues, comments, edits, ...
38 // In this channel issues are immediately followed by their issue edits and comments are
39 // immediately followed by their comment edits.
40 importEvents chan ImportEvent
41
42 // Sticky error
43 err error
44}
45
46type ImportEvent interface {
47 isImportEvent()
48}
49
50type RateLimitingEvent struct {
51 msg string
52}
53
54func (RateLimitingEvent) isImportEvent() {}
55
56type IssueEvent struct {
57 issue
58}
59
60func (IssueEvent) isImportEvent() {}
61
62type IssueEditEvent struct {
63 issueId githubv4.ID
64 userContentEdit
65}
66
67func (IssueEditEvent) isImportEvent() {}
68
69type TimelineEvent struct {
70 issueId githubv4.ID
71 timelineItem
72}
73
74func (TimelineEvent) isImportEvent() {}
75
76type CommentEditEvent struct {
77 commentId githubv4.ID
78 userContentEdit
79}
80
81func (CommentEditEvent) isImportEvent() {}
82
83func (mm *importMediator) NextImportEvent() ImportEvent {
84 return <-mm.importEvents
85}
86
// NewImportMediator creates a mediator for the Github repository owner/project
// and immediately starts a goroutine that fills its event channel with every
// issue selected by since (see importMediator.since).
//
// The goroutine stops when all issues have been emitted, when a query fails,
// or when ctx is cancelled; in every case it closes the event channel
// afterwards, which makes NextImportEvent return nil.
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
	mm := &importMediator{
		gc:           client,
		owner:        owner,
		project:      project,
		since:        since,
		importEvents: make(chan ImportEvent, ChanCapacity),
	}
	go func() {
		mm.fillImportEvents(ctx)
		close(mm.importEvents)
	}()
	return mm
}
102
103type varmap map[string]interface{}
104
105func newIssueVars(owner, project string, since time.Time) varmap {
106 return varmap{
107 "owner": githubv4.String(owner),
108 "name": githubv4.String(project),
109 "issueSince": githubv4.DateTime{Time: since},
110 "issueFirst": githubv4.Int(NumIssues),
111 "issueEditLast": githubv4.Int(NumIssueEdits),
112 "issueEditBefore": (*githubv4.String)(nil),
113 "timelineFirst": githubv4.Int(NumTimelineItems),
114 "timelineAfter": (*githubv4.String)(nil),
115 "commentEditLast": githubv4.Int(NumCommentEdits),
116 "commentEditBefore": (*githubv4.String)(nil),
117 }
118}
119
120func newIssueEditVars() varmap {
121 return varmap{
122 "issueEditLast": githubv4.Int(NumIssueEdits),
123 }
124}
125
126func newTimelineVars() varmap {
127 return varmap{
128 "timelineFirst": githubv4.Int(NumTimelineItems),
129 "commentEditLast": githubv4.Int(NumCommentEdits),
130 "commentEditBefore": (*githubv4.String)(nil),
131 }
132}
133
134func newCommentEditVars() varmap {
135 return varmap{
136 "commentEditLast": githubv4.Int(NumCommentEdits),
137 }
138}
139
140func (mm *importMediator) Error() error {
141 return mm.err
142}
143
144func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
145 query := userQuery{}
146 vars := varmap{"login": githubv4.String(loginName)}
147 if err := mm.mQuery(ctx, &query, vars); err != nil {
148 return nil, err
149 }
150 return &query.User, nil
151}
152
153func (mm *importMediator) fillImportEvents(ctx context.Context) {
154 initialCursor := githubv4.String("")
155 issues, hasIssues := mm.queryIssue(ctx, initialCursor)
156 for hasIssues {
157 for _, node := range issues.Nodes {
158 select {
159 case <-ctx.Done():
160 return
161 case mm.importEvents <- IssueEvent{node.issue}:
162 }
163
164 // issue edit events follow the issue event
165 mm.fillIssueEditEvents(ctx, &node)
166 // last come the timeline events
167 mm.fillTimelineEvents(ctx, &node)
168 }
169 if !issues.PageInfo.HasNextPage {
170 break
171 }
172 issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
173 }
174}
175
// fillIssueEditEvents sends an IssueEditEvent for every user content edit of
// the given issue. It starts from the edits already embedded in issueNode and
// fetches further pages with queryIssueEdits.
//
// Edits are paginated backward ("issueEditLast"/"issueEditBefore" plus
// HasPreviousPage), so the preceding page is requested with
// before = StartCursor of the current page. Requesting it with EndCursor
// would return an overlapping page shifted by one element, emitting duplicate
// events. Within a page the nodes are walked from last to first. Presumably
// this yields chronological order if Github lists edits newest first; TODO
// confirm.
//
// It returns when there are no earlier pages, when a follow-up query fails or
// is empty (a failure is recorded in mm.err by queryIssueEdits), or when ctx is
// cancelled.
func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
	edits := &issueNode.UserContentEdits
	for {
		// Index backward directly rather than through a helper goroutine, so an
		// early return on cancellation leaves nothing blocked behind.
		for i := len(edits.Nodes) - 1; i >= 0; i-- {
			edit := edits.Nodes[i]
			if edit.Diff == nil || string(*edit.Diff) == "" {
				// issueEdit.Diff == nil happen if the event is older than early
				// 2018, Github doesn't have the data before that. Best we can do is
				// to ignore the event.
				continue
			}
			select {
			case <-ctx.Done():
				return
			case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
			}
		}
		if !edits.PageInfo.HasPreviousPage {
			return
		}
		prev, ok := mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.StartCursor)
		if !ok {
			return
		}
		edits = prev
	}
}
199
200func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
201 vars := newIssueEditVars()
202 vars["gqlNodeId"] = nid
203 if cursor == "" {
204 vars["issueEditBefore"] = (*githubv4.String)(nil)
205 } else {
206 vars["issueEditBefore"] = cursor
207 }
208 query := issueEditQuery{}
209 if err := mm.mQuery(ctx, &query, vars); err != nil {
210 mm.err = err
211 return nil, false
212 }
213 connection := &query.Node.Issue.UserContentEdits
214 if len(connection.Nodes) <= 0 {
215 return nil, false
216 }
217 return connection, true
218}
219
// fillTimelineEvents sends a TimelineEvent for every timeline item of the given
// issue, paging forward with queryTimeline. An IssueComment item is
// immediately followed by its comment edit events.
//
// It returns when there is no next page, when a follow-up query fails or is
// empty, or when ctx is cancelled.
func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
	issueID := issueNode.issue.Id
	page := &issueNode.TimelineItems
	for {
		for _, item := range page.Nodes {
			select {
			case <-ctx.Done():
				return
			case mm.importEvents <- TimelineEvent{issueId: issueID, timelineItem: item}:
			}
			// Unlike other timeline items, issue comments may carry user content
			// edits; those are sent right after the comment itself.
			if item.Typename == "IssueComment" {
				mm.fillCommentEdits(ctx, &item)
			}
		}
		if !page.PageInfo.HasNextPage {
			return
		}
		next, ok := mm.queryTimeline(ctx, issueID, page.PageInfo.EndCursor)
		if !ok {
			return
		}
		page = next
	}
}
243
244func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
245 vars := newTimelineVars()
246 vars["gqlNodeId"] = nid
247 if cursor == "" {
248 vars["timelineAfter"] = (*githubv4.String)(nil)
249 } else {
250 vars["timelineAfter"] = cursor
251 }
252 query := timelineQuery{}
253 if err := mm.mQuery(ctx, &query, vars); err != nil {
254 mm.err = err
255 return nil, false
256 }
257 connection := &query.Node.Issue.TimelineItems
258 if len(connection.Nodes) <= 0 {
259 return nil, false
260 }
261 return connection, true
262}
263
// fillCommentEdits sends a CommentEditEvent for every user content edit of the
// comment held in item. Items that are not of type IssueComment are ignored.
// It starts from the edits embedded in the timeline item and fetches further
// pages with queryCommentEdits.
//
// Edits are paginated backward ("commentEditLast"/"commentEditBefore" plus
// HasPreviousPage), so the preceding page is requested with
// before = StartCursor of the current page. EndCursor would return an
// overlapping page and emit duplicate events. Within a page the nodes are
// walked from last to first, mirroring fillIssueEditEvents.
//
// It returns when there are no earlier pages, when a follow-up query fails or
// is empty (a failure is recorded in mm.err), or when ctx is cancelled.
func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
	// Only timeline items of type IssueComment carry user content edits.
	if item.Typename != "IssueComment" {
		return
	}
	comment := &item.IssueComment
	edits := &comment.UserContentEdits
	for {
		// Index backward directly rather than through a helper goroutine, so an
		// early return on cancellation leaves nothing blocked behind.
		for i := len(edits.Nodes) - 1; i >= 0; i-- {
			edit := edits.Nodes[i]
			if edit.Diff == nil || string(*edit.Diff) == "" {
				// issueEdit.Diff == nil happen if the event is older than early
				// 2018, Github doesn't have the data before that. Best we can do is
				// to ignore the event.
				continue
			}
			select {
			case <-ctx.Done():
				return
			case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
			}
		}
		if !edits.PageInfo.HasPreviousPage {
			return
		}
		prev, ok := mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.StartCursor)
		if !ok {
			return
		}
		edits = prev
	}
}
293
// queryCommentEdits fetches the page of user content edits of the comment with
// node ID nid that precedes cursor. An empty cursor sends a nil
// "commentEditBefore". It reports false when the query fails (the error is
// stored in mm.err) or when the page is empty.
func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
	vars := newCommentEditVars()
	vars["gqlNodeId"] = nid
	vars["commentEditBefore"] = (*githubv4.String)(nil)
	if cursor != "" {
		vars["commentEditBefore"] = cursor
	}

	query := commentEditQuery{}
	if err := mm.mQuery(ctx, &query, vars); err != nil {
		mm.err = err
		return nil, false
	}
	if edits := &query.Node.IssueComment.UserContentEdits; len(edits.Nodes) > 0 {
		return edits, true
	}
	return nil, false
}
313
314func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
315 vars := newIssueVars(mm.owner, mm.project, mm.since)
316 if cursor == "" {
317 vars["issueAfter"] = (*githubv4.String)(nil)
318 } else {
319 vars["issueAfter"] = cursor
320 }
321 query := issueQuery{}
322 if err := mm.mQuery(ctx, &query, vars); err != nil {
323 mm.err = err
324 return nil, false
325 }
326 connection := &query.Repository.Issues
327 if len(connection.Nodes) <= 0 {
328 return nil, false
329 }
330 return connection, true
331}
332
333func reverse(eds []userContentEdit) chan userContentEdit {
334 ret := make(chan userContentEdit)
335 go func() {
336 for i := range eds {
337 ret <- eds[len(eds)-1-i]
338 }
339 close(ret)
340 }()
341 return ret
342}
343
// mQuery executes a single GraphQL query with retries. query serves both to
// derive the GraphQL document and as the destination of the response. It must
// be a pointer to a struct that mirrors the Github GraphQL schema and
// implements the rateLimiter interface.
//
// Github rate limiting is handled inside queryOnce, which sleeps until the
// limit expires. Any other failure is retried up to 3 more times, waiting 8s,
// 16s and then 24s before the respective attempt. Retrying matters for projects
// with many issues, which hit temporary network errors and momentary internal
// errors of the Github servers. mQuery returns the error of the last attempt,
// or ctx.Err() if ctx is cancelled while waiting.
func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
	const retries = 3

	err := mm.queryOnce(ctx, query, vars)
	for attempt := 1; err != nil && attempt <= retries; attempt++ {
		backoff := time.NewTimer(time.Duration(8*attempt) * time.Second)
		select {
		case <-ctx.Done():
			stop(backoff)
			return ctx.Err()
		case <-backoff.C:
		}
		err = mm.queryOnce(ctx, query, vars)
	}
	return err
}
377
// queryOnce sends query to the Github GraphQL API. If the API answers with a
// rate-limit error, it asks for the current rate-limit state with a dry run,
// waits for the limit to reset when the query would not fit, and then runs the
// query exactly once more.
//
// Every request runs under its own defaultTimeout derived from ctx. vars is
// mutated: its "dryRun" entry is set before each request. query receives the
// response and, through rateLimiter, exposes the rate-limit data of the dry
// run.
//
// Before sleeping, a RateLimitingEvent is sent on mm.importEvents.
// NOTE(review): if this runs on the consumer's goroutine (e.g. via User), that
// send can block on a full buffer, and it panics if the producer has already
// closed the channel. Confirm callers cannot reach this path.
func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
	// send performs one request with the given dryRun flag. Its timeout context
	// is released as soon as the request returns. Deferring all cancels to the
	// end of queryOnce would keep them alive across the rate-limit sleep below.
	send := func(dryRun bool) error {
		vars["dryRun"] = githubv4.Boolean(dryRun)
		qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
		defer cancel()
		return mm.gc.Query(qctx, query, vars)
	}

	// First: just send the query to the graphql api.
	err := send(false)
	if err == nil {
		return nil
	}
	// Rate limiting is only recognizable by matching the error string.
	if !strings.Contains(err.Error(), "API rate limit exceeded") {
		return err
	}

	// A rate limit error: ask the graphql api for rate limiting information.
	if err := send(true); err != nil {
		return err
	}
	rateLimit := query.rateLimit()
	if rateLimit.Cost > rateLimit.Remaining {
		// Sleep until the reset time, plus a few seconds (8) for good measure.
		resetTime := rateLimit.ResetAt.Time.Add(8 * time.Second)
		msg := fmt.Sprintf("Github GraphQL API: import will sleep until %s", resetTime.String())
		select {
		case <-ctx.Done():
			return ctx.Err()
		case mm.importEvents <- RateLimitingEvent{msg}:
		}
		timer := time.NewTimer(time.Until(resetTime))
		select {
		case <-ctx.Done():
			stop(timer)
			return ctx.Err()
		case <-timer.C:
		}
	}
	// Run the original query again; its error (possibly nil) is returned as is.
	return send(false)
}
428
// stop stops t. If the timer had already fired, it also drains a pending value
// from t.C without blocking, so a later receive cannot see a stale tick.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	select {
	case <-t.C:
	default:
	}
}