diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 44ce22dfb1..92c9f85e89 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -32,6 +32,8 @@ import org.apache.texera.dao.SqlServer import org.apache.texera.dao.SqlServer.withTransaction import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART import org.apache.texera.dao.jooq.generated.tables.DatasetUserAccess.DATASET_USER_ACCESS import org.apache.texera.dao.jooq.generated.tables.DatasetVersion.DATASET_VERSION import org.apache.texera.dao.jooq.generated.tables.User.USER @@ -53,25 +55,23 @@ import org.apache.texera.service.util.S3StorageClient.{ MAXIMUM_NUM_OF_MULTIPART_S3_PARTS, MINIMUM_NUM_OF_MULTIPART_S3_PART } -import org.jooq.{DSLContext, EnumType} +import org.jooq.exception.DataAccessException import org.jooq.impl.DSL import org.jooq.impl.DSL.{inline => inl} +import org.jooq.{DSLContext, EnumType} +import software.amazon.awssdk.services.s3.model.UploadPartResponse + import java.io.{InputStream, OutputStream} import java.net.{HttpURLConnection, URL, URLDecoder} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} +import java.sql.SQLException import java.util import java.util.Optional import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ -import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION -import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART -import org.jooq.exception.DataAccessException -import software.amazon.awssdk.services.s3.model.UploadPartResponse - -import java.sql.SQLException import scala.util.Try object DatasetResource { @@ -80,6 +80,16 @@ object DatasetResource { .getInstance() .createDSLContext() + private def singleFileUploadMaxBytes(ctx: DSLContext, defaultMiB: Long = 20L): Long = { + val limit = ctx + .select(DSL.field("value", classOf[String])) + .from(DSL.table(DSL.name("texera_db", "site_settings"))) + .where(DSL.field("key", classOf[String]).eq("single_file_upload_max_size_mib")) + .fetchOneInto(classOf[String]) + Try(Option(limit).getOrElse(defaultMiB.toString).trim.toLong) + .getOrElse(defaultMiB) * 1024L * 1024L + } + /** * Helper function to get the dataset from DB using did */ @@ -647,14 +657,16 @@ class DatasetResource { @QueryParam("ownerEmail") ownerEmail: String, @QueryParam("datasetName") datasetName: String, @QueryParam("filePath") filePath: String, - @QueryParam("numParts") numParts: Optional[Integer], + @QueryParam("fileSizeBytes") fileSizeBytes: Optional[Long], + @QueryParam("partSizeBytes") partSizeBytes: Optional[Long], @Auth user: SessionUser ): Response = { val uid = user.getUid val dataset: Dataset = getDatasetBy(ownerEmail, datasetName) operationType.toLowerCase match { - case "init" => initMultipartUpload(dataset.getDid, filePath, numParts, uid) + case "init" => + initMultipartUpload(dataset.getDid, filePath, fileSizeBytes, 
partSizeBytes, uid) case "finish" => finishMultipartUpload(dataset.getDid, filePath, uid) case "abort" => abortMultipartUpload(dataset.getDid, filePath, uid) case _ => @@ -715,7 +727,55 @@ class DatasetResource { if (session == null) throw new NotFoundException("Upload session not found. Call type=init first.") - val expectedParts = session.getNumPartsRequested + val expectedParts: Int = session.getNumPartsRequested + val fileSizeBytesValue: Long = session.getFileSizeBytes + val partSizeBytesValue: Long = session.getPartSizeBytes + + if (fileSizeBytesValue <= 0L) { + throw new WebApplicationException( + s"Upload session has an invalid file size of $fileSizeBytesValue. Restart the upload.", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + if (partSizeBytesValue <= 0L) { + throw new WebApplicationException( + s"Upload session has an invalid part size of $partSizeBytesValue. Restart the upload.", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + + // lastPartSize = fileSize - partSize*(expectedParts-1) + val nMinus1: Long = expectedParts.toLong - 1L + if (nMinus1 < 0L) { + throw new WebApplicationException( + s"Upload session has an invalid number of requested parts of $expectedParts. Restart the upload.", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + if (nMinus1 > 0L && partSizeBytesValue > Long.MaxValue / nMinus1) { + throw new WebApplicationException( + "Overflow while computing last part size", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + val prefixBytes: Long = partSizeBytesValue * nMinus1 + if (prefixBytes > fileSizeBytesValue) { + throw new WebApplicationException( + s"Upload session is invalid: computed bytes before last part ($prefixBytes) exceed declared file size ($fileSizeBytesValue). Restart the upload.", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + val lastPartSize: Long = fileSizeBytesValue - prefixBytes + if (lastPartSize <= 0L || lastPartSize > partSizeBytesValue) { + throw new WebApplicationException( + s"Upload session is invalid: computed last part size ($lastPartSize bytes) must be within 1..$partSizeBytesValue bytes. Restart the upload.", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + + val allowedSize: Long = + if (partNumber < expectedParts) partSizeBytesValue else lastPartSize + if (partNumber > expectedParts) { throw new BadRequestException( s"$partNumber exceeds the requested parts on init: $expectedParts" @@ -729,10 +789,17 @@ class DatasetResource { ) } + if (contentLength != allowedSize) { + throw new BadRequestException( + s"Invalid part size for partNumber=$partNumber. " + + s"Expected Content-Length=$allowedSize, got $contentLength." + ) + } + val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("") if (physicalAddr.isEmpty) { throw new WebApplicationException( - "Upload session is missing physicalAddress. Re-init the upload.", + "Upload session is missing physicalAddress. Restart the upload.", Response.Status.INTERNAL_SERVER_ERROR ) } @@ -743,7 +810,7 @@ class DatasetResource { catch { case e: IllegalArgumentException => throw new WebApplicationException( - s"Upload session has invalid physicalAddress. Re-init the upload. (${e.getMessage})", + s"Upload session has invalid physicalAddress. Restart the upload. (${e.getMessage})", Response.Status.INTERNAL_SERVER_ERROR ) } @@ -775,7 +842,7 @@ class DatasetResource { if (partRow == null) { // Should not happen if init pre-created rows throw new WebApplicationException( - s"Part row not initialized for part $partNumber. 
Re-init the upload.", + s"Part row not initialized for part $partNumber. Restart the upload.", Response.Status.INTERNAL_SERVER_ERROR ) } @@ -1399,7 +1466,8 @@ class DatasetResource { private def initMultipartUpload( did: Integer, encodedFilePath: String, - numParts: Optional[Integer], + fileSizeBytes: Optional[Long], + partSizeBytes: Optional[Long], uid: Integer ): Response = { @@ -1416,12 +1484,63 @@ class DatasetResource { URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name()) ) - val numPartsValue = numParts.toScala.getOrElse { - throw new BadRequestException("numParts is required for initialization") + val fileSizeBytesValue: Long = + fileSizeBytes + .orElseThrow(() => + new BadRequestException("fileSizeBytes is required for initialization") + ) + + if (fileSizeBytesValue <= 0L) { + throw new BadRequestException("fileSizeBytes must be > 0") } - if (numPartsValue < 1 || numPartsValue > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS) { + + val partSizeBytesValue: Long = + partSizeBytes + .orElseThrow(() => + new BadRequestException("partSizeBytes is required for initialization") + ) + + if (partSizeBytesValue <= 0L) { + throw new BadRequestException("partSizeBytes must be > 0") + } + + // singleFileUploadMaxBytes applies to TOTAL bytes (sum of all parts == file size) + val totalMaxBytes: Long = singleFileUploadMaxBytes(ctx) + if (totalMaxBytes <= 0L) { + throw new WebApplicationException( + "singleFileUploadMaxBytes must be > 0", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + if (fileSizeBytesValue > totalMaxBytes) { throw new BadRequestException( - "numParts must be between 1 and " + MAXIMUM_NUM_OF_MULTIPART_S3_PARTS + s"fileSizeBytes=$fileSizeBytesValue exceeds singleFileUploadMaxBytes=$totalMaxBytes" + ) + } + + // Compute numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize + val addend: Long = partSizeBytesValue - 1L + if (addend < 0L || fileSizeBytesValue > Long.MaxValue - addend) { + throw new WebApplicationException( + "Overflow while computing numParts", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + + val numPartsLong: Long = (fileSizeBytesValue + addend) / partSizeBytesValue + if (numPartsLong < 1L || numPartsLong > MAXIMUM_NUM_OF_MULTIPART_S3_PARTS.toLong) { + throw new BadRequestException( + s"Computed numParts=$numPartsLong is out of range 1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS" + ) + } + val numPartsValue: Int = numPartsLong.toInt + + // S3 multipart constraint: all non-final parts must be >= 5MiB. + // If we have >1 parts, then partSizeBytesValue is the non-final part size. + if (numPartsValue > 1 && partSizeBytesValue < MINIMUM_NUM_OF_MULTIPART_S3_PART) { + throw new BadRequestException( + s"partSizeBytes=$partSizeBytesValue is too small. " + + s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART bytes." 
) } @@ -1453,7 +1572,6 @@ class DatasetResource { val uploadIdStr = presign.getUploadId val physicalAddr = presign.getPhysicalAddress - // If anything fails after this point, abort LakeFS multipart try { val rowsInserted = ctx .insertInto(DATASET_UPLOAD_SESSION) @@ -1462,7 +1580,9 @@ class DatasetResource { .set(DATASET_UPLOAD_SESSION.UID, uid) .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr) .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr) - .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, numPartsValue) + .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Integer.valueOf(numPartsValue)) + .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(fileSizeBytesValue)) + .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(partSizeBytesValue)) .onDuplicateKeyIgnore() .execute() @@ -1506,7 +1626,6 @@ class DatasetResource { Response.ok().build() } catch { case e: Exception => - // rollback will remove session + parts rows; we still must abort LakeFS try { LakeFSStorageClient.abortPresignedMultipartUploads( repositoryName, @@ -1572,7 +1691,7 @@ class DatasetResource { val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("") if (physicalAddr.isEmpty) { throw new WebApplicationException( - "Upload session is missing physicalAddress. Re-init the upload.", + "Upload session is missing physicalAddress. Restart the upload.", Response.Status.INTERNAL_SERVER_ERROR ) } @@ -1595,7 +1714,7 @@ class DatasetResource { if (totalCnt != expectedParts) { throw new WebApplicationException( - s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Re-init the upload.", + s"Part table mismatch: expected $expectedParts rows but found $totalCnt. Restart the upload.", Response.Status.INTERNAL_SERVER_ERROR ) } @@ -1646,7 +1765,29 @@ class DatasetResource { physicalAddr ) - // Cleanup: delete the session; parts are removed by ON DELETE CASCADE + // FINAL SERVER-SIDE SIZE CHECK (do not rely on init) + val actualSizeBytes = + Option(objectStats.getSizeBytes).map(_.longValue()).getOrElse(-1L) + + if (actualSizeBytes <= 0L) { + throw new WebApplicationException( + "lakeFS did not return sizeBytes for completed multipart upload", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + + val maxBytes = singleFileUploadMaxBytes(ctx) + val tooLarge = actualSizeBytes > maxBytes + + if (tooLarge) { + try { + LakeFSStorageClient.resetObjectUploadOrDeletion(dataset.getRepositoryName, filePath) + } catch { + case _: Throwable => () + } + } + + // always cleanup session ctx .deleteFrom(DATASET_UPLOAD_SESSION) .where( @@ -1657,6 +1798,13 @@ class DatasetResource { ) .execute() + if (tooLarge) { + throw new WebApplicationException( + s"Upload exceeded max size: actualSizeBytes=$actualSizeBytes maxBytes=$maxBytes", + Response.Status.REQUEST_ENTITY_TOO_LARGE + ) + } + Response .ok( Map( @@ -1716,7 +1864,7 @@ class DatasetResource { val physicalAddr = Option(session.getPhysicalAddress).map(_.trim).getOrElse("") if (physicalAddr.isEmpty) { throw new WebApplicationException( - "Upload session is missing physicalAddress. Re-init the upload.", + "Upload session is missing physicalAddress. 
Restart the upload.", Response.Status.INTERNAL_SERVER_ERROR ) } diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 3f72c57486..f781a94f23 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -22,7 +22,7 @@ package org.apache.texera.service.resource import ch.qos.logback.classic.{Level, Logger} import io.lakefs.clients.sdk.ApiException import jakarta.ws.rs._ -import jakarta.ws.rs.core.{Cookie, HttpHeaders, MediaType, MultivaluedHashMap, Response} +import jakarta.ws.rs.core._ import org.apache.texera.amber.core.storage.util.LakeFSStorageClient import org.apache.texera.auth.SessionUser import org.apache.texera.dao.MockTexeraDB @@ -34,10 +34,10 @@ import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, User} import org.apache.texera.service.MockLakeFS import org.jooq.SQLDialect import org.jooq.impl.DSL -import org.scalatest.tagobjects.Slow -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.tagobjects.Slow +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag} import org.slf4j.LoggerFactory import java.io.{ByteArrayInputStream, IOException, InputStream} @@ -181,6 +181,8 @@ class DatasetResourceSpec catch { case e: ApiException if e.getCode == 409 => // ok } + // Ensure max upload size setting does not leak between tests + clearMaxUploadMiB() } override protected def afterAll(): Unit = { @@ -383,23 +385,96 @@ class DatasetResourceSpec override def getDate: Date = null override def getLength: Int = -1 } + private def mkHeadersRawContentLength(raw: String): HttpHeaders = + new HttpHeaders { + override def getRequestHeader(name: String): java.util.List[String] = + if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)) Collections.singletonList(raw) + else Collections.emptyList() + + override def getHeaderString(name: String): String = + if (HttpHeaders.CONTENT_LENGTH.equalsIgnoreCase(name)) raw else null + override def getRequestHeaders: MultivaluedMap[String, String] = { + val map = new MultivaluedHashMap[String, String]() + map.putSingle(HttpHeaders.CONTENT_LENGTH, raw) + map + } + override def getAcceptableMediaTypes: java.util.List[MediaType] = Collections.emptyList() + override def getAcceptableLanguages: java.util.List[Locale] = Collections.emptyList() + override def getMediaType: MediaType = null + override def getLanguage: Locale = null + override def getCookies: java.util.Map[String, Cookie] = Collections.emptyMap() + // Not used by the resource (it reads getHeaderString), but keep it safe. + override def getLength: Int = -1 + override def getDate: Date = ??? + } private def uniqueFilePath(prefix: String): String = s"$prefix/${System.nanoTime()}-${Random.alphanumeric.take(8).mkString}.bin" + // ---------- site_settings helpers (max upload size) ---------- + private val MaxUploadKey = "single_file_upload_max_size_mib" + + private def upsertSiteSetting(key: String, value: String): Unit = { + val table = DSL.table(DSL.name("texera_db", "site_settings")) + val keyField = DSL.field(DSL.name("key"), classOf[String]) + val valField = DSL.field(DSL.name("value"), classOf[String]) + + // Keep it simple + compatible across jOOQ versions: delete then insert. 
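    // For reference, the resource-side reader of this setting (singleFileUploadMaxBytes in
    // DatasetResource) treats the stored value as a MiB count and converts it to bytes,
    // falling back to the 20 MiB default when the row is missing or unparsable. A rough
    // sketch of that mapping (the arithmetic is the only point being illustrated):
    //   upsertSiteSetting("single_file_upload_max_size_mib", "20")   // -> 20L * 1024 * 1024 bytes
    //   deleteSiteSetting("single_file_upload_max_size_mib")         // -> default of 20 MiB
    //   upsertSiteSetting("single_file_upload_max_size_mib", "oops") // -> Try(...).getOrElse(default)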
+ val ctx = getDSLContext + ctx.deleteFrom(table).where(keyField.eq(key)).execute() + ctx.insertInto(table).columns(keyField, valField).values(key, value).execute() + } + + private def deleteSiteSetting(key: String): Boolean = { + val table = DSL.table(DSL.name("texera_db", "site_settings")) + val keyField = DSL.field(DSL.name("key"), classOf[String]) + getDSLContext.deleteFrom(table).where(keyField.eq(key)).execute() > 0 + } + + private def setMaxUploadMiB(mib: Long): Unit = upsertSiteSetting(MaxUploadKey, mib.toString) + private def clearMaxUploadMiB(): Unit = deleteSiteSetting(MaxUploadKey) + + /** + * Convenience helper that adapts legacy "numParts" tests to the new init API: + * init now takes (fileSizeBytes, partSizeBytes) and computes numParts internally. + * + * - Non-final parts are exactly partSizeBytes. + * - Final part is exactly lastPartBytes. + */ private def initUpload( filePath: String, numParts: Int, + lastPartBytes: Int = 1, + partSizeBytes: Int = MinNonFinalPartBytes, user: SessionUser = multipartOwnerSessionUser - ): Response = + ): Response = { + require(numParts >= 1, "numParts must be >= 1") + require(lastPartBytes > 0, "lastPartBytes must be > 0") + require(partSizeBytes > 0, "partSizeBytes must be > 0") + if (numParts > 1) + require( + lastPartBytes <= partSizeBytes, + "lastPartBytes must be <= partSizeBytes for multipart" + ) + + val fileSizeBytes: Long = + if (numParts == 1) lastPartBytes.toLong + else partSizeBytes.toLong * (numParts.toLong - 1L) + lastPartBytes.toLong + + // For numParts == 1, allow partSizeBytes >= fileSizeBytes (still computes 1 part). + val maxPartSizeBytes: Long = + if (numParts == 1) Math.max(partSizeBytes.toLong, fileSizeBytes) else partSizeBytes.toLong + datasetResource.multipartUpload( "init", ownerUser.getEmail, multipartDataset.getName, urlEnc(filePath), - Optional.of(numParts), + Optional.of(java.lang.Long.valueOf(fileSizeBytes)), + Optional.of(java.lang.Long.valueOf(maxPartSizeBytes)), user ) + } private def finishUpload( filePath: String, @@ -411,6 +486,7 @@ class DatasetResourceSpec multipartDataset.getName, urlEnc(filePath), Optional.empty(), + Optional.empty(), user ) @@ -424,6 +500,7 @@ class DatasetResourceSpec multipartDataset.getName, urlEnc(filePath), Optional.empty(), + Optional.empty(), user ) @@ -433,11 +510,14 @@ class DatasetResourceSpec bytes: Array[Byte], user: SessionUser = multipartOwnerSessionUser, contentLengthOverride: Option[Long] = None, - missingContentLength: Boolean = false + missingContentLength: Boolean = false, + rawContentLengthOverride: Option[String] = None ): Response = { - val hdrs = + val contentLength = contentLengthOverride.getOrElse(bytes.length.toLong) + val headers = if (missingContentLength) mkHeadersMissingContentLength - else mkHeaders(contentLengthOverride.getOrElse(bytes.length.toLong)) + else + rawContentLengthOverride.map(mkHeadersRawContentLength).getOrElse(mkHeaders(contentLength)) datasetResource.uploadPart( ownerUser.getEmail, @@ -445,7 +525,7 @@ class DatasetResourceSpec urlEnc(filePath), partNumber, new ByteArrayInputStream(bytes), - hdrs, + headers, user ) } @@ -455,17 +535,21 @@ class DatasetResourceSpec partNumber: Int, stream: InputStream, contentLength: Long, - user: SessionUser = multipartOwnerSessionUser - ): Response = + user: SessionUser = multipartOwnerSessionUser, + rawContentLengthOverride: Option[String] = None + ): Response = { + val headers = + rawContentLengthOverride.map(mkHeadersRawContentLength).getOrElse(mkHeaders(contentLength)) datasetResource.uploadPart( 
ownerUser.getEmail, multipartDataset.getName, urlEnc(filePath), partNumber, stream, - mkHeaders(contentLength), + headers, user ) + } private def fetchSession(filePath: String) = getDSLContext @@ -524,26 +608,165 @@ class DatasetResourceSpec assertPlaceholdersCreated(sessionRecord.getUploadId, expectedParts = 3) } - it should "reject missing numParts" in { - val filePath = uniqueFilePath("init-missing-numparts") - val ex = intercept[BadRequestException] { + it should "reject missing fileSizeBytes / partSizeBytes" in { + val filePath1 = uniqueFilePath("init-missing-filesize") + val ex1 = intercept[BadRequestException] { datasetResource.multipartUpload( "init", ownerUser.getEmail, multipartDataset.getName, - urlEnc(filePath), + urlEnc(filePath1), Optional.empty(), + Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)), multipartOwnerSessionUser ) } - assertStatus(ex, 400) + assertStatus(ex1, 400) + + val filePath2 = uniqueFilePath("init-missing-partsize") + val ex2 = intercept[BadRequestException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePath2), + Optional.of(java.lang.Long.valueOf(1L)), + Optional.empty(), + multipartOwnerSessionUser + ) + } + assertStatus(ex2, 400) + } + + it should "reject invalid fileSizeBytes / partSizeBytes (<= 0)" in { + val filePath = uniqueFilePath("init-bad-sizes") + + assertStatus( + intercept[BadRequestException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(0L)), + Optional.of(java.lang.Long.valueOf(1L)), + multipartOwnerSessionUser + ) + }, + 400 + ) + + assertStatus( + intercept[BadRequestException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(1L)), + Optional.of(java.lang.Long.valueOf(0L)), + multipartOwnerSessionUser + ) + }, + 400 + ) + } + + it should "enforce max upload size at init (>, == boundary)" in { + // Use a tiny limit so the test doesn't allocate big buffers. 
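      // Worked example of the boundaries exercised by this test and the multipart one below,
      // assuming the init-side formulas numParts = ceil(fileSizeBytes / partSizeBytes) and
      // lastPart = fileSizeBytes - partSizeBytes * (numParts - 1):
      //   limit = 1 MiB = 1_048_576 bytes: fileSizeBytes == 1_048_576 passes, 1_048_577 is rejected with 400.
      //   limit = 6 MiB, partSize = 5 MiB: numParts = ceil(6 MiB / 5 MiB) = 2 with lastPart = 1 MiB,
      //   so init succeeds and records numPartsRequested = 2, while 6 MiB + 1 byte trips the total-size cap.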
+ setMaxUploadMiB(1) // 1 MiB + + val oneMiB: Long = 1024L * 1024L + + val filePathOver = uniqueFilePath("init-max-over") + assertStatus( + intercept[BadRequestException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePathOver), + Optional.of(java.lang.Long.valueOf(oneMiB + 1L)), + Optional.of(java.lang.Long.valueOf(oneMiB + 1L)), // single-part + multipartOwnerSessionUser + ) + }, + 400 + ) + + val filePathEq = uniqueFilePath("init-max-eq") + val resp = + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePathEq), + Optional.of(java.lang.Long.valueOf(oneMiB)), + Optional.of(java.lang.Long.valueOf(oneMiB)), // single-part + multipartOwnerSessionUser + ) + + resp.getStatus shouldEqual 200 + fetchSession(filePathEq) should not be null + } + + it should "enforce max upload size for multipart (2-part boundary)" in { + setMaxUploadMiB(6) // 6 MiB + + val max6MiB: Long = 6L * 1024L * 1024L + val partSize: Long = MinNonFinalPartBytes.toLong // 5 MiB + + val filePathEq = uniqueFilePath("init-max-multipart-eq") + val respEq = + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePathEq), + Optional.of(java.lang.Long.valueOf(max6MiB)), + Optional.of(java.lang.Long.valueOf(partSize)), + multipartOwnerSessionUser + ) + + respEq.getStatus shouldEqual 200 + fetchSession(filePathEq).getNumPartsRequested shouldEqual 2 + + val filePathOver = uniqueFilePath("init-max-multipart-over") + assertStatus( + intercept[BadRequestException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePathOver), + Optional.of(java.lang.Long.valueOf(max6MiB + 1L)), + Optional.of(java.lang.Long.valueOf(partSize)), + multipartOwnerSessionUser + ) + }, + 400 + ) } - it should "reject invalid numParts (0, negative, too large)" in { - val filePath = uniqueFilePath("init-bad-numparts") - assertStatus(intercept[BadRequestException] { initUpload(filePath, 0) }, 400) - assertStatus(intercept[BadRequestException] { initUpload(filePath, -1) }, 400) - assertStatus(intercept[BadRequestException] { initUpload(filePath, 1000000000) }, 400) + it should "reject init when fileSizeBytes/partSizeBytes would overflow numParts computation (malicious huge inputs)" in { + // Make max big enough to get past the max-size gate without overflowing maxBytes itself. 
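      // Why this overflows: with maxMiB = Long.MaxValue / 1_048_576, totalMaxBytes is
      // Long.MaxValue rounded down to a MiB multiple (Long.MaxValue - 1_048_575). Init computes
      // numParts as (fileSizeBytes + partSizeBytes - 1) / partSizeBytes and first checks
      // fileSizeBytes > Long.MaxValue - (partSizeBytes - 1). Here the addend is 5 MiB - 1 = 5_242_879,
      // and totalMaxBytes > Long.MaxValue - 5_242_879, so the guard fires and init fails with 500
      // rather than letting the addition wrap around to a negative part count.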
+ val maxMiB: Long = Long.MaxValue / (1024L * 1024L) + setMaxUploadMiB(maxMiB) + val totalMaxBytes: Long = maxMiB * 1024L * 1024L + val filePath = uniqueFilePath("init-overflow-numParts") + + val ex = intercept[WebApplicationException] { + datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(totalMaxBytes)), + Optional.of(java.lang.Long.valueOf(MinNonFinalPartBytes.toLong)), + multipartOwnerSessionUser + ) + } + assertStatus(ex, 500) } it should "reject invalid filePath (empty, absolute, '.', '..', control chars)" in { @@ -571,6 +794,7 @@ class DatasetResourceSpec multipartDataset.getName, urlEnc(filePath), Optional.empty(), + Optional.empty(), multipartOwnerSessionUser ) } @@ -679,6 +903,76 @@ class DatasetResourceSpec 400 ) } + it should "reject non-numeric Content-Length (header poisoning)" in { + val filePath = uniqueFilePath("part-cl-nonnumeric") + initUpload(filePath, numParts = 1) + val ex = intercept[BadRequestException] { + uploadPart( + filePath, + partNumber = 1, + bytes = tinyBytes(1.toByte), + rawContentLengthOverride = Some("not-a-number") + ) + } + assertStatus(ex, 400) + } + it should "reject Content-Length that overflows Long (header poisoning)" in { + val filePath = uniqueFilePath("part-cl-overflow") + initUpload(filePath, numParts = 1) + val ex = intercept[BadRequestException] { + uploadPart( + filePath, + partNumber = 1, + bytes = tinyBytes(1.toByte), + rawContentLengthOverride = Some("999999999999999999999999999999999999999") + ) + } + assertStatus(ex, 400) + } + it should "reject when Content-Length does not equal the expected part size (attempted size-bypass)" in { + val filePath = uniqueFilePath("part-cl-mismatch-expected") + initUpload(filePath, numParts = 2) + val uploadId = fetchUploadIdOrFail(filePath) + val bytes = minPartBytes(1.toByte) // exactly MinNonFinalPartBytes + val ex = intercept[BadRequestException] { + uploadPart( + filePath, + partNumber = 1, + bytes = bytes, + contentLengthOverride = Some(bytes.length.toLong - 1L) // lie by 1 byte + ) + } + assertStatus(ex, 400) + // Ensure we didn't accidentally persist an ETag for a rejected upload. + fetchPartRows(uploadId).find(_.getPartNumber == 1).get.getEtag shouldEqual "" + } + + it should "not store more bytes than declared Content-Length (send 2x bytes, claim x)" in { + val filePath = uniqueFilePath("part-body-gt-cl") + val declared: Int = 1024 + initUpload(filePath, numParts = 1, lastPartBytes = declared, partSizeBytes = declared) + + val first = Array.fill[Byte](declared)(1.toByte) + val extra = Array.fill[Byte](declared)(2.toByte) + val sent = first ++ extra // 2x bytes sent + + uploadPart( + filePath, + partNumber = 1, + bytes = sent, + contentLengthOverride = Some(declared.toLong) // claim only x + ).getStatus shouldEqual 200 + + finishUpload(filePath).getStatus shouldEqual 200 + // If anything "accepted" the extra bytes, the committed object would exceed declared size. 
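      // The patch does not show how uploadPart bounds the request body; one illustrative
      // (not actual) approach is to copy at most Content-Length bytes and drop the rest:
      //   var remaining = contentLength
      //   val buf = new Array[Byte](8192)
      //   var n = in.read(buf, 0, math.min(buf.length.toLong, remaining).toInt)
      //   while (n > 0) {
      //     out.write(buf, 0, n); remaining -= n
      //     n = if (remaining > 0) in.read(buf, 0, math.min(buf.length.toLong, remaining).toInt) else -1
      //   }
      // Whichever mechanism is used, the assertions below pin the observable contract:
      // exactly `declared` bytes end up committed and their sha256 matches `first` alone.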
+ val repoName = multipartDataset.getRepositoryName + val downloaded = LakeFSStorageClient.getFileFromRepo(repoName, "main", filePath) + Files.size(Paths.get(downloaded.toURI)) shouldEqual declared.toLong + + val expected = sha256OfChunks(Seq(first)) + val got = sha256OfFile(Paths.get(downloaded.toURI)) + got.toSeq shouldEqual expected + } it should "reject null/empty filePath param early without depending on error text" in { val httpHeaders = mkHeaders(1L) @@ -849,6 +1143,45 @@ class DatasetResourceSpec assertStatus(ex, 404) } + it should "not commit an oversized upload if the max upload size is tightened before finish (server-side rollback)" in { + val filePath = uniqueFilePath("finish-max-tightened") + val twoMiB: Long = 2L * 1024L * 1024L + + // Allow init + part upload under a higher limit. + setMaxUploadMiB(3) // 3 MiB + datasetResource + .multipartUpload( + "init", + ownerUser.getEmail, + multipartDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(twoMiB)), + Optional.of(java.lang.Long.valueOf(twoMiB)), + multipartOwnerSessionUser + ) + .getStatus shouldEqual 200 + + uploadPart(filePath, 1, Array.fill[Byte](twoMiB.toInt)(7.toByte)).getStatus shouldEqual 200 + + // Tighten the limit just before finish. + setMaxUploadMiB(1) // 1 MiB + + val ex = intercept[WebApplicationException] { + finishUpload(filePath) // this now THROWS 413 (doesn't return Response) + } + ex.getResponse.getStatus shouldEqual 413 + + // Oversized objects must not remain accessible after finish (rollback happened). + val repoName = multipartDataset.getRepositoryName + val notFound = intercept[ApiException] { + LakeFSStorageClient.getFileFromRepo(repoName, "main", filePath) + } + notFound.getCode shouldEqual 404 + + // Session still available. + fetchSession(filePath) should not be null + } + it should "reject finish when no parts were uploaded (all placeholders empty) without checking messages" in { val filePath = uniqueFilePath("finish-no-parts") initUpload(filePath, numParts = 2) @@ -1101,7 +1434,7 @@ class DatasetResourceSpec it should "allow abort + re-init after part 1 succeeded but part 2 drops mid-flight; then complete successfully" in { val filePath = uniqueFilePath("reinit-after-part2-drop") - initUpload(filePath, numParts = 2).getStatus shouldEqual 200 + initUpload(filePath, numParts = 2, lastPartBytes = 1024 * 1024).getStatus shouldEqual 200 val uploadId1 = fetchUploadIdOrFail(filePath) uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 @@ -1120,7 +1453,7 @@ class DatasetResourceSpec fetchSession(filePath) shouldBe null fetchPartRows(uploadId1) shouldBe empty - initUpload(filePath, numParts = 2).getStatus shouldEqual 200 + initUpload(filePath, numParts = 2, lastPartBytes = 123).getStatus shouldEqual 200 uploadPart(filePath, 1, minPartBytes(3.toByte)).getStatus shouldEqual 200 uploadPart(filePath, 2, tinyBytes(4.toByte, n = 123)).getStatus shouldEqual 200 finishUpload(filePath).getStatus shouldEqual 200 @@ -1135,7 +1468,7 @@ class DatasetResourceSpec } def reinitAndFinishHappy(filePath: String): Unit = { - initUpload(filePath, numParts = 2).getStatus shouldEqual 200 + initUpload(filePath, numParts = 2, lastPartBytes = 321).getStatus shouldEqual 200 uploadPart(filePath, 1, minPartBytes(7.toByte)).getStatus shouldEqual 200 uploadPart(filePath, 2, tinyBytes(8.toByte, n = 321)).getStatus shouldEqual 200 finishUpload(filePath).getStatus shouldEqual 200 @@ -1165,7 +1498,7 @@ class DatasetResourceSpec withClue("scenario (2): part2 mid-flight drop") { val filePath = 
uniqueFilePath("reupload-part2-drop") - initUpload(filePath, numParts = 2).getStatus shouldEqual 200 + initUpload(filePath, numParts = 2, lastPartBytes = 1024 * 1024).getStatus shouldEqual 200 val uploadId = fetchUploadIdOrFail(filePath) uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 @@ -1215,7 +1548,7 @@ class DatasetResourceSpec // --------------------------------------------------------------------------- it should "upload without corruption (sha256 matches final object)" in { val filePath = uniqueFilePath("sha256-positive") - initUpload(filePath, numParts = 3).getStatus shouldEqual 200 + initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200 val part1 = minPartBytes(1.toByte) val part2 = minPartBytes(2.toByte) @@ -1239,7 +1572,7 @@ class DatasetResourceSpec it should "detect corruption (sha256 mismatch when a part is altered)" in { val filePath = uniqueFilePath("sha256-negative") - initUpload(filePath, numParts = 3).getStatus shouldEqual 200 + initUpload(filePath, numParts = 3, lastPartBytes = 123).getStatus shouldEqual 200 val part1 = minPartBytes(1.toByte) val part2 = minPartBytes(2.toByte) @@ -1279,7 +1612,7 @@ class DatasetResourceSpec val filePath = uniqueFilePath(s"stress-$i") val numParts = 2 + Random.nextInt(maxParts - 1) - initUpload(filePath, numParts).getStatus shouldEqual 200 + initUpload(filePath, numParts, lastPartBytes = 1024).getStatus shouldEqual 200 val sharedMin = minPartBytes((i % 127).toByte) val partFuts = (1 to numParts).map { partN => diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts index 97b2e264b7..da10d5db27 100644 --- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts +++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts @@ -231,7 +231,8 @@ export class DatasetService { .set("ownerEmail", ownerEmail) .set("datasetName", datasetName) .set("filePath", encodeURIComponent(filePath)) - .set("numParts", partCount.toString()); + .set("fileSizeBytes", file.size.toString()) + .set("partSizeBytes", partSize); const init$ = this.http.post<{}>( `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql index 57ac69b687..0d596db23d 100644 --- a/sql/texera_ddl.sql +++ b/sql/texera_ddl.sql @@ -280,17 +280,31 @@ CREATE TABLE IF NOT EXISTS dataset_version CREATE TABLE IF NOT EXISTS dataset_upload_session ( - did INT NOT NULL, - uid INT NOT NULL, - file_path TEXT NOT NULL, - upload_id VARCHAR(256) NOT NULL UNIQUE, - physical_address TEXT, - num_parts_requested INT NOT NULL, + did INT NOT NULL, + uid INT NOT NULL, + file_path TEXT NOT NULL, + upload_id VARCHAR(256) NOT NULL UNIQUE, + physical_address TEXT, + num_parts_requested INT NOT NULL, + file_size_bytes BIGINT NOT NULL, + part_size_bytes BIGINT NOT NULL, PRIMARY KEY (uid, did, file_path), FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE, - FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE + FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE, + + CONSTRAINT chk_dataset_upload_session_num_parts_requested_positive + CHECK (num_parts_requested >= 1), + + CONSTRAINT chk_dataset_upload_session_file_size_bytes_positive + CHECK (file_size_bytes > 0), + + CONSTRAINT chk_dataset_upload_session_part_size_bytes_positive + CHECK (part_size_bytes > 0), + + CONSTRAINT chk_dataset_upload_session_part_size_bytes_s3_upper_bound + CHECK (part_size_bytes <= 
5368709120) ); CREATE TABLE IF NOT EXISTS dataset_upload_session_part diff --git a/sql/updates/18.sql b/sql/updates/18.sql new file mode 100644 index 0000000000..92d2f998c7 --- /dev/null +++ b/sql/updates/18.sql @@ -0,0 +1,60 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ============================================ +-- 1. Connect to the texera_db database +-- ============================================ +\c texera_db +SET search_path TO texera_db, public; + +-- ============================================ +-- 2. Update the table schema +-- ============================================ +BEGIN; + +-- Add the 2 columns (defaults backfill existing rows safely) +ALTER TABLE dataset_upload_session + ADD COLUMN IF NOT EXISTS file_size_bytes BIGINT NOT NULL DEFAULT 1, + ADD COLUMN IF NOT EXISTS part_size_bytes BIGINT NOT NULL DEFAULT 5242880; + +-- Drop any old/alternate constraint names from previous attempts (so we end up with exactly the new names) +ALTER TABLE dataset_upload_session + DROP CONSTRAINT IF EXISTS dataset_upload_session_num_parts_requested_positive, + DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_num_parts_requested_positive, + DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_file_size_bytes_positive, + DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_part_size_bytes_positive, + DROP CONSTRAINT IF EXISTS dataset_upload_session_part_size_bytes_positive, + DROP CONSTRAINT IF EXISTS dataset_upload_session_part_size_bytes_s3_upper_bound, + DROP CONSTRAINT IF EXISTS chk_dataset_upload_session_part_size_bytes_s3_upper_bound; + +-- Add constraints exactly like the new CREATE TABLE +ALTER TABLE dataset_upload_session + ADD CONSTRAINT chk_dataset_upload_session_num_parts_requested_positive + CHECK (num_parts_requested >= 1), + ADD CONSTRAINT chk_dataset_upload_session_file_size_bytes_positive + CHECK (file_size_bytes > 0), + ADD CONSTRAINT chk_dataset_upload_session_part_size_bytes_positive + CHECK (part_size_bytes > 0), + ADD CONSTRAINT chk_dataset_upload_session_part_size_bytes_s3_upper_bound + CHECK (part_size_bytes <= 5368709120); + +-- Match CREATE TABLE (no defaults) +ALTER TABLE dataset_upload_session + ALTER COLUMN file_size_bytes DROP DEFAULT, + ALTER COLUMN part_size_bytes DROP DEFAULT; + +COMMIT;
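
For quick reference, the sketch below restates, outside the resource class, the size arithmetic that initMultipartUpload and uploadPart now share: the ceil-division part count with its overflow guard, the derived last-part size, and the Content-Length allowed for a given part number. The object and method names are illustrative only and are not introduced by this patch; the formulas mirror the ones in the diff above.

object MultipartSizeMath {
  // numParts = ceil(fileSize / partSize) = (fileSize + partSize - 1) / partSize,
  // guarded against Long overflow the same way initMultipartUpload is.
  def numParts(fileSizeBytes: Long, partSizeBytes: Long): Either[String, Long] = {
    if (fileSizeBytes <= 0L || partSizeBytes <= 0L) Left("sizes must be > 0")
    else {
      val addend = partSizeBytes - 1L
      if (fileSizeBytes > Long.MaxValue - addend) Left("overflow while computing numParts")
      else Right((fileSizeBytes + addend) / partSizeBytes)
    }
  }

  // lastPartSize = fileSize - partSize * (numParts - 1); every non-final part is exactly partSize.
  def allowedSizeForPart(
      fileSizeBytes: Long,
      partSizeBytes: Long,
      totalParts: Long,
      partNumber: Long
  ): Long = {
    val lastPartSize = fileSizeBytes - partSizeBytes * (totalParts - 1L)
    if (partNumber < totalParts) partSizeBytes else lastPartSize
  }
}

// Example: a 6 MiB file with 5 MiB parts splits into 2 parts; part 1 must be 5 MiB, part 2 must be 1 MiB.
// MultipartSizeMath.numParts(6L * 1024 * 1024, 5L * 1024 * 1024)                    == Right(2L)
// MultipartSizeMath.allowedSizeForPart(6L * 1024 * 1024, 5L * 1024 * 1024, 2L, 1L)  == 5L * 1024 * 1024
// MultipartSizeMath.allowedSizeForPart(6L * 1024 * 1024, 5L * 1024 * 1024, 2L, 2L)  == 1L * 1024 * 1024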