
[Bug][Go SDK]: Custom Splittable DoFn causes errors on runner split #26245

Closed

camphillips22 opened this issue Apr 12, 2023 · 5 comments

camphillips22 (Contributor) commented Apr 12, 2023

What happened?

I created a simple Splittable DoFn that uses an offsetrange.Restriction. The range of the SDF is determined at pipeline construction time, and its input is an Impulse() collection.
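
Roughly, the pipeline is wired like this (a sketch only; GenerateData wraps the SDF and appears in full in a follow-up comment below):

imp := beam.Impulse(s)                  // one-element PCollection<[]byte>
out := GenerateData(s, start, end, imp) // SDF over a construction-time range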

The Dataflow runner then spews many instances of the following message:

Error message from worker: generic::invalid_argument: First residual index 2 cannot be greater than the number of input elements to the bundle 1

Occasionally, I see this one:

Error message from worker: generic::data_loss: SDK claims to have processed 2 but should have processed 1 elements. passed through: ==> dist_proc/dax/workflow/worker/fnapi_operators.cc:1875 generic::invalid_argument: First residual index cannot increase from previous split, was 1 and is now 2

Looking at the RESP: instruction_id logs for "process_bundle_split", there are two types of messages: ones that contain non-empty primary and residual roots, and ones where both are empty. The non-empty ones contain a channel split like:

last_primary_element: -1
first_residual_element: 1

The empty ones contain a channel split with:

last_primary_element: 1
first_residual_element: 2

Since the input collection contains only a single element, I believe this is the source of the error. I've been trying to determine the circumstances that lead to this, and my best guess currently is that it's a race: the DataSource node moves on to the next element (increments its index) and, while it is waiting to receive the io.EOF, a split comes in. This results in empty primary and residual roots (since the restriction tracker is done) and an off-by-one "error" in the element indexes, as sketched below.
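
To illustrate the suspected off-by-one (a sketch only; index and numElements are hypothetical names, not the SDK's actual internals):

// channelSplit sketches how the rejected split bounds could arise once the
// DataSource has advanced past the bundle's only element and is blocked
// waiting for io.EOF.
func channelSplit(index, numElements int64) (lastPrimary, firstResidual int64) {
	// After element 0 completes, index is incremented to 1 even though no
	// second element will ever arrive in this one-element bundle.
	lastPrimary = index       // 1, matching the observed last_primary_element
	firstResidual = index + 1 // 2 > numElements (1), so the runner rejects the split
	return lastPrimary, firstResidual
}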

As for a workaround: I believe the data is actually complete, but it's concerning to see "data_loss" in the error logs.
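
One way to double-check completeness (a sketch using the Go SDK's beam.NewCounter metric API; the namespace and name here are illustrative) is to count emitted elements and compare the committed counter in the Dataflow UI against the expected element count for the range:

var emitted = beam.NewCounter("datagen", "emitted")

// ...then inside ProcessElement, alongside each emit:
emitted.Inc(ctx, 1)
emit(v.Data())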

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [x] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [x] Component: Google Cloud Dataflow Runner
lostluck (Contributor) commented

What runner?

What version of the Go SDK?

Example code for the DoFn in question?

The DataSource and split-handling code changed in order to support Timers, but existing, correctly implemented SDFs (such as those in textio) continued to pass splitting tests throughout that work.

camphillips22 (Contributor, Author) commented Apr 13, 2023

Runner: Google Dataflow
Go SDK version: 2.46.0

Example code:

import (
	"context"
	"io"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

func GenerateData(s beam.Scope, start, end time.Time, col beam.PCollection) beam.PCollection {
	s = s.Scope("GenerateData")
	return beam.ParDo(s, &dataGeneratorFn{
		Start: start.Format(time.RFC3339),
		End:   end.Format(time.RFC3339),
	}, col)
}

type dataGeneratorFn struct {
	Start string
	End   string
}

func (f *dataGeneratorFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
	// Parse errors are ignored because Start/End are formatted by GenerateData.
	s, _ := time.Parse(time.RFC3339Nano, f.Start)
	e, _ := time.Parse(time.RFC3339Nano, f.End)
	return offsetrange.Restriction{
		Start: s.Unix(),
		End:   e.Unix(),
	}
}

func (f *dataGeneratorFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (f *dataGeneratorFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 {
	return rest.Size()
}

func (f *dataGeneratorFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
	// Split into one-hour chunks.
	return rest.SizedSplits(int64(time.Hour.Seconds()))
}

func (f *dataGeneratorFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, _ []byte, emit func([]byte)) (sdf.ProcessContinuation, error) {
	rest := rt.GetRestriction().(offsetrange.Restriction)
	start := time.Unix(rest.Start, 0)
	end := time.Unix(rest.End, 0)

	// The iterator may take a while to set up. Claim the start before setting
	// up, since Dataflow splits aggressively.
	if !rt.TryClaim(rest.Start) {
		return sdf.StopProcessing(), nil
	}
	lastClaim := rest.Start

	// GetIterator (not shown) returns an iterator over [start, end).
	iter := GetIterator(ctx, start, end)
	for {
		v, err := iter.Next()
		if err != nil {
			if err == io.EOF {
				rt.TryClaim(rest.End)
				return sdf.StopProcessing(), nil
			}
			return sdf.ResumeProcessingIn(5 * time.Second), err
		}
		if v.Timestamp() != lastClaim {
			if !rt.TryClaim(v.Timestamp()) {
				return sdf.StopProcessing(), nil
			}
			lastClaim = v.Timestamp()
		}
		emit(v.Data())
	}
}

As far as passing tests goes: I think that even with the error I'm getting correct results, but the volume of errors is a bit concerning.

lostluck (Contributor) commented Apr 19, 2023

So as not to leave this hanging: this appears to be an issue with the Dataflow-side worker, not the SDK (unless you've seen this issue prior to April 3rd), and that worker release is being rolled back. When the rollback is complete, I'll comment here again.

lostluck (Contributor) commented

The bad workers have been rolled back. Are you still seeing this error?

lostluck (Contributor) commented Dec 4, 2023

Closing due to inactivity.

lostluck closed this as completed Dec 4, 2023
github-actions bot added this to the 2.53.0 Release milestone Dec 4, 2023