From 1b53f70334e3fe53ad820d7e58bc484ad21040ac Mon Sep 17 00:00:00 2001 From: Pascal TONIN Date: Tue, 18 Dec 2018 11:32:33 +0100 Subject: [PATCH] Added Monitor event --- TMQTTClient/MQTT.pas | 22 +++++++++++++++++++--- TMQTTClient/MQTTReadThread.pas | 25 ++++++++++++++++++++----- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/TMQTTClient/MQTT.pas b/TMQTTClient/MQTT.pas index 4e6a008..4b959ae 100644 --- a/TMQTTClient/MQTT.pas +++ b/TMQTTClient/MQTT.pas @@ -115,6 +115,7 @@ TMQTTClient = class(TObject) FPingRespEvent: TPingRespEvent; FSubAckEvent: TSubAckEvent; FUnSubAckEvent: TUnSubAckEvent; + FOnMonitorEvent: TMonitorEvent; FCritical: TRTLCriticalSection; FMessageQueue: TQueue; @@ -137,6 +138,7 @@ TMQTTClient = class(TObject) procedure OnRTPublish(Sender: TObject; topic, payload: ansistring; retain: boolean); procedure OnRTTerminate(Sender: TObject); + procedure Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const); public function isConnected: boolean; @@ -160,6 +162,7 @@ TMQTTClient = class(TObject) property OnPingResp: TPingRespEvent read FPingRespEvent write FPingRespEvent; property OnSubAck: TSubAckEvent read FSubAckEvent write FSubAckEvent; property OnUnSubAck: TUnSubAckEvent read FUnSubAckEvent write FUnSubAckEvent; + property OnMonitorEvent: TMonitorEvent read FOnMonitorEvent write FOnMonitorEvent; end; // Message Component Build helpers @@ -237,6 +240,7 @@ procedure TMQTTClient.Connect; FReadThread.OnPingResp := @OnRTPingResp; FReadThread.OnSubAck := @OnRTSubAck; FReadThread.OnTerminate := @OnRTTerminate; + FReadThread.OnMonitor := OnMonitorEvent; FReadThread.Start; FReaderThreadRunning := True; end; @@ -251,7 +255,8 @@ function TMQTTClient.Disconnect: boolean; var Data: TBytes; begin - writeln('TMQTTClient.Disconnect'); + // writeln('TMQTTClient.Disconnect'); + Monitor(false, false, ['Disconnect']); Result := False; SetLength(Data, 2); @@ -286,7 +291,8 @@ function TMQTTClient.Disconnect: boolean; ------------------------------------------------------------------------------*} procedure TMQTTClient.ForceDisconnect; begin - writeln('TMQTTClient.ForceDisconnect'); + // writeln('TMQTTClient.ForceDisconnect'); + Monitor(false, false, ['ForceDisconnect']); if FReadThread <> nil then begin FReadThread.OnTerminate := nil; @@ -311,7 +317,17 @@ procedure TMQTTClient.OnRTTerminate(Sender: TObject); FReadThread := nil; FReaderThreadRunning := False; FisConnected := False; - WriteLn('TMQTTClient.OnRTTerminate: Thread.Terminated.'); + // WriteLn('TMQTTClient.OnRTTerminate: Thread.Terminated.'); + Monitor(false, false, ['Thread.Terminated']); +end; + +{*------------------------------------------------------------------------------ + Raise a monitor event. +------------------------------------------------------------------------------*} +procedure TMQTTClient.Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const); +begin + if Assigned(FOnMonitorEvent) then FOnMonitorEvent(TheIsRead,TheIsError,TheArgs); + end; {*------------------------------------------------------------------------------ diff --git a/TMQTTClient/MQTTReadThread.pas b/TMQTTClient/MQTTReadThread.pas index e4fde70..2eda1ba 100644 --- a/TMQTTClient/MQTTReadThread.pas +++ b/TMQTTClient/MQTTReadThread.pas @@ -68,6 +68,7 @@ TMQTTMessage = record TSubAckEvent = procedure(Sender: TObject; MessageID: integer; GrantedQoS: integer) of object; TUnSubAckEvent = procedure(Sender: TObject; MessageID: integer) of object; + TMonitorEvent = procedure(const TheIsRead, TheIsError: boolean; TheArgs: array of const) of object; TMQTTReadThread = class(TThread) private @@ -82,6 +83,7 @@ TMQTTReadThread = class(TThread) FPingRespEvent: TPingRespEvent; FSubAckEvent: TSubAckEvent; FUnSubAckEvent: TUnSubAckEvent; + FOnMonitorEvent: TMonitorEvent; // Takes a 2 Byte Length array and returns the length of the ansistring it preceeds as per the spec. function BytesToStrLength(LengthBytes: TBytes): integer; @@ -102,6 +104,8 @@ TMQTTReadThread = class(TThread) property OnPingResp: TPingRespEvent read FPingRespEvent write FPingRespEvent; property OnSubAck: TSubAckEvent read FSubAckEvent write FSubAckEvent; property OnUnSubAck: TUnSubAckEvent read FUnSubAckEvent write FUnSubAckEvent; + property OnMonitor: TMonitorEvent read FOnMonitorEvent write FOnMonitorEvent; + procedure Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const); end; implementation @@ -134,6 +138,12 @@ constructor TMQTTReadThread.Create(HostName: ansistring; Port: integer); FreeOnTerminate := True; end; +procedure TMQTTReadThread.Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const); +begin + if Assigned(FOnMonitorEvent) then FOnMonitorEvent(TheIsRead,TheIsError,TheArgs); + +end; + procedure TMQTTReadThread.Execute; var rxState: TRxStates; @@ -172,19 +182,24 @@ procedure TMQTTReadThread.Execute; RL := RemainingLength(Length(VH) + Length(Payload)); Data := BuildCommand(FH, RL, VH, Payload); - writeln('RX_START: ', FPSocket.LastErrorDesc); - writeln('RX_START: ', FPSocket.LastError); + //writeln('RX_START: ', FPSocket.LastErrorDesc); + Monitor(true, Length(FPSocket.LastErrorDesc) > 0, ['RX_START: ', FPSocket.LastErrorDesc]); + //writeln('RX_START: ', FPSocket.LastError); + Monitor(true, FPSocket.LastError > 0, ['RX_START: ', FPSocket.LastError]); //sleep(1); // Send CONNECT message while not self.Terminated do begin - writeln('loop...'); + // writeln('loop...'); + Monitor(false, false, ['loop']); SocketWrite(Data); error := FPSocket.LastError; - writeln('RX_START: ', FPSocket.LastErrorDesc); - writeln('RX_START: ', error); + //writeln('RX_START: ', FPSocket.LastErrorDesc); + Monitor(true, Length(FPSocket.LastErrorDesc) > 0, ['RX_START: ', FPSocket.LastErrorDesc]); + //writeln('RX_START: ', error); + Monitor(true, error > 0, ['RX_START: ', error]); if error = 0 then begin rxState := RX_FIXED_HEADER;