I am using Play Framework 2.8.18 and trying to use Akka's Durable State feature. This is my implementation so far:
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.Props;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.state.javadsl.CommandHandler;
import akka.persistence.typed.state.javadsl.DurableStateBehavior;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Company and GetCompanyActor are my own classes and are not shown here.
public class CompanyActorTest extends DurableStateBehavior<CompanyActorTest.CompanyCommand, CompanyActorTest.CompanyState> {

    public interface CompanyCommand {
    }

    public static class GetCompany implements CompanyCommand {
        public final ActorRef<Object> replyTo;
        public final String userEmail;

        public GetCompany(ActorRef<Object> replyTo, String userEmail) {
            this.replyTo = replyTo;
            this.userEmail = userEmail;
        }
    }

    public static class CompanyState {
        private final Set<Company> companies;

        public CompanyState(Set<Company> companies) {
            this.companies = companies;
        }

        public Set<Company> getCompanies() {
            return companies;
        }
    }

    public static class AddCompany implements CompanyCommand {
        public final Company company;

        public AddCompany(Company company) {
            this.company = company;
        }
    }

    private final ActorContext<CompanyCommand> ctx;

    public static Behavior<CompanyCommand> create(PersistenceId persistenceId) {
        return Behaviors.setup(ctx -> new CompanyActorTest(persistenceId, ctx));
    }

    private CompanyActorTest(PersistenceId persistenceId, ActorContext<CompanyCommand> ctx) {
        super(persistenceId);
        this.ctx = ctx;
    }

    @Override
    public CompanyState emptyState() {
        return new CompanyState(Collections.emptySet());
    }

    @Override
    public CommandHandler<CompanyCommand, CompanyState> commandHandler() {
        return newCommandHandlerBuilder()
            .forAnyState()
            .onCommand(AddCompany.class, (state, command) -> {
                // Copy the set first: emptyState() holds an immutable set,
                // and the current state should not be mutated in place.
                Set<Company> updated = new HashSet<>(state.getCompanies());
                updated.add(command.company);
                return Effect().persist(new CompanyState(updated));
            })
            .onCommand(
                GetCompany.class, (state, command) -> {
                    // Delegate the lookup to a separate actor that replies to the caller.
                    ActorRef<GetCompanyActor.GetCompany> ref = this.ctx.getSystem().systemActorOf(GetCompanyActor.create(), "Get-Company-Actor", Props.empty());
                    ref.tell(new GetCompanyActor.GetCompany(command.replyTo, command.userEmail));
                    return Effect().none();
                })
            .build();
    }
}
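A side note on the AddCompany handler: emptyState() returns Collections.emptySet(), which is immutable, so the set has to be copied before a company can be added to it. Below is a minimal standalone sketch of that copy-on-write update. CompanyStateSketch and withCompany are hypothetical names, and String stands in for my Company class only to keep the example self-contained:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for CompanyState; String replaces Company for brevity.
final class CompanyStateSketch {
    private final Set<String> companies;

    CompanyStateSketch(Set<String> companies) {
        // Defensive, unmodifiable copy so an instance cannot change after creation.
        this.companies = Collections.unmodifiableSet(new HashSet<>(companies));
    }

    static CompanyStateSketch empty() {
        return new CompanyStateSketch(Collections.emptySet());
    }

    Set<String> getCompanies() {
        return companies;
    }

    // Returns a new state with the extra company; the current state is left untouched.
    CompanyStateSketch withCompany(String company) {
        Set<String> updated = new HashSet<>(companies);
        updated.add(company);
        return new CompanyStateSketch(updated);
    }

    public static void main(String[] args) {
        CompanyStateSketch before = empty();
        CompanyStateSketch after = before.withCompany("Acme");
        System.out.println(before.getCompanies().size() + " -> " + after.getCompanies().size());
    }
}
```

In the actor, the command handler would then persist the new state object instead of modifying the set held by the current one.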
Since I can't use my MySQL database (at least I think so, because I couldn't find any documentation for it), I am now using Postgres.
In my application.conf file I set:
akka.persistence.state.plugin = "akka.persistence.r2dbc.state"
akka.persistence.r2dbc {
  # postgres or yugabyte
  dialect = "postgres"
  # set this to your database schema if applicable, empty by default
  schema = ""
  connection-factory {
    driver = "postgres"
    # the connection can be configured with a url, eg: "r2dbc:postgresql://<host>:5432/<database>"
    url = ""
    # The connection options to be used. Ignored if 'url' is non-empty
    host = "localhost"
    port = 5432
    database = "postgres"
    user = "postgres"
    password = "postgres"
    ssl {
      enabled = off
      # See PostgresqlConnectionFactoryProvider.SSL_MODE
      # Possible values:
      # allow - encryption if the server insists on it
      # prefer - encryption if the server supports it
      # require - encryption enabled and required, but trust network to connect to the right server
      # verify-ca - encryption enabled and required, and verify server certificate
      # verify-full - encryption enabled and required, and verify server certificate and hostname
      # tunnel - use a SSL tunnel instead of following Postgres SSL handshake protocol
      mode = ""
      # Server root certificate. Can point to either a resource within the classpath or a file.
      root-cert = ""
      # Client certificate. Can point to either a resource within the classpath or a file.
      cert = ""
      # Key for client certificate. Can point to either a resource within the classpath or a file.
      key = ""
      # Password for client key.
      password = ""
    }
    # Initial pool size.
    initial-size = 5
    # Maximum pool size.
    max-size = 20
    # Maximum time to create a new connection.
    connect-timeout = 3 seconds
    # Maximum time to acquire connection from pool.
    acquire-timeout = 5 seconds
    # Number of retries if the connection acquisition attempt fails.
    # In the case the database server was restarted all connections in the pool will
    # be invalid. To recover from that without failed acquire you can use the same number
    # of retries as max-size of the pool
    acquire-retry = 1
    # Maximum idle time of the connection in the pool.
    # Background eviction interval of idle connections is derived from this property
    # and max-life-time.
    max-idle-time = 30 minutes
    # Maximum lifetime of the connection in the pool.
    # Background eviction interval of connections is derived from this property
    # and max-idle-time.
    max-life-time = 60 minutes
    # Configures the statement cache size.
    # 0 means no cache, negative values will select an unbounded cache
    # a positive value will configure a bounded cache with the passed size.
    statement-cache-size = 5000
    # Validate the connection when acquired with this SQL.
    # Enabling this has some performance overhead.
    # A fast query for Postgres is "SELECT 1"
    validation-query = ""
  }
  # If database timestamp is guaranteed to not move backwards for two subsequent
  # updates of the same persistenceId there might be a performance gain to
  # set this to `on`. Note that many databases use the system clock and that can
  # move backwards when the system clock is adjusted.
  db-timestamp-monotonic-increasing = off
  # Enable this for testing or workaround of https://github.com/yugabyte/yugabyte-db/issues/10995
  # FIXME: This property will be removed when the Yugabyte issue has been resolved.
  use-app-timestamp = off
  # Logs database calls that take longer than this duration at INFO level.
  # Set to "off" to disable this logging.
  # Set to 0 to log all calls.
  log-db-calls-exceeding = 300 ms
}
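For reference, the comment on the url setting describes the form "r2dbc:postgresql://<host>:5432/<database>". The small sketch below only shows what that form looks like with my host, port and database values filled in. R2dbcUrlSketch and postgresUrl are hypothetical names, and I have not tested whether setting url this way behaves identically to the separate options (user and password are not part of this form):

```java
// Hypothetical helper that only formats the URL form named in the config comment.
final class R2dbcUrlSketch {

    // Builds r2dbc:postgresql://<host>:<port>/<database> from the individual settings.
    static String postgresUrl(String host, int port, String database) {
        return "r2dbc:postgresql://" + host + ":" + port + "/" + database;
    }

    public static void main(String[] args) {
        // Values taken from the connection-factory block above.
        System.out.println(postgresUrl("localhost", 5432, "postgres"));
    }
}
```

I mention it mainly because it makes explicit which database the plugin connects to, namely the one called postgres on localhost.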
I am running the database in Docker. I also ran the following script:
CREATE TABLE IF NOT EXISTS public.event_journal(
  ordering BIGSERIAL,
  persistence_id VARCHAR(255) NOT NULL,
  sequence_number BIGINT NOT NULL,
  deleted BOOLEAN DEFAULT FALSE NOT NULL,
  writer VARCHAR(255) NOT NULL,
  write_timestamp BIGINT,
  adapter_manifest VARCHAR(255),
  event_ser_id INTEGER NOT NULL,
  event_ser_manifest VARCHAR(255) NOT NULL,
  event_payload BYTEA NOT NULL,
  meta_ser_id INTEGER,
  meta_ser_manifest VARCHAR(255),
  meta_payload BYTEA,
  PRIMARY KEY(persistence_id, sequence_number)
);

CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering);

CREATE TABLE IF NOT EXISTS public.event_tag(
  event_id BIGINT,
  tag VARCHAR(256),
  PRIMARY KEY(event_id, tag),
  CONSTRAINT fk_event_journal
    FOREIGN KEY(event_id)
    REFERENCES event_journal(ordering)
    ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS public.snapshot (
  persistence_id VARCHAR(255) NOT NULL,
  sequence_number BIGINT NOT NULL,
  created BIGINT NOT NULL,
  snapshot_ser_id INTEGER NOT NULL,
  snapshot_ser_manifest VARCHAR(255) NOT NULL,
  snapshot_payload BYTEA NOT NULL,
  meta_ser_id INTEGER,
  meta_ser_manifest VARCHAR(255),
  meta_payload BYTEA,
  PRIMARY KEY(persistence_id, sequence_number)
);

CREATE TABLE IF NOT EXISTS public.durable_state (
  global_offset BIGSERIAL,
  persistence_id VARCHAR(255) NOT NULL,
  revision BIGINT NOT NULL,
  state_payload BYTEA NOT NULL,
  state_serial_id INTEGER NOT NULL,
  state_serial_manifest VARCHAR(255),
  tag VARCHAR,
  state_timestamp BIGINT NOT NULL,
  PRIMARY KEY(persistence_id)
);

CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);
Nevertheless, when trying to create that actor I get the following error (the German message means: relation "durable_state" does not exist):
[error] c.k.a.c.CompanyActorTest - Supervisor StopSupervisor saw failure: Exception during recovery. PersistenceId [Company]. Relation »durable_state« existiert nicht
akka.persistence.typed.state.internal.DurableStateStoreException: Exception during recovery. PersistenceId [Company]. Relation »durable_state« existiert nicht
at akka.persistence.typed.state.internal.Recovering.onRecoveryFailure(Recovering.scala:125)
at akka.persistence.typed.state.internal.Recovering.onGetFailure(Recovering.scala:188)
at akka.persistence.typed.state.internal.Recovering.onMessage(Recovering.scala:78)
at akka.persistence.typed.state.internal.Recovering.onMessage(Recovering.scala:61)
at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:281)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:237)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
at akka.persistence.typed.state.internal.DurableStateBehaviorImpl$$anon$1.aroundReceive(DurableStateBehaviorImpl.scala:125)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlBadGrammarException: Relation »durable_state« existiert nicht
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:96)
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
I am also unsure about these dependencies. I tried increasing and decreasing the versions, which resulted in dependency errors:
libraryDependencies += "com.typesafe.akka" %% "akka-serialization-jackson" % "2.8.0"
libraryDependencies += "com.lightbend.akka" %% "akka-persistence-r2dbc" % "1.0.0"
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.5.3"
I tried changing the config and the three dependencies listed above. Could it be that I need to add something else to the config? By the way, I am not using Event Sourcing; I only want to use the Durable State feature.
Thank you for your help!
Kind regards,
Alex