Axon Event Sourcing Events Multiplication on service restart

27 Views Asked by At

I try to learn by doing Axon and Event Sourcing. I try to create web application where I could track musicians presence/absence on rehearsals or performances. I need few features such Add Musician, Modify Musician, Add Music Event (rehearsal or performance), Modify Music Event and based on that I try to create list of checkboxes (PresenceSlot) to mark musician presence on selected musical event.

Problem is that, on every restart of my app, events are send again from axon event store and with each another restart my database is populated with data witch make duplication of entries.

*I know it is because I am sending commands in PresenceProjector - but I have no idea how to model my application in other way. *

From where I should sent new events/commands to avoid sending new events on every restart?

Part of code I created below:

MusicianAggregate:

@Aggregate
public class MusicianAggregate {

    @AggregateIdentifier
    private UUID musicianId;
    private String firstName;
    private String lastName;
    private String phone;
    private LocalDate dateOfBirth;
    private LocalDate joinDate;
    private Boolean active;
    private String address;

    protected MusicianAggregate() {

    }

    @CommandHandler
    public MusicianAggregate(CreateMusicianCommand command) {
         apply(new MusicianSignedUpEvent(command.getMusicianId(),
                command.getFirstName(),
                command.getLastName(),
                command.getPhone(),
                command.getDateOfBirth(),
                command.getJoinDate(),
                command.getActive(),
                command.getAddress()
        )).andThenApply(() -> new MusicianJoinDateCreatedEvent(command.getMusicianId(), 
            command.getJoinDate(),
            command.getFirstName() + " " + command.getLastName(),
            command.getActive()));
    }

    @EventSourcingHandler
    public void on(MusicianSignedUpEvent evt) {
        if (evt.getJoinDate() == null) {
            throw new MusicianJoinDateIsEmptyException();
        }
        this.musicianId = evt.getMusicianId();
        this.firstName = evt.getFirstName();
        this.lastName = evt.getLastName();
        this.phone = evt.getPhone();
        this.dateOfBirth = evt.getDateOfBirth();
        this.joinDate = evt.getJoinDate();
        this.active = evt.getActive();
        this.address = evt.getAddress();
    }

    @CommandHandler
    public void on(UpdateMusicianCommand command) {
        var currentJoinDate = this.joinDate;
        apply(new MusicianUpdatedEvent(command.getId(),
                command.getFirstName(),
                command.getLastName(),
                command.getPhone(),
                command.getDateOfBirth(),
                command.getJoinDate(),
                command.getActive(),
                command.getAddress()
        )).andThenApplyIf(
            () -> !currentJoinDate.equals(command.getJoinDate()), 
            () -> new MusicianJoinDateUpdatedEvent(command.getId(), currentJoinDate, command.getJoinDate()));
    }

    @EventSourcingHandler
    public void on(MusicianUpdatedEvent evt) {
        this.musicianId = evt.getMusicianId();
        this.firstName = evt.getFirstName();
        this.lastName = evt.getLastName();
        this.phone = evt.getPhone();
        this.dateOfBirth = evt.getDateOfBirth();
        this.joinDate = evt.getJoinDate();
        this.active = evt.getActive();
        this.address = evt.getAddress();
    }

MusicEventAggregate

@Aggregate
public class MusicEventAggregate {

    @AggregateIdentifier
    private UUID id;
    private String name;
    private String address;
    private LocalDateTime dateTime;

    public MusicEventAggregate() {

    }

    @CommandHandler
    public MusicEventAggregate(CreateMusicEventCommand command) {
        AggregateLifecycle.apply(new MusicEventCreatedEvent(
                command.getMusicEventId(),
                command.getName(),
                command.getAddress(),
                command.getDateTime()))
            .andThenApply(() -> new MusicEventDateCreated(command.getMusicEventId(), command.getDateTime()));
    }

    @EventSourcingHandler
    public void on(MusicEventCreatedEvent event) {
        this.id = event.getMusicEventId();
        this.name = event.getName();
        this.address = event.getAddress();
        this.dateTime = event.getDateTime();
    }

    @CommandHandler
    public void handle(UpdateMusicEventCommand command) {
        var currentDateTime = this.dateTime;
        AggregateLifecycle.apply(new MusicEventCreatedEvent(
                command.getId(),
                command.getName(),
                command.getAddress(),
                command.getDateTime()))
            .andThenApplyIf(
                () -> !currentDateTime.equals(command.getDateTime()), 
                () -> new MusicEventDateUpdated(command.getId(), currentDateTime, command.getDateTime()));
    }

    @EventSourcingHandler
    public void on(MusicEventUpdatedEvent event) {
        this.name = event.getName();
        this.address = event.getAddress();
        this.dateTime = event.getDateTime();
    }

And then I have PresenceProjector where I want catch events from mentioned aggregates and based on musician joinDate creation or change I want to create or remove entities of presence slot to show on UI and also I want to create instances of PresenceSlotAggregate to be able change state from absent to present and store.

@Component
@Slf4j
public class PresenceProjector {

    private final PresenceRepository presenceRepository;
    private final MusicianRepository musicianRepository;
    private final MusicEventRepository musicEventRepository;
    private final CommandGateway commandGateway;

    public PresenceProjector(PresenceRepository presenceRepository,
            MusicianRepository musicianRepository,
            MusicEventRepository musicEventRepository,
            CommandGateway commandGateway) {
        this.presenceRepository = presenceRepository;
        this.musicianRepository = musicianRepository;
        this.musicEventRepository = musicEventRepository;
        this.commandGateway = commandGateway;
    }

