@dynamic-Archu commented Apr 22, 2025


What this PR does / why we need it

This PR introduces an optimized approach for OCM transfer by implementing a concurrent worker pool, replacing the previous sequential transfer logic.

Improvements:

  • Reduced transfer time from ~45 minutes to ~23 minutes 40 seconds.
  • Achieved nearly a 2x speedup through concurrent processing.
  • Introduced a configurable concurrent worker pool to parallelize transfers.

Technical Approach:

  • Replaced copyVersion with a new function, copyVersionWithWorkerPool, that uses a bounded goroutine worker pool (see the sketch below).
  • Introduced a transferTask abstraction to encapsulate resource and source transfer logic.
  • Enabled parallel transfer of artifacts using a concurrent worker pool (default size: 5 workers).
  • Tuned the worker pool size for performance and resource utilization.
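A minimal sketch of the pattern (the transferTask shape mirrors the PR's abstraction; the pool wiring and function names here are illustrative, not the PR's actual code):

package transfer

import (
	"fmt"
	"sync"
)

// transferTask pairs an identifier with the work to perform.
type transferTask struct {
	id   string
	task func() error
}

// runWithWorkerPool drains the task list with maxWorkers concurrent
// goroutines and collects any per-task errors.
func runWithWorkerPool(tasks []transferTask, maxWorkers int) []error {
	taskCh := make(chan transferTask, len(tasks))
	errCh := make(chan error, len(tasks))

	var wg sync.WaitGroup
	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh {
				if err := t.task(); err != nil {
					errCh <- fmt.Errorf("%s: %w", t.id, err)
				}
			}
		}()
	}
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)
	wg.Wait()
	close(errCh)

	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}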

Why:

Introducing concurrency significantly improves transfer speed and overall efficiency.

Which issue(s) this PR fixes

@dynamic-Archu requested a review from a team as a code owner, April 22, 2025 10:24
@github-actions bot added the size/m Medium label, Apr 22, 2025
@dynamic-Archu changed the title from "Updated transfer.go" to "feat(transfer): Optimized approach for OCM transfer by implementing a concurrent worker pool.", Apr 22, 2025
@github-actions bot added the kind/feature new feature, enhancement, improvement, extension label, Apr 22, 2025
@frewilhelm added the needs/validation Validate the issue and assign a priority label, Apr 22, 2025
@jakobmoellerdev (Contributor) left a comment

Thanks for the contribution.

Please adjust the following:

  • Ensure Context Cancellation is still possible
  • Ensure tests are green
  • Please keep existing comments and keep your change set minimal, limited to what the change needs.
  • Please consider what happens if children have joint dependencies
Component A
=> Component B
=> Component C

Component B
=> Component D

Component C
=> Component D

Currently you are triggering multiple transfers for Component D, which is incorrect.

Cheers!

@dynamic-Archu (Author) commented Apr 24, 2025

@jakobmoellerdev

Thanks for the feedback! I have a few questions regarding the points you've raised:

  • Could you clarify the specific logic you expect for context cancellation?
  • For the scenario with joint dependencies (i.e., Component B and Component C both depending on Component D), would the correct approach be to ignore the second transfer of Component D? Or is the ideal solution to ensure that only one transfer occurs for Component D, possibly by adding a check to see if it has already been triggered?
  • Should we be considering any optimization patterns to avoid triggering multiple transfers?

@jakobmoellerdev (Contributor) commented Apr 24, 2025

> @jakobmoellerdev
>
> Thanks for the feedback! I have a few questions regarding the points you've raised:
>
> • Could you clarify the specific logic you expect for context cancellation?

I expect that the worker pool can be cancelled if a parent context is cancelled. You removed the parent context; I think you can just add it back in and create a new subcontext: https://github.com/open-component-model/ocm/pull/1420/files#diff-d341af7fc25e4e0ea76361047d0e578a1e197d1e598fcba3ffc5ce866c42358dL38

> • For the scenario with joint dependencies (i.e., Component B and Component C both depending on Component D), would the correct approach be to ignore the second transfer of Component D? Or is the ideal solution to ensure that only one transfer occurs for Component D, possibly by adding a check to see if it has already been triggered?

It is imperative that the transfer for a single component version in the handler happens at most once per transfer attempt. Retries are handled in there, so we should avoid triggering it multiple times. There are a few ways you can do this, for example with mutexes or synced maps.
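For illustration, a minimal sketch of the at-most-once idea using a synced map (the key scheme and function name are assumptions, not the PR's actual code):

import "sync"

// transferred tracks component versions whose transfer has already been triggered.
var transferred sync.Map // key: "name:version"

// transferOnce runs doTransfer at most once per component-version key,
// no matter how many parents reference the same child (e.g. Component D).
func transferOnce(key string, doTransfer func() error) error {
	if _, loaded := transferred.LoadOrStore(key, struct{}{}); loaded {
		return nil // already triggered via another parent
	}
	return doTransfer()
}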

> • Should we be considering any optimization patterns to avoid triggering multiple transfers?

See above :)

One other thing: if the transfer of one component version fails, we should cancel all other active transfers. For that you should possibly use an error group from the golang.org/x/sync/errgroup package.
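A minimal sketch of that pattern, assuming illustrative task functions (errgroup.WithContext cancels the shared context on the first error, and SetLimit bounds concurrency):

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func transferAll(ctx context.Context, tasks []func(context.Context) error, maxWorkers int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(maxWorkers) // bound the number of concurrent transfers
	for _, task := range tasks {
		task := task // capture the loop variable (pre-Go 1.22)
		g.Go(func() error {
			// Skip work if another transfer has already failed.
			if err := ctx.Err(); err != nil {
				return err
			}
			return task(ctx)
		})
	}
	return g.Wait() // the first error cancels ctx for the remaining tasks
}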

}

// subp := common.AddPrinterGap(ctx, " ")
var wg sync.WaitGroup
Contributor:

I just noticed this while taking a look here again, but this is not implemented as described in the PR:

> Enabled parallel transfer of artifacts using a concurrent worker pool (default size: 5 workers, configurable)
> Tuned worker pool size for optimal performance and resource utilization.

You should make this sync group limited and configurable.

Contributor:

This comment is still not resolved.

Contributor:

Is the expectation here that once one of the goroutines fails, all others should abort? If so, then you should use an errgroup.WithContext. WDYT?

func CopyVersionWithContext(cctx context.Context, log logging.Logger, hist common.History, src ocm.ComponentVersionAccess, t ocm.ComponentVersionAccess, handler TransferHandler) (rerr error) {
	return copyVersion(cctx, common.GetPrinter(cctx), log, hist, src, t, src.GetDescriptor().Copy(), handler)
}

func CopyVersionWithContext(cctx context.Context, printer common.Printer, log logging.Logger, hist common.History, src ocm.ComponentVersionAccess, t ocm.ComponentVersionAccess, handler TransferHandler) (rerr error) {
	return copyVersionWithWorkerPool(cctx, printer, log, hist, src, t, src.GetDescriptor().Copy(), handler, 5)
}
Contributor:

The 5 workers seem very specific to your trial run and should not be used for OCM in general.

I have 2 suggestions for possible value setting:

  1. Default to a runtime-dynamic value, for example runtime.NumCPU(), which reports the number of CPUs discovered by the Go runtime
  2. Default to a sequential value, i.e. 1

Choose whichever you prefer.

That being said, to properly make it configurable as you describe in your PR, I am missing exactly that: a configurable flag. You will need to introduce both a setting/option and a CLI flag for concurrency. Ideally this should also be configurable in .ocmconfig. I don't mind for now if you want to split this across separate PRs, but IMO the merge should not happen without ANY configuration ability.
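For illustration, a CLI flag in the cobra/pflag style might look like this (the flag name and options struct are hypothetical, not an existing OCM flag):

// Hypothetical wiring; the real flag name and default are up for discussion.
cmd.Flags().IntVar(&opts.MaxWorkers, "max-workers", 1,
	"maximum number of concurrent transfers (1 = sequential, the previous behavior)")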

Thanks!

i, r := i, r
tasks <- transferTask{
	id:   fmt.Sprintf("resource-%d", i),
	task: func() error {
Contributor:

This functional transfer task is very hard to read and debug, so I suggest you split this into a separate function.
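One way to split the closure out, as a sketch (the helper name and signature are assumptions; pass whatever the body actually needs):

// transferResource builds the task body for resource i as a named function,
// which is easier to read, test, and identify in stack traces than a long
// inline closure. (Signature abbreviated.)
func transferResource(i int, r ocm.ResourceAccess) func() error {
	return func() error {
		// ...the transfer logic previously inlined at the submission site...
		return nil
	}
}

// The submission site then shrinks to:
//   tasks <- transferTask{id: fmt.Sprintf("resource-%d", i), task: transferResource(i, r)}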

@dynamic-Archu (Author) commented May 13, 2025

Hi @jakobmoellerdev,

Thank you for the review and feedback! I'll look into the suggestions you provided.

In the meantime, I wanted to raise a concern I noticed while testing the OCM transfer process locally using the latest version of the OCM codebase. When I run it after cloning, I consistently encounter the following error:

Error: transfer resources and sources: commit and ref cannot be specified at the same time

This error appears even when using the latest OCM transfer.go code (without my changes), and although the transfer process completes, the blobs are not getting transferred to the JFrog repository.
When I run the same with my modified code, the components are getting transferred correctly, but the blobs are still missing in the JFrog repo.

(Three screenshots attached, taken 2025-05-13.)

It would be great if someone could help verify this behavior and share any insights.

@jakobmoellerdev (Contributor) commented

There is a related change I did quite some time back that influenced the GitHub access method: #1406. I introduced this limitation because ref and commit are mutually exclusive in the access method. I'm assuming you have incorrect specs in your component version.

Updated existing code with new changes based on the previous feedback

@dynamic-Archu (Author) commented

Hi @jakobmoellerdev,

Thank you for the previous review and inputs! I've made the necessary changes based on your comments. Could you please review the latest changes and let me know if everything looks good?

Additionally, I wanted to follow up regarding the issue I'm facing during local testing - the blobs are still not getting transferred and I'm consistently seeing the following error: transfer resources and sources: commit and ref cannot be specified at the same time

You also suggested that the issue could be due to incorrect specs in the component version.
Could this error also be the reason why the blobs are not being transferred?

Looking forward to your input. Thanks again!

@jakobmoellerdev (Contributor) commented

The issue you were facing is a regression we have now reverted in the latest RCs and on main; you shouldn't see this error anymore. :)

@dynamic-Archu (Author) commented May 26, 2025

Hi @jakobmoellerdev,

There are a couple of checks that are failing, and I’d appreciate your guidance on how to proceed:

  1. Test Failure – There seems to be a mismatch in the expected vs. actual output in components/transfer/cmd_test.go under transfers ctf.
  2. gci Formatting Error – I'm also encountering an import sorting issue flagged by gci in components/transfer/ctf/ctf.go.

Could you please share some insights or point me in the right direction to resolve these?

Thanks in advance!

@jakobmoellerdev (Contributor) left a comment

Regarding the lint error: you should fix up your import ordering on the project so that

  gci:
    sections:
      - standard
      - blank
      - dot
      - default
      - prefix(ocm.software/ocm)
    custom-order: true

is set. Your file edit breaks these rules. Please see the GCI lint documentation on how to fix that, and your IDE documentation on how to configure import ordering. You can also run golangci-lint in --fix mode to correct this automatically.

Regarding the test run: you are changing the way that notifyArtifactInfo is called in your transfer logic.

In your code https://github.com/open-component-model/ocm/pull/1420/files#diff-d341af7fc25e4e0ea76361047d0e578a1e197d1e598fcba3ffc5ce866c42358dR316 I am assuming that you changed how changed and valueNeeded are evaluated, and that causes different handling. Please double-check that the behavior on the breaking test is the same as before your change.

Before:

var old compdesc.Resource

hint := ocmcpi.ArtifactNameHint(a, src)
old, err = cur.GetResourceByIdentity(r.Meta().GetIdentity(srccd.Resources))

changed := err != nil || old.Digest == nil || !old.Digest.Equal(r.Meta().Digest)
valueNeeded := err == nil && needsTransport(src.GetContext(), r, &old)
if changed || valueNeeded {
	var msgs []interface{}
	if !errors.IsErrNotFound(err) {
		if err != nil {
			return err
		}
		if !changed && valueNeeded {
			msgs = []interface{}{"copy"}
		} else {
			msgs = []interface{}{"overwrite"}
		}
	}
	notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, msgs...)
	err = handler.HandleTransferResource(r, m, hint, t)
} else {
	if err == nil { // old resource found -> keep current access method
		t.SetResource(r.Meta(), old.Access, ocm.ModifyElement(), ocm.SkipVerify(), ocm.DisableExtraIdentityDefaulting())
	}
	notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "already present")
}

After:

hint := ocmcpi.ArtifactNameHint(a, src)
old, err := cur.GetResourceByIdentity(r.Meta().GetIdentity(srccd.Resources))
changed := err != nil || old.Digest == nil || !old.Digest.Equal(r.Meta().Digest)
valueNeeded := err == nil && needsTransport(src.GetContext(), r, &old)
if changed || valueNeeded {
	notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "copy")
	return handler.HandleTransferResource(r, m, hint, t)
} else if err == nil {
	t.SetResource(r.Meta(), old.Access, ocm.ModifyElement(), ocm.SkipVerify(), ocm.DisableExtraIdentityDefaulting())
	notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "already present")
}

}

// subp := common.AddPrinterGap(ctx, " ")
var wg sync.WaitGroup
Contributor:

This comment is still not resolved.


nested := finalize.Nested()
log.Info(" transferring resources and sources using worker pool", "workers", maxWorkers)
tasks := make(chan transferTask)
Contributor:

Can you not make the tasks channel bounded too?

@dynamic-Archu (Author) commented Jun 23, 2025

Yes, absolutely! I've now updated the tasks channel to be a bounded (buffered) channel within the copyVersionWithWorkerPool function.

This helps to:

  • Decouple the task producer from the workers slightly.
  • Smooth out throughput by allowing a small backlog of tasks to accumulate.
  • Control memory usage by limiting the number of in-flight tasks.

The buffer size is set to maxWorkers * 2 (with a minimum of 1), which provides a good balance.

Here's the updated line in copyVersionWithWorkerPool:

// ...
	log.Info("  transferring resources and sources using worker pool", "workers", maxWorkers)

	// Make tasks a bounded (buffered) channel to smooth out task distribution.
	taskBufferSize := maxWorkers * 2
	if taskBufferSize < 1 { // Ensure a minimum buffer size
		taskBufferSize = 1
	}
	tasks := make(chan transferTask, taskBufferSize) // <-- Now a buffered channel
	errChan := make(chan error, len(src.GetResources())+len(src.GetSources()))
	// ...

Thanks for the suggestion!

Contributor:

I do not think it is necessary for you to introduce an arbitrary buffer size. Instead you can buffer the channel to the number of resources + sources, no? (Same as the errChan.)

}

go func() {
	for i, r := range src.GetResources() {
Contributor:

Why do you need to run a goroutine to submit transfer tasks to the pool?

Author:

The task submission loop (go func() { ... tasks <- ... close(tasks) }) runs in its own goroutine primarily to:

  1. Avoid Deadlock: If it ran in the main goroutine, sending tasks could block (if the channel is full), preventing wg.Wait() and close(tasks) from being reached.
  2. Proper Channel Closure: Ensures close(tasks) is called only after all tasks have been submitted, allowing worker range loops to terminate correctly.

This pattern allows the task producer to run concurrently with the workers and manage channel lifecycle gracefully.

Contributor:

I understand why, in general, submission is done in a goroutine. However, I'm wondering why you need it here. You know the number of elements going into the channel in advance, so it can always be buffered to that size and a send can never block; as such you do not have a deadlock. That also simplifies channel closure.
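A minimal sketch of that simplification (a function-body excerpt; transferResource and transferSource are the illustrative helpers from above, not the PR's actual code):

// With capacity for every task up front, sends can never block,
// so no producer goroutine is needed.
total := len(src.GetResources()) + len(src.GetSources())
tasks := make(chan transferTask, total)

for i, r := range src.GetResources() {
	tasks <- transferTask{id: fmt.Sprintf("resource-%d", i), task: transferResource(i, r)}
}
for i, s := range src.GetSources() {
	tasks <- transferTask{id: fmt.Sprintf("source-%d", i), task: transferSource(i, s)}
}
close(tasks) // the workers' range loops terminate once the buffer drains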

@jakobmoellerdev (Contributor) left a comment

This looks great already, thanks for all the efforts 🥳 There are some minor corrections and one major question to answer which we already briefly discussed before:

Do you want to change the default OCM behavior to run concurrent transfers? If yes, then how do we retain the old serialized, ordered behavior? It is such a big switch that we have to make sure users can opt in to the parallel transfer, IMO.
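For illustration, one way to keep the old behavior as the default (a sketch; the dispatch point is assumed, and the argument lists mirror the signatures shown earlier):

// maxWorkers == 1 keeps the old sequential, ordered transfer as the default;
// users opt in to parallelism by raising the setting.
if maxWorkers <= 1 {
	return copyVersion(cctx, printer, log, hist, src, t, src.GetDescriptor().Copy(), handler)
}
return copyVersionWithWorkerPool(cctx, printer, log, hist, src, t, src.GetDescriptor().Copy(), handler, maxWorkers)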

type TransportClosure = common.NameVersionInfo[*struct{}]

// TransferWorkersEnvVar is the environment variable to configure the number of transfer workers.
const TransferWorkersEnvVar = "OCM_TRANSFER_WORKERS" // This constant is now technically unused, but kept for context.
Contributor:

Please make the env variable read-out part of the attribute coding, and change it to OCM_MAX_WORKERS.
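A minimal sketch of that idea (GetMaxWorkers is a hypothetical getter, not the PR's actual attribute code):

import (
	"os"
	"runtime"
	"strconv"
)

// GetMaxWorkers resolves the worker count: an explicitly set attribute wins,
// then the OCM_MAX_WORKERS environment variable, then a CPU-based default.
func GetMaxWorkers(attributeValue int) int {
	if attributeValue > 0 {
		return attributeValue
	}
	if v := os.Getenv("OCM_MAX_WORKERS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return n
		}
	}
	return runtime.NumCPU()
}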

}

// subp := common.AddPrinterGap(ctx, " ")
var wg sync.WaitGroup
Contributor:

Is the expectation here that once one of the goroutines fails, all others should abort? If so, then you should use an errgroup.WithContext. WDYT?

// If no copy is done, merge must keep the access methods in target!!!
if !doMerge || doCopy {
	err = copyVersion(ctx, printer, log, state.History, src, t, n, handler)
	// *** FIX: Pass 'ctx' argument here ***
Contributor:

Missing TODO

// If attributeWorkers is 0, it means either user explicitly set 0, or attribute was not set (default to 0).
// In both cases, this signals to use the CPU-based auto-detection.
if attributeWorkers == 0 {
	return determineWorkersFromCPU()
Contributor:

This default should become part of the attribute get logic, IMO.

@jakobmoellerdev (Contributor) commented

Before I forget, we introduced a DCO requirement so you need to sign off your commits now!

@jakobmoellerdev (Contributor) left a comment

We have quite a few people who are looking forward to this. If you need help resolving the open comments, please feel free to join our community call!

https://ocm.software/community/engagement/#community-calls

if err != nil {
	unstr = nil
	// Capture the error specifically for this operation
	specErr := error(nil) // Declare a local error variable
Contributor:

Suggested change:

- specErr := error(nil) // Declare a local error variable
+ var specErr error

// Capture the error specifically for this operation
specErr := error(nil) // Declare a local error variable
unstr, specErr = runtimeutil.ToUnstructuredTypedObject(tgt.GetSpecification())
if specErr != nil {
Contributor:

Inline this assignment and if statement.
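The inlined form would look roughly like this (a sketch against the snippet above; unstr is declared earlier, so only the error is scoped here and the check is folded into the if):

var specErr error
if unstr, specErr = runtimeutil.ToUnstructuredTypedObject(tgt.GetSpecification()); specErr != nil {
	// handle the conversion error
}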

@dynamic-Archu (Author) commented Sep 8, 2025

sure 👍

func calculateEffectiveTransferWorkers(ctx context.Context) int {
	// First, obtain the OCM data context from the provided Go context.
	// `ocm.DefaultContext()` is a common way to get it if it's not directly `datacontext.Context`.
	ocmCtx := ocm.DefaultContext()
Contributor:

Shouldn't the OCM context be retrieved from the caller context?
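For illustration, a hedged sketch of that direction; FromContext is a hypothetical accessor here, not necessarily the real OCM API:

// Hypothetical: resolve the OCM context from the caller's context instead of
// unconditionally falling back to the global default.
func calculateEffectiveTransferWorkers(ctx context.Context) int {
	ocmCtx := ocm.FromContext(ctx) // hypothetical accessor
	if ocmCtx == nil {
		ocmCtx = ocm.DefaultContext() // fall back only when the caller carries none
	}
	// ...read the max-workers attribute from ocmCtx and apply defaults...
	return runtime.NumCPU()
}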

@jakobmoellerdev removed the needs/validation Validate the issue and assign a priority label, Sep 3, 2025
@github-actions bot added the size/l Large label, Oct 3, 2025
@dynamic-Archu (Author) commented

Hi @jakobmoellerdev, could you please trigger all the remaining test cases?

@jakobmoellerdev (Contributor) commented

Hey there: tests/lint are still failing, PTAL. Also please sign off your commits and make sure DCO is passing, and run go generate as there are diffs. Thanks!
