I'm stuck on a problem, I can't set RxJS to update the UI in real time as messages are sent to websocket. The connection is working and messages are being sent to the socket.
private connectedUserMessages$: Subject<ChatMessage> = new Subject();
onConnected(nickname: string, fullname: string): any {
this.stompClient.subscribe(
`/user/${nickname}/queue/messages`,
(payload: any) => {
const newMessage = JSON.parse(payload.body);
this.connectedUserMessages$.next(newMessage);
}
);
Observables:
getUserMessages(
senderId: string,
recipientId: string
): Observable<ChatMessage[]> {
return this.httpClient.get<ChatMessage[]>(
`${this.messagesUrl}/${senderId}/${recipientId}`
);
}
getConnectedUserMessages(): Observable<ChatMessage> {
return this.connectedUserMessages$.asObservable();
}
In the component I concatenate the old messages from the database with the new ones sent with the "map" and manipulate the service in the constructor:
messages$: Observable<ChatMessage[]> = EMPTY;
constructor(private messageService: MessageService) {
this.messageService.getConnectedUserMessages().subscribe(message => {
console.log("New message from socket");
console.log({ message });
this.addMessageIfNotExists(message);
});
this.messages$.subscribe();
}
This is the rest of the logic for adding a message to the interface:
selectUser(selectedUser: User) {
this.selectedUserId = selectedUser.nickName;
this.selectedUser = selectedUser;
this.messages$ = concat(this.messageService.getUserMessages(this.connectedUser!.nickName, selectedUser.nickName),
this.messageService.getConnectedUserMessages()
).pipe(
scan((messages: ChatMessage[], newMessage) => {
console.log("scan");
console.log([messages]);
console.log({ newMessage });
if (Array.isArray(newMessage)) {
return [...messages, ...newMessage];
} else {
return [...messages, newMessage];
}
}, []),
map((messages) =>
messages.sort(
(a, b) =>
new Date(a.timestamp ?? new Date()).getTime() -
new Date(b.timestamp ?? new Date()).getTime()
)
)
);
}
addMessageIfNotExists(message: ChatMessage) {
this.messages$ = this.messages$.pipe(
map((messages) => {
const messageExists = messages.some((msg) => msg.id === message.id);
return messageExists ? messages : [...messages, message];
}),
map((messages) =>
messages.sort(
(a, b) =>
new Date(a.timestamp ?? new Date()).getTime() -
new Date(b.timestamp ?? new Date()).getTime()
)
)
);
}
sendMessage() {
if (this.connectedUser && this.selectedUserId && this.message) {
const uniqueMessageId = Date.now().toString();
this.messageService.sendMessage(
this.connectedUser.nickName,
this.selectedUserId,
this.message,
uniqueMessageId
);
const newMessage: ChatMessage = {
id: uniqueMessageId,
chat: undefined,
senderId: this.connectedUser.nickName,
recipientId: this.selectedUserId,
content: this.message,
timestamp: new Date(),
};
this.addMessageIfNotExists(newMessage);
this.message = '';
}
}
<div id="chat-messages" #chatMessages>
<ng-container *ngIf="(messages$ | async) as messages">
<div *ngFor="let receivedMessage of messages; let i = index" class="message">
<div *ngIf="isFirstMessageOfDay(receivedMessage, i, messages)" class="date-marker">
{{ receivedMessage.timestamp | date : "EEEE, d 'de' MMMM 'de' y" : "+0000" : "pt" }}
</div>
<div class="message-header" [ngClass]="
receivedMessage.senderId === connectedUser.nickName ? 'sender-header': 'receiver-header' ">
<span>{{ getUserFullName(receivedMessage.senderId) }} às </span>
<span>{{ receivedMessage.timestamp | date : "HH:mm" }}</span>
</div>
<p [ngClass]="
receivedMessage.senderId === connectedUser.nickName ? 'sender' : 'receiver'">
{{ receivedMessage.content }}
</p>
</div>
</ng-container>
</div>
Where is my error in updating the UI in real time as messages arrive in the Observable
There are a couple of issues here.
Replacing the observable used in the template (
this.messages$ = ...) during the lifetime of the component will break the binding. Try to find a way to setmessages$once and let it change over time in response other events.You should not need to call
.subscribe()on an observable in your component code if you are also using it in a template binding (... | async). The async pipe already subscribes to the observable for you, and will unsubscribe when your component is destroyed. Adding a.subscribe()will cause a memory leak if you don't manually unsubscribe on destroy.