Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 67 additions & 13 deletions cmd/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ import (

var agent *cs.Agent

type mqttCmd struct {
TCP string
HTTP string
WS string
}

func pprof() {
go func() {
log.Info("listen pprof", "error", http.ListenAndServe(":6060", nil))
Expand All @@ -55,16 +61,20 @@ func realMain(ctx context.Context) error {
var err error
var confFile string
var members string
var WSList []*listeners.Websocket
var TCPList []*listeners.TCP
var HTTPList []*listeners.HTTPStats
cfg := config.New()
cmd := mqttCmd{}

flag.StringVar(&confFile, "conf", "", "read the program parameters from the config file")
flag.UintVar(&cfg.StorageWay, "storage-way", 3, "storage way options:0 memory, 1 bolt, 2 badger, 3 redis")
flag.UintVar(&cfg.Auth.Way, "auth-way", 0, "authentication way options:0 anonymous, 1 username and password, 2 clientid")
flag.UintVar(&cfg.Auth.Datasource, "auth-ds", 0, "authentication datasource options:0 free, 1 redis, 2 mysql, 3 postgresql, 4 http")
flag.StringVar(&cfg.Auth.ConfPath, "auth-path", "", "config file path should correspond to the auth-datasource")
flag.StringVar(&cfg.Mqtt.TCP, "tcp", ":1883", "network address for mqtt tcp listener")
flag.StringVar(&cfg.Mqtt.WS, "ws", ":1882", "network address for mqtt websocket listener")
flag.StringVar(&cfg.Mqtt.HTTP, "http", ":8080", "network address for web info dashboard listener")
flag.StringVar(&cmd.TCP, "tcp", ":0", "network address for mqtt tcp listener")
flag.StringVar(&cmd.WS, "ws", ":0", "network address for mqtt websocket listener")
flag.StringVar(&cmd.HTTP, "http", ":0", "network address for web info dashboard listener")
flag.StringVar(&cfg.Cluster.NodeName, "node-name", "", "node name must be unique in the cluster")
flag.StringVar(&cfg.Cluster.BindAddr, "bind-ip", "127.0.0.1", "the ip used for discovery and communication between nodes. It is usually set to the intranet ip addr.")
flag.IntVar(&cfg.Cluster.BindPort, "gossip-port", 7946, "this port is used to discover nodes in a cluster")
Expand Down Expand Up @@ -122,29 +132,73 @@ func realMain(ctx context.Context) error {
}

// gen tls config
var listenerConfig *listeners.Config
var listenerTLSConfig *listeners.Config
var listenerConfig *listeners.Config

if tlsConfig, err := config.GenTlsConfig(cfg); err != nil {
onError(err, "gen tls config")
} else {
if tlsConfig != nil {
listenerConfig = &listeners.Config{TLSConfig: tlsConfig}
}
listenerTLSConfig = &listeners.Config{TLSConfig: tlsConfig}
} else {
log.Info("TLS is not configured, all listeners will use unencrypted connections.")
}
}

// add tcp listener
tcp := listeners.NewTCP("tcp", cfg.Mqtt.TCP, listenerConfig)
onError(server.AddListener(tcp), "add tcp listener")
// add cli tcp listener
if cmd.TCP != ":0" {
tcp := listeners.NewTCP("tcp", cmd.TCP, listenerTLSConfig)
onError(server.AddListener(tcp), "add tcp listener")
}

// TCP Listeners from config file
TCPList = make([]*listeners.TCP, len(cfg.Mqtt.TCPListeners))
for i := 0; i < len(cfg.Mqtt.TCPListeners); i++ {
if cfg.Mqtt.TCPListeners[i].Tls {
TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerTLSConfig)
} else {
TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerConfig)
}
onError(server.AddListener(TCPList[i]), "add tcp listener: " + cfg.Mqtt.TCPListeners[i].Name)
}

// add websocket listener
ws := listeners.NewWebsocket("ws", cfg.Mqtt.WS, listenerConfig)
onError(server.AddListener(ws), "add websocket listener")
if cmd.WS != ":0" {
ws := listeners.NewWebsocket("ws", cmd.WS, listenerTLSConfig)
onError(server.AddListener(ws), "add websocket listener")
}

// WS Listeners from config file
WSList = make([]*listeners.Websocket, len(cfg.Mqtt.WSListeners))
for i := 0; i < len(cfg.Mqtt.WSListeners); i++ {
if cfg.Mqtt.WSListeners[i].Tls {
WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerTLSConfig)
} else {
WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerConfig)
}
onError(server.AddListener(WSList[i]), "add websocket listener: " + cfg.Mqtt.WSListeners[i].Name)
}

// add http listener
csHls := csRt.New(agent).GenHandlers()
mqHls := mqttRt.New(server).GenHandlers()
maps.Copy(csHls, mqHls)
http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, csHls)
onError(server.AddListener(http), "add http listener")

