import_mediator.go

  1package github
  2
  3import (
  4	"context"
  5	"time"
  6
  7	"github.com/shurcooL/githubv4"
  8)
  9
 10const (
 11	// These values influence how fast the github graphql rate limit is exhausted.
 12	NumIssues        = 40
 13	NumIssueEdits    = 100
 14	NumTimelineItems = 100
 15	NumCommentEdits  = 100
 16
 17	ChanCapacity = 128
 18)
 19
 20// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
 21type importMediator struct {
 22	// Github graphql client
 23	gh *rateLimitHandlerClient
 24
 25	// name of the repository owner on Github
 26	owner string
 27
 28	// name of the Github repository
 29	project string
 30
 31	// since specifies which issues to import. Issues that have been updated at or after the
 32	// given date should be imported.
 33	since time.Time
 34
 35	// importEvents holds events representing issues, comments, edits, ...
 36	// In this channel issues are immediately followed by their issue edits and comments are
 37	// immediately followed by their comment edits.
 38	importEvents chan ImportEvent
 39
 40	// Sticky error
 41	err error
 42}
 43
 44type ImportEvent interface {
 45	isImportEvent()
 46}
 47
 48func (RateLimitingEvent) isImportEvent() {}
 49
 50type IssueEvent struct {
 51	issue
 52}
 53
 54func (IssueEvent) isImportEvent() {}
 55
 56type IssueEditEvent struct {
 57	issueId githubv4.ID
 58	userContentEdit
 59}
 60
 61func (IssueEditEvent) isImportEvent() {}
 62
 63type TimelineEvent struct {
 64	issueId githubv4.ID
 65	timelineItem
 66}
 67
 68func (TimelineEvent) isImportEvent() {}
 69
 70type CommentEditEvent struct {
 71	commentId githubv4.ID
 72	userContentEdit
 73}
 74
 75func (CommentEditEvent) isImportEvent() {}
 76
 77func (mm *importMediator) NextImportEvent() ImportEvent {
 78	return <-mm.importEvents
 79}
 80
 81func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
 82	mm := importMediator{
 83		gh:           client,
 84		owner:        owner,
 85		project:      project,
 86		since:        since,
 87		importEvents: make(chan ImportEvent, ChanCapacity),
 88		err:          nil,
 89	}
 90	go func() {
 91		mm.fillImportEvents(ctx)
 92		ctx.Done()
 93		close(mm.importEvents)
 94	}()
 95	return &mm
 96}
 97
 98type varmap map[string]interface{}
 99
