Micronaut AWS Lambda and MSK Event Trigger

72 Views Asked by At

Currently I am trying to write a micronaut Lambda Function which will listen to AWS MSK as Event Trigger. It will Listen to this Topic and POST the Message To Keyspace. Also need to use GraalVM to optimize cold startup time.

This is my Event Handler

@Serdeable
public class SecuredRequestHandler extends MicronautRequestHandler<KafkaEvent, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(AuditRequestHandler.class);

    @Inject
    ObjectMapper objectMapper;

    @Inject
    CassandraDataAccessObject cassandraDataAccessObject;

    @Override
    public Void execute(KafkaEvent input) {
        LOG.info("Lambda Invoked To Process MSK Event");
        // Process the Kafka records from MSK
        final Map<String, List<KafkaEventRecord>> topicRecordMapping = input.getRecords();
        final List<KafkaEventRecord> records =
                topicRecordMapping.values().stream().flatMap(java.util.Collection::stream).collect(Collectors.toList());
        for (KafkaEventRecord record : records) {
            String key = record.getKey();
            String value = record.getValue();
            try {
                LOG.info("Event Key For The Message" + key);
                SecureActionEvent event= objectMapper.readValue(value, UserActionEvent.class);
                cassandraDataAccessObject.insertDataIntoKeyspace(event);
            } catch (Exception exception) {
                LOG.error("Failure in Processing JSON Message From Kafka Topic In Lambda with Error" + exception.getMessage());
            }
        }
        return null;
    }
}

This is My Keyspace Code

@Singleton
public class CassandraDataAccessObject {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraDataAccessObject.class);


    @Inject
    private  CqlSession cqlSession;

    public void insertDataIntoKeyspace(Event event) {
        LOG.info("Executing Query To Insert Data Into Cassandra Keyspace For UID {}",userActionEvent.getUid());
        String insertQuery = "INSERT INTO event(id,uid, result_url,response_time_ms) VALUES (?,?, ?, ?)";
        cqlSession.execute(insertQuery, UUID.randomUUID().toString(),event.getUid(), event.getRequestUrl(),event.getResponseTimeInMs());
    }
}

But I am not able to Deploy and Run this Lambda. One Issue is that It is not taking up the port mentioned in application.yml. It is hitting localhost:9042 2) Issue with Invocation In Lambda. It is Not Getting Triggered

0

There are 0 best solutions below