Difficulty defining RxJS and Stomp operators

41 Views Asked by At

I'm stuck on a problem, I can't set RxJS to update the UI in real time as messages are sent to websocket. The connection is working and messages are being sent to the socket.

private connectedUserMessages$: Subject<ChatMessage> = new Subject();   

onConnected(nickname: string, fullname: string): any {
this.stompClient.subscribe(
  `/user/${nickname}/queue/messages`,
  (payload: any) => {
    const newMessage = JSON.parse(payload.body);
    this.connectedUserMessages$.next(newMessage);
  }
);

Observables:

  getUserMessages(
    senderId: string,
    recipientId: string
  ): Observable<ChatMessage[]> {
    return this.httpClient.get<ChatMessage[]>(
      `${this.messagesUrl}/${senderId}/${recipientId}`
    );
  } 

     getConnectedUserMessages(): Observable<ChatMessage> {
        return this.connectedUserMessages$.asObservable();
      }

In the component I concatenate the old messages from the database with the new ones sent with the "map" and manipulate the service in the constructor:

messages$: Observable<ChatMessage[]> = EMPTY;
constructor(private messageService: MessageService) {
    this.messageService.getConnectedUserMessages().subscribe(message => {
      console.log("New message from socket");
      console.log({ message });
      this.addMessageIfNotExists(message);
    });
    this.messages$.subscribe();    
  }

This is the rest of the logic for adding a message to the interface:

 selectUser(selectedUser: User) {
    this.selectedUserId = selectedUser.nickName;
    this.selectedUser = selectedUser;
    this.messages$ = concat(this.messageService.getUserMessages(this.connectedUser!.nickName, selectedUser.nickName),
      this.messageService.getConnectedUserMessages()
    ).pipe(
      scan((messages: ChatMessage[], newMessage) => {
        console.log("scan");
        console.log([messages]);
        console.log({ newMessage });
        if (Array.isArray(newMessage)) {
          return [...messages, ...newMessage];
        } else {
          return [...messages, newMessage];
        }
      }, []),
      map((messages) =>
        messages.sort(
          (a, b) =>
            new Date(a.timestamp ?? new Date()).getTime() -
            new Date(b.timestamp ?? new Date()).getTime()
        )
      )
    );
  }

  addMessageIfNotExists(message: ChatMessage) {
    this.messages$ = this.messages$.pipe(
      map((messages) => {
        const messageExists = messages.some((msg) => msg.id === message.id);
        return messageExists ? messages : [...messages, message];
      }),
      map((messages) =>
        messages.sort(
          (a, b) =>
            new Date(a.timestamp ?? new Date()).getTime() -
            new Date(b.timestamp ?? new Date()).getTime()
        )
      )
    );
  }

  sendMessage() {
    if (this.connectedUser && this.selectedUserId && this.message) {
      const uniqueMessageId = Date.now().toString();
      this.messageService.sendMessage(
        this.connectedUser.nickName,
        this.selectedUserId,
        this.message,
        uniqueMessageId
      );
      const newMessage: ChatMessage = {
        id: uniqueMessageId,
        chat: undefined,
        senderId: this.connectedUser.nickName,
        recipientId: this.selectedUserId,
        content: this.message,
        timestamp: new Date(),
      };
      this.addMessageIfNotExists(newMessage);
      this.message = '';
    }
  }
<div id="chat-messages" #chatMessages>
        <ng-container *ngIf="(messages$ | async) as messages">
          <div *ngFor="let receivedMessage of messages; let i = index" class="message">
            <div *ngIf="isFirstMessageOfDay(receivedMessage, i, messages)" class="date-marker">
              {{ receivedMessage.timestamp | date : "EEEE, d 'de' MMMM 'de' y" : "+0000" : "pt" }}
            </div>
            <div class="message-header" [ngClass]="
                receivedMessage.senderId === connectedUser.nickName ? 'sender-header': 'receiver-header' ">
              <span>{{ getUserFullName(receivedMessage.senderId) }} às&nbsp;</span>
              <span>{{ receivedMessage.timestamp | date : "HH:mm" }}</span>
            </div>
            <p [ngClass]="
                receivedMessage.senderId === connectedUser.nickName ? 'sender' : 'receiver'">
              {{ receivedMessage.content }}
            </p>
          </div>
        </ng-container>
      </div>

Where is my error in updating the UI in real time as messages arrive in the Observable

1

There are 1 best solutions below

0
Geo242 On

There are a couple of issues here.

  1. Replacing the observable used in the template (this.messages$ = ...) during the lifetime of the component will break the binding. Try to find a way to set messages$ once and let it change over time in response other events.

  2. You should not need to call .subscribe() on an observable in your component code if you are also using it in a template binding (... | async). The async pipe already subscribes to the observable for you, and will unsubscribe when your component is destroyed. Adding a .subscribe() will cause a memory leak if you don't manually unsubscribe on destroy.