Is there any way that I can use expr.partition_by in the context of Unbounded streaming data?
#18169
Unanswered
baiguoname
asked this question in
Q&A
Replies: 1 comment
-
|
For more detailed information, this is the code:
// A custom TableProvider that consumes from a channel.
/// Table provider whose data comes from a shared channel receiver of
/// `RecordBatch` values rather than from stored files.
///
/// Cloning the provider clones the `Arc`s, so every clone refers to the same
/// underlying receiver.
#[derive(Debug, Clone)]
pub struct ChannelTableProvider {
/// Full (unprojected) schema reported by `TableProvider::schema`.
schema: Arc<Schema>,
/// Receiving half of the channel, wrapped in an async mutex and shared via `Arc`.
/// NOTE(review): presumably a `tokio::sync::mpsc::Receiver` (it is polled with
/// `poll_recv` elsewhere in this file) — confirm against the imports.
receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}
#[async_trait::async_trait]
impl TableProvider for ChannelTableProvider {
    /// Exposes the provider as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns the full (unprojected) schema of the table.
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }

    /// Reports this provider as a base table.
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Builds the physical plan that reads batches from the channel.
    ///
    /// When `projection` is given, the schema is narrowed to the requested
    /// column indices. The same (projected) schema is handed to both the stream
    /// and the execution plan so that the plan's `PlanProperties` describe the
    /// schema the plan actually reports. Previously the plan received the
    /// unprojected schema while the stream carried the projected one.
    ///
    /// Filters and the limit are ignored.
    ///
    /// # Errors
    /// Returns an error if `projection` contains an index that is out of bounds
    /// for the table schema.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let projected_schema: Arc<Schema> = match projection {
            Some(indices) => Arc::new(self.schema.project(indices)?),
            None => Arc::clone(&self.schema),
        };

        // NOTE(review): the stream forwards batches exactly as they arrive on the
        // channel; nothing here projects the batch columns themselves. If senders
        // push full-width batches, a projecting step is still needed — confirm.
        let stream = DynamicRecordBatchStream::new(
            Arc::clone(&projected_schema),
            Arc::clone(&self.receiver),
        );
        Ok(Arc::new(DynamicExecutionPlan::new(projected_schema, stream)))
    }
}
/// Custom `ExecutionPlan` that produces batches dynamically from a channel.
///
/// It is a leaf node: it reports no children and its `execute` hands out a
/// clone of the stored stream.
#[derive(Debug)]
pub struct DynamicExecutionPlan {
/// Plan properties computed once in `new` and returned by reference from `properties()`.
properties: PlanProperties,
/// Stream template that `execute` clones; clones share the same channel receiver.
stream: DynamicRecordBatchStream, // You might store the stream or a way to create it.
}
impl DynamicExecutionPlan {
    /// Name of the column the plan declares its output to be ordered by.
    const ORDER_COLUMN: &'static str = "trading_date";

    /// Creates the plan for `stream`, describing its output with `schema`.
    ///
    /// The plan is declared as a single-partition, incrementally emitting,
    /// unbounded source. If `schema` contains a `trading_date` column, the output
    /// is additionally declared to be sorted by it (ascending, nulls last).
    /// This is a declaration only; nothing here verifies the ordering of the
    /// batches that actually arrive.
    ///
    /// The column index is resolved by name instead of being hard-coded, so the
    /// declared ordering stays valid when the column sits at another position
    /// (for example after a projection). When the column is absent, no ordering
    /// is declared rather than pointing at an unrelated or missing column.
    pub fn new(schema: Arc<Schema>, stream: DynamicRecordBatchStream) -> Self {
        use datafusion::physical_plan::expressions::Column;

        let eq_properties = match schema.index_of(Self::ORDER_COLUMN) {
            Ok(index) => {
                let sort_expr = PhysicalSortExpr::new(
                    Arc::new(Column::new(Self::ORDER_COLUMN, index)),
                    SortOptions { descending: false, nulls_first: false },
                );
                EquivalenceProperties::new_with_orderings(Arc::clone(&schema), [[sort_expr]])
            }
            // Column not present in this schema: declare no ordering at all.
            Err(_) => EquivalenceProperties::new(Arc::clone(&schema)),
        };

        let properties = PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Unbounded { requires_infinite_memory: false },
        );

        Self { properties, stream }
    }
}
impl DisplayAs for DynamicExecutionPlan {
    /// Writes the fixed label `bbb` for this node; the requested display format
    /// is not consulted.
    fn fmt_as(&self, _format: DisplayFormatType, out: &mut std::fmt::Formatter) -> std::fmt::Result {
        out.write_str("bbb")
    }
}
impl ExecutionPlan for DynamicExecutionPlan {
    /// Short node name.
    fn name(&self) -> &str {
        "gg"
    }

    /// Exposes the plan as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns the properties computed when the plan was constructed.
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Returns the schema carried by the stored stream.
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.stream.schema)
    }

    /// This is a leaf node, so it has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Accepts only an empty child list (returning the plan unchanged); any
    /// non-empty list is rejected with an internal error.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(DataFusionError::Internal(String::from(
                "DynamicExecutionPlan can't have children",
            )));
        }
        Ok(self)
    }

    /// Returns a boxed clone of the stored stream. The partition index and the
    /// task context are not used.
    fn execute(&self, _partition: usize, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
        let stream = self.stream.clone();
        Ok(Box::pin(stream))
    }
}
/// Custom stream of `RecordBatch` values coming from the channel.
///
/// The type is `Clone`; a clone copies the `Arc`s, so all clones poll the same
/// underlying receiver.
#[derive(Debug, Clone)]
pub struct DynamicRecordBatchStream {
/// Schema reported by `RecordBatchStream::schema` for this stream.
schema: Arc<Schema>,
/// Shared receiving half of the channel, guarded by an async mutex.
receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}
impl DynamicRecordBatchStream {
pub fn new(schema: Arc<Schema>, receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>) -> Self {
Self { schema, receiver }
}
}
impl Stream for DynamicRecordBatchStream {
    type Item = Result<RecordBatch>;

    /// Polls the shared receiver for the next batch.
    ///
    /// Yields `Some(Ok(batch))` for each received batch and `None` once the
    /// receiver reports that the channel is finished.
    ///
    /// The receiver sits behind a mutex shared by every clone of this stream.
    /// If another clone holds the lock at this moment, the task is rescheduled
    /// and `Pending` is returned, where the previous `try_lock().unwrap()` would
    /// have panicked. In this file the lock is only held for the duration of a
    /// single `poll_recv` call, so such contention is short-lived.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.receiver.try_lock() {
            Ok(mut receiver) => receiver.poll_recv(cx).map(|next| next.map(Ok)),
            Err(_) => {
                // Lock is busy: ask to be polled again instead of panicking.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
impl RecordBatchStream for DynamicRecordBatchStream {
fn schema(&self) -> Arc<Schema> {
self.schema.clone()
// get_schema()
}
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
For example, I can use the following
`expr` in the context of unbounded streaming data: But how can I use it followed by
`partition_by` like this: Beta Was this translation helpful? Give feedback.
All reactions