How can I build an ETL pipeline script to gunzip, extract, transform, save, and gzip files? I can get as far as the gunzip step, but I am unable to extract, transform, save, and gzip. I was attempting to follow this tutorial to get started: https://www.mariokandut.com/transform-data-etl-pipeline-node-js/ One thing I'm stuck on is how to loop through the files after each sequential step. I get an unexpected error, SyntaxError: Unexpected end of JSON input, during the extract step.
I was able to extract, transform, and save in a separate example, but I am unable to successfully combine it into this ETL pipeline script.
const fs = require('fs');
const { promises: { readdir, readFile, writeFile } } = require('fs');
const url = require('url');
const zlib = require('zlib');

const input_dir = __dirname + '/input';
const input_unzipped_dir = __dirname + '/input-unzipped';
const output_dir = __dirname + '/output';
// List the file names in a directory.
async function get_files(dir) {
    return readdir(dir);
}
// Read a file and parse it as JSON, reporting errors through the callback.
function read_file(file_path, callback) {
    fs.readFile(file_path, 'utf-8', (err, file_data) => {
        if (err) {
            return callback && callback(err);
        }
        try {
            const object = JSON.parse(file_data);
            return callback && callback(null, object);
        } catch (err) {
            return callback && callback(err);
        }
    });
}
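Because read_file reports through a callback, the Promise.all in the pipeline below has nothing to await. A small wrapper can turn it into a promise (read_file_async is a name introduced here, not something from the tutorial):
function read_file_async(file_path) {
    // Wrap the callback-style read_file in a Promise so the
    // pipeline can await it with Promise.all.
    return new Promise((resolve, reject) => {
        read_file(file_path, (err, file_data) => {
            if (err) return reject(err);
            resolve(file_data);
        });
    });
}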
// Reshape one parsed record into the output format.
function transform_JSON(file_data) {
    console.log('ts is:', file_data.ts); // timestamp
    console.log('u is:', file_data.u); // url
    console.log('e is:', file_data.e); // event
    const u = url.parse(file_data.u);
    // Note: a Map serializes to {} under JSON.stringify, so keep the
    // query parameters as a plain object instead.
    const output = {
        timestamp: file_data.ts,
        url_object: {
            domain: u.host,
            path: u.path,
            query_object: file_data.e,
            hash: u.hash,
        },
        ec: file_data.e,
    };
    const jsonString = JSON.stringify(output);
    console.log(jsonString);
    return jsonString;
}
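For reference, here is how a record transforms, using a made-up sample (the field values are assumptions; only the ts/u/e shape comes from the code above):
// Hypothetical sample record, matching the ts/u/e shape used above.
const sample = {
    ts: 1669976028340,
    u: 'https://example.com/path?a=1#frag',
    e: { a: '1' },
};
console.log(transform_JSON(sample));
// {"timestamp":1669976028340,"url_object":{"domain":"example.com",
//  "path":"/path?a=1","query_object":{"a":"1"},"hash":"#frag"},
//  "ec":{"a":"1"}}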
const orchestrate_etl_pipeline = async () => {
    try {
        // extract
        const files = await get_files(input_dir);
        console.log(files);

        if (!fs.existsSync(input_unzipped_dir)) {
            fs.mkdirSync(input_unzipped_dir);
        }

        // Await the unzip step before moving on; otherwise the transform
        // step reads half-written files and JSON.parse throws
        // "Unexpected end of JSON input".
        await Promise.all(files.map(filename => {
            if (!filename.endsWith('.gz')) return;
            return new Promise((resolve, reject) => {
                const fileContents = fs.createReadStream(`${input_dir}/${filename}`);
                const writeStream = fs.createWriteStream(`${input_unzipped_dir}/${filename.slice(0, -3)}`);
                const unzip = zlib.createGunzip();
                // pipe() does not forward errors, so listen on each stream;
                // 'finish' receives no error argument.
                fileContents.on('error', reject);
                unzip.on('error', reject);
                fileContents
                    .pipe(unzip)
                    .pipe(writeStream)
                    .on('error', reject)
                    .on('finish', resolve);
            });
        }));
        console.log('unzip done');

        // transform
        const files_unzipped = await get_files(input_unzipped_dir);
        await Promise.all(files_unzipped.map(async filename => {
            if (!filename.endsWith('.json')) return;
            const file_data = await read_file_async(`${input_unzipped_dir}/${filename}`);
            // Don't reassign the transform_JSON function itself.
            const transformed = transform_JSON(file_data);
            console.log(transformed);
        }));
        console.log('transform done');

        // save file
        // gzip file (see the save_and_gzip sketch below)
    } catch (error) {
        console.log(error);
    }
};

orchestrate_etl_pipeline().then(() => console.log('etl done'));
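For the two remaining steps, here is a minimal sketch, assuming the transform step collects its output into an array of { filename, data } pairs (save_and_gzip and results are names introduced here, not from the tutorial): it writes each transformed string into output_dir, then compresses it with zlib.createGzip.
// Sketch of the save and gzip steps, assuming `results` holds
// { filename, data } pairs produced by the transform step above.
async function save_and_gzip(results) {
    if (!fs.existsSync(output_dir)) {
        fs.mkdirSync(output_dir);
    }
    await Promise.all(results.map(async ({ filename, data }) => {
        const out_path = `${output_dir}/${filename}`;
        // save file
        await writeFile(out_path, data);
        // gzip file
        await new Promise((resolve, reject) => {
            fs.createReadStream(out_path)
                .pipe(zlib.createGzip())
                .pipe(fs.createWriteStream(`${out_path}.gz`))
                .on('error', reject)
                .on('finish', resolve);
        });
    }));
}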
Separate transform and save file example:
function jsonReader(file_path, callback) {
    fs.readFile(file_path, (err, file_data) => {
        if (err) {
            return callback && callback(err);
        }
        try {
            const object = JSON.parse(file_data);
            return callback && callback(null, object);
        } catch (err) {
            return callback && callback(err);
        }
    });
}

jsonReader(`${input_zipped_dir}/t1669976028340.json`, (err, input) => {
    if (err) {
        console.log(err);
        return;
    }
    console.log('ts is:', input.ts); // timestamp
    console.log('u is:', input.u); // url
    console.log('e is:', input.e); // event
    const u = url.parse(input.u);
    const output = {
        timestamp: input.ts,
        url_object: {
            domain: u.host,
            path: u.path,
            query_object: input.e, // plain object: a Map would stringify to {}
            hash: u.hash,
        },
        ec: input.e,
    };
    const jsonString = JSON.stringify(output);
    console.log(jsonString);
    fs.writeFile(`${input_zipped_dir}/t1669976028340.json`, jsonString, err => {
        if (err) {
            console.log('Error writing file', err);
        } else {
            console.log('Successfully wrote file');
        }
    });
});
I recommend using etl-gun to create the ETL pipeline. It supports the RxJS paradigm and makes pipelines very visual, and it has endpoints for both JSON and the filesystem.