diff --git a/TMQTTClient/MQTT.pas b/TMQTTClient/MQTT.pas index ab32ab6..4e6a008 100644 --- a/TMQTTClient/MQTT.pas +++ b/TMQTTClient/MQTT.pas @@ -39,12 +39,12 @@ interface SysUtils, blcksock, contnrs, MQTTReadThread; type - // Message type. 4 Bit unsigned. TMQTTMessageType = ( Reserved0, // 0 Reserved CONNECT, // 1 Client request to connect to Broker CONNACK, // 2 Connect Acknowledgment + // PUBLISH Control Packet is sent from a Client to a Server or from Server to a Client to transport an Application Message. PUBLISH, // 3 Publish message PUBACK, // 4 Publish Acknowledgment PUBREC, // 5 Publish Received (assured delivery part 1) @@ -63,14 +63,17 @@ interface // The message class definition TMQTTMessage = class private - FTopic: ansistring; + FTopic: ansistring; FPayload: ansistring; + FRetain: boolean; public property Topic: ansistring read FTopic; property PayLoad: ansistring read FPayload; + property Retain: boolean read FRetain; - constructor Create(const topic_: ansistring; const payload_: ansistring); + constructor Create(const topic_: ansistring; const payload_: ansistring; + const retain_: boolean); end; // The acknowledgement class definition @@ -87,9 +90,7 @@ TMQTTMessageAck = class property qos: integer read FQos; constructor Create(const messageType_: TMQTTMessageType; - const messageId_: integer; - const returnCode_: integer; - const qos_: integer); + const messageId_: integer; const returnCode_: integer; const qos_: integer); end; TRemainingLength = array of byte; @@ -97,24 +98,25 @@ TMQTTMessageAck = class PMQTTClient = ^TMQTTClient; + // Main object - MQTT client implementation TMQTTClient = class(TObject) private - FClientID: ansistring; - FHostname: ansistring; - FPort: integer; + FClientID: ansistring; + FHostname: ansistring; + FPort: integer; FReadThread: TMQTTReadThread; FSocket: TTCPBlockSocket; FMessageID: integer; FisConnected: boolean; FReaderThreadRunning: boolean; - FConnAckEvent: TConnAckEvent; - FPublishEvent: TPublishEvent; + FConnAckEvent: TConnAckEvent; + FPublishEvent: TPublishEvent; FPingRespEvent: TPingRespEvent; - FSubAckEvent: TSubAckEvent; + FSubAckEvent: TSubAckEvent; FUnSubAckEvent: TUnSubAckEvent; - FCritical: TRTLCriticalSection; + FCritical: TRTLCriticalSection; FMessageQueue: TQueue; FMessageAckQueue: TQueue; @@ -132,7 +134,8 @@ TMQTTClient = class(TObject) procedure OnRTPingResp(Sender: TObject); procedure OnRTSubAck(Sender: TObject; MessageID: integer; GrantedQoS: integer); procedure OnRTUnSubAck(Sender: TObject; MessageID: integer); - procedure OnRTPublish(Sender: TObject; topic, payload: ansistring); + procedure OnRTPublish(Sender: TObject; topic, payload: ansistring; + retain: boolean); procedure OnRTTerminate(Sender: TObject); public @@ -140,19 +143,16 @@ TMQTTClient = class(TObject) procedure Connect; function Disconnect: boolean; procedure ForceDisconnect; - function Publish(Topic: ansistring; sPayload: ansistring): boolean; - overload; - function Publish(Topic: ansistring; sPayload: ansistring; Retain: boolean): boolean; - overload; + function Publish(Topic: ansistring; sPayload: ansistring): boolean; overload; + function Publish(Topic: ansistring; sPayload: ansistring; + Retain: boolean): boolean; overload; function Subscribe(Topic: ansistring): integer; function Unsubscribe(Topic: ansistring): integer; function PingReq: boolean; function getMessage: TMQTTMessage; function getMessageAck: TMQTTMessageAck; - constructor Create(Hostname: ansistring; Port: integer); - overload; - destructor Destroy; - override; + constructor Create(Hostname: ansistring; Port: integer); overload; + destructor Destroy; override; property ClientID: ansistring read FClientID write FClientID; property OnConnAck: TConnAckEvent read FConnAckEvent write FConnAckEvent; @@ -163,9 +163,7 @@ TMQTTClient = class(TObject) end; // Message Component Build helpers -function FixedHeader(MessageType: TMQTTMessageType; Dup: word; Qos: word; Retain: word): - byte - ; +function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: byte): byte; // Variable Header per command creation funcs function VariableHeaderConnect(KeepAlive: word): TBytes; @@ -173,17 +171,15 @@ function VariableHeaderConnect(KeepAlive: word): TBytes; // Takes a ansistring and converts to An Array of Bytes preceded by 2 Length Bytes. function StrToBytes(str: ansistring; perpendLength: boolean): TUTF8Text; -procedure CopyIntoArray(var DestArray: array of byte; SourceArray: array of byte; StartIndex - : - integer); +procedure CopyIntoArray(var DestArray: array of byte; SourceArray: array of byte; + StartIndex: integer); // Byte Array Helper Functions procedure AppendArray(var Dest: TUTF8Text; Source: array of byte); // Helper Function - Puts the seperate component together into an Array of Bytes for transmission -function BuildCommand(FixedHead: byte; RemainL: TRemainingLength; VariableHead: TBytes; - Payload: - array of byte): TBytes; +function BuildCommand(FixedHead: byte; RemainL: TRemainingLength; + VariableHead: TBytes; Payload: array of byte): TBytes; // Calculates the Remaining Length bytes of the FixedHeader as per the spec. function RemainingLength(MessageLength: integer): TRemainingLength; @@ -191,17 +187,18 @@ function RemainingLength(MessageLength: integer): TRemainingLength; implementation -constructor TMQTTMessage.Create(const Topic_: ansistring; const Payload_: ansistring); + +constructor TMQTTMessage.Create(const topic_: ansistring; + const payload_: ansistring; const retain_: boolean); begin // Save the passed parameters - FTopic := Topic_; + FTopic := Topic_; FPayload := Payload_; + FRetain := retain_; end; constructor TMQTTMessageAck.Create(const messageType_: TMQTTMessageType; - const messageId_: integer; - const returnCode_: integer; - const qos_: integer); + const messageId_: integer; const returnCode_: integer; const qos_: integer); begin FMessageType := messageType_; FMessageId := messageId_; @@ -216,24 +213,32 @@ constructor TMQTTMessageAck.Create(const messageType_: TMQTTMessageType; ------------------------------------------------------------------------------*} procedure TMQTTClient.Connect; begin - if FReaderThreadRunning = False then begin - if FSocket = nil then begin + if FReaderThreadRunning = False then + begin + // Create and start RX thread + if FReadThread <> nil then + begin + FReadThread.OnTerminate := nil; + FreeAndNil(FReadThread); + end; + (* + if FSocket = nil then begin // Create a socket. FSocket := TTCPBlockSocket.Create; FSocket.nonBlockMode := True; // We really don't want sending on FSocket.NonblockSendTimeout := 1; // the socket to block our main thread. - // Create and start RX thread - FReadThread := TMQTTReadThread.Create(@FSocket, FHostname, FPort); - FReadThread.OnConnAck := @OnRTConnAck; - FReadThread.OnPublish := @OnRTPublish; - FReadThread.OnPublish := @OnRTPublish; - FReadThread.OnPingResp := @OnRTPingResp; - FReadThread.OnSubAck := @OnRTSubAck; - FReadThread.OnTerminate := @OnRTTerminate; - FReadThread.Start; - FReaderThreadRunning := True; - end; + *) + + FReadThread := TMQTTReadThread.Create(FHostname, FPort); + FReadThread.OnConnAck := @OnRTConnAck; + FReadThread.OnPublish := @OnRTPublish; + FReadThread.OnPublish := @OnRTPublish; + FReadThread.OnPingResp := @OnRTPingResp; + FReadThread.OnSubAck := @OnRTSubAck; + FReadThread.OnTerminate := @OnRTTerminate; + FReadThread.Start; + FReaderThreadRunning := True; end; end; @@ -243,7 +248,6 @@ procedure TMQTTClient.Connect; @return Returns whether the Data was written successfully to the socket. ------------------------------------------------------------------------------*} function TMQTTClient.Disconnect: boolean; - var Data: TBytes; begin @@ -253,13 +257,28 @@ function TMQTTClient.Disconnect: boolean; SetLength(Data, 2); Data[0] := FixedHeader(MQTT.DISCONNECT, 0, 0, 0); Data[1] := 0; - if SocketWrite(Data) then begin - Result := True; + if SocketWrite(Data) then + begin + FisConnected := False; + if FReadThread <> nil then + begin + //todo: collect all terminate code (connect, Disconnect, ForceDisconnect) to one point + FReadThread.OnTerminate := nil; + FReadThread.Terminate; + FReadThread := nil; + //todo: the probability of a hang? + //FReadThread.waitFor; + end; + (* TODO : review old fix code FReadThread.waitFor; FSocket.CloseSocket; FisConnected := False; FSocket := nil; - end else Result := False; + *) + Result := True; + end + else + Result := False; end; {*------------------------------------------------------------------------------ @@ -268,14 +287,18 @@ function TMQTTClient.Disconnect: boolean; procedure TMQTTClient.ForceDisconnect; begin writeln('TMQTTClient.ForceDisconnect'); - if FReadThread <> nil then begin + if FReadThread <> nil then + begin + FReadThread.OnTerminate := nil; FReadThread.Terminate; FReadThread := nil; end; + (* TODO : review old fix code if FSocket <> nil then begin FSocket.CloseSocket; FSocket := nil; end; + *) FisConnected := False; end; @@ -284,7 +307,11 @@ procedure TMQTTClient.ForceDisconnect; ------------------------------------------------------------------------------*} procedure TMQTTClient.OnRTTerminate(Sender: TObject); begin + //todo: on terminating - need disable this object + FReadThread := nil; FReaderThreadRunning := False; + FisConnected := False; + WriteLn('TMQTTClient.OnRTTerminate: Thread.Terminated.'); end; {*------------------------------------------------------------------------------ @@ -293,21 +320,22 @@ procedure TMQTTClient.OnRTTerminate(Sender: TObject); @return Returns whether the Data was written successfully to the socket. ------------------------------------------------------------------------------*} function TMQTTClient.PingReq: boolean; - var - FH: byte; - RL: byte; + FH: byte; + RL: byte; Data: TBytes; begin Result := False; SetLength(Data, 2); - FH := FixedHeader(MQTT.PINGREQ, 0, 0, 0); - RL := 0; + FH := FixedHeader(MQTT.PINGREQ, 0, 0, 0); + RL := 0; Data[0] := FH; Data[1] := RL; - if SocketWrite(Data) then Result := True - else Result := False; + if SocketWrite(Data) then + Result := True + else + Result := False; end; {*------------------------------------------------------------------------------ @@ -319,12 +347,11 @@ function TMQTTClient.PingReq: boolean; @return Returns whether the Data was written successfully to the socket. ------------------------------------------------------------------------------*} function TMQTTClient.Publish(Topic, sPayload: ansistring; Retain: boolean): boolean; - var - Data: TBytes; - FH: byte; - RL: TRemainingLength; - VH: TBytes; + Data: TBytes; + FH: byte; + RL: TRemainingLength; + VH: TBytes; Payload: TUTF8Text; begin Result := False; @@ -333,10 +360,12 @@ function TMQTTClient.Publish(Topic, sPayload: ansistring; Retain: boolean): bool VH := VariableHeaderPublish(Topic); SetLength(Payload, 0); AppendArray(Payload, StrToBytes(sPayload, False)); - RL := RemainingLength(Length(VH) + Length(Payload)); + RL := RemainingLength(Length(VH) + Length(Payload)); Data := BuildCommand(FH, RL, VH, Payload); - if SocketWrite(Data) then Result := True - else Result := False; + if SocketWrite(Data) then + Result := True + else + Result := False; end; {*------------------------------------------------------------------------------ @@ -359,16 +388,15 @@ function TMQTTClient.Publish(Topic, sPayload: ansistring): boolean; it to the Message ID used later in the SUBACK event handler. ------------------------------------------------------------------------------*} function TMQTTClient.Subscribe(Topic: ansistring): integer; - var - Data: TBytes; - FH: byte; - RL: TRemainingLength; - VH: TBytes; + Data: TBytes; + FH: byte; + RL: TRemainingLength; + VH: TBytes; Payload: TUTF8Text; begin - FH := FixedHeader(MQTT.SUBSCRIBE, 0, 1, 0); - VH := VariableHeaderSubscribe; + FH := FixedHeader(MQTT.SUBSCRIBE, 0, 1, 0); + VH := VariableHeaderSubscribe; Result := (FMessageID - 1); SetLength(Payload, 0); AppendArray(Payload, StrToBytes(Topic, True)); @@ -376,7 +404,7 @@ function TMQTTClient.Subscribe(Topic: ansistring): integer; SetLength(Payload, Length(Payload) + 1); // Always Append Requested QoS Level 0 Payload[Length(Payload) - 1] := $0; - RL := RemainingLength(Length(VH) + Length(Payload)); + RL := RemainingLength(Length(VH) + Length(Payload)); Data := BuildCommand(FH, RL, VH, Payload); SocketWrite(Data); end; @@ -389,20 +417,19 @@ function TMQTTClient.Subscribe(Topic: ansistring): integer; it to the Message ID used later in the UNSUBACK event handler. ------------------------------------------------------------------------------*} function TMQTTClient.Unsubscribe(Topic: ansistring): integer; - var - Data: TBytes; - FH: byte; - RL: TRemainingLength; - VH: TBytes; + Data: TBytes; + FH: byte; + RL: TRemainingLength; + VH: TBytes; Payload: TUTF8Text; begin - FH := FixedHeader(MQTT.UNSUBSCRIBE, 0, 0, 0); - VH := VariableHeaderUnsubscribe; + FH := FixedHeader(MQTT.UNSUBSCRIBE, 0, 0, 0); + VH := VariableHeaderUnsubscribe; Result := (FMessageID - 1); SetLength(Payload, 0); AppendArray(Payload, StrToBytes(Topic, True)); - RL := RemainingLength(Length(VH) + Length(Payload)); + RL := RemainingLength(Length(VH) + Length(Payload)); Data := BuildCommand(FH, RL, VH, Payload); SocketWrite(Data); end; @@ -431,33 +458,39 @@ constructor TMQTTClient.Create(Hostname: ansistring; Port: integer); Randomize; // Create a Default ClientID as a default. Can be overridden with TMQTTClient.ClientID any time before connection. - FClientID := 'dMQTTClient' + IntToStr(Random(1000) + 1); - FHostname := Hostname; - FPort := Port; + FClientID := 'dMQTTClient' + IntToStr(Random(1000) + 1); + FHostname := Hostname; + FPort := Port; FMessageID := 1; FReaderThreadRunning := False; InitCriticalSection(FCritical); - FMessageQueue := TQueue.Create; + FMessageQueue := TQueue.Create; FMessageAckQueue := TQueue.Create; end; destructor TMQTTClient.Destroy; begin - FSocket.Free; + if (isConnected) and (FReadThread <> nil) then + begin + FReadThread.Terminate; + FReadThread.WaitFor; + //note: free is not needed - the FreeOnTerminate mode is enabled + end; FMessageQueue.Free; FMessageAckQueue.Free; DoneCriticalSection(FCritical); inherited; end; -function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, - Retain: word): byte; +function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: byte): byte; begin - { Fixed Header Spec: - bit |7 6 5 4 | |3 | |2 1 | | 0 | - byte 1 |Message Type| |DUP flag| |QoS level| |RETAIN| } - Result := (Ord(MessageType) * 16) + (Dup * 8) + (Qos * 2) + (Retain * 1); + byte 1 bits |7 6 5 4 | 3 | 2 1 | 0 | + fields |Message Type| DUP flag | QoS level| RETAIN| +} + Result := byte(Ord(MessageType) shl 4) or (Dup shl 3) or (Qos shl 1) or + (Retain shl 0); + //todo: OLD code: Result := (Ord(MessageType) * 16) + (Dup * 8) + (Qos * 2) + (Retain * 1); end; function TMQTTClient.GetMessageID: TBytes; @@ -476,53 +509,54 @@ function TMQTTClient.GetMessageID: TBytes; end; function TMQTTClient.SocketWrite(Data: TBytes): boolean; - var sentData: integer; begin Result := False; // Returns whether the Data was successfully written to the socket. - if isConnected then begin - sentData := FSocket.SendBuffer(Pointer(Data), Length(Data)); - if sentData = Length(Data) then Result := True - else Result := False; - end; + if isConnected then + Result := FReadThread.SocketWrite(Data); end; function StrToBytes(str: ansistring; perpendLength: boolean): TUTF8Text; - var i, offset: integer; begin { This is a UTF-8 hack to give 2 Bytes of Length followed by the string itself. } - if perpendLength then begin + if perpendLength then + begin SetLength(Result, Length(str) + 2); Result[0] := Length(str) div 256; Result[1] := Length(str) mod 256; - offset := 1; - end else begin + offset := 1; + end + else + begin SetLength(Result, Length(str)); offset := -1; end; - for I := 1 to Length(str) do Result[i + offset] := Ord(str[i]); + for I := 1 to Length(str) do + Result[i + offset] := Ord(str[i]); end; function RemainingLength(MessageLength: integer): TRemainingLength; - var byteindex: integer; - digit: integer; + digit: integer; begin SetLength(Result, 1); byteindex := 0; - while (MessageLength > 0) do begin + while (MessageLength > 0) do + begin digit := MessageLength mod 128; MessageLength := MessageLength div 128; - if MessageLength > 0 then begin + if MessageLength > 0 then + begin digit := digit or $80; end; Result[byteindex] := digit; - if MessageLength > 0 then begin + if MessageLength > 0 then + begin Inc(byteindex); SetLength(Result, Length(Result) + 1); end; @@ -530,15 +564,23 @@ function RemainingLength(MessageLength: integer): TRemainingLength; end; function VariableHeaderConnect(KeepAlive: word): TBytes; - const + //todo: version update! MQIsdp->MQTT. version 4! MQTT_PROTOCOL = 'MQIsdp'; - MQTT_VERSION = 3; - + MQTT_VERSION = 3; var Qos, Retain: word; - iByteIndex: integer; - ProtoBytes: TUTF8Text; +{todo: connect flags +7 User Name Flag +6 Password Flag +5 Will Retain +4 Will QoS +3 Will QoS +2 Will Flag +1 Clean Session +0 Reserved } + iByteIndex: integer; + ProtoBytes: TUTF8Text; begin // Set the Length of our variable header array. SetLength(Result, 12); @@ -551,7 +593,7 @@ function VariableHeaderConnect(KeepAlive: word): TBytes; Result[iByteIndex] := MQTT_VERSION; Inc(iByteIndex); // Connect Flags - Qos := 0; + Qos := 0; Retain := 0; Result[iByteIndex] := 0; Result[iByteIndex] := (Retain * 32) + (Qos * 16) + (1 * 4) + (1 * 2); @@ -580,13 +622,13 @@ function TMQTTClient.VariableHeaderUnsubscribe: TBytes; Result := GetMessageID; end; -procedure CopyIntoArray(var DestArray: array of byte; - SourceArray: array of byte; +procedure CopyIntoArray(var DestArray: array of byte; SourceArray: array of byte; StartIndex: integer); begin Assert(StartIndex >= 0); // WARNING! move causes range check error if source length is zero. - if Length(SourceArray) > 0 then Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray)); + if Length(SourceArray) > 0 then + Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray)); end; procedure AppendArray(var Dest: TUTF8Text; Source: array of byte); @@ -595,7 +637,8 @@ procedure AppendArray(var Dest: TUTF8Text; Source: array of byte); DestLen: integer; begin // WARNING: move causes range check error if source length is zero! - if Length(Source) > 0 then begin + if Length(Source) > 0 then + begin DestLen := Length(Dest); SetLength(Dest, DestLen + Length(Source)); Move(Source, Dest[DestLen], Length(Source)); @@ -604,7 +647,6 @@ procedure AppendArray(var Dest: TUTF8Text; Source: array of byte); function BuildCommand(FixedHead: byte; RemainL: TRemainingLength; VariableHead: TBytes; Payload: array of byte): TBytes; - var iNextIndex: integer; begin @@ -631,12 +673,16 @@ function BuildCommand(FixedHead: byte; RemainL: TRemainingLength; procedure TMQTTClient.OnRTConnAck(Sender: TObject; ReturnCode: integer); begin - if ReturnCode = 0 then begin + if ReturnCode = 0 then + begin FisConnected := True; end; - if Assigned(OnConnAck) then begin + if Assigned(OnConnAck) then + begin OnConnAck(Self, ReturnCode); - end else begin + end + else + begin // Protected code. EnterCriticalSection(FCritical); try @@ -649,9 +695,12 @@ procedure TMQTTClient.OnRTConnAck(Sender: TObject; ReturnCode: integer); procedure TMQTTClient.OnRTPingResp(Sender: TObject); begin - if Assigned(OnPingResp) then begin + if Assigned(OnPingResp) then + begin OnPingResp(Self); - end else begin + end + else + begin // Protected code. EnterCriticalSection(FCritical); try @@ -662,30 +711,39 @@ procedure TMQTTClient.OnRTPingResp(Sender: TObject); end; end; -procedure TMQTTClient.OnRTPublish(Sender: TObject; topic, payload: ansistring); +procedure TMQTTClient.OnRTPublish(Sender: TObject; topic, payload: ansistring; + retain: boolean); begin - if Assigned(OnPublish) then begin - OnPublish(Self, topic, payload); - end else begin + if Assigned(OnPublish) then + begin + OnPublish(Self, topic, payload, retain); + end + else + begin // Protected code. EnterCriticalSection(FCritical); try - FMessageQueue.Push(TMQTTMessage.Create(topic, payload)); + FMessageQueue.Push(TMQTTMessage.Create(topic, payload, retain)); finally LeaveCriticalSection(FCritical); end; end; end; -procedure TMQTTClient.OnRTSubAck(Sender: TObject; MessageID: integer; GrantedQoS: integer); +procedure TMQTTClient.OnRTSubAck(Sender: TObject; MessageID: integer; + GrantedQoS: integer); begin - if Assigned(OnSubAck) then begin + if Assigned(OnSubAck) then + begin OnSubAck(Self, MessageID, GrantedQoS); - end else begin + end + else + begin // Protected code. EnterCriticalSection(FCritical); try - FMessageAckQueue.Push(TMQTTMessageAck.Create(SUBACK, MessageID, 0, GrantedQos)); + FMessageAckQueue.Push(TMQTTMessageAck.Create(SUBACK, MessageID, + 0, GrantedQos)); finally LeaveCriticalSection(FCritical); end; @@ -694,9 +752,12 @@ procedure TMQTTClient.OnRTSubAck(Sender: TObject; MessageID: integer; GrantedQoS procedure TMQTTClient.OnRTUnSubAck(Sender: TObject; MessageID: integer); begin - if Assigned(OnUnSubAck) then begin + if Assigned(OnUnSubAck) then + begin OnUnSubAck(Self, MessageID); - end else begin + end + else + begin // Protected code. EnterCriticalSection(FCritical); try diff --git a/TMQTTClient/MQTTReadThread.pas b/TMQTTClient/MQTTReadThread.pas index ba12d90..e4fde70 100644 --- a/TMQTTClient/MQTTReadThread.pas +++ b/TMQTTClient/MQTTReadThread.pas @@ -33,46 +33,48 @@ interface uses - SysUtils, Classes, + SysUtils, Classes, blcksock, {$IFDEF LINUX} -BaseUnix + BaseUnix, {$ENDIF} - blcksock; + synsock; {$IFNDEF LINUX} { TODO : Find value ESysETIMEDOUT for windows } const ESysETIMEDOUT = 110; // http://www.chiark.greenend.org.uk/doc/fp-docs/2.6.4/rtl/baseunix/esysetimedout.html {$ENDIF} -type TBytes = array of byte; +type + TBytes = array of byte; type TMQTTMessage = record FixedHeader: byte; - RL: TBytes; + RL: TBytes; Data: TBytes; end; -type TRxStates = (RX_START, RX_FIXED_HEADER, RX_LENGTH, RX_DATA, RX_ERROR); - - PTCPBlockSocket = ^TTCPBlockSocket; +type + TRxStates = (RX_START, RX_FIXED_HEADER, RX_LENGTH, RX_DATA, RX_ERROR); TRemainingLength = array of byte; TUTF8Text = array of byte; TConnAckEvent = procedure(Sender: TObject; ReturnCode: integer) of object; - TPublishEvent = procedure(Sender: TObject; topic, payload: ansistring) of object; + TPublishEvent = procedure(Sender: TObject; topic, payload: ansistring; + retain: boolean) of object; TPingRespEvent = procedure(Sender: TObject) of object; - TSubAckEvent = procedure(Sender: TObject; MessageID: integer; GrantedQoS: integer) of object; + TSubAckEvent = procedure(Sender: TObject; MessageID: integer; + GrantedQoS: integer) of object; TUnSubAckEvent = procedure(Sender: TObject; MessageID: integer) of object; TMQTTReadThread = class(TThread) private FClientID: ansistring; FHostname: ansistring; - FPort: integer; - FPSocket: PTCPBlockSocket; + FPort: integer; + //FPSocket: PTCPBlockSocket; CurrentMessage: TMQTTMessage; // Events FConnAckEvent: TConnAckEvent; @@ -86,14 +88,15 @@ TMQTTReadThread = class(TThread) // This is our data processing and event firing command. procedure HandleData; - - function SocketWrite(Data: TBytes): boolean; - protected procedure Execute; override; public - constructor Create(Socket: PTCPBlockSocket; Hostname: ansistring; Port: integer); + //todo: change Socket resurse working + FPSocket: TTCPBlockSocket; + function SocketWrite(Data: TBytes): boolean; + + constructor Create(Hostname: ansistring; Port: integer); property OnConnAck: TConnAckEvent read FConnAckEvent write FConnAckEvent; property OnPublish: TPublishEvent read FPublishEvent write FPublishEvent; property OnPingResp: TPingRespEvent read FPingRespEvent write FPingRespEvent; @@ -106,192 +109,258 @@ implementation uses MQTT; -{ TMQTTReadThread } +procedure SetBit(var Value: byte; const Index: byte; const State: boolean); inline; +begin + Value := (Value and ((byte(1) shl Index) xor High(byte))) or + (byte(State) shl Index); +end; -constructor TMQTTReadThread.Create(Socket: PTCPBlockSocket; HostName: ansistring; Port: integer); +function GetBit(const Value: byte; const Index: byte): boolean; inline; +begin + Result := ((Value shr Index) and 1) = 1; +end; + +{ TMQTTReadThread } +constructor TMQTTReadThread.Create(HostName: ansistring; Port: integer); begin inherited Create(True); + // Create a Default ClientID as a default. Can be overridden with TMQTTClient.ClientID any time before connection. FClientID := 'dMQTTClientx' + IntToStr(Random(1000) + 1); - FPSocket := Socket; + //FPSocket := Socket; FHostname := Hostname; - FPort := Port; + FPort := Port; FreeOnTerminate := True; end; procedure TMQTTReadThread.Execute; - var rxState: TRxStates; remainingLengthx: integer; - digit: integer; + digit: integer; multiplier: integer; - Data: TBytes; - RL: TRemainingLength; - VH: TBytes; - FH: byte; + Data: TBytes; + RL: TRemainingLength; + VH: TBytes; + FH: byte; Payload: TUTF8Text; - error: integer; + error: integer; begin rxState := RX_START; - - while not self.Terminated do begin - case rxState of - RX_START: begin - // Make the socket connection - FPSocket^.Connect(FHostname, IntToStr(FPort)); - - // Build CONNECT message - FH := FixedHeader(MQTT.CONNECT, 0, 0, 0); - VH := VariableHeaderConnect(40); - SetLength(Payload, 0); - AppendArray(Payload, StrToBytes(FClientID, True)); - AppendArray(Payload, StrToBytes('lwt', True)); - AppendArray(Payload, StrToBytes(FClientID + ' died', True)); - RL := RemainingLength(Length(VH) + Length(Payload)); - Data := BuildCommand(FH, RL, VH, Payload); - - writeln('RX_START: ', FPSocket^.LastErrorDesc); - writeln('RX_START: ', FPSocket^.LastError); - - //sleep(1); - - // Send CONNECT message - while True do begin - writeln('loop...'); - SocketWrite(Data); - error := FPSocket^.LastError; - writeln('RX_START: ', FPSocket^.LastErrorDesc); - writeln('RX_START: ', error); - if error = 0 then begin - rxState := RX_FIXED_HEADER; - break; - end else begin - if error = 110 then begin - continue; + try + // Create a socket. + FPSocket := TTCPBlockSocket.Create; + FPSocket.nonBlockMode := True; // We really don't want sending on + FPSocket.NonblockSendTimeout := 1; + // the socket to block our main thread. + while not self.Terminated do + begin + case rxState of + RX_START: + begin + // Make the socket connection + FPSocket.Connect(FHostname, IntToStr(FPort)); + + // Build CONNECT message + FH := FixedHeader(MQTT.CONNECT, 0, 0, 0); + VH := VariableHeaderConnect(40); + SetLength(Payload, 0); + AppendArray(Payload, StrToBytes(FClientID, True)); + AppendArray(Payload, StrToBytes('lwt', True)); + AppendArray(Payload, StrToBytes(FClientID + ' died', True)); + RL := RemainingLength(Length(VH) + Length(Payload)); + Data := BuildCommand(FH, RL, VH, Payload); + + writeln('RX_START: ', FPSocket.LastErrorDesc); + writeln('RX_START: ', FPSocket.LastError); + + //sleep(1); + + // Send CONNECT message + while not self.Terminated do + begin + writeln('loop...'); + SocketWrite(Data); + error := FPSocket.LastError; + writeln('RX_START: ', FPSocket.LastErrorDesc); + writeln('RX_START: ', error); + if error = 0 then + begin + rxState := RX_FIXED_HEADER; + break; + end + else + begin + if error = 110 then + begin + continue; + end; + rxState := RX_ERROR; + break; end; - rxState := RX_ERROR; - break; end; end; - end; - RX_FIXED_HEADER: begin - multiplier := 1; - remainingLengthx := 0; - CurrentMessage.Data := nil; - - CurrentMessage.FixedHeader := FPSocket^.RecvByte(1000); - if (FPSocket^.LastError = ESysETIMEDOUT) then continue; - if (FPSocket^.LastError <> 0) then rxState := RX_ERROR - else - rxState := RX_LENGTH; - end; - RX_LENGTH: begin - digit := FPSocket^.RecvByte(1000); - if (FPSocket^.LastError = ESysETIMEDOUT) then continue; - if (FPSocket^.LastError <> 0) then rxState := RX_ERROR - else begin - remainingLengthx := remainingLengthx + (digit and 127) * multiplier; - if (digit and 128) > 0 then begin - multiplier := multiplier * 128; - rxState := RX_LENGTH; - end else - rxState := RX_DATA; + RX_FIXED_HEADER: + begin + multiplier := 1; + remainingLengthx := 0; + CurrentMessage.Data := nil; + + CurrentMessage.FixedHeader := FPSocket.RecvByte(1000); + if (FPSocket.LastError = WSAETIMEDOUT) then + continue; + if (FPSocket.LastError <> 0) then + rxState := RX_ERROR + else + rxState := RX_LENGTH; end; - end; - RX_DATA: begin - SetLength(CurrentMessage.Data, remainingLengthx); - FPSocket^.RecvBufferEx(Pointer(CurrentMessage.Data), remainingLengthx, 1000); - if (FPSocket^.LastError <> 0) then rxState := RX_ERROR - else begin - HandleData; - rxState := RX_FIXED_HEADER; + RX_LENGTH: + begin + digit := FPSocket.RecvByte(1000); + if (FPSocket.LastError = WSAETIMEDOUT) then + continue; + if (FPSocket.LastError <> 0) then + rxState := RX_ERROR + else + begin + remainingLengthx := + remainingLengthx + (digit and 127) * multiplier; + if (digit and 128) > 0 then + begin + multiplier := multiplier * 128; + rxState := RX_LENGTH; + end + else + rxState := RX_DATA; + end; + end; + RX_DATA: + begin + SetLength(CurrentMessage.Data, remainingLengthx); + FPSocket.RecvBufferEx(Pointer(CurrentMessage.Data), + remainingLengthx, 1000); + if (FPSocket.LastError <> 0) then + rxState := RX_ERROR + else + begin + HandleData; + rxState := RX_FIXED_HEADER; + end; + end; + RX_ERROR: + begin + // Quit the loop, terminating the thread. + break; end; - end; - RX_ERROR: begin - // Quit the loop, terminating the thread. - break; end; end; - end; + finally + FPSocket.CloseSocket(); + FreeAndNil(FPSocket); + end; // try end; procedure TMQTTReadThread.HandleData; - var MessageType: byte; DataLen: integer; - QoS: integer; - Topic: ansistring; + QoS: integer; + Retain: boolean; + Topic: ansistring; Payload: ansistring; ResponseVH: TBytes; ConnectReturn: integer; begin - if (CurrentMessage.FixedHeader <> 0) then begin + if (CurrentMessage.FixedHeader <> 0) then + begin MessageType := CurrentMessage.FixedHeader shr 4; - if (MessageType = Ord(MQTT.CONNACK)) then begin + if (MessageType = Ord(MQTT.CONNACK)) then + begin // Check if we were given a Connect Return Code. // Any return code except 0 is an Error - if ((Length(CurrentMessage.Data) > 0) and (Length(CurrentMessage.Data) < 4)) then begin + if ((Length(CurrentMessage.Data) > 0) and + (Length(CurrentMessage.Data) < 4)) then + begin ConnectReturn := CurrentMessage.Data[1]; - if Assigned(OnConnAck) then OnConnAck(Self, ConnectReturn); - end; - end else if (MessageType = Ord(MQTT.PUBLISH)) then begin - // Read the Length Bytes - DataLen := BytesToStrLength(Copy(CurrentMessage.Data, 0, 2)); - // Get the Topic - SetString(Topic, PChar(@CurrentMessage.Data[2]), DataLen); - // Get the Payload - SetString(Payload, PChar(@CurrentMessage.Data[2 + DataLen]), - (Length(CurrentMessage.Data) - 2 - DataLen)); - if Assigned(OnPublish) then OnPublish(Self, Topic, Payload); - end else if (MessageType = Ord(MQTT.SUBACK)) then begin - // Reading the Message ID - ResponseVH := Copy(CurrentMessage.Data, 0, 2); - DataLen := BytesToStrLength(ResponseVH); - // Next Read the Granted QoS - QoS := 0; - if (Length(CurrentMessage.Data) - 2) > 0 then begin - ResponseVH := Copy(CurrentMessage.Data, 2, 1); - QoS := ResponseVH[0]; + if Assigned(OnConnAck) then + OnConnAck(Self, ConnectReturn); end; - if Assigned(OnSubAck) then OnSubAck(Self, DataLen, QoS); - end else if (MessageType = Ord(MQTT.UNSUBACK)) then begin - // Read the Message ID for the event handler - ResponseVH := Copy(CurrentMessage.Data, 0, 2); - DataLen := BytesToStrLength(ResponseVH); - if Assigned(OnUnSubAck) then OnUnSubAck(Self, DataLen); - end else if (MessageType = Ord(MQTT.PINGRESP)) then begin - if Assigned(OnPingResp) then OnPingResp(Self); - end; + end + else + if (MessageType = Ord(MQTT.PUBLISH)) then + begin + Retain := GetBit(CurrentMessage.FixedHeader, 0); + // Read the Length Bytes + DataLen := BytesToStrLength(Copy(CurrentMessage.Data, 0, 2)); + // Get the Topic + SetString(Topic, PChar(@CurrentMessage.Data[2]), DataLen); + // Get the Payload + SetString(Payload, PChar(@CurrentMessage.Data[2 + DataLen]), + (Length(CurrentMessage.Data) - 2 - DataLen)); + if Assigned(OnPublish) then + OnPublish(Self, Topic, Payload, retain); + end + else + if (MessageType = Ord(MQTT.SUBACK)) then + begin + // Reading the Message ID + ResponseVH := Copy(CurrentMessage.Data, 0, 2); + DataLen := BytesToStrLength(ResponseVH); + // Next Read the Granted QoS + QoS := 0; + if (Length(CurrentMessage.Data) - 2) > 0 then + begin + ResponseVH := Copy(CurrentMessage.Data, 2, 1); + QoS := ResponseVH[0]; + end; + if Assigned(OnSubAck) then + OnSubAck(Self, DataLen, QoS); + end + else + if (MessageType = Ord(MQTT.UNSUBACK)) then + begin + // Read the Message ID for the event handler + ResponseVH := Copy(CurrentMessage.Data, 0, 2); + DataLen := BytesToStrLength(ResponseVH); + if Assigned(OnUnSubAck) then + OnUnSubAck(Self, DataLen); + end + else + if (MessageType = Ord(MQTT.PINGRESP)) then + begin + if Assigned(OnPingResp) then + OnPingResp(Self); + end; end; end; function TMQTTReadThread.BytesToStrLength(LengthBytes: TBytes): integer; begin Assert(Length(LengthBytes) = 2, - 'TMQTTReadThread: UTF-8 Length Bytes preceeding the text must be 2 Bytes in Legnth' - ); + 'TMQTTReadThread: UTF-8 Length Bytes preceeding the text must be 2 Bytes in Legnth'); + Result := 0; Result := LengthBytes[0] shl 8; Result := Result + LengthBytes[1]; end; function TMQTTReadThread.SocketWrite(Data: TBytes): boolean; - var sentData: integer; begin Result := False; // Returns whether the Data was successfully written to the socket. - while not FPSocket^.CanWrite(0) do begin + while not FPSocket.CanWrite(0) do + begin sleep(100); end; - sentData := FPSocket^.SendBuffer(Pointer(Data), Length(Data)); - if sentData = Length(Data) then Result := True; + sentData := FPSocket.SendBuffer(Pointer(Data), Length(Data)); + if sentData = Length(Data) then + Result := True; end; + end. diff --git a/examples/embeddedApp/embeddedApp.lpi b/examples/embeddedApp/embeddedApp.lpi new file mode 100644 index 0000000..65509bf --- /dev/null +++ b/examples/embeddedApp/embeddedApp.lpi @@ -0,0 +1,116 @@ + + + + + + + + + + + + + + + <UseAppBundle Value="False"/> + <ResourceType Value="res"/> + </General> + <BuildModes Count="2"> + <Item1 Name="debug" Default="True"/> + <Item2 Name="release"> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="embeddedApp"/> + </Target> + <SearchPaths> + <OtherUnitFiles Value="..\..\TMQTTClient"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Linking> + <Debugging> + <GenerateDebugInfo Value="False"/> + </Debugging> + </Linking> + <Other> + <CustomOptions Value="-dUseCThreads"/> + </Other> + </CompilerOptions> + </Item2> + </BuildModes> + <PublishOptions> + <Version Value="2"/> + </PublishOptions> + <RunParams> + <local> + <FormatVersion Value="1"/> + </local> + </RunParams> + <RequiredPackages Count="1"> + <Item1> + <PackageName Value="laz_synapse"/> + </Item1> + </RequiredPackages> + <Units Count="3"> + <Unit0> + <Filename Value="embeddedApp.pas"/> + <IsPartOfProject Value="True"/> + </Unit0> + <Unit1> + <Filename Value="..\..\TMQTTClient\MQTT.pas"/> + <IsPartOfProject Value="True"/> + </Unit1> + <Unit2> + <Filename Value="..\..\TMQTTClient\MQTTReadThread.pas"/> + <IsPartOfProject Value="True"/> + </Unit2> + </Units> + </ProjectOptions> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="embeddedApp"/> + </Target> + <SearchPaths> + <OtherUnitFiles Value="..\..\TMQTTClient"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Parsing> + <SyntaxOptions> + <IncludeAssertionCode Value="True"/> + </SyntaxOptions> + </Parsing> + <CodeGeneration> + <Checks> + <IOChecks Value="True"/> + <RangeChecks Value="True"/> + <OverflowChecks Value="True"/> + </Checks> + <VerifyObjMethodCallValidity Value="True"/> + </CodeGeneration> + <Linking> + <Debugging> + <UseHeaptrc Value="True"/> + <TrashVariables Value="True"/> + </Debugging> + </Linking> + <Other> + <CustomOptions Value="-dUseCThreads"/> + </Other> + </CompilerOptions> + <Debugging> + <Exceptions Count="3"> + <Item1> + <Name Value="EAbort"/> + </Item1> + <Item2> + <Name Value="ECodetoolError"/> + </Item2> + <Item3> + <Name Value="EFOpenError"/> + </Item3> + </Exceptions> + </Debugging> +</CONFIG> diff --git a/examples/embeddedApp/embeddedApp.pas b/examples/embeddedApp/embeddedApp.pas index c1809a3..e392afb 100644 --- a/examples/embeddedApp/embeddedApp.pas +++ b/examples/embeddedApp/embeddedApp.pas @@ -38,10 +38,20 @@ // cthreads is required to get the MQTTReadThread working. -uses cthreads, Classes, MQTT, sysutils; +uses + {$IFDEF UNIX} {$IFDEF UseCThreads} + cthreads, {$ENDIF} {$ENDIF} + Classes, MQTT, laz_synapse, sysutils; // The major states of the application. +const + pubTimerInterval = 60*10; // 60 - 1 minute + pingTimerInterval = 10*10; // 10 - ping every 10 sec + MQTT_Server = '192.168.1.19'; + MQTT_Topic = '/jack/says'; + //MQTT_Server = '192.168.0.26'; + type TembeddedAppStates = ( CONNECT, WAIT_CONNECT, @@ -53,7 +63,7 @@ // Define class for the embedded application TembeddedApp = object strict - private + private MQTTClient: TMQTTClient; pingCounter : integer; pingTimer : integer; @@ -61,7 +71,8 @@ message : ansistring; pubTimer : integer; connectTimer : integer; - public + public + terminate: boolean; procedure run (); end; @@ -78,9 +89,9 @@ 'All work and no play makes Jack a dull boy. All work and no play makes Jack a dull boy.' ; - MQTTClient := TMQTTClient.Create('192.168.0.26', 1883); + MQTTClient := TMQTTClient.Create(MQTT_Server, 1883); - while true do + while not terminate do begin case state of CONNECT : @@ -106,9 +117,9 @@ RUNNING : begin // Publish stuff - if pubTimer mod 1 = 0 then + if pubTimer mod pubTimerInterval = 0 then begin - if not MQTTClient.Publish('/jack/says/', message) then + if not MQTTClient.Publish(MQTT_Topic, message) then begin writeln ('embeddedApp: Error: Publish Failed.'); state := FAILING; @@ -149,6 +160,9 @@ begin writeln ('getMessage: ' + msg.topic + ' Payload: ' + msg.payload); + if msg.PayLoad = 'stop' then + terminate := true; + // Important to free messages here. msg.free; end; @@ -165,7 +179,7 @@ if ack.returnCode = 0 then begin // Make subscriptions - MQTTClient.Subscribe('/jack/says/'); + MQTTClient.Subscribe(MQTT_Topic); // Enter the running state state := RUNNING; end @@ -202,6 +216,9 @@ // Yawn. sleep(100); end; + + MQTTClient.ForceDisconnect; + FreeAndNil(MQTTClient); end; var diff --git a/examples/fpcConsole/fpcConsoleMQTT.lpi b/examples/fpcConsole/fpcConsoleMQTT.lpi new file mode 100644 index 0000000..61576b1 --- /dev/null +++ b/examples/fpcConsole/fpcConsoleMQTT.lpi @@ -0,0 +1,133 @@ +<?xml version="1.0" encoding="UTF-8"?> +<CONFIG> + <ProjectOptions> + <Version Value="10"/> + <PathDelim Value="\"/> + <General> + <Flags> + <MainUnitHasCreateFormStatements Value="False"/> + <MainUnitHasScaledStatement Value="False"/> + </Flags> + <SessionStorage Value="InProjectDir"/> + <MainUnit Value="0"/> + <Title Value="fpcConsoleMQTT"/> + <UseAppBundle Value="False"/> + <ResourceType Value="res"/> + </General> + <BuildModes Count="2"> + <Item1 Name="debug_wnd_x86" Default="True"/> + <Item2 Name="debug_lnx_arm"> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="fpcConsoleMQTT"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <OtherUnitFiles Value="..\..\TMQTTClient"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Parsing> + <SyntaxOptions> + <IncludeAssertionCode Value="True"/> + </SyntaxOptions> + </Parsing> + <CodeGeneration> + <Checks> + <IOChecks Value="True"/> + <RangeChecks Value="True"/> + <OverflowChecks Value="True"/> + </Checks> + <VerifyObjMethodCallValidity Value="True"/> + <TargetCPU Value="arm"/> + <TargetOS Value="linux"/> + </CodeGeneration> + <Linking> + <Debugging> + <UseHeaptrc Value="True"/> + <TrashVariables Value="True"/> + </Debugging> + </Linking> + <Other> + <CustomOptions Value="-dUseCThreads"/> + </Other> + </CompilerOptions> + </Item2> + </BuildModes> + <PublishOptions> + <Version Value="2"/> + </PublishOptions> + <RunParams> + <local> + <FormatVersion Value="1"/> + </local> + </RunParams> + <RequiredPackages Count="1"> + <Item1> + <PackageName Value="laz_synapse"/> + </Item1> + </RequiredPackages> + <Units Count="3"> + <Unit0> + <Filename Value="fpcConsoleMQTT.pas"/> + <IsPartOfProject Value="True"/> + </Unit0> + <Unit1> + <Filename Value="..\..\TMQTTClient\MQTT.pas"/> + <IsPartOfProject Value="True"/> + </Unit1> + <Unit2> + <Filename Value="..\..\TMQTTClient\MQTTReadThread.pas"/> + <IsPartOfProject Value="True"/> + </Unit2> + </Units> + </ProjectOptions> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="fpcConsoleMQTT"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <OtherUnitFiles Value="..\..\TMQTTClient"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Parsing> + <SyntaxOptions> + <IncludeAssertionCode Value="True"/> + </SyntaxOptions> + </Parsing> + <CodeGeneration> + <Checks> + <IOChecks Value="True"/> + <RangeChecks Value="True"/> + <OverflowChecks Value="True"/> + </Checks> + <VerifyObjMethodCallValidity Value="True"/> + </CodeGeneration> + <Linking> + <Debugging> + <UseHeaptrc Value="True"/> + <TrashVariables Value="True"/> + </Debugging> + </Linking> + <Other> + <CustomOptions Value="-dUseCThreads"/> + </Other> + </CompilerOptions> + <Debugging> + <Exceptions Count="3"> + <Item1> + <Name Value="EAbort"/> + </Item1> + <Item2> + <Name Value="ECodetoolError"/> + </Item2> + <Item3> + <Name Value="EFOpenError"/> + </Item3> + </Exceptions> + </Debugging> +</CONFIG> diff --git a/examples/fpcConsole/fpcConsoleMQTT.pas b/examples/fpcConsole/fpcConsoleMQTT.pas new file mode 100644 index 0000000..75fbf3f --- /dev/null +++ b/examples/fpcConsole/fpcConsoleMQTT.pas @@ -0,0 +1,201 @@ +program fpcConsoleMQTT; + +{$mode objfpc}{$H+} + +uses {$IFDEF UNIX} {$IFDEF UseCThreads} + cthreads, {$ENDIF} {$ENDIF} + Classes, + SysUtils, + CustApp { you can add units after this }, + //old: CRT, + MQTT, + syncobjs, // TCriticalSection + fptimer; + +const + MQTT_Server = 'orangepi.lan'; + +type + { TMQTTGate } + + TMQTTGate = class(TCustomApplication) + protected + MQTTClient: TMQTTClient; + + SyncCode: TCriticalSection; + TimerTick: TFPTimer; + cnt: integer; + + // Unsafe events! Called from MQTT thread (TMQTTReadThread) + procedure OnConnAck(Sender: TObject; ReturnCode: integer); + procedure OnPingResp(Sender: TObject); + procedure OnSubAck(Sender: TObject; MessageID: integer; GrantedQoS: integer); + procedure OnUnSubAck(Sender: TObject); + procedure OnPublish(Sender: TObject; topic, payload: ansistring; isRetain: boolean); + + procedure OnTimerTick(Sender: TObject); + procedure DoRun; override; + public + procedure WriteHelp; virtual; + end; + +{old: const + { ^C } + //ContrBreakSIG = ^C; // yes, its valid string! (OMG!) + //ContrBreakSIG = #$03; +} + +function NewTimer(Intr: integer; Proc: TNotifyEvent; AEnable: boolean = false): TFPTimer; +begin + Result := TFPTimer.Create(nil); + Result.UseTimerThread:=false; + Result.Interval := Intr; + Result.OnTimer := Proc; + Result.Enabled := AEnable; +end; + +{ TMQTTGate } + +procedure TMQTTGate.OnConnAck(Sender: TObject; ReturnCode: integer); +begin + SyncCode.Enter; + writeln('ConnAck'); + SyncCode.Leave; +end; + +procedure TMQTTGate.OnPingResp(Sender: TObject); +begin + SyncCode.Enter; + writeln('PingResp'); + SyncCode.Leave; +end; + +procedure TMQTTGate.OnSubAck(Sender: TObject; MessageID: integer; GrantedQoS: integer); +begin + SyncCode.Enter; + writeln('SubAck'); + SyncCode.Leave; +end; + +procedure TMQTTGate.OnUnSubAck(Sender: TObject); +begin + SyncCode.Enter; + writeln('UnSubAck'); + SyncCode.Leave; +end; + +procedure TMQTTGate.OnPublish(Sender: TObject; topic, payload: ansistring; + isRetain: boolean); +begin + SyncCode.Enter; + writeln('Publish', ' topic=', topic, ' payload=', payload); + SyncCode.Leave; +end; + +procedure TMQTTGate.OnTimerTick(Sender: TObject); +begin + SyncCode.Enter; + cnt := cnt + 1; + writeln('Tick. N='+IntToStr(cnt)); + MQTTClient.PingReq; + MQTTClient.Publish('test', IntToStr(cnt)); + SyncCode.Leave; +end; + +procedure TMQTTGate.DoRun; +var + ErrorMsg: string; +begin + StopOnException := True; + SyncCode := TCriticalSection.Create(); + + // quick check parameters + ErrorMsg := CheckOptions('h', 'help'); + if ErrorMsg <> '' then + begin + ShowException(Exception.Create(ErrorMsg)); + Terminate; + Exit; + end; + + // parse parameters + if HasOption('h', 'help') then + begin + WriteHelp; + Terminate; + Exit; + end; + + // begin main program + MQTTClient := TMQTTClient.Create(MQTT_Server, 1883); + MQTTClient.OnConnAck := @OnConnAck; + MQTTClient.OnPingResp := @OnPingResp; + MQTTClient.OnPublish := @OnPublish; + MQTTClient.OnSubAck := @OnSubAck; + MQTTClient.Connect(); + + //todo: wait 'OnConnAck' + Sleep(1000); + if not MQTTClient.isConnected then + begin + writeln('connect FAIL'); + exit; + end; + + // mqtt subscribe to all topics + MQTTClient.Subscribe('#'); + + cnt := 0; + TimerTick := NewTimer(5000, @OnTimerTick, true); + try + while (not Terminated) and (MQTTClient.isConnected) do + begin + // wait other thread + CheckSynchronize(1000); + + //old: Check for ctrl-c + {if KeyPressed then // <--- CRT function to test key press + if ReadKey = ContrBreakSIG then // read the key pressed + begin + writeln('Ctrl-C pressed.'); + Terminate; + end;} + end; + + MQTTClient.Unsubscribe('#'); + MQTTClient.Disconnect; + Sleep(100); + MQTTClient.ForceDisconnect; + finally + FreeAndNil(TimerTick); + FreeAndNil(MQTTClient); + FreeAndNil(SyncCode); + Sleep(2000); // wait thread dies + end; + // stop program loop + Terminate; +end; + +procedure TMQTTGate.WriteHelp; +begin + { add your help code here } + writeln('Usage: ', ExeName, ' -h'); +end; + +var + Application: TMQTTGate; + +function MyCtrlBreakHandler(CtrlBr: boolean): boolean; +begin + writeln('CtrlBreak pressed. Terminating.'); + Application.Terminate; + Result := true; +end; + +begin + SysSetCtrlBreakHandler(@MyCtrlBreakHandler); + Application := TMQTTGate.Create(nil); + Application.Run; + Application.Free; +end. +