Skip to content

Commit

Permalink
Update proto to support to/from json with an extension codec (#14561)
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 authored Feb 9, 2025
1 parent df0d966 commit 9c12919
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,7 @@ pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result<Bytes> {
#[cfg(feature = "json")]
pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result<String> {
let extension_codec = DefaultLogicalExtensionCodec {};
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
logical_plan_to_json_with_extension_codec(plan, &extension_codec)
}

/// Serialize a LogicalPlan as bytes, using the provided extension codec
Expand All @@ -220,13 +216,24 @@ pub fn logical_plan_to_bytes_with_extension_codec(
Ok(buffer.into())
}

/// Serialize a LogicalPlan as JSON using the provided extension codec
#[cfg(feature = "json")]
pub fn logical_plan_to_json_with_extension_codec(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<String> {
let protobuf =
protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
serde_json::to_string(&protobuf)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))
}

/// Deserialize a LogicalPlan from JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultLogicalExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec)
logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
Expand All @@ -249,6 +256,18 @@ pub fn logical_plan_from_bytes_with_extension_codec(
protobuf.try_into_logical_plan(ctx, extension_codec)
}

/// Deserialize a LogicalPlan from JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
json: &str,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?;
back.try_into_logical_plan(ctx, extension_codec)
}

/// Serialize a PhysicalPlan as bytes
pub fn physical_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Bytes> {
let extension_codec = DefaultPhysicalExtensionCodec {};
Expand Down

0 comments on commit 9c12919

Please sign in to comment.