    @EventHandler
    public void on(MusicianJoinDateCreatedEvent event) {
        if (!event.isMusicianActive()) {
            return;
        }
        var musicianId = event.getMusicianId();
        var joinDate = event.getCurrentJoinDate();
        var musicEvents = musicEventRepository.findAllAfter(joinDate.atStartOfDay());

        for (MusicEvent musicEvent : musicEvents) {
            var presenceId = UUID.randomUUID();
            commandGateway.send(
                    new CreatePresenceSlotCommand(
                            presenceId,
                            musicEvent.getId(),
                            musicianId,
                            Boolean.FALSE));
        }
    }

    @EventHandler
    public void on(MusicianJoinDateUpdatedEvent event) {
        var musicianId = event.getMusicianId();
        var earlier = event.getCurrentJoinDate().isBefore(event.getNewJoinDate()) ? event.getCurrentJoinDate() : event.getNewJoinDate();
        var further = event.getCurrentJoinDate().isAfter(event.getNewJoinDate()) ? event.getCurrentJoinDate() : event.getNewJoinDate();
        if (event.getCurrentJoinDate().isBefore(event.getNewJoinDate())) {        
            var musicEvents = musicEventRepository.findAllBetween(earlier.atStartOfDay(), further.plusDays(1).atStartOfDay());
            for (MusicEvent musicEvent : musicEvents) {
            var presenceId = UUID.randomUUID();
            commandGateway.send(
                    new CreatePresenceSlotCommand(
                            presenceId,
                            musicEvent.getId(),
                            musicianId,
                            Boolean.FALSE));
            }
        } else {
            var presenceList = presenceRepository.findAllBetween(earlier, further);
            for(Presence p : presenceList) {
                commandGateway.send (new RemovePresenceCommand(p.getId()));
            }
        }
    }
        
    @EventHandler
    public void on(MusicEventDateCreated event) {
        var musicEventId = event.getMusicEventId();
        var eventDateTime = event.getDateTime();

        var musicians = musicianRepository.findAllActiveOnMusicEventTime(eventDateTime.toLocalDate());
        for (Musician musician : musicians) {
            var presenceSlotId = UUID.randomUUID();
            log.info("[email protected] - sending CreatePresenceEntryCommand");
            commandGateway.send(
                    new CreatePresenceSlotCommand(
                            presenceSlotId,
                            musicEventId,
                            musician.getId(),
                            Boolean.FALSE));
        }
    }

    @EventHandler
    public void on(MusicEventDateUpdated event) {
        var musicEventId = event.getMusicEventId();
        var earlier = event.getCurrentDateTime().isBefore(event.getNewDateTime()) ? event.getCurrentDateTime() : event.getNewDateTime();
        var further = event.getCurrentDateTime().isAfter(event.getNewDateTime()) ? event.getCurrentDateTime() : event.getNewDateTime();
        
         if (event.getCurrentDateTime().isBefore(event.getNewDateTime())) {     
            var musicians = musicianRepository.findAllActiveOnMusicEventTime(event.getNewDateTime().toLocalDate());
            for (Musician musician : musicians) {
                var presenceSlotId = UUID.randomUUID();
                log.info("[email protected] - sending CreatePresenceEntryCommand");
                commandGateway.send(
                        new CreatePresenceSlotCommand(
                                presenceSlotId,
                                musicEventId,
                                musician.getId(),
                                Boolean.FALSE));
            }
         } else {
            var presenceList = presenceRepository.findAllBetween(earlier.toLocalDate(), further.plusDays(1).toLocalDate());
            for(Presence p : presenceList) {
                commandGateway.send (new RemovePresenceCommand(p.getId()));
            }
         }

    }

    @EventHandler
    public void on(PresenceSlotCreatedEvent event) {
        log.info("[email protected]: " + event);
        var musician = musicianRepository.findById(event.getMusicianId());
        var musicianFullName = musician.get().getFullName();
        var p = new Presence(event.getPresenceId(), event.getMusicianId(), musicianFullName, event.getEventId());
        presenceRepository.save(p);
    }

    @EventHandler
    public void on(PresenceRemovedEvent event) {
        presenceRepository.deleteById(event.getId());
    }

    @EventHandler
    public void on(MusicianMusicEventPresenceChangedEvent event) {
        Presence presence = presenceRepository.findForMusicianIdAndEventId(event.getMusicianId(), event.getEventId());
        presence.setPresent(event.isPresent());
        presenceRepository.save(presence);
    }

    @QueryHandler
    public List<PresenceDto> handle(GetMusicianPresenceInMusicEventQuery query) {
        return presenceRepository.findAllForEventId(query.getMusicEventId());
    }
1

There are 1 best solutions below

1
Gerard Klijs On

You should never send commands from on EventHandler. The handler typically directly updates the projection, with the information from the event. So rather then looping over the entries, and send commands for each, it should change the data.

Typically a token store is configured to be used, so your not replaying the events echt time the app restarts, but you rather read from where you were left. A token store will also prevent multiple applications processing the same event, if you run multiple applications.

It does seem your API is very 'RESTy'. Which might be a necessity for your domain, but isn't a good practice.

For example, instead of having the MusicianJoinDateUpdatedEvent, I would expect something like a MusicianJoinedEvent. Events ideally should represent facts which happened, not that some data has been changed. Being used to REST, it's hard to unlearn building systems in that way.