Skip to content

Commit

Permalink
added takeFirst
Browse files Browse the repository at this point in the history
  • Loading branch information
lucananni93 committed May 10, 2019
1 parent 3f08bc8 commit 7230b4d
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,12 @@ object PythonManager {
new CollectedResult(result)
}

// def take(index: Int, n: Int): CollectedResult = {
// this.checkSparkContext()
// val variableToTake = this.getVariable(index)
// val result = this.server.TAKE(variableToTake, n)
// new CollectedResult(result)
// }
def take(index: Int, n: Int): CollectedResult = {
this.checkSparkContext()
val variableToTake = this.getVariable(index)
val result = this.server.TAKE_FIRST(variableToTake, n)
new CollectedResult( (result._1.toIterator, result._2.toIterator, result._3) )
}

def serializeVariable(index: Int): String = {
val variableToSerialize = this.getVariable(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ class StubExecutor extends Implementation {
override def getParser(name: String, dataset: String): GMQLLoaderBase = ???

override def collectIterator(iRVariable: IRVariable): (Iterator[(GRecordKey, Array[GValue])], Iterator[(Long, (String, String))], List[(String, PARSING_TYPE)]) = ???

override def takeFirst(iRVariable: IRVariable, n: Int): (Array[(GRecordKey, Array[GValue])], Array[(Long, (String, String))], List[(String, PARSING_TYPE)]) = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class GmqlServer(var implementation: Implementation, binning_size: Option[Long]
implementation.collectIterator(iRVariable)
}

def TAKE_FIRST(iRVariable: IRVariable, n: Int): (Array[(GRecordKey, Array[GValue])], Array[(Long, (String, String))], List[(String, PARSING_TYPE)]) = {
optimise()
implementation.takeFirst(iRVariable, n)
}

def TAKE(iRVariable: IRVariable, n: Int): Any = {
optimise()
implementation.take(iRVariable, n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ abstract class Implementation {

def take(iRVariable: IRVariable, n:Int):Any

def takeFirst(iRVariable: IRVariable, n:Int): (Array[(GRecordKey, Array[GValue])], Array[(Long, (String, String))], List[(String, PARSING_TYPE)])

/** stop GMQL implementation (kill a job)*/
def stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class GMQLSparkExecutor(val binSize : BinSize = BinSize(), val maxBinDistance :
(regionRDD,metaRDD,iRVariable.schema)
}

override def takeFirst(iRVariable: IRVariable, n: Int): (Array[(GRecordKey, Array[GValue])], Array[(Long, (String, String))], List[(String, PARSING_TYPE)]) = {
val metaRDD = implement_md(iRVariable.metaDag, sc).collect()
val regionRDD = implement_rd(iRVariable.regionDag, sc).take(n)

(regionRDD, metaRDD, iRVariable.schema)
}

override def stop(): Unit = {
sc.stop()
}
Expand Down

0 comments on commit 7230b4d

Please sign in to comment.