I'm working on a Keycloak SPI that publishes changes to Realms and Clients in Keycloak via RabbitMQ and handles RabbitMQ RPC requests to receive Keycloak specific configuration. The first part is already functioning.
The second part is causing issues. I'm using a singleton class called RabbitMqRpcReceiver
which handles the reception of RPC requests. This singleton class is instantiated within the Factory inside the RabbitMqEventListenerProviderFactory
, in the postInit()
method.
Here are the code snippets:
@AutoService(EventListenerProviderFactory.class)
@JBossLog
public class RabbitMqEventListenerProviderFactory implements EventListenerProviderFactory {
private RabbitMqConfig cfg;
private RabbitMqRpcReceiver rpcReceiver;
@Override
public EventListenerProvider create(KeycloakSession session) {
return new RabbitMqEventListenerProvider(cfg, session);
}
@Override
public void init(Scope config) {
cfg = RabbitMqConfig.createFromScope(config);
}
@Override
public void postInit(KeycloakSessionFactory factory) {
rpcReceiver = RabbitMqRpcReceiver.getInstance(cfg, factory.create());
}
@Override
public void close() {
if (rpcReceiver != null) {
rpcReceiver.close();
}
}
@Override
public String getId() {
return "keycloak-to-rabbitmq";
}
}
@JBossLog
public class RabbitMqRpcReceiver {
private final KeycloakSession keycloakSession;
public static final String RPC_QUEUE_NAME = "rpc_queue";
private Channel rpcChannel;
private static RabbitMqRpcReceiver instance = null;
public static RabbitMqRpcReceiver getInstance(RabbitMqConfig cfg, KeycloakSession session) {
if (instance == null) {
instance = new RabbitMqRpcReceiver(cfg, session);
}
return instance;
}
private RabbitMqRpcReceiver(RabbitMqConfig cfg, KeycloakSession session) {
this.keycloakSession = session;
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(cfg.getUsername());
factory.setPassword(cfg.getPassword());
factory.setVirtualHost(cfg.getVhost());
factory.setHost(cfg.getHostUrl());
factory.setPort(cfg.getPort());
if (Boolean.TRUE.equals(cfg.getUseTls())) {
try {
factory.useSslProtocol();
} catch (Exception e) {
log.error("Could not use SSL protocol", e);
}
}
try {
rpcChannel = factory.newConnection().createChannel();
rpcChannel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
rpcChannel.basicQos(1);
log.infof(" [x] Awaiting RPC requests");
rpcChannel.basicConsume(RPC_QUEUE_NAME, false, (consumerTag, delivery) -> handleDelivery(delivery), (consumerTag -> {}));
} catch (IOException e) {
log.error("Error declaring RPC queue", e);
} catch (TimeoutException e) {
log.error("Timeout declaring RPC queue", e);
}
}
private void handleDelivery(Delivery delivery) {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.contentType("application/json")
.headers(Map.of("__TypeId__", ConfigurationResponse.class.getName()))
.contentEncoding(StandardCharsets.UTF_8.name())
.correlationId(delivery.getProperties().getCorrelationId())
.build();
ObjectMapper mapper = new ObjectMapper();
ConfigurationRequest request = null;
ConfigurationResponse response = null;
try {
request = mapper.readValue(delivery.getBody(), ConfigurationRequest.class);
log.infof(" [.] Received request for client %s", request.client());
response = getConfiguration(request.client());
} catch (RuntimeException | IOException e) {
log.error(" [.] " + e.toString());
} finally {
try {
rpcChannel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, mapper.writeValueAsBytes(response));
rpcChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (IOException e) {
log.error("Error sending response", e);
}
}
}
public ConfigurationResponse getConfiguration(String clientName) {
log.info("Getting configuration for client " + clientName);
RealmProvider realmProvider = keycloakSession.realms();
// This one succeeds
log.info("Realms: " + realmProvider);
// This one fails
log.info("Realms: " + realmProvider.getRealmsStream().toList());
return null;
}
public void close() {
try {
rpcChannel.close();
} catch (IOException | TimeoutException e) {
log.error("Error closing RPC channel", e);
}
}
}
The code works except for the invocation of realmProvider.getRealmsStream()
within the getConfiguration(String clientName)
method. This invocation results in the following error message: java.lang.IllegalStateException: Cannot access delegate without a transaction
I would be genuinely grateful for any resources which might lead me to a working RabbitMQ Queue Consumer with access to the realms and clients within keycloak.
Further Testing – 1:
I edited the method getConfiguration(String clientName)
to begin a transaction if none is active.
public ConfigurationResponse getConfiguration(String clientName) {
if (!keycloakSession.getTransactionManager().isActive()) {
keycloakSession.getTransactionManager().begin();
}
log.info("Getting configuration for client " + clientName);
RealmProvider realmProvider = keycloakSession.realms();
// This one succeeds
log.info("Realms: " + realmProvider);
// This one fails
log.info(keycloakSession.getTransactionManager().isActive());
log.info("Realms: " + realmProvider.getRealmsStream().toList());
keycloakSession.getTransactionManager().commit();
return null;
}
The first method call succeeds, as you can see in the log. Any call afterward fails:
11:40:40,155 INFO [RabbitMqRpcReceiver] (pool-8-thread-4) [.] Received request for client gateway-test-client
11:40:40,156 INFO [RabbitMqRpcReceiver] (pool-8-thread-4) Getting configuration for client gateway-test-client
11:40:40,156 INFO [RabbitMqRpcReceiver] (pool-8-thread-4) Realms: org.keycloak.models.cache.infinispan.RealmCacheSession@5de3281a
11:40:40,156 INFO [RabbitMqRpcReceiver] (pool-8-thread-4) true
11:40:40,505 INFO [RabbitMqRpcReceiver] (pool-8-thread-4) Realms: [7d0ce53d-a1fe-4769-bbdc-b4410c162a3d@72a3d4c0, c73da31a-2c74-4650-a5e8-3f514516fb5f@1cd216b1, 188fcb9d-1411-445b-9849-3e0f849c06fb@65507055, c5dbb5ef-1482-4b88-9b94-3004b62a7508@7a07c3fb]
11:40:48,014 INFO [RabbitMqRpcReceiver] (pool-8-thread-3) [.] Received request for client gateway-test-client
11:40:48,015 INFO [RabbitMqRpcReceiver] (pool-8-thread-3) Getting configuration for client gateway-test-client
11:40:48,016 INFO [RabbitMqRpcReceiver] (pool-8-thread-3) Realms: org.keycloak.models.cache.infinispan.RealmCacheSession@5de3281a
11:40:48,016 INFO [RabbitMqRpcReceiver] (pool-8-thread-3) true
11:40:48,016 ERROR [RabbitMqRpcReceiver] (pool-8-thread-3) [.] java.lang.IllegalStateException: Cannot access delegate without a transaction
Further Testing – 2 (Possible Solution):
I changed the method postInit(KeycloakSessionFactory factory)
in the RabbitMqEventListenerProviderFactory
Class as follows:
@Override
public void postInit(KeycloakSessionFactory factory) {
rpcReceiver = RabbitMqRpcReceiver.getInstance(cfg, factory);
}
Instead of handing over a Keycloak Session created by factory.create()
I pass the KeycloakSessionFactory to the RabbitMqRpcReceiver
.
This gives me the opportunity to create a new session every time the method getConfiguration
:
public ConfigurationResponse getConfiguration(String clientName) {
KeycloakSession keycloakSession = sessionFactory.create();
if (!keycloakSession.getTransactionManager().isActive()) {
keycloakSession.getTransactionManager().begin();
}
log.info("Getting configuration for client " + clientName);
RealmProvider realmProvider = keycloakSession.realms();
// This one succeeds
log.info("Realms: " + realmProvider);
// This one fails
log.info(keycloakSession.getTransactionManager().isActive());
log.info("Realms: " + realmProvider.getRealmsStream().toList());
keycloakSession.getTransactionManager().commit();
keycloakSession.close();
return null;
}
This approach seems to work as expected every call succeeds now, but I do not know if this is best practice. Until I find some better solution, I will stick to this one.