if cmd.HTTP != ":0" {
http := listeners.NewHTTP("stats", cmd.HTTP, nil, csHls)
onError(server.AddListener(http), "add http listener")
}

// HTTP Listeners from config file
HTTPList = make([]*listeners.HTTPStats, len(cfg.Mqtt.HTTPListeners))
for i := 0; i < len(cfg.Mqtt.HTTPListeners); i++ {
if cfg.Mqtt.HTTPListeners[i].Tls {
HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerTLSConfig, csHls)
} else {
HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerConfig, csHls)
}
onError(server.AddListener(HTTPList[i]), "add http listener: " + cfg.Mqtt.HTTPListeners[i].Name)
}

errCh := make(chan error, 1)
// start server
Expand Down
15 changes: 12 additions & 3 deletions cmd/config/node1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ cluster:
inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative.

mqtt:
tcp: :1883
ws: :1882
http: :8080
TCPListeners:
- name: "TCP TLS Listener"
port: ":1883"
tls: true
WSListeners:
- name: "Websocket TLS Listener"
port: ":1882"
tls: true
HTTPListeners:
- name: "HTTPS Listener"
port: ":8080"
tls: true
tls:
ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication.
server-cert: #Server certificate file path
Expand Down
17 changes: 13 additions & 4 deletions cmd/config/node2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ cluster:
inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative.

mqtt:
tcp: :1885
ws: :1886
http: :8081
TCPListeners:
- name: "TCP TLS Listener"
port: ":1885"
tls: true
WSListeners:
- name: "Websocket TLS Listener"
port: ":1886"
tls: true
HTTPListeners:
- name: "HTTPS Listener"
port: ":8081"
tls: true
tls:
ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication.
server-cert: #Server certificate file path
Expand Down Expand Up @@ -77,4 +86,4 @@ log:
max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename
max-backups: 10 #MaxBackups is the maximum number of old log files to retain
compress: true #Compress determines if the rotated log files should be compressed using gzip
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
17 changes: 13 additions & 4 deletions cmd/config/node3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ cluster:
inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative.

mqtt:
tcp: :1887
ws: :1888
http: :8082
TCPListeners:
- name: "TCP TLS Listener"
port: ":1887"
tls: true
WSListeners:
- name: "Websocket TLS Listener"
port: ":1888"
tls: true
HTTPListeners:
- name: "HTTPS Listener"
port: ":8082"
tls: true
tls:
ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication.
server-cert: #Server certificate file path
Expand Down Expand Up @@ -77,4 +86,4 @@ log:
max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename
max-backups: 10 #MaxBackups is the maximum number of old log files to retain
compress: true #Compress determines if the rotated log files should be compressed using gzip
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
20 changes: 16 additions & 4 deletions cmd/config/single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,21 @@ auth:
conf-path: ./config/auth-redis.yml #The config file path should correspond to the auth-datasource

mqtt:
tcp: :1883
ws: :1882
http: :8080
TCPListeners:
- name: "TCP TLS Listener"
port: ":1883"
tls: true #Use TLS if configured, otherwise fall back to an unencrypted connection.
WSListeners:
- name: "Websocket TLS Listener"
port: ":1882"
tls: true
- name: "Websocket Insecure Listener"
port: ":1884"
tls: false #Don't attempt to use TLS
HTTPListeners:
- name: "HTTP TLS Listener"
port: ":8080"
tls: true
tls:
ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication.
server-cert: #Server certificate file path
Expand Down Expand Up @@ -58,4 +70,4 @@ log:
max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename
max-backups: 10 #MaxBackups is the maximum number of old log files to retain
compress: true #Compress determines if the rotated log files should be compressed using gzip
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8.
83 changes: 68 additions & 15 deletions cmd/single/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
"go.etcd.io/bbolt"
)

type mqttCmd struct {
TCP string
HTTP string
WS string
}

