How to perform multiple sequential operations in Node.js?


How can I build an ETL pipeline script that gunzips (decompresses), extracts, transforms, saves, and then re-gzips files? I can get as far as the gunzip step, but I am unable to extract, transform, save, and re-gzip. I was following this tutorial to get started: https://www.mariokandut.com/transform-data-etl-pipeline-node-js/ One thing I'm stuck on is how to loop through the files after each sequential step completes. I get SyntaxError: Unexpected end of JSON input during the extract step.

I was able to extract, transform, and save in a separate example (included below), but I have not been able to combine it into this ETL pipeline script.
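Conceptually, the ordering I'm trying to enforce looks like this (just pseudocode; the *_all names are placeholders, not real functions):

// every file must finish one stage before any file starts the next stage
await gunzip_all(input_dir, input_unzipped_dir);          // decompress each *.gz into input-unzipped
const records = await transform_all(input_unzipped_dir);  // parse and reshape each JSON file
await save_all(records, output_dir);                      // write the transformed JSON
await gzip_all(output_dir);                                // re-compress the results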

const fs = require('fs');
const {promises: {readdir, readFile, writeFile}} = require("fs");
var url = require('url');
const zlib = require('zlib');

const input_dir = __dirname + '/input'
const input_unzipped_dir = __dirname + '/input-unzipped'
const output_dir = __dirname + '/output'

async function get_files(dir) {
  return await readdir(dir).then(response =>
    response
  );
}

function read_file(file_path, callback) {
    fs.readFile(file_path, 'utf-8', (err, file_data) => {
      if (err) {
        return callback && callback(err);
      }

      try {
        const object = JSON.parse(file_data);
        return callback && callback(null, object);
      } catch (err) {
        return callback && callback(err);
      }
    })
}

function transform_JSON(file_data) {
  console.log("ts is:", file_data.ts); // => "timestamp"
  console.log("u is:", file_data.u); // => "url"
  console.log("e is:", file_data.e); // => "event"

  console.log(url.parse(file_data.u))
  u = url.parse(file_data.u)

  const query_map = new Map(Object.entries(file_data.e));

  const output = {
    timestamp: file_data.ts,
    url_object: {
      domain: u.host,
      path: u.path,
      query_object: query_map,
      hash: u.hash,
    },
    ec: file_data.e,
  }
  const jsonString = JSON.stringify(output)
  console.log(jsonString)
  return jsonString
}

const orchestrate_etl_pipeline = async () => {
  try {
    // extract

    files = await get_files(input_dir);

    console.log(files);

    if (!fs.existsSync(input_unzipped_dir)){
      fs.mkdirSync(input_unzipped_dir);
    }

    Promise.all(files.map(filename => {
      if (filename.endsWith('.gz')) {
        return new Promise((resolve, reject) => {
          const fileContents = fs.createReadStream(`${input_dir}/${filename}`);
          const writeStream = fs.createWriteStream(`${input_unzipped_dir}/${filename.slice(0, -3)}`);
          const unzip = zlib.createGunzip();
          fileContents.pipe(unzip).pipe(writeStream).on('finish', (err) => {
            if (err) return reject(err);
            else resolve();
          })
        })
      }
    }))
    .then(
      console.log('unzip done')
    );
      
    // transform

    files_unzipped = await get_files(input_unzipped_dir);

    Promise.all(files_unzipped.map(filename => {
      if (filename.endsWith('.json')) {
        read_file(`${input_unzipped_dir}/${filename}`, (err, file_data) => {
          if (err) {
            console.error(err);
            return
          }

          transform_JSON = transform_JSON(file_data)
          
          console.log(transform_JSON)
        })
      }
    }))
    .then(
      console.log('transform done')
    );

    // save file
    // gunzip file
  } catch (error) {
    console.log(error);
  }
}

orchestrate_etl_pipeline().then(console.log('etl done'));

Separate transform and save file example:

function jsonReader(file_path, callback) {
  fs.readFile(file_path, (err, file_data) => {
    if (err) {
      return callback && callback(err);
    }
    try {
      const object = JSON.parse(file_data);
      return callback && callback(null, object);
    } catch (err) {
      return callback && callback(err);
    }
  });
}

jsonReader(`${input_zipped_dir}/t1669976028340.json`, (err, input) => {
  if (err) {
    console.log(err);
    return;
  }
  console.log("ts is:", input.ts); // => "ts"
  console.log("u is:", input.u); // => "u"
  console.log("e is:", input.e); // => "e"

  console.log(url.parse(input.u))
  u = url.parse(input.u)

  const query_map = new Map(Object.entries(input.e));

  const output = {
    timestamp: input.ts,
    url_object: {
      domain: u.host,
      path: u.path,
      query_object: query_map,
      hash: u.hash,
    },
    ec: input.e,
  }
  
  jsonString = JSON.stringify(output)

  console.log(jsonString)

  fs.writeFile(`${input_zipped_dir}/t1669976028340.json`, jsonString, err => {
    if (err) {
      console.log('Error writing file', err)
    } else {
      console.log('Successfully wrote file')
    }
  })
})

There is one answer below.

Игорь Бережной answered:

I recommend using etl-gun to create the ETL pipeline. It supports the RxJS paradigm, which makes pipelines very readable, and it has endpoints for both JSON and the filesystem.
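If you would rather stay with the built-in modules from the question, the same sequencing can be done by awaiting each stage before starting the next. A minimal sketch, assuming Node 15+ for require('stream/promises') and reusing the transform_JSON function and directory constants from the question:

const fs = require('fs');
const { readdir, readFile, writeFile, mkdir } = require('fs/promises');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

const input_dir = __dirname + '/input';
const input_unzipped_dir = __dirname + '/input-unzipped';
const output_dir = __dirname + '/output';

const run_pipeline = async () => {
  await mkdir(input_unzipped_dir, { recursive: true });
  await mkdir(output_dir, { recursive: true });

  // 1. gunzip: wait for every .gz file to be fully decompressed before moving on
  const files = await readdir(input_dir);
  await Promise.all(
    files
      .filter(name => name.endsWith('.gz'))
      .map(name =>
        pipeline(
          fs.createReadStream(`${input_dir}/${name}`),
          zlib.createGunzip(),
          fs.createWriteStream(`${input_unzipped_dir}/${name.slice(0, -3)}`)
        )
      )
  );

  // 2.-4. transform, save, and re-gzip each fully written JSON file
  const unzipped = await readdir(input_unzipped_dir);
  for (const name of unzipped.filter(n => n.endsWith('.json'))) {
    const data = JSON.parse(await readFile(`${input_unzipped_dir}/${name}`, 'utf-8'));
    const json_string = transform_JSON(data); // transform function from the question
    await writeFile(`${output_dir}/${name}`, json_string);
    await pipeline(
      fs.createReadStream(`${output_dir}/${name}`),
      zlib.createGzip(),
      fs.createWriteStream(`${output_dir}/${name}.gz`)
    );
  }
};

run_pipeline().then(() => console.log('etl done'), console.error);

Because each await (and each Promise.all) resolves only after its stage has finished for every file, the JSON files are complete before they are parsed, which avoids the "Unexpected end of JSON input" error.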