11 Jan 2024

Stream IOT Data from MQTT broker to Digital Twin

aps-mqtt-demo

In everything from small home automation setups to industrial IoT applications, sensors produce data every second or every few seconds. Not all of this data is necessarily stored; often only averages over a short period and anomalies are kept for analysis. Even so, we frequently want to stream the live data into a digital twin for visualization and monitoring.

This sample demonstrates streaming IoT data from an MQTT broker to a digital twin.

Source Code:

IOT application as an MQTT subscriber: aps-mqtt-demo

MQTT publisher sample: mqtt-publisher

In this example, we have used the EMQX cloud MQTT broker because it's simple to use and provides a free-tier option for development. You can use any MQTT broker of your choice.

Once you have created an EMQX Cloud account, create a cloud instance of the broker and make a note of its websocket connection URL. Then, under authentication, create a username and password for your client.
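As a minimal sketch of how these settings might be kept out of the source code, the connection URL and credentials can be read from environment variables. The names MQTT_URL, MQTT_USERNAME, MQTT_PASSWORD and MQTT_CLIENT match the constants used in the connection code later in this post; the helper `loadMqttConfig`, the default client ID prefix, and the placeholder URL are assumptions for illustration and may differ from what the sample repository does.

```javascript
// Hypothetical helper: builds the broker URL and client options from an
// environment-like object (for example process.env).
function loadMqttConfig(env) {
  const required = ['MQTT_URL', 'MQTT_USERNAME', 'MQTT_PASSWORD'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing MQTT settings: ${missing.join(', ')}`);
  }
  return {
    url: env.MQTT_URL, // websocket connection URL noted from the broker console
    options: {
      clean: true,
      connectTimeout: 4000,
      // A timestamp suffix keeps two clients from sharing one client ID;
      // the fallback prefix is an assumption
      clientId: `${env.MQTT_CLIENT || 'aps-mqtt-demo-'}${Date.now()}`,
      username: env.MQTT_USERNAME,
      password: env.MQTT_PASSWORD,
    },
  };
}

// Usage with placeholder values (not real credentials)
const config = loadMqttConfig({
  MQTT_URL: 'wss://broker.example.com:8084/mqtt',
  MQTT_USERNAME: 'demo-user',
  MQTT_PASSWORD: 'demo-password',
});
console.log(config.url);
```

In a real deployment you would pass `process.env` instead of the literal object, and the function would fail fast if a setting is missing.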

Use these credentials to connect to the MQTT broker instance and start publishing data. You can publish from a PLC (Programmable Logic Controller), an industrial PC, or your own server.

For testing, we can publish random data to the broker. Here is an example of connecting to an EMQX Cloud deployment and publishing:

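const mqtt = require('mqtt')

// MQTT_URL, MQTT_USERNAME, MQTT_PASSWORD and MQTT_CLIENT are assumed to be
// defined elsewhere (for example, loaded from environment variables)

// connection options
const options = {
    clean: true, // start a clean session (do not resume a previous one)
    connectTimeout: 4000, // connection timeout in milliseconds
    // Authentication information
    clientId: MQTT_CLIENT+Date.now(), // unique client ID per run
    username: MQTT_USERNAME,
    password: MQTT_PASSWORD,
}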

const connectUrl = MQTT_URL
const client = mqtt.connect(connectUrl, options)

// The 'reconnect' event is emitted without arguments
client.on('reconnect', () => {
    console.log('reconnecting...')
})

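let timer;
client.on('connect', () => {
    // Publish one sample immediately, then one every second.
    // Clear any earlier timer so a reconnect does not stack intervals.
    sendData();
    clearInterval(timer);
    timer = setInterval(sendData, 1000);
})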

client.on('error', (error) => {
    console.log('Connection failed:', error)
})


function sendData() {
    let data = {timestamps:[],temperature:[],flowrate:[],pressure:[]};
    data.timestamps.push(new Date());
    data.flowrate.push(random(20,240));
    data.pressure.push(random(0,200));
    data.temperature.push(random(0,100));
    // publish() expects a string or Buffer, so serialize the object as JSON
    client.publish('data', JSON.stringify(data))
}

const random = (min, max) => Math.floor(Math.random() * (max - min)) + min;
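An MQTT payload travels as a string or Buffer, so the sample object has to be serialized before publishing and parsed again by the subscriber. The following is a small sketch of that round trip, independent of any broker. The names `buildSample`, `encodeSample` and `decodeSample` are hypothetical and are not taken from the sample repository.

```javascript
// Random integer in [min, max), same idea as the random() helper above
const randomInt = (min, max) => Math.floor(Math.random() * (max - min)) + min;

// One reading per series, keyed the same way as the data object in this post
function buildSample(now = new Date()) {
  return {
    timestamps: [now.toISOString()],
    temperature: [randomInt(0, 100)],
    flowrate: [randomInt(20, 240)],
    pressure: [randomInt(0, 200)],
  };
}

// Serialize before publishing
const encodeSample = (sample) => JSON.stringify(sample);

// The subscriber receives bytes (or a string); turn them back into an object
const decodeSample = (payload) => JSON.parse(payload.toString());

const sent = buildSample(new Date('2024-01-11T00:00:00.000Z'));
const received = decodeSample(encodeSample(sent));
console.log(received.timestamps[0]); // 2024-01-11T00:00:00.000Z
```

Sending the timestamp as an ISO 8601 string keeps the payload readable on any client, at the cost of having to parse it again if date arithmetic is needed.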

On the application side, the server subscribes to the published data and passes it over a websocket to the client (in this case, the browser):

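// Subscribe with QoS 0; the callback receives an error (if any) and the granted subscriptions
client.subscribe(["data"], { qos: 0 }, function(err, granted) {
    if (err) console.log('Subscribe failed:', err)
    else console.log(granted)
});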

let data = {timestamps:[],temperature:[],flowrate:[],pressure:[]};
client.on('message', (topic, message) => {
    // The payload arrives as a Buffer; convert it to a string before parsing
    let i = JSON.parse(message.toString())
    
    data.pressure.push(i.pressure[0]);
    if(data.pressure.length>200)data.pressure.shift();
    
    data.flowrate.push(i.flowrate[0]);
    if(data.flowrate.length>200)data.flowrate.shift();
    
    data.temperature.push(i.temperature[0]);
    data.timestamps.push(i.timestamps[0]);
    if(data.temperature.length>200)data.temperature.shift();
    if(data.timestamps.length>200)data.timestamps.shift();
})
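The handler above repeats the same push-then-shift pattern for each series. As a sketch of one way to factor that out, the rolling window could be written with a small helper. `pushCapped` and `appendSample` are hypothetical names, and the 200-point cap is taken from the code above.

```javascript
const MAX_POINTS = 200; // window size used by the message handler

// Append a value and drop the oldest entries once the cap is exceeded
function pushCapped(series, value, max = MAX_POINTS) {
  series.push(value);
  while (series.length > max) series.shift();
  return series;
}

// Merge one incoming sample (arrays holding a single reading each) into the window
function appendSample(data, sample) {
  for (const key of Object.keys(data)) {
    if (Array.isArray(sample[key]) && sample[key].length > 0) {
      pushCapped(data[key], sample[key][0]);
    }
  }
  return data;
}

// Feed 250 samples into an empty window; only the newest 200 remain
const windowData = { timestamps: [], temperature: [], flowrate: [], pressure: [] };
for (let n = 0; n < 250; n++) {
  appendSample(windowData, { timestamps: [n], temperature: [n], flowrate: [n], pressure: [n] });
}
console.log(windowData.pressure.length); // 200
console.log(windowData.pressure[0]);     // 50
```

Capping all series in one place keeps the timestamps and values aligned, since every series is trimmed by the same rule.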

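io.on('connection', (socket) => {
    console.log('user connected');
    socket.on('disconnect', function () {
        console.log('user disconnected');
    });
    // Reply to an explicit request from this client
    socket.on('mqttdata', () => {
        socket.emit('mqttdata', data)
    })
})

// Broadcast the latest window to all clients once per second.
// A single shared timer avoids starting a new interval for every connection.
setInterval(()=>{sendData()},1000)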

function sendData() {
    io.emit('mqttdata', data);
}
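On the browser side, the page only needs to listen for the `mqttdata` event and hand the latest values to whatever updates the viewer. Here is a hedged sketch: `latestReadings`, `attachTwinFeed` and the `onUpdate` callback are hypothetical names, and `socket` is assumed to be a Socket.IO client (for example the result of `io()`), although any object with `on` and `emit` methods would work.

```javascript
// Pick the most recent reading from each series in the rolling window
function latestReadings(data) {
  const last = (series) => (series.length > 0 ? series[series.length - 1] : undefined);
  return {
    timestamp: last(data.timestamps),
    temperature: last(data.temperature),
    flowrate: last(data.flowrate),
    pressure: last(data.pressure),
  };
}

// Listen for server pushes and request the current window once up front
function attachTwinFeed(socket, onUpdate) {
  socket.on('mqttdata', (data) => onUpdate(latestReadings(data), data));
  socket.emit('mqttdata'); // the server answers with the current window
}
```

In the page this might be wired up as `attachTwinFeed(io(), (latest, history) => { ... })`, with the callback updating sensor labels or charts in the viewer from `latest` and `history`.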

Please try the source code sample for the full working code, and let us know your feedback. If you face any issues using the code, please email aps.help@autodesk.com.
