CDC (change data capture) in GCP using Datastream, Postgres and Cloud Storage

I want to set up CDC from Postgres to Cloud Storage using Datastream, managed through Terraform. I am following the Terraform docs, but the example given there doesn't work out of the box. Here is what I have built based on the docs:

provider "google" {
  project = "project1"
}
data "google_project" "project" {
}

resource "google_sql_database_instance" "instance" {
  name             = "cdc-test-2"
  database_version = "POSTGRES_14"
  region           = "us-central1"
  settings {
    tier = "db-f1-micro"
    backup_configuration {
      enabled = true
    }

    ip_configuration {

      // Datastream IPs will vary by region.
      authorized_networks {
        value = "34.71.242.81"
      }

      authorized_networks {
        value = "34.72.28.29"
      }

      authorized_networks {
        value = "34.67.6.157"
      }

      authorized_networks {
        value = "34.67.234.134"
      }

      authorized_networks {
        value = "34.72.239.218"
      }
    }
  }

  deletion_protection = true
  #  Note: database_flags is a repeated block that belongs inside "settings",
  #  and on Cloud SQL the flag is named "cloudsql.logical_decoding":
  #  database_flags {
  #    name  = "cloudsql.logical_decoding"
  #    value = "on"
  #  }

}

resource "google_sql_database" "db" {
  instance = google_sql_database_instance.instance.name
  name     = "cdc-database"
}

#resource "random_password" "pwd" {
#  length  = 16
#  special = false
#}

resource "google_sql_user" "user" {
  name     = "user"
  instance = google_sql_database_instance.instance.name
  password = "plsimxbchr44&^%$#usheb"# random_password.pwd.result
}

resource "google_datastream_connection_profile" "source_connection_profile" {
  display_name          = "Source connection profile"
  location              = "us-central1"
  connection_profile_id = "source-profile"
  postgresql_profile {
    hostname = google_sql_database_instance.instance.public_ip_address
    username = google_sql_user.user.name
    password = google_sql_user.user.password
    database = google_sql_database.db.name
  }
}

resource "google_storage_bucket" "bucket" {
  name                        = "cdc-test-2"
  location                    = "US"
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_iam_member" "viewer" {
  bucket = google_storage_bucket.bucket.name
  role   = "roles/storage.objectViewer"
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-datastream.iam.gserviceaccount.com"
}

resource "google_storage_bucket_iam_member" "creator" {
  bucket = google_storage_bucket.bucket.name
  role   = "roles/storage.objectCreator"
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-datastream.iam.gserviceaccount.com"
}

resource "google_storage_bucket_iam_member" "reader" {
  bucket = google_storage_bucket.bucket.name
  role   = "roles/storage.legacyBucketReader"
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-datastream.iam.gserviceaccount.com"
}

resource "google_kms_key_ring" "keyring" {
  name     = "keyring-example"
  location = "us-central1"
}

# TODO can we instead use default key?
resource "google_kms_crypto_key" "example-key" {
  name            = "crypto-key-example"
  key_ring        = google_kms_key_ring.keyring.id
  rotation_period = "100000s"

  lifecycle {
    prevent_destroy = true
  }
}

resource "google_kms_crypto_key_iam_member" "key_user" {
  crypto_key_id = google_kms_crypto_key.example-key.id # "kms-name"
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-datastream.iam.gserviceaccount.com"
}

resource "google_datastream_connection_profile" "destination_connection_profile" {
  display_name          = "Connection profile"
  location              = "us-central1"
  connection_profile_id = "destination-profile"

  gcs_profile {
    bucket    = google_storage_bucket.bucket.name
    root_path = "/path"
  }
}
#resource "postgresql_replication_slot" "replication_slot" {
#  name  = "replication_slot"
#  plugin = "pgoutput"
#}

resource "google_datastream_stream" "default" {
  depends_on = [
    google_kms_crypto_key_iam_member.key_user
  ]
  stream_id     = "my-stream"
  desired_state = "NOT_STARTED"
  location      = "us-central1"
  display_name  = "my stream"
  labels        = {
    key = "value"
  }


  source_config {
    source_connection_profile = google_datastream_connection_profile.source_connection_profile.id
    postgresql_source_config {
      publication      = "publication"
      replication_slot = "replication_slot"
      include_objects {
        postgresql_schemas {
          schema = "public"

        }
      }
    }
  }
  destination_config {
    destination_connection_profile = google_datastream_connection_profile.destination_connection_profile.id
    gcs_destination_config {
      path                   = "mydata"
      file_rotation_mb       = 200
      file_rotation_interval = "60s"
      json_file_format {
        schema_file_format = "NO_SCHEMA_FILE"
        compression        = "GZIP"

      }
    }

  }

  backfill_all {
  }

  customer_managed_encryption_key = google_kms_crypto_key.example-key.id
}

I get this error:

Error: Error waiting to create Stream: Error waiting for Creating Stream: {"@type":"type.googleapis.com/google.rpc.ErrorInfo","domain":"datastream.googleapis.com","metadata":{"message":"Some validations failed to complete successfully, see the full list in the operation metadata.","originalMessage":"","time":"2023-06-29T18:41:27Z","uuid":"8eab09cb-16ac-11ee-89c9-16cd70881238"},"reason":"VALIDATION_FAILURE"}
│ {"code":"POSTGRES_VALIDATE_REPLICATION_SLOT","description":"Validates that the replication slot exists and not lost.","message":[{"code":"POSTGRES_REPLICATION_SLOT_DOES_NOT_EXIST","level":"ERROR","message":"Datastream failed to read from the PostgreSQL replication slot. Make sure that the slot exists and that Datastream has the necessary permissions to access it.","metadata":{"slot_name":"replication_slot"}}],"state":"FAILED"}
│ {"code":"POSTGRES_VALIDATE_LOGICAL_DECODING","description":"Validates that logical decoding is properly configured on the database.","message":[{"code":"POSTGRES_BAD_WAL_CONFIG","level":"ERROR","message":"The write ahead log (wal_level) must be configured to 'logical'."}],"state":"FAILED"}
│ {"code":"POSTGRES_VALIDATE_PUBLICATION","description":"Validates that the publication exists and configured for the required tables.","message":[{"code":"POSTGRES_PUBLICATION_DOES_NOT_EXIST","level":"ERROR","message":"Datastream failed to find the publication. Make sure that the publication exists and that Datastream has the necessary permissions to access it.","metadata":{"publication_name":"publication"}}],"state":"FAILED"}

1 Answer

According to the Google documentation (https://cloud.google.com/datastream/docs/configure-your-source-postgresql-database#csqlforpostgresql), you need to run a few commands on the database beforehand to be able to deploy streams:

1. Replication:

   ALTER USER USER_NAME WITH REPLICATION;

2. Publication:

   CREATE PUBLICATION PUBLICATION_NAME FOR ALL TABLES;

   or, more specifically:

   CREATE PUBLICATION PUBLICATION_NAME
      FOR TABLE SCHEMA1.TABLE1, SCHEMA2.TABLE2;

3. Replication slot:

   SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('REPLICATION_SLOT_NAME', 'pgoutput');
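These steps cover two of the three validation failures. The remaining one, POSTGRES_BAD_WAL_CONFIG, is fixed on Cloud SQL for PostgreSQL by enabling the cloudsql.logical_decoding database flag, which switches wal_level to logical. In Terraform that flag goes inside the instance's settings block (the question's commented-out database_flags attempt hints at this); roughly:

settings {
  tier = "db-f1-micro"

  # Required for logical replication; note that changing database
  # flags can restart the instance.
  database_flags {
    name  = "cloudsql.logical_decoding"
    value = "on"
  }
}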

If you'd like to have this automated, you can use a PostgreSQL provider such as cyrilgdn/terraform-provider-postgresql, as you started to do in the commented-out code; see the sketch below.

Remember to create the publication and the replication slot in the source database (cdc-database here), not in the default postgres database.
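
For completeness, here is a minimal sketch of that automation. It assumes Terraform can reach the instance's public IP and that the user already has the REPLICATION attribute (step 1 above); the resource and argument names come from the cyrilgdn/postgresql provider's docs, so verify them against the version you pin:

terraform {
  required_providers {
    postgresql = {
      source = "cyrilgdn/postgresql"
    }
  }
}

# Connect to the source database itself, not the default "postgres" database.
provider "postgresql" {
  host      = google_sql_database_instance.instance.public_ip_address
  port      = 5432
  database  = google_sql_database.db.name
  username  = google_sql_user.user.name
  password  = google_sql_user.user.password
  sslmode   = "require"
  superuser = false # Cloud SQL never grants a real superuser role
}

# Matches the "publication" name used in google_datastream_stream.
resource "postgresql_publication" "publication" {
  name       = "publication"
  all_tables = true
}

# Matches the "replication_slot" name used in google_datastream_stream.
resource "postgresql_replication_slot" "replication_slot" {
  name   = "replication_slot"
  plugin = "pgoutput"
}

With this in place, you could also list these two resources in the stream's depends_on, so the publication and slot exist before Datastream runs its validations.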