1package github
2
3import (
4 "context"
5 "fmt"
6 "runtime"
7 "sync"
8 "time"
9
10 "github.com/shurcooL/githubv4"
11)
12
// varmap holds the GraphQL variables passed to a query, keyed by variable name
// (e.g. "owner", "issueFirst", "dryRun").
type varmap map[string]interface{}
14
// trace is a debugging helper: it prints the file, line and function name of
// its caller to stdout.
func trace() {
	var callers [15]uintptr
	// Skip runtime.Callers itself and trace, so the first frame is the caller.
	count := runtime.Callers(2, callers[:])
	caller, _ := runtime.CallersFrames(callers[:count]).Next()
	fmt.Printf("%s:%d %s\n", caller.File, caller.Line, caller.Function)
}
22
// Page sizes requested from the Github GraphQL API for each paginated
// connection queried by the import mediator.
const (
	// Issues per page of the repository issue query ("issueFirst").
	NUM_ISSUES = 50
	// Edits per page of an issue's user content edits ("issueEditLast").
	NUM_ISSUE_EDITS = 99
	// Items per page of an issue's timeline ("timelineFirst").
	NUM_TIMELINE_ITEMS = 99
	// Edits per page of a comment's user content edits ("commentEditLast").
	NUM_COMMENT_EDITS = 99
)

