I am experimenting with rusoto. The goal is to ingest a local file called "exchange_rates.csv" to S3 and then from S3 to AWS Redshift. Below is the code I have tried:
extern crate rusoto_core;
extern crate rusoto_redshift;
extern crate rusoto_s3;
extern crate rusoto_credential;
extern crate config;
use rusoto_core::Region;
use rusoto_redshift::{Redshift, RedshiftClient};
use rusoto_redshift::CopyCommand;
use rusoto_s3::S3;
use rusoto_s3::{S3Client, PutObjectRequest};
use rusoto_credential::DefaultCredentialsProvider;
use std::fs::File;
use std::io::Read;
use config::Config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load the configuration file
let mut settings = Config::default();
settings.merge(config::File::with_name("config.toml"))?;
// Read the AWS S3 bucket and key from the configuration file
let s3_bucket = settings.get_str("aws.s3_bucket")?;
let s3_key = settings.get_str("aws.s3_key")?;
// Read the Redshift cluster details from the configuration file
let redshift_username = settings.get_str("redshift.username")?;
let redshift_password = settings.get_str("redshift.password")?;
let redshift_host = settings.get_str("redshift.host")?;
let redshift_port = settings.get_str("redshift.port")?;
// Read the exchange rates CSV file
let mut csv_file = File::open("exchange_rates.csv")?;
let mut csv_data = Vec::new();
csv_file.read_to_end(&mut csv_data)?;
// Create an S3 client with AWS credentials
let credentials_provider = DefaultCredentialsProvider::new()?;
let s3_client = S3Client::new_with(
rusoto_core::HttpClient::new().expect("Failed to create HTTP client"),
credentials_provider,
Region::default(),
);
// Upload the CSV file to S3
let s3_put_request = PutObjectRequest {
bucket: s3_bucket.clone(),
key: s3_key.clone(),
body: Some(csv_data.into()),
..Default::default()
};
let response = s3_client.put_object(s3_put_request).await?;
println!("S3 upload successful. ETag: {:?}", response.e_tag);
// Create a Redshift client
let client = RedshiftClient::new(Region::default());
// Prepare the COPY command to load data from S3 to Redshift
let copy_command = format!(
"COPY bi_etl.erbe__rust FROM 's3://{}/{}' CREDENTIALS 'aws_access_key_id={}\
;aws_secret_access_key={}' DELIMITER ',' CSV;",
s3_bucket, s3_key, redshift_username, redshift_password
);
// Prepare the ExecuteStatementMessage
let redshift_cluster_identifier = settings.get_str("redshift.cluster_identifier")?;
let execute_statement = ExecuteStatementMessage {
cluster_identifier: redshift_cluster_identifier.to_owned(),
database: Some("dwh".to_owned()),
sql: Some(copy_command.to_owned()),
..Default::default()
};
// Execute the statement
match client.execute_statement(execute_statement).await {
Ok(output) => {
// Process the response
println!("Statement executed successfully: {:?}", output);
}
Err(error) => {
// Handle the error
println!("Error executing statement: {:?}", error);
}
}
Ok(())
}
The code above returns 3 Errors:
error[E0432]: unresolved import `rusoto_redshift::CopyCommand`
--> src/s3_to_redshift.rs:9:5
|
9 | use rusoto_redshift::CopyCommand;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ no `CopyCommand` in the root
error[E0422]: cannot find struct, variant or union type `ExecuteStatementMessage` in this scope
--> src/s3_to_redshift.rs:68:29
|
68 | let execute_statement = ExecuteStatementMessage {
| ^^^^^^^^^^^^^^^^^^^^^^^ not found in this scope
warning: unused import: `Redshift`
--> src/s3_to_redshift.rs:8:23
|
8 | use rusoto_redshift::{Redshift, RedshiftClient};
| ^^^^^^^^
|
= note: `#[warn(unused_imports)]` on by default
error[E0599]: no method named `execute_statement` found for struct `RedshiftClient` in the current scope
--> src/s3_to_redshift.rs:76:18
|
76 | match client.execute_statement(execute_statement).await {
| ^^^^^^^^^^^^^^^^^ method not found in `RedshiftClient`