Reusing primitive data types in Saga context


Since I have a lot of logic in the saga class, I introduced a couple of services, each dealing with one part of the logic. Everything works as expected except for the orderTotalPrice variable in the ProductReservationService. When I step through the code with the debugger, the order price is incremented in the handle(ProductReservedEvent) method, but when I try to use this variable in the OrderCreationHandler to emit an update to the subscription query, I get 0 as the value of orderTotalPrice. When I remove the productReservationService.resetOrderTotalPrice(); statement in the processOrder function, I get the orderTotalPrice for all the orders made in the app (even when I restart the app).

This is the code I have:

 
 
import com.jchaaban.common.command.ReserveProductCommand;
import com.jchaaban.common.command.UnreserveProductCommand;
import com.jchaaban.common.event.ProductReservedEvent;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandExecutionException;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Service;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
 
@Slf4j
@Service
public class ProductReservationService {
    private final transient CommandGateway commandGateway;
    private final Map<String, Integer> successfullyReservedProducts = new HashMap<>();
    @Getter
    private double orderTotalPrice = 0;
 
    @EventHandler
    public void handle(ProductReservedEvent event) {
        log.info("Handling ProductReservedEvent for productId: {}, productPrice: {}, quantity: {}",
                event.getProductId(), event.getProductPrice(), event.getQuantity());
 
        orderTotalPrice += event.getProductPrice().doubleValue() * event.getQuantity();
 
        log.info("Updated orderPrice: {}", orderTotalPrice);
    }
    public ProductReservationService(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }
 
    public boolean reserveProducts(List<String> productIds) {
        Map<String, Integer> productOccurrences = countProductOccurrences(productIds);
        return productOccurrences.entrySet().stream().allMatch(entry -> reserveProduct(entry.getKey(), entry.getValue()));
    }
 
    public void rollbackReservations() {
        log.info("Rolling back product reservations.");
        successfullyReservedProducts.forEach((productId, quantity) ->
                commandGateway.send(new UnreserveProductCommand(productId, quantity)));
        successfullyReservedProducts.clear();
        orderTotalPrice = 0;
    }
 
    public void resetOrderTotalPrice(){
        orderTotalPrice = 0;
    }
 
    private Map<String, Integer> countProductOccurrences(List<String> productIds) {
        return productIds.stream()
                .collect(Collectors.groupingBy(productId -> productId, Collectors.summingInt(productId -> 1)));
    }
    private boolean reserveProduct(String productId, int quantity) {
        try {
            commandGateway.sendAndWait(new ReserveProductCommand(productId, quantity));
            successfullyReservedProducts.put(productId, quantity);
        } catch (CommandExecutionException exception) {
            rollbackReservations();
            return false;
        }
        return true;
    }
}
 
import com.jchaaban.common.dto.ReadPaymentDetailsDto;
import com.jchaaban.ordersservice.saga.service.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import java.util.List;
 
@Slf4j
@Component
@Data
public class OrderCreationHandler {
 
    private static final String USER_NOT_FOUND_TEMPLATE = "User with ID: %s not found.";
    private static final String PAYMENT_DETAILS_NOT_FOUND_TEMPLATE = "Payment details for user with ID: %s not found.";
    private static final String INSUFFICIENT_BALANCE = "Insufficient balance.";
    private static final String ORDER_SUCCESS_TEMPLATE = "Order processed successfully. Total cost: %s.";
 
    private final transient OrderSummaryEmitter orderSummaryEmitter;
    private final transient ProductReservationService productReservationService;
    private final transient UserQueryService userQueryService;
    private final transient OrderCancellationService orderCancellationService;
    private final transient PaymentService paymentService;
    public OrderCreationHandler(
            OrderSummaryEmitter orderSummaryEmitter,
            ProductReservationService productReservationService,
            UserQueryService userQueryService,
            OrderCancellationService orderCancellationService,
            PaymentService paymentService
    ) {
        this.orderSummaryEmitter = orderSummaryEmitter;
        this.productReservationService = productReservationService;
        this.userQueryService = userQueryService;
        this.orderCancellationService = orderCancellationService;
        this.paymentService = paymentService;
    }
 
 
 
    public void processOrder(String orderId, String userId, List<String> productIds) {
        userQueryService.fetchUserInformation(userId).ifPresentOrElse(
                user -> processPaymentDetails(orderId, userId, productIds),
                () -> orderSummaryEmitter.emitOrderSummary(orderId, String.format(USER_NOT_FOUND_TEMPLATE, userId))
        );
    }
 
    private void processPaymentDetails(String orderId, String userId, List<String> productIds) {
        userQueryService.fetchUserPaymentDetails(userId).ifPresentOrElse(
                paymentDetails -> processOrder(orderId, productIds, paymentDetails),
                () -> orderSummaryEmitter.emitOrderSummary(orderId, String.format(PAYMENT_DETAILS_NOT_FOUND_TEMPLATE, userId))
        );
    }
 
    private void processOrder(String orderId, List<String> productIds, ReadPaymentDetailsDto paymentDetails) {
 
        if (!productReservationService.reserveProducts(productIds)) {
            orderCancellationService.cancelOrder(orderId, "Product reservation failed");
            return;
        }
 
        double orderPrice = productReservationService.getOrderTotalPrice();
 
        if (paymentDetails.getBalance() < orderPrice) {
            orderCancellationService.cancelOrder(orderId, INSUFFICIENT_BALANCE);
            return;
        }
 
        orderSummaryEmitter.emitOrderSummary(orderId, String.format(ORDER_SUCCESS_TEMPLATE, orderPrice));
        paymentService.updatePaymentDetailsBalance(paymentDetails.getPaymentDetailsId(), orderPrice);
        productReservationService.resetOrderTotalPrice();
    }
 
}
 
 
import com.jchaaban.common.event.ProductReservedEvent;
import com.jchaaban.ordersservice.event.OrderCreatedEvent;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.modelling.saga.SagaEventHandler;
import org.axonframework.modelling.saga.StartSaga;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;
 
@Saga // is already a spring component
@Slf4j
@NoArgsConstructor
public class OrderSaga {
    private String orderId;
 
    @Autowired
    private transient OrderCreationHandler orderCreationHandler;
  
    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handleOrderCreated(OrderCreatedEvent event) {
        log.info("Handling OrderCreatedEvent for orderId: {}", event.getOrderId());
        this.orderId = event.getOrderId();
        orderCreationHandler.processOrder(orderId, event.getUserId(), event.getProductIds());
    }
}

Here are the logs:

2023-10-30T16:15:47.496+01:00 INFO 19832 --- [agaProcessor]-0] c.jchaaban.ordersservice.saga.OrderSaga : Handling OrderCreatedEvent for orderId: 4ea2f30f-6a7a-4886-a99d-58558ef1f769

Hibernate: update token_entry set token=?,token_type=?,timestamp=? where owner=? and processor_name=? and segment=?

2023-10-30T16:15:47.550+01:00 INFO 19832 --- [saga.service]-0] c.j.o.s.s.ProductReservationService : Handling ProductReservedEvent for productId: cb70a736-05fd-4091-8441-d83eaa117673, productPrice: 33, quantity: 2

2023-10-30T16:15:47.550+01:00 INFO 19832 --- [saga.service]-0] c.j.o.s.s.ProductReservationService : Updated orderPrice: 66.0

Hibernate: select nextval('association_value_entry_seq')

The information in the logs is correct: the total price of this order is indeed 66. Do you know of a solution to this problem?

Thank you


Answer

Although moving logic from the Saga to Services makes sense to break up the code, you have effectively also moved state. More specifically, you've moved state from the stateful Saga, to the stateless services.

Axon Framework will de-/serialize a Saga for you to keep track of the process your saga manages as it proceeds over the event stream. In your specific example, the orderTotalPrice is part of the ProductReservationService, which in turn is part of the OrderCreationHandler as a transient field, which ends up in the OrderSaga as a transient field.

Although you definitely want services to be transient within a Saga, as de-/serializing those is asking for trouble on several levels, it does mean your service becomes a collection of state for all your sagas.

And I think it's that last point that comes to bite you: I assume the orderTotalPrice is tied into a single Saga instance, not state that applies to all your sagas. Whereas by moving that state to the services, you effectively make it a global application field instead of a saga-specific field. I gather that somewhere in that flow the price is adjusted unexpectedly.
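To make that point concrete, here is a minimal sketch in plain Java, without Axon or Spring. The names SharedPriceService, OrderProcess and SharedStateDemo are hypothetical stand-ins for a singleton service and for saga instances; they are not taken from your project. It only illustrates how a field on a shared service accumulates across otherwise unrelated orders.

```java
// Hypothetical stand-in for a singleton service that keeps a running total.
class SharedPriceService {
    private double orderTotalPrice = 0;

    void onProductReserved(double price, int quantity) {
        orderTotalPrice += price * quantity;
    }

    double getOrderTotalPrice() {
        return orderTotalPrice;
    }
}

// Hypothetical stand-in for a saga instance: one object per order.
class OrderProcess {
    private final String orderId;
    private final SharedPriceService service; // shared, like an injected singleton

    OrderProcess(String orderId, SharedPriceService service) {
        this.orderId = orderId;
        this.service = service;
    }

    String getOrderId() {
        return orderId;
    }

    // Registers a reservation and returns the total as seen by this order.
    double reserve(double price, int quantity) {
        service.onProductReserved(price, quantity);
        return service.getOrderTotalPrice();
    }
}

class SharedStateDemo {
    public static void main(String[] args) {
        SharedPriceService singleton = new SharedPriceService();
        OrderProcess first = new OrderProcess("order-1", singleton);
        OrderProcess second = new OrderProcess("order-2", singleton);

        System.out.println(first.reserve(33.0, 2));  // prints 66.0
        System.out.println(second.reserve(10.0, 1)); // prints 76.0, not 10.0
    }
}
```

The second order sees 76.0 because the total lives on the one shared service object, not on the individual order process.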

So, although a service breakdown is helpful, all you've shared above seems like a lot to me for simply creating an Order. In addition, a saga with a single event handler and no state doesn't have to be a Saga from Axon Framework. Although it's a handy tool, a saga is intended to coordinate activities between Aggregate types and Bounded Contexts. Your sample does neither, as it simply invokes services. As such, a plain event handler would've sufficed, and likely simplified your implementation.

Lastly, although I understand you've marked it as an Axon question, since you're using Axon, this specifically doesn't have anything to do with using primitives in a Saga. If any type of primitive would reside in the OrderSaga, Axon Framework will happily de-/serialize it for you, giving you the state of that field complying with its progress through the event stream.
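As a rough illustration of that last point, the sketch below keeps the price on the process object itself and round-trips it through Java's built-in serialization. This is an assumption-laden stand-in: Axon Framework uses its own configurable serializer, not necessarily java.io, and PriceTrackingProcess is a hypothetical class rather than a real saga. It only shows the general behaviour that a primitive field travels with the instance while a transient field does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical saga-like object: state lives on the instance itself.
class PriceTrackingProcess implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String orderId;
    private double orderTotalPrice;           // primitive state, serialized with the instance
    private transient Object injectedService; // skipped on serialization, must be re-injected

    PriceTrackingProcess(String orderId, Object injectedService) {
        this.orderId = orderId;
        this.injectedService = injectedService;
    }

    void onProductReserved(double price, int quantity) {
        orderTotalPrice += price * quantity;
    }

    String getOrderId() { return orderId; }
    double getOrderTotalPrice() { return orderTotalPrice; }
    boolean hasService() { return injectedService != null; }

    // Serializes and deserializes the given instance in memory.
    static PriceTrackingProcess roundTrip(PriceTrackingProcess process) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(process);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PriceTrackingProcess) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}

class SagaStateDemo {
    public static void main(String[] args) {
        PriceTrackingProcess process = new PriceTrackingProcess("order-1", new Object());
        process.onProductReserved(33.0, 2);

        PriceTrackingProcess restored = PriceTrackingProcess.roundTrip(process);
        System.out.println(restored.getOrderTotalPrice()); // prints 66.0
        System.out.println(restored.hasService());         // prints false
    }
}
```

In other words, a price kept as a field of the saga belongs to that one saga instance and survives storage, whereas anything held behind a transient service reference is whatever the shared service happens to contain at that moment.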


Conclusion

Quite a story up there...I went a little back and forth when checking your implementation. If anything is unclear from my reply, Hadi, be sure to add a comment to my answer. I'll happily refine my answer to help you further.