// CHAN_CAPACITY is the buffer size of the channels created by the import mediator.
const CHAN_CAPACITY = 128
31
32// TODO: remove all debug output and trace() in all files. Use ag
33
// importMediator fetches issues, their edits, their timeline items and the
// edits of their comments from the Github GraphQL API and exposes them through
// channels. A goroutine started by NewImportMediator fills the channels;
// consumers read issues from Issues() and look up the per-issue and per-comment
// channels with IssueEdits, TimelineItems and CommentEdits.
type importMediator struct {
	// Github graphql client
	gc      *githubv4.Client
	owner   string
	project string
	// Only issues created or updated after this date are queried
	// (passed as the "issueSince" query variable).
	since time.Time

	// issues carries each issue once its edit and timeline channels have been
	// registered; it is closed when the producer goroutine finishes.
	issues chan issue
	// Each mutex guards the map of the same name below: the maps are written by
	// the producer goroutine and read by the accessor methods.
	issueEditsMut    sync.Mutex
	timelineItemsMut sync.Mutex
	commentEditsMut  sync.Mutex
	// Channels keyed by issue id (issueEdits, timelineItems) or by comment id
	// (commentEdits).
	issueEdits    map[githubv4.ID]chan userContentEdit
	timelineItems map[githubv4.ID]chan timelineItem
	commentEdits  map[githubv4.ID]chan userContentEdit

	// Sticky error: set by the query helpers when a GraphQL query fails. It is
	// not guarded by a mutex, so read it (via Error) only after the issues
	// channel has been closed.
	err error
}
54
55func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
56 mm := importMediator{
57 gc: client,
58 owner: owner,
59 project: project,
60 since: since,
61 issues: make(chan issue, CHAN_CAPACITY),
62 issueEditsMut: sync.Mutex{},
63 timelineItemsMut: sync.Mutex{},
64 commentEditsMut: sync.Mutex{},
65 issueEdits: make(map[githubv4.ID]chan userContentEdit),
66 timelineItems: make(map[githubv4.ID]chan timelineItem),
67 commentEdits: make(map[githubv4.ID]chan userContentEdit),
68 err: nil,
69 }
70 go func() {
71 defer close(mm.issues)
72 mm.fillChannels(ctx)
73 }()
74 return &mm
75}
76
77func (mm *importMediator) Issues() <-chan issue {
78 return mm.issues
79}
80
81func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
82 mm.issueEditsMut.Lock()
83 channel := mm.issueEdits[issue.Id]
84 mm.issueEditsMut.Unlock()
85 return channel
86}
87
88func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
89 mm.timelineItemsMut.Lock()
90 channel := mm.timelineItems[issue.Id]
91 mm.timelineItemsMut.Unlock()
92 return channel
93}
94
95func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
96 mm.commentEditsMut.Lock()
97 channel := mm.commentEdits[comment.Id]
98 mm.commentEditsMut.Unlock()
99 return channel
100}
101
102func (mm *importMediator) Error() error {
103 return mm.err
104}
105
106func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
107 query := userQuery{}
108 vars := varmap{"login": githubv4.String(loginName)}
109 c, cancel := context.WithTimeout(ctx, defaultTimeout)
110 defer cancel()
111 if err := mm.mQuery(c, &query, vars); err != nil {
112 return nil, err
113 }
114 return &query.User, nil
115}
116
117func (mm *importMediator) fillChannels(ctx context.Context) {
118 issueCursor := githubv4.String("")
119 for {
120 issues, hasIssues := mm.queryIssue(ctx, issueCursor)
121 if !hasIssues {
122 break
123 }
124 issueCursor = issues.PageInfo.EndCursor
125 for _, issueNode := range issues.Nodes {
126 // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title)
127 mm.fillChannelIssueEdits(ctx, &issueNode)
128 mm.fillChannelTimeline(ctx, &issueNode)
129 // To avoid race conditions add the issue only after all its edits,
130 // timeline times, etc. are added to their respective channels.
131 mm.issues <- issueNode.issue
132 }
133 }
134}
135
136func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) {
137 // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id)
138 // fmt.Printf("%v\n", issueNode)
139 channel := make(chan userContentEdit, CHAN_CAPACITY)
140 defer close(channel)
141 mm.issueEditsMut.Lock()
142 mm.issueEdits[issueNode.issue.Id] = channel
143 mm.issueEditsMut.Unlock()
144 edits := &issueNode.UserContentEdits
145 hasEdits := true
146 for hasEdits {
147 // fmt.Println("before the reversed loop")
148 for edit := range reverse(edits.Nodes) {
149 // fmt.Println("in the reversed loop")
150 if edit.Diff == nil || string(*edit.Diff) == "" {
151 // issueEdit.Diff == nil happen if the event is older than
152 // early 2018, Github doesn't have the data before that.
153 // Best we can do is to ignore the event.
154 continue
155 }
156 // fmt.Printf("about to push issue edit\n")
157 channel <- edit
158 }
159 // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage)
160 // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage)
161 if !edits.PageInfo.HasPreviousPage {
162 break
163 }
164 edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
165 }
166}
167
168func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) {
169 // fmt.Printf("fullChannelTimeline()\n")
170 channel := make(chan timelineItem, CHAN_CAPACITY)
171 defer close(channel)
172 mm.timelineItemsMut.Lock()
173 mm.timelineItems[issueNode.issue.Id] = channel
174 mm.timelineItemsMut.Unlock()
175 items := &issueNode.TimelineItems
176 hasItems := true
177 for hasItems {
178 for _, item := range items.Nodes {
179 channel <- item
180 mm.fillChannelCommentEdits(ctx, &item)
181 }
182 // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage)
183 // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage)
184 if !items.PageInfo.HasNextPage {
185 break
186 }
187 items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
188 }
189}
190
191func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) {
192 // This concerns only timeline items of type comment
193 if item.Typename != "IssueComment" {
194 return
195 }
196 comment := &item.IssueComment
197 channel := make(chan userContentEdit, CHAN_CAPACITY)
198 defer close(channel)
199 mm.commentEditsMut.Lock()
200 mm.commentEdits[comment.Id] = channel
201 mm.commentEditsMut.Unlock()
202 edits := &comment.UserContentEdits
203 hasEdits := true
204 for hasEdits {
205 for edit := range reverse(edits.Nodes) {
206 if edit.Diff == nil || string(*edit.Diff) == "" {
207 // issueEdit.Diff == nil happen if the event is older than
208 // early 2018, Github doesn't have the data before that.
209 // Best we can do is to ignore the event.
210 continue
211 }
212 channel <- edit
213 }
214 if !edits.PageInfo.HasPreviousPage {
215 break
216 }
217 edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
218 }
219}
220
221func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
222 // trace()
223 vars := varmap{
224 "gqlNodeId": nid,
225 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
226 }
227 if cursor == "" {
228 vars["commentEditBefore"] = (*githubv4.String)(nil)
229 } else {
230 vars["commentEditBefore"] = cursor
231 }
232 c, cancel := context.WithTimeout(ctx, defaultTimeout)
233 defer cancel()
234 query := commentEditQuery{}
235 if err := mm.mQuery(c, &query, vars); err != nil {
236 mm.err = err
237 return nil, false
238 }
239 connection := &query.Node.IssueComment.UserContentEdits
240 if len(connection.Nodes) <= 0 {
241 return nil, false
242 }
243 return connection, true
244}
245
246func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
247 // trace()
248 vars := varmap{
249 "gqlNodeId": nid,
250 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
251 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
252 "commentEditBefore": (*githubv4.String)(nil),
253 }
254 if cursor == "" {
255 vars["timelineAfter"] = (*githubv4.String)(nil)
256 } else {
257 vars["timelineAfter"] = cursor
258 }
259 c, cancel := context.WithTimeout(ctx, defaultTimeout)
260 defer cancel()
261 query := timelineQuery{}
262 if err := mm.mQuery(c, &query, vars); err != nil {
263 mm.err = err
264 return nil, false
265 }
266 connection := &query.Node.Issue.TimelineItems
267 if len(connection.Nodes) <= 0 {
268 return nil, false
269 }
270 return connection, true
271}
272
273func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
274 // trace()
275 vars := varmap{
276 "gqlNodeId": nid,
277 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
278 }
279 if cursor == "" {
280 vars["issueEditBefore"] = (*githubv4.String)(nil)
281 } else {
282 vars["issueEditBefore"] = cursor
283 }
284 c, cancel := context.WithTimeout(ctx, defaultTimeout)
285 defer cancel()
286 query := issueEditQuery{}
287 if err := mm.mQuery(c, &query, vars); err != nil {
288 mm.err = err
289 return nil, false
290 }
291 connection := &query.Node.Issue.UserContentEdits
292 if len(connection.Nodes) <= 0 {
293 return nil, false
294 }
295 return connection, true
296}
297
298func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
299 // trace()
300 vars := varmap{
301 "owner": githubv4.String(mm.owner),
302 "name": githubv4.String(mm.project),
303 "issueSince": githubv4.DateTime{Time: mm.since},
304 "issueFirst": githubv4.Int(NUM_ISSUES),
305 "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
306 "issueEditBefore": (*githubv4.String)(nil),
307 "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
308 "timelineAfter": (*githubv4.String)(nil),
309 "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
310 "commentEditBefore": (*githubv4.String)(nil),
311 }
312 if cursor == "" {
313 vars["issueAfter"] = (*githubv4.String)(nil)
314 } else {
315 vars["issueAfter"] = githubv4.String(cursor)
316 }
317 c, cancel := context.WithTimeout(ctx, defaultTimeout)
318 defer cancel()
319 query := issueQuery{}
320 if err := mm.mQuery(c, &query, vars); err != nil {
321 mm.err = err
322 return nil, false
323 }
324 connection := &query.Repository.Issues
325 if len(connection.Nodes) <= 0 {
326 return nil, false
327 }
328 return connection, true
329}
330
331func reverse(eds []userContentEdit) chan userContentEdit {
332 ret := make(chan userContentEdit)
333 go func() {
334 for i := range eds {
335 ret <- eds[len(eds)-1-i]
336 }
337 close(ret)
338 }()
339 return ret
340}
341
// rateLimiter is implemented by GraphQL query structs that expose the rate
// limit information returned with the query result. mQuery reads it after a
// dry run to decide whether to wait before running the real query.
type rateLimiter interface {
	rateLimit() rateLimit
}
345
346// TODO: move that into its own file
347//
348// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL
349// query and it is used to populate the response into it. It should be a pointer to a struct
350// that corresponds to the Github graphql schema and it should implement the rateLimiter
351// interface. This function queries Github for the remaining rate limit points before
352// executing the actual query. The function waits, if there are not enough rate limiting
353// points left.
354func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
355 // First: check the cost of the query and wait if necessary
356 vars["dryRun"] = githubv4.Boolean(true)
357 qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
358 defer cancel()
359 if err := mm.gc.Query(qctx, query, vars); err != nil {
360 return err
361 }
362 fmt.Printf("%v\n", query)
363 rateLimit := query.rateLimit()
364 if rateLimit.Cost > rateLimit.Remaining {
365 resetTime := rateLimit.ResetAt.Time
366 fmt.Println("Github rate limit exhausted")
367 fmt.Printf("Sleeping until %s\n", resetTime.String())
368 // Add a few seconds (8) for good measure
369 timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second)))
370 select {
371 case <-ctx.Done():
372 stop(timer)
373 return ctx.Err()
374 case <-timer.C:
375 }
376 }
377 // Second: Do the actual query
378 vars["dryRun"] = githubv4.Boolean(false)
379 qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
380 defer cancel()
381 if err := mm.gc.Query(qctx, query, vars); err != nil {
382 return err
383 }
384 return nil
385}
386
// stop stops t and, if the timer had already fired, drains a pending value
// from t.C without blocking. Draining keeps later receives from t.C from
// observing a stale tick.
func stop(t *time.Timer) {
	if t.Stop() {
		return
	}
	// The timer already fired (or was stopped). Drain a buffered tick if one
	// is present, but do not block if the value was already received.
	select {
	case <-t.C:
	default:
	}
}