ngFor not displaying data from Observable Event Source

1k Views Asked by At

In my Angular6 application I have a problem with displaying data with async ngfor. I am expecting some Server Sent Events from backend (just object with answer string field). While console.log shows that answer list from service contains answers, ngfor displays nothing.

Here is my component:

import { Component } from '@angular/core';
import { Answer } from './answer';
import { AnswerReactiveService } from './answer-reactive.service';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-answers',
  templateUrl: './answers.component.html',
  providers: [AnswerReactiveService],
  styleUrls: ['./answers.component.css']
})
export class AnswersComponent {
  answers: Observable<Answer[]>;

  constructor(private answerReactiveService: AnswerReactiveService) {
  }

  requestAnswerStream(): void {
    this.answers = this.answerReactiveService.getAnswerStream();
  }

}

Here is the service:

import { Injectable } from '@angular/core';

import { Answer } from './answer';

import { Observable } from 'rxjs';

@Injectable()
export class AnswerReactiveService {

  private answers: Answer[] = [];
  private url: string = 'http://localhost:8080/events/1';

  getAnswerStream(): Observable<Array<Answer>> {
    return Observable.create((observer) => {
      let url = this.url;
      let eventSource = new EventSource(url);
      eventSource.onmessage = (event) => {
        console.log('Received event: ', event);
        const json = JSON.parse(event.data);
        console.log(json);
        this.answers.push(new Answer(json['answer']));
        console.log(this.answers);
        observer.next(this.answers);
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      };
    });
  }
}

And HTML:

<div class="row">
  <div class="col-md-8">
    <p>
      <button (click)="requestAnswerStream()">Gimmie answers</button>
    </p>
    <ul>
      <li *ngFor="let answer of answers | async">{{answer.answer}}</li>
    </ul>
  </div>
</div>
4

There are 4 best solutions below

0
On

Once you got the service result complete the sequence by using observer.complete after emit the value using observer.next()

eventSource.onmessage = (event) => {
        console.log('Received event: ', event);
        const json = JSON.parse(event.data);
        console.log(json);
        this.answers.push(new Answer(json['answer']));
        console.log(this.answers);
        observer.next(this.answers);
        observer.complete();
      };
   };

Inside component

export class AnswersComponent implements OnInit {
  answers: Observable<Answer[]>;

  constructor(private answerReactiveService: AnswerReactiveService) {
  }

  ngOnInit(): void {
        this.answerReactiveService.getAnswerStream().subscribe((response)=>{
        this.answers=response;
    });
  }
}

And then in Html you can simply iterate the answers

 <li *ngFor="let answer of answers">{{answer.answer}}</li>
3
On

This is a synchronicity issue. Observable.create runs synchronously and creates an observable. The callback from your EventSource does get called but its too late to effect that created observable.

The solution is to use a Subject which is a member of the service class and will persist until the EventSource callback can effect it. You will have to 'hook-up' the service subject to your component observable in the component's constructor. Then getAnswerStream() becomes triggerAnswerStream() which sets the Eventsource posting events to the subject.

Something like this:

@Component({
  selector: 'app-answers',
  templateUrl: './answers.component.html',
  providers: [AnswerReactiveService],
  styleUrls: ['./answers.component.css']
})
export class AnswersComponent {
  answers: Observable<Answer[]>;

  constructor(private answerReactiveService: AnswerReactiveService) {
    this.answers = answerReactiveService.answerStreamSubject.asObservable();
  }

  requestAnswerStream(): void {
    this.answerReactiveService.getAnswerStream();
  }

}

and

@Injectable()
export class AnswerReactiveService {

  private answers: Answer[] = [];
  private url: string = 'http://localhost:8080/events/1';

  public answerStreamSubject: Subject<Array<Answer>>;

  triggerAnswerStream(): void {
      let url = this.url;
      let eventSource = new EventSource(url);
      eventSource.onmessage = (event) => {
        console.log('Received event: ', event);
        const json = JSON.parse(event.data);
        console.log(json);
        this.answers.push(new Answer(json['answer']));
        console.log(this.answers);
        this.answerStreamSubject.next(this.answers);
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          this.answerStreamSubject.complete();
        } else {
          this.answerStreamSubject.error('EventSource error: ' + error);
        }
      };
  }
}
1
On

After some research I achieved the result by using ngZone. I found similar problem here: https://blog.octo.com/en/angular-2-sse-and-changes-detection/

and now my service code looks like that and it works correctly:

      eventSource.onmessage = (event) => {
        this.zone.run(() => {
          console.log('Received event: ', event);
          const json = JSON.parse(event.data);
          this.answers.push(new Answer(json['answer']));
          observer.next(this.answers);
        });
      }; 

I am just wondering if this solution can be perceived as fully reactive? I didn't find any other way to make this work.

1
On

Try using - addEventListener

An example of which works for me:

@Injectable()
export class EmployeeService {

  public employeesBehavior: BehaviorSubject<Employee>;
  public employeesObservable: Observable<Employee>;

  private URL = 'http://localhost:8080';

  constructor() {
    this.employeesBehavior = new BehaviorSubject(null);
    this.employeesObservable = this.employeesBehavior.asObservable();
  }

  public stream() {
    const streamURL = this.URL + '/stream';
    const eventSource = new EventSource(streamURL);

    eventSource.addEventListener('employees', (event: any) => {
      const employee = JSON.parse(event.data) as Employee;
      this.employeesBehavior.next(employee);
    });
  }
}

Сomponent

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {

  public employees: Employee[] = [];

  constructor(
    public employeeService: EmployeeService
  ) {

  }

  ngOnInit(): void {
    this.employeeService.stream();

    this.employeeService.employeesObservable.subscribe(e => {
      if (e != null) {
        this.employees.push(e);
      }
    });
  }
}

Template

<div *ngFor="let employee of employees">
     {{employee.name}}
</div>

Server (Spring Boot)

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Employee>> stream() {
        return employeeService.stream().delayElements(Duration.ofSeconds(1)).map(e ->
            ServerSentEvent.<Employee>builder()
                .id(String.valueOf(e.getId()))
                .event("employees")
                .data(e)
                .build()
        );
    }