Skip to content

Commit

Permalink
Added Monitor event
Browse files Browse the repository at this point in the history
  • Loading branch information
PascalToninWit committed Dec 18, 2018
1 parent 4526e77 commit 1b53f70
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
22 changes: 19 additions & 3 deletions TMQTTClient/MQTT.pas
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ TMQTTClient = class(TObject)
FPingRespEvent: TPingRespEvent;
FSubAckEvent: TSubAckEvent;
FUnSubAckEvent: TUnSubAckEvent;
FOnMonitorEvent: TMonitorEvent;

FCritical: TRTLCriticalSection;
FMessageQueue: TQueue;
Expand All @@ -137,6 +138,7 @@ TMQTTClient = class(TObject)
procedure OnRTPublish(Sender: TObject; topic, payload: ansistring;
retain: boolean);
procedure OnRTTerminate(Sender: TObject);
procedure Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const);

public
function isConnected: boolean;
Expand All @@ -160,6 +162,7 @@ TMQTTClient = class(TObject)
property OnPingResp: TPingRespEvent read FPingRespEvent write FPingRespEvent;
property OnSubAck: TSubAckEvent read FSubAckEvent write FSubAckEvent;
property OnUnSubAck: TUnSubAckEvent read FUnSubAckEvent write FUnSubAckEvent;
property OnMonitorEvent: TMonitorEvent read FOnMonitorEvent write FOnMonitorEvent;
end;

// Message Component Build helpers
Expand Down Expand Up @@ -237,6 +240,7 @@ procedure TMQTTClient.Connect;
FReadThread.OnPingResp := @OnRTPingResp;
FReadThread.OnSubAck := @OnRTSubAck;
FReadThread.OnTerminate := @OnRTTerminate;
FReadThread.OnMonitor := OnMonitorEvent;
FReadThread.Start;
FReaderThreadRunning := True;
end;
Expand All @@ -251,7 +255,8 @@ function TMQTTClient.Disconnect: boolean;
var
Data: TBytes;
begin
writeln('TMQTTClient.Disconnect');
// writeln('TMQTTClient.Disconnect');
Monitor(false, false, ['Disconnect']);
Result := False;

SetLength(Data, 2);
Expand Down Expand Up @@ -286,7 +291,8 @@ function TMQTTClient.Disconnect: boolean;
------------------------------------------------------------------------------*}
procedure TMQTTClient.ForceDisconnect;
begin
writeln('TMQTTClient.ForceDisconnect');
// writeln('TMQTTClient.ForceDisconnect');
Monitor(false, false, ['ForceDisconnect']);
if FReadThread <> nil then
begin
FReadThread.OnTerminate := nil;
Expand All @@ -311,7 +317,17 @@ procedure TMQTTClient.OnRTTerminate(Sender: TObject);
FReadThread := nil;
FReaderThreadRunning := False;
FisConnected := False;
WriteLn('TMQTTClient.OnRTTerminate: Thread.Terminated.');
// WriteLn('TMQTTClient.OnRTTerminate: Thread.Terminated.');
Monitor(false, false, ['Thread.Terminated']);
end;

{*------------------------------------------------------------------------------
Raise a monitor event.
------------------------------------------------------------------------------*}
procedure TMQTTClient.Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const);
begin
if Assigned(FOnMonitorEvent) then FOnMonitorEvent(TheIsRead,TheIsError,TheArgs);

end;

{*------------------------------------------------------------------------------
Expand Down
25 changes: 20 additions & 5 deletions TMQTTClient/MQTTReadThread.pas
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ TMQTTMessage = record
TSubAckEvent = procedure(Sender: TObject; MessageID: integer;
GrantedQoS: integer) of object;
TUnSubAckEvent = procedure(Sender: TObject; MessageID: integer) of object;
TMonitorEvent = procedure(const TheIsRead, TheIsError: boolean; TheArgs: array of const) of object;

TMQTTReadThread = class(TThread)
private
Expand All @@ -82,6 +83,7 @@ TMQTTReadThread = class(TThread)
FPingRespEvent: TPingRespEvent;
FSubAckEvent: TSubAckEvent;
FUnSubAckEvent: TUnSubAckEvent;
FOnMonitorEvent: TMonitorEvent;

// Takes a 2 Byte Length array and returns the length of the ansistring it preceeds as per the spec.
function BytesToStrLength(LengthBytes: TBytes): integer;
Expand All @@ -102,6 +104,8 @@ TMQTTReadThread = class(TThread)
property OnPingResp: TPingRespEvent read FPingRespEvent write FPingRespEvent;
property OnSubAck: TSubAckEvent read FSubAckEvent write FSubAckEvent;
property OnUnSubAck: TUnSubAckEvent read FUnSubAckEvent write FUnSubAckEvent;
property OnMonitor: TMonitorEvent read FOnMonitorEvent write FOnMonitorEvent;
procedure Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const);
end;

implementation
Expand Down Expand Up @@ -134,6 +138,12 @@ constructor TMQTTReadThread.Create(HostName: ansistring; Port: integer);
FreeOnTerminate := True;
end;

procedure TMQTTReadThread.Monitor(const TheIsRead, TheIsError: boolean; TheArgs: array of const);
begin
if Assigned(FOnMonitorEvent) then FOnMonitorEvent(TheIsRead,TheIsError,TheArgs);

end;

procedure TMQTTReadThread.Execute;
var
rxState: TRxStates;
Expand Down Expand Up @@ -172,19 +182,24 @@ procedure TMQTTReadThread.Execute;
RL := RemainingLength(Length(VH) + Length(Payload));
Data := BuildCommand(FH, RL, VH, Payload);

writeln('RX_START: ', FPSocket.LastErrorDesc);
writeln('RX_START: ', FPSocket.LastError);
//writeln('RX_START: ', FPSocket.LastErrorDesc);
Monitor(true, Length(FPSocket.LastErrorDesc) > 0, ['RX_START: ', FPSocket.LastErrorDesc]);
//writeln('RX_START: ', FPSocket.LastError);
Monitor(true, FPSocket.LastError > 0, ['RX_START: ', FPSocket.LastError]);

//sleep(1);

// Send CONNECT message
while not self.Terminated do
begin
writeln('loop...');
// writeln('loop...');
Monitor(false, false, ['loop']);
SocketWrite(Data);
error := FPSocket.LastError;
writeln('RX_START: ', FPSocket.LastErrorDesc);
writeln('RX_START: ', error);
//writeln('RX_START: ', FPSocket.LastErrorDesc);
Monitor(true, Length(FPSocket.LastErrorDesc) > 0, ['RX_START: ', FPSocket.LastErrorDesc]);
//writeln('RX_START: ', error);
Monitor(true, error > 0, ['RX_START: ', error]);
if error = 0 then
begin
rxState := RX_FIXED_HEADER;
Expand Down

0 comments on commit 1b53f70

Please sign in to comment.