ChatGPT解决这个技术问题 Extra ChatGPT

在 RxJs 5 中共享 Angular Http 网络调用结果的正确方法是什么?

通过使用 Http,我们调用了一个执行网络调用并返回 http observable 的方法:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们采用这个 observable 并向它添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这可能看起来像一个不寻常的场景,但实际上很常见:例如,如果调用者订阅 observable 以显示错误消息,并使用异步管道将其传递给模板,我们已经有两个订阅者。

在 RxJs 5 中这样做的正确方法是什么?

也就是说,这似乎工作正常:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但这是在 RxJs 5 中这样做的惯用方式,还是我们应该做其他事情?

注意:根据 Angular 5 新的 HttpClient,所有示例中的 .map(res => res.json()) 部分现在都没有用了,因为现在默认采用 JSON 结果。

> share 与 publish().refCount() 相同。其实不是。请参阅以下讨论:github.com/ReactiveX/rxjs/issues/1363
编辑过的问题,根据问题看起来代码上的文档需要更新-> github.com/ReactiveX/rxjs/blob/master/src/operator/share.ts
我认为“这取决于”。但是对于无法在本地 b/c 缓存数据的调用,由于参数更改/组合,它可能没有意义 .share() 似乎绝对是正确的。但是,如果您可以在本地缓存内容,则有关 ReplaySubject/BehaviorSubject 的其他一些答案也是不错的解决方案。
我认为我们不仅需要缓存数据,还需要更新/修改缓存的数据。这是一个常见的情况。例如,如果我想向缓存的模型添加一个新字段或更新字段的值。也许使用 CRUD 方法创建一个单例 DataCacheService 是更好的方法?就像 Redux 的商店。你怎么看?
你可以简单地使用 ngx-cacheable!它更适合您的场景。参考下面我的回答

G
Gaël J

编辑:截至 2021 年,正确的方法是使用 RxJs 原生提出的 shareReplay 运算符。在下面的答案中查看更多详细信息。

缓存数据,如果缓存可用,则返回此数据,否则发出 HTTP 请求。

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url: string = 'https://cors-test.appspot.com/test';
  
  private data: Data;
  private observable: Observable<any>;

  constructor(private http: Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data); 
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}

Plunker example

这篇文章 https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html 很好地解释了如何使用 shareReplay 进行缓存。


do()map() 相反,不会修改事件。您也可以使用 map(),但是您必须确保在回调结束时返回正确的值。
如果执行 .subscribe() 的调用站点不需要该值,您可以这样做,因为它可能只得到 null(取决于 this.extractData 返回的内容),但恕我直言,这并没有表达代码很好。
this.extraDataextraData() { if(foo) { doSomething();}} 一样结束时,返回最后一个表达式的结果,这可能不是您想要的。
@Günter,谢谢你的代码,它有效。但是,我试图了解您为什么要分别跟踪 Data 和 Observable。通过只缓存 Observable<Data> 你不会有效地达到同样的效果吗?像这样? if (this.observable) { return this.observable; } else { this.observable = this.http.get(url) .map(res => res.json().data); return this.observable; }
@HarleenKaur 这是一个将接收到的 JSON 反序列化为的类,以获得强大的类型检查和自动完成功能。没有必要使用它,但它很常见。
A
Angular University

根据@Cristian 的建议,这是一种适用于 HTTP 可观察对象的方法,它只发出一次然后完成:

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}

使用这种方法有几个问题——返回的 observable 不能被取消或重试。这对您来说可能不是问题,但它可能再次出现。如果这是一个问题,那么 share 运算符可能是一个合理的选择(尽管有一些令人讨厌的边缘情况)。有关选项的深入讨论,请参阅此博客文章中的评论部分:blog.jhades.org/…
小澄清...虽然严格来说,publishLast().refCount() 共享的源 observable 无法取消,但一旦取消了对 refCount 返回的 observable 的所有订阅,净效果是源 observable 将被取消订阅,如果它取消它在哪里“机上”
@Christian嘿,你能解释一下“不能取消或重试”的意思吗?谢谢。
t
taras-d

