Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions goldens/public-api/common/common.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ export declare const APP_BASE_HREF: InjectionToken<string>;
export declare class AsyncPipe implements OnDestroy, PipeTransform {
constructor(_ref: ChangeDetectorRef);
ngOnDestroy(): void;
transform<T>(obj: Observable<T> | Promise<T>): T | null;
transform<T>(obj: Subscribable<T> | Promise<T>): T | null;
transform<T>(obj: null | undefined): null;
transform<T>(obj: Observable<T> | Promise<T> | null | undefined): T | null;
transform<T>(obj: Subscribable<T> | Promise<T> | null | undefined): T | null;
}

export declare class CommonModule {
Expand Down
39 changes: 20 additions & 19 deletions packages/common/src/pipes/async_pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
* found in the LICENSE file at https://angular.io/license
*/

import {ChangeDetectorRef, EventEmitter, OnDestroy, Pipe, PipeTransform, ɵisObservable, ɵisPromise} from '@angular/core';
import {Observable, SubscriptionLike} from 'rxjs';
import {ChangeDetectorRef, EventEmitter, OnDestroy, Pipe, PipeTransform, ɵisPromise, ɵisSubscribable} from '@angular/core';
import {Subscribable, Unsubscribable} from 'rxjs';

import {invalidPipeArgumentError} from './invalid_pipe_argument_error';

interface SubscriptionStrategy {
createSubscription(async: Observable<any>|Promise<any>, updateLatestValue: any): SubscriptionLike
createSubscription(async: Subscribable<any>|Promise<any>, updateLatestValue: any): Unsubscribable
|Promise<any>;
dispose(subscription: SubscriptionLike|Promise<any>): void;
onDestroy(subscription: SubscriptionLike|Promise<any>): void;
dispose(subscription: Unsubscribable|Promise<any>): void;
onDestroy(subscription: Unsubscribable|Promise<any>): void;
}

class ObservableStrategy implements SubscriptionStrategy {
createSubscription(async: Observable<any>, updateLatestValue: any): SubscriptionLike {
class SubscribableStrategy implements SubscriptionStrategy {
createSubscription(async: Subscribable<any>, updateLatestValue: any): Unsubscribable {
return async.subscribe({
next: updateLatestValue,
error: (e: any) => {
Expand All @@ -27,11 +28,11 @@ class ObservableStrategy implements SubscriptionStrategy {
});
}

dispose(subscription: SubscriptionLike): void {
dispose(subscription: Unsubscribable): void {
subscription.unsubscribe();
}

onDestroy(subscription: SubscriptionLike): void {
onDestroy(subscription: Unsubscribable): void {
subscription.unsubscribe();
}
}
Expand All @@ -49,7 +50,7 @@ class PromiseStrategy implements SubscriptionStrategy {
}

const _promiseStrategy = new PromiseStrategy();
const _observableStrategy = new ObservableStrategy();
const _subscribableStrategy = new SubscribableStrategy();

/**
* @ngModule CommonModule
Expand Down Expand Up @@ -82,8 +83,8 @@ const _observableStrategy = new ObservableStrategy();
export class AsyncPipe implements OnDestroy, PipeTransform {
private _latestValue: any = null;

private _subscription: SubscriptionLike|Promise<any>|null = null;
private _obj: Observable<any>|Promise<any>|EventEmitter<any>|null = null;
private _subscription: Unsubscribable|Promise<any>|null = null;
private _obj: Subscribable<any>|Promise<any>|EventEmitter<any>|null = null;
private _strategy: SubscriptionStrategy = null!;

constructor(private _ref: ChangeDetectorRef) {}
Expand All @@ -94,10 +95,10 @@ export class AsyncPipe implements OnDestroy, PipeTransform {
}
}

transform<T>(obj: Observable<T>|Promise<T>): T|null;
transform<T>(obj: Subscribable<T>|Promise<T>): T|null;
transform<T>(obj: null|undefined): null;
transform<T>(obj: Observable<T>|Promise<T>|null|undefined): T|null;
transform<T>(obj: Observable<T>|Promise<T>|null|undefined): T|null {
transform<T>(obj: Subscribable<T>|Promise<T>|null|undefined): T|null;
transform<T>(obj: Subscribable<T>|Promise<T>|null|undefined): T|null {
if (!this._obj) {
if (obj) {
this._subscribe(obj);
Expand All @@ -113,20 +114,20 @@ export class AsyncPipe implements OnDestroy, PipeTransform {
return this._latestValue;
}

private _subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
private _subscribe(obj: Subscribable<any>|Promise<any>|EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription = this._strategy.createSubscription(
obj, (value: Object) => this._updateLatestValue(obj, value));
}

private _selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
private _selectStrategy(obj: Subscribable<any>|Promise<any>|EventEmitter<any>): any {
if (ɵisPromise(obj)) {
return _promiseStrategy;
}

if (ɵisObservable(obj)) {
return _observableStrategy;
if (ɵisSubscribable(obj)) {
return _subscribableStrategy;
}

throw invalidPipeArgumentError(AsyncPipe, obj);
Expand Down
1 change: 1 addition & 0 deletions packages/common/test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ts_library(
"//packages/platform-browser-dynamic",
"//packages/platform-browser/testing",
"//packages/private/testing",
"@npm//rxjs",
],
)

Expand Down
48 changes: 32 additions & 16 deletions packages/common/test/pipes/async_pipe_spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,70 +10,87 @@ import {AsyncPipe, ɵgetDOM as getDOM} from '@angular/common';
import {EventEmitter} from '@angular/core';
import {AsyncTestCompleter, beforeEach, describe, expect, inject, it} from '@angular/core/testing/src/testing_internal';
import {browserDetection} from '@angular/platform-browser/testing/src/browser_util';
import {Subscribable, Unsubscribable} from 'rxjs';

import {SpyChangeDetectorRef} from '../spies';

{
describe('AsyncPipe', () => {
describe('Observable', () => {
// only expose methods from the Subscribable interface, to ensure that
// the implementation does not rely on other methods:
const wrapSubscribable = <T>(input: Subscribable<T>): Subscribable<T> => ({
subscribe(...args: any): Unsubscribable {
const subscription = input.subscribe(...args);
return {
unsubscribe() {
subscription.unsubscribe();
}
};
}
});

let emitter: EventEmitter<any>;
let subscribable: Subscribable<any>;
let pipe: AsyncPipe;
let ref: any;
const message = {};

beforeEach(() => {
emitter = new EventEmitter();
subscribable = wrapSubscribable(emitter);
ref = new SpyChangeDetectorRef();
pipe = new AsyncPipe(ref);
});

describe('transform', () => {
it('should return null when subscribing to an observable', () => {
expect(pipe.transform(emitter)).toBe(null);
expect(pipe.transform(subscribable)).toBe(null);
});

it('should return the latest available value',
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
pipe.transform(emitter);
pipe.transform(subscribable);
emitter.emit(message);

setTimeout(() => {
expect(pipe.transform(emitter)).toEqual(message);
expect(pipe.transform(subscribable)).toEqual(message);
async.done();
}, 0);
}));


it('should return same value when nothing has changed since the last call',
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
pipe.transform(emitter);
pipe.transform(subscribable);
emitter.emit(message);

setTimeout(() => {
pipe.transform(emitter);
expect(pipe.transform(emitter)).toBe(message);
pipe.transform(subscribable);
expect(pipe.transform(subscribable)).toBe(message);
async.done();
}, 0);
}));

it('should dispose of the existing subscription when subscribing to a new observable',
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
pipe.transform(emitter);
pipe.transform(subscribable);

const newEmitter = new EventEmitter();
expect(pipe.transform(newEmitter)).toBe(null);
const newSubscribable = wrapSubscribable(newEmitter);
expect(pipe.transform(newSubscribable)).toBe(null);
emitter.emit(message);

// this should not affect the pipe
setTimeout(() => {
expect(pipe.transform(newEmitter)).toBe(null);
expect(pipe.transform(newSubscribable)).toBe(null);
async.done();
}, 0);
}));

it('should request a change detection check upon receiving a new value',
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
pipe.transform(emitter);
pipe.transform(subscribable);
emitter.emit(message);

setTimeout(() => {
Expand All @@ -83,12 +100,11 @@ import {SpyChangeDetectorRef} from '../spies';
}));

it('should return value for unchanged NaN', () => {
const emitter = new EventEmitter<any>();
emitter.emit(null);
pipe.transform(emitter);
pipe.transform(subscribable);
emitter.next(NaN);
const firstResult = pipe.transform(emitter);
const secondResult = pipe.transform(emitter);
const firstResult = pipe.transform(subscribable);
const secondResult = pipe.transform(subscribable);
expect(firstResult).toBeNaN();
expect(secondResult).toBeNaN();
});
Expand All @@ -101,12 +117,12 @@ import {SpyChangeDetectorRef} from '../spies';

it('should dispose of the existing subscription',
inject([AsyncTestCompleter], (async: AsyncTestCompleter) => {
pipe.transform(emitter);
pipe.transform(subscribable);
pipe.ngOnDestroy();
emitter.emit(message);

setTimeout(() => {
expect(pipe.transform(emitter)).toBe(null);
expect(pipe.transform(subscribable)).toBe(null);
async.done();
}, 0);
}));
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/core_private_export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export {_sanitizeHtml as ɵ_sanitizeHtml} from './sanitization/html_sanitizer';
export {_sanitizeUrl as ɵ_sanitizeUrl} from './sanitization/url_sanitizer';
export {makeDecorator as ɵmakeDecorator} from './util/decorators';
export {global as ɵglobal} from './util/global';
export {isObservable as ɵisObservable, isPromise as ɵisPromise} from './util/lang';
export {isObservable as ɵisObservable, isPromise as ɵisPromise, isSubscribable as ɵisSubscribable} from './util/lang';
export {stringify as ɵstringify} from './util/stringify';
export {clearOverrides as ɵclearOverrides, initServicesIfNeeded as ɵinitServicesIfNeeded, overrideComponentView as ɵoverrideComponentView, overrideProvider as ɵoverrideProvider} from './view/index';
export {NOT_FOUND_CHECK_ONLY_ELEMENT_INJECTOR as ɵNOT_FOUND_CHECK_ONLY_ELEMENT_INJECTOR} from './view/provider';
14 changes: 10 additions & 4 deletions packages/core/src/util/lang.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* found in the LICENSE file at https://angular.io/license
*/

import {Observable} from 'rxjs';
import {Observable, Subscribable} from 'rxjs';

/**
* Determine if the argument is shaped like a Promise
Expand All @@ -17,6 +17,13 @@ export function isPromise<T = any>(obj: any): obj is Promise<T> {
return !!obj && typeof obj.then === 'function';
}

/**
* Determine if the argument is a Subscribable
*/
export function isSubscribable(obj: any|Subscribable<any>): obj is Subscribable<any> {
return !!obj && typeof obj.subscribe === 'function';
}

/**
* Determine if the argument is an Observable
*
Expand All @@ -26,6 +33,5 @@ export function isPromise<T = any>(obj: any): obj is Promise<T> {
* `subscribe()` method, and RxJS has mechanisms to wrap `Subscribable` objects
* into `Observable` as needed.
*/
export function isObservable(obj: any|Observable<any>): obj is Observable<any> {
return !!obj && typeof obj.subscribe === 'function';
}
export const isObservable =
isSubscribable as ((obj: any|Observable<any>) => obj is Observable<any>);