func pprof() {
go func() {
log.Info("listen pprof", "error", http.ListenAndServe(":6060", nil))
Expand All @@ -49,16 +55,20 @@ func main() {
func realMain(ctx context.Context) error {
var err error
var confFile string
var WSList []*listeners.Websocket
var TCPList []*listeners.TCP
var HTTPList []*listeners.HTTPStats
cfg := config.New()
cmd := mqttCmd{}

flag.StringVar(&confFile, "conf", "", "read the program parameters from the config file")
flag.UintVar(&cfg.StorageWay, "storage-way", 1, "storage way optional items:0 memory, 1 bolt, 2 badger, 3 redis")
flag.UintVar(&cfg.Auth.Way, "auth-way", 0, "authentication way optional items:0 anonymous, 1 username and password, 2 clientid")
flag.UintVar(&cfg.Auth.Datasource, "auth-ds", 0, "authentication datasource optional items:0 free, 1 redis, 2 mysql, 3 postgresql, 4 http")
flag.StringVar(&cfg.Auth.ConfPath, "auth-path", "", "config file path should correspond to the auth-datasource")
flag.StringVar(&cfg.Mqtt.TCP, "tcp", ":1883", "network address for Mqtt TCP listener")
flag.StringVar(&cfg.Mqtt.WS, "ws", ":1882", "network address for Mqtt Websocket listener")
flag.StringVar(&cfg.Mqtt.HTTP, "http", ":8080", "network address for web info dashboard listener")
flag.StringVar(&cmd.TCP, "tcp", ":0", "network address for Mqtt TCP listener")
flag.StringVar(&cmd.WS, "ws", ":0", "network address for Mqtt Websocket listener")
flag.StringVar(&cmd.HTTP, "http", ":0", "network address for web info dashboard listener")
flag.BoolVar(&cfg.Log.Enable, "log-enable", true, "log enabled or not")
flag.StringVar(&cfg.Log.Filename, "log-file", "./logs/comqtt.log", "log filename")
//parse arguments
Expand Down Expand Up @@ -90,26 +100,69 @@ func realMain(ctx context.Context) error {
initBridge(server, cfg)

// gen tls config
var listenerConfig *listeners.Config
var listenerTLSConfig *listeners.Config
var listenerConfig *listeners.Config

if tlsConfig, err := config.GenTlsConfig(cfg); err != nil {
onError(err, "")
} else {
if tlsConfig != nil {
listenerConfig = &listeners.Config{TLSConfig: tlsConfig}
}
listenerTLSConfig = &listeners.Config{TLSConfig: tlsConfig}
} else {
log.Info("TLS is not configured, all listeners will use unencrypted connections.")
}
}

// add tcp listener
tcp := listeners.NewTCP("tcp", cfg.Mqtt.TCP, listenerConfig)
onError(server.AddListener(tcp), "add tcp listener")
// add cli tcp listener
if cmd.TCP != ":0" {
tcp := listeners.NewTCP("tcp", cmd.TCP, listenerTLSConfig)
onError(server.AddListener(tcp), "add tcp listener")
}

// TCP Listeners from config file
TCPList = make([]*listeners.TCP, len(cfg.Mqtt.TCPListeners))
for i := 0; i < len(cfg.Mqtt.TCPListeners); i++ {
if cfg.Mqtt.TCPListeners[i].Tls {
TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerTLSConfig)
} else {
TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerConfig)
}
onError(server.AddListener(TCPList[i]), "add tcp listener: " + cfg.Mqtt.TCPListeners[i].Name)
}

// add cli websocket listener
if cmd.WS != ":0" {
ws := listeners.NewWebsocket("ws", cmd.WS, listenerTLSConfig)
onError(server.AddListener(ws), "add websocket listener")
}

// WS Listeners from config file
WSList = make([]*listeners.Websocket, len(cfg.Mqtt.WSListeners))
for i := 0; i < len(cfg.Mqtt.WSListeners); i++ {
if cfg.Mqtt.WSListeners[i].Tls {
WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerTLSConfig)
} else {
WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerConfig)
}
onError(server.AddListener(WSList[i]), "add websocket listener: " + cfg.Mqtt.WSListeners[i].Name)
}

// add websocket listener
ws := listeners.NewWebsocket("ws", cfg.Mqtt.WS, listenerConfig)
onError(server.AddListener(ws), "add websocket listener")
// add cli http listener
if cmd.HTTP != ":0" {
http := listeners.NewHTTP("stats", cmd.HTTP, nil, rest.New(server).GenHandlers())
onError(server.AddListener(http), "add http listener")
}

// add http listener
http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, rest.New(server).GenHandlers())
onError(server.AddListener(http), "add http listener")
// HTTP Listeners from config file
HTTPList = make([]*listeners.HTTPStats, len(cfg.Mqtt.HTTPListeners))
for i := 0; i < len(cfg.Mqtt.HTTPListeners); i++ {
if cfg.Mqtt.HTTPListeners[i].Tls {
HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerTLSConfig, rest.New(server).GenHandlers())
} else {
HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerConfig, rest.New(server).GenHandlers())
}
onError(server.AddListener(HTTPList[i]), "add http listener: " + cfg.Mqtt.HTTPListeners[i].Name)
}

errCh := make(chan error, 1)
// start server
Expand Down
16 changes: 11 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,17 @@ type auth struct {
}

type mqtt struct {
TCP string `yaml:"tcp"`
WS string `yaml:"ws"`
HTTP string `yaml:"http"`
Tls tls `yaml:"tls"`
Options comqtt.Options `yaml:"options"`
TCPListeners []listenerOptions `yaml:"TCPListeners"`
WSListeners []listenerOptions `yaml:"WSListeners"`
HTTPListeners []listenerOptions `yaml:"HTTPListeners"`
Tls tls `yaml:"tls"`
Options comqtt.Options `yaml:"options"`
}

type listenerOptions struct {
Name string `yaml:"name"`
Port string `yaml:"port"`
Tls bool `yaml:"tls"`
}

type tls struct {
Expand Down