How to move files from one S3 bucket directory to another directory in same bucket? Scala/Java


I want to move all files under a directory in my S3 bucket to another directory within the same bucket, using Scala.

Here is what I have:

def copyFromInputFilesToArchive(spark: SparkSession) : Unit = {
    val sourcePath = new Path("s3a://path-to-source-directory/")
    val destPath = new Path("s3a://path-to-destination-directory/")
    val fs = sourcePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.moveFromLocalFile(sourcePath,destPath)
  }

I get this error:

fs.copyFromLocalFile returns Wrong FS: s3a:// expected file:///


There are 2 best solutions below


Error explained

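The error occurs because moveFromLocalFile (like copyFromLocalFile) expects its source path to be on the local filesystem: both methods are meant for uploading local files to S3. Here the source and the destination are both already in S3, so the s3a:// source path is rejected with "Wrong FS".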
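For the Hadoop route used in the question, here is a minimal sketch, assuming the S3A connector is on the classpath and both paths are on the same filesystem. FileSystem.rename accepts s3a:// paths, and S3A carries it out as a copy followed by a delete. The names S3aMove and moveAll, and the bucket name in the usage note, are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object S3aMove {
  /** Moves every entry directly under `source` into `dest` and returns
    * how many renames succeeded. Both paths must be on the same filesystem. */
  def moveAll(conf: Configuration, source: Path, dest: Path): Int = {
    val fs: FileSystem = source.getFileSystem(conf)
    // Make sure the destination "directory" exists before renaming into it.
    fs.mkdirs(dest)
    // rename returns false when an entry could not be moved, so count
    // only the entries for which it returned true.
    fs.listStatus(source).count { status =>
      fs.rename(status.getPath, new Path(dest, status.getPath.getName))
    }
  }
}
```

It could be called as S3aMove.moveAll(spark.sparkContext.hadoopConfiguration, new Path("s3a://my-bucket/input/"), new Path("s3a://my-bucket/archive/")). Because each rename on S3 is a copy plus a delete, the operation is not atomic and its cost grows with the amount of data moved.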

It is important to note that directories don't really exist in Amazon S3 buckets. The folder/file hierarchy you see is just a naming convention: every object sits in the same flat, single-level container, and the slashes in its key (for example a/b/file.txt) give the illusion of files and folders.

An object's key cannot be edited in place, so to "move" a file within a bucket you copy the object to a new key and then delete the original.
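Whichever client performs the copy, each object under the source "directory" needs a destination key. A minimal sketch of that mapping, assuming prefixes that end in a slash; KeyRewrite and rewriteKey are hypothetical names, not part of any SDK.

```scala
object KeyRewrite {
  /** Swaps `sourcePrefix` for `destPrefix` at the start of `key`.
    * Returns None when the key is not under the source prefix. */
  def rewriteKey(key: String, sourcePrefix: String, destPrefix: String): Option[String] =
    if (key.startsWith(sourcePrefix)) Some(destPrefix + key.stripPrefix(sourcePrefix))
    else None
}
```

For example, KeyRewrite.rewriteKey("input/2024/a.csv", "input/", "archive/") yields Some("archive/2024/a.csv"), so nested "subdirectories" are preserved under the new prefix.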

How to do a "move" within a bucket with Scala

To accomplish this, copy the original object to the destination key and, once the copy has succeeded, delete the source object. A single copyObject call handles objects up to 5 GB; larger objects have to be copied in parts through a multipart upload.

Try something like this (from datahackr):

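import java.util.ArrayList
import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.model.{CompleteMultipartUploadRequest, CopyPartRequest, CopyPartResult, InitiateMultipartUploadRequest, PartETag}

// These methods are assumed to live in a class that provides `s3client` (an AmazonS3),
// `logger`, `getS3ObjectMetadata` and `ALLOWED_OBJECT_SIZE`.

/**
   * Copy an object from one key to another in multiple parts
   *
   * @param sourceS3Path S3 object key
   * @param targetS3Path S3 object key
   * @param fromS3BucketName bucket name
   * @param toS3BucketName bucket name
   */
  @throws(classOf[Exception])
  @throws(classOf[AmazonServiceException])
  def copyMultipart(sourceS3Path: String, targetS3Path: String, fromS3BucketName: String, toS3BucketName: String): Unit = {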

    // Create a list of ETag objects. You retrieve ETags for each object part uploaded,
    // then, after each individual part has been uploaded, pass the list of ETags to
    // the request to complete the upload.
    var partETags = new ArrayList[PartETag]();

    // Initiate the multipart upload.
    val initRequest = new InitiateMultipartUploadRequest(toS3BucketName, targetS3Path);
    val initResponse = s3client.initiateMultipartUpload(initRequest);

    // Get the object size to track the end of the copy operation.
    var metadataResult = getS3ObjectMetadata(sourceS3Path, fromS3BucketName);
    var objectSize = metadataResult.getContentLength();

    // Copy the object using 50 MB parts.
    val partSize = (50 * 1024 * 1024) * 1L;
    var bytePosition = 0L;
    var partNum = 1;
    var copyResponses = new ArrayList[CopyPartResult]();
    while (bytePosition < objectSize) {
      // The last part might be smaller than partSize, so check to make sure
      // that lastByte isn't beyond the end of the object.
      val lastByte = Math.min(bytePosition + partSize - 1, objectSize - 1);

      // Copy this part.
      val copyRequest = new CopyPartRequest()
        .withSourceBucketName(fromS3BucketName)
        .withSourceKey(sourceS3Path)
        .withDestinationBucketName(toS3BucketName)
        .withDestinationKey(targetS3Path)
        .withUploadId(initResponse.getUploadId())
        .withFirstByte(bytePosition)
        .withLastByte(lastByte)
        .withPartNumber(partNum); // part numbers start at 1
      partNum += 1;
      copyResponses.add(s3client.copyPart(copyRequest));
      bytePosition += partSize;
    }

    // Complete the upload request to concatenate all uploaded parts and make the copied object available.
    val completeRequest = new CompleteMultipartUploadRequest(
      toS3BucketName,
      targetS3Path,
      initResponse.getUploadId(),
      getETags(copyResponses));
    s3client.completeMultipartUpload(completeRequest);
    logger.info("Multipart upload complete.");

  }

  // This is a helper function to construct a list of ETags.
  def getETags(responses: java.util.List[CopyPartResult]): ArrayList[PartETag] = {
    var etags = new ArrayList[PartETag]();
    val it = responses.iterator();
    while (it.hasNext()) {
      val response = it.next();
      etags.add(new PartETag(response.getPartNumber(), response.getETag()));
    }
    return etags;
  }

  def moveObject(sourceS3Path: String, targetS3Path: String, fromBucketName: String, toBucketName: String): Unit = {

    logger.info(s"Moving S3 file from $sourceS3Path ==> $targetS3Path")
    // Get the object size to decide between a single copy and a multipart copy.
    val metadataResult = getS3ObjectMetadata(sourceS3Path, fromBucketName);
    val objectSize = metadataResult.getContentLength();

    if (objectSize > ALLOWED_OBJECT_SIZE) {
      logger.info("Object size is greater than 1GB. Initiating multipart upload.");
      copyMultipart(sourceS3Path, targetS3Path, fromBucketName, toBucketName);
    } else {
      s3client.copyObject(fromBucketName, sourceS3Path, toBucketName, targetS3Path);
    }
    // Delete the source object after a successful copy
    s3client.deleteObject(fromBucketName, sourceS3Path);
  }
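
moveObject above handles a single object, while the question asks about everything under a directory. Here is a minimal sketch of the missing loop, assuming AWS SDK v1 and an already configured AmazonS3 client. PrefixMove and movePrefix are hypothetical names, and plain copyObject is used for brevity (a single copy request is limited to 5 GB, so larger objects would need the multipart path).

```scala
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsV2Request

object PrefixMove {
  /** Copies every object under `sourcePrefix` to the same relative key under
    * `destPrefix` in the same bucket, deletes the source, and returns the count.
    * Assumes `destPrefix` is not itself located under `sourcePrefix`. */
  def movePrefix(s3: AmazonS3, bucket: String, sourcePrefix: String, destPrefix: String): Int = {
    val request = new ListObjectsV2Request().withBucketName(bucket).withPrefix(sourcePrefix)
    var moved = 0
    var more = true
    while (more) {
      // Listings are paginated, so keep requesting pages until none remain.
      val page = s3.listObjectsV2(request)
      val it = page.getObjectSummaries.iterator()
      while (it.hasNext) {
        val sourceKey = it.next().getKey
        val destKey = destPrefix + sourceKey.stripPrefix(sourcePrefix)
        s3.copyObject(bucket, sourceKey, bucket, destKey)
        // Delete only after the copy call has returned without throwing.
        s3.deleteObject(bucket, sourceKey)
        moved += 1
      }
      more = page.isTruncated
      request.setContinuationToken(page.getNextContinuationToken)
    }
    moved
  }
}
```

A call such as PrefixMove.movePrefix(s3client, "my-bucket", "input/", "archive/") would then move input/2024/a.csv to archive/2024/a.csv; the bucket and prefixes here are placeholders.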

You will need the AWS SDK for this.

If you are using AWS SDK version 1:

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk-s3" % "1.12.248"
)

import com.amazonaws.services.s3.transfer.{ Copy, TransferManager, TransferManagerBuilder }

val transferManager: TransferManager =
  TransferManagerBuilder.standard().build()

def copyFile(): Unit = {
  val copy: Copy = 
    transferManager.copy(
      "source-bucket-name", "source-file-key",
      "destination-bucket-name", "destination-file-key"
    )

  copy.waitForCompletion()
}

If you are using AWS SDK version 2:

libraryDependencies ++= Seq(
  "software.amazon.awssdk" % "s3" % "2.17.219",
  "software.amazon.awssdk" % "s3-transfer-manager" % "2.17.219-PREVIEW"
)

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.CopyObjectRequest
import software.amazon.awssdk.transfer.s3.{Copy, CopyRequest, S3ClientConfiguration, S3TransferManager}

// Change Region.US_WEST_2 to your required region.
// If you omit `.region(...)`, the SDK tries to resolve the region from its
// default provider chain (for example the AWS_REGION environment variable).
val s3ClientConfig: S3ClientConfiguration =
    S3ClientConfiguration
      .builder()
      .region(Region.US_WEST_2)
      .build()

val s3TransferManager: S3TransferManager =
   S3TransferManager.builder().s3ClientConfiguration(s3ClientConfig).build()

def copyFile(): Unit = {
  val copyObjectRequest: CopyObjectRequest =
    CopyObjectRequest
      .builder()
      .sourceBucket("source-bucket-name")
      .sourceKey("source-file-key")
      .destinationBucket("destination-bucket-name")
      .destinationKey("destination-file-key")
      .build()

  val copyRequest: CopyRequest =
    CopyRequest
      .builder()
      .copyObjectRequest(copyObjectRequest)
      .build()

  val copy: Copy =
    s3TransferManager.copy(copyRequest)

  copy.completionFuture().get()
}

Keep in mind that you will need AWS credentials with appropriate permissions for both the source and the destination object. One way to provide them is to export the following environment variables (the session token is only needed for temporary credentials):

export AWS_ACCESS_KEY_ID=your_access_key_id
export AWS_SECRET_ACCESS_KEY=your_secret_access_key
export AWS_SESSION_TOKEN=your_session_token

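Also, "source-file-key" and "destination-file-key" must be the full object keys, i.e. the full path of each file within its bucket. Both snippets only copy the object; to complete a move, delete the source object once the copy has finished.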
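
If you start from an s3a:// path like the one in the question, the bucket name and the full object key can be split out with java.net.URI. This is a small sketch; S3Location and parse are hypothetical names.

```scala
import java.net.URI

object S3Location {
  /** Splits "s3a://bucket/some/key" into (bucket, "some/key"). */
  def parse(uri: String): (String, String) = {
    val parsed = new URI(uri)
    // The authority part is the bucket; the path minus its leading slash is the key.
    (parsed.getAuthority, parsed.getPath.stripPrefix("/"))
  }
}
```

For instance, S3Location.parse("s3a://my-bucket/input/2024/a.csv") gives ("my-bucket", "input/2024/a.csv"), and the second element is what goes into the source or destination key argument.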