|
47 | 47 | import java.sql.Statement; |
48 | 48 | import java.util.Arrays; |
49 | 49 | import java.util.Locale; |
| 50 | +import java.util.concurrent.CountDownLatch; |
| 51 | +import java.util.concurrent.ExecutorService; |
| 52 | +import java.util.concurrent.Executors; |
| 53 | +import java.util.concurrent.Future; |
| 54 | +import java.util.concurrent.TimeUnit; |
| 55 | +import java.util.concurrent.atomic.AtomicInteger; |
50 | 56 | import java.util.stream.Collectors; |
51 | 57 |
|
52 | 58 | import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; |
@@ -387,6 +393,97 @@ public void testPrivileges() throws SQLException { |
387 | 393 | } |
388 | 394 | } |
389 | 395 |
|
| 396 | + @Test |
| 397 | + public void testConcurrentCteQueries() throws Exception { |
| 398 | + final int threadCount = 10; |
| 399 | + final int queriesPerThread = 20; |
| 400 | + final AtomicInteger successCount = new AtomicInteger(0); |
| 401 | + final AtomicInteger failureCount = new AtomicInteger(0); |
| 402 | + final AtomicInteger totalCount = new AtomicInteger(0); |
| 403 | + final CountDownLatch startLatch = new CountDownLatch(1); |
| 404 | + final CountDownLatch finishLatch = new CountDownLatch(threadCount); |
| 405 | + |
| 406 | + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); |
| 407 | + |
| 408 | + // Create CTE query tasks |
| 409 | + Future<?>[] futures = new Future<?>[threadCount]; |
| 410 | + for (int i = 0; i < threadCount; i++) { |
| 411 | + final int threadId = i; |
| 412 | + futures[i] = |
| 413 | + executorService.submit( |
| 414 | + () -> { |
| 415 | + try { |
| 416 | + startLatch.await(); // Wait for all threads to be ready |
| 417 | + |
| 418 | + try (Connection connection = |
| 419 | + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); |
| 420 | + Statement statement = connection.createStatement()) { |
| 421 | + statement.execute("USE testdb"); |
| 422 | + |
| 423 | + // Execute multiple CTE queries in each thread |
| 424 | + for (int j = 0; j < queriesPerThread; j++) { |
| 425 | + try { |
| 426 | + // Test different types of CTE queries |
| 427 | + String[] queries = { |
| 428 | + String.format( |
| 429 | + "WITH cte as %s (SELECT * FROM testtb WHERE voltage > 150) SELECT * FROM cte ORDER BY deviceid", |
| 430 | + cteKeywords[j % cteKeywords.length]), |
| 431 | + String.format( |
| 432 | + "WITH cte as %s (SELECT deviceid, avg(voltage) as avg_v FROM testtb GROUP BY deviceid) SELECT * FROM cte", |
| 433 | + cteKeywords[j % cteKeywords.length]), |
| 434 | + String.format( |
| 435 | + "WITH cte as %s (SELECT * FROM testtb WHERE time > 1000) SELECT count(*) as cnt FROM cte", |
| 436 | + cteKeywords[j % cteKeywords.length]) |
| 437 | + }; |
| 438 | + |
| 439 | + String query = queries[j % queries.length]; |
| 440 | + ResultSet resultSet = statement.executeQuery(query); |
| 441 | + |
| 442 | + // Verify results |
| 443 | + int rowCount = 0; |
| 444 | + while (resultSet.next()) { |
| 445 | + rowCount++; |
| 446 | + } |
| 447 | + totalCount.getAndAdd(rowCount); |
| 448 | + |
| 449 | + successCount.incrementAndGet(); |
| 450 | + |
| 451 | + } catch (SQLException e) { |
| 452 | + failureCount.incrementAndGet(); |
| 453 | + System.err.println( |
| 454 | + "Thread " + threadId + " query " + j + " failed: " + e.getMessage()); |
| 455 | + } |
| 456 | + } |
| 457 | + } |
| 458 | + } catch (Exception e) { |
| 459 | + failureCount.incrementAndGet(); |
| 460 | + System.err.println("Thread " + threadId + " failed: " + e.getMessage()); |
| 461 | + } finally { |
| 462 | + finishLatch.countDown(); |
| 463 | + } |
| 464 | + }); |
| 465 | + } |
| 466 | + |
| 467 | + // Start all threads at once |
| 468 | + startLatch.countDown(); |
| 469 | + |
| 470 | + // Wait for all threads to complete |
| 471 | + finishLatch.await(60, TimeUnit.SECONDS); |
| 472 | + |
| 473 | + // Shutdown executor |
| 474 | + executorService.shutdown(); |
| 475 | + boolean terminated = executorService.awaitTermination(10, TimeUnit.SECONDS); |
| 476 | + if (!terminated) { |
| 477 | + executorService.shutdownNow(); |
| 478 | + } |
| 479 | + |
| 480 | + // Verify results |
| 481 | + int totalQueries = threadCount * queriesPerThread; |
| 482 | + assertEquals("All queries should succeed", totalQueries, successCount.get()); |
| 483 | + assertEquals("No queries should fail", 0, failureCount.get()); |
| 484 | + assertEquals("Total query count should match", 340, totalCount.get()); |
| 485 | + } |
| 486 | + |
390 | 487 | private static void prepareData() { |
391 | 488 | try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); |
392 | 489 | Statement statement = connection.createStatement()) { |
|
0 commit comments