@@ -76,12 +76,22 @@ impl Future for FutureExecutionState {
76
76
}
77
77
}
78
78
79
+ #[ derive( thiserror:: Error , Debug , PartialOrd , PartialEq ) ]
80
+ pub enum ExecutionStateErrors {
81
+ #[ error( "the execution of this graph has reached a fixed point and will not continue without outside influence" ) ]
82
+ NoFurtherExecutionDetected ,
83
+ #[ error( "an unexpected error has occurred during the evaluation of state {0}" ) ]
84
+ CellExecutionUnexpectedFailure ( ExecutionNodeId , String ) ,
85
+ #[ error( "unknown execution state error" ) ]
86
+ Unknown ( String ) ,
87
+ }
88
+
79
89
#[ derive( Clone ) ]
80
90
pub enum ExecutionStateEvaluation {
81
91
/// An exception was thrown
82
- Error ,
92
+ Error ( ExecutionNodeId ) ,
83
93
/// An eval function indicated that we should return
84
- EvalFailure ,
94
+ EvalFailure ( ExecutionNodeId ) ,
85
95
/// Execution complete
86
96
Complete ( ExecutionState ) ,
87
97
/// Execution in progress
@@ -94,17 +104,17 @@ impl ExecutionStateEvaluation {
94
104
95
105
ExecutionStateEvaluation :: Complete ( ref state) => state. state_get ( operation_id) ,
96
106
ExecutionStateEvaluation :: Executing ( ..) => unreachable ! ( "Cannot get state from a future state" ) ,
97
- ExecutionStateEvaluation :: Error => unreachable ! ( "Cannot get state from a future state" ) ,
98
- ExecutionStateEvaluation :: EvalFailure => unreachable ! ( "Cannot get state from a future state" ) ,
107
+ ExecutionStateEvaluation :: Error ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
108
+ ExecutionStateEvaluation :: EvalFailure ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
99
109
}
100
110
}
101
111
102
112
pub fn state_get_value ( & self , operation_id : & OperationId ) -> Option < & RkyvSerializedValue > {
103
113
match self {
104
114
ExecutionStateEvaluation :: Complete ( ref state) => state. state_get ( operation_id) . map ( |o| & o. output ) ,
105
115
ExecutionStateEvaluation :: Executing ( ..) => unreachable ! ( "Cannot get state from a future state" ) ,
106
- ExecutionStateEvaluation :: Error => unreachable ! ( "Cannot get state from a future state" ) ,
107
- ExecutionStateEvaluation :: EvalFailure => unreachable ! ( "Cannot get state from a future state" ) ,
116
+ ExecutionStateEvaluation :: Error ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
117
+ ExecutionStateEvaluation :: EvalFailure ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
108
118
}
109
119
}
110
120
}
@@ -114,8 +124,8 @@ impl Debug for ExecutionStateEvaluation {
114
124
match self {
115
125
ExecutionStateEvaluation :: Complete ( ref state) => f. debug_tuple ( "Complete" ) . field ( state) . finish ( ) ,
116
126
ExecutionStateEvaluation :: Executing ( ..) => f. debug_tuple ( "Executing" ) . field ( & format ! ( "Future state evaluating" ) ) . finish ( ) ,
117
- ExecutionStateEvaluation :: Error => unreachable ! ( "Cannot get state from a future state" ) ,
118
- ExecutionStateEvaluation :: EvalFailure => unreachable ! ( "Cannot get state from a future state" ) ,
127
+ ExecutionStateEvaluation :: Error ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
128
+ ExecutionStateEvaluation :: EvalFailure ( _ ) => unreachable ! ( "Cannot get state from a future state" ) ,
119
129
}
120
130
}
121
131
}
@@ -652,14 +662,15 @@ impl ExecutionState {
652
662
/// Receiver that we pass to the exec for it to capture oneshot RPC communication
653
663
let exec = op. operation . deref ( ) ;
654
664
let result = exec ( & before_execution_state, payload, None , None ) . await ?;
655
- let mut after_execution_state = state. clone_with_new_id ( ) ;
665
+
666
+ // Add result into a new execution state
667
+ let mut after_execution_state = state. clone ( ) ;
656
668
after_execution_state. stack . pop_back ( ) ;
657
669
after_execution_state. state_insert ( usize:: MAX , result. clone ( ) ) ;
658
670
after_execution_state. fresh_values . insert ( usize:: MAX ) ;
659
671
660
- // TODO: Add result into a new execution state
661
672
662
- // TODO: capture the value of the output
673
+ // Capture the value of the output
663
674
if let Some ( graph_sender) = self . graph_sender . as_ref ( ) {
664
675
let s = graph_sender. clone ( ) ;
665
676
let result = pause_future_with_oneshot ( ExecutionStateEvaluation :: Complete ( after_execution_state. clone ( ) ) , & s) . await ;
@@ -778,16 +789,12 @@ impl ExecutionState {
778
789
let dependency_graph = self . get_dependency_graph ( ) ;
779
790
780
791
// If none of the inputs are more fresh than our own operation freshness, skip this node
781
- // TODO: order appears to matter here, and it shouldn't
782
792
if !signature. is_empty ( ) {
783
793
let our_freshness = self . value_freshness_map . get ( & next_operation_id) . copied ( ) . unwrap_or ( 0 ) ;
784
- dbg ! ( ( & next_operation_id, & our_freshness) ) ;
785
- dbg ! ( signature) ;
786
794
let any_more_fresh = dependency_graph
787
795
. edges_directed ( next_operation_id, Direction :: Incoming )
788
796
. any ( |( from, _, _) | {
789
797
let their_freshness = self . value_freshness_map . get ( & from) . copied ( ) . unwrap_or ( 0 ) ;
790
- dbg ! ( ( from, their_freshness) ) ;
791
798
their_freshness >= our_freshness
792
799
} ) ;
793
800
if !any_more_fresh {
@@ -799,7 +806,7 @@ impl ExecutionState {
799
806
let inputs = self . prepare_operation_inputs ( signature, next_operation_id, dependency_graph) ?;
800
807
801
808
if !signature. check_input_against_signature ( & inputs) {
802
- println ! ( "Signature validation failed continuing" ) ;
809
+ println ! ( "Signature validation failed, continuing" ) ;
803
810
return Ok ( None ) ;
804
811
}
805
812
0 commit comments