1818import com .facebook .presto .plugin .clp .ClpSplit ;
1919import com .facebook .presto .plugin .clp .ClpTableHandle ;
2020import com .facebook .presto .plugin .clp .ClpTableLayoutHandle ;
21+ import com .facebook .presto .plugin .clp .optimization .ClpTopNSpec ;
2122import com .fasterxml .jackson .databind .JsonNode ;
2223import com .fasterxml .jackson .databind .ObjectMapper ;
2324import com .google .common .collect .ImmutableList ;
2829import java .io .InputStream ;
2930import java .io .OutputStream ;
3031import java .net .HttpURLConnection ;
32+ import java .net .MalformedURLException ;
3133import java .net .URL ;
3234import java .nio .charset .StandardCharsets ;
35+ import java .util .ArrayList ;
3336import java .util .Collections ;
3437import java .util .Iterator ;
3538import java .util .List ;
39+ import java .util .Optional ;
3640
3741import static com .facebook .presto .plugin .clp .ClpSplit .SplitType ;
3842import static com .facebook .presto .plugin .clp .ClpSplit .SplitType .ARCHIVE ;
3943import static com .facebook .presto .plugin .clp .ClpSplit .SplitType .IR ;
4044import static java .lang .String .format ;
45+ import static java .util .Comparator .comparingLong ;
46+ import static java .util .Objects .requireNonNull ;
4147import static java .util .concurrent .TimeUnit .SECONDS ;
4248
4349public class ClpPinotSplitProvider
4450 implements ClpSplitProvider
4551{
4652 private static final Logger log = Logger .get (ClpPinotSplitProvider .class );
4753 private static final String SQL_SELECT_SPLITS_TEMPLATE = "SELECT tpath FROM %s WHERE 1 = 1 AND (%s) LIMIT 999999" ;
54+ private static final String SQL_SELECT_SPLIT_META_TEMPLATE = "SELECT tpath, creationtime, lastmodifiedtime, num_messages FROM %s WHERE 1 = 1 AND (%s) ORDER BY %s %s LIMIT 999999" ;
4855 private final ClpConfig config ;
56+ private final URL pinotDatabaseUrl ;
4957
5058 @ Inject
5159 public ClpPinotSplitProvider (ClpConfig config )
5260 {
53- this .config = config ;
61+ this .config = requireNonNull (config , "config is null" );
62+ try {
63+ this .pinotDatabaseUrl = new URL (config .getMetadataDbUrl () + "/query/sql" );
64+ }
65+ catch (MalformedURLException e ) {
66+ throw new IllegalArgumentException (
67+ format ("Invalid Pinot database URL: %s/query/sql" , config .getMetadataDbUrl ()), e );
68+ }
5469 }
5570
5671 @ Override
5772 public List <ClpSplit > listSplits (ClpTableLayoutHandle clpTableLayoutHandle )
5873 {
59- ImmutableList .Builder <ClpSplit > splits = new ImmutableList .Builder <>();
6074 ClpTableHandle clpTableHandle = clpTableLayoutHandle .getTable ();
75+ Optional <ClpTopNSpec > topNSpecOptional = clpTableLayoutHandle .getTopN ();
6176 String tableName = clpTableHandle .getSchemaTableName ().getTableName ();
6277 try {
63- URL url = new URL (config .getMetadataDbUrl () + "/query/sql" );
78+ ImmutableList .Builder <ClpSplit > splits = new ImmutableList .Builder <>();
79+ if (topNSpecOptional .isPresent ()) {
80+ ClpTopNSpec topNSpec = topNSpecOptional .get ();
81+ // Only handles one range metadata column for now (first ordering)
82+ ClpTopNSpec .Ordering ordering = topNSpec .getOrderings ().get (0 );
83+ // Get the last column in the ordering (the primary sort column for nested fields)
84+ String col = ordering .getColumns ().get (ordering .getColumns ().size () - 1 );
85+ String dir = (ordering .getOrder () == ClpTopNSpec .Order .ASC ) ? "ASC" : "DESC" ;
86+ String splitMetaQuery = format (SQL_SELECT_SPLIT_META_TEMPLATE , tableName , clpTableLayoutHandle .getMetadataSql ().orElse ("1 = 1" ), col , dir );
87+ List <ArchiveMeta > archiveMetaList = fetchArchiveMeta (splitMetaQuery , ordering );
88+ List <ArchiveMeta > selected = selectTopNArchives (archiveMetaList , topNSpec .getLimit (), ordering .getOrder ());
89+
90+ for (ArchiveMeta a : selected ) {
91+ String splitPath = a .id ;
92+ splits .add (new ClpSplit (splitPath , determineSplitType (splitPath ), clpTableLayoutHandle .getKqlQuery ()));
93+ }
94+
95+ List <ClpSplit > filteredSplits = splits .build ();
96+ log .debug ("Number of topN filtered splits: %s" , filteredSplits .size ());
97+ return filteredSplits ;
98+ }
99+
100+ String splitQuery = format (SQL_SELECT_SPLITS_TEMPLATE , tableName , clpTableLayoutHandle .getMetadataSql ().orElse ("1 = 1" ));
101+ List <JsonNode > splitRows = getQueryResult (pinotDatabaseUrl , splitQuery );
102+ for (JsonNode row : splitRows ) {
103+ String splitPath = row .elements ().next ().asText ();
104+ splits .add (new ClpSplit (splitPath , determineSplitType (splitPath ), clpTableLayoutHandle .getKqlQuery ()));
105+ }
106+
107+ List <ClpSplit > filteredSplits = splits .build ();
108+ log .debug ("Number of filtered splits: %s" , filteredSplits .size ());
109+ return filteredSplits ;
110+ }
111+ catch (Exception e ) {
112+ log .error (e , "Failed to list splits for table %s" , tableName );
113+ return Collections .emptyList ();
114+ }
115+ }
116+
117+ /**
118+ * Fetches archive metadata from the database.
119+ *
120+ * @param query SQL query string that selects the archives
121+ * @param ordering The top-N ordering specifying which columns contain lowerBound/upperBound
122+ * @return List of ArchiveMeta objects representing archive metadata
123+ */
124+ private List <ArchiveMeta > fetchArchiveMeta (String query , ClpTopNSpec .Ordering ordering )
125+ {
126+ ImmutableList .Builder <ArchiveMeta > archiveMetas = new ImmutableList .Builder <>();
127+ List <JsonNode > rows = getQueryResult (pinotDatabaseUrl , query );
128+ for (JsonNode row : rows ) {
129+ archiveMetas .add (new ArchiveMeta (
130+ row .get (0 ).asText (),
131+ row .get (1 ).asLong (),
132+ row .get (2 ).asLong (),
133+ row .get (3 ).asLong ()));
134+ }
135+ return archiveMetas .build ();
136+ }
137+
138+ /**
139+ * Selects the set of archives that must be scanned to guarantee the top-N results by timestamp
140+ * (ASC or DESC), given only archive ranges and message counts.
141+ * <ul>
142+ * <li>Merges overlapping archives into components (union of time ranges).</li>
143+ * <li>For DESC: always include the newest component, then add older ones until their total
144+ * message counts cover the limit.</li>
145+ * <li>For ASC: symmetric — start from the oldest, then add newer ones.</li>
146+ * </ul>
147+
148+ * @param archives list of archives with [lowerBound, upperBound, messageCount]
149+ * @param limit number of messages requested
150+ * @param order ASC (earliest first) or DESC (latest first)
151+ * @return archives that must be scanned
152+ */
153+ private static List <ArchiveMeta > selectTopNArchives (List <ArchiveMeta > archives , long limit , ClpTopNSpec .Order order )
154+ {
155+ if (archives == null || archives .isEmpty () || limit <= 0 ) {
156+ return ImmutableList .of ();
157+ }
158+ requireNonNull (order , "order is null" );
159+
160+ // 1) Merge overlaps into groups
161+ List <ArchiveGroup > groups = toArchiveGroups (archives );
162+
163+ if (groups .isEmpty ()) {
164+ return ImmutableList .of ();
165+ }
166+
167+ // 2) Pick minimal set of groups per order, then return all member archives
168+ List <ArchiveMeta > selected = new ArrayList <>();
169+ if (order == ClpTopNSpec .Order .DESC ) {
170+ // newest group index
171+ int k = groups .size () - 1 ;
172+
173+ // must include newest group
174+ selected .addAll (groups .get (k ).members );
175+
176+ // assume worst case: newest contributes 0 after filter; cover limit from older groups
177+ long coveredByOlder = 0 ;
178+ for (int i = k - 1 ; i >= 0 && coveredByOlder < limit ; --i ) {
179+ selected .addAll (groups .get (i ).members );
180+ coveredByOlder += groups .get (i ).count ;
181+ }
182+ }
183+ else {
184+ // oldest group index
185+ int k = 0 ;
186+
187+ // must include oldest group
188+ selected .addAll (groups .get (k ).members );
189+
190+ // assume worst case: oldest contributes 0; cover limit from newer groups
191+ long coveredByNewer = 0 ;
192+ for (int i = k + 1 ; i < groups .size () && coveredByNewer < limit ; ++i ) {
193+ selected .addAll (groups .get (i ).members );
194+ coveredByNewer += groups .get (i ).count ;
195+ }
196+ }
197+
198+ return selected ;
199+ }
200+
201+ /**
202+ * Groups overlapping archives into non-overlapping archive groups.
203+ *
204+ * @param archives archives sorted by lowerBound
205+ * @return merged components
206+ */
207+ private static List <ArchiveGroup > toArchiveGroups (List <ArchiveMeta > archives )
208+ {
209+ List <ArchiveMeta > sorted = new ArrayList <>(archives );
210+ sorted .sort (comparingLong ((ArchiveMeta a ) -> a .lowerBound )
211+ .thenComparingLong (a -> a .upperBound ));
212+
213+ List <ArchiveGroup > groups = new ArrayList <>();
214+ ArchiveGroup cur = null ;
215+
216+ for (ArchiveMeta a : sorted ) {
217+ if (cur == null ) {
218+ cur = startArchiveGroup (a );
219+ }
220+ else if (overlaps (cur , a )) {
221+ // extend current component
222+ cur .end = Math .max (cur .end , a .upperBound );
223+ cur .count += a .messageCount ;
224+ cur .members .add (a );
225+ }
226+ else {
227+ // finalize current, start a new one
228+ groups .add (cur );
229+ cur = startArchiveGroup (a );
230+ }
231+ }
232+ if (cur != null ) {
233+ groups .add (cur );
234+ }
235+ return groups ;
236+ }
237+
238+ private static ArchiveGroup startArchiveGroup (ArchiveMeta a )
239+ {
240+ ArchiveGroup group = new ArchiveGroup ();
241+ group .begin = a .lowerBound ;
242+ group .end = a .upperBound ;
243+ group .count = a .messageCount ;
244+ group .members .add (a );
245+ return group ;
246+ }
247+
248+ private static boolean overlaps (ArchiveGroup cur , ArchiveMeta a )
249+ {
250+ return a .lowerBound <= cur .end && a .upperBound >= cur .begin ;
251+ }
252+
253+ /**
254+ * Determines the split type based on file path extension.
255+ *
256+ * @param splitPath the file path
257+ * @return IR for .clp.zst files, ARCHIVE otherwise
258+ */
259+ private static SplitType determineSplitType (String splitPath )
260+ {
261+ return splitPath .endsWith (".clp.zst" ) ? IR : ARCHIVE ;
262+ }
263+
264+ private static List <JsonNode > getQueryResult (URL url , String sql )
265+ {
266+ try {
64267 HttpURLConnection conn = (HttpURLConnection ) url .openConnection ();
65268 conn .setRequestMethod ("POST" );
66269 conn .setRequestProperty ("Content-Type" , "application/json" );
@@ -69,10 +272,9 @@ public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
69272 conn .setConnectTimeout ((int ) SECONDS .toMillis (5 ));
70273 conn .setReadTimeout ((int ) SECONDS .toMillis (30 ));
71274
72- String query = format (SQL_SELECT_SPLITS_TEMPLATE , tableName , clpTableLayoutHandle .getMetadataSql ().orElse ("1 = 1" ));
73- log .info ("Pinot query: %s" , query );
275+ log .info ("Executing Pinot query: %s" , sql );
74276 ObjectMapper mapper = new ObjectMapper ();
75- String body = format ("{\" sql\" : %s }" , mapper .writeValueAsString (query ));
277+ String body = format ("{\" sql\" : %s }" , mapper .writeValueAsString (sql ));
76278 try (OutputStream os = conn .getOutputStream ()) {
77279 os .write (body .getBytes (StandardCharsets .UTF_8 ));
78280 }
@@ -89,26 +291,66 @@ public List<ClpSplit> listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
89291 }
90292 JsonNode resultTable = root .get ("resultTable" );
91293 if (resultTable == null ) {
92- throw new RuntimeException ( "No \" resultTable\" field found " );
294+ throw new IllegalStateException ( "Pinot query response missing ' resultTable' field" );
93295 }
94296 JsonNode rows = resultTable .get ("rows" );
95297 if (rows == null ) {
96- throw new RuntimeException ( "No \" rows\" field found " );
298+ throw new IllegalStateException ( "Pinot query response missing ' rows' field in resultTable " );
97299 }
300+ ImmutableList .Builder <JsonNode > resultBuilder = ImmutableList .builder ();
98301 for (Iterator <JsonNode > it = rows .elements (); it .hasNext (); ) {
99302 JsonNode row = it .next ();
100- String splitPath = row .elements ().next ().asText ();
101- SplitType splitType = splitPath .endsWith (".clp.zst" ) ? IR : ARCHIVE ;
102- splits .add (new ClpSplit (splitPath , splitType , clpTableLayoutHandle .getKqlQuery ()));
303+ resultBuilder .add (row );
103304 }
104- List <ClpSplit > filteredSplits = splits .build ();
105- log .debug ("Number of filtered splits: %s" , filteredSplits .size ());
106- return filteredSplits ;
305+ List <JsonNode > results = resultBuilder .build ();
306+ log .debug ("Number of results: %s" , results .size ());
307+ return results ;
308+ }
309+ catch (IOException e ) {
310+ log .error (e , "IO error executing Pinot query: %s" , sql );
311+ return Collections .emptyList ();
107312 }
108313 catch (Exception e ) {
109- log .error (e );
314+ log .error (e , "Unexpected error executing Pinot query: %s" , sql );
315+ return Collections .emptyList ();
110316 }
317+ }
318+
319+ /**
320+ * Represents metadata of an archive, including its ID, timestamp bounds, and message count.
321+ */
322+ private static final class ArchiveMeta
323+ {
324+ private final String id ;
325+ private final long lowerBound ;
326+ private final long upperBound ;
327+ private final long messageCount ;
111328
112- return Collections .emptyList ();
329+ ArchiveMeta (String id , long lowerBound , long upperBound , long messageCount )
330+ {
331+ this .id = requireNonNull (id , "id is null" );
332+ if (lowerBound > upperBound ) {
333+ throw new IllegalArgumentException (
334+ format ("Invalid archive bounds: lowerBound (%d) > upperBound (%d)" , lowerBound , upperBound ));
335+ }
336+ if (messageCount < 0 ) {
337+ throw new IllegalArgumentException (
338+ format ("Invalid message count: %d (must be >= 0)" , messageCount ));
339+ }
340+ this .lowerBound = lowerBound ;
341+ this .upperBound = upperBound ;
342+ this .messageCount = messageCount ;
343+ }
344+ }
345+
346+ /**
347+ * Represents a group of overlapping archives treated as one logical unit.
348+ */
349+ private static final class ArchiveGroup
350+ {
351+ long begin ;
352+ long end ;
353+ long count ;
354+ final List <ArchiveMeta > members = new ArrayList <>();
113355 }
114356}
0 commit comments