Hi all Stack Overflow users, I am currently facing an issue with MongoDB and Node.js. I am trying to insert data into MongoDB, and I have all the data ready to be inserted. The data I want to insert comes from a CSV file.
This is my project. It consists of a client and a server. The server reads the data from a CSV file into an in-memory array, listens for the client, and sends the data from that array to the client. The client connects to the server, requests the data, and should append it to MongoDB.
The following code inside client.js will most probably help me, because I could use its value to append to MongoDB. Can anybody help me with it?
// Log the payload of every "changed" notification emitted by the monitored item.
monitoredItem.on("changed", function onValueChanged(update) {
    var receivedValue = update.value.value;
    console.log(" New Data Receive = ", receivedValue);
});
I am unsure how to append the data to MongoDB.
This is my client.js
/*global require,console,setTimeout */
var opcua = require("node-opcua");
var async = require("async");
var fs = require("fs");
var csv = require("fast-csv");
var sleep = require("system-sleep");
// OPC UA client instance used by every step of the series below.
var client = new opcua.OPCUAClient();
// Endpoint of the server on this machine: opc.tcp://<hostname>:4334/UA/MyLittleServer
var endpointUrl = ["opc.tcp://", require("os").hostname(), ":4334/UA/MyLittleServer"].join("");
// Filled in by the createSession step and read by the later steps.
var the_session;
// Filled in by the subscription step.
var the_subscription;
async.series([
// step 1 : connect to
function(callback) {
client.connect(endpointUrl,function (err) {
if(err) {
console.log(" cannot connect to endpoint :" , endpointUrl );
} else {
console.log("connected !");
console.log("Endpoint URL ", endpointUrl);
}
callback(err);
});
},
// step 2 : createSession
function(callback) {
client.createSession( function(err,session) {
if(!err) {
the_session = session;
}
callback(err);
});
},
// step 3 : browse
function(callback) {
the_session.browse("RootFolder", function(err,browse_result){
if(!err) {
browse_result[0].references.forEach(function(reference) {
console.log( reference.browseName.toString());
});
}
callback(err);
});
},
// step 4 : read a variable with readVariableValue
//function(callback) {
// the_session.readVariableValue("ns=2000;s=TEST", function(err,dataValue) {
// if (!err) {
// console.log(" free mem % = " , dataValue.toString());
// }
// callback(err);
// });
//},
// step 4' : read a variable with read
//function(callback) {
// var max_age = 0;
// var nodes_to_read = [
// { nodeId: "ns=2000;s=TEST", attributeId: opcua.AttributeIds.Value }
// ];
// the_session.read(nodes_to_read, max_age, function(err,nodes_to_read,dataValues) {
// if (!err) {
// console.log(" free mem % = " , dataValues[0]);
// }
// callback(err);
// });
//},
// function(callback){
// the_session.readVariableValue("ns=74;s=Dou", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },
//
// function(callback){
// the_session.readVariableValue("ns=74;s=Float", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },
//
// function(callback){
// the_session.readVariableValue("ns=74;s=String", function(err,dataValue) {
// if(!err){
// console.log("Test Success", dataValue.toString());
// }
// callback(err);
// });
// },
// function(callback){
// the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) {
// if(!err){
// console.log(dataValue.toString());
// sleep(5000);
// }
// callback(err);
// });
// },
// function(callback){
// the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) {
// if(!err){
// fs.createReadStream(dataValue.toString())
// console.log(dataValue.toString());
// sleep(5000);
// .pipe(csv())
// .on('data', function(data){
// console.log(csv);
// sleep(5000);
// })
// .op('end', function(data){
// console.log("Read Finish")
// });
// }
// callback(err);
// });
// },
// step 5: install a subscription and install a monitored item for 10 seconds
function(callback) {
the_subscription=new opcua.ClientSubscription(the_session,{
requestedPublishingInterval: 1000,
requestedLifetimeCount: 10,
requestedMaxKeepAliveCount: 2,
maxNotificationsPerPublish: 10,
publishingEnabled: true,
priority: 10
});
the_subscription.on("started",function(){
console.log("subscription started for 2 seconds - subscriptionId=",the_subscription.subscriptionId);
}).on("keepalive",function(){
console.log("keepalive");
}).on("terminated",function(){
callback();
});
setTimeout(function(){
the_subscription.terminate();
},10000000);
// install monitored item
var monitoredItem = the_subscription.monitor({
nodeId: opcua.resolveNodeId("ns=2000;s=TEST"),
attributeId: opcua.AttributeIds.Value
},
{
samplingInterval: 100,
discardOldest: true,
queueSize: 10
},
opcua.read_service.TimestampsToReturn.Both
);
console.log("-------------------------------------");
monitoredItem.on("changed",function(dataValue){
console.log(" New Data Receive = ",dataValue.value.value);
});
},
// close the session; a failure is only logged and never passed on to the series
function closeSession(callback) {
    the_session.close(function onSessionClosed(closeError) {
        var closeFailed = Boolean(closeError);
        if (closeFailed) {
            console.log("session closed failed ?");
        }
        // deliberately called without the error
        callback();
    });
}
],
function(err) {
if (err) {
console.log(" failure ",err);
} else {
console.log("done!");
}
client.disconnect(function(){});
}) ;
This is my server.js
/*global require,setInterval,console */
var opcua = require("node-opcua");
var fs = require("fs");
var csv = require("fast-csv");
var sleep = require("system-sleep");
// Index into `array` of the row handed out most recently; 0 means none yet.
var currentCount = 0;
// Rows served to clients. Slot 0 is a placeholder text; every parsed CSV row
// is appended behind it as a JSON string.
var array = ["No New Data"];
// Stream 1000data.csv through the CSV parser and buffer each parsed row.
fs.createReadStream('1000data.csv')
    .pipe(csv())
    .on('data', function appendRow(row) {
        var serializedRow = JSON.stringify(row);
        array.push(serializedRow);
    })
    .on('end', function onCsvEnd() {
        // nothing to do once the file has been read completely
    });
