Node App: Trying to publish an MQTT message from within a firebird.attach routine

107 Views Asked by At

So I've got a firebird.attach routine which is successfully working

res.render('success');
client.publish('time', 'test');
console.log(`MQTT sent`)

but I'm trying to publish the above mqtt message from within it.

client.on('connect', function () {
  client.subscribe('presence', function (err) {
    if (!err) {
      client.publish('test', 'Hello')
    }
  })
})

the above message works and is sent and received, which is on its own outside of the firebird.attach block of code and so I'm wondering if it's even possible to run the mqtt code within the firebird.attach block? The console.log message (MQTT sent) DOES appear so I'm confused.

thanks

Code is here:

require('dotenv').config()

// Express intialization
const express = require('express')
const app = express()
app.set('view engine', 'pug')
const port = 4010
var moment = require('moment'); // require
app.use(express.static('public'))


// Firebird initialization
var firebird = require('node-firebird')
var options = {}
options.host = process.env.HOST
options.port = process.env.PORT
options.database = process.env.DB_PATH
options.user = process.env.DB_USER
options.password = process.env.DB_PASSWORD

var mqtt = require('mqtt')
var client  = mqtt.connect('mqtt://192.168.1.1')

client.on('connect', function () {
  client.subscribe('presence', function (err) {
    if (!err) {
      client.publish('presence', 'Hello mqtt')
    }
  })
})

client.on('message', function (topic, message) {
  // message is Buffer
  console.log(message.toString())
  client.end()
})


firebird.attach(options, function(err, db) {
    if(err) throw err;
    
    app.get('/', (req, res) => {
        db.query('SELECT opid, name FROM PXOPS WHERE OPID NOT IN (1, -1, 10, 14)', function(err, rows) {
          if(err) throw err;
  
          res.render('index', { operators: rows });
        });
      })
  
      app.get('/clock/:id', (req, res) => {
          if(req.params.id) {
              db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, operator_row = []) {
                  if(operator_row.length === 1)
                  {
                      var clockedIn = false;
                      db.query('SELECT FIRST 1 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\' ORDER BY DATETIME DESC', function(err, last_row) {
                          console.log(last_row);
              if(err) throw err;
                          if(last_row.length === 1 && last_row[0].EPXNTTYPE === 0) clockedIn = true;
                      
                          if(clockedIn)
                          {
                              db.query('SELECT FIRST 10 * FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' AND DATETIME >= \'' + moment().startOf('day').format('YYYY-MM-DD HH:mm:ss') + '\'', function(err, today_rows) {
                                  totalMinutes = 0;
                console.log(today_rows);
                                  today_rows.push({ DATETIME: moment().format('YYYY-MM-DD HH:mm:ss')});
                                  splitRows = today_rows.reduce(function(result, value, index, array) {
                                      if (index % 2 === 0)
                                        result.push(array.slice(index, index + 2));
                                      return result;
                                  }, []);
  
                                  splitRows.forEach(pair => {
                                      totalMinutes += Math.round(moment.duration(moment(pair[1].DATETIME).diff(moment(pair[0].DATETIME))).asMinutes());
                                  });
  
                                  console.log(totalMinutes);
  
                                  minuteString = null;
                                  hours = 0;
                                  remainingMinutes = 0;
                                  if(totalMinutes > 60)
                                  {
                                      hours = Math.floor(totalMinutes / 60);
                                      remainingMinutes = totalMinutes - (hours * 60);
                                  }
  
                                  if(hours > 0 && remainingMinutes > 0)
                                  {
                                      minuteString = hours + ' hours and ' + remainingMinutes + ' minutes'
                                  }
                                  else if(hours > 0 && remainingMinutes === 0)
                                  {
                                      minuteString = hours + ' hours';
                                  }
                                  else {
                                      minuteString = totalMinutes + ' minutes';
                                  }
  
                                  res.render('clock', { operator: operator_row[0], working_string: minuteString, clockedIn: clockedIn });
  
                              });
                          }
                          else
                          {
                              res.render('clock', {operator: operator_row[0], clockedIn: clockedIn})
                          }
                      });
  
                  }
              });
          }
      })
  
      app.get('/clock-in/:id', (req, res) => {
          db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
  
              if(rows.length === 0) 
              {
                  res.render('error');
              }
  
              // Check if he's already clocked in
              db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
                  if(err) throw err;
  
                  if(rows.length === 1 && rows[0].EPXNTTYPE == 0)
                  {
                      res.render('error');
                  }
                  else
                  {
                      db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
                          lastID = 0;
                          if(rows.length === 1)
                          {
                              lastID = rows[0].ATTENDEPXNTID;
                          }
  
                          db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \'' + moment().format('YYYY-MM-DD HH:mm:ss') + '\', 0, null, 1, 1, 0)', function(err, rows) {
                              if(err) throw err;
  
                              res.render('success');
                                                            client.publish('timeclock', 'clocked in');
                                                            console.log(`MQTT sent`)
                          });
                      });
                  }
              });
          });
        })
  
        app.get('/clock-out/:id', (req, res) => {
          db.query('SELECT opid, name FROM PXOPS WHERE OPID = ' + req.params.id, function(err, rows) {
  
              if(rows.length === 0) 
              {
                  res.render('error');
              }
  
              // Check if he's already clocked out
              db.query('SELECT FIRST 1 OPID FROM PXATTENDEPXNTS WHERE OPID = ' + req.params.id + ' ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
                  if(err) throw err;
  
                  if(rows.length === 1 && rows[0].EPXNTTYPE == 1)
                  {
                      res.render('error');
                  }
                  else
                  {
                      db.query('SELECT FIRST 1 ATTENDEPXNTID FROM PXATTENDEPXNTS ORDER BY ATTENDEPXNTID DESC', function(err, rows) {
                          lastID = 0;
                          if(rows.length === 1)
                          {
                              lastID = rows[0].ATTENDEPXNTID;
                          }
  
                          db.query('INSERT INTO PXATTENDEPXNTS (ATTENDEPXNTID, OPID, DATETIME, EPXNTTYPE, SCANNEDSTR, BRANCHID, CHANGED_FLAG, DEPTID) VALUES (' + (lastID + 1) + ','+ req.params.id +', \''+ moment().format('YYYY-MM-DD HH:mm:ss') +'\', 1, null, 1, 1, 0)', function(err, rows) {
                              if(err) throw err;
  
                              res.render('success')
                          });
                      });
                  }
              });
          });
        })
})


app.listen(port, () => {
  console.log(`App listening at http://localhost:${port}`)
})
1

There are 1 best solutions below

3
On BEST ANSWER

client.end() in the on message callback is going to shutdown the MQTT client. So the message you publish in the on connect callback is going to cause the client to immediately exit.

(Clients receive their own published messages if subscribed to the topic, this can be changed with MQTT v5 iirc, but needs to be explicitly setup)