diff --git a/dgraph/cmd/dgraphimport/import_test.go b/dgraph/cmd/dgraphimport/import_test.go index bd762ede6b1..b6925775a33 100644 --- a/dgraph/cmd/dgraphimport/import_test.go +++ b/dgraph/cmd/dgraphimport/import_test.go @@ -75,8 +75,6 @@ func TestEmptyBulkOutDir(t *testing.T) { } func TestDrainModeAfterStartSnapshotStream(t *testing.T) { - t.Skip("Skipping... sometimes the query for schema succeeds even when the server is in draining mode") - tests := []struct { name string numAlphas int @@ -123,8 +121,6 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) { } func TestImportApis(t *testing.T) { - t.Skip("Skipping import tests due to persistent flakiness with container networking and Raft leadership issues") - tests := []testcase{ { name: "SingleGroupShutTwoAlphasPerGroup", @@ -391,12 +387,12 @@ func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) } retryDelay := time.Second - hasAllPredicates := true // Get expected predicates first var expectedSchemaObj map[string]interface{} require.NoError(t, json.Unmarshal([]byte(expectedSchema), &expectedSchemaObj)) expectedPredicates := getPredicateMap(expectedSchemaObj) + var hasAllPredicates bool for i := 0; i < maxRetries; i++ { // Checking client connection again here because an import operation may be in progress on the rejoined alpha @@ -412,7 +408,7 @@ func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient, downAlphas int) // Get actual predicates actualPredicates := getPredicateMap(actualSchema) - // Check if all expected predicates are present + hasAllPredicates = true for predName := range expectedPredicates { if _, exists := actualPredicates[predName]; !exists { hasAllPredicates = false diff --git a/dgraphtest/load.go b/dgraphtest/load.go index 4451a28f43e..a356f66612c 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -580,8 +580,8 @@ func downloadFile(fname, url string) error { cmd := exec.Command("wget", "-O", fname, url) cmd.Dir = datasetFilesPath - if out, 
err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("error downloading file %s: %s", fname, string(out)) + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("error downloading file %s: %s: %w", fname, string(out), err) } return nil } diff --git a/dgraphtest/local_cluster.go b/dgraphtest/local_cluster.go index c5fda79e375..6b48c8d4b2f 100644 --- a/dgraphtest/local_cluster.go +++ b/dgraphtest/local_cluster.go @@ -930,8 +930,7 @@ func (c *LocalCluster) Client() (*dgraphapi.GrpcClient, func(), error) { var conns []*grpc.ClientConn for _, aa := range c.alphas { if !aa.isRunning { - // QUESTIONS(shivaji): Should this be 'continue' instead of a break from the loop - break + continue } url, err := aa.alphaURL(c) if err != nil { diff --git a/systest/backup/nfs-backup/backup_test.go b/systest/backup/nfs-backup/backup_test.go index 4ff9be7df5e..e360ab2caf4 100644 --- a/systest/backup/nfs-backup/backup_test.go +++ b/systest/backup/nfs-backup/backup_test.go @@ -38,13 +38,11 @@ var ( ) func TestBackupHAClust(t *testing.T) { - t.Skip("Skipping HA backup test via NFS") backupRestoreTest(t, "alpha1_backup_clust_ha", "zero1_backup_clust_ha", "alpha4_restore_clust_ha", backupDstHA) } func TestBackupNonHAClust(t *testing.T) { - t.Skip("Skipping Non-HA backup test via NFS") backupRestoreTest(t, "alpha7_backup_clust_non_ha", "zero7_backup_clust_non_ha", "alpha8_restore_clust_non_ha", backupDstNonHA) } @@ -61,11 +59,10 @@ func backupRestoreTest(t *testing.T, backupAlphaName string, backupZeroName stri backupZero := testutil.ContainerInstance{Name: backupZeroName, Prefix: testutil.DockerPrefix} require.NoError(t, backupZero.BestEffortWaitForHealthy(6080)) - // Resolve addresses after containers are healthy - 
backupAlphaSocketAddr := testutil.ContainerAddr(backupAlphaName, 9080) - backupAlphaSocketAddrHttp := testutil.ContainerAddr(backupAlphaName, 8080) - restoreAlphaAddr := testutil.ContainerAddr(restoreAlphaName, 8080) - backupZeroAddr := testutil.ContainerAddr(backupZeroName, 6080) + backupAlphaSocketAddr := testutil.ContainerAddrRetry(backupAlphaName, 9080) + backupAlphaSocketAddrHttp := testutil.ContainerAddrRetry(backupAlphaName, 8080) + restoreAlphaAddr := testutil.ContainerAddrRetry(restoreAlphaName, 8080) + backupZeroAddr := testutil.ContainerAddrRetry(backupZeroName, 6080) var dg *dgo.Dgraph var err error diff --git a/testutil/docker.go b/testutil/docker.go index 8558117c50e..00a83bd42ae 100644 --- a/testutil/docker.go +++ b/testutil/docker.go @@ -226,6 +226,26 @@ func ContainerAddrWithHost(name string, privatePort uint16, host string) string return host + ":" + strconv.Itoa(int(privatePort)) } +func ContainerAddrWithHostRetry(name string, privatePort uint16, host string) string { + maxAttempts := 30 + for attempt := range maxAttempts { + fmt.Printf("Attempt %d to get container address for %s:%d with host %s\n", attempt, name, privatePort, host) + c := getContainer(name) + for _, p := range c.Ports { + if p.PrivatePort == privatePort { + // Found the mapping - return immediately without waiting + return host + ":" + strconv.Itoa(int(p.PublicPort)) + } + } + + if attempt < maxAttempts-1 { + time.Sleep(500 * time.Millisecond) + 
} } + + return host + ":" + strconv.Itoa(int(privatePort)) +} + func ContainerAddrLocalhost(name string, privatePort uint16) string { return ContainerAddrWithHost(name, privatePort, "localhost") } @@ -234,6 +254,10 @@ func ContainerAddr(name string, privatePort uint16) string { return ContainerAddrWithHost(name, privatePort, "0.0.0.0") } +func ContainerAddrRetry(name string, privatePort uint16) string { + return ContainerAddrWithHostRetry(name, privatePort, "0.0.0.0") +} + // DockerStart starts the specified services. func DockerRun(instance string, op int) error { c := getContainer(instance) diff --git a/worker/import.go b/worker/import.go index 68a96b82f6b..b7b965e7272 100644 --- a/worker/import.go +++ b/worker/import.go @@ -298,7 +298,7 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error { return fmt.Errorf("failed to establish stream with leader: %v", err) } glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId) - glog.Infof("[import] [forward %d -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr) + glog.Infof("[import] [forward %v -> %v] start", groups().Node.MyAddr, groups().Leader(groupId).Addr) glog.Infof("[import] sending forward true to leader of group [%v]", groupId) forwardReq := &api.StreamExtSnapshotRequest{Forward: true} @@ -313,12 +313,6 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error { func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error { currentGroup := groups().Node.gid ctx := in.Context() - if err := out.Send(&api.StreamExtSnapshotRequest{GroupId: groupId}); err != nil { - return fmt.Errorf("send groupId downstream(%d): %w", groupId, err) - } - if _, err := out.Recv(); err != nil { - return fmt.Errorf("ack groupId downstream(%d): %w", groupId, err) - } for { if err := ctx.Err(); err != nil { @@ -487,7 +481,7 @@ func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) erro if forward { // We 
are not going to return any error from here because we care about the majority of nodes. // If the majority of nodes are able to receive the data, the remaining ones can catch up later. - glog.Infof("[import] Streaming external snapshot to [%v] from [%v] forward [%v]", member.Addr, node.MyAddr) + glog.Infof("[import] Streaming external snapshot to [%v] from [%v]", member.Addr, node.MyAddr) eg.Go(func() error { glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr) if member.AmDead {