Skip to content

Commit 643e8b6

Browse files
committed
feat(streaming): lookup usage by customer id and key
1 parent 77ad9f7 commit 643e8b6

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed

openmeter/streaming/clickhouse/meter_query.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,26 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
202202

203203
// Add the case statements for each subject to customer ID mapping
204204
for _, customer := range d.FilterCustomer {
205+
// Add the customer ID to the case statement
206+
str := fmt.Sprintf(
207+
"WHEN %s = '%s' THEN '%s' ",
208+
getColumn("subject"),
209+
sqlbuilder.Escape(customer.ID),
210+
sqlbuilder.Escape(customer.ID),
211+
)
212+
caseBuilder.WriteString(str)
213+
214+
// Add the customer key to the case statement
215+
if customer.Key != nil {
216+
str := fmt.Sprintf(
217+
"WHEN %s = '%s' THEN '%s' ",
218+
getColumn("subject"),
219+
sqlbuilder.Escape(*customer.Key),
220+
sqlbuilder.Escape(customer.ID),
221+
)
222+
caseBuilder.WriteString(str)
223+
}
224+
205225
for _, subjectKey := range customer.UsageAttribution.SubjectKeys {
206226
str := fmt.Sprintf(
207227
"WHEN %s = '%s' THEN '%s' ",
@@ -309,6 +329,14 @@ func (d *queryMeter) subjectWhere(query *sqlbuilder.SelectBuilder) *sqlbuilder.S
309329

310330
for _, customer := range d.FilterCustomer {
311331
subjects = append(subjects, customer.UsageAttribution.SubjectKeys...)
332+
333+
// Add the customer ID
334+
subjects = append(subjects, customer.ID)
335+
336+
// Add the customer key if it's set
337+
if customer.Key != nil {
338+
subjects = append(subjects, *customer.Key)
339+
}
312340
}
313341

314342
query = query.Where(query.Or(slicesx.Map(subjects, mapFunc)...))

openmeter/streaming/clickhouse/meter_query_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,8 @@ func TestQueryMeter(t *testing.T) {
290290
},
291291
GroupBy: []string{"customer_id"},
292292
},
293-
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, CASE WHEN om_events.subject = 'subject1' THEN 'customer1' WHEN om_events.subject = 'subject2' THEN 'customer2' ELSE '' END AS customer_id FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY customer_id",
294-
wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2"},
293+
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, CASE WHEN om_events.subject = 'customer1' THEN 'customer1' WHEN om_events.subject = 'subject1' THEN 'customer1' WHEN om_events.subject = 'customer2' THEN 'customer2' WHEN om_events.subject = 'subject2' THEN 'customer2' ELSE '' END AS customer_id FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ? OR om_events.subject = ? OR om_events.subject = ?) GROUP BY customer_id",
294+
wantArgs: []interface{}{"my_namespace", "event1", "subject1", "customer1", "subject2", "customer2"},
295295
},
296296
{ // Filter by both customer and subject
297297
query: queryMeter{
@@ -320,8 +320,8 @@ func TestQueryMeter(t *testing.T) {
320320
FilterSubject: []string{"subject1"},
321321
GroupBy: []string{"customer_id"},
322322
},
323-
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, CASE WHEN om_events.subject = 'subject1' THEN 'customer1' WHEN om_events.subject = 'subject2' THEN 'customer1' ELSE '' END AS customer_id FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) AND (om_events.subject = ?) GROUP BY customer_id",
324-
wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2", "subject1"},
323+
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, CASE WHEN om_events.subject = 'customer1' THEN 'customer1' WHEN om_events.subject = 'subject1' THEN 'customer1' WHEN om_events.subject = 'subject2' THEN 'customer1' ELSE '' END AS customer_id FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ? OR om_events.subject = ?) AND (om_events.subject = ?) GROUP BY customer_id",
324+
wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2", "customer1", "subject1"},
325325
},
326326
{ // Aggregate data with filtering for a single group and single value
327327
query: queryMeter{

0 commit comments

Comments
 (0)