Skip to content

Commit aecd932

Browse files
committed
[SPARK-54478][SPARK-54479][SPARK-54480][SPARK-54484] Re-enable streaming tests for connect compat test CI
### What changes were proposed in this pull request? This PR proposes to re-enable streaming tests for connect compatibility test CI. ### Why are the changes needed? They were disabled due to failure, but I can't reproduce these failures in both local and CI after installing zstandard. Code change to trigger compatibility test CI against test branch + install zstandard: master...HeartSaVioR:spark:WIP-investigate-ss-spark-connect-compat-test-failures-master-and-4.0 Code change to re-enable these tests during reproducing: branch-4.0...HeartSaVioR:spark:branch-4.0-SC-213385 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA run with the above reproducer setup: https://github.com/HeartSaVioR/spark/actions/runs/19807973545/job/56745231698 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #53266 from HeartSaVioR/reenable-streaming-connect-tests. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 7833e2f commit aecd932

File tree

4 files changed

+3
-16
lines changed

4 files changed

+3
-16
lines changed

python/pyspark/sql/streaming/readwriter.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,6 @@ def foreach(self, f: Union[Callable[[Row], None], "SupportsProcess"]) -> "DataSt
15681568
self._jwrite.foreach(jForeachWriter)
15691569
return self
15701570

1571-
# SPARK-54478: Reenable doctest
15721571
def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamWriter":
15731572
"""
15741573
Sets the output of the streaming query to be processed using the provided
@@ -1601,9 +1600,9 @@ def foreachBatch(self, func: Callable[["DataFrame", int], None]) -> "DataStreamW
16011600
... my_value = 100
16021601
... batch_df.collect()
16031602
...
1604-
>>> q = df.writeStream.foreachBatch(func).start() # doctest: +SKIP
1605-
>>> time.sleep(3) # doctest: +SKIP
1606-
>>> q.stop() # doctest: +SKIP
1603+
>>> q = df.writeStream.foreachBatch(func).start()
1604+
>>> time.sleep(3)
1605+
>>> q.stop()
16071606
>>> # if in Spark Connect, my_value = -1, else my_value = 100
16081607
"""
16091608
from py4j.java_gateway import java_import

python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,13 @@
1515
# limitations under the License.
1616
#
1717
import unittest
18-
import os
1918

2019
from pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state import (
2120
GroupedApplyInPandasWithStateTestsMixin,
2221
)
2322
from pyspark.testing.connectutils import ReusedConnectTestCase
2423

2524

26-
@unittest.skipIf(
27-
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54479: To be reenabled"
28-
)
2925
class GroupedApplyInPandasWithStateTests(
3026
GroupedApplyInPandasWithStateTestsMixin, ReusedConnectTestCase
3127
):

python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#
1717

1818
import unittest
19-
import os
2019

2120
from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin
2221
from pyspark.testing.connectutils import ReusedConnectTestCase, should_test_connect
@@ -26,9 +25,6 @@
2625
from pyspark.errors.exceptions.connect import StreamingPythonRunnerInitializationException
2726

2827

29-
@unittest.skipIf(
30-
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54480: To be reenabled"
31-
)
3228
class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase):
3329
def test_streaming_foreach_batch_propagates_python_errors(self):
3430
super().test_streaming_foreach_batch_propagates_python_errors()

python/pyspark/sql/tests/test_python_streaming_datasource.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,6 @@ def streamWriter(self, schema, overwrite):
139139

140140
return TestDataSource
141141

142-
@unittest.skipIf(
143-
os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "SPARK-54484: To be reenabled"
144-
)
145142
def test_stream_reader(self):
146143
self.spark.dataSource.register(self._get_test_data_source())
147144
df = self.spark.readStream.format("TestDataSource").load()
@@ -216,7 +213,6 @@ def streamReader(self, schema):
216213

217214
assertDataFrameEqual(df, expected_data)
218215

219-
@unittest.skipIf(os.environ.get("SPARK_SKIP_CONNECT_COMPAT_TESTS") == "1", "To be reenabled")
220216
def test_simple_stream_reader(self):
221217
class SimpleStreamReader(SimpleDataSourceStreamReader):
222218
def initialOffset(self):

0 commit comments

Comments
 (0)