// Create the OPCUAServer instance from an options object assembled field by field.
var server = new opcua.OPCUAServer((function assembleServerOptions() {
    var options = {};
    // port of the server's listening socket
    options.port = 4334;
    // this path is added to the endpoint resource name
    options.resourcePath = "UA/MyLittleServer";
    // build information passed to the server
    options.buildInfo = {
        productName: "MySampleServer1",
        buildNumber: "7658",
        // JavaScript months are zero-based, so this date is 2 June 2014
        buildDate: new Date(2014, 5, 2)
    };
    return options;
}()));
function post_initialize() {
console.log("initialized");
function construct_my_address_space(server) {
var addressSpace = server.engine.addressSpace;
// declare a new object
var device = addressSpace.addObject({
organizedBy: addressSpace.rootFolder.objects,
browseName: "MyDevice"
});
// MyVariable1: a Double component of "MyDevice" backed by a local counter.
var variable1 = 1;
// emulate a changing value: the counter grows by one every 1000 ms
setInterval(function incrementVariable1() {
    variable1 = variable1 + 1;
}, 1000);
addressSpace.addVariable({
    componentOf: device,
    browseName: "MyVariable1",
    dataType: "Double",
    value: {
        // each read wraps the counter's current value in a fresh Variant
        get: function readVariable1() {
            var reading = { dataType: opcua.DataType.Double, value: variable1 };
            return new opcua.Variant(reading);
        }
    }
});
// var variableTest = "Test";
// addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=1;b=1020FFAA",
// browseName: "MyVariableTest",
// dataType: "String",
// value: {
// get: function () {
// return new opcua.Variant({dataType: opcua.DataType.String, value: variableTest });
// },
// set: function (variant) {
// variable2 = parseFloat(variant.value);
// return opcua.StatusCodes.Good;
// }
// }
// });
// server.jsonVar = server.engine.addVariable("MyDevice", {
// browseName: "JSONObject",
// dataType: "String",
// value: {
// get:function(){
// return new opcua.Variant({
// dataType: opcua.DataType.String,
// value: get_json_string()
// });
// }
// }
// });
// var varTestDou = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=Dou",
// browseName : "VarD",
// dataType: "Double",
// value: new opcua.Variant({dataType: opcua.DataType.Double, value: [10.0,292.31,412.345,185,3453.245]})
// });
// var varTestFloat = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=Float",
// browseName : "VarF",
// dataType: "Float",
// value: new opcua.Variant({dataType: opcua.DataType.Float, value: [10.0,402.23,123,34,643,34]})
// });
// var varTestString = addressSpace.addVariable({
// componentOf: device,
// nodeId: "ns=74;s=String",
// browseName : "VarT",
// dataType: "String",
// value: new opcua.Variant({dataType: opcua.DataType.String, value: "001,41,54,87,23,12/3/2016,8:39am"})
// });
// csvData ("ns=1;s=CSV"): a String variable whose value is computed once, right
// here, from array[variable1]; it is not re-evaluated on later reads.
// NOTE(review): `array` is filled asynchronously by the CSV stream, so this
// entry may still be undefined when this line runs - confirm.
var csvRowVariant = new opcua.Variant({
    dataType: opcua.DataType.String,
    value: array[variable1]
});
var csvFile = addressSpace.addVariable({
    componentOf: device,
    nodeId: "ns=1;s=CSV",
    browseName: "csvData",
    dataType: "String",
    value: csvRowVariant
});
// fs.createReadStream('1000data.csv')
// .pipe(csv())
// .on('data', function(data){
// console.log("Data uploaded");
// })
// .on('end', function(date) {
// console.log("Read Finish");
// });
// MyVariable2: a writable Double component of "MyDevice". Its setter is the
// example to look at when data has to travel from the client to the server.
var variable2 = 10.0;
server.engine.addressSpace.addVariable({
    componentOf: device,
    nodeId: "ns=2000;b=1020FFAA", // opaque (byte string) NodeId in namespace 2000
    browseName: "MyVariable2",
    dataType: "Double",
    value: {
        // read: wrap the current number in a Variant
        get: function readVariable2() {
            var reading = { dataType: opcua.DataType.Double, value: variable2 };
            return new opcua.Variant(reading);
        },
        // write: store the incoming value as a float and always report Good
        set: function writeVariable2(incoming) {
            var parsed = parseFloat(incoming.value);
            variable2 = parsed;
            return opcua.StatusCodes.Good;
        }
    }
});
var os = require("os");
/**
 * Advances the shared row cursor and returns the index of the row to serve.
 *
 * While rows remain after `currentCount`, the cursor is moved forward by one,
 * the row at the new position is logged and its index is returned. Once the
 * cursor sits on the last entry, the placeholder at index 0 is logged and 0 is
 * returned; the cursor is not reset.
 *
 * Despite its name, this function does not measure memory.
 *
 * @return {number} index into `array` of the entry to serve
 */
function available_memory() {
    var lastIndex = array.length - 1;
    if (currentCount >= lastIndex) {
        // nothing new to hand out: fall back to the placeholder entry
        console.log(array[0]);
        return 0;
    }
    currentCount = currentCount + 1;
    console.log(array[currentCount]);
    return currentCount;
}
// "ns=2000;s=TEST": a String variable with a string NodeId. Every read calls
// available_memory(), so each read moves on to the next buffered CSV row.
server.engine.addressSpace.addVariable({
    componentOf: device,
    nodeId: "ns=2000;s=TEST",
    browseName: "FreeMemory",
    dataType: "String",
    value: {
        get: function readNextRow() {
            var rowIndex = available_memory();
            var reading = { dataType: opcua.DataType.String, value: array[rowIndex] };
            return new opcua.Variant(reading);
        }
    }
});
}
construct_my_address_space(server);
// Start the server, then print the port and URL of its first endpoint.
server.start(function onServerStarted() {
    console.log("Server is now listening ... ( press CTRL+C to stop)");
    var firstEndpoint = server.endpoints[0];
    console.log("port ", firstEndpoint.port);
    var firstDescription = server.endpoints[0].endpointDescriptions()[0];
    var endpointUrl = firstDescription.endpointUrl;
    console.log(" the primary server endpoint url is ", endpointUrl );
});
}
server.initialize(post_initialize);
This is my 1000data.csv
Machine Unit,Air Temperature,Water Temperature,Heat Temperature,Room Temperature,Date,Time
1,61,54,87,24,12/3/2016,8:39AM
2,41,57,92,23,29/9/2016,3:51PM
3,39,53,89,25,22/12/2016,5:30PM
You could add a function that sets up the monitoring and takes a callback, which is called whenever the data changes. Inside that callback you add the data to MongoDB.
The function could look like this:
Now on step 5 you could add something like:
The model might look like this:
The schema of the database depends on the data you want to save