AWS Lambda: copy a large file to a different folder in the same S3 bucket


I have a problem with a Lambda function. Here is the problem: my Lambda function needs to handle a PutObject event:

When the PutObject event is handled, I have to copy the uploaded large file to a different folder in the same bucket.

I tried s3.copyObject(), s3.putObject() and createMultipartUpload() [implementing the whole loop for this flow: uploadPart, etc.], but nothing worked!

The event is captured, but afterwards the function prints nothing to the console: neither failure nor success.
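For example, the copyObject() attempt was roughly the following (simplified sketch; the copied/ destination prefix is only illustrative):

// Roughly the copyObject() attempt (simplified; the "copied/" prefix is illustrative)
const copyParams = {
  Bucket: srcBucket,
  CopySource: `${srcBucket}/${srcKey}`,
  Key: `copied/${srcKey.split('/').pop()}`
};
s3.copyObject(copyParams, (err, data) => {
  if (err) console.log('Copy failed: ', err);
  else console.log('Copy succeeded: ', data);
});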

Here is the Lambda with the createMultipartUpload() attempt:

// dependencies
const AWS = require('aws-sdk');
const util = require('util');
const fs = require('fs');

// get reference to S3 client
const s3 = new AWS.S3();

const uploadPart = (params, chunk, partno, final, cb) => {
  console.log("##### Upload part: ", partno);
  s3.uploadPart({
    Body: chunk,
    Bucket: params.Bucket,
    Key: params.Key,
    UploadId: params.UploadId,
    PartNumber: partno
  }, (err, res) => {
    if (err) { console.log('## Errore: failed part uploaded: ', err); return; }
    if (cb) cb(null, { size: chunk.length, ETag: res.ETag });
  });
};

const completeMultipartUpload = (params, PartMap) => {
  console.log("##### 4. INIT COMPLETE MULTIPART UPLOAD");
  s3.completeMultipartUpload({
    Bucket: params.Bucket,
    Key: params.Key,
    UploadId: params.UploadId,
    MultipartUpload: PartMap
  }, (err, data) => {
    if (err) { console.log('## Errore: failed complete multipart upload: ', err); return; }
    console.log('###### 5. Upload completed: ', JSON.stringify(data));
  });
};

exports.handler = async (event, context, callback) => {

  // Read options from the event parameter.
  console.log("Reading options from event:\n", util.inspect(event, {depth: 5}));
  
  const srcBucket = event.Records[0].s3.bucket.name;
  const srcKey    = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, " "));
  const maxchunksize = event.Records[0].s3.object.size;
  const dstBucket = "bucketsrctest";
  
  console.log("SRC KEY: ", srcKey, ", File Size: ", ((maxchunksize / 1024) / 1024), " MB");

  // Infer the file type from the file suffix.
  const typeMatch = srcKey.match(/\.([^.]*)$/);
  if (!typeMatch) {
      console.log("Could not determine the file type.");
      return;
  }

  // Check that the file type is supported
  const fileType = typeMatch[1].toLowerCase();
  if (fileType != "csv") {
      console.log(`Unsupported file type: ${fileType}`);
      return;
  }
  
  const URI_PARTS = srcKey.split('/');
  const TOTAL_PARTS = URI_PARTS.length;
  
  const pre_file_folder = URI_PARTS[TOTAL_PARTS - 2];
  const hour = URI_PARTS[TOTAL_PARTS - 3];
  const day = URI_PARTS[TOTAL_PARTS - 4];
  const month = URI_PARTS[TOTAL_PARTS - 5];
  const year = URI_PARTS[TOTAL_PARTS - 6];
  const sub_folder = URI_PARTS[TOTAL_PARTS - 7];
  const main_folder = URI_PARTS[TOTAL_PARTS - 8];
  
  console.log("PATHS: ", URI_PARTS);
  
  const dst = prepareData(main_folder);
  
  try {
          const finalDestinationPath = dst.folder + '/' + (dst.subfolder ? dst.subfolder + '/' + dst.renamedFile : dst.renamedFile);
          
          const params = {
              Bucket: srcBucket,
              CopySource: srcKey,
              Key: finalDestinationPath
          };
          
          console.log("####1. INITIALIZE UPLOAD: ", finalDestinationPath);
          
          s3.createMultipartUpload({
            Bucket: dstBucket,
            Key: srcKey,
            ContentType: 'text/csv'
          }, (err, data) => {
            console.log("##### 2. INIT MULTIPART UPLOAD");
            if (err) { console.log('## Errore: failed create multipart upload: ', err); return; }
            const file = fs.createReadStream(finalDestinationPath);
            let pi = 1;
            let partMap = [];
            let streamedLength = 0;
            let uploadedSize = 0;
            let curchunk = Buffer(0);
            
            const cmuParams = {
              Key: srcKey,
              Bucket: dstBucket,
              UploadId: data.UploadId
            };
            
            const Writable = require('stream').Writable;
            const ws = Writable();
            
            ws.oend = ws.end;
            ws.end = (chunk, encoding, callback) => {
              ws.oend(chunk, encoding, callback);
              uploadPart(cmuParams, curchunk, pi, true, (err, data) => {
                partMap.push({ ETag: data.ETag, PartNumber: pi });
                completeMultipartUpload(cmuParams, { Parts: partMap });
              });
            };
            
            ws._write = (chunk, enc, next) => {
              curchunk = Buffer.concat([curchunk, chunk]);
              streamedLength += chunk.length;
              if (curchunk.length > maxchunksize) {
                uploadPart(cmuParams, curchunk, pi, false, (err, data) => {
                  uploadedSize += data.length;
                  partMap.push({ ETag: data.ETag, PartNumber: pi });
                  pi+=1;
                  curchunk = Buffer(0);
                  next();
                });
              } else {
                next();
              }
            };
            
            file.pipe(ws);
          });
  } catch(err) {
    console.log("Result error: ", err);
    return { statusCode: 500, body: err };
    }
};

There is 1 best solution below.


This probably happens because you are using an async Lambda handler. Your handler returns as soon as it reaches the end of its body, so the function completes before the S3 callbacks registered inside it ever get a chance to run.

To fix that, you either have to modify your code to use a non-async handler (signalling completion through the callback parameter), or use the promise pattern shown in the AWS docs for async handlers.
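For example, with an async handler the copy can simply be awaited through the SDK's .promise() helper. A minimal sketch, assuming the object fits within the 5 GB limit of a single copyObject call and using an illustrative copied/ destination prefix:

// Minimal sketch of the promise pattern with an async handler
// (the "copied/" destination prefix is only an example)
const AWS = require('aws-sdk');
const s3 = new AWS.S3();

exports.handler = async (event) => {
  const srcBucket = event.Records[0].s3.bucket.name;
  const srcKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));

  try {
    await s3.copyObject({
      Bucket: srcBucket,
      // CopySource is "sourceBucket/sourceKey", URL-encoded
      CopySource: encodeURIComponent(`${srcBucket}/${srcKey}`),
      Key: `copied/${srcKey.split('/').pop()}`
    }).promise();
    console.log('Copy completed');
    return { statusCode: 200 };
  } catch (err) {
    console.log('Copy failed: ', err);
    return { statusCode: 500, body: JSON.stringify(err) };
  }
};

Because the handler awaits the copy, Lambda keeps the function alive until the operation finishes, so the log lines actually appear. For objects larger than 5 GB you would have to switch to a multipart copy (createMultipartUpload plus uploadPartCopy), but the same await/.promise() pattern applies to each call.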