更新:Ben Lesh 说,在 5.2.0 之后的下一个小版本中,您将能够调用 shareReplay() 来真正缓存。

之前.....

首先,不要使用 share() 或 publishReplay(1).refCount(),它们是相同的,问题在于它仅在可观察对象处于活动状态时建立连接时共享,如果您在完成后连接,它再次创建一个新的可观察对象,翻译,而不是真正的缓存。

Birowski 在上面给出了正确的解决方案,即使用 ReplaySubject。在我们的案例 1 中,ReplaySubject 将缓存你给它的值(bufferSize)。一旦 refCount 达到零并且你建立一个新的连接,它就不会像 share() 那样创建一个新的 observable,这是缓存的正确行为。

这是一个可重用的功能

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

这是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

下面是可缓存功能的更高级版本 这允许拥有自己的查找表 + 提供自定义查找表的能力。这样,您不必像上面的示例那样检查 this._cache。还要注意,不是传递 observable 作为第一个参数,而是传递一个返回 observables 的函数,这是因为 Angular 的 Http 立即执行,所以通过返回一个惰性执行的函数,我们可以决定不调用它,如果它已经在我们的缓存。

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

有什么理由不将此解决方案用作 RxJs 运算符:const data$ = this._http.get('url').pipe(cacheable()); /*1st subscribe*/ data$.subscribe(); /*2nd subscribe*/ data$.subscribe();?所以它的行为更像任何其他运算符..
A
Arlo

rxjs 5.4.0 有一个新的 shareReplay 方法。

rx-book shareReplay()

在 reactivex.io/rxjs 没有文档

作者明确表示“非常适合处理缓存 AJAX 结果之类的事情”

rxjs PR #2443 feat(shareReplay): adds shareReplay variant of publishReplay

shareReplay 返回一个 observable,它是通过 ReplaySubject 多播的源。该重播主题会在源错误时回收,但不会在源完成时回收。这使得 shareReplay 非常适合处理诸如缓存 AJAX 结果之类的事情,因为它是可重试的。然而,它的重复行为与 share 不同,它不会重复源 observable,而是重复源 observable 的值。


与此有关吗?不过,这些文档来自 2014 年。 github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…
我尝试将 .shareReplay(1, 10000) 添加到可观察对象,但我没有注意到任何缓存或行为变化。有可用的工作示例吗?
查看更新日志 github.com/ReactiveX/rxjs/blob/… 它出现在前面,在 v5 中被删除,在 5.4 中重新添加 - rx-book 链接确实引用了 v4,但它存在于当前的 LTS v5.5.6 中并且它在 v6 中。我想那里的 rx-book 链接已经过时了。
G
Günter Zöchbauer

根据这个article

事实证明,我们可以通过添加 publishReplay(1) 和 refCount 轻松地将缓存添加到 observable。

所以里面 if 语句只是追加

.publishReplay(1)
.refCount();

.map(...)


I
Igor

rxjs version 5.4.0 (2017-05-09) 添加了对 shareReplay 的支持。

为什么要使用 shareReplay?当您不希望在多个订阅者之间执行的副作用或繁重的计算时,您通常希望使用 shareReplay。在您知道您将有迟到的订阅者需要访问先前发出的值的流的情况下,它也可能很有价值。这种在订阅时重播价值的能力是 share 和 shareReplay 的区别。

您可以轻松地修改一个角度服务来使用它并返回一个带有缓存结果的可观察对象,该结果只会进行一次 http 调用(假设第一次调用成功)。

示例 Angular 服务

这是一个使用 shareReplay 的非常简单的客户服务。

客户服务.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable({providedIn: 'root'})
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }
    
    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

请注意,构造函数中的赋值可以移动到方法 getCustomers,但作为从 HttpClient are "cold" 返回的 observable,在构造函数中这样做是可以接受的,因为 http 调用只会在第一次调用 {4 时进行}.

这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。


