diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 49b17824c3c72..4b8dc33c6bf52 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -22,6 +22,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Files; import com.google.common.io.Closeables; @@ -54,7 +55,7 @@ public void create() throws IOException { localDirs[i] = JavaUtils.createDirectory(root, "spark").getAbsolutePath(); for (int p = 0; p < subDirsPerLocalDir; p ++) { - new File(localDirs[i], String.format("%02x", p)).mkdirs(); + Files.createDirectories(new File(localDirs[i], String.format("%02x", p)).toPath()); } } } diff --git a/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala index 140de836622f4..5120a229fe186 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala @@ -95,7 +95,7 @@ private[spark] object IvyTestUtils { className: String, packageName: String): Seq[(String, File)] = { val rFilesDir = new File(dir, "R" + File.separator + "pkg") - new File(rFilesDir, "R").mkdirs() + SparkFileUtils.createDirectory(new File(rFilesDir, "R")) val contents = s"""myfunc <- function(x) { | SparkR:::callJStatic("$packageName.$className", "myFunc", x) @@ -150,11 +150,11 @@ private[spark] object IvyTestUtils { useIvyLayout: Boolean): File = { if (useIvyLayout) { val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true) - ivyXmlPath.mkdirs() + SparkFileUtils.createDirectory(ivyXmlPath) createIvyDescriptor(ivyXmlPath, artifact, dependencies) } else { val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout) - pomPath.mkdirs() + SparkFileUtils.createDirectory(pomPath) createPom(pomPath, artifact, dependencies) } } @@ -293,13 +293,13 @@ private[spark] object IvyTestUtils { // Where the root of the repository exists, and what Ivy will search in val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir()) // Create directory if it doesn't exist - tempPath.mkdirs() + SparkFileUtils.createDirectory(tempPath) // Where to create temporary class files and such val root = new File(tempPath, tempPath.hashCode().toString) - root.mkdirs() + SparkFileUtils.createDirectory(root) try { val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout) - jarPath.mkdirs() + SparkFileUtils.createDirectory(jarPath) val className = "MyLib" val javaClass = createJavaClass(root, className, artifact.groupId) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 6f345e069ff78..ccced0e1f0c14 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1899,7 +1899,7 @@ abstract class AvroSuite withTempPath { tempDir => val tempEmptyDir = s"$tempDir/sqlOverwrite" // Create a temp directory for table that will be overwritten - new File(tempEmptyDir).mkdirs() + Utils.createDirectory(tempEmptyDir) spark.sql( s""" |CREATE TEMPORARY VIEW episodes diff 
--git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 212693f6e02cc..a1264af021aa1 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -38,6 +38,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark._ import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.streaming.kafka010.mocks.MockTime +import org.apache.spark.util.Utils class KafkaRDDSuite extends SparkFunSuite { @@ -91,7 +92,7 @@ class KafkaRDDSuite extends SparkFunSuite { val logs = new Pool[TopicPartition, UnifiedLog]() val logDir = kafkaTestUtils.brokerLogDir val dir = new File(logDir, topic + "-" + partition) - dir.mkdirs() + Utils.createDirectory(dir) val logProps = new ju.Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f)) diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index 5d996381a485e..ac2c1f73bd096 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -153,7 +153,7 @@ private[deploy] object RPackageUtils extends Logging { if (verbose) { print(log"Creating directory: ${MDC(PATH, dir)}", printStream) } - dir.mkdirs + Utils.createDirectory(dir) } else { val inStream = jar.getInputStream(entry) val outPath = new File(tempDir, entryPath) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index ca0e024ad1aed..22e4c83440f61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -145,7 +145,7 @@ private[deploy] class DriverRunner( */ private def createWorkingDirectory(): File = { val driverDir = new File(workDir, driverId) - if (!driverDir.exists() && !driverDir.mkdirs()) { + if (!driverDir.exists() && !Utils.createDirectory(driverDir)) { throw new IOException("Failed to create directory " + driverDir) } driverDir diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b2ec23887a400..488bc6c791311 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -600,7 +600,7 @@ private[deploy] class Worker( // Create the executor's working directory val executorDir = new File(workDir, appId + "/" + execId) - if (!executorDir.mkdirs()) { + if (!Utils.createDirectory(executorDir)) { throw new IOException("Failed to create directory " + executorDir) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 126c92e4cb656..b2b6d2a2959da 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -90,7 +90,7 @@ private[spark] class PipedRDD[T: ClassTag]( val currentDir = new File(".") logDebug("currentDir = " + 
currentDir.getAbsolutePath()) val taskDirFile = new File(taskDirectory) - taskDirFile.mkdirs() + Utils.createDirectory(taskDirFile) try { val tasksDirFilter = new NotEqualsFileNameFilter("tasks") diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 72d8dc0b19d21..94a0ea1ecaef0 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -316,7 +316,7 @@ private[spark] class DiskBlockManager( throw SparkCoreErrors.failToCreateDirectoryError(dirToCreate.getAbsolutePath, maxAttempts) } try { - dirToCreate.mkdirs() + Utils.createDirectory(dirToCreate) Files.setPosixFilePermissions( dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---")) if (dirToCreate.exists()) { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2d3ae9f171fef..8f60857eb691b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -458,7 +458,7 @@ private[spark] object Utils * to work around a security issue, see also SPARK-38631. */ private def unTarUsingJava(source: File, dest: File): Unit = { - if (!dest.mkdirs && !dest.isDirectory) { + if (!Utils.createDirectory(dest) && !dest.isDirectory) { throw new IOException(s"Mkdirs failed to create $dest") } else { try { @@ -810,7 +810,7 @@ private[spark] object Utils configuredLocalDirs.flatMap { root => try { val rootDir = new File(root) - if (rootDir.exists || rootDir.mkdirs()) { + if (rootDir.exists || Utils.createDirectory(rootDir)) { val dir = createTempDir(root) chmod700(dir) Some(dir.getAbsolutePath) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 9e5859feefb59..a88240bc612a4 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -455,7 +455,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test("SPARK-22585 addJar argument without scheme is interpreted literally without url decoding") { withTempDir { dir => val tmpDir = new File(dir, "host%3A443") - tmpDir.mkdirs() + Utils.createDirectory(tmpDir) val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir) sc = new SparkContext("local", "test") diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala index 5eb22032a5e80..ebb8609e8c13a 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala @@ -20,6 +20,7 @@ package org.apache.spark.benchmark import java.io.{File, FileOutputStream, OutputStream} import org.apache.spark.internal.config.Tests.IS_TESTING +import org.apache.spark.util.Utils /** * A base class for generate benchmark results to a file. 
@@ -60,7 +61,7 @@ abstract class BenchmarkBase { // scalastyle:off println println(s"Creating ${dir.getAbsolutePath} for benchmark results.") // scalastyle:on println - dir.mkdirs() + Utils.createDirectory(dir) } val file = new File(dir, resultFileName) if (!file.exists()) { diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 77f5268f79cae..67c1abdb0d55c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -147,12 +147,12 @@ class RPackageUtilsSuite Utils.tryWithSafeFinally { IvyTestUtils.writeFile(tempDir, "test.R", "abc") val fakeSparkRDir = new File(tempDir, "SparkR") - assert(fakeSparkRDir.mkdirs()) + assert(Utils.createDirectory(fakeSparkRDir)) IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc") IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc") IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :) val fakePackageDir = new File(tempDir, "packageTest") - assert(fakePackageDir.mkdirs()) + assert(Utils.createDirectory(fakePackageDir)) IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc") IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc") val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index bd34e6f2bba3d..1bf7d875c653f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -904,7 +904,7 @@ class SparkSubmitSuite // compile a small jar containing a class that will be called from R code. 
withTempDir { tempDir => val srcDir = new File(tempDir, "sparkrtest") - srcDir.mkdirs() + Utils.createDirectory(srcDir) val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath, """package sparkrtest; | diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 4ac919dd9e6a6..7cfcfe0e4cf30 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -220,7 +220,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with if (regenerateGoldenFiles) { FileUtils.deleteDirectory(expRoot) - expRoot.mkdirs() + Utils.createDirectory(expRoot) } // run a bunch of characterization tests -- just verify the behavior is the same as what is saved diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index f3bae2066e146..ff5d314d1688a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -383,7 +383,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P // Create the executor's working directory val executorDir = new File(worker.workDir, appId + "/" + execId) - if (!executorDir.exists && !executorDir.mkdirs()) { + if (!executorDir.exists && !Utils.createDirectory(executorDir)) { throw new IOException("Failed to create directory " + executorDir) } executorDir.setLastModified(System.currentTimeMillis - (1000 * 120)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 5a8722a55ed76..4c6b7f25e3c32 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -207,7 +207,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local // compile a small jar containing an exception that will be thrown on an executor. 
val tempDir = Utils.createTempDir() val srcDir = new File(tempDir, "repro/") - srcDir.mkdirs() + Utils.createDirectory(srcDir) val excSource = new JavaSourceFromString(new File(srcDir, "MyException").toURI.getPath, """package repro; | diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala index 4b22ec334e84b..3a30444d0aab0 100644 --- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala @@ -29,7 +29,7 @@ class LocalDirsSuite extends SparkFunSuite with LocalRootDirsTest { private def assumeNonExistentAndNotCreatable(f: File): Unit = { try { - assume(!f.exists() && !f.mkdirs()) + assume(!f.exists() && !Utils.createDirectory(f)) } finally { Utils.deleteRecursively(f) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 56d7b7ff6a09e..ca22329f60bbf 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -129,12 +129,12 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { reduceId: Int, blockId: String): AppShufflePartitionInfo = { val dataFile = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager, partitionId, reduceId) - dataFile.getParentFile.mkdirs() + Utils.createDirectory(dataFile.getParentFile) val indexFile = ShuffleTestAccessor.getMergedShuffleIndexFile(mergeManager, partitionId, reduceId) - indexFile.getParentFile.mkdirs() + Utils.createDirectory(indexFile.getParentFile) val metaFile = ShuffleTestAccessor.getMergedShuffleMetaFile(mergeManager, partitionId, reduceId) - metaFile.getParentFile.mkdirs() + Utils.createDirectory(metaFile.getParentFile) val partitionInfo = ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo( mergeManager, partitionId, reduceId, blockId) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala index 310b50dac1cc3..f113f274d7124 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{ListState, MapState, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode, TimerValues, TTLConfig, ValueState} import org.apache.spark.sql.types._ +import org.apache.spark.util.SparkFileUtils case class InputRowForConnectTest(key: String, value: String) case class OutputRowForConnectTest(key: String, value: String) @@ -494,7 +495,7 @@ class TransformWithStateConnectSuite val file = Paths.get(inputPath).toFile val parentDir = file.getParentFile if (parentDir != null && !parentDir.exists()) { - parentDir.mkdirs() + SparkFileUtils.createDirectory(parentDir) } val writer = new BufferedWriter(new FileWriter(inputPath)) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 562a57aafbd41..636103f58330f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -335,7 +335,7 @@ class RocksDBFileManager( versionToRocksDBFiles.keySet().removeIf(_._1 >= version) val metadata = if (version == 0) { if (localDir.exists) Utils.deleteRecursively(localDir) - localDir.mkdirs() + Utils.createDirectory(localDir) // Since we cleared the local dir, we should also clear the local file mapping rocksDBFileMapping.clear() RocksDBCheckpointMetadata(Seq.empty, 0) @@ -828,7 +828,7 @@ class RocksDBFileManager( private def getImmutableFilesFromVersionZip( version: Long, checkpointUniqueId: Option[String] = None): Seq[RocksDBImmutableFile] = { Utils.deleteRecursively(localTempDir) - localTempDir.mkdirs() + Utils.createDirectory(localTempDir) Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localTempDir) val metadataFile = localMetadataFile(localTempDir) val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala index df6fc50dc59db..f689c88a1cac2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExpressionsSchemaSuite.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.tags.ExtendedSQLTest +import org.apache.spark.util.Utils // scalastyle:off line.size.limit /** @@ -147,7 +148,7 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession { val goldenOutput = (header ++ outputBuffer).mkString("\n") val parent = resultFile.getParentFile if (!parent.exists()) { - assert(parent.mkdirs(), "Could not create directory: " + parent) + assert(Utils.createDirectory(parent), "Could not create directory: " + parent) } stringToFile(resultFile, goldenOutput) // scalastyle:off println diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala index 42d486bd75454..c9f0b9e54285c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ICUCollationsMapSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile, CollationFactory} +import org.apache.spark.util.Utils // scalastyle:off line.size.limit /** @@ -54,7 +55,7 @@ class ICUCollationsMapSuite extends SparkFunSuite { } val parent = collationsMapFile.getParentFile if (!parent.exists()) { - assert(parent.mkdirs(), "Could not create directory: " + parent) + assert(Utils.createDirectory(parent), "Could not create directory: " + parent) } stringToFile(collationsMapFile, goldenOutput) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index 
ad424b3a7cc76..bf6aca29be75e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ValidateRequirements} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.tags.ExtendedSQLTest +import org.apache.spark.util.Utils // scalastyle:off line.size.limit /** @@ -123,7 +124,7 @@ trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite { if (!foundMatch) { FileUtils.deleteDirectory(dir) - assert(dir.mkdirs()) + assert(Utils.createDirectory(dir)) val file = new File(dir, "simplified.txt") FileUtils.writeStringToFile(file, simplified, StandardCharsets.UTF_8) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 575a4ae69d1a9..e4fe46d08fa78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -377,7 +377,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper val resultFile = new File(testCase.resultFile) val parent = resultFile.getParentFile if (!parent.exists()) { - assert(parent.mkdirs(), "Could not create directory: " + parent) + assert(Utils.createDirectory(parent), "Could not create directory: " + parent) } stringToFile(resultFile, goldenOutput) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala index c1246a167b8cc..d83fed4bf9a7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, strin import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.TestSparkSession import org.apache.spark.tags.ExtendedSQLTest +import org.apache.spark.util.Utils /** * End-to-end tests to check TPCDS query results. 
@@ -122,7 +123,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp } val parent = goldenFile.getParentFile if (!parent.exists()) { - assert(parent.mkdirs(), "Could not create directory: " + parent) + assert(Utils.createDirectory(parent), "Could not create directory: " + parent) } stringToFile(goldenFile, goldenOutput) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 94a0501b74d47..4faeae51ca584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -400,7 +400,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName => val file = new File(tempDir, fileName) - assert(file.getParentFile.exists() || file.getParentFile.mkdirs()) + assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile)) util.stringToFile(file, fileName) } @@ -682,7 +682,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { files.foreach { case (name, size) => val file = new File(tempDir, name) - assert(file.getParentFile.exists() || file.getParentFile.mkdirs()) + assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile)) util.stringToFile(file, "*" * size) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index d63a1a7a2248b..0bc0dfeff05c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -4065,7 +4065,7 @@ abstract class JsonSuite // Test scan with partitions. withTempDir { dir => - new File(dir, "a=1/b=2/").mkdirs() + Utils.createDirectory(new File(dir, "a=1/b=2/")) Files.write(new File(dir, "a=1/b=2/file.json").toPath, content) checkAnswer( spark.read.format("json").option("singleVariantColumn", "var") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala index ea839b8e1ef10..4de62156e3b90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils // The data where the partitioning key exists only in the directory structure. 
case class OrcParData(intField: Int, stringField: String) @@ -56,7 +57,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest { new File(parent, child) } - assert(partDir.mkdirs(), s"Couldn't create directory $partDir") + assert(Utils.createDirectory(partDir), s"Couldn't create directory $partDir") partDir } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index d108803d43e44..b7b082e329658 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * A helper trait that provides convenient facilities for Parquet testing. @@ -105,7 +106,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { new File(parent, child) } - assert(partDir.mkdirs(), s"Couldn't create directory $partDir") + assert(Utils.createDirectory(partDir), s"Couldn't create directory $partDir") partDir } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala index 4314e0d0ee380..5cf1dea7d073c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala @@ -68,7 +68,7 @@ class RowQueueSuite extends SparkFunSuite with EncryptionFunSuite { encryptionTest("disk queue") { conf => val serManager = createSerializerManager(conf) val dir = Utils.createTempDir().getCanonicalFile - dir.mkdirs() + Utils.createDirectory(dir) val queue = DiskRowQueue(new File(dir, "buffer"), 1, serManager) val row = new UnsafeRow(1) row.pointTo(new Array[Byte](16), 16) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index b18d8f816e301..55bee7d4713de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -264,9 +264,9 @@ private class PartitionFileExistCommitProtocol( override def setupJob(jobContext: JobContext): Unit = { super.setupJob(jobContext) val stagingDir = new File(new Path(path).toUri.getPath, s".spark-staging-$jobId") - stagingDir.mkdirs() + Utils.createDirectory(stagingDir) val stagingPartDir = new File(stagingDir, "p1=2") - stagingPartDir.mkdirs() + Utils.createDirectory(stagingPartDir) val conflictTaskFile = new File(stagingPartDir, s"part-00000-$jobId.c000.snappy.parquet") conflictTaskFile.createNewFile() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index a753da116924d..a6a044c302ce3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -106,7 +106,7 @@ abstract class FileStreamSourceTest 
override def addData(source: FileStreamSource): Unit = { val tempFile = Utils.tempFileWith(new File(tmp, tmpFilePrefix)) val finalFile = new File(src, tempFile.getName) - src.mkdirs() + Utils.createDirectory(src) require(stringToFile(tempFile, content).renameTo(finalFile)) logInfo(s"Written text '$content' to file $finalFile") } @@ -127,7 +127,7 @@ abstract class FileStreamSourceTest def writeToFile(df: DataFrame, src: File, tmp: File): Unit = { val tmpDir = Utils.tempFileWith(new File(tmp, "orc")) df.write.orc(tmpDir.getCanonicalPath) - src.mkdirs() + Utils.createDirectory(src) tmpDir.listFiles().foreach { f => f.renameTo(new File(src, s"${f.getName}")) } @@ -149,7 +149,7 @@ abstract class FileStreamSourceTest def writeToFile(df: DataFrame, src: File, tmp: File): Unit = { val tmpDir = Utils.tempFileWith(new File(tmp, "parquet")) df.write.parquet(tmpDir.getCanonicalPath) - src.mkdirs() + Utils.createDirectory(src) tmpDir.listFiles().foreach { f => f.renameTo(new File(src, s"${f.getName}")) } @@ -664,7 +664,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withTempDirs { case (baseSrc, tmp) => withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { val src = new File(baseSrc, "type=X") - src.mkdirs() + Utils.createDirectory(src) // Add a file so that we can infer its schema stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") @@ -1451,7 +1451,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { test("explain") { withTempDirs { case (src, tmp) => - src.mkdirs() + Utils.createDirectory(src) val df = spark.readStream.format("text").load(src.getCanonicalPath).map(_.toString + "-x") // Test `explain` not throwing errors @@ -1500,7 +1500,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withTempDirs { case (root, tmp) => val src = new File(root, "a=1") - src.mkdirs() + Utils.createDirectory(src) (1 to numFiles).map { _.toString }.foreach { i => val tempFile = Utils.tempFileWith(new File(tmp, "text")) @@ -1924,8 +1924,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest { withTempDirs { case (dir, tmp) => val sourceDir1 = new File(dir, "source1") val sourceDir2 = new File(dir, "source2") - sourceDir1.mkdirs() - sourceDir2.mkdirs() + Utils.createDirectory(sourceDir1) + Utils.createDirectory(sourceDir2) val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}") val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}") @@ -2595,7 +2595,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val tempFile = Utils.tempFileWith(new File(tmp, "text")) val finalFile = new File(src, tempFile.getName) require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}") - require(src.mkdirs(), s"Cannot create $src") + require(Utils.createDirectory(src), s"Cannot create $src") require(src.isDirectory(), s"$src is not a directory") require(stringToFile(tempFile, content).renameTo(finalFile)) finalFile diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 200603cae5866..6ea610857b077 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -567,7 +567,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { test("MemorySink can recover from a 
checkpoint in Complete Mode") { val checkpointLoc = newMetadataDir val checkpointDir = new File(checkpointLoc, "offsets") - checkpointDir.mkdirs() + Utils.createDirectory(checkpointDir) assert(checkpointDir.exists()) testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true) } @@ -575,7 +575,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") { val checkpointLoc = newMetadataDir val checkpointDir = new File(checkpointLoc, "offsets") - checkpointDir.mkdirs() + Utils.createDirectory(checkpointDir) assert(checkpointDir.exists()) withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) { testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false) @@ -588,7 +588,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { val df = ms.toDF().toDF("a") val checkpointLoc = newMetadataDir val checkpointDir = new File(checkpointLoc, "offsets") - checkpointDir.mkdirs() + Utils.createDirectory(checkpointDir) assert(checkpointDir.exists()) val e = intercept[AnalysisException] { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 9d9b6f1c7b0e1..680c16cac74d5 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -75,6 +75,7 @@ import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.internal.LogKeys; import org.apache.spark.internal.MDC; +import org.apache.spark.util.Utils; import static org.apache.hadoop.hive.conf.SystemVariables.ENV_PREFIX; import static org.apache.hadoop.hive.conf.SystemVariables.HIVECONF_PREFIX; @@ -304,7 +305,7 @@ public void setOperationLogSessionDir(File operationLogRootDir) { if (!operationLogRootDir.exists()) { LOG.warn("The operation log root directory is removed, recreating: {}", MDC.of(LogKeys.PATH$.MODULE$, operationLogRootDir.getAbsolutePath())); - if (!operationLogRootDir.mkdirs()) { + if (!Utils.createDirectory(operationLogRootDir)) { LOG.warn("Unable to create operation log root directory: {}", MDC.of(LogKeys.PATH$.MODULE$, operationLogRootDir.getAbsolutePath())); } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index 3f60fd00b82a7..83d4c7a3622f2 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -43,6 +43,7 @@ import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.internal.LogKeys; import org.apache.spark.internal.MDC; +import org.apache.spark.util.Utils; /** * SessionManager. 
@@ -126,7 +127,7 @@ private void initOperationLogRootDir() { } if (!operationLogRootDir.exists()) { - if (!operationLogRootDir.mkdirs()) { + if (!Utils.createDirectory(operationLogRootDir)) { LOG.warn("Unable to create operation log root directory: {}", MDC.of(LogKeys.PATH$.MODULE$, operationLogRootDir.getAbsolutePath())); isOperationLogEnabled = false; diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala index 62d97772bcbc1..138f979d7bc3d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.scheduler.SparkListenerJobStart import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.InMemoryStore class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { @@ -39,7 +40,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter { val tmpDirName = System.getProperty("java.io.tmpdir") val tmpDir = new File(tmpDirName) if (!tmpDir.exists()) { - tmpDir.mkdirs() + Utils.createDirectory(tmpDir) } super.beforeAll() } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala index 7cf17a089ea6b..806eabc96fe3f 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.scheduler.SparkListenerJobStart import org.apache.spark.sql.hive.thriftserver._ import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.InMemoryStore @@ -39,7 +40,7 @@ class ThriftServerPageSuite extends SparkFunSuite with BeforeAndAfter { val tmpDirName = System.getProperty("java.io.tmpdir") val tmpDir = new File(tmpDirName) if (!tmpDir.exists()) { - tmpDir.mkdirs() + Utils.createDirectory(tmpDir) } super.beforeAll() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 58ca4a4ad1cf3..6581d39c707e7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -142,7 +142,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { val outDir = new File(targetDir) if (!outDir.exists()) { - outDir.mkdirs() + Utils.createDirectory(outDir) } // propagate exceptions up to the caller of getFileFromUrl diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index e2f0040afe57c..2af4d01fcfb80 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1068,14 +1068,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto withTempPaths(numPaths = 2) { case Seq(dir1, dir2) => val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11") val file1 = new File(partDir1, "data") - file1.getParentFile.mkdirs() + Utils.createDirectory(file1.getParentFile) Utils.tryWithResource(new PrintWriter(file1)) { writer => writer.write("1,a") } val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12") val file2 = new File(partDir2, "data") - file2.getParentFile.mkdirs() + Utils.createDirectory(file2.getParentFile) Utils.tryWithResource(new PrintWriter(file2)) { writer => writer.write("1,a") } @@ -1670,14 +1670,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto withTempPaths(numPaths = 2) { case Seq(dir1, dir2) => val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11") val file1 = new File(partDir1, "data") - file1.getParentFile.mkdirs() + Utils.createDirectory(file1.getParentFile) Utils.tryWithResource(new PrintWriter(file1)) { writer => writer.write("1,a") } val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12") val file2 = new File(partDir2, "data") - file2.getParentFile.mkdirs() + Utils.createDirectory(file2.getParentFile) Utils.tryWithResource(new PrintWriter(file2)) { writer => writer.write("1,a") }
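
The changes above uniformly replace java.io.File#mkdirs() with a directory-creation helper: Files.createDirectories on the Java side, and Utils.createDirectory or SparkFileUtils.createDirectory in Scala. As a rough illustration of the contract these call sites lean on (a Boolean result that is true when the directory exists after the call, usable both as a bare statement and inside if/assert/require guards), here is a minimal Scala sketch built on java.nio.file.Files.createDirectories. The object name CreateDirectorySketch and its exact failure handling are assumptions for illustration only, not Spark's actual helper; the patch also calls a String-path overload (e.g. in AvroSuite), which this sketch does not cover.

import java.io.{File, IOException}
import java.nio.file.Files

// Illustrative sketch only: a createDirectory(File): Boolean helper with the
// contract the call sites in this patch rely on. It is not Spark's actual
// Utils / SparkFileUtils implementation, which may log, throw, or otherwise differ.
object CreateDirectorySketch {

  def createDirectory(dir: File): Boolean = {
    try {
      // Unlike File#mkdirs(), Files.createDirectories
      //  * treats an already-existing directory as success rather than returning false, and
      //  * reports the cause of a failure through an exception instead of a silent false.
      Files.createDirectories(dir.toPath)
      true
    } catch {
      case _: Exception =>
        // Returning false mirrors the old mkdirs() behaviour for callers that
        // branch on the result; a real helper could log or rethrow here instead.
        false
    }
  }

  def main(args: Array[String]): Unit = {
    val base = Files.createTempDirectory("createDirectorySketch").toFile
    val nested = new File(base, "a=1/b=2")

    // Typical call-site pattern from the diff: fail loudly if creation is impossible.
    if (!nested.exists() && !createDirectory(nested)) {
      throw new IOException("Failed to create directory " + nested)
    }

    // Calling the helper again is still true, whereas nested.mkdirs() would now
    // return false because the directory already exists.
    assert(createDirectory(nested))
    println(s"created: ${nested.getAbsolutePath}")
  }
}

Compared with mkdirs(), the practical differences are that repeated calls stay true once the directory exists and a genuine failure surfaces an exception carrying the underlying cause instead of an undifferentiated false.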