Seatunnel with Phoenix

54 Views Asked by At

After setting up Seatunnel on my system, I'm currently attempting to perform CDC (Change Data Capture) from HBase to HBase using Seatunnel. However, when running the job, I encountered the following error.

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.ExceptionInInitializerError
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:3252)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:3230)
        at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
        at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:3230)
        at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
        at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:144)
        at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:123)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat.openInputFormat(JdbcInputFormat.java:90)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.open(JdbcSourceReader.java:47)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:115)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:146)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:89)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.metrics2.MetricsException: Metrics source phoenix already exists!
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.newSourceName(DefaultMetricsSystem.java:152)
        at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.sourceName(DefaultMetricsSystem.java:125)
        at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register(MetricsSystemImpl.java:229)
        at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter$HBaseMetrics2HadoopMetricsAdapter.registerToDefaultMetricsSystem(GlobalMetricRegistriesAdapter.java:92)
        at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter$HBaseMetrics2HadoopMetricsAdapter.access$100(GlobalMetricRegistriesAdapter.java:77)
        at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter.registerMetricRegistry(GlobalMetricRegistriesAdapter.java:71)
        at org.apache.phoenix.monitoring.GlobalClientMetrics.<clinit>(GlobalClientMetrics.java:131)
        ... 19 more

        at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:119)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:174)
        ... 2 more

Here I mentioned my conf file details

env{
  #parallelism = 3
  job.mode = "STREAMING"
  job.name="test_hbase_source_to_hbase"
}

source {
    Jdbc {
        driver = org.apache.phoenix.jdbc.PhoenixDriver
        url = "jdbc:phoenix:localhost:/hbase-unsecure"
        query = "select ROWID, MSISDN, ID, STATUS_FIELD from DEFAULT.TEST_MERGE_SPLIT"
    }
}

sink{
    Jdbc {
        driver = org.apache.phoenix.jdbc.PhoenixDriver
        url = "jdbc:phoenix:instance-2:/hbase-unsecure"
        query = "upsert into DEFAULT.TEST_MERGE_SPLIT(ROWID, MSISDN, ID, STATUS_FIELD) values(?, ?,?,?)"
    }
}
0

There are 0 best solutions below