@@ -32,12 +32,12 @@ type githubExporter struct {
conf core.Configuration
// cache identities clients
- identityClient map[entity.Id]*githubv4.Client
+ identityClient map[entity.Id]*rateLimitHandlerClient
// the client to use for non user-specific queries
// it's the client associated to the "default-login" config
// used for the github V4 API (graphql)
- defaultClient *githubv4.Client
+ defaultClient *rateLimitHandlerClient
// the token of the default user
// it's the token associated to the "default-login" config
@@ -53,12 +53,15 @@ type githubExporter struct {
// cache labels used to speed up exporting labels events
cachedLabels map[string]string
+
+ // channel to send export results
+ out chan<- core.ExportResult
}
// Init .
func (ge *githubExporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error {
ge.conf = conf
- ge.identityClient = make(map[entity.Id]*githubv4.Client)
+ ge.identityClient = make(map[entity.Id]*rateLimitHandlerClient)
ge.cachedOperationIDs = make(map[entity.Id]string)
ge.cachedLabels = make(map[string]string)
@@ -114,7 +117,7 @@ func (ge *githubExporter) cacheAllClient(repo *cache.RepoCache) error {
}
// getClientForIdentity return a githubv4 API client configured with the access token of the given identity.
-func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Client, error) {
+func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*rateLimitHandlerClient, error) {
client, ok := ge.identityClient[userId]
if ok {
return client, nil
@@ -126,6 +129,7 @@ func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Clie
// ExportAll export all event made by the current user to Github
func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
out := make(chan core.ExportResult)
+ ge.out = out
var err error
// get repository node id
@@ -139,15 +143,16 @@ func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
return nil, err
}
- // query all labels
- err = ge.cacheGithubLabels(ctx, ge.defaultClient)
- if err != nil {
- return nil, err
- }
-
go func() {
defer close(out)
+ // query all labels
+ err = ge.cacheGithubLabels(ctx, ge.defaultClient)
+ if err != nil {
+ out <- core.NewExportError(errors.Wrap(err, "can't obtain Github labels"), "")
+ return
+ }
+
allIdentitiesIds := make([]entity.Id, 0, len(ge.identityClient))
for id := range ge.identityClient {
allIdentitiesIds = append(allIdentitiesIds, id)
@@ -250,7 +255,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
}
// create bug
- id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
+ id, url, err := ge.createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
if err != nil {
err := errors.Wrap(err, "exporting github issue")
out <- core.NewExportError(err, b.Id())
@@ -304,7 +309,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
switch op := op.(type) {
case *bug.AddCommentOperation:
// send operation to github
- id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, op.Message)
+ id, url, err = ge.addCommentGithubIssue(ctx, client, bugGithubID, op.Message)
if err != nil {
err := errors.Wrap(err, "adding comment")
out <- core.NewExportError(err, b.Id())
@@ -321,7 +326,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
if op.Target == createOp.Id() {
// case bug creation operation: we need to edit the Github issue
- if err := updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil {
+ if err := ge.updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil {
err := errors.Wrap(err, "editing issue")
out <- core.NewExportError(err, b.Id())
return
@@ -340,7 +345,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
panic("unexpected error: comment id not found")
}
- eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, op.Message)
+ eid, eurl, err := ge.editCommentGithubIssue(ctx, client, commentID, op.Message)
if err != nil {
err := errors.Wrap(err, "editing comment")
out <- core.NewExportError(err, b.Id())
@@ -355,7 +360,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
}
case *bug.SetStatusOperation:
- if err := updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil {
+ if err := ge.updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil {
err := errors.Wrap(err, "editing status")
out <- core.NewExportError(err, b.Id())
return
@@ -367,7 +372,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
url = bugGithubURL
case *bug.SetTitleOperation:
- if err := updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil {
+ if err := ge.updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil {
err := errors.Wrap(err, "editing title")
out <- core.NewExportError(err, b.Id())
return
@@ -472,7 +477,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith
return err
}
-func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Client) error {
+func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHandlerClient) error {
variables := map[string]interface{}{
"owner": githubv4.String(ge.conf[confKeyOwner]),
"name": githubv4.String(ge.conf[confKeyProject]),
@@ -481,17 +486,25 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Cl
}
q := labelsQuery{}
+ // When performing the queries we have to forward rate limiting events to the
+ // current channel of export results.
+ events := make(chan RateLimitingEvent)
+ defer close(events)
+ go func() {
+ for e := range events {
+ select {
+ case <-ctx.Done():
+ return
+ case ge.out <- core.NewExportRateLimiting(e.msg):
+ }
+ }
+ }()
hasNextPage := true
for hasNextPage {
- // create a new timeout context at each iteration
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
-
- if err := gc.Query(ctx, &q, variables); err != nil {
- cancel()
+ if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil {
return err
}
- cancel()
for _, label := range q.Repository.Labels.Nodes {
ge.cachedLabels[label.Name] = label.ID
@@ -584,11 +597,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
Color: githubv4.String(labelColor),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
+ ctx := context.Background()
- if err := gc.Mutate(ctx, &m, input, nil); err != nil {
+ if err := gc.mutate(ctx, &m, input, nil); err != nil {
return "", err
}
@@ -596,7 +607,7 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
}
*/
-func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
+func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, label bug.Label) (string, error) {
// try to get label id from cache
labelID, err := ge.getLabelID(string(label))
if err == nil {
@@ -618,7 +629,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *gith
return labelID, nil
}
-func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
+func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
ids := make([]githubv4.ID, 0, len(labels))
var err error
@@ -643,7 +654,7 @@ func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client,
}
// create a github issue and return it ID
-func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
+func (ge *githubExporter) createGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, repositoryID, title, body string) (string, string, error) {
m := &createIssueMutation{}
input := githubv4.CreateIssueInput{
RepositoryID: repositoryID,
@@ -651,10 +662,7 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t
Body: (*githubv4.String)(&body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
@@ -663,17 +671,14 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t
}
// add a comment to an issue and return it ID
-func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) {
+func (ge *githubExporter) addCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, subjectID string, body string) (string, string, error) {
m := &addCommentToIssueMutation{}
input := githubv4.AddCommentInput{
SubjectID: subjectID,
Body: githubv4.String(body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
@@ -681,24 +686,21 @@ func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID s
return node.ID, node.URL, nil
}
-func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) {
+func (ge *githubExporter) editCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, commentID, body string) (string, string, error) {
m := &updateIssueCommentMutation{}
input := githubv4.UpdateIssueCommentInput{
ID: commentID,
Body: githubv4.String(body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
return commentID, m.UpdateIssueComment.IssueComment.URL, nil
}
-func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error {
+func (ge *githubExporter) updateGithubIssueStatus(ctx context.Context, gc *rateLimitHandlerClient, id string, status bug.Status) error {
m := &updateIssueMutation{}
// set state
@@ -718,44 +720,35 @@ func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string
State: &state,
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
return nil
}
-func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error {
+func (ge *githubExporter) updateGithubIssueBody(ctx context.Context, gc *rateLimitHandlerClient, id string, body string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Body: (*githubv4.String)(&body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
return nil
}
-func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error {
+func (ge *githubExporter) updateGithubIssueTitle(ctx context.Context, gc *rateLimitHandlerClient, id, title string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Title: (*githubv4.String)(&title),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
@@ -763,9 +756,7 @@ func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title
}
// update github issue labels
-func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
- reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
+func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *rateLimitHandlerClient, labelableID string, added, removed []bug.Label) error {
wg, ctx := errgroup.WithContext(ctx)
if len(added) > 0 {
@@ -782,7 +773,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu
}
// add labels
- if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil {
+ if err := gc.mutate(ctx, m, inputAdd, nil, ge.out); err != nil {
return err
}
return nil
@@ -803,7 +794,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu
}
// remove label labels
- if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil {
+ if err := gc.mutate(ctx, m2, inputRemove, nil, ge.out); err != nil {
return err
}
return nil
@@ -2,8 +2,6 @@ package github
import (
"context"
- "fmt"
- "strings"
"time"
"github.com/shurcooL/githubv4"
@@ -22,7 +20,7 @@ const (
// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
type importMediator struct {
// Github graphql client
- gc *githubv4.Client
+ gh *rateLimitHandlerClient
// name of the repository owner on Github
owner string
@@ -47,10 +45,6 @@ type ImportEvent interface {
isImportEvent()
}
-type RateLimitingEvent struct {
- msg string
-}
-
func (RateLimitingEvent) isImportEvent() {}
type IssueEvent struct {
@@ -84,9 +78,9 @@ func (mm *importMediator) NextImportEvent() ImportEvent {
return <-mm.importEvents
}
-func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
+func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
+ gh: client,
owner: owner,
project: project,
since: since,
@@ -144,7 +138,7 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
return nil, err
}
return &query.User, nil
@@ -206,7 +200,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -250,7 +244,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -300,7 +294,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -319,7 +313,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = cursor
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -340,97 +334,3 @@ func reverse(eds []userContentEdit) chan userContentEdit {
}()
return ret
}
-
-// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
-// and it is used to populate the response into it. It should be a pointer to a struct that
-// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
-// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
-// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- if err := mm.queryOnce(ctx, query, vars); err == nil {
- // success: done
- return nil
- }
- // failure: we will retry
- // To retry is important for importing projects with a big number of issues, because
- // there may be temporary network errors or momentary internal errors of the github servers.
- retries := 3
- var err error
- for i := 0; i < retries; i++ {
- // wait a few seconds before retry
- sleepTime := time.Duration(8*(i+1)) * time.Second
- timer := time.NewTimer(sleepTime)
- select {
- case <-ctx.Done():
- stop(timer)
- return ctx.Err()
- case <-timer.C:
- }
- err = mm.queryOnce(ctx, query, vars)
- if err == nil {
- // success: done
- return nil
- }
- }
- return err
-}
-
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- // first: just send the query to the graphql api
- vars["dryRun"] = githubv4.Boolean(false)
- qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- err := mm.gc.Query(qctx, query, vars)
- if err == nil {
- // no error: done
- return nil
- }
- // matching the error string
- if !strings.Contains(err.Error(), "API rate limit exceeded") {
- // an error, but not the API rate limit error: done
- return err
- }
- // a rate limit error
- // ask the graphql api for rate limiting information
- vars["dryRun"] = githubv4.Boolean(true)
- qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- if err := mm.gc.Query(qctx, query, vars); err != nil {
- return err
- }
- rateLimit := query.rateLimit()
- if rateLimit.Cost > rateLimit.Remaining {
- // sleep
- resetTime := rateLimit.ResetAt.Time
- // Add a few seconds (8) for good measure
- resetTime = resetTime.Add(8 * time.Second)
- msg := fmt.Sprintf("Github GraphQL API: import will sleep until %s", resetTime.String())
- select {
- case <-ctx.Done():
- return ctx.Err()
- case mm.importEvents <- RateLimitingEvent{msg}:
- }
- timer := time.NewTimer(time.Until(resetTime))
- select {
- case <-ctx.Done():
- stop(timer)
- return ctx.Err()
- case <-timer.C:
- }
- }
- // run the original query again
- vars["dryRun"] = githubv4.Boolean(false)
- qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- err = mm.gc.Query(qctx, query, vars)
- return err // might be nil
-}
-
-func stop(t *time.Timer) {
- if !t.Stop() {
- select {
- case <-t.C:
- default:
- }
- }
-}