After setting up Seatunnel on my system, I'm currently attempting to perform CDC (Change Data Capture) from HBase to HBase using Seatunnel. However, when running the job, I encountered the following error.
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.ExceptionInInitializerError
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:3252)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:3230)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:76)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:3230)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:255)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.createConnection(PhoenixEmbeddedDriver.java:144)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:221)
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:123)
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat.openInputFormat(JdbcInputFormat.java:90)
at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.open(JdbcSourceReader.java:47)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:115)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:146)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:89)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.metrics2.MetricsException: Metrics source phoenix already exists!
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.newSourceName(DefaultMetricsSystem.java:152)
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.sourceName(DefaultMetricsSystem.java:125)
at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register(MetricsSystemImpl.java:229)
at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter$HBaseMetrics2HadoopMetricsAdapter.registerToDefaultMetricsSystem(GlobalMetricRegistriesAdapter.java:92)
at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter$HBaseMetrics2HadoopMetricsAdapter.access$100(GlobalMetricRegistriesAdapter.java:77)
at org.apache.phoenix.monitoring.GlobalMetricRegistriesAdapter.registerMetricRegistry(GlobalMetricRegistriesAdapter.java:71)
at org.apache.phoenix.monitoring.GlobalClientMetrics.<clinit>(GlobalClientMetrics.java:131)
... 19 more
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:119)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:174)
... 2 more
Here I mentioned my conf file details
env{
#parallelism = 3
job.mode = "STREAMING"
job.name="test_hbase_source_to_hbase"
}
source {
Jdbc {
driver = org.apache.phoenix.jdbc.PhoenixDriver
url = "jdbc:phoenix:localhost:/hbase-unsecure"
query = "select ROWID, MSISDN, ID, STATUS_FIELD from DEFAULT.TEST_MERGE_SPLIT"
}
}
sink{
Jdbc {
driver = org.apache.phoenix.jdbc.PhoenixDriver
url = "jdbc:phoenix:instance-2:/hbase-unsecure"
query = "upsert into DEFAULT.TEST_MERGE_SPLIT(ROWID, MSISDN, ID, STATUS_FIELD) values(?, ?,?,?)"
}
}