Upload a file to S3 and load it from S3 into a Redshift table

50 Views Asked by At

I am experimenting with rusoto. The goal is to upload a local file called "exchange_rates.csv" to S3 and then load it from S3 into a table in AWS Redshift. Below is the code I have tried:

extern crate rusoto_core;
extern crate rusoto_redshift;
extern crate rusoto_s3;
extern crate rusoto_credential;
extern crate config;

use rusoto_core::Region;
use rusoto_redshift::{Redshift, RedshiftClient};
use rusoto_redshift::CopyCommand;
use rusoto_s3::S3;
use rusoto_s3::{S3Client, PutObjectRequest};
use rusoto_credential::DefaultCredentialsProvider;
use std::fs::File;
use std::io::Read;
use config::Config;

/// Uploads `exchange_rates.csv` to S3 and then submits a Redshift `COPY`
/// statement that loads the uploaded object into `bi_etl.erbe__rust`.
///
/// Settings are read from `config.toml`:
/// `aws.s3_bucket`, `aws.s3_key`, `redshift.username`, `redshift.password`
/// and `redshift.cluster_identifier`.
///
/// # Errors
///
/// Returns an error if the configuration cannot be loaded or a key is missing,
/// if the CSV file cannot be read, if the credentials provider cannot be
/// created, if the S3 upload fails, or if executing the statement fails.
///
/// # Panics
///
/// Panics if the HTTP client for the S3 client cannot be created.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load the configuration file.
    let mut settings = Config::default();
    settings.merge(config::File::with_name("config.toml"))?;

    // Resolve every setting up front so that a missing key aborts the run
    // before anything has been uploaded.
    let s3_bucket = settings.get_str("aws.s3_bucket")?;
    let s3_key = settings.get_str("aws.s3_key")?;
    let redshift_username = settings.get_str("redshift.username")?;
    let redshift_password = settings.get_str("redshift.password")?;
    let redshift_cluster_identifier = settings.get_str("redshift.cluster_identifier")?;

    // Build the COPY command while the bucket and key are still owned here;
    // they are moved into the upload request below.
    // NOTE(review): `redshift.username` / `redshift.password` are interpolated
    // as `aws_access_key_id` / `aws_secret_access_key` — confirm these settings
    // really hold AWS access keys rather than database credentials.
    let copy_command = format!(
        "COPY bi_etl.erbe__rust FROM 's3://{}/{}' CREDENTIALS 'aws_access_key_id={}\
        ;aws_secret_access_key={}' DELIMITER ',' CSV;",
        s3_bucket, s3_key, redshift_username, redshift_password
    );

    // Read the whole exchange rates CSV file into memory.
    let csv_data = std::fs::read("exchange_rates.csv")?;

    // Create an S3 client with AWS credentials.
    let credentials_provider = DefaultCredentialsProvider::new()?;
    let s3_client = S3Client::new_with(
        rusoto_core::HttpClient::new().expect("Failed to create HTTP client"),
        credentials_provider,
        Region::default(),
    );

    // Upload the CSV file to S3.
    let s3_put_request = PutObjectRequest {
        bucket: s3_bucket,
        key: s3_key,
        body: Some(csv_data.into()),
        ..Default::default()
    };
    let response = s3_client.put_object(s3_put_request).await?;
    println!("S3 upload successful. ETag: {:?}", response.e_tag);

    // Create a Redshift client.
    let client = RedshiftClient::new(Region::default());

    // NOTE(review): the compiler reports that `ExecuteStatementMessage` and
    // `RedshiftClient::execute_statement` do not exist in `rusoto_redshift`.
    // Presumably SQL execution belongs to the separate Redshift Data API
    // client (a different crate) — confirm and switch before relying on this.
    let execute_statement = ExecuteStatementMessage {
        cluster_identifier: redshift_cluster_identifier,
        database: Some("dwh".to_owned()),
        sql: Some(copy_command),
        ..Default::default()
    };

    // Propagate a failed statement instead of logging it and returning `Ok`,
    // so the process exits with a non-zero status when the load fails.
    let output = client.execute_statement(execute_statement).await?;
    println!("Statement executed successfully: {:?}", output);

    Ok(())
}

The code above fails to compile with three errors and one warning:

error[E0432]: unresolved import `rusoto_redshift::CopyCommand`
 --> src/s3_to_redshift.rs:9:5
  |
9 | use rusoto_redshift::CopyCommand;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ no `CopyCommand` in the root

error[E0422]: cannot find struct, variant or union type `ExecuteStatementMessage` in this scope
  --> src/s3_to_redshift.rs:68:29
   |
68 |     let execute_statement = ExecuteStatementMessage {
   |                             ^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope

warning: unused import: `Redshift`
 --> src/s3_to_redshift.rs:8:23
  |
8 | use rusoto_redshift::{Redshift, RedshiftClient};
  |                       ^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

error[E0599]: no method named `execute_statement` found for struct `RedshiftClient` in the current scope
  --> src/s3_to_redshift.rs:76:18
   |
76 |     match client.execute_statement(execute_statement).await {
   |                  ^^^^^^^^^^^^^^^^^ method not found in `RedshiftClient`
0

There are 0 best solutions below