我真的很喜欢这种模式,并希望在我跨多个应用程序使用的 api 服务共享库中实现它。一个例子是 UserService,除了几个地方之外的任何地方都不需要在应用程序的生命周期内使缓存无效,但是对于这些情况,我将如何在不导致以前的订阅成为孤立的情况下使其无效?
如果我们将构造函数中 Observable 的创建移到 getCustomer 方法中,那么调用 getCustomer 的不同组件将收到不同的 observable 实例。那可能不是我们想要的。所以我相信 observable 的创建应该在构造函数中。如果我们认为对 getCustomer() 的不同调用应该返回不同的 observables 是可以的,那么在方法本身中就可以了。
G
German Lashevich

我为这个问题加了星标,但我会尝试解决这个问题。

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));  
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

这是proof :)

只有一个要点:getCustomer().subscribe(customer$)

我们没有订阅 getCustomer() 的 api 响应,我们订阅了一个可观察的 ReplaySubject,它也能够订阅不同的 Observable 并且(这很重要)保存它的最后一个发出的值并将其重新发布到任何它是(ReplaySubject 的)订阅者。


我喜欢这种方法,因为它很好地利用了 rxjs,无需添加自定义逻辑,谢谢
a
allenhwkim

我找到了一种将 http get 结果存储到 sessionStorage 并将其用于会话的方法,这样它就不会再调用服务器了。

我用它来调用 github API 以避免使用限制。

@Injectable()
export class HttpCache {
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    let cached: any;
    if (cached === sessionStorage.getItem(url)) {
      return Observable.of(JSON.parse(cached));
    } else {
      return this.http.get(url)
        .map(resp => {
          sessionStorage.setItem(url, resp.text());
          return resp.json();
        });
    }
  }
}

仅供参考,sessionStorage 限制为 5M(或 4.75M)。因此,它不应该像这样用于大量数据。

------ 编辑 ------------- 如果你想用 F5 刷新数据,它使用内存数据而不是 sessionStorage;

@Injectable()
export class HttpCache {
  cached: any = {};  // this will store data
  constructor(private http: Http) {}

  get(url: string): Observable<any> {
    if (this.cached[url]) {
      return Observable.of(this.cached[url]));
    } else {
      return this.http.get(url)
        .map(resp => {
          this.cached[url] = resp.text();
          return resp.json();
        });
    }
  }
}

如果您将存储在会话存储中,那么当您离开应用程序时,您将如何确保会话存储被销毁?
但这会给用户带来意想不到的行为。当用户点击浏览器的 F5 或刷新按钮时,他期望来自服务器的新数据。但实际上他正在从 localStorage 获取过时的数据。收到的错误报告、支持票等...正如名称 sessionStorage 所说,我只会将它用于预期在整个会话中保持一致的数据。
@MA-Maddin 正如我所说“我用它来避免使用限制”。如果你想用 F5 刷新数据,你需要使用内存而不是 sessionStorage。答案已使用这种方法进行了编辑。
是的,这可能是一个用例。我刚刚被触发,因为每个人都在谈论 Cache 并且 OP 在他的示例中有 getCustomer。 ;) 所以只是想警告一些可能看不到风险的人 :)
G
German Lashevich

您选择的实现将取决于您是否希望 unsubscribe() 取消您的 HTTP 请求。

在任何情况下,TypeScript decorators 都是标准化行为的好方法。这是我写的:

  @CacheObservableArgsKey
  getMyThing(id: string): Observable<any> {
    return this.http.get('things/'+id);
  }

装饰器定义:

/**
 * Decorator that replays and connects to the Observable returned from the function.
 * Caches the result using all arguments to form a key.
 * @param target
 * @param name
 * @param descriptor
 * @returns {PropertyDescriptor}
 */
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
  const originalFunc = descriptor.value;
  const cacheMap = new Map<string, any>();
  descriptor.value = function(this: any, ...args: any[]): any {
    const key = args.join('::');

    let returnValue = cacheMap.get(key);
    if (returnValue !== undefined) {
      console.log(`${name} cache-hit ${key}`, returnValue);
      return returnValue;
    }

    returnValue = originalFunc.apply(this, args);
    console.log(`${name} cache-miss ${key} new`, returnValue);
    if (returnValue instanceof Observable) {
      returnValue = returnValue.publishReplay(1);
      returnValue.connect();
    }
    else {
      console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
    }
    cacheMap.set(key, returnValue);
    return returnValue;
  };

  return descriptor;
}

