diff --git a/kubeedge-counter-demo/counter-mapper/device/device.go b/kubeedge-counter-demo/counter-mapper/device/device.go index e50e8a668..15e8248ba 100644 --- a/kubeedge-counter-demo/counter-mapper/device/device.go +++ b/kubeedge-counter-demo/counter-mapper/device/device.go @@ -11,23 +11,24 @@ const ( ) type Counter struct { - status chan int - handle func (int) + Name string + CurrentStatus string + status chan int + handle func(string, int) } - func (counter *Counter) runDevice(interrupt chan struct{}) { data := 0 for { select { case <-interrupt: - counter.handle(0) + counter.handle(counter.Name, 0) return default: data++ - counter.handle(data) - fmt.Println("Counter value:", data) + counter.handle(counter.Name, data) + fmt.Printf("Counter: %s, Counter value: %d \n", counter.Name, data) time.Sleep(1 * time.Second) } } @@ -57,10 +58,12 @@ func (counter *Counter) TurnOff() { counter.status <- OFF } -func NewCounter(h func (x int)) *Counter { +func NewCounter(name string, h func(name string, x int)) *Counter { counter := &Counter{ - status: make(chan int), - handle: h, + Name: name, + CurrentStatus: "OFF", + status: make(chan int), + handle: h, } go counter.initDevice() diff --git a/kubeedge-counter-demo/counter-mapper/main.go b/kubeedge-counter-demo/counter-mapper/main.go index 3d8b1b608..d92d1a817 100644 --- a/kubeedge-counter-demo/counter-mapper/main.go +++ b/kubeedge-counter-demo/counter-mapper/main.go @@ -6,19 +6,22 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/kubeedge/examples/kubeedge-counter-demo/counter-mapper/device" + counter "github.com/kubeedge/examples/kubeedge-counter-demo/counter-mapper/device" "github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/types" ) var cli mqtt.Client +var counterList map[string]*counter.Counter const ( - mqttUrl = "tcp://127.0.0.1:1883" - topic = "$hw/events/device/counter/twin/update" + mqttUrl = "tcp://127.0.0.1:1883" + subTopic = "$hw/events/device/+/twin/update" + pubTopic = "$hw/events/device/%s/twin/update" ) //BaseMessage the base struct of event message @@ -73,10 +76,11 @@ func createActualUpdateMessage(actualValue string) DeviceTwinUpdate { return deviceTwinUpdateMessage } -func publishToMqtt(data int) { +func publishToMqtt(name string, data int) { updateMessage := createActualUpdateMessage(strconv.Itoa(data)) twinUpdateBody, _ := json.Marshal(updateMessage) + topic := fmt.Sprintf(pubTopic, name) token := cli.Publish(topic, 0, false, twinUpdateBody) if token.Wait() && token.Error() != nil { @@ -104,32 +108,30 @@ func main() { defer close(stopchan) cli = connectToMqtt() + counterList = make(map[string]*counter.Counter) - // Link to pseudo device counter - ctr := counter.NewCounter(publishToMqtt) - - current_status := "OFF" - - token := cli.Subscribe(topic+"/document", 0, func(client mqtt.Client, msg mqtt.Message) { + token := cli.Subscribe(subTopic+"/document", 0, func(client mqtt.Client, msg mqtt.Message) { Update := &types.DeviceTwinDocument{} err := json.Unmarshal(msg.Payload(), Update) if err != nil { fmt.Printf("Unmarshal error: %v\n", err) } + ctr := GetCounterByName(GetNameFromTopic(msg.Topic())) + cmd := *Update.Twin["status"].CurrentState.Expected.Value - if cmd == "ON" && cmd != current_status { + if cmd == "ON" && cmd != ctr.CurrentStatus { ctr.TurnOn() - fmt.Printf("turn on counter.\n") + fmt.Printf("turn on counter %s.\n", ctr.Name) } - if cmd == "OFF" && cmd != current_status { + if cmd == "OFF" && cmd != ctr.CurrentStatus { ctr.TurnOff() - fmt.Printf("turn off counter.\n") + fmt.Printf("turn off counter %s.\n", ctr.Name) } - current_status = cmd + ctr.CurrentStatus = cmd }) if token.Wait() && token.Error() != nil { @@ -142,3 +144,19 @@ func main() { break } } + +func GetCounterByName(name string) *counter.Counter { + c, ok := counterList[name] + if ok { + return c + } + + ctr := counter.NewCounter(name, publishToMqtt) + counterList[name] = ctr + return ctr +} + +func GetNameFromTopic(topic string) string { + ts := strings.Split(topic, "/") + return ts[3] +}