import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"fmt"
  6	"runtime"
  7	"sync"
  8	"time"
  9
 10	"github.com/shurcooL/githubv4"
 11)
 12
// varmap holds the variables that accompany a GraphQL query, keyed by the
// variable name used in the query.
type varmap map[string]interface{}
 14
 15func trace() {
 16	pc := make([]uintptr, 15)
 17	n := runtime.Callers(2, pc)
 18	frames := runtime.CallersFrames(pc[:n])
 19	frame, _ := frames.Next()
 20	fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function)
 21}
 22
 23const (
 24	NUM_ISSUES         = 50
 25	NUM_ISSUE_EDITS    = 99
 26	NUM_TIMELINE_ITEMS = 99
 27	NUM_COMMENT_EDITS  = 99
 28
 29	CHAN_CAPACITY = 128
 30)
 31
// TODO: remove all debug output and every trace() call in all files (use `ag` to find them).
 33
 34type importMediator struct {
 35	// Github graphql client
 36	gc      *githubv4.Client
 37	owner   string
 38	project string
 39	// The iterator will only query issues updated or created after the date given in
 40	// the variable since.
 41	since time.Time
 42
 43	issues           chan issue
 44	issueEditsMut    sync.Mutex
 45	timelineItemsMut sync.Mutex
 46	commentEditsMut  sync.Mutex
 47	issueEdits       map[githubv4.ID]chan userContentEdit
 48	timelineItems    map[githubv4.ID]chan timelineItem
 49	commentEdits     map[githubv4.ID]chan userContentEdit
 50
 51	// Sticky error
 52	err error
 53}
 54
 55func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
 56	mm := importMediator{
 57		gc:               client,
 58		owner:            owner,
 59		project:          project,
 60		since:            since,
 61		issues:           make(chan issue, CHAN_CAPACITY),
 62		issueEditsMut:    sync.Mutex{},
 63		timelineItemsMut: sync.Mutex{},
 64		commentEditsMut:  sync.Mutex{},
 65		issueEdits:       make(map[githubv4.ID]chan userContentEdit),
 66		timelineItems:    make(map[githubv4.ID]chan timelineItem),
 67		commentEdits:     make(map[githubv4.ID]chan userContentEdit),
 68		err:              nil,
 69	}
 70	go func() {
 71		defer close(mm.issues)
 72		mm.fillChannels(ctx)
 73	}()
 74	return &mm
 75}
 76
// Issues returns the receive-only channel on which the imported issues are
// delivered.
func (mm *importMediator) Issues() <-chan issue {
	return mm.issues
}
 80
 81func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
 82	mm.issueEditsMut.Lock()
 83	channel := mm.issueEdits[issue.Id]
 84	mm.issueEditsMut.Unlock()
 85	return channel
 86}
 87
 88func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
 89	mm.timelineItemsMut.Lock()
 90	channel := mm.timelineItems[issue.Id]
 91	mm.timelineItemsMut.Unlock()
 92	return channel
 93}
 94
 95func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
 96	mm.commentEditsMut.Lock()
 97	channel := mm.commentEdits[comment.Id]
 98	mm.commentEditsMut.Unlock()
 99	return channel
