Skip to content

Commit d73b821

Browse files
hekikeGAlexIHU
andauthored
feat(streaming): lookup usage by customer key and subjects (#3178)
Co-authored-by: Alex Goth <[email protected]>
1 parent 041095e commit d73b821

File tree

11 files changed

+119
-61
lines changed

11 files changed

+119
-61
lines changed

openmeter/customer/adapter/customer.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -462,13 +462,20 @@ func (a *adapter) GetCustomerByUsageAttribution(ctx context.Context, input custo
462462

463463
query := repo.db.Customer.Query().
464464
Where(customerdb.Namespace(input.Namespace)).
465-
Where(customerdb.HasSubjectsWith(
466-
customersubjectsdb.SubjectKey(input.SubjectKey),
467-
customersubjectsdb.Or(
468-
customersubjectsdb.DeletedAtIsNil(),
469-
customersubjectsdb.DeletedAtGT(now),
465+
Where(
466+
customerdb.Or(
467+
// We lookup the customer by subject key in the subjects table
468+
customerdb.HasSubjectsWith(
469+
customersubjectsdb.SubjectKey(input.Key),
470+
customersubjectsdb.Or(
471+
customersubjectsdb.DeletedAtIsNil(),
472+
customersubjectsdb.DeletedAtGT(now),
473+
),
474+
),
475+
// Or else we lookup the customer by key in the customers table
476+
customerdb.Key(input.Key),
470477
),
471-
)).
478+
).
472479
Where(customerdb.DeletedAtIsNil())
473480
query = WithSubjects(query, now)
474481
if slices.Contains(input.Expands, customer.ExpandSubscriptions) {
@@ -479,7 +486,7 @@ func (a *adapter) GetCustomerByUsageAttribution(ctx context.Context, input custo
479486
if err != nil {
480487
if entdb.IsNotFound(err) {
481488
return nil, models.NewGenericNotFoundError(
482-
fmt.Errorf("customer with subject key %s not found in %s namespace", input.SubjectKey, input.Namespace),
489+
fmt.Errorf("customer with subject key %s not found in %s namespace", input.Key, input.Namespace),
483490
)
484491
}
485492

openmeter/customer/customer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,10 @@ func (c CustomerUsageAttribution) GetFirstSubjectKey() (string, error) {
231231

232232
// GetCustomerByUsageAttributionInput represents the input for the GetCustomerByUsageAttribution method
233233
type GetCustomerByUsageAttributionInput struct {
234-
Namespace string
235-
SubjectKey string
234+
Namespace string
235+
236+
// The key of either the customer or one of its subjects
237+
Key string
236238

237239
// Expand
238240
Expands Expands
@@ -243,7 +245,7 @@ func (i GetCustomerByUsageAttributionInput) Validate() error {
243245
return models.NewGenericValidationError(errors.New("namespace is required"))
244246
}
245247

246-
if i.SubjectKey == "" {
248+
if i.Key == "" {
247249
return models.NewGenericValidationError(errors.New("subject key is required"))
248250
}
249251

openmeter/customer/service/hooks/subjectcustomer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func (s subjectCustomerHook) PostDelete(ctx context.Context, sub *subject.Subjec
6868

6969
// Let's get the customer by usage attribution
7070
cus, err := s.provisioner.customer.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
71-
Namespace: sub.Namespace,
72-
SubjectKey: sub.Key,
71+
Namespace: sub.Namespace,
72+
Key: sub.Key,
7373
})
7474
if err != nil {
7575
if models.IsGenericNotFoundError(err) {
@@ -305,8 +305,8 @@ var ErrCustomerKeyConflict = errors.New("customer key conflict")
305305
func (p CustomerProvisioner) getCustomerForSubject(ctx context.Context, sub *subject.Subject) (*customer.Customer, error) {
306306
// Try to find Customer for Subject by usage attribution
307307
cus, err := p.customer.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
308-
Namespace: sub.Namespace,
309-
SubjectKey: sub.Key,
308+
Namespace: sub.Namespace,
309+
Key: sub.Key,
310310
})
311311
if err != nil && !models.IsGenericNotFoundError(err) {
312312
return nil, err

openmeter/customer/service/hooks/subjectcustomer_test.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,11 @@ func TestCustomerProvisioner_EnsureCustomer(t *testing.T) {
150150
require.NoError(t, err, "creating subject should not fail")
151151
assert.NotNilf(t, sub, "subject must not be nil")
152152

153-
cus, err := env.CustomerService.CreateCustomer(ctx, customer.CreateCustomerInput{
153+
cusForSubject, err := provisioner.EnsureCustomer(ctx, &sub)
154+
require.NoError(t, err, "provisioning customer should not fail")
155+
assert.NotNilf(t, cusForSubject, "customer must not be nil")
156+
157+
_, err = env.CustomerService.CreateCustomer(ctx, customer.CreateCustomerInput{
154158
Namespace: namespace,
155159
CustomerMutate: customer.CustomerMutate{
156160
Key: lo.ToPtr(sub.Key),
@@ -170,22 +174,8 @@ func TestCustomerProvisioner_EnsureCustomer(t *testing.T) {
170174
Annotation: nil,
171175
},
172176
})
173-
require.NoError(t, err, "creating customer should not fail")
174-
assert.NotNilf(t, cus, "customer must not be nil")
175-
176-
cus, err = provisioner.EnsureCustomer(ctx, &sub)
177-
require.NoError(t, err, "provisioning customer should not fail")
178-
assert.NotNilf(t, cus, "customer must not be nil")
179177

180-
cus, err = env.CustomerService.GetCustomer(ctx, customer.GetCustomerInput{
181-
CustomerID: &customer.CustomerID{
182-
Namespace: cus.Namespace,
183-
ID: cus.ID,
184-
},
185-
})
186-
require.NoErrorf(t, err, "getting customer for subject should not fail")
187-
assert.NotNilf(t, cus, "customer must not be nil")
188-
AssertSubjectCustomerEqual(t, &sub, cus)
178+
require.True(t, models.IsGenericConflictError(err), "creating customer should fail with conflict")
189179
})
190180

191181
t.Run("CustomerKeyMismatch", func(t *testing.T) {

openmeter/customer/service/service_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ func Test_CustomerService(t *testing.T) {
8686

8787
t.Run("ByUsageAttribution", func(t *testing.T) {
8888
cusByUsage, err := env.CustomerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
89-
Namespace: cus.Namespace,
90-
SubjectKey: cus.UsageAttribution.SubjectKeys[0],
89+
Namespace: cus.Namespace,
90+
Key: cus.UsageAttribution.SubjectKeys[0],
9191
})
9292
require.NoError(t, err, "getting customer usage attribution should not fail")
9393
assert.NotNilf(t, cusByUsage, "customer must not be nil")
@@ -128,8 +128,8 @@ func Test_CustomerService(t *testing.T) {
128128

129129
t.Run("ByUsageAttribution", func(t *testing.T) {
130130
cusByUsage, err := env.CustomerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
131-
Namespace: cus.Namespace,
132-
SubjectKey: subjectKeys[1],
131+
Namespace: cus.Namespace,
132+
Key: subjectKeys[1],
133133
})
134134
require.NoError(t, err, "getting customer usage attribution should not fail")
135135
assert.NotNilf(t, cusByUsage, "customer must not be nil")

openmeter/entitlement/driver/entitlement.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,8 @@ func (h *entitlementHandler) resolveCustomerFromSubject(ctx context.Context, nam
569569
}
570570

571571
cust, err := h.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
572-
Namespace: namespace,
573-
SubjectKey: subj.Key,
572+
Namespace: namespace,
573+
Key: subj.Key,
574574
})
575575
if err != nil {
576576
return nil, err

openmeter/entitlement/driver/metered.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,8 @@ func (h *meteredEntitlementHandler) resolveCustomerFromSubject(ctx context.Conte
395395
}
396396

397397
cust, err := h.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
398-
Namespace: namespace,
399-
SubjectKey: subj.Key,
398+
Namespace: namespace,
399+
Key: subj.Key,
400400
})
401401
if err != nil {
402402
return nil, err

openmeter/meterevent/adapter/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ func (a *adapter) enrichEventsWithCustomerID(ctx context.Context, namespace stri
277277
// FIXME: do this in a batches to avoid hitting the database for each event
278278
// Get the customer by usage attribution subject key
279279
cust, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{
280-
Namespace: namespace,
281-
SubjectKey: event.Subject,
280+
Namespace: namespace,
281+
Key: event.Subject,
282282
})
283283
if err != nil {
284284
if models.IsGenericNotFoundError(err) {

openmeter/streaming/clickhouse/meter_query_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ func TestQueryMeter(t *testing.T) {
375375
},
376376
ID: "customer1",
377377
},
378+
Key: lo.ToPtr("customer-key-1"),
378379
UsageAttribution: customer.CustomerUsageAttribution{
379380
SubjectKeys: []string{"subject1"},
380381
},
@@ -392,10 +393,17 @@ func TestQueryMeter(t *testing.T) {
392393
},
393394
},
394395
},
395-
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.subject IN (?)",
396-
wantArgs: []interface{}{"my_namespace", "event1", []string{"subject1", "subject2"}},
396+
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.subject IN (?)",
397+
wantArgs: []interface{}{"my_namespace", "event1", []string{
398+
// Only the first customer has a key
399+
"customer-key-1",
400+
// Usage attribution subjects of the first customer
401+
"subject1",
402+
// Usage attribution subjects of the second customer
403+
"subject2",
404+
}},
397405
},
398-
{
406+
{ // Filter by both customer and subject
399407
name: "Filter by both customer and subject",
400408
query: queryMeter{
401409
Database: "openmeter",

openmeter/streaming/clickhouse/queryhelper.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"strings"
66

77
"github.com/huandu/go-sqlbuilder"
8-
"github.com/samber/lo"
98

109
"github.com/openmeterio/openmeter/openmeter/streaming"
1110
)
1211

12+
const subjectToCustomerIDDictionary = "subject_to_customer_id"
13+
1314
// selectCustomerIdColumn
1415
func selectCustomerIdColumn(eventsTableName string, customers []streaming.Customer, query *sqlbuilder.SelectBuilder) *sqlbuilder.SelectBuilder {
1516
// If there are no customers, we return an empty customer id column
@@ -21,28 +22,43 @@ func selectCustomerIdColumn(eventsTableName string, customers []streaming.Custom
2122
getColumn := columnFactory(eventsTableName)
2223
subjectColumn := getColumn("subject")
2324

24-
// Build a map of subject to customer id
25+
// Build a map of event subjects to customer ids
2526
var values []string
2627

28+
// For each customer, we map event subjects to customer ids
2729
for _, customer := range customers {
28-
// Add each subject key to the map and map it to the customer id
30+
customerIDSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(customer.GetUsageAttribution().ID))
31+
32+
// We map the customer key to the customer id if it exists
33+
if customer.GetUsageAttribution().Key != nil {
34+
customerKeySQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(*customer.GetUsageAttribution().Key))
35+
values = append(values, customerKeySQL, customerIDSQL)
36+
}
37+
38+
// We map each subject key to the customer id
2939
for _, subjectKey := range customer.GetUsageAttribution().SubjectKeys {
3040
subjectSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(subjectKey))
31-
customerIDSQL := fmt.Sprintf("'%s'", sqlbuilder.Escape(customer.GetUsageAttribution().ID))
3241

3342
values = append(values, subjectSQL, customerIDSQL)
3443
}
3544
}
3645

37-
mapAs := "subject_to_customer_id"
38-
mapSQL := fmt.Sprintf("WITH map(%s) as %s", strings.Join(values, ", "), mapAs)
46+
// If there are no values, we return an empty customer id column
47+
// This can happen if none of the customers has key or usage attribution subjects
48+
if len(values) == 0 {
49+
return query.SelectMore("'' AS customer_id")
50+
}
51+
52+
// Name of the map (dictionary)
53+
54+
mapSQL := fmt.Sprintf("WITH map(%s) as %s", strings.Join(values, ", "), subjectToCustomerIDDictionary)
3955

4056
// Add the map to query via WITH clause
4157
mapQuery := sqlbuilder.ClickHouse.NewCTEBuilder().SQL(mapSQL)
4258
query = query.With(mapQuery)
4359

4460
// Select the customer id column
45-
query = query.SelectMore(fmt.Sprintf("%s[%s] AS customer_id", mapAs, subjectColumn))
61+
query = query.SelectMore(fmt.Sprintf("%s[%s] AS customer_id", subjectToCustomerIDDictionary, subjectColumn))
4662

4763
return query
4864
}
@@ -58,12 +74,26 @@ func customersWhere(eventsTableName string, customers []streaming.Customer, quer
5874
getColumn := columnFactory(eventsTableName)
5975
subjectColumn := getColumn("subject")
6076

61-
// If the customer filter is provided, we add all the subjects to the filter
62-
subjects := lo.Map(customers, func(customer streaming.Customer, _ int) []string {
63-
return customer.GetUsageAttribution().SubjectKeys
64-
})
77+
var subjects []string
78+
79+
// Collect all the subjects from the customers
80+
for _, customer := range customers {
81+
// Add the customer key to the filter if it exists
82+
if customer.GetUsageAttribution().Key != nil {
83+
subjects = append(subjects, *customer.GetUsageAttribution().Key)
84+
}
85+
86+
// Add each subject key to the filter
87+
subjects = append(subjects, customer.GetUsageAttribution().SubjectKeys...)
88+
}
89+
90+
// If there are no subjects, we return an empty subject filter
91+
// This can happen if none of the customers has key or usage attribution subjects
92+
if len(subjects) == 0 {
93+
return query
94+
}
6595

66-
return query.Where(query.In(subjectColumn, lo.Flatten(subjects)))
96+
return query.Where(query.In(subjectColumn, subjects))
6797
}
6898

6999
// subjectWhere applies the subject filter to the query.

0 commit comments

Comments
 (0)