嗨@Arlo - 上面的示例无法编译。 Property 'connect' does not exist on type '{}'. 来自第 returnValue.connect(); 行。你能详细说明吗?
O
ObjectiveTC

使用 Rxjs Observer/Observable + 缓存 + 订阅的可缓存 HTTP 响应数据

请参阅下面的代码

*免责声明:我是 rxjs 的新手,所以请记住,我可能会滥用可观察/观察者方法。我的解决方案纯粹是我找到的其他解决方案的集合,并且是未能找到一个简单的、有据可查的解决方案的结果。因此,我提供了完整的代码解决方案(正如我希望找到的那样),希望它可以帮助其他人。

*注意,这种方法松散地基于 GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但以下是提供对某些可缓存数据的异步访问的简单方法。

情况:“产品列表”组件的任务是显示产品列表。该网站是一个单页网络应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。

解决方案:组件“订阅”一个服务方法。 service 方法返回一个产品对象数组,组件通过订阅回调访问这些对象。服务方法将其活动包装在一个新创建的观察者中并返回观察者。在这个观察者内部,它搜索缓存的数据并将其传递回订阅者(组件)并返回。否则,它会发出一个 http 调用来检索数据,订阅响应,您可以在其中处理该数据(例如将数据映射到您自己的模型),然后将数据传回给订阅者。

编码

产品列表.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';

@Component({
  selector: 'app-product-list',
  templateUrl: './product-list.component.html',
  styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
  products: Product[];

  constructor(
    private productService: ProductService
  ) { }

  ngOnInit() {
    console.log('product-list init...');
    this.productService.getProducts().subscribe(products => {
      console.log('product-list received updated products');
      this.products = products;
    });
  }
}

产品.service.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';

@Injectable()
export class ProductService {
  products: Product[];

  constructor(
    private http:Http
  ) {
    console.log('product service init.  calling http to get products...');

  }

  getProducts():Observable<Product[]>{
    //wrap getProducts around an Observable to make it async.
    let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
      //return products if it was previously fetched
      if(this.products){
        console.log('## returning existing products');
        observer.next(this.products);
        return observer.complete();

      }
      //Fetch products from REST API
      console.log('** products do not yet exist; fetching from rest api...');
      let headers = new Headers();
      this.http.get('http://localhost:3000/products/',  {headers: headers})
      .map(res => res.json()).subscribe((response:ProductResponse) => {
        console.log('productResponse: ', response);
        let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
        this.products = productlist;
        observer.next(productlist);
      });
    }); 
    return productsObservable$;
  }
}

product.ts(模型)

export interface ProductResponse {
  success: boolean;
  msg: string;
  products: Product[];
}

export class Product {
  product_id: number;
  sku: string;
  product_title: string;
  ..etc...

  constructor(product_id: number,
    sku: string,
    product_title: string,
    ...etc...
  ){
    //typescript will not autoassign the formal parameters to related properties for exported classes.
    this.product_id = product_id;
    this.sku = sku;
    this.product_title = product_title;
    ...etc...
  }



  //Class method to convert products within http response to pure array of Product objects.
  //Caller: product.service:getProducts()
  static fromJsonList(products:any): Product[] {
    let mappedArray = products.map(Product.fromJson);
    return mappedArray;
  }

  //add more parameters depending on your database entries and constructor
  static fromJson({ 
      product_id,
      sku,
      product_title,
      ...etc...
  }): Product {
    return new Product(
      product_id,
      sku,
      product_title,
      ...etc...
    );
  }
}

这是我在 Chrome 中加载页面时看到的输出示例。请注意,在初始加载时,产品是从 http 获取的(调用我的节点休息服务,该服务在端口 3000 上本地运行)。然后,当我单击导航到产品的“过滤”视图时,会在缓存中找到产品。

我的 Chrome 日志(控制台):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products

...[单击菜单按钮过滤产品]...

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products

结论:这是我(到目前为止)发现的实现可缓存 http 响应数据的最简单方法。在我的 Angular 应用程序中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。 ProductService 似乎是一个共享实例,所以导航时会保留 ProductService 中 'products: Product[]' 的本地缓存,后续调用“GetProducts()”会返回缓存的值。最后一点,我已经阅读了有关在完成后如何关闭可观察对象/订阅以防止“内存泄漏”的评论。我没有在此处包含此内容,但请记住这一点。