100func newIssueVars(owner, project string, since time.Time) varmap {
101	return varmap{
102		"owner":             githubv4.String(owner),
103		"name":              githubv4.String(project),
104		"issueSince":        githubv4.DateTime{Time: since},
105		"issueFirst":        githubv4.Int(NumIssues),
106		"issueEditLast":     githubv4.Int(NumIssueEdits),
107		"issueEditBefore":   (*githubv4.String)(nil),
108		"timelineFirst":     githubv4.Int(NumTimelineItems),
109		"timelineAfter":     (*githubv4.String)(nil),
110		"commentEditLast":   githubv4.Int(NumCommentEdits),
111		"commentEditBefore": (*githubv4.String)(nil),
112	}
113}
114
115func newIssueEditVars() varmap {
116	return varmap{
117		"issueEditLast": githubv4.Int(NumIssueEdits),
118	}
119}
120
121func newTimelineVars() varmap {
122	return varmap{
123		"timelineFirst":     githubv4.Int(NumTimelineItems),
124		"commentEditLast":   githubv4.Int(NumCommentEdits),
125		"commentEditBefore": (*githubv4.String)(nil),
126	}
127}
128
129func newCommentEditVars() varmap {
130	return varmap{
131		"commentEditLast": githubv4.Int(NumCommentEdits),
132	}
133}
134
135func (mm *importMediator) Error() error {
136	return mm.err
137}
138
139func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
140	query := userQuery{}
141	vars := varmap{"login": githubv4.String(loginName)}
142	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
143		return nil, err
144	}
145	return &query.User, nil
146}
147
148func (mm *importMediator) fillImportEvents(ctx context.Context) {
149	initialCursor := githubv4.String("")
150	issues, hasIssues := mm.queryIssue(ctx, initialCursor)
151	for hasIssues {
152		for _, node := range issues.Nodes {
153			select {
154			case <-ctx.Done():
155				return
156			case mm.importEvents <- IssueEvent{node.issue}:
157			}
158
159			// issue edit events follow the issue event
160			mm.fillIssueEditEvents(ctx, &node)
161			// last come the timeline events
162			mm.fillTimelineEvents(ctx, &node)
163		}
164		if !issues.PageInfo.HasNextPage {
165			break
166		}
167		issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
168	}
169}
170
171func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
172	edits := &issueNode.UserContentEdits
173	hasEdits := true
174	for hasEdits {
175		for edit := range reverse(edits.Nodes) {
176			if edit.Diff == nil || string(*edit.Diff) == "" {
177				// issueEdit.Diff == nil happen if the event is older than early
178				// 2018, Github doesn't have the data before that. Best we can do is
179				// to ignore the event.
180				continue
181			}
182			select {
183			case <-ctx.Done():
184				return
185			case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
186			}
187		}
188		if !edits.PageInfo.HasPreviousPage {
189			break
190		}
191		edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
192	}
193}
194
195func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
196	vars := newIssueEditVars()
197	vars["gqlNodeId"] = nid
198	if cursor == "" {
199		vars["issueEditBefore"] = (*githubv4.String)(nil)
200	} else {
201		vars["issueEditBefore"] = cursor
202	}
203	query := issueEditQuery{}
204	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
205		mm.err = err
206		return nil, false
207	}
208	connection := &query.Node.Issue.UserContentEdits
209	if len(connection.Nodes) <= 0 {
210		return nil, false
211	}
212	return connection, true
213}
214
215func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
216	items := &issueNode.TimelineItems
217	hasItems := true
218	for hasItems {
219		for _, item := range items.Nodes {
220			select {
221			case <-ctx.Done():
222				return
223			case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}:
224			}
225			if item.Typename == "IssueComment" {
226				// Issue comments are different than other timeline items in that
227				// they may have associated user content edits.
228				// Right after the comment we send the comment edits.
229				mm.fillCommentEdits(ctx, &item)
230			}
231		}
232		if !items.PageInfo.HasNextPage {
233			break
234		}
235		items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
236	}
237}
238
239func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
240	vars := newTimelineVars()
241	vars["gqlNodeId"] = nid
242	if cursor == "" {
243		vars["timelineAfter"] = (*githubv4.String)(nil)
244	} else {
245		vars["timelineAfter"] = cursor
246	}
247	query := timelineQuery{}
248	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
249		mm.err = err
250		return nil, false
251	}
252	connection := &query.Node.Issue.TimelineItems
253	if len(connection.Nodes) <= 0 {
254		return nil, false
255	}
256	return connection, true
257}
258
259func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
260	// Here we are only concerned with timeline items of type issueComment.
261	if item.Typename != "IssueComment" {
262		return
263	}
264	// First: setup message handling while submitting GraphQL queries.
265	comment := &item.IssueComment
266	edits := &comment.UserContentEdits
267	hasEdits := true
268	for hasEdits {
269		for edit := range reverse(edits.Nodes) {
270			if edit.Diff == nil || string(*edit.Diff) == "" {
271				// issueEdit.Diff == nil happen if the event is older than early
272				// 2018, Github doesn't have the data before that. Best we can do is
273				// to ignore the event.
274				continue
275			}
276			select {
277			case <-ctx.Done():
278				return
279			case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
280			}
281		}
282		if !edits.PageInfo.HasPreviousPage {
283			break
284		}
285		edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
286	}
287}
288
289func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
290	vars := newCommentEditVars()
291	vars["gqlNodeId"] = nid
292	if cursor == "" {
293		vars["commentEditBefore"] = (*githubv4.String)(nil)
294	} else {
295		vars["commentEditBefore"] = cursor
296	}
297	query := commentEditQuery{}
298	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
299		mm.err = err
300		return nil, false
301	}
302	connection := &query.Node.IssueComment.UserContentEdits
303	if len(connection.Nodes) <= 0 {
304		return nil, false
305	}
306	return connection, true
307}
308
309func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
310	vars := newIssueVars(mm.owner, mm.project, mm.since)
311	if cursor == "" {
312		vars["issueAfter"] = (*githubv4.String)(nil)
313	} else {
314		vars["issueAfter"] = cursor
315	}
316	query := issueQuery{}
317	if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
318		mm.err = err
319		return nil, false
320	}
321	connection := &query.Repository.Issues
322	if len(connection.Nodes) <= 0 {
323		return nil, false
324	}
325	return connection, true
326}
327
328func reverse(eds []userContentEdit) chan userContentEdit {
329	ret := make(chan userContentEdit)
330	go func() {
331		for i := range eds {
332			ret <- eds[len(eds)-1-i]
333		}
334		close(ret)
335	}()
336	return ret
337}