[SPARK-52615][CORE] Replace File.mkdirs with Utils.createDirectory #51322
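
This PR replaces raw `File.mkdirs()` calls with Spark's directory-creation helpers (`Utils.createDirectory` / `SparkFileUtils.createDirectory` on the Scala side, `java.nio.file.Files.createDirectories` on the Java side). `File.mkdirs()` returns `false` both when creation fails and when the directory already exists, and it discards the reason for a failure, so call sites that ignore the return value can silently continue without a usable directory. The sketch below is a minimal illustration of the helper-based pattern, not Spark's actual implementation; the object name `DirSketch` is hypothetical, and the real helpers may differ in logging, permissions, and error handling.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical sketch of a createDirectory helper in the spirit of
// Utils.createDirectory / SparkFileUtils.createDirectory.
object DirSketch {
  def createDirectory(dir: File): Boolean = {
    try {
      // Unlike File.mkdirs(), this does not fail when the directory already
      // exists, and a genuine failure surfaces as an exception rather than
      // collapsing into a bare `false`.
      Files.createDirectories(dir.toPath)
      true
    } catch {
      case e: Exception =>
        // A real helper would log `e` here before reporting failure.
        false
    }
  }
}

// Typical call-site pattern seen in this PR:
//   if (!Utils.createDirectory(executorDir)) {
//     throw new IOException("Failed to create directory " + executorDir)
//   }
```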

Closed · wants to merge 2 commits
@@ -22,6 +22,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;

import com.google.common.io.Closeables;

@@ -54,7 +55,7 @@ public void create() throws IOException {
localDirs[i] = JavaUtils.createDirectory(root, "spark").getAbsolutePath();

for (int p = 0; p < subDirsPerLocalDir; p ++) {
new File(localDirs[i], String.format("%02x", p)).mkdirs();
Files.createDirectories(new File(localDirs[i], String.format("%02x", p)).toPath());
}
}
}
@@ -95,7 +95,7 @@ private[spark] object IvyTestUtils {
className: String,
packageName: String): Seq[(String, File)] = {
val rFilesDir = new File(dir, "R" + File.separator + "pkg")
new File(rFilesDir, "R").mkdirs()
SparkFileUtils.createDirectory(new File(rFilesDir, "R"))
val contents =
s"""myfunc <- function(x) {
| SparkR:::callJStatic("$packageName.$className", "myFunc", x)
@@ -150,11 +150,11 @@
useIvyLayout: Boolean): File = {
if (useIvyLayout) {
val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
ivyXmlPath.mkdirs()
SparkFileUtils.createDirectory(ivyXmlPath)
createIvyDescriptor(ivyXmlPath, artifact, dependencies)
} else {
val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
pomPath.mkdirs()
SparkFileUtils.createDirectory(pomPath)
createPom(pomPath, artifact, dependencies)
}
}
@@ -293,13 +293,13 @@ private[spark] object IvyTestUtils {
// Where the root of the repository exists, and what Ivy will search in
val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir())
// Create directory if it doesn't exist
tempPath.mkdirs()
SparkFileUtils.createDirectory(tempPath)
// Where to create temporary class files and such
val root = new File(tempPath, tempPath.hashCode().toString)
root.mkdirs()
SparkFileUtils.createDirectory(root)
try {
val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
jarPath.mkdirs()
SparkFileUtils.createDirectory(jarPath)
val className = "MyLib"

val javaClass = createJavaClass(root, className, artifact.groupId)
@@ -1899,7 +1899,7 @@ abstract class AvroSuite
withTempPath { tempDir =>
val tempEmptyDir = s"$tempDir/sqlOverwrite"
// Create a temp directory for table that will be overwritten
new File(tempEmptyDir).mkdirs()
Utils.createDirectory(tempEmptyDir)
spark.sql(
s"""
|CREATE TEMPORARY VIEW episodes
@@ -38,6 +38,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark._
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.streaming.kafka010.mocks.MockTime
import org.apache.spark.util.Utils

class KafkaRDDSuite extends SparkFunSuite {

@@ -91,7 +92,7 @@ class KafkaRDDSuite extends SparkFunSuite {
val logs = new Pool[TopicPartition, UnifiedLog]()
val logDir = kafkaTestUtils.brokerLogDir
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
Utils.createDirectory(dir)
val logProps = new ju.Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
@@ -153,7 +153,7 @@ private[deploy] object RPackageUtils extends Logging {
if (verbose) {
print(log"Creating directory: ${MDC(PATH, dir)}", printStream)
}
dir.mkdirs
Utils.createDirectory(dir)
} else {
val inStream = jar.getInputStream(entry)
val outPath = new File(tempDir, entryPath)
@@ -145,7 +145,7 @@ private[deploy] class DriverRunner(
*/
private def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
if (!driverDir.exists() && !driverDir.mkdirs()) {
if (!driverDir.exists() && !Utils.createDirectory(driverDir)) {
throw new IOException("Failed to create directory " + driverDir)
}
driverDir
@@ -600,7 +600,7 @@ private[deploy] class Worker(

// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
if (!Utils.createDirectory(executorDir)) {
throw new IOException("Failed to create directory " + executorDir)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -90,7 +90,7 @@ private[spark] class PipedRDD[T: ClassTag](
val currentDir = new File(".")
logDebug("currentDir = " + currentDir.getAbsolutePath())
val taskDirFile = new File(taskDirectory)
taskDirFile.mkdirs()
Utils.createDirectory(taskDirFile)

try {
val tasksDirFilter = new NotEqualsFileNameFilter("tasks")
@@ -316,7 +316,7 @@ private[spark] class DiskBlockManager(
throw SparkCoreErrors.failToCreateDirectoryError(dirToCreate.getAbsolutePath, maxAttempts)
}
try {
dirToCreate.mkdirs()
Utils.createDirectory(dirToCreate)
Files.setPosixFilePermissions(
dirToCreate.toPath, PosixFilePermissions.fromString("rwxrwx---"))
if (dirToCreate.exists()) {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -458,7 +458,7 @@ private[spark] object Utils
* to work around a security issue, see also SPARK-38631.
*/
private def unTarUsingJava(source: File, dest: File): Unit = {
if (!dest.mkdirs && !dest.isDirectory) {
if (!Utils.createDirectory(dest) && !dest.isDirectory) {
throw new IOException(s"Mkdirs failed to create $dest")
} else {
try {
@@ -810,7 +810,7 @@
configuredLocalDirs.flatMap { root =>
try {
val rootDir = new File(root)
if (rootDir.exists || rootDir.mkdirs()) {
if (rootDir.exists || Utils.createDirectory(rootDir)) {
val dir = createTempDir(root)
chmod700(dir)
Some(dir.getAbsolutePath)
@@ -455,7 +455,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("SPARK-22585 addJar argument without scheme is interpreted literally without url decoding") {
withTempDir { dir =>
val tmpDir = new File(dir, "host%3A443")
tmpDir.mkdirs()
Utils.createDirectory(tmpDir)
val tmpJar = File.createTempFile("t%2F", ".jar", tmpDir)

sc = new SparkContext("local", "test")
@@ -20,6 +20,7 @@ package org.apache.spark.benchmark
import java.io.{File, FileOutputStream, OutputStream}

import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.Utils

/**
* A base class for generate benchmark results to a file.
@@ -60,7 +61,7 @@ abstract class BenchmarkBase {
// scalastyle:off println
println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
// scalastyle:on println
dir.mkdirs()
Utils.createDirectory(dir)
}
val file = new File(dir, resultFileName)
if (!file.exists()) {
@@ -147,12 +147,12 @@ class RPackageUtilsSuite
Utils.tryWithSafeFinally {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
assert(fakeSparkRDir.mkdirs())
assert(Utils.createDirectory(fakeSparkRDir))
IvyTestUtils.writeFile(fakeSparkRDir, "abc.R", "abc")
IvyTestUtils.writeFile(fakeSparkRDir, "DESCRIPTION", "abc")
IvyTestUtils.writeFile(tempDir, "package.zip", "abc") // fake zip file :)
val fakePackageDir = new File(tempDir, "packageTest")
assert(fakePackageDir.mkdirs())
assert(Utils.createDirectory(fakePackageDir))
IvyTestUtils.writeFile(fakePackageDir, "def.R", "abc")
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
@@ -904,7 +904,7 @@ class SparkSubmitSuite
// compile a small jar containing a class that will be called from R code.
withTempDir { tempDir =>
val srcDir = new File(tempDir, "sparkrtest")
srcDir.mkdirs()
Utils.createDirectory(srcDir)
val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
"""package sparkrtest;
|
@@ -220,7 +220,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with

if (regenerateGoldenFiles) {
FileUtils.deleteDirectory(expRoot)
expRoot.mkdirs()
Utils.createDirectory(expRoot)
}

// run a bunch of characterization tests -- just verify the behavior is the same as what is saved
@@ -383,7 +383,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P
// Create the executor's working directory
val executorDir = new File(worker.workDir, appId + "/" + execId)

if (!executorDir.exists && !executorDir.mkdirs()) {
if (!executorDir.exists && !Utils.createDirectory(executorDir)) {
throw new IOException("Failed to create directory " + executorDir)
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
@@ -207,7 +207,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// compile a small jar containing an exception that will be thrown on an executor.
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "repro/")
srcDir.mkdirs()
Utils.createDirectory(srcDir)
val excSource = new JavaSourceFromString(new File(srcDir, "MyException").toURI.getPath,
"""package repro;
|
@@ -29,7 +29,7 @@ class LocalDirsSuite extends SparkFunSuite with LocalRootDirsTest {

private def assumeNonExistentAndNotCreatable(f: File): Unit = {
try {
assume(!f.exists() && !f.mkdirs())
assume(!f.exists() && !Utils.createDirectory(f))
} finally {
Utils.deleteRecursively(f)
}
@@ -129,12 +129,12 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
reduceId: Int,
blockId: String): AppShufflePartitionInfo = {
val dataFile = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager, partitionId, reduceId)
dataFile.getParentFile.mkdirs()
Utils.createDirectory(dataFile.getParentFile)
val indexFile =
ShuffleTestAccessor.getMergedShuffleIndexFile(mergeManager, partitionId, reduceId)
indexFile.getParentFile.mkdirs()
Utils.createDirectory(indexFile.getParentFile)
val metaFile = ShuffleTestAccessor.getMergedShuffleMetaFile(mergeManager, partitionId, reduceId)
metaFile.getParentFile.mkdirs()
Utils.createDirectory(metaFile.getParentFile)
val partitionInfo = ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo(
mergeManager, partitionId, reduceId, blockId)

@@ -33,6 +33,7 @@ import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{ListState, MapState, OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode, TimerValues, TTLConfig, ValueState}
import org.apache.spark.sql.types._
import org.apache.spark.util.SparkFileUtils

case class InputRowForConnectTest(key: String, value: String)
case class OutputRowForConnectTest(key: String, value: String)
@@ -494,7 +495,7 @@ class TransformWithStateConnectSuite
val file = Paths.get(inputPath).toFile
val parentDir = file.getParentFile
if (parentDir != null && !parentDir.exists()) {
parentDir.mkdirs()
SparkFileUtils.createDirectory(parentDir)
}

val writer = new BufferedWriter(new FileWriter(inputPath))
@@ -335,7 +335,7 @@ class RocksDBFileManager(
versionToRocksDBFiles.keySet().removeIf(_._1 >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
Utils.createDirectory(localDir)
// Since we cleared the local dir, we should also clear the local file mapping
rocksDBFileMapping.clear()
RocksDBCheckpointMetadata(Seq.empty, 0)
@@ -828,7 +828,7 @@ class RocksDBFileManager(
private def getImmutableFilesFromVersionZip(
version: Long, checkpointUniqueId: Option[String] = None): Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
localTempDir.mkdirs()
Utils.createDirectory(localTempDir)
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localTempDir)
val metadataFile = localMetadataFile(localTempDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.ExtendedSQLTest
import org.apache.spark.util.Utils

// scalastyle:off line.size.limit
/**
@@ -147,7 +148,7 @@ class ExpressionsSchemaSuite extends QueryTest with SharedSparkSession {
val goldenOutput = (header ++ outputBuffer).mkString("\n")
val parent = resultFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(resultFile, goldenOutput)
// scalastyle:off println
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile, CollationFactory}
import org.apache.spark.util.Utils

// scalastyle:off line.size.limit
/**
@@ -54,7 +55,7 @@ class ICUCollationsMapSuite extends SparkFunSuite {
}
val parent = collationsMapFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(collationsMapFile, goldenOutput)
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ValidateRequirements}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.tags.ExtendedSQLTest
import org.apache.spark.util.Utils

// scalastyle:off line.size.limit
/**
@@ -123,7 +124,7 @@ trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite {

if (!foundMatch) {
FileUtils.deleteDirectory(dir)
assert(dir.mkdirs())
assert(Utils.createDirectory(dir))

val file = new File(dir, "simplified.txt")
FileUtils.writeStringToFile(file, simplified, StandardCharsets.UTF_8)
@@ -377,7 +377,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
val resultFile = new File(testCase.resultFile)
val parent = resultFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(resultFile, goldenOutput)
}
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, strin
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.tags.ExtendedSQLTest
import org.apache.spark.util.Utils

/**
* End-to-end tests to check TPCDS query results.
@@ -122,7 +123,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
}
val parent = goldenFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
assert(Utils.createDirectory(parent), "Could not create directory: " + parent)
}
stringToFile(goldenFile, goldenOutput)
}
@@ -400,7 +400,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession {

Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName =>
val file = new File(tempDir, fileName)
assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile))
util.stringToFile(file, fileName)
}

@@ -682,7 +682,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession {
files.foreach {
case (name, size) =>
val file = new File(tempDir, name)
assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
assert(file.getParentFile.exists() || Utils.createDirectory(file.getParentFile))
util.stringToFile(file, "*" * size)
}

@@ -4065,7 +4065,7 @@ abstract class JsonSuite

// Test scan with partitions.
withTempDir { dir =>
new File(dir, "a=1/b=2/").mkdirs()
Utils.createDirectory(new File(dir, "a=1/b=2/"))
Files.write(new File(dir, "a=1/b=2/file.json").toPath, content)
checkAnswer(
spark.read.format("json").option("singleVariantColumn", "var")