注意 - 我已经找到了一个更强大的解决方案,包括 RxJS BehaviorSubjects,它简化了代码并显着减少了“开销”。在 products.service.ts, 1. import { BehaviorSubject } from 'rxjs'; 2. 将 'products:Product[]' 更改为 'product$: BehaviorSubject = new BehaviorSubject([]);' 3. 现在您可以简单地调用 http 而不返回任何内容。 http_getProducts(){this.http.get(...).map(res => res.json()).subscribe(products => this.product$.next(products))};
局部变量 'product$' 是一个 behaviorSubject,它会 EMIT 和 STORE 最新产品(来自第 3 部分中的 product$.next(..) 调用)。现在在您的组件中,照常注入服务。您可以使用 productService.product$.value 获得最近分配的 product$ 值。如果您想在 product$ 接收到新值时执行操作,或者订阅 product$(即,在第 3 部分中调用 product$.next(...) 函数)。
例如,在 products.component.ts... this.productService.product$ .takeUntil(this.ngUnsubscribe) .subscribe((products) => {this.category);让filteredProducts = this.productService.getProductsByCategory(this.category); this.products = 过滤产品; });
关于取消订阅 observables 的重要说明:“.takeUntil(this.ngUnsubscribe)”。请参阅此堆栈溢出问题/答案,它似乎显示了取消订阅事件的“事实上”推荐方式:stackoverflow.com/questions/38008334/…
如果 observable 仅用于接收数据一次,则替代方法是 .first() 或 .take(1)。在 'ngOnDestroy()' 中应取消订阅所有其他可观察对象的“无限流”,如果您不这样做,那么您最终可能会得到重复的“可观察”回调。 stackoverflow.com/questions/28007777/…
B
Burak Tasci

我认为 @ngx-cache/core 可能有助于维护 http 调用的缓存功能,尤其是当 HTTP 调用同时在 browserserver 平台上进行时。

假设我们有以下方法:

getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

您可以使用 @ngx-cache/coreCached 装饰器来存储在 cache storage 进行 HTTP 调用的方法返回的值(storage 可以配置,请检查 ng-seed/universal< /em>) - 就在第一次执行时。下次调用该方法时(无论是在 browser 还是 server 平台上),都会从 cache storage 中检索该值。

import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

还可以通过 caching API 使用缓存方法(hasgetset)。

任何类.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

以下是客户端和服务器端缓存的包列表:

@ngx-cache/core:缓存实用程序

@ngx-cache/platform-browser:SPA/浏览器平台实现

@ngx-cache/platform-server:服务器平台实现

@ngx-cache/fs-storage:存储实用程序(服务器平台需要)


c
cyberpirate92

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢的是使用 async 方法来发出网络请求。这些方法本身不返回值,而是更新同一服务中的 BehaviorSubject,组件将订阅该服务。

现在为什么要使用 BehaviorSubject 而不是 Observable?因为,

订阅后 BehaviorSubject 返回最后一个值,而常规 observable 仅在收到 onnext 时触发。

如果要在不可观察的代码中检索 BehaviorSubject 的最后一个值(无需订阅),可以使用 getValue() 方法。

例子:

客户服务.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,只要需要,我们就可以订阅 customers$

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

或者,也许您想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

因此,在您再次调用 getCustomers 之前,数据将保留在 customers$ BehaviorSubject 中。

那么如果你想刷新这些数据呢?只需拨打 getCustomers()

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用此方法,我们不必在后续网络调用之间显式保留数据,因为它是由 BehaviorSubject 处理的。

PS: 通常,当组件被销毁时,最好摆脱订阅,因为您可以使用this答案中建议的方法。


y
yfranz

您可以构建简单的类 Cacheable<> 来帮助管理从具有多个订阅者的 http 服务器检索的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

用法

声明 Cacheable<> 对象(大概作为服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以订阅多个组件。

更多详细信息和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable


c
candidJ

很好的答案。

或者你可以这样做:

这是来自最新版本的 rxjs。我正在使用 5.5.7 版本的 RxJS

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());

A
Arlo

rxjs 5.3.0

我对.map(myFunction).publishReplay(1).refCount()不满意

对于多个订阅者,.map() 在某些情况下会执行 myFunction 两次(我希望它只执行一次)。一种修复方法似乎是 publishReplay(1).refCount().take(1)

您可以做的另一件事就是不要使用 refCount() 并立即使 Observable 变热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

无论订阅者如何,这都会启动 HTTP 请求。我不确定在 HTTP GET 完成之前取消订阅是否会取消它。


M
Matjaz Hirsman

它是 .publishReplay(1).refCount();.publishLast().refCount();,因为 Angular Http observables 在请求后完成。

这个简单的类缓存了结果,因此您可以多次订阅 .value 并且只发出 1 个请求。您还可以使用 .reload() 发出新请求并发布数据。

你可以像这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

和来源:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

s
surfealokesea

只需在地图之后和任何订阅之前调用 share() 。

就我而言,我有一个通用服务 (RestClientService.ts),它正在进行其余调用、提取数据、检查错误并将 observable 返回到具体实现服务(例如:ContractClientService.ts),最后是这个具体实现返回可观察到的 de ContractComponent.ts,并且这个订阅更新视图。

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {

      public GetAll = (path: string, property: string): Observable<T[]> => {
        let fullPath = this.actionUrl + path;
        let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
        observable = observable.share();  //allows multiple subscribers without making again the http request
        observable.subscribe(
          (res) => {},
          error => this.handleError2(error, "GetAll", fullPath),
          () => {}
        );
        return observable;
      }

  private extractData(res: Response, property: string) {
    ...
  }
  private handleError2(error: any, method: string, path: string) {
    ...
  }

}

合同服务.ts:

export class ContractService extends RestClientService<Contract> {
  private GET_ALL_ITEMS_REST_URI_PATH = "search";
  private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
  public getAllItems(): Observable<Contract[]> {
    return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
  }

}

合同组件.ts:

export class ContractComponent implements OnInit {

  getAllItems() {
    this.rcService.getAllItems().subscribe((data) => {
      this.items = data;
   });
  }

}

O
Ondra Žižka

我写了一个缓存类,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

由于我们如何使用它,这一切都是静态的,但请随意将其设为普通类和服务。我不确定 Angular 是否一直保留一个实例(Angular2 的新手)。

这就是我使用它的方式:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

我认为可能有一个更聪明的方法,它会使用一些 Observable 技巧,但这对我的目的来说很好。


R
Ravinder Payal

只需使用这个缓存层,它就可以满足您的所有需求,甚至可以管理 ajax 请求的缓存。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

这很容易使用

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

该层(作为可注入的角度服务)是

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}

C
Community

你可以简单地使用 ngx-cacheable!它更适合您的场景。

使用它的好处它只调用一次rest API,缓存响应并为后续请求返回相同的响应。创建/更新/删除操作后可以根据需要调用API。

因此,您的服务类将是这样的 -

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

Here 是更多参考链接。


Ø
Øystein Amundsen

上面的大多数答案都适用于不接受输入的 http 请求。每次您想使用某些输入进行 api 调用时,都需要重新创建请求。上面唯一可以处理此问题的响应是 @Arlo's reply

我创建了一个稍微简单的装饰器,您可以使用它来将响应共享给具有相同输入的每个调用者。与 Arlo 的回复不同,这不会重播对延迟订阅者的响应,而是将同时处理的请求作为一个请求。如果目标是重放对延迟观察者的响应(也称为缓存响应),您可以修改下面的代码并将 share() 替换为 shareReplay(1)

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

用法:

@SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

B
Brandon

您是否尝试过运行已有的代码?

因为您是根据 getJSON() 产生的 Promise 构造 Observable,所以网络请求是在任何人订阅之前发出的。由此产生的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

我已经编辑了问题以使其特定于 Angular 2

关注公众号,不定期副业成功案例分享
关注公众号

不定期副业成功案例分享

领先一步获取最新的外包任务吗?

立即订阅