2323import org .apache .nifi .controller .ProcessorNode ;
2424import org .slf4j .Logger ;
2525import org .slf4j .LoggerFactory ;
26+ import java .util .function .BiConsumer ;
2627
2728
2829import static org .apache .nifi .c2 .protocol .api .C2OperationState .OperationState .FULLY_APPLIED ;
@@ -40,33 +41,37 @@ public DefaultProcessorStateStrategy(FlowController flowController) {
4041
4142 @ Override
4243 public OperationState startProcessor (String processorId ) {
43- return changeState (processorId , true );
44+ return changeState (processorId , this :: start );
4445 }
4546
4647 @ Override
4748 public OperationState stopProcessor (String processorId ) {
48- return changeState (processorId , false );
49+ return changeState (processorId , this :: stop );
4950 }
5051
51- private OperationState changeState (String processorId , boolean start ) {
52+ private OperationState changeState (String processorId , BiConsumer < String , String > action ) {
5253 try {
5354 ProcessorNode node = flowController .getFlowManager ().getProcessorNode (processorId );
5455 if (node == null ) {
5556 LOGGER .warn ("Processor with id {} not found" , processorId );
5657 return NOT_APPLIED ;
5758 }
5859 String parentGroupId = node .getProcessGroupIdentifier ();
59- if (start ) {
60- flowController .startProcessor (parentGroupId , processorId , true );
61- LOGGER .info ("Started processor {} (group={})" , processorId , parentGroupId );
62- } else {
63- flowController .stopProcessor (parentGroupId , processorId );
64- LOGGER .info ("Stopped processor {} (group={})" , processorId , parentGroupId );
65- }
60+ action .accept (processorId , parentGroupId );
6661 return FULLY_APPLIED ;
6762 } catch (Exception e ) {
6863 LOGGER .error ("Failed to change state for processor {}" , processorId , e );
6964 return NOT_APPLIED ;
7065 }
7166 }
67+
68+ private void start (String processorId , String parentGroupId ) {
69+ flowController .startProcessor (parentGroupId , processorId , true );
70+ LOGGER .info ("Started processor {} (group={})" , processorId , parentGroupId );
71+ }
72+
73+ private void stop (String processorId , String parentGroupId ) {
74+ flowController .stopProcessor (parentGroupId , processorId );
75+ LOGGER .info ("Stopped processor {} (group={})" , processorId , parentGroupId );
76+ }
7277}
0 commit comments