From ec949a9dee8fe380cb8bbf497c2d7a0487cb9eae Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Mon, 13 Jan 2025 19:20:29 +0530 Subject: [PATCH] get online brokers in pinot spark connector --- .../pinot/connector/spark/common/PinotClusterClient.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala index 1c5dafe2a5b3..8e172848d3b6 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotClusterClient.scala @@ -33,7 +33,7 @@ import scala.util.{Failure, Success, Try} */ private[pinot] object PinotClusterClient extends Logging { private val TABLE_SCHEMA_TEMPLATE = "http://%s/tables/%s/schema" - private val TABLE_BROKER_INSTANCES_TEMPLATE = "http://%s/v2/brokers/tables/%s" + private val LIVE_BROKERS_TEMPLATE = "http://%s/tables/livebrokers?tables=%s" private val TIME_BOUNDARY_TEMPLATE = "http://%s/debug/timeBoundary/%s" private val ROUTING_TABLE_TEMPLATE = "http://%s/debug/routingTable/sql?query=%s" private val INSTANCES_API_TEMPLATE = "http://%s/instances/%s" @@ -62,7 +62,7 @@ private[pinot] object PinotClusterClient extends Logging { */ def getBrokerInstances(controllerUrl: String, tableName: String): List[String] = { Try { - val uri = new URI(String.format(TABLE_BROKER_INSTANCES_TEMPLATE, controllerUrl, tableName)) + val uri = new URI(String.format(LIVE_BROKERS_TEMPLATE, controllerUrl, tableName)) val response = HttpUtils.sendGetRequest(uri) implicit val decodeIntOrString: Decoder[Either[Int, String]] = Decoder[Int].map(Left(_)).or(Decoder[String].map(Right(_)))