15
15
import java .util .List ;
16
16
import java .util .Random ;
17
17
18
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
19
+
18
20
public class FlinkWindowPerformanceTest {
19
21
20
22
private List <Tuple2 <Long , TestInputData >> data ;
@@ -49,7 +51,7 @@ private List<Tuple2<Long, TestInputData>> generateData() {
49
51
return generatedData ;
50
52
}
51
53
52
- private void runWindowTest (String version ) throws Exception {
54
+ private List < Tuple2 < Long , TestInputData >> runWindowTest (String version ) throws Exception {
53
55
TestRunner runner = new TestRunner ();
54
56
var config = new TestFlinkConfig <>(data , TestInputData .class );
55
57
var source = new SourceStream <>(config );
@@ -75,20 +77,44 @@ private void runWindowTest(String version) throws Exception {
75
77
var result = stream .executeAndCollect (DATA_SIZE );
76
78
77
79
long endTime = System .nanoTime ();
78
- long duration = (endTime - startTime ) / 1_000_000 ; // Convert to milliseconds
80
+ long duration = (endTime - startTime ) / 1_000_000 ;
79
81
80
82
System .out .println ("Window function " + version + " execution time: " + duration + " ms" );
81
83
System .out .println ("Result size: " + result .size ());
84
+
85
+ return result ;
86
+ }
87
+
88
+ private void compareResults (List <Tuple2 <Long , TestInputData >> result1 , List <Tuple2 <Long , TestInputData >> result2 , String version1 , String version2 ) {
89
+ assertEquals (result1 .size (), result2 .size (), "Result sizes differ between " + version1 + " and " + version2 );
90
+
91
+ for (int i = 0 ; i < result1 .size (); i ++) {
92
+ Tuple2 <Long , TestInputData > r1 = result1 .get (i );
93
+ Tuple2 <Long , TestInputData > r2 = result2 .get (i );
94
+
95
+ assertEquals (r1 .f0 , r2 .f0 , "Timestamps differ at index " + i + " between " + version1 + " and " + version2 );
96
+ assertEquals (r1 .f1 .a , r2 .f1 .a , "Aggregated values differ at index " + i + " between " + version1 + " and " + version2 );
97
+ assertEquals (r1 .f1 .b , r2 .f1 .b , "Keys differ at index " + i + " between " + version1 + " and " + version2 );
98
+ }
99
+
100
+ System .out .println ("Results are identical between " + version1 + " and " + version2 );
82
101
}
83
102
84
103
@ Test
85
104
public void testWindowPerformance () throws Exception {
86
105
var rounds = 5 ;
87
106
while (rounds > 0 ) {
88
- runWindowTest ("v1" );
89
- runWindowTest ("v2" );
90
- runWindowTest ("v3" );
107
+ List <Tuple2 <Long , TestInputData >> resultV1 = runWindowTest ("v1" );
108
+ List <Tuple2 <Long , TestInputData >> resultV2 = runWindowTest ("v2" );
109
+ List <Tuple2 <Long , TestInputData >> resultV3 = runWindowTest ("v3" );
110
+
111
+ compareResults (resultV1 , resultV2 , "v1" , "v2" );
112
+ compareResults (resultV1 , resultV3 , "v1" , "v3" );
113
+ compareResults (resultV2 , resultV3 , "v2" , "v3" );
114
+
115
+ System .out .println ("Round " + (6 - rounds ) + " completed\n " );
91
116
rounds --;
92
117
}
93
118
}
119
+
94
120
}
0 commit comments