Good day everyone and thank you for taking the time to answer my question.
I am using Node.js, Docker, Bull (backed by Redis), PostgreSQL, and TensorFlow.js to build a project that trains an AI model. My server has 12 CPU cores and 16 GB of RAM.
The problem occurs after following these steps.
- I build a two-dimensional array of 10,080 elements as the input data for training and an array of 5,040 elements as the output data. All values are numbers.
- Using redis bull, I create queues that retrieve this data from the database, prepare it for training, and train the model synchronously using .fit().
- I used queues to divide the learning process evenly across CPU cores.
Problems:
- For some reason the system uses only one CPU core, and, as I can see from monitoring, the training batches are too large and saturate that core at 100%.
Maybe I should use .dataset() and worker threads? I need to train the model with batches of 10,080 input values, and the whole database is about 5 TB. On one core it would take about a year and a half to train the model on all this data. I don't understand how to split the training across all available CPU cores. I will answer any questions and will be grateful for any advice.
There is some my code:
// Bull processor for training jobs. Concurrency is 1, so jobs run strictly
// one at a time inside this single Node.js process.
// NOTE(review): raising Bull's concurrency does NOT use more CPU cores —
// all jobs still share this process's single event loop. To spread training
// across cores, run multiple worker processes (or use worker_threads).
trainQueue.process(1, async (payload, done) => {
  // Declaring the handler async (instead of wrapping the body in an inner
  // `train()` function whose promise floats) keeps every await inside the
  // try/catch, so no rejection can escape unhandled.
  try {
    const ok = await trainTask(payload, done);
    if (!ok) {
      throw new Error("Не тут то было!");
    }
    console.log("Done!");
    done();
  } catch (err) {
    // Complete the job with the failure so Bull can apply the retry policy.
    done(err);
  }
});
// Enqueue one training job per element of `q`, spaced 100 ms apart so the
// millisecond-timestamp jobIds stay unique and jobs arrive in a stable order.
// NOTE(review): this loop uses `await`, so it must run inside an async
// function (or a module with top-level await) — confirm the enclosing scope.
for (let i = 0; i < q.length; i++) {
  await delay(100);
  trainQueue.add(q[i], {
    jobId: `Burger#${Number(new Date())}`, // unique thanks to the 100 ms gap
    attempts: 5,     // retry a failed job up to 5 times
    backoff: 300000, // wait 5 minutes between retries
    delay: 1000,     // start each job 1 s after it is enqueued
  }); // was commented out in the original (`// })`), leaving the call unclosed
}
module.exports = async (payload, done) => {
try {
// STEP 1.bun
console.log(payload.data.bun);
payload.log(`Start Training, ${payload.data.sym}`,);
const model = await train.GetModel(payload)
if (!model) {
throw new Error("Model not found")
}
payload.progress(5);
// await delay(2000)
const data = await train.GetLastData(payload, payload.data.qtyitems, payload.data.sym)
payload.log(data.length)
payload.progress(15);
if (payload.data.qtyitems == data.length) {
payload.log(new Date(Number(data[0].open_time)))
payload.log(new Date(Number(data[data.length - 1].open_time)))
payload.log((data[0].trained))
payload.log((data[data.length - 1].trained))
const { dataset, labels } = await train.PrepareData(payload, payload.data.qtyitems, data)
payload.progress(25);
const { datasetTenser, labelsTenser } = await train.getTensers(payload, dataset, labels)
payload.progress(50);
const trained = await train.Train(payload, model, datasetTenser, labelsTenser)
const rr = data.splice(data.length / 2)
payload.progress(75);
const UpdateData = await train.UpdateData(payload, data)
payload.progress(100);
} else {
throw new Error('Не то количество минут, которое ожидали')
}
// await delay(3000)
await payload.progress(100);
// done();
return true
} catch (err) {
console.log("Error")
return done(err);
}
}
// Train the model on the prepared tensors: 3 full passes over the data in
// mini-batches of 32 samples.
const t = await model.fit(dataset, labels, {
epochs: 3,
batchSize: 32,
});
// Persist the trained topology + weights into the local `model-js` directory.
await model.save('file://model-js')
payload.log('Сохранил модель!') // comment: message means "Saved the model!"
// Explicitly release the tensor memory held by the model and the training
// data — TensorFlow.js tensors must be disposed manually, they are not
// reclaimed by the JS garbage collector.
tf.dispose(model)
tf.dispose(dataset)
tf.dispose(labels)