Stream multiple responses in the Resource API


Feb 5, 2025 - 02:05

When the Resource API first came out, the loader of the resource function returned a promise, and the rxResource function only used the first value emitted by its Observable. The limitation was particularly obvious with rxResource, where

rxResource({
  request: this.num,
  loader: ({ request: n }) => timer(0, 1000).pipe(take(n))
})

emits one integer and completes instead of emitting n integers.

Fortunately, in Angular 19.2.0 the Resource API will support streaming, so both functions can return multiple responses.

The options of the rxResource function remain the same, but the resource function has a new stream option to support streaming.

In this blog post, I will demonstrate streaming table rows using the rxResource and resource functions.

Custom makeRows RxJS operator

function makeRows(numRows: number, numElementsPerRow: number) {
 return function (source: Observable<number>) {
   return source.pipe(
     tap((i) => console.log('timer called', i)),
     map((num) => Array(numElementsPerRow).fill(0).map((_, index) => (num * numElementsPerRow + index) + 1)),
     take(numRows),
   )
 }
}

The makeRows custom operator maps each source value to a row of numElementsPerRow consecutive numbers and completes the stream after numRows rows have been emitted.
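To make the row arithmetic concrete, here is a minimal sketch of the same mapping without RxJS. The names buildRow and buildRows are hypothetical and do not appear in the demo; the sketch assumes the source emits 0, 1, 2, ... the way timer does.

```typescript
// Pure stand-in for the map step inside makeRows (buildRow is a hypothetical name).
function buildRow(num: number, numElementsPerRow: number): number[] {
  return Array.from(
    { length: numElementsPerRow },
    (_, index) => num * numElementsPerRow + index + 1,
  );
}

// Stand-in for a source emitting 0, 1, 2, ... combined with take(numRows).
function buildRows(numRows: number, numElementsPerRow: number): number[][] {
  return Array.from({ length: numRows }, (_, num) => buildRow(num, numElementsPerRow));
}

console.log(buildRows(3, 4));
// [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
```

With makeRows(10, 10), the same formula yields ten rows covering the numbers 1 to 100.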

Stream data with the rxResource function

@let rxResourceTitle = 'Aggregate table rows with rxResource stream';
<ng-container [ngTemplateOutlet]="table" [ngTemplateOutletContext]="{ $implicit: tableDataRxResource, title: rxResourceTitle }"/>

<ng-template #table let-tableResource let-title="title">
  {{ title }}
  @for (row of tableResource.value(); track $index) {
    @for (data of row; track data; let last=$last) {
      @let delimiter = last ? '' : ', ';
      {{ `${data}${delimiter}` }}
    }
  }
</ng-template>

In the HTML template, I create an ng-template to display the table rows and a static title. In the ng-container element, I assign the template variable, table, to the ngTemplateOutlet input. I also pass the static title and the resource itself to the ngTemplateOutletContext input; the template reads the rows by calling the resource's value signal.

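import { NgTemplateOutlet } from '@angular/common';
import { ChangeDetectionStrategy, Component } from '@angular/core';
import { rxResource } from '@angular/core/rxjs-interop';
import { Subject, scan, timer } from 'rxjs';

@Component({
  selector: 'app-root',
  imports: [NgTemplateOutlet],
  templateUrl: 'main.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class App {
  // Emits one row (an array of numbers) at a time.
  rowSub = new Subject<number[]>();

  // Accumulates the emitted rows into a nested array.
  table$ = this.rowSub.pipe(
    scan((acc, values) => ([...acc, values]), [] as number[][]),
  );

  tableDataRxResource = rxResource({
    loader: () => this.table$,
  });

  constructor() {
    // makeRows is the custom operator defined earlier in this post.
    timer(0, 1000)
      .pipe(makeRows(10, 10))
      .subscribe({
        next: (array) => this.rowSub.next(array)
      });
  }
}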

The rowSub is a Subject that emits an array of numbers. Each time rowSub emits a new array, the table$ Observable uses the scan RxJS operator to append it to the accumulated nested array and emits the result.
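The accumulation can be sketched in plain TypeScript to show what table$ emits after each row. The helper scanSnapshots is a hypothetical name used only for this illustration; it replays the accumulator function over a fixed list of rows instead of a live Subject.

```typescript
// Same accumulator function as the one passed to scan in table$.
const appendRow = (acc: number[][], values: number[]): number[][] => [...acc, values];

// Hypothetical helper: records the accumulated table after each incoming row,
// which mirrors the sequence of values that scan emits downstream.
function scanSnapshots(rows: number[][]): number[][][] {
  const snapshots: number[][][] = [];
  let acc: number[][] = [];
  for (const row of rows) {
    acc = appendRow(acc, row);
    snapshots.push(acc);
  }
  return snapshots;
}

const snapshots = scanSnapshots([[1, 2], [3, 4], [5, 6]]);
console.log(snapshots.map((table) => table.length)); // [1, 2, 3]
console.log(snapshots[2]); // [[1, 2], [3, 4], [5, 6]]
```

Because the accumulator spreads the previous array into a new one, every emission is a fresh array and earlier snapshots are left unchanged.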

The tableDataRxResource resource is created by the rxResource function. Its loader returns the table$ Observable, so the resource value updates every time a new table row arrives.

In the constructor, the RxJS timer generates a new row every second and stops after the tenth row. Each row that the Observable emits is pushed to the rowSub Subject.

Stream data with the resource function

<ng-container [ngTemplateOutlet]="table" [ngTemplateOutletContext]="{ $implicit: tableDataResource, title: 'Aggregate table rows with resource stream' }"/>

<ng-template #table let-tableResource let-title="title">
  {{ title }}
  @for (row of tableResource.value(); track $index) {
    @for (data of row; track data; let last=$last) {
      @let delimiter = last ? '' : ', ';
      {{ `${data}${delimiter}` }}
    }
  }
</ng-template>

The ng-container uses the same template with a different resource and static title.

table = signal<{ value: number[][] }>({ value: [] });

The table signal holds an object with a value property. The property stores a nested number array.

tableDataResource = resource({
   stream: async () => this.table,
});

The tableDataResource resource uses the resource function to stream the table rows. The stream option is new and expects an async function that returns a signal. In this example, the signal wraps an object whose value property holds the rows received so far.
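The shape of the object inside the signal can be sketched with a small union type. This is an assumption based on my reading of the 19.2 API, where a streamed item carries either a value or an error; StreamItem and rowCount are local, hypothetical names and are not imported from Angular.

```typescript
// Local stand-in for the shape of a streamed item; not imported from Angular.
// Assumption: an item holds either a value or an error.
type StreamItem<T> = { value: T } | { error: unknown };

// Hypothetical helper: counts the rows, treating an error item as an empty table.
function rowCount(item: StreamItem<number[][]>): number {
  // The `in` check narrows the union to the variant that carries a value.
  return 'value' in item ? item.value.length : 0;
}

console.log(rowCount({ value: [[1, 2], [3, 4]] })); // 2
console.log(rowCount({ error: new Error('failed') })); // 0
```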

@Component({
   ...
})
export class App {
 table = signal<{ value: number[][] }>({ value: [] });

 tableDataResource = resource({
   stream: async () => this.table,
 });

 constructor() {
   timer(0, 1500)
     .pipe(makeRows(10, 10))
     .subscribe({
       next: (array) => 
         this.table.update(({ value }) => ({ value: [...value, array] }))
     }); 
 }
}

The subscriber to the RxJS timer appends each new number array to the table signal.
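The update callback builds a new object on every row instead of mutating the existing one. A minimal sketch of that updater outside Angular follows; TableState and appendRowToState are hypothetical names introduced for illustration.

```typescript
type TableState = { value: number[][] };

// Same shape as the callback passed to table.update in the component.
function appendRowToState(state: TableState, row: number[]): TableState {
  return { value: [...state.value, row] };
}

const initial: TableState = { value: [] };
const afterOne = appendRowToState(initial, [1, 2, 3]);
const afterTwo = appendRowToState(afterOne, [4, 5, 6]);

console.log(initial.value.length, afterOne.value.length, afterTwo.value.length); // 0 1 2
console.log(afterOne !== afterTwo); // true
```

Returning a new object matters because a signal compares values by reference by default; pushing into the existing array would leave the reference unchanged, and consumers would not be notified.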

The resource function achieves the same result as the rxResource function; the demo displays a new row on every timer tick, which is every 1.5 seconds with the timer(0, 1500) used here.

References: