bridge/core: add context.Context to ImportAll and ExportAll signatures

Amine Hilaly created

bridge/core: add ImportResult objects to stream import events

bridge/core: launchpad support asynchronous import

bridge/github: cancellable export and import functions

bridge/gitlab: cancellable export and import functions

commands: bridge pull/push gracefull kill

bridge/github: fix github import

bridge/github: use simple context for imports

bridge/core: name parameters in interfaces

github/core: Add EventError to export and import events types

bridge/gitlab: add context support in gitlab requests functions

bridge/gitlab: remove imported events count from importer logic

bridge/github: remove imported events count from importer logic

bridge/github: add context support in query and muration requets

bridge/github: fix bug duplicate editions after multiple calls

bridge/core: import import and export events String methods
bridge/gitlab: fix error handling in note import events

commands/bridge: Add statistics about imports and exports

bridge/gitlab: properly handle context cancellation

bridge/github: improve error handling

bridge: break iterators on context cancel or timeout

bridge: add context timeout support

bridge: improve event formating and error handling

commands: handle interrupt and switch cases

bridge/github: add export mutation timeouts

bridge: fix race condition bug in the github and gitlab importers
bridge/github: improve context error handling

Change summary

bridge/core/bridge.go             |  15 +-
bridge/core/export.go             |  24 ++-
bridge/core/import.go             | 128 +++++++++++++++++++++
bridge/core/interfaces.go         |   5 
bridge/github/export.go           | 140 +++++++++++-----------
bridge/github/export_test.go      |  12 +
bridge/github/import.go           | 199 ++++++++++++++++++++++----------
bridge/github/import_test.go      |   8 +
bridge/github/iterator.go         |  58 +++++----
bridge/gitlab/import.go           | 156 +++++++++++++++---------
bridge/gitlab/import_notes.go     |  39 ++++++
bridge/gitlab/import_test.go      |   9 +
bridge/gitlab/iterator.go         |  33 +++++
bridge/launchpad/import.go        | 185 ++++++++++++++++-------------
bridge/launchpad/launchpad_api.go |  54 +-------
cache/repo_cache.go               |  16 -
commands/bridge_pull.go           |  53 ++++++++
commands/bridge_push.go           |  50 +++++++
entity/merge.go                   |   8 
19 files changed, 802 insertions(+), 390 deletions(-)

Detailed changes

bridge/core/bridge.go 🔗

@@ -2,6 +2,7 @@
 package core
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"regexp"
@@ -289,26 +290,26 @@ func (b *Bridge) ensureInit() error {
 	return nil
 }
 
-func (b *Bridge) ImportAll(since time.Time) error {
+func (b *Bridge) ImportAll(ctx context.Context, since time.Time) (<-chan ImportResult, error) {
 	importer := b.getImporter()
 	if importer == nil {
-		return ErrImportNotSupported
+		return nil, ErrImportNotSupported
 	}
 
 	err := b.ensureConfig()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	err = b.ensureInit()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	return importer.ImportAll(b.repo, since)
+	return importer.ImportAll(ctx, b.repo, since)
 }
 
-func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) {
+func (b *Bridge) ExportAll(ctx context.Context, since time.Time) (<-chan ExportResult, error) {
 	exporter := b.getExporter()
 	if exporter == nil {
 		return nil, ErrExportNotSupported
@@ -324,5 +325,5 @@ func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) {
 		return nil, err
 	}
 
-	return exporter.ExportAll(b.repo, since)
+	return exporter.ExportAll(ctx, b.repo, since)
 }

bridge/core/export.go 🔗

@@ -17,6 +17,7 @@ const (
 	ExportEventTitleEdition
 	ExportEventLabelChange
 	ExportEventNothing
+	ExportEventError
 )
 
 // ExportResult is an event that is emitted during the export process, to
@@ -32,19 +33,28 @@ type ExportResult struct {
 func (er ExportResult) String() string {
 	switch er.Event {
 	case ExportEventBug:
-		return "new issue"
+		return fmt.Sprintf("new issue: %s", er.ID)
 	case ExportEventComment:
-		return "new comment"
+		return fmt.Sprintf("new comment: %s", er.ID)
 	case ExportEventCommentEdition:
-		return "updated comment"
+		return fmt.Sprintf("updated comment: %s", er.ID)
 	case ExportEventStatusChange:
-		return "changed status"
+		return fmt.Sprintf("changed status: %s", er.ID)
 	case ExportEventTitleEdition:
-		return "changed title"
+		return fmt.Sprintf("changed title: %s", er.ID)
 	case ExportEventLabelChange:
-		return "changed label"
+		return fmt.Sprintf("changed label: %s", er.ID)
 	case ExportEventNothing:
-		return fmt.Sprintf("no event: %v", er.Reason)
+		if er.ID != "" {
+			return fmt.Sprintf("ignoring export event %s: %s", er.ID, er.Reason)
+		}
+		return fmt.Sprintf("ignoring export event: %s", er.Reason)
+	case ExportEventError:
+		if er.ID != "" {
+			return fmt.Sprintf("export error at %s: %s", er.ID, er.Err.Error())
+		}
+		return fmt.Sprintf("export error: %s", er.Err.Error())
+
 	default:
 		panic("unknown export result")
 	}

bridge/core/import.go 🔗

@@ -0,0 +1,128 @@
+package core
+
+import (
+	"fmt"
+
+	"github.com/MichaelMure/git-bug/entity"
+)
+
+type ImportEvent int
+
+const (
+	_ ImportEvent = iota
+	ImportEventBug
+	ImportEventComment
+	ImportEventCommentEdition
+	ImportEventStatusChange
+	ImportEventTitleEdition
+	ImportEventLabelChange
+	ImportEventIdentity
+	ImportEventNothing
+	ImportEventError
+)
+
+// ImportResult is an event that is emitted during the import process, to
+// allow calling code to report on what is happening, collect metrics or
+// display meaningful errors if something went wrong.
+type ImportResult struct {
+	Err    error
+	Event  ImportEvent
+	ID     entity.Id
+	Reason string
+}
+
+func (er ImportResult) String() string {
+	switch er.Event {
+	case ImportEventBug:
+		return fmt.Sprintf("new issue: %s", er.ID)
+	case ImportEventComment:
+		return fmt.Sprintf("new comment: %s", er.ID)
+	case ImportEventCommentEdition:
+		return fmt.Sprintf("updated comment: %s", er.ID)
+	case ImportEventStatusChange:
+		return fmt.Sprintf("changed status: %s", er.ID)
+	case ImportEventTitleEdition:
+		return fmt.Sprintf("changed title: %s", er.ID)
+	case ImportEventLabelChange:
+		return fmt.Sprintf("changed label: %s", er.ID)
+	case ImportEventIdentity:
+		return fmt.Sprintf("new identity: %s", er.ID)
+	case ImportEventNothing:
+		if er.ID != "" {
+			return fmt.Sprintf("ignoring import event %s: %s", er.ID, er.Reason)
+		}
+		return fmt.Sprintf("ignoring event: %s", er.Reason)
+	case ImportEventError:
+		if er.ID != "" {
+			return fmt.Sprintf("import error at id %s: %s", er.ID, er.Err.Error())
+		}
+		return fmt.Sprintf("import error: %s", er.Err.Error())
+	default:
+		panic("unknown import result")
+	}
+}
+
+func NewImportError(err error, id entity.Id) ImportResult {
+	return ImportResult{
+		Err:   err,
+		ID:    id,
+		Event: ImportEventError,
+	}
+}
+
+func NewImportNothing(id entity.Id, reason string) ImportResult {
+	return ImportResult{
+		ID:     id,
+		Reason: reason,
+		Event:  ImportEventNothing,
+	}
+}
+
+func NewImportBug(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventBug,
+	}
+}
+
+func NewImportComment(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventComment,
+	}
+}
+
+func NewImportCommentEdition(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventCommentEdition,
+	}
+}
+
+func NewImportStatusChange(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventStatusChange,
+	}
+}
+
+func NewImportLabelChange(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventLabelChange,
+	}
+}
+
+func NewImportTitleEdition(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventTitleEdition,
+	}
+}
+
+func NewImportIdentity(id entity.Id) ImportResult {
+	return ImportResult{
+		ID:    id,
+		Event: ImportEventIdentity,
+	}
+}

bridge/core/interfaces.go 🔗

@@ -1,6 +1,7 @@
 package core
 
 import (
+	"context"
 	"time"
 
 	"github.com/MichaelMure/git-bug/cache"
@@ -29,10 +30,10 @@ type BridgeImpl interface {
 
 type Importer interface {
 	Init(conf Configuration) error
-	ImportAll(repo *cache.RepoCache, since time.Time) error
+	ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ImportResult, error)
 }
 
 type Exporter interface {
 	Init(conf Configuration) error
-	ExportAll(repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error)
+	ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error)
 }

bridge/github/export.go 🔗

@@ -79,7 +79,7 @@ func (ge *githubExporter) getIdentityClient(id entity.Id) (*githubv4.Client, err
 }
 
 // ExportAll export all event made by the current user to Github
-func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
+func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
 	out := make(chan core.ExportResult)
 
 	user, err := repo.GetUserIdentity()
@@ -91,6 +91,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
 
 	// get repository node id
 	ge.repositoryID, err = getRepositoryNodeID(
+		ctx,
 		ge.conf[keyOwner],
 		ge.conf[keyProject],
 		ge.conf[keyToken],
@@ -117,20 +118,28 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
 				return
 			}
 
-			snapshot := b.Snapshot()
+			select {
 
-			// ignore issues created before since date
-			// TODO: compare the Lamport time instead of using the unix time
-			if snapshot.CreatedAt.Before(since) {
-				out <- core.NewExportNothing(b.Id(), "bug created before the since date")
-				continue
-			}
+			case <-ctx.Done():
+				// stop iterating if context cancel function is called
+				return
 
-			if snapshot.HasAnyActor(allIdentitiesIds...) {
-				// try to export the bug and it associated events
-				ge.exportBug(b, since, out)
-			} else {
-				out <- core.NewExportNothing(id, "not an actor")
+			default:
+				snapshot := b.Snapshot()
+
+				// ignore issues created before since date
+				// TODO: compare the Lamport time instead of using the unix time
+				if snapshot.CreatedAt.Before(since) {
+					out <- core.NewExportNothing(b.Id(), "bug created before the since date")
+					continue
+				}
+
+				if snapshot.HasAnyActor(allIdentitiesIds...) {
+					// try to export the bug and it associated events
+					ge.exportBug(ctx, b, since, out)
+				} else {
+					out <- core.NewExportNothing(id, "not an actor")
+				}
 			}
 		}
 	}()
@@ -139,7 +148,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
 }
 
 // exportBug publish bugs and related events
-func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
+func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
 	snapshot := b.Snapshot()
 
 	var bugGithubID string
@@ -199,7 +208,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 		}
 
 		// create bug
-		id, url, err := createGithubIssue(client, ge.repositoryID, createOp.Title, createOp.Message)
+		id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
 		if err != nil {
 			err := errors.Wrap(err, "exporting github issue")
 			out <- core.NewExportError(err, b.Id())
@@ -257,7 +266,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 			opr := op.(*bug.AddCommentOperation)
 
 			// send operation to github
-			id, url, err = addCommentGithubIssue(client, bugGithubID, opr.Message)
+			id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message)
 			if err != nil {
 				err := errors.Wrap(err, "adding comment")
 				out <- core.NewExportError(err, b.Id())
@@ -277,7 +286,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 			if opr.Target == createOp.Id() {
 
 				// case bug creation operation: we need to edit the Github issue
-				if err := updateGithubIssueBody(client, bugGithubID, opr.Message); err != nil {
+				if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil {
 					err := errors.Wrap(err, "editing issue")
 					out <- core.NewExportError(err, b.Id())
 					return
@@ -296,7 +305,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 					panic("unexpected error: comment id not found")
 				}
 
-				eid, eurl, err := editCommentGithubIssue(client, commentID, opr.Message)
+				eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message)
 				if err != nil {
 					err := errors.Wrap(err, "editing comment")
 					out <- core.NewExportError(err, b.Id())
@@ -312,7 +321,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 
 		case *bug.SetStatusOperation:
 			opr := op.(*bug.SetStatusOperation)
-			if err := updateGithubIssueStatus(client, bugGithubID, opr.Status); err != nil {
+			if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil {
 				err := errors.Wrap(err, "editing status")
 				out <- core.NewExportError(err, b.Id())
 				return
@@ -325,7 +334,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 
 		case *bug.SetTitleOperation:
 			opr := op.(*bug.SetTitleOperation)
-			if err := updateGithubIssueTitle(client, bugGithubID, opr.Title); err != nil {
+			if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil {
 				err := errors.Wrap(err, "editing title")
 				out <- core.NewExportError(err, b.Id())
 				return
@@ -338,7 +347,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 
 		case *bug.LabelChangeOperation:
 			opr := op.(*bug.LabelChangeOperation)
-			if err := ge.updateGithubIssueLabels(client, bugGithubID, opr.Added, opr.Removed); err != nil {
+			if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil {
 				err := errors.Wrap(err, "updating labels")
 				out <- core.NewExportError(err, b.Id())
 				return
@@ -370,12 +379,9 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
 }
 
 // getRepositoryNodeID request github api v3 to get repository node id
-func getRepositoryNodeID(owner, project, token string) (string, error) {
+func getRepositoryNodeID(ctx context.Context, owner, project, token string) (string, error) {
 	url := fmt.Sprintf("%s/repos/%s/%s", githubV3Url, owner, project)
-
-	client := &http.Client{
-		Timeout: defaultTimeout,
-	}
+	client := &http.Client{}
 
 	req, err := http.NewRequest("GET", url, nil)
 	if err != nil {
@@ -385,6 +391,10 @@ func getRepositoryNodeID(owner, project, token string) (string, error) {
 	// need the token for private repositories
 	req.Header.Set("Authorization", fmt.Sprintf("token %s", token))
 
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+	req = req.WithContext(ctx)
+
 	resp, err := client.Do(req)
 	if err != nil {
 		return "", err
@@ -425,7 +435,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith
 }
 
 // get label from github
-func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (string, error) {
+func (ge *githubExporter) getGithubLabelID(ctx context.Context, gc *githubv4.Client, label string) (string, error) {
 	q := &labelQuery{}
 	variables := map[string]interface{}{
 		"label": githubv4.String(label),
@@ -433,8 +443,7 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
 		"name":  githubv4.String(ge.conf[keyProject]),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Query(ctx, q, variables); err != nil {
@@ -452,12 +461,9 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
 // create a new label and return it github id
 // NOTE: since createLabel mutation is still in preview mode we use github api v3 to create labels
 // see https://developer.github.com/v4/mutation/createlabel/ and https://developer.github.com/v4/previews/#labels-preview
-func (ge *githubExporter) createGithubLabel(label, color string) (string, error) {
+func (ge *githubExporter) createGithubLabel(ctx context.Context, label, color string) (string, error) {
 	url := fmt.Sprintf("%s/repos/%s/%s/labels", githubV3Url, ge.conf[keyOwner], ge.conf[keyProject])
-
-	client := &http.Client{
-		Timeout: defaultTimeout,
-	}
+	client := &http.Client{}
 
 	params := struct {
 		Name        string `json:"name"`
@@ -478,6 +484,10 @@ func (ge *githubExporter) createGithubLabel(label, color string) (string, error)
 		return "", err
 	}
 
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+	req = req.WithContext(ctx)
+
 	// need the token for private repositories
 	req.Header.Set("Authorization", fmt.Sprintf("token %s", ge.conf[keyToken]))
 
@@ -529,9 +539,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
 }
 */
 
-func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
+func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
 	// try to get label id
-	labelID, err := ge.getGithubLabelID(gc, string(label))
+	labelID, err := ge.getGithubLabelID(ctx, gc, string(label))
 	if err == nil {
 		return labelID, nil
 	}
@@ -540,7 +550,10 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
 	rgba := label.RGBA()
 	hexColor := fmt.Sprintf("%.2x%.2x%.2x", rgba.R, rgba.G, rgba.B)
 
-	labelID, err = ge.createGithubLabel(string(label), hexColor)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
+
+	labelID, err = ge.createGithubLabel(ctx, string(label), hexColor)
 	if err != nil {
 		return "", err
 	}
@@ -548,7 +561,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
 	return labelID, nil
 }
 
-func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
+func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
 	ids := make([]githubv4.ID, 0, len(labels))
 	var err error
 
@@ -557,7 +570,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
 		id, ok := ge.cachedLabels[string(label)]
 		if !ok {
 			// try to query label id
-			id, err = ge.getOrCreateGithubLabelID(gc, repositoryID, label)
+			id, err = ge.getOrCreateGithubLabelID(ctx, gc, repositoryID, label)
 			if err != nil {
 				return nil, errors.Wrap(err, "get or create github label")
 			}
@@ -573,7 +586,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
 }
 
 // create a github issue and return it ID
-func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
+func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
 	m := &createIssueMutation{}
 	input := githubv4.CreateIssueInput{
 		RepositoryID: repositoryID,
@@ -581,8 +594,7 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
 		Body:         (*githubv4.String)(&body),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -594,15 +606,14 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
 }
 
 // add a comment to an issue and return it ID
-func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (string, string, error) {
+func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) {
 	m := &addCommentToIssueMutation{}
 	input := githubv4.AddCommentInput{
 		SubjectID: subjectID,
 		Body:      githubv4.String(body),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -613,15 +624,14 @@ func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (
 	return node.ID, node.URL, nil
 }
 
-func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string, string, error) {
+func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) {
 	m := &updateIssueCommentMutation{}
 	input := githubv4.UpdateIssueCommentInput{
 		ID:   commentID,
 		Body: githubv4.String(body),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -631,7 +641,7 @@ func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string
 	return commentID, m.UpdateIssueComment.IssueComment.URL, nil
 }
 
-func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) error {
+func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error {
 	m := &updateIssueMutation{}
 
 	// set state
@@ -651,8 +661,7 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
 		State: &state,
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -662,15 +671,14 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
 	return nil
 }
 
-func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
+func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error {
 	m := &updateIssueMutation{}
 	input := githubv4.UpdateIssueInput{
 		ID:   id,
 		Body: (*githubv4.String)(&body),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -680,15 +688,14 @@ func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
 	return nil
 }
 
-func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
+func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error {
 	m := &updateIssueMutation{}
 	input := githubv4.UpdateIssueInput{
 		ID:    id,
 		Title: (*githubv4.String)(&title),
 	}
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
 	defer cancel()
 
 	if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -699,18 +706,19 @@ func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
 }
 
 // update github issue labels
-func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
+func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
 	var errs []string
 	var wg sync.WaitGroup
 
-	parentCtx := context.Background()
+	reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
+	defer cancel()
 
 	if len(added) > 0 {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 
-			addedIDs, err := ge.getLabelsIDs(gc, labelableID, added)
+			addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added)
 			if err != nil {
 				errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
 				return
@@ -722,11 +730,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
 				LabelIDs:    addedIDs,
 			}
 
-			ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
-			defer cancel()
-
 			// add labels
-			if err := gc.Mutate(ctx, m, inputAdd, nil); err != nil {
+			if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil {
 				errs = append(errs, err.Error())
 			}
 		}()
@@ -737,7 +742,7 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
 		go func() {
 			defer wg.Done()
 
-			removedIDs, err := ge.getLabelsIDs(gc, labelableID, removed)
+			removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed)
 			if err != nil {
 				errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
 				return
@@ -749,11 +754,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
 				LabelIDs:    removedIDs,
 			}
 
-			ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
-			defer cancel()
-
 			// remove label labels
-			if err := gc.Mutate(ctx, m2, inputRemove, nil); err != nil {
+			if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil {
 				errs = append(errs, err.Error())
 			}
 		}()

bridge/github/export_test.go 🔗

@@ -2,6 +2,7 @@ package github
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"math/rand"
@@ -177,13 +178,14 @@ func TestPushPull(t *testing.T) {
 	})
 	require.NoError(t, err)
 
+	ctx := context.Background()
 	start := time.Now()
 
 	// export all bugs
-	events, err := exporter.ExportAll(backend, time.Time{})
+	exportEvents, err := exporter.ExportAll(ctx, backend, time.Time{})
 	require.NoError(t, err)
 
-	for result := range events {
+	for result := range exportEvents {
 		require.NoError(t, result.Err)
 	}
 	require.NoError(t, err)
@@ -206,9 +208,13 @@ func TestPushPull(t *testing.T) {
 	require.NoError(t, err)
 
 	// import all exported bugs to the second backend
-	err = importer.ImportAll(backendTwo, time.Time{})
+	importEvents, err := importer.ImportAll(ctx, backendTwo, time.Time{})
 	require.NoError(t, err)
 
+	for result := range importEvents {
+		require.NoError(t, result.Err)
+	}
+
 	require.Len(t, backendTwo.AllBugsIds(), len(tests))
 
 	for _, tt := range tests {

bridge/github/import.go 🔗

@@ -28,11 +28,8 @@ type githubImporter struct {
 	// iterator
 	iterator *iterator
 
-	// number of imported issues
-	importedIssues int
-
-	// number of imported identities
-	importedIdentities int
+	// send only channel
+	out chan<- core.ImportResult
 }
 
 func (gi *githubImporter) Init(conf core.Configuration) error {
@@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error {
 
 // ImportAll iterate over all the configured repository issues and ensure the creation of the
 // missing issues / timeline items / edits / label events ...
-func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
-	gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
-
-	// Loop over all matching issues
-	for gi.iterator.NextIssue() {
-		issue := gi.iterator.IssueValue()
-		fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+	gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
+	out := make(chan core.ImportResult)
+	gi.out = out
+
+	go func() {
+		defer close(gi.out)
+
+		// Loop over all matching issues
+		for gi.iterator.NextIssue() {
+			issue := gi.iterator.IssueValue()
+			// create issue
+			b, err := gi.ensureIssue(repo, issue)
+			if err != nil {
+				err := fmt.Errorf("issue creation: %v", err)
+				out <- core.NewImportError(err, "")
+				return
+			}
 
-		// create issue
-		b, err := gi.ensureIssue(repo, issue)
-		if err != nil {
-			return fmt.Errorf("issue creation: %v", err)
-		}
+			// loop over timeline items
+			for gi.iterator.NextTimelineItem() {
+				item := gi.iterator.TimelineItemValue()
+				if err := gi.ensureTimelineItem(repo, b, item); err != nil {
+					err := fmt.Errorf("timeline item creation: %v", err)
+					out <- core.NewImportError(err, "")
+					return
+				}
+			}
 
-		// loop over timeline items
-		for gi.iterator.NextTimelineItem() {
-			if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil {
-				return fmt.Errorf("timeline item creation: %v", err)
+			// commit bug state
+			if err := b.CommitAsNeeded(); err != nil {
+				err = fmt.Errorf("bug commit: %v", err)
+				out <- core.NewImportError(err, "")
+				return
 			}
 		}
 
-		// commit bug state
-		if err := b.CommitAsNeeded(); err != nil {
-			return fmt.Errorf("bug commit: %v", err)
+		if err := gi.iterator.Error(); err != nil && err != context.Canceled {
+			gi.out <- core.NewImportError(err, "")
 		}
-	}
+	}()
 
-	if err := gi.iterator.Error(); err != nil {
-		fmt.Printf("import error: %v\n", err)
-		return err
-	}
-
-	fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities)
-	return nil
+	return out, nil
 }
 
 func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) {
@@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
 			}
 
 			// importing a new bug
-			gi.importedIssues++
+			gi.out <- core.NewImportBug(b.Id())
+		} else {
+			gi.out <- core.NewImportNothing("", "bug already imported")
 		}
 
 	} else {
@@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
 		for i, edit := range issueEdits {
 			if i == 0 && b != nil {
 				// The first edit in the github result is the issue creation itself, we already have that
+				gi.out <- core.NewImportNothing("", "bug already imported")
 				continue
 			}
 
@@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
 				}
 
 				// importing a new bug
-				gi.importedIssues++
-
+				gi.out <- core.NewImportBug(b.Id())
 				continue
 			}
 
 			// other edits will be added as CommentEdit operations
-			target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String())
+			target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id))
 			if err != nil {
 				return nil, err
 			}
@@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
 }
 
 func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
-	fmt.Printf("import event item: %s\n", item.Typename)
 
 	switch item.Typename {
 	case "IssueComment":
 		// collect all comment edits
-		commentEdits := []userContentEdit{}
+		var commentEdits []userContentEdit
 		for gi.iterator.NextCommentEdit() {
 			commentEdits = append(commentEdits, gi.iterator.CommentEditValue())
 		}
 
+		// ensureTimelineComment send import events over out chanel
 		err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits)
 		if err != nil {
 			return fmt.Errorf("timeline comment creation: %v", err)
@@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 	case "LabeledEvent":
 		id := parseId(item.LabeledEvent.Id)
 		_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+		if err == nil {
+			reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+			gi.out <- core.NewImportNothing("", reason)
+			return nil
+		}
+
 		if err != cache.ErrNoMatchingOp {
 			return err
 		}
@@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 		if err != nil {
 			return err
 		}
-		_, err = b.ForceChangeLabelsRaw(
+		op, err := b.ForceChangeLabelsRaw(
 			author,
 			item.LabeledEvent.CreatedAt.Unix(),
 			[]string{
@@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 			nil,
 			map[string]string{keyGithubId: id},
 		)
+		if err != nil {
+			return err
+		}
 
-		return err
+		gi.out <- core.NewImportLabelChange(op.Id())
+		return nil
 
 	case "UnlabeledEvent":
 		id := parseId(item.UnlabeledEvent.Id)
 		_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+		if err == nil {
+			reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+			gi.out <- core.NewImportNothing("", reason)
+			return nil
+		}
 		if err != cache.ErrNoMatchingOp {
 			return err
 		}
@@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 			return err
 		}
 
-		_, err = b.ForceChangeLabelsRaw(
+		op, err := b.ForceChangeLabelsRaw(
 			author,
 			item.UnlabeledEvent.CreatedAt.Unix(),
 			nil,
@@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 			},
 			map[string]string{keyGithubId: id},
 		)
-		return err
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportLabelChange(op.Id())
+		return nil
 
 	case "ClosedEvent":
 		id := parseId(item.ClosedEvent.Id)
@@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 		if err != cache.ErrNoMatchingOp {
 			return err
 		}
+		if err == nil {
+			reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+			gi.out <- core.NewImportNothing("", reason)
+			return nil
+		}
 		author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
 		if err != nil {
 			return err
 		}
-		_, err = b.CloseRaw(
+		op, err := b.CloseRaw(
 			author,
 			item.ClosedEvent.CreatedAt.Unix(),
 			map[string]string{keyGithubId: id},
 		)
-		return err
+
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportStatusChange(op.Id())
+		return nil
 
 	case "ReopenedEvent":
 		id := parseId(item.ReopenedEvent.Id)
@@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 		if err != cache.ErrNoMatchingOp {
 			return err
 		}
+		if err == nil {
+			reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+			gi.out <- core.NewImportNothing("", reason)
+			return nil
+		}
 		author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
 		if err != nil {
 			return err
 		}
-		_, err = b.OpenRaw(
+		op, err := b.OpenRaw(
 			author,
 			item.ReopenedEvent.CreatedAt.Unix(),
 			map[string]string{keyGithubId: id},
 		)
-		return err
+
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportStatusChange(op.Id())
+		return nil
 
 	case "RenamedTitleEvent":
 		id := parseId(item.RenamedTitleEvent.Id)
@@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
 		if err != cache.ErrNoMatchingOp {
 			return err
 		}
+		if err == nil {
+			reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+			gi.out <- core.NewImportNothing("", reason)
+			return nil
+		}
 		author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
 		if err != nil {
 			return err
 		}
-		_, err = b.SetTitleRaw(
+		op, err := b.SetTitleRaw(
 			author,
 			item.RenamedTitleEvent.CreatedAt.Unix(),
 			string(item.RenamedTitleEvent.CurrentTitle),
 			map[string]string{keyGithubId: id},
 		)
-		return err
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportTitleEdition(op.Id())
+		return nil
 
 	default:
-		fmt.Printf("ignore event: %v\n", item.Typename)
+		reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename)
+		gi.out <- core.NewImportNothing("", reason)
 	}
 
 	return nil
@@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
 	}
 
 	targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id))
-	if err != nil && err != cache.ErrNoMatchingOp {
+	if err == nil {
+		reason := fmt.Sprintf("comment already imported")
+		gi.out <- core.NewImportNothing("", reason)
+	} else if err != cache.ErrNoMatchingOp {
 		// real error
 		return err
 	}
 
 	// if no edits are given we create the comment
 	if len(edits) == 0 {
-		// if comment doesn't exist
 		if err == cache.ErrNoMatchingOp {
 			cleanText, err := text.Cleanup(string(item.Body))
 			if err != nil {
@@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
 			}
 
 			// add comment operation
-			_, err = b.AddCommentRaw(
+			op, err := b.AddCommentRaw(
 				author,
 				item.CreatedAt.Unix(),
 				cleanText,
@@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
 					keyGithubUrl: parseId(item.Url.String()),
 				},
 			)
-			return err
+			if err != nil {
+				return err
+			}
+
+			gi.out <- core.NewImportComment(op.Id())
 		}
+
 	} else {
 		for i, edit := range edits {
 			if i == 0 && targetOpID != "" {
 				// The first edit in the github result is the comment creation itself, we already have that
+				gi.out <- core.NewImportNothing("", "comment already imported")
 				continue
 			}
 
@@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
 
 				// set target for the nexr edit now that the comment is created
 				targetOpID = op.Id()
-
 				continue
 			}
 
@@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
 func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
 	_, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id))
 	if err == nil {
-		// already imported
+		gi.out <- core.NewImportNothing(b.Id(), "edition already imported")
 		return nil
 	}
 	if err != cache.ErrNoMatchingOp {
@@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
 		return err
 	}
 
-	fmt.Println("import edition")
-
 	editor, err := gi.ensurePerson(repo, edit.Editor)
 	if err != nil {
 		return err
@@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
 	switch {
 	case edit.DeletedAt != nil:
 		// comment deletion, not supported yet
-		fmt.Println("comment deletion is not supported yet")
+		gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet")
 
 	case edit.DeletedAt == nil:
 
@@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
 		}
 
 		// comment edition
-		_, err = b.EditCommentRaw(
+		op, err := b.EditCommentRaw(
 			editor,
 			edit.CreatedAt.Unix(),
 			target,
@@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
 		if err != nil {
 			return err
 		}
+
+		gi.out <- core.NewImportCommentEdition(op.Id())
 	}
 
 	return nil
@@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
 	}
 
 	// importing a new identity
-	gi.importedIdentities++
 
 	var name string
 	var email string
@@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
 	case "Bot":
 	}
 
-	return repo.NewIdentityRaw(
+	i, err = repo.NewIdentityRaw(
 		name,
 		email,
 		string(actor.Login),
@@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
 			keyGithubLogin: string(actor.Login),
 		},
 	)
+
+	if err != nil {
+		return nil, err
+	}
+
+	gi.out <- core.NewImportIdentity(i.Id())
+	return i, nil
 }
 
 func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) {
@@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
 
 	gc := buildClient(gi.conf[keyToken])
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout)
 	defer cancel()
 
 	err = gc.Query(ctx, &q, variables)

bridge/github/import_test.go 🔗

@@ -1,6 +1,7 @@
 package github
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"testing"
@@ -146,11 +147,16 @@ func Test_Importer(t *testing.T) {
 	})
 	require.NoError(t, err)
 
+	ctx := context.Background()
 	start := time.Now()
 
-	err = importer.ImportAll(backend, time.Time{})
+	events, err := importer.ImportAll(ctx, backend, time.Time{})
 	require.NoError(t, err)
 
+	for result := range events {
+		require.NoError(t, result.Err)
+	}
+
 	fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds())
 
 	require.Len(t, backend.AllBugsIds(), len(tests))

bridge/github/iterator.go 🔗

@@ -46,6 +46,9 @@ type iterator struct {
 	// to make
 	capacity int
 
+	// shared context used for all graphql queries
+	ctx context.Context
+
 	// sticky error
 	err error
 
@@ -60,11 +63,12 @@ type iterator struct {
 }
 
 // NewIterator create and initialize a new iterator
-func NewIterator(owner, project, token string, since time.Time) *iterator {
+func NewIterator(ctx context.Context, capacity int, owner, project, token string, since time.Time) *iterator {
 	i := &iterator{
 		gc:       buildClient(token),
 		since:    since,
-		capacity: 10,
+		capacity: capacity,
+		ctx:      ctx,
 		timeline: timelineIterator{
 			index:       -1,
 			issueEdit:   indexer{-1},
@@ -147,8 +151,7 @@ func (i *iterator) Error() error {
 }
 
 func (i *iterator) queryIssue() bool {
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
 	defer cancel()
 
 	if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -167,6 +170,10 @@ func (i *iterator) queryIssue() bool {
 // NextIssue try to query the next issue and return true. Only one issue is
 // queried at each call.
 func (i *iterator) NextIssue() bool {
+	if i.err != nil {
+		return false
+	}
+
 	// if $issueAfter variable is nil we can directly make the first query
 	if i.timeline.variables["issueAfter"] == (*githubv4.String)(nil) {
 		nextIssue := i.queryIssue()
@@ -175,10 +182,6 @@ func (i *iterator) NextIssue() bool {
 		return nextIssue
 	}
 
-	if i.err != nil {
-		return false
-	}
-
 	if !i.timeline.query.Repository.Issues.PageInfo.HasNextPage {
 		return false
 	}
@@ -207,11 +210,15 @@ func (i *iterator) NextTimelineItem() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	if len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges) == 0 {
 		return false
 	}
 
-	if i.timeline.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges))-1 {
+	if i.timeline.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges)-1 {
 		i.timeline.index++
 		return true
 	}
@@ -225,8 +232,7 @@ func (i *iterator) NextTimelineItem() bool {
 	// more timelines, query them
 	i.timeline.variables["timelineAfter"] = i.timeline.query.Repository.Issues.Nodes[0].Timeline.PageInfo.EndCursor
 
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
 	defer cancel()
 
 	if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -245,8 +251,7 @@ func (i *iterator) TimelineItemValue() timelineItem {
 }
 
 func (i *iterator) queryIssueEdit() bool {
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
 	defer cancel()
 
 	if err := i.gc.Query(ctx, &i.issueEdit.query, i.issueEdit.variables); err != nil {
@@ -285,10 +290,14 @@ func (i *iterator) NextIssueEdit() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	// this mean we looped over all available issue edits in the timeline.
 	// now we have to use i.issueEditQuery
 	if i.timeline.issueEdit.index == -2 {
-		if i.issueEdit.index < min(i.capacity, len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+		if i.issueEdit.index < len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
 			i.issueEdit.index++
 			return i.nextValidIssueEdit()
 		}
@@ -319,7 +328,7 @@ func (i *iterator) NextIssueEdit() bool {
 	}
 
 	// loop over them timeline comment edits
-	if i.timeline.issueEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+	if i.timeline.issueEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
 		i.timeline.issueEdit.index++
 		return i.nextValidIssueEdit()
 	}
@@ -347,8 +356,7 @@ func (i *iterator) IssueEditValue() userContentEdit {
 }
 
 func (i *iterator) queryCommentEdit() bool {
-	parentCtx := context.Background()
-	ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
 	defer cancel()
 
 	if err := i.gc.Query(ctx, &i.commentEdit.query, i.commentEdit.variables); err != nil {
@@ -384,10 +392,14 @@ func (i *iterator) NextCommentEdit() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	// same as NextIssueEdit
 	if i.timeline.commentEdit.index == -2 {
 
-		if i.commentEdit.index < min(i.capacity, len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes))-1 {
+		if i.commentEdit.index < len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes)-1 {
 			i.commentEdit.index++
 			return i.nextValidCommentEdit()
 		}
@@ -409,7 +421,7 @@ func (i *iterator) NextCommentEdit() bool {
 	}
 
 	// loop over them timeline comment edits
-	if i.timeline.commentEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes))-1 {
+	if i.timeline.commentEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes)-1 {
 		i.timeline.commentEdit.index++
 		return i.nextValidCommentEdit()
 	}
@@ -440,14 +452,6 @@ func (i *iterator) CommentEditValue() userContentEdit {
 	return i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes[i.timeline.commentEdit.index]
 }
 
-func min(a, b int) int {
-	if a > b {
-		return b
-	}
-
-	return a
-}
-
 func reverseEdits(edits []userContentEdit) {
 	for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 {
 		edits[i], edits[j] = edits[j], edits[i]

bridge/gitlab/import.go 🔗

@@ -1,6 +1,7 @@
 package gitlab
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"time"
@@ -21,11 +22,8 @@ type gitlabImporter struct {
 	// iterator
 	iterator *iterator
 
-	// number of imported issues
-	importedIssues int
-
-	// number of imported identities
-	importedIdentities int
+	// send only channel
+	out chan<- core.ImportResult
 }
 
 func (gi *gitlabImporter) Init(conf core.Configuration) error {
@@ -35,49 +33,60 @@ func (gi *gitlabImporter) Init(conf core.Configuration) error {
 
 // ImportAll iterate over all the configured repository issues (notes) and ensure the creation
 // of the missing issues / comments / label events / title changes ...
-func (gi *gitlabImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
-	gi.iterator = NewIterator(gi.conf[keyProjectID], gi.conf[keyToken], since)
-
-	// Loop over all matching issues
-	for gi.iterator.NextIssue() {
-		issue := gi.iterator.IssueValue()
-		fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+	gi.iterator = NewIterator(ctx, 10, gi.conf[keyProjectID], gi.conf[keyToken], since)
+	out := make(chan core.ImportResult)
+	gi.out = out
+
+	go func() {
+		defer close(gi.out)
+
+		// Loop over all matching issues
+		for gi.iterator.NextIssue() {
+			issue := gi.iterator.IssueValue()
+
+			// create issue
+			b, err := gi.ensureIssue(repo, issue)
+			if err != nil {
+				err := fmt.Errorf("issue creation: %v", err)
+				out <- core.NewImportError(err, "")
+				return
+			}
 
-		// create issue
-		b, err := gi.ensureIssue(repo, issue)
-		if err != nil {
-			return fmt.Errorf("issue creation: %v", err)
-		}
+			// Loop over all notes
+			for gi.iterator.NextNote() {
+				note := gi.iterator.NoteValue()
+				if err := gi.ensureNote(repo, b, note); err != nil {
+					err := fmt.Errorf("note creation: %v", err)
+					out <- core.NewImportError(err, entity.Id(strconv.Itoa(note.ID)))
+					return
+				}
+			}
 
-		// Loop over all notes
-		for gi.iterator.NextNote() {
-			note := gi.iterator.NoteValue()
-			if err := gi.ensureNote(repo, b, note); err != nil {
-				return fmt.Errorf("note creation: %v", err)
+			// Loop over all label events
+			for gi.iterator.NextLabelEvent() {
+				labelEvent := gi.iterator.LabelEventValue()
+				if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil {
+					err := fmt.Errorf("label event creation: %v", err)
+					out <- core.NewImportError(err, entity.Id(strconv.Itoa(labelEvent.ID)))
+					return
+				}
 			}
-		}
 
-		// Loop over all label events
-		for gi.iterator.NextLabelEvent() {
-			labelEvent := gi.iterator.LabelEventValue()
-			if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil {
-				return fmt.Errorf("label event creation: %v", err)
+			// commit bug state
+			if err := b.CommitAsNeeded(); err != nil {
+				err := fmt.Errorf("bug commit: %v", err)
+				out <- core.NewImportError(err, "")
+				return
 			}
 		}
 
 		if err := gi.iterator.Error(); err != nil {
-			fmt.Printf("import error: %v\n", err)
-			return err
+			out <- core.NewImportError(err, "")
 		}
+	}()
 
-		// commit bug state
-		if err := b.CommitAsNeeded(); err != nil {
-			return fmt.Errorf("bug commit: %v", err)
-		}
-	}
-
-	fmt.Printf("Successfully imported %d issues and %d identities from Gitlab\n", gi.importedIssues, gi.importedIdentities)
-	return nil
+	return out, nil
 }
 
 func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue) (*cache.BugCache, error) {
@@ -89,13 +98,14 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue
 
 	// resolve bug
 	b, err := repo.ResolveBugCreateMetadata(keyGitlabUrl, issue.WebURL)
-	if err != nil && err != bug.ErrBugNotExist {
-		return nil, err
-	}
-
 	if err == nil {
+		reason := fmt.Sprintf("bug already imported")
+		gi.out <- core.NewImportNothing("", reason)
 		return b, nil
 	}
+	if err != bug.ErrBugNotExist {
+		return nil, err
+	}
 
 	// if bug was never imported
 	cleanText, err := text.Cleanup(issue.Description)
@@ -123,7 +133,7 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue
 	}
 
 	// importing a new bug
-	gi.importedIssues++
+	gi.out <- core.NewImportBug(b.Id())
 
 	return b, nil
 }
@@ -149,28 +159,36 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 			return nil
 		}
 
-		_, err = b.CloseRaw(
+		op, err := b.CloseRaw(
 			author,
 			note.CreatedAt.Unix(),
 			map[string]string{
 				keyGitlabId: gitlabID,
 			},
 		)
-		return err
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportStatusChange(op.Id())
 
 	case NOTE_REOPENED:
 		if errResolve == nil {
 			return nil
 		}
 
-		_, err = b.OpenRaw(
+		op, err := b.OpenRaw(
 			author,
 			note.CreatedAt.Unix(),
 			map[string]string{
 				keyGitlabId: gitlabID,
 			},
 		)
-		return err
+		if err != nil {
+			return err
+		}
+
+		gi.out <- core.NewImportStatusChange(op.Id())
 
 	case NOTE_DESCRIPTION_CHANGED:
 		issue := gi.iterator.IssueValue()
@@ -181,7 +199,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 		// TODO: Check only one time and ignore next 'description change' within one issue
 		if errResolve == cache.ErrNoMatchingOp && issue.Description != firstComment.Message {
 			// comment edition
-			_, err = b.EditCommentRaw(
+			op, err := b.EditCommentRaw(
 				author,
 				note.UpdatedAt.Unix(),
 				firstComment.Id(),
@@ -190,8 +208,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 					keyGitlabId: gitlabID,
 				},
 			)
+			if err != nil {
+				return err
+			}
 
-			return err
+			gi.out <- core.NewImportTitleEdition(op.Id())
 		}
 
 	case NOTE_COMMENT:
@@ -204,7 +225,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 		if errResolve == cache.ErrNoMatchingOp {
 
 			// add comment operation
-			_, err = b.AddCommentRaw(
+			op, err := b.AddCommentRaw(
 				author,
 				note.CreatedAt.Unix(),
 				cleanText,
@@ -213,8 +234,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 					keyGitlabId: gitlabID,
 				},
 			)
-
-			return err
+			if err != nil {
+				return err
+			}
+			gi.out <- core.NewImportComment(op.Id())
+			return nil
 		}
 
 		// if comment was already exported
@@ -228,7 +252,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 		// compare local bug comment with the new note body
 		if comment.Message != cleanText {
 			// comment edition
-			_, err = b.EditCommentRaw(
+			op, err := b.EditCommentRaw(
 				author,
 				note.UpdatedAt.Unix(),
 				comment.Id(),
@@ -236,7 +260,10 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 				nil,
 			)
 
-			return err
+			if err != nil {
+				return err
+			}
+			gi.out <- core.NewImportCommentEdition(op.Id())
 		}
 
 		return nil
@@ -247,7 +274,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 			return nil
 		}
 
-		_, err = b.SetTitleRaw(
+		op, err := b.SetTitleRaw(
 			author,
 			note.CreatedAt.Unix(),
 			body,
@@ -255,8 +282,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 				keyGitlabId: gitlabID,
 			},
 		)
+		if err != nil {
+			return err
+		}
 
-		return err
+		gi.out <- core.NewImportTitleEdition(op.Id())
 
 	case NOTE_UNKNOWN,
 		NOTE_ASSIGNED,
@@ -269,6 +299,9 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
 		NOTE_UNLOCKED,
 		NOTE_MENTIONED_IN_ISSUE,
 		NOTE_MENTIONED_IN_MERGE_REQUEST:
+
+		reason := fmt.Sprintf("unsupported note type: %s", noteType.String())
+		gi.out <- core.NewImportNothing("", reason)
 		return nil
 
 	default:
@@ -337,10 +370,7 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id
 		return nil, err
 	}
 
-	// importing a new identity
-	gi.importedIdentities++
-
-	return repo.NewIdentityRaw(
+	i, err = repo.NewIdentityRaw(
 		user.Name,
 		user.PublicEmail,
 		user.Username,
@@ -351,6 +381,12 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id
 			keyGitlabLogin: user.Username,
 		},
 	)
+	if err != nil {
+		return nil, err
+	}
+
+	gi.out <- core.NewImportIdentity(i.Id())
+	return i, nil
 }
 
 func parseID(id int) string {

bridge/gitlab/import_notes.go 🔗

@@ -28,6 +28,45 @@ const (
 	NOTE_UNKNOWN
 )
 
+func (nt NoteType) String() string {
+	switch nt {
+	case NOTE_COMMENT:
+		return "note comment"
+	case NOTE_TITLE_CHANGED:
+		return "note title changed"
+	case NOTE_DESCRIPTION_CHANGED:
+		return "note description changed"
+	case NOTE_CLOSED:
+		return "note closed"
+	case NOTE_REOPENED:
+		return "note reopened"
+	case NOTE_LOCKED:
+		return "note locked"
+	case NOTE_UNLOCKED:
+		return "note unlocked"
+	case NOTE_CHANGED_DUEDATE:
+		return "note changed duedate"
+	case NOTE_REMOVED_DUEDATE:
+		return "note remove duedate"
+	case NOTE_ASSIGNED:
+		return "note assigned"
+	case NOTE_UNASSIGNED:
+		return "note unassigned"
+	case NOTE_CHANGED_MILESTONE:
+		return "note changed milestone"
+	case NOTE_REMOVED_MILESTONE:
+		return "note removed in milestone"
+	case NOTE_MENTIONED_IN_ISSUE:
+		return "note mentioned in issue"
+	case NOTE_MENTIONED_IN_MERGE_REQUEST:
+		return "note mentioned in merge request"
+	case NOTE_UNKNOWN:
+		return "note unknown"
+	default:
+		panic("unknown note type")
+	}
+}
+
 // GetNoteType parse a note system and body and return the note type and it content
 func GetNoteType(n *gitlab.Note) (NoteType, string) {
 	// when a note is a comment system is set to false

bridge/gitlab/import_test.go 🔗

@@ -1,6 +1,7 @@
 package gitlab
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"testing"
@@ -99,10 +100,16 @@ func TestImport(t *testing.T) {
 	})
 	require.NoError(t, err)
 
+	ctx := context.Background()
 	start := time.Now()
-	err = importer.ImportAll(backend, time.Time{})
+
+	events, err := importer.ImportAll(ctx, backend, time.Time{})
 	require.NoError(t, err)
 
+	for result := range events {
+		require.NoError(t, result.Err)
+	}
+
 	fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds())
 
 	require.Len(t, backend.AllBugsIds(), len(tests))

bridge/gitlab/iterator.go 🔗

@@ -1,6 +1,7 @@
 package gitlab
 
 import (
+	"context"
 	"time"
 
 	"github.com/xanzy/go-gitlab"
@@ -38,6 +39,9 @@ type iterator struct {
 	// number of issues and notes to query at once
 	capacity int
 
+	// shared context
+	ctx context.Context
+
 	// sticky error
 	err error
 
@@ -52,12 +56,13 @@ type iterator struct {
 }
 
 // NewIterator create a new iterator
-func NewIterator(projectID, token string, since time.Time) *iterator {
+func NewIterator(ctx context.Context, capacity int, projectID, token string, since time.Time) *iterator {
 	return &iterator{
 		gc:       buildClient(token),
 		project:  projectID,
 		since:    since,
-		capacity: 20,
+		capacity: capacity,
+		ctx:      ctx,
 		issue: &issueIterator{
 			index: -1,
 			page:  1,
@@ -79,6 +84,9 @@ func (i *iterator) Error() error {
 }
 
 func (i *iterator) getNextIssues() bool {
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+	defer cancel()
+
 	issues, _, err := i.gc.Issues.ListProjectIssues(
 		i.project,
 		&gitlab.ListProjectIssuesOptions{
@@ -90,6 +98,7 @@ func (i *iterator) getNextIssues() bool {
 			UpdatedAfter: &i.since,
 			Sort:         gitlab.String("asc"),
 		},
+		gitlab.WithContext(ctx),
 	)
 
 	if err != nil {
@@ -116,6 +125,10 @@ func (i *iterator) NextIssue() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	// first query
 	if i.issue.cache == nil {
 		return i.getNextIssues()
@@ -135,6 +148,9 @@ func (i *iterator) IssueValue() *gitlab.Issue {
 }
 
 func (i *iterator) getNextNotes() bool {
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+	defer cancel()
+
 	notes, _, err := i.gc.Notes.ListIssueNotes(
 		i.project,
 		i.IssueValue().IID,
@@ -146,6 +162,7 @@ func (i *iterator) getNextNotes() bool {
 			Sort:    gitlab.String("asc"),
 			OrderBy: gitlab.String("created_at"),
 		},
+		gitlab.WithContext(ctx),
 	)
 
 	if err != nil {
@@ -171,6 +188,10 @@ func (i *iterator) NextNote() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	if len(i.note.cache) == 0 {
 		return i.getNextNotes()
 	}
@@ -189,6 +210,9 @@ func (i *iterator) NoteValue() *gitlab.Note {
 }
 
 func (i *iterator) getNextLabelEvents() bool {
+	ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+	defer cancel()
+
 	labelEvents, _, err := i.gc.ResourceLabelEvents.ListIssueLabelEvents(
 		i.project,
 		i.IssueValue().IID,
@@ -198,6 +222,7 @@ func (i *iterator) getNextLabelEvents() bool {
 				PerPage: i.capacity,
 			},
 		},
+		gitlab.WithContext(ctx),
 	)
 
 	if err != nil {
@@ -224,6 +249,10 @@ func (i *iterator) NextLabelEvent() bool {
 		return false
 	}
 
+	if i.ctx.Err() != nil {
+		return false
+	}
+
 	if len(i.labelEvent.cache) == 0 {
 		return i.getNextLabelEvents()
 	}

bridge/launchpad/import.go 🔗

@@ -1,11 +1,10 @@
 package launchpad
 
 import (
+	"context"
 	"fmt"
 	"time"
 
-	"github.com/pkg/errors"
-
 	"github.com/MichaelMure/git-bug/bridge/core"
 	"github.com/MichaelMure/git-bug/bug"
 	"github.com/MichaelMure/git-bug/cache"
@@ -45,98 +44,116 @@ func (li *launchpadImporter) ensurePerson(repo *cache.RepoCache, owner LPPerson)
 	)
 }
 
-func (li *launchpadImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
+func (li *launchpadImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+	out := make(chan core.ImportResult)
 	lpAPI := new(launchpadAPI)
 
 	err := lpAPI.Init()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	lpBugs, err := lpAPI.SearchTasks(li.conf["project"])
+	lpBugs, err := lpAPI.SearchTasks(ctx, li.conf["project"])
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	for _, lpBug := range lpBugs {
-		var b *cache.BugCache
-		var err error
-
-		lpBugID := fmt.Sprintf("%d", lpBug.ID)
-		b, err = repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID)
-		if err != nil && err != bug.ErrBugNotExist {
-			return err
-		}
-
-		owner, err := li.ensurePerson(repo, lpBug.Owner)
-		if err != nil {
-			return err
-		}
-
-		if err == bug.ErrBugNotExist {
-			createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt)
-			b, _, err = repo.NewBugRaw(
-				owner,
-				createdAt.Unix(),
-				lpBug.Title,
-				lpBug.Description,
-				nil,
-				map[string]string{
-					keyLaunchpadID: lpBugID,
-				},
-			)
-			if err != nil {
-				return errors.Wrapf(err, "failed to add bug id #%s", lpBugID)
+	go func() {
+		for _, lpBug := range lpBugs {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				lpBugID := fmt.Sprintf("%d", lpBug.ID)
+				b, err := repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID)
+				if err != nil && err != bug.ErrBugNotExist {
+					out <- core.NewImportError(err, entity.Id(lpBugID))
+					return
+				}
+
+				owner, err := li.ensurePerson(repo, lpBug.Owner)
+				if err != nil {
+					out <- core.NewImportError(err, entity.Id(lpBugID))
+					return
+				}
+
+				if err == bug.ErrBugNotExist {
+					createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt)
+					b, _, err = repo.NewBugRaw(
+						owner,
+						createdAt.Unix(),
+						lpBug.Title,
+						lpBug.Description,
+						nil,
+						map[string]string{
+							keyLaunchpadID: lpBugID,
+						},
+					)
+					if err != nil {
+						out <- core.NewImportError(err, entity.Id(lpBugID))
+						return
+					}
+
+					out <- core.NewImportBug(b.Id())
+
+				}
+
+				/* Handle messages */
+				if len(lpBug.Messages) == 0 {
+					err := fmt.Sprintf("bug doesn't have any comments")
+					out <- core.NewImportNothing(entity.Id(lpBugID), err)
+					return
+				}
+
+				// The Launchpad API returns the bug description as the first
+				// comment, so skip it.
+				for _, lpMessage := range lpBug.Messages[1:] {
+					_, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID)
+					if err != nil && err != cache.ErrNoMatchingOp {
+						out <- core.NewImportError(err, entity.Id(lpMessage.ID))
+						return
+					}
+
+					// If this comment already exists, we are probably
+					// updating an existing bug. We do not want to duplicate
+					// the comments, so let us just skip this one.
+					// TODO: Can Launchpad comments be edited?
+					if err == nil {
+						continue
+					}
+
+					owner, err := li.ensurePerson(repo, lpMessage.Owner)
+					if err != nil {
+						out <- core.NewImportError(err, "")
+						return
+					}
+
+					// This is a new comment, we can add it.
+					createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt)
+					op, err := b.AddCommentRaw(
+						owner,
+						createdAt.Unix(),
+						lpMessage.Content,
+						nil,
+						map[string]string{
+							keyLaunchpadID: lpMessage.ID,
+						})
+					if err != nil {
+						out <- core.NewImportError(err, op.Id())
+						return
+					}
+
+					out <- core.NewImportComment(op.Id())
+				}
+
+				err = b.CommitAsNeeded()
+				if err != nil {
+					out <- core.NewImportError(err, "")
+					return
+				}
 			}
-		} else {
-			/* TODO: Update bug */
-			fmt.Println("TODO: Update bug")
 		}
+	}()
 
-		/* Handle messages */
-		if len(lpBug.Messages) == 0 {
-			return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID)
-		}
-
-		// The Launchpad API returns the bug description as the first
-		// comment, so skip it.
-		for _, lpMessage := range lpBug.Messages[1:] {
-			_, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID)
-			if err != nil && err != cache.ErrNoMatchingOp {
-				return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID)
-			}
-
-			// If this comment already exists, we are probably
-			// updating an existing bug. We do not want to duplicate
-			// the comments, so let us just skip this one.
-			// TODO: Can Launchpad comments be edited?
-			if err == nil {
-				continue
-			}
-
-			owner, err := li.ensurePerson(repo, lpMessage.Owner)
-			if err != nil {
-				return err
-			}
-
-			// This is a new comment, we can add it.
-			createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt)
-			_, err = b.AddCommentRaw(
-				owner,
-				createdAt.Unix(),
-				lpMessage.Content,
-				nil,
-				map[string]string{
-					keyLaunchpadID: lpMessage.ID,
-				})
-			if err != nil {
-				return errors.Wrapf(err, "failed to add comment to bug #%s", lpBugID)
-			}
-		}
-		err = b.CommitAsNeeded()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return out, nil
 }

bridge/launchpad/launchpad_api.go 🔗

@@ -14,6 +14,7 @@ package launchpad
  */
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -33,43 +34,6 @@ type LPPerson struct {
 // https://api.launchpad.net/devel/~login
 var personCache = make(map[string]LPPerson)
 
-func (owner *LPPerson) UnmarshalJSON(data []byte) error {
-	type LPPersonX LPPerson // Avoid infinite recursion
-	var ownerLink string
-	if err := json.Unmarshal(data, &ownerLink); err != nil {
-		return err
-	}
-
-	// First, try to gather info about the bug owner using our cache.
-	if cachedPerson, hasKey := personCache[ownerLink]; hasKey {
-		*owner = cachedPerson
-		return nil
-	}
-
-	// If the bug owner is not already known, we have to send a request.
-	req, err := http.NewRequest("GET", ownerLink, nil)
-	if err != nil {
-		return nil
-	}
-
-	client := &http.Client{}
-	resp, err := client.Do(req)
-	if err != nil {
-		return nil
-	}
-
-	defer resp.Body.Close()
-
-	var p LPPersonX
-	if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
-		return nil
-	}
-	*owner = LPPerson(p)
-	// Do not forget to update the cache.
-	personCache[ownerLink] = *owner
-	return nil
-}
-
 // LPBug describes a Launchpad bug.
 type LPBug struct {
 	Title       string   `json:"title"`
@@ -109,11 +73,13 @@ type launchpadAPI struct {
 }
 
 func (lapi *launchpadAPI) Init() error {
-	lapi.client = &http.Client{}
+	lapi.client = &http.Client{
+		Timeout: defaultTimeout,
+	}
 	return nil
 }
 
-func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
+func (lapi *launchpadAPI) SearchTasks(ctx context.Context, project string) ([]LPBug, error) {
 	var bugs []LPBug
 
 	// First, let us build the URL. Not all statuses are included by
@@ -153,7 +119,7 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
 		}
 
 		for _, bugEntry := range result.Entries {
-			bug, err := lapi.queryBug(bugEntry.BugLink)
+			bug, err := lapi.queryBug(ctx, bugEntry.BugLink)
 			if err == nil {
 				bugs = append(bugs, bug)
 			}
@@ -170,13 +136,14 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
 	return bugs, nil
 }
 
-func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
+func (lapi *launchpadAPI) queryBug(ctx context.Context, url string) (LPBug, error) {
 	var bug LPBug
 
 	req, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return bug, err
 	}
+	req = req.WithContext(ctx)
 
 	resp, err := lapi.client.Do(req)
 	if err != nil {
@@ -191,7 +158,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
 
 	/* Fetch messages */
 	messagesCollectionLink := fmt.Sprintf("%s/bugs/%d/messages", apiRoot, bug.ID)
-	messages, err := lapi.queryMessages(messagesCollectionLink)
+	messages, err := lapi.queryMessages(ctx, messagesCollectionLink)
 	if err != nil {
 		return bug, err
 	}
@@ -200,7 +167,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
 	return bug, nil
 }
 
-func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error) {
+func (lapi *launchpadAPI) queryMessages(ctx context.Context, messagesURL string) ([]LPMessage, error) {
 	var messages []LPMessage
 
 	for {
@@ -208,6 +175,7 @@ func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error)
 		if err != nil {
 			return nil, err
 		}
+		req = req.WithContext(ctx)
 
 		resp, err := lapi.client.Do(req)
 		if err != nil {

cache/repo_cache.go 🔗

@@ -172,18 +172,10 @@ func (c *RepoCache) lock() error {
 }
 
 func (c *RepoCache) Close() error {
-	for id := range c.identities {
-		delete(c.identities, id)
-	}
-	for id := range c.identitiesExcerpts {
-		delete(c.identitiesExcerpts, id)
-	}
-	for id := range c.bugs {
-		delete(c.bugs, id)
-	}
-	for id := range c.bugExcerpts {
-		delete(c.bugExcerpts, id)
-	}
+	c.identities = make(map[entity.Id]*IdentityCache)
+	c.identitiesExcerpts = nil
+	c.bugs = make(map[entity.Id]*BugCache)
+	c.bugExcerpts = nil
 
 	lockPath := repoLockFilePath(c.repo)
 	return os.Remove(lockPath)

commands/bridge_pull.go 🔗

@@ -1,6 +1,10 @@
 package commands
 
 import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
 	"time"
 
 	"github.com/spf13/cobra"
@@ -31,12 +35,59 @@ func runBridgePull(cmd *cobra.Command, args []string) error {
 		return err
 	}
 
+	parentCtx := context.Background()
+	ctx, cancel := context.WithCancel(parentCtx)
+	defer cancel()
+
+	// buffered channel to avoid send block at the end
+	done := make(chan struct{}, 1)
+
+	var mu sync.Mutex
+	interruptCount := 0
+	interrupt.RegisterCleaner(func() error {
+		mu.Lock()
+		if interruptCount > 0 {
+			fmt.Println("Received another interrupt before graceful stop, terminating...")
+			os.Exit(0)
+		}
+
+		interruptCount++
+		mu.Unlock()
+
+		fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)")
+
+		// send signal to stop the importer
+		cancel()
+
+		// block until importer gracefully shutdown
+		<-done
+		return nil
+	})
+
 	// TODO: by default import only new events
-	err = b.ImportAll(time.Time{})
+	events, err := b.ImportAll(ctx, time.Time{})
 	if err != nil {
 		return err
 	}
 
+	importedIssues := 0
+	importedIdentities := 0
+	for result := range events {
+		fmt.Println(result.String())
+
+		switch result.Event {
+		case core.ImportEventBug:
+			importedIssues++
+		case core.ImportEventIdentity:
+			importedIdentities++
+		}
+	}
+
+	// send done signal
+	close(done)
+
+	fmt.Printf("Successfully imported %d issues and %d identities with %s bridge\n", importedIssues, importedIdentities, b.Name)
+
 	return nil
 }
 

commands/bridge_push.go 🔗

@@ -1,7 +1,10 @@
 package commands
 
 import (
+	"context"
 	"fmt"
+	"os"
+	"sync"
 	"time"
 
 	"github.com/spf13/cobra"
@@ -32,20 +35,55 @@ func runBridgePush(cmd *cobra.Command, args []string) error {
 		return err
 	}
 
+	parentCtx := context.Background()
+	ctx, cancel := context.WithCancel(parentCtx)
+	defer cancel()
+
+	done := make(chan struct{}, 1)
+
+	var mu sync.Mutex
+	interruptCount := 0
+	interrupt.RegisterCleaner(func() error {
+		mu.Lock()
+		if interruptCount > 0 {
+			fmt.Println("Received another interrupt before graceful stop, terminating...")
+			os.Exit(0)
+		}
+
+		interruptCount++
+		mu.Unlock()
+
+		fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)")
+
+		// send signal to stop the importer
+		cancel()
+
+		// block until importer gracefully shutdown
+		<-done
+		return nil
+	})
+
 	// TODO: by default export only new events
-	out, err := b.ExportAll(time.Time{})
+	events, err := b.ExportAll(ctx, time.Time{})
 	if err != nil {
 		return err
 	}
 
-	for result := range out {
-		if result.Err != nil {
-			fmt.Println(result.Err, result.Reason)
-		} else {
-			fmt.Printf("%s: %s\n", result.String(), result.ID)
+	exportedIssues := 0
+	for result := range events {
+		fmt.Println(result.String())
+
+		switch result.Event {
+		case core.ExportEventBug:
+			exportedIssues++
 		}
 	}
 
+	// send done signal
+	close(done)
+
+	fmt.Printf("Successfully exported %d issues with %s bridge\n", exportedIssues, b.Name)
+
 	return nil
 }
 

entity/merge.go 🔗

@@ -13,6 +13,7 @@ const (
 	MergeStatusInvalid
 	MergeStatusUpdated
 	MergeStatusNothing
+	MergeStatusError
 )
 
 type MergeResult struct {
@@ -39,6 +40,8 @@ func (mr MergeResult) String() string {
 		return "updated"
 	case MergeStatusNothing:
 		return "nothing to do"
+	case MergeStatusError:
+		return fmt.Sprintf("merge error on %s: %s", mr.Id, mr.Err.Error())
 	default:
 		panic("unknown merge status")
 	}
@@ -46,8 +49,9 @@ func (mr MergeResult) String() string {
 
 func NewMergeError(err error, id Id) MergeResult {
 	return MergeResult{
-		Err: err,
-		Id:  id,
+		Err:    err,
+		Id:     id,
+		Status: MergeStatusError,
 	}
 }