100}
101
// Error returns the sticky error, i.e. the error most recently stored by one
// of the query helpers, or nil if none has been stored.
//
// NOTE(review): the field is read here without synchronization while the
// filling goroutine may still write it — presumably callers are expected to
// call Error only after the Issues channel has been closed; confirm.
func (mm *importMediator) Error() error {
	return mm.err
}
105
106func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
107	query := userQuery{}
108	vars := varmap{"login": githubv4.String(loginName)}
109	c, cancel := context.WithTimeout(ctx, defaultTimeout)
110	defer cancel()
111	if err := mm.mQuery(c, &query, vars); err != nil {
112		return nil, err
113	}
114	return &query.User, nil
115}
116
117func (mm *importMediator) fillChannels(ctx context.Context) {
118	issueCursor := githubv4.String("")
119	for {
120		issues, hasIssues := mm.queryIssue(ctx, issueCursor)
121		if !hasIssues {
122			break
123		}
124		issueCursor = issues.PageInfo.EndCursor
125		for _, issueNode := range issues.Nodes {
126			// fmt.Printf(">>> issue: %v\n", issueNode.issue.Title)
127			mm.fillChannelIssueEdits(ctx, &issueNode)
128			mm.fillChannelTimeline(ctx, &issueNode)
129			// To avoid race conditions add the issue only after all its edits,
130			// timeline times, etc. are added to their respective channels.
131			mm.issues <- issueNode.issue
132		}
133	}
134}
135
136func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) {
137	// fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id)
138	// fmt.Printf("%v\n", issueNode)
139	channel := make(chan userContentEdit, CHAN_CAPACITY)
140	defer close(channel)
141	mm.issueEditsMut.Lock()
142	mm.issueEdits[issueNode.issue.Id] = channel
143	mm.issueEditsMut.Unlock()
144	edits := &issueNode.UserContentEdits
145	hasEdits := true
146	for hasEdits {
147		// fmt.Println("before the reversed loop")
148		for edit := range reverse(edits.Nodes) {
149			// fmt.Println("in the reversed loop")
150			if edit.Diff == nil || string(*edit.Diff) == "" {
151				// issueEdit.Diff == nil happen if the event is older than
152				// early 2018, Github doesn't have the data before that.
153				// Best we can do is to ignore the event.
154				continue
155			}
156			// fmt.Printf("about to push issue edit\n")
157			channel <- edit
158		}
159		// fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage)
160		// fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage)
161		if !edits.PageInfo.HasPreviousPage {
162			break
163		}
164		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
165	}
166}
167
168func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) {
169	// fmt.Printf("fullChannelTimeline()\n")
170	channel := make(chan timelineItem, CHAN_CAPACITY)
171	defer close(channel)
172	mm.timelineItemsMut.Lock()
173	mm.timelineItems[issueNode.issue.Id] = channel
174	mm.timelineItemsMut.Unlock()
175	items := &issueNode.TimelineItems
176	hasItems := true
177	for hasItems {
178		for _, item := range items.Nodes {
179			channel <- item
180			mm.fillChannelCommentEdits(ctx, &item)
181		}
182		// fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage)
183		// fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage)
184		if !items.PageInfo.HasNextPage {
185			break
186		}
187		items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
188	}
189}
190
191func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) {
192	// This concerns only timeline items of type comment
193	if item.Typename != "IssueComment" {
194		return
195	}
196	comment := &item.IssueComment
197	channel := make(chan userContentEdit, CHAN_CAPACITY)
198	defer close(channel)
199	mm.commentEditsMut.Lock()
200	mm.commentEdits[comment.Id] = channel
201	mm.commentEditsMut.Unlock()
202	edits := &comment.UserContentEdits
203	hasEdits := true
204	for hasEdits {
205		for edit := range reverse(edits.Nodes) {
206			if edit.Diff == nil || string(*edit.Diff) == "" {
207				// issueEdit.Diff == nil happen if the event is older than
208				// early 2018, Github doesn't have the data before that.
209				// Best we can do is to ignore the event.
210				continue
211			}
212			channel <- edit
213		}
214		if !edits.PageInfo.HasPreviousPage {
215			break
216		}
217		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
218	}
219}
220
221func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
222	// trace()
223	vars := varmap{
224		"gqlNodeId":       nid,
225		"commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
226	}
227	if cursor == "" {
228		vars["commentEditBefore"] = (*githubv4.String)(nil)
229	} else {
230		vars["commentEditBefore"] = cursor
231	}
232	c, cancel := context.WithTimeout(ctx, defaultTimeout)
233	defer cancel()
234	query := commentEditQuery{}
235	if err := mm.mQuery(c, &query, vars); err != nil {
236		mm.err = err
237		return nil, false
238	}
239	connection := &query.Node.IssueComment.UserContentEdits
240	if len(connection.Nodes) <= 0 {
241		return nil, false
242	}
243	return connection, true
244}
245
246func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
247	// trace()
248	vars := varmap{
249		"gqlNodeId":         nid,
250		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
251		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
252		"commentEditBefore": (*githubv4.String)(nil),
253	}
254	if cursor == "" {
255		vars["timelineAfter"] = (*githubv4.String)(nil)
256	} else {
257		vars["timelineAfter"] = cursor
258	}
259	c, cancel := context.WithTimeout(ctx, defaultTimeout)
260	defer cancel()
261	query := timelineQuery{}
262	if err := mm.mQuery(c, &query, vars); err != nil {
263		mm.err = err
264		return nil, false
265	}
266	connection := &query.Node.Issue.TimelineItems
267	if len(connection.Nodes) <= 0 {
268		return nil, false
269	}
270	return connection, true
271}
272
273func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
274	// trace()
275	vars := varmap{
276		"gqlNodeId":     nid,
277		"issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
278	}
279	if cursor == "" {
280		vars["issueEditBefore"] = (*githubv4.String)(nil)
281	} else {
282		vars["issueEditBefore"] = cursor
283	}
284	c, cancel := context.WithTimeout(ctx, defaultTimeout)
285	defer cancel()
286	query := issueEditQuery{}
287	if err := mm.mQuery(c, &query, vars); err != nil {
288		mm.err = err
289		return nil, false
290	}
291	connection := &query.Node.Issue.UserContentEdits
292	if len(connection.Nodes) <= 0 {
293		return nil, false
294	}
295	return connection, true
296}
297
298func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
299	// trace()
300	vars := varmap{
301		"owner":             githubv4.String(mm.owner),
302		"name":              githubv4.String(mm.project),
303		"issueSince":        githubv4.DateTime{Time: mm.since},
304		"issueFirst":        githubv4.Int(NUM_ISSUES),
305		"issueEditLast":     githubv4.Int(NUM_ISSUE_EDITS),
306		"issueEditBefore":   (*githubv4.String)(nil),
307		"timelineFirst":     githubv4.Int(NUM_TIMELINE_ITEMS),
308		"timelineAfter":     (*githubv4.String)(nil),
309		"commentEditLast":   githubv4.Int(NUM_COMMENT_EDITS),
310		"commentEditBefore": (*githubv4.String)(nil),
311	}
312	if cursor == "" {
313		vars["issueAfter"] = (*githubv4.String)(nil)
314	} else {
315		vars["issueAfter"] = githubv4.String(cursor)
316	}
317	c, cancel := context.WithTimeout(ctx, defaultTimeout)
318	defer cancel()
319	query := issueQuery{}
320	if err := mm.mQuery(c, &query, vars); err != nil {
321		mm.err = err
322		return nil, false
323	}
324	connection := &query.Repository.Issues
325	if len(connection.Nodes) <= 0 {
326		return nil, false
327	}
328	return connection, true
329}
330
331func reverse(eds []userContentEdit) chan userContentEdit {
332	ret := make(chan userContentEdit)
333	go func() {
334		for i := range eds {
335			ret <- eds[len(eds)-1-i]
336		}
337		close(ret)
338	}()
339	return ret
340}
341
// rateLimiter is implemented by the query types passed to mQuery. It gives
// access to the rate limit information (cost, remaining points, reset time)
// returned together with a query.
type rateLimiter interface {
	rateLimit() rateLimit
}
345
346// TODO: move that into its own file
347//
348// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL
349// query and it is used to populate the response into it. It should be a pointer to a struct
350// that corresponds to the Github graphql schema and it should implement the rateLimiter
351// interface. This function queries Github for the remaining rate limit points before
352// executing the actual query. The function waits, if there are not enough rate limiting
353// points left.
354func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
355	// First: check the cost of the query and wait if necessary
356	vars["dryRun"] = githubv4.Boolean(true)
357	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
358	defer cancel()
359	if err := mm.gc.Query(qctx, query, vars); err != nil {
360		return err
361	}
362	fmt.Printf("%v\n", query)
363	rateLimit := query.rateLimit()
364	if rateLimit.Cost > rateLimit.Remaining {
365		resetTime := rateLimit.ResetAt.Time
366		fmt.Println("Github rate limit exhausted")
367		fmt.Printf("Sleeping until %s\n", resetTime.String())
368		// Add a few seconds (8) for good measure
369		timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second)))
370		select {
371		case <-ctx.Done():
372			stop(timer)
373			return ctx.Err()
374		case <-timer.C:
375		}
376	}
377	// Second: Do the actual query
378	vars["dryRun"] = githubv4.Boolean(false)
379	qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
380	defer cancel()
381	if err := mm.gc.Query(qctx, query, vars); err != nil {
382		return err
383	}
384	return nil
385}
386
// stop stops the timer t. If the timer could not be stopped because it had
// already expired or been stopped, a value possibly pending on its channel
// is discarded; the function never blocks.
func stop(t *time.Timer) {
	if t.Stop() {
		// Stopped before firing: nothing can be pending on the channel.
		return
	}
	select {
	case <-t.C:
	default:
	}
}