Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Implementation of query pushdown #64

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Test / resourceGenerators += Def.task {
import java.util.stream.Collectors
import scala.collection.JavaConverters._

def log(msg: Any) = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd
def log(msg: Any): Unit = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd

val theOnesWeLookFor = Set(
"libsqlite4java-linux-amd64-1.0.392.so",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,44 @@ private[dynamodb] object FilterPushdown {
def acceptFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) =
filters.partition(checkFilter)

/**
* Attempts to separate the given filters into a list of equality tests on the hash key and optionally a
* condition on the range key.
* If such a partitioning is not possible, the method will return (List.empty, None)
*
* @param filters list of acceptable pushdown filters
* @param keySchema hash and range key schema
* @return filters applicable as hash and range key conditions
*/
def makeQueryFilters(filters: Array[Filter], keySchema: KeySchema): (List[EqualTo], Option[Filter]) = {
// Find a valid condition on the hash key.
val hashKeyCondition = filters.find(checkHashKeyCondition(_, keySchema.hashKeyName))
hashKeyCondition.map(makeEqualityFilters(_, keySchema.hashKeyName)).map(eqFilters => {
// Also find a valid condition on the range key if possible, return equality filters regardless.
val rangeKeyCondition = keySchema.rangeKeyName.flatMap(rangeKeyName => filters.find(checkRangeKeyCondition(_, rangeKeyName)))
(eqFilters, rangeKeyCondition)
}).getOrElse({
// Even if we could not find valid conditions in each filter separately, there could still exist a valid
// 'and'-filter combining both (assuming a range key is defined in the schema).
if (keySchema.rangeKeyName.isDefined) {
filters.collectFirst({
case And(left, right) if checkHashAndRangeCondition(left, right, keySchema) =>
(makeEqualityFilters(left, keySchema.hashKeyName), Some(right))
case And(left, right) if checkHashAndRangeCondition(right, left, keySchema) =>
(makeEqualityFilters(right, keySchema.hashKeyName), Some(left))
}).getOrElse((List.empty, None))
} else (List.empty, None)
})
}

private def makeEqualityFilters(filter: Filter, hashKeyName: String): List[EqualTo] = filter match {
case eq: EqualTo if eq.attribute == hashKeyName => List(eq)
case In(attribute, values) if attribute == hashKeyName => values.map(EqualTo(attribute, _)).toList
case Or(left, right) => makeEqualityFilters(left, hashKeyName) ++ makeEqualityFilters(right, hashKeyName)
case _ => throw new IllegalArgumentException(s"Given filter is not a valid condition on key $hashKeyName")
}

// Check if the given Spark filter can form part of a DynamoDB FilterExpression.
private def checkFilter(filter: Filter): Boolean = filter match {
case _: StringEndsWith => false
case And(left, right) => checkFilter(left) && checkFilter(right)
Expand All @@ -46,6 +84,31 @@ private[dynamodb] object FilterPushdown {
case _ => true
}

// Check if the given Spark filter is a valid condition on the partition key of a DynamoDB KeyConditionExpression.
private def checkHashKeyCondition(filter: Filter, hashKeyName: String): Boolean = filter match {
case EqualTo(path, _) => path == hashKeyName
case Or(left, right) => checkHashKeyCondition(left, hashKeyName) && checkHashKeyCondition(right, hashKeyName)
case In(path, _) => path == hashKeyName
case _ => false
}

// Check if the given Spark filter is a valid condition on the sort key of a DynamoDB KeyConditionExpression.
private def checkRangeKeyCondition(filter: Filter, sortKeyName: String): Boolean = filter match {
case EqualTo(path, _) => path == sortKeyName
case GreaterThan(path, _) => path == sortKeyName
case GreaterThanOrEqual(path, _) => path == sortKeyName
case LessThan(path, _) => path == sortKeyName
case LessThanOrEqual(path, _) => path == sortKeyName
case StringStartsWith(path, _) => path == sortKeyName
// The following two are "BETWEEN" conditions.
case And(GreaterThanOrEqual(left, _), LessThanOrEqual(right, _)) => left == sortKeyName && right == sortKeyName
case And(LessThanOrEqual(right, _), GreaterThanOrEqual(left, _)) => left == sortKeyName && right == sortKeyName
case _ => false
}

private def checkHashAndRangeCondition(hashCandidate: Filter, rangeCandidate: Filter, keySchema: KeySchema) =
checkHashKeyCondition(hashCandidate, keySchema.hashKeyName) && checkRangeKeyCondition(rangeCandidate, keySchema.rangeKeyName.get)

private def buildCondition(filter: Filter): Condition = filter match {
case EqualTo(path, value: Boolean) => newBOOL(path).eq(value)
case EqualTo(path, value) => coerceAndApply(_ eq _, _ eq _)(path, value)
Expand Down Expand Up @@ -77,6 +140,11 @@ private[dynamodb] object FilterPushdown {
case StringContains(path, value) => newS(path).contains(value)
case StringEndsWith(_, _) => throw new UnsupportedOperationException("Filter `StringEndsWith` is not supported by DynamoDB")

case And(GreaterThanOrEqual(leftPath, min: Number), LessThanOrEqual(rightPath, max: Number)) if leftPath == rightPath =>
newN(leftPath).between(min, max)
case And(LessThanOrEqual(rightPath, max: Number), GreaterThanOrEqual(leftPath, min: Number)) if leftPath == rightPath =>
newN(leftPath).between(min, max)

case And(left, right) => parenthesize(buildCondition(left)) and parenthesize(buildCondition(right))
case Or(left, right) => parenthesize(buildCondition(left)) or parenthesize(buildCondition(right))
case Not(f) => parenthesize(buildCondition(f)).negate()
Expand Down