通过使用 Http,我们调用了一个执行网络调用并返回 http observable 的方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
如果我们采用这个 observable 并向它添加多个订阅者:
let network$ = getCustomer();
let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们要做的是确保这不会导致多个网络请求。
这可能看起来像一个不寻常的场景,但实际上很常见:例如,如果调用者订阅 observable 以显示错误消息,并使用异步管道将其传递给模板,我们已经有两个订阅者。
在 RxJs 5 中这样做的正确方法是什么?
也就是说,这似乎工作正常:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}
但这是在 RxJs 5 中这样做的惯用方式,还是我们应该做其他事情?
注意:根据 Angular 5 新的 HttpClient
,所有示例中的 .map(res => res.json())
部分现在都没有用了,因为现在默认采用 JSON 结果。
编辑:截至 2021 年,正确的方法是使用 RxJs 原生提出的 shareReplay
运算符。在下面的答案中查看更多详细信息。
缓存数据,如果缓存可用,则返回此数据,否则发出 HTTP 请求。
import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';
@Injectable()
export class DataService {
private url: string = 'https://cors-test.appspot.com/test';
private data: Data;
private observable: Observable<any>;
constructor(private http: Http) {}
getData() {
if(this.data) {
// if `data` is available just return it as `Observable`
return Observable.of(this.data);
} else if(this.observable) {
// if `this.observable` is set then the request is in progress
// return the `Observable` for the ongoing request
return this.observable;
} else {
// example header (not necessary)
let headers = new Headers();
headers.append('Content-Type', 'application/json');
// create the request, store the `Observable` for subsequent subscribers
this.observable = this.http.get(this.url, {
headers: headers
})
.map(response => {
// when the cached data is available we don't need the `Observable` reference anymore
this.observable = null;
if(response.status == 400) {
return "FAILURE";
} else if(response.status == 200) {
this.data = new Data(response.json());
return this.data;
}
// make it shared so more than one subscriber can get the result
})
.share();
return this.observable;
}
}
}
这篇文章 https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html 很好地解释了如何使用 shareReplay
进行缓存。
根据@Cristian 的建议,这是一种适用于 HTTP 可观察对象的方法,它只发出一次然后完成:
getCustomer() {
return this.http.get('/someUrl')
.map(res => res.json()).publishLast().refCount();
}
share
运算符可能是一个合理的选择(尽管有一些令人讨厌的边缘情况)。有关选项的深入讨论,请参阅此博客文章中的评论部分:blog.jhades.org/…
publishLast().refCount()
共享的源 observable 无法取消,但一旦取消了对 refCount
返回的 observable 的所有订阅,净效果是源 observable 将被取消订阅,如果它取消它在哪里“机上”
更新:Ben Lesh 说,在 5.2.0 之后的下一个小版本中,您将能够调用 shareReplay() 来真正缓存。
之前.....
首先,不要使用 share() 或 publishReplay(1).refCount(),它们是相同的,问题在于它仅在可观察对象处于活动状态时建立连接时共享,如果您在完成后连接,它再次创建一个新的可观察对象,翻译,而不是真正的缓存。
Birowski 在上面给出了正确的解决方案,即使用 ReplaySubject。在我们的案例 1 中,ReplaySubject 将缓存你给它的值(bufferSize)。一旦 refCount 达到零并且你建立一个新的连接,它就不会像 share() 那样创建一个新的 observable,这是缓存的正确行为。
这是一个可重用的功能
export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}
这是如何使用它
import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';
@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }
refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}
下面是可缓存功能的更高级版本 这允许拥有自己的查找表 + 提供自定义查找表的能力。这样,您不必像上面的示例那样检查 this._cache。还要注意,不是传递 observable 作为第一个参数,而是传递一个返回 observables 的函数,这是因为 Angular 的 Http 立即执行,所以通过返回一个惰性执行的函数,我们可以决定不调用它,如果它已经在我们的缓存。
let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}
用法:
getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")
const data$ = this._http.get('url').pipe(cacheable()); /*1st subscribe*/ data$.subscribe(); /*2nd subscribe*/ data$.subscribe();
?所以它的行为更像任何其他运算符..
rxjs 5.4.0 有一个新的 shareReplay 方法。
rx-book shareReplay()
在 reactivex.io/rxjs 没有文档
作者明确表示“非常适合处理缓存 AJAX 结果之类的事情”
rxjs PR #2443 feat(shareReplay): adds shareReplay
variant of publishReplay
shareReplay 返回一个 observable,它是通过 ReplaySubject 多播的源。该重播主题会在源错误时回收,但不会在源完成时回收。这使得 shareReplay 非常适合处理诸如缓存 AJAX 结果之类的事情,因为它是可重试的。然而,它的重复行为与 share 不同,它不会重复源 observable,而是重复源 observable 的值。
根据这个article
事实证明,我们可以通过添加 publishReplay(1) 和 refCount 轻松地将缓存添加到 observable。
所以里面 if 语句只是追加
.publishReplay(1)
.refCount();
.map(...)
rxjs version 5.4.0 (2017-05-09) 添加了对 shareReplay 的支持。
为什么要使用 shareReplay?当您不希望在多个订阅者之间执行的副作用或繁重的计算时,您通常希望使用 shareReplay。在您知道您将有迟到的订阅者需要访问先前发出的值的流的情况下,它也可能很有价值。这种在订阅时重播价值的能力是 share 和 shareReplay 的区别。
您可以轻松地修改一个角度服务来使用它并返回一个带有缓存结果的可观察对象,该结果只会进行一次 http 调用(假设第一次调用成功)。
示例 Angular 服务
这是一个使用 shareReplay
的非常简单的客户服务。
客户服务.ts
import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';
@Injectable({providedIn: 'root'})
export class CustomerService {
private readonly _getCustomers: Observable<ICustomer[]>;
constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}
export interface ICustomer {
/* ICustomer interface fields defined here */
}
请注意,构造函数中的赋值可以移动到方法 getCustomers
,但作为从 HttpClient
are "cold" 返回的 observable,在构造函数中这样做是可以接受的,因为 http 调用只会在第一次调用 {4 时进行}.
这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。
我为这个问题加了星标,但我会尝试解决这个问题。
//this will be the shared observable that
//anyone can subscribe to, get the value,
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);
getCustomer().subscribe(customer$);
//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));
//here's the second subscriber
setTimeout(() => {
customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);
function getCustomer() {
return new Rx.Observable(observer => {
console.log('api request');
setTimeout(() => {
console.log('api response');
observer.next('customer object');
observer.complete();
}, 500);
});
}
这是proof :)
只有一个要点:getCustomer().subscribe(customer$)
我们没有订阅 getCustomer()
的 api 响应,我们订阅了一个可观察的 ReplaySubject,它也能够订阅不同的 Observable 并且(这很重要)保存它的最后一个发出的值并将其重新发布到任何它是(ReplaySubject 的)订阅者。
我找到了一种将 http get 结果存储到 sessionStorage 并将其用于会话的方法,这样它就不会再调用服务器了。
我用它来调用 github API 以避免使用限制。
@Injectable()
export class HttpCache {
constructor(private http: Http) {}
get(url: string): Observable<any> {
let cached: any;
if (cached === sessionStorage.getItem(url)) {
return Observable.of(JSON.parse(cached));
} else {
return this.http.get(url)
.map(resp => {
sessionStorage.setItem(url, resp.text());
return resp.json();
});
}
}
}
仅供参考,sessionStorage 限制为 5M(或 4.75M)。因此,它不应该像这样用于大量数据。
------ 编辑 ------------- 如果你想用 F5 刷新数据,它使用内存数据而不是 sessionStorage;
@Injectable()
export class HttpCache {
cached: any = {}; // this will store data
constructor(private http: Http) {}
get(url: string): Observable<any> {
if (this.cached[url]) {
return Observable.of(this.cached[url]));
} else {
return this.http.get(url)
.map(resp => {
this.cached[url] = resp.text();
return resp.json();
});
}
}
}
sessionStorage
所说,我只会将它用于预期在整个会话中保持一致的数据。
getCustomer
。 ;) 所以只是想警告一些可能看不到风险的人 :)
您选择的实现将取决于您是否希望 unsubscribe() 取消您的 HTTP 请求。
在任何情况下,TypeScript decorators 都是标准化行为的好方法。这是我写的:
@CacheObservableArgsKey
getMyThing(id: string): Observable<any> {
return this.http.get('things/'+id);
}
装饰器定义:
/**
* Decorator that replays and connects to the Observable returned from the function.
* Caches the result using all arguments to form a key.
* @param target
* @param name
* @param descriptor
* @returns {PropertyDescriptor}
*/
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
const originalFunc = descriptor.value;
const cacheMap = new Map<string, any>();
descriptor.value = function(this: any, ...args: any[]): any {
const key = args.join('::');
let returnValue = cacheMap.get(key);
if (returnValue !== undefined) {
console.log(`${name} cache-hit ${key}`, returnValue);
return returnValue;
}
returnValue = originalFunc.apply(this, args);
console.log(`${name} cache-miss ${key} new`, returnValue);
if (returnValue instanceof Observable) {
returnValue = returnValue.publishReplay(1);
returnValue.connect();
}
else {
console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
}
cacheMap.set(key, returnValue);
return returnValue;
};
return descriptor;
}
Property 'connect' does not exist on type '{}'.
来自第 returnValue.connect();
行。你能详细说明吗?
使用 Rxjs Observer/Observable + 缓存 + 订阅的可缓存 HTTP 响应数据
请参阅下面的代码
*免责声明:我是 rxjs 的新手,所以请记住,我可能会滥用可观察/观察者方法。我的解决方案纯粹是我找到的其他解决方案的集合,并且是未能找到一个简单的、有据可查的解决方案的结果。因此,我提供了完整的代码解决方案(正如我希望找到的那样),希望它可以帮助其他人。
*注意,这种方法松散地基于 GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。
情况:“产品列表”组件的任务是显示产品列表。该网站是一个单页网络应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。
解决方案:组件“订阅”一个服务方法。 service 方法返回一个产品对象数组,组件通过订阅回调访问这些对象。服务方法将其活动包装在一个新创建的观察者中并返回观察者。在这个观察者内部,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出一个 http 调用来检索数据,订阅响应,您可以在其中处理该数据(例如将数据映射到您自己的模型),然后将数据传回给订阅者。
编码
产品列表.component.ts
import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';
@Component({
selector: 'app-product-list',
templateUrl: './product-list.component.html',
styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
products: Product[];
constructor(
private productService: ProductService
) { }
ngOnInit() {
console.log('product-list init...');
this.productService.getProducts().subscribe(products => {
console.log('product-list received updated products');
this.products = products;
});
}
}
产品.service.ts
import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';
@Injectable()
export class ProductService {
products: Product[];
constructor(
private http:Http
) {
console.log('product service init. calling http to get products...');
}
getProducts():Observable<Product[]>{
//wrap getProducts around an Observable to make it async.
let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
//return products if it was previously fetched
if(this.products){
console.log('## returning existing products');
observer.next(this.products);
return observer.complete();
}
//Fetch products from REST API
console.log('** products do not yet exist; fetching from rest api...');
let headers = new Headers();
this.http.get('http://localhost:3000/products/', {headers: headers})
.map(res => res.json()).subscribe((response:ProductResponse) => {
console.log('productResponse: ', response);
let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
this.products = productlist;
observer.next(productlist);
});
});
return productsObservable$;
}
}
product.ts(模型)
export interface ProductResponse {
success: boolean;
msg: string;
products: Product[];
}
export class Product {
product_id: number;
sku: string;
product_title: string;
..etc...
constructor(product_id: number,
sku: string,
product_title: string,
...etc...
){
//typescript will not autoassign the formal parameters to related properties for exported classes.
this.product_id = product_id;
this.sku = sku;
this.product_title = product_title;
...etc...
}
//Class method to convert products within http response to pure array of Product objects.
//Caller: product.service:getProducts()
static fromJsonList(products:any): Product[] {
let mappedArray = products.map(Product.fromJson);
return mappedArray;
}
//add more parameters depending on your database entries and constructor
static fromJson({
product_id,
sku,
product_title,
...etc...
}): Product {
return new Product(
product_id,
sku,
product_title,
...etc...
);
}
}
这是我在 Chrome 中加载页面时看到的输出示例。请注意,在初始加载时,产品是从 http 获取的(调用我的节点休息服务,该服务在端口 3000 上本地运行)。然后,当我单击导航到产品的“过滤”视图时,会在缓存中找到产品。
我的 Chrome 日志(控制台):
core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init. calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products
...[单击菜单按钮过滤产品]...
app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products
结论:这是我(到目前为止)发现的实现可缓存 http 响应数据的最简单方法。在我的 Angular 应用程序中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。 ProductService 似乎是一个共享实例,所以导航时会保留 ProductService 中 'products: Product[]' 的本地缓存,后续调用“GetProducts()”会返回缓存的值。最后一点,我已经阅读了有关在完成后如何关闭可观察对象/订阅以防止“内存泄漏”的评论。我没有在此处包含此内容,但请记住这一点。
我认为 @ngx-cache/core 可能有助于维护 http 调用的缓存功能,尤其是当 HTTP 调用同时在 browser 和 server 平台上进行时。
假设我们有以下方法:
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
您可以使用 @ngx-cache/core 的 Cached
装饰器来存储在 cache storage
进行 HTTP 调用的方法返回的值(storage
可以配置,请检查 ng-seed/universal< /em>) - 就在第一次执行时。下次调用该方法时(无论是在 browser 还是 server 平台上),都会从 cache storage
中检索该值。
import { Cached } from '@ngx-cache/core';
...
@Cached('get-customer') // the cache key/identifier
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
还可以通过 caching API 使用缓存方法(has
、get
、set
)。
任何类.ts
...
import { CacheService } from '@ngx-cache/core';
@Injectable()
export class AnyClass {
constructor(private readonly cache: CacheService) {
// note that CacheService is injected into a private property of AnyClass
}
// will retrieve 'some string value'
getSomeStringValue(): string {
if (this.cache.has('some-string'))
return this.cache.get('some-string');
this.cache.set('some-string', 'some string value');
return 'some string value';
}
}
以下是客户端和服务器端缓存的包列表:
@ngx-cache/core:缓存实用程序
@ngx-cache/platform-browser:SPA/浏览器平台实现
@ngx-cache/platform-server:服务器平台实现
@ngx-cache/fs-storage:存储实用程序(服务器平台需要)
我们要做的是确保这不会导致多个网络请求。
我个人最喜欢的是使用 async
方法来发出网络请求。这些方法本身不返回值,而是更新同一服务中的 BehaviorSubject
,组件将订阅该服务。
现在为什么要使用 BehaviorSubject
而不是 Observable
?因为,
订阅后 BehaviorSubject 返回最后一个值,而常规 observable 仅在收到 onnext 时触发。
如果要在不可观察的代码中检索 BehaviorSubject 的最后一个值(无需订阅),可以使用 getValue() 方法。
例子:
客户服务.ts
public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);
public async getCustomers(): Promise<void> {
let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
if (customers)
this.customers$.next(customers);
}
然后,只要需要,我们就可以订阅 customers$
。
public ngOnInit(): void {
this.customerService.customers$
.subscribe((customers: Customer[]) => this.customerList = customers);
}
或者,也许您想直接在模板中使用它
<li *ngFor="let customer of customerService.customers$ | async"> ... </li>
因此,在您再次调用 getCustomers
之前,数据将保留在 customers$
BehaviorSubject 中。
那么如果你想刷新这些数据呢?只需拨打 getCustomers()
public async refresh(): Promise<void> {
try {
await this.customerService.getCustomers();
}
catch (e) {
// request failed, handle exception
console.error(e);
}
}
使用此方法,我们不必在后续网络调用之间显式保留数据,因为它是由 BehaviorSubject
处理的。
PS: 通常,当组件被销毁时,最好摆脱订阅,因为您可以使用this答案中建议的方法。
您可以构建简单的类 Cacheable<> 来帮助管理从具有多个订阅者的 http 服务器检索的数据:
declare type GetDataHandler<T> = () => Observable<T>;
export class Cacheable<T> {
protected data: T;
protected subjectData: Subject<T>;
protected observableData: Observable<T>;
public getHandler: GetDataHandler<T>;
constructor() {
this.subjectData = new ReplaySubject(1);
this.observableData = this.subjectData.asObservable();
}
public getData(): Observable<T> {
if (!this.getHandler) {
throw new Error("getHandler is not defined");
}
if (!this.data) {
this.getHandler().map((r: T) => {
this.data = r;
return r;
}).subscribe(
result => this.subjectData.next(result),
err => this.subjectData.error(err)
);
}
return this.observableData;
}
public resetCache(): void {
this.data = null;
}
public refresh(): void {
this.resetCache();
this.getData();
}
}
用法
声明 Cacheable<> 对象(大概作为服务的一部分):
list: Cacheable<string> = new Cacheable<string>();
和处理程序:
this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}
从组件调用:
//gets data from server
List.getData().subscribe(…)
您可以订阅多个组件。
更多详细信息和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable
很好的答案。
或者你可以这样做:
这是来自最新版本的 rxjs。我正在使用 5.5.7 版本的 RxJS
import {share} from "rxjs/operators";
this.http.get('/someUrl').pipe(share());
rxjs 5.3.0
我对.map(myFunction).publishReplay(1).refCount()
不满意
对于多个订阅者,.map()
在某些情况下会执行 myFunction
两次(我希望它只执行一次)。一种修复方法似乎是 publishReplay(1).refCount().take(1)
您可以做的另一件事就是不要使用 refCount()
并立即使 Observable 变热:
let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;
无论订阅者如何,这都会启动 HTTP 请求。我不确定在 HTTP GET 完成之前取消订阅是否会取消它。
它是 .publishReplay(1).refCount();
或 .publishLast().refCount();
,因为 Angular Http observables 在请求后完成。
这个简单的类缓存了结果,因此您可以多次订阅 .value 并且只发出 1 个请求。您还可以使用 .reload() 发出新请求并发布数据。
你可以像这样使用它:
let res = new RestResource(() => this.http.get('inline.bundleo.js'));
res.status.subscribe((loading)=>{
console.log('STATUS=',loading);
});
res.value.subscribe((value) => {
console.log('VALUE=', value);
});
和来源:
export class RestResource {
static readonly LOADING: string = 'RestResource_Loading';
static readonly ERROR: string = 'RestResource_Error';
static readonly IDLE: string = 'RestResource_Idle';
public value: Observable<any>;
public status: Observable<string>;
private loadStatus: Observer<any>;
private reloader: Observable<any>;
private reloadTrigger: Observer<any>;
constructor(requestObservableFn: () => Observable<any>) {
this.status = Observable.create((o) => {
this.loadStatus = o;
});
this.reloader = Observable.create((o: Observer<any>) => {
this.reloadTrigger = o;
});
this.value = this.reloader.startWith(null).switchMap(() => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.LOADING);
}
return requestObservableFn()
.map((res) => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.IDLE);
}
return res;
}).catch((err)=>{
if (this.loadStatus) {
this.loadStatus.next(RestResource.ERROR);
}
return Observable.of(null);
});
}).publishReplay(1).refCount();
}
reload() {
this.reloadTrigger.next(null);
}
}
只需在地图之后和任何订阅之前调用 share() 。
就我而言,我有一个通用服务 (RestClientService.ts),它正在进行其余调用、提取数据、检查错误并将 observable 返回到具体实现服务(例如:ContractClientService.ts),最后是这个具体实现返回可观察到的 de ContractComponent.ts,并且这个订阅更新视图。
RestClientService.ts:
export abstract class RestClientService<T extends BaseModel> {
public GetAll = (path: string, property: string): Observable<T[]> => {
let fullPath = this.actionUrl + path;
let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
observable = observable.share(); //allows multiple subscribers without making again the http request
observable.subscribe(
(res) => {},
error => this.handleError2(error, "GetAll", fullPath),
() => {}
);
return observable;
}
private extractData(res: Response, property: string) {
...
}
private handleError2(error: any, method: string, path: string) {
...
}
}
合同服务.ts:
export class ContractService extends RestClientService<Contract> {
private GET_ALL_ITEMS_REST_URI_PATH = "search";
private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
public getAllItems(): Observable<Contract[]> {
return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
}
}
合同组件.ts:
export class ContractComponent implements OnInit {
getAllItems() {
this.rcService.getAllItems().subscribe((data) => {
this.items = data;
});
}
}
我写了一个缓存类,
/**
* Caches results returned from given fetcher callback for given key,
* up to maxItems results, deletes the oldest results when full (FIFO).
*/
export class StaticCache
{
static cachedData: Map<string, any> = new Map<string, any>();
static maxItems: number = 400;
static get(key: string){
return this.cachedData.get(key);
}
static getOrFetch(key: string, fetcher: (string) => any): any {
let value = this.cachedData.get(key);
if (value != null){
console.log("Cache HIT! (fetcher)");
return value;
}
console.log("Cache MISS... (fetcher)");
value = fetcher(key);
this.add(key, value);
return value;
}
static add(key, value){
this.cachedData.set(key, value);
this.deleteOverflowing();
}
static deleteOverflowing(): void {
if (this.cachedData.size > this.maxItems) {
this.deleteOldest(this.cachedData.size - this.maxItems);
}
}
/// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
/// However that seems not to work. Trying with forEach.
static deleteOldest(howMany: number): void {
//console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
let iterKeys = this.cachedData.keys();
let item: IteratorResult<string>;
while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
//console.debug(" Deleting: " + item.value);
this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
}
}
static clear(): void {
this.cachedData = new Map<string, any>();
}
}
由于我们如何使用它,这一切都是静态的,但请随意将其设为普通类和服务。我不确定 Angular 是否一直保留一个实例(Angular2 的新手)。
这就是我使用它的方式:
let httpService: Http = this.http;
function fetcher(url: string): Observable<any> {
console.log(" Fetching URL: " + url);
return httpService.get(url).map((response: Response) => {
if (!response) return null;
if (typeof response.json() !== "array")
throw new Error("Graph REST should return an array of vertices.");
let items: any[] = graphService.fromJSONarray(response.json(), httpService);
return array ? items : items[0];
});
}
// If data is a link, return a result of a service call.
if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
{
// Make an HTTP call.
let url = this.data[verticesLabel][name]["link"];
let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
if (!cachedObservable)
throw new Error("Failed loading link: " + url);
return cachedObservable;
}
我认为可能有一个更聪明的方法,它会使用一些 Observable
技巧,但这对我的目的来说很好。
只需使用这个缓存层,它就可以满足您的所有需求,甚至可以管理 ajax 请求的缓存。
http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html
这很容易使用
@Component({
selector: 'home',
templateUrl: './html/home.component.html',
styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
constructor(AjaxService:AjaxService){
AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
}
articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}
该层(作为可注入的角度服务)是
import { Injectable } from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable } from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
public data:Object={};
/*
private dataObservable:Observable<boolean>;
*/
private dataObserver:Array<any>=[];
private loading:Object={};
private links:Object={};
counter:number=-1;
constructor (private http: Http) {
}
private loadPostCache(link:string){
if(!this.loading[link]){
this.loading[link]=true;
this.links[link].forEach(a=>this.dataObserver[a].next(false));
this.http.get(link)
.map(this.setValue)
.catch(this.handleError).subscribe(
values => {
this.data[link] = values;
delete this.loading[link];
this.links[link].forEach(a=>this.dataObserver[a].next(false));
},
error => {
delete this.loading[link];
}
);
}
}
private setValue(res: Response) {
return res.json() || { };
}
private handleError (error: Response | any) {
// In a real world app, we might use a remote logging infrastructure
let errMsg: string;
if (error instanceof Response) {
const body = error.json() || '';
const err = body.error || JSON.stringify(body);
errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
} else {
errMsg = error.message ? error.message : error.toString();
}
console.error(errMsg);
return Observable.throw(errMsg);
}
postCache(link:string): Observable<Object>{
return Observable.create(observer=> {
if(this.data.hasOwnProperty(link)){
observer.next(this.data[link]);
}
else{
let _observable=Observable.create(_observer=>{
this.counter=this.counter+1;
this.dataObserver[this.counter]=_observer;
this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
_observer.next(false);
});
this.loadPostCache(link);
_observable.subscribe(status=>{
if(status){
observer.next(this.data[link]);
}
}
);
}
});
}
}
你可以简单地使用 ngx-cacheable!它更适合您的场景。
使用它的好处它只调用一次rest API,缓存响应并为后续请求返回相同的响应。创建/更新/删除操作后可以根据需要调用API。
因此,您的服务类将是这样的 -
import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';
const customerNotifier = new Subject();
@Injectable()
export class customersService {
// relieves all its caches when any new value is emitted in the stream using notifier
@Cacheable({
cacheBusterObserver: customerNotifier,
async: true
})
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
addCustomer() {
// some code
}
// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
updateCustomer() {
// some code
}
}
Here 是更多参考链接。
上面的大多数答案都适用于不接受输入的 http 请求。每次您想使用某些输入进行 api 调用时,都需要重新创建请求。上面唯一可以处理此问题的响应是 @Arlo's reply。
我创建了一个稍微简单的装饰器,您可以使用它来将响应共享给具有相同输入的每个调用者。与 Arlo 的回复不同,这不会重播对延迟订阅者的响应,而是将同时处理的请求作为一个请求。如果目标是重放对延迟观察者的响应(也称为缓存响应),您可以修改下面的代码并将 share()
替换为 shareReplay(1)
:
https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a
import { finalize, Observable, share } from 'rxjs';
export function SharedObservable(): MethodDecorator {
const obs$ = new Map<string, Observable<any>>();
return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
const originalMethod = descriptor.value;
descriptor.value = function (...args: any[]) {
const key = JSON.stringify(args);
if (!obs$.has(key)) {
// We have no observable for this key yet, so we create one
const res = originalMethod.apply(this, args).pipe(
share(), // Make the observable hot
finalize(() => obs$.delete(key)) // Cleanup when observable is complete
);
obs$.set(key, res);
}
// Return the cached observable
return obs$.get(key);
};
return descriptor;
};
}
用法:
@SharedObservable()
myFunc(id: number): Observable<any> {
return this.http.get<any>(`/api/someUrl/${id}`);
}
您是否尝试过运行已有的代码?
因为您是根据 getJSON()
产生的 Promise 构造 Observable,所以网络请求是在任何人订阅之前发出的。由此产生的承诺由所有订阅者共享。
var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...
do()
与map()
相反,不会修改事件。您也可以使用map()
,但是您必须确保在回调结束时返回正确的值。.subscribe()
的调用站点不需要该值,您可以这样做,因为它可能只得到null
(取决于this.extractData
返回的内容),但恕我直言,这并没有表达代码很好。this.extraData
像extraData() { if(foo) { doSomething();}}
一样结束时,返回最后一个表达式的结果,这可能不是您想要的。if (this.observable) { return this.observable; } else { this.observable = this.http.get(url) .map(res => res.json().data); return this.observable; }