Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
// Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy`
// implementations.
// It allows new policies to be added to the system and instantiated by name.
package dispatch
package intraflow

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ limitations under the License.
*/

// Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`.
package fcfs
package intraflow

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

Expand Down Expand Up @@ -53,7 +52,7 @@ import (
const FCFSPolicyName = "FCFS"

func init() {
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName),
MustRegisterPolicy(RegisteredPolicyName(FCFSPolicyName),
func() (framework.IntraFlowDispatchPolicy, error) {
return newFCFS(), nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package fcfs
package intraflow

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatch_test
package intraflow

import (
"testing"
Expand All @@ -23,8 +23,6 @@ import (
"github.com/stretchr/testify/require"

frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
)

// TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy`
Expand All @@ -34,7 +32,7 @@ import (
func TestIntraFlowDispatchPolicyConformance(t *testing.T) {
t.Parallel()

for policyName, constructor := range dispatch.RegisteredPolicies {
for policyName, constructor := range RegisteredPolicies {
t.Run(string(policyName), func(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 8 additions & 9 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
)

Expand All @@ -36,7 +35,7 @@ const (
// It is set to 1 GB.
defaultPriorityBandMaxBytes uint64 = 1_000_000_000
// defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue.
defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName
defaultIntraFlowDispatchPolicy intraflow.RegisteredPolicyName = intraflow.FCFSPolicyName
// defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next.
defaultInterFlowDispatchPolicy interflow.RegisteredPolicyName = interflow.BestHeadPolicyName
// defaultQueue is the default queue implementation for flows.
Expand All @@ -54,15 +53,15 @@ const (

// capabilityChecker abstracts the logic required to validate if a policy is compatible with a queue.
type capabilityChecker interface {
CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error
CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error
}

// runtimeCapabilityChecker is the default implementation used in production.
// It instantiates the actual plugins to inspect their required and provided capabilities.
type runtimeCapabilityChecker struct{}

func (r *runtimeCapabilityChecker) CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error {
tempPolicy, err := intra.NewPolicyFromName(p)
func (r *runtimeCapabilityChecker) CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error {
tempPolicy, err := intraflow.NewPolicyFromName(p)
if err != nil {
return fmt.Errorf("failed to validate policy %q: %w", p, err)
}
Expand Down Expand Up @@ -154,7 +153,7 @@ type PriorityBandConfig struct {
// IntraFlowDispatchPolicy specifies the default name of the policy used to select a request from within a single
// flow's queue in this band.
// Optional: Defaults to defaultIntraFlowDispatchPolicy ("FCFS").
IntraFlowDispatchPolicy intra.RegisteredPolicyName
IntraFlowDispatchPolicy intraflow.RegisteredPolicyName

// InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from
// this band.
Expand Down Expand Up @@ -253,8 +252,8 @@ func withCapabilityChecker(checker capabilityChecker) ConfigOption {
// PriorityBandConfigOption defines a functional option for configuring a single PriorityBandConfig.
type PriorityBandConfigOption func(*PriorityBandConfig) error

// WithIntraFlowPolicy sets the intra-flow dispatch policy (e.g., "FCFS").
func WithIntraFlowPolicy(name intra.RegisteredPolicyName) PriorityBandConfigOption {
// WithIntraFlowPolicy sets the intraflow-flow dispatch policy (e.g., "FCFS").
func WithIntraFlowPolicy(name intraflow.RegisteredPolicyName) PriorityBandConfigOption {
return func(p *PriorityBandConfig) error {
if name == "" {
return errors.New("IntraFlowDispatchPolicy cannot be empty")
Expand Down
15 changes: 7 additions & 8 deletions pkg/epp/flowcontrol/registry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ import (
"github.com/stretchr/testify/require"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
)

// mockCapabilityChecker is a test double for verifying that NewConfig correctly delegates compatibility checks.
type mockCapabilityChecker struct {
checkCompatibilityFunc func(intra intra.RegisteredPolicyName, q queue.RegisteredQueueName) error
checkCompatibilityFunc func(intra intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error
}

func (m *mockCapabilityChecker) CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error {
func (m *mockCapabilityChecker) CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error {
if m.checkCompatibilityFunc != nil {
return m.checkCompatibilityFunc(p, q)
}
Expand Down Expand Up @@ -128,7 +127,7 @@ func TestNewConfig(t *testing.T) {
Queue: "CustomQueue",
}),
withCapabilityChecker(&mockCapabilityChecker{
checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil },
checkCompatibilityFunc: func(_ intraflow.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil },
}),
},
assertion: func(t *testing.T, cfg *Config) {
Expand Down Expand Up @@ -191,7 +190,7 @@ func TestNewConfig(t *testing.T) {
opts: []ConfigOption{
WithPriorityBand(mustBand(t, 1, "High")),
withCapabilityChecker(&mockCapabilityChecker{
checkCompatibilityFunc: func(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error {
checkCompatibilityFunc: func(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error {
return contracts.ErrPolicyQueueIncompatible
},
}),
Expand All @@ -210,7 +209,7 @@ func TestNewConfig(t *testing.T) {
name: "ShouldError_WhenDefaultRuntimeCheckerDetectsUnknownQueue",
opts: []ConfigOption{
WithPriorityBand(mustBand(t, 1, "BadBand",
WithIntraFlowPolicy(fcfs.FCFSPolicyName),
WithIntraFlowPolicy(intraflow.FCFSPolicyName),
WithQueue("non-existent-queue"),
)),
},
Expand Down Expand Up @@ -254,7 +253,7 @@ func TestNewPriorityBandConfig(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, queue.RegisteredQueueName("CustomQueue"), pb.Queue)
assert.Equal(t, uint64(999), pb.MaxBytes)
assert.Equal(t, intra.RegisteredPolicyName("CustomPolicy"), pb.IntraFlowDispatchPolicy)
assert.Equal(t, intraflow.RegisteredPolicyName("CustomPolicy"), pb.IntraFlowDispatchPolicy)
assert.Equal(t, defaultInterFlowDispatchPolicy, pb.InterFlowDispatchPolicy) // Unchanged default
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
Expand Down Expand Up @@ -591,7 +591,7 @@ func (fr *FlowRegistry) buildFlowComponents(key types.FlowKey, numInstances int)

allComponents := make([]flowComponents, numInstances)
for i := range numInstances {
policy, err := intra.NewPolicyFromName(bandConfig.IntraFlowDispatchPolicy)
policy, err := intraflow.NewPolicyFromName(bandConfig.IntraFlowDispatchPolicy)
if err != nil {
return nil, fmt.Errorf("failed to instantiate intra-flow policy %q for flow %s: %w",
bandConfig.IntraFlowDispatchPolicy, key, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/flowcontrol/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
testclock "k8s.io/utils/clock/testing"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) {
t.Run("ShouldFail_WhenJITFails", func(t *testing.T) {
t.Parallel()

badPolicyName := intra.RegisteredPolicyName("non-existent-policy")
badPolicyName := intraflow.RegisteredPolicyName("non-existent-policy")
badBand, err := NewPriorityBandConfig(highPriority, "High", WithIntraFlowPolicy(badPolicyName))
require.NoError(t, err)

Expand All @@ -208,7 +208,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) {
cfg, err := NewConfig(
WithPriorityBand(badBand),
withCapabilityChecker(&mockCapabilityChecker{
checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error {
checkCompatibilityFunc: func(_ intraflow.RegisteredPolicyName, _ queue.RegisteredQueueName) error {
return nil // Approve everything.
},
}),
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
Expand Down Expand Up @@ -92,7 +92,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness {
func (h *shardTestHarness) synchronizeFlow(key types.FlowKey) {
h.t.Helper()
spec := types.FlowSpecification{Key: key}
policy, err := intra.NewPolicyFromName(defaultIntraFlowDispatchPolicy)
policy, err := intraflow.NewPolicyFromName(defaultIntraFlowDispatchPolicy)
assert.NoError(h.t, err, "Helper synchronizeFlow: failed to create real intra-flow policy for synchronization")
q, err := queue.NewQueueFromName(defaultQueue, policy.Comparator())
assert.NoError(h.t, err, "Helper synchronizeFlow: failed to create real queue for synchronization")
Expand Down