Using NodeJS ReadStream or Buffer with ffmpeg and ffprobe hangs

214 Views Asked by At

I'm using ffmpeg to convert some audio files in NodeJS This function takes as input a ReadStream from a media file or a Buffer with the bytes of the media file and convert it:

let convertStream = function (streamOrBuffer, opath, sample_rate, params) {
      var self = this;
      // defaults
      var loglevel = self.logger.isDebug() ? 'debug' : 'error';
      return new Promise(async (resolve, reject) => {
        var args = [];
        args = args.concat([
          '-y',
          '-loglevel', loglevel,
          '-v', 'quiet',
          '-i', 'pipe:'
        ]);
        if (sample_rate) { // valid sample rate
          args.push('-ar');
          args.push(sample_rate);
        }
        // output
        args = args.concat([
          opath
        ]);
        const opts = self._options.child_process;
        self.logger.debug("convertStream options: %s", args.join(" "));
        const ffmpeg = cp.spawn('ffmpeg', args, opts)
          .on('message', msg => self.logger.info(msg))
          .on('error', reject)
          .on('exit', (code, signal) => {
            self.logger.debug("convertStream [exit] code:%s signal:%s", code, signal);
          })
          .on('close', () => {
            self.logger.debug("downloadHLS [close]");
            return resolve();
          });
        ffmpeg.stdin.on("error", (err) => {
          if (err.code === "EPIPE") {

            // ignore EPIPE error
            // throws sometimes a EPIPE error, ignore as long ffmpeg exit with code 0

          } else {
            self.logger.warn("convertStream ffmepg stdin error", err);
          }
        });
        if (streamOrBuffer instanceof Buffer) { // Buffer
          ffmpeg.stdin.write(streamOrBuffer);
        } else { // ReadStream
          ffmpeg.stdin.write((await stream2Buffer(streamOrBuffer)));
        }
        ffmpeg.stdin.end();
      });
    }//convertStream

where stream2Buffer looks like:

let stream2Buffer = function (stream) {
      return new Promise((resolve, reject) => {
        const _buf = [];
        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));
      });
    }//stream2Buffer

This works ok using both a ReadStream:

var readStream = fs.createReadStream(mediaPath);
await convertStream (readStream, 'out.mp4', '44100', {})

that using a Buffer object like

const buff = await request.get(mediaUrl, {});
await convertStream (buff, 'out.mp4', '44100', {})      

so I would expect to work when using ffprobe:

let probeStream = function (streamOrBuffer, sample_rate, params) {
      var self = this;
      return new Promise(async (resolve, reject) => {
        var loglevel = self.logger.isDebug() ? 'debug' : 'error';
        var args = [];
        args = args.concat([
          '-v', 'quiet',
          '-loglevel', loglevel,
          '-print_format', 'json',
          '-show_chapters',
          '-show_format',
          '-show_streams',
          '-i', 'pipe:'
        ]);
        if (sample_rate) { // valid sample rate
          args.push('-ar');
          args.push(sample_rate);
        }
        const opts = self._options.child_process;
        self.logger.debug("probeStream probe options: %s", args.join(" "));
        var result = '';
        const ffprobe = cp.spawn('ffprobe', args, opts)
          .on('message', msg => self.logger.info(msg))
          .on('error', reject)
          .on('exit', (code, signal) => {
            self.logger.debug("probeStream [exit] code:%s signal:%s", code, signal);
          })
          .on('close', (out) => {
            self.logger.debug("probeStream %@", out);
            return resolve(result);
          });
        ffprobe.stdin.on("error", (err) => {
          if (err.code === "EPIPE") {

            // ignore EPIPE error
            // throws sometimes a EPIPE error, ignore as long ffmpeg exit with code 0

          } else {
            self.logger.warn("probeStream ffmepg stdin error", err);
          }
        });
        ffprobe.stdout.on('data', function (data) {
          result += data.toString();
        });
        if (streamOrBuffer instanceof Buffer) { // Buffer
          ffprobe.stdin.write(streamOrBuffer);
        } else { // ReadStream
          ffprobe.stdin.write((await stream2Buffer(streamOrBuffer)));
        }
        ffprobe.stdin.end();
      });
    }//probeStream

But ffprobe hangs on running the command:

probeStream probe options: -v quiet -loglevel debug -print_format json -show_chapters -show_format -show_streams -i pipe: -ar 44100

without exiting the process or processing the media.

1

There are 1 best solutions below

0
On BEST ANSWER

Okay, 2 things.

  1. You don't need to pass sample_rate, since the probe will automatically get this
  2. -i pipe: is not valid argument, the right format to pipe data is just -

This works for WSL2 nicely:

let probeStream = function (streamOrBuffer, params) {
      var self = this;
      return new Promise(async (resolve, reject) => {
        var loglevel = self.logger.isDebug() ? 'debug' : 'error';
        var args = [];
        args = args.concat([
          '-v', 'quiet',
          '-loglevel', loglevel,
          '-print_format', 'json',
          '-show_chapters',
          '-show_format',
          '-show_streams',
          '-'
        ]);
        // assuming opts = { stdio: ['pipe', 'pipe', 'pipe'] };
        const opts = self._options.child_process;

        self.logger.debug("probeStream probe options: %s", args.join(" "));
        var result = '';
        const ffprobe = cp.spawn('ffprobe', args, opts)
          .on('message', msg => self.logger.info(msg))
          .on('error', reject)
          .on('exit', (code, signal) => {
            self.logger.debug("probeStream [exit] code:%s signal:%s", code, signal);
          })
          .on('close', (out) => {
            ffprobe.stdin.end();
            self.logger.debug("probeStream %@", out);
            return resolve(result);
          });
        ffprobe.stdin.on("error", (err) => {
          if (err.code !== "EPIPE") {
            self.logger.warn("probeStream ffmepg stdin error", err);
          }
        });
        ffprobe.stdout.on('data', function (data) {
          result += data.toString();
        });
        if (streamOrBuffer instanceof Buffer) { // Buffer
          ffprobe.stdin.write(streamOrBuffer);
        } else { // ReadStream
          ffprobe.stdin.write((await stream2Buffer(streamOrBuffer)));
        }       
      });
    }//probeStream