メインコンテンツにスキップ

ストリーミング更新

概要

RTK Queryは、永続的なクエリに対してストリーミング更新を受信する機能を提供します。これにより、クエリはサーバーへの継続的な接続を確立し(通常はWebSocketを使用)、サーバーから追加情報を受信するたびにキャッシュされたデータに更新を適用できます。

ストリーミング更新は、APIがバックエンドデータへのリアルタイム更新(新しいエントリの作成や、重要なプロパティの更新など)を受信できるようにするために使用できます。

クエリのストリーミング更新を有効にするには、非同期のonCacheEntryAdded関数をクエリに渡し、ストリーミングデータを受信したときにクエリを更新する方法のロジックを含めます。詳細については、onCacheEntryAdded APIリファレンスを参照してください。

ストリーミング更新を使用する場合

主に、クエリデータの更新は、一定の間隔でポーリングを断続的に行うか、クエリとミューテーションに関連付けられたタグに基づいてデータを無効化するキャッシュの無効化、またはデータを使用するコンポーネントがマウントされたときに新しいデータをフェッチするrefetchOnMountOrArgChangeを使用して行う必要があります。

ただし、ストリーミング更新は、以下のようなシナリオで特に役立ちます。

  • 大きなオブジェクトに対する、小さな頻繁な変更。大きなオブジェクトを繰り返しポーリングするのではなく、最初のクエリでオブジェクトをフェッチし、ストリーミング更新で更新を受信するたびに個々のプロパティを更新できます。
  • 外部イベント駆動の更新。サーバーまたはその他の外部ユーザーによってデータが変更され、アクティブなユーザーにリアルタイムの更新を表示する必要がある場合、ポーリングだけではクエリの間でデータが古くなる期間が発生し、状態が同期しなくなる可能性があります。ストリーミング更新は、次の間隔が経過するのを待つのではなく、更新が発生するとすべてのアクティブなクライアントを更新できます。

ストリーミング更新の恩恵を受ける使用例は次のとおりです。

  • GraphQLサブスクリプション
  • リアルタイムチャットアプリケーション
  • リアルタイムマルチプレイヤーゲーム
  • 複数の同時ユーザーによる共同ドキュメント編集

onCacheEntryAddedライフサイクルの使用

onCacheEntryAddedライフサイクルコールバックを使用すると、新しいキャッシュエントリがRTK Queryキャッシュに追加された後(つまり、コンポーネントが特定のendpoint+paramsの組み合わせに対して新しいサブスクリプションを作成した後)に実行される任意の非同期ロジックを記述できます。

onCacheEntryAddedは、サブスクリプションに渡されたargと、「ライフサイクルプロミス」とユーティリティ関数を含むオプションオブジェクトの2つの引数で呼び出されます。これらを使用して、データが追加されるのを待機し、サーバー接続を開始し、部分的な更新を適用し、クエリサブスクリプションが削除されたときに接続をクリーンアップするシーケンス化されたロジックを記述できます。

通常、最初のデータがフェッチされたタイミングを判断するためにawait cacheDataLoadedを使用し、メッセージを受信する際にストリーミング更新を適用するためにupdateCacheDataユーティリティを使用します。updateCacheDataは、現在のキャッシュ値のdraftを受け取るImmer駆動のコールバックです。受信した値に基づいて必要に応じてドラフト値を「変更」できます。RTK Queryは、これらの変更に基づいて差分パッチを適用するアクションをディスパッチします。

最後に、サーバー接続をクリーンアップするタイミングを知るためにawait cacheEntryRemovedを使用できます。

ストリーミング更新の例

WebsocketチャットAPI

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<Message[], Channel>({
query: (channel) => `messages/${channel}`,
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
// create a websocket connection when the cache subscription starts
const ws = new WebSocket('ws://localhost:8080')
try {
// wait for the initial query to resolve before proceeding
await cacheDataLoaded

// when data is received from the socket connection to the server,
// if it is a message and for the appropriate channel,
// update our query result with the received message
const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
draft.push(data)
})
}

ws.addEventListener('message', listener)
} catch {
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
// in which case `cacheDataLoaded` will throw
}
// cacheEntryRemoved will resolve when the cache subscription is no longer active
await cacheEntryRemoved
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

期待されること

getMessagesクエリがトリガーされると(たとえば、useGetMessagesQuery()フックでコンポーネントがマウントされる場合)、エンドポイントのシリアライズされた引数に基づいてキャッシュエントリが追加されます。関連付けられたクエリは、キャッシュの初期データをフェッチするためにqueryプロパティに基づいて起動されます。一方、非同期のonCacheEntryAddedコールバックが開始され、新しいWebSocket接続が作成されます。最初のクエリの応答を受信すると、キャッシュは応答データで設定され、cacheDataLoadedプロミスが解決されます。cacheDataLoadedプロミスを待機した後、WebSocket接続にmessageイベントリスナーが追加され、関連付けられたメッセージを受信するとキャッシュデータが更新されます。

データへのアクティブなサブスクリプションがなくなった場合(たとえば、サブスクライブされたコンポーネントが十分な時間アンマウントされたままの場合)、cacheEntryRemovedプロミスが解決され、残りのコードを実行してwebsocket接続を閉じることができます。RTK Queryは、キャッシュから関連付けられたデータも削除します。

後で対応するキャッシュエントリのクエリが実行されると、キャッシュエントリ全体が上書きされ、ストリーミング更新リスナーは更新されたデータに対して引き続き機能します。

変換された応答形状を使用したWebsocketチャットAPI

import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react'
import { createEntityAdapter } from '@reduxjs/toolkit'
import type { EntityState } from '@reduxjs/toolkit'
import { isMessage } from './schemaValidators'

export type Channel = 'redux' | 'general'

export interface Message {
id: number
channel: Channel
userName: string
text: string
}

const messagesAdapter = createEntityAdapter<Message>()
export const api = createApi({
baseQuery: fetchBaseQuery({ baseUrl: '/' }),
endpoints: (build) => ({
getMessages: build.query<EntityState<Message, number>, Channel>({
query: (channel) => `messages/${channel}`,
transformResponse(response: Message[]) {
return messagesAdapter.addMany(
messagesAdapter.getInitialState(),
response
)
},
async onCacheEntryAdded(
arg,
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
) {
const ws = new WebSocket('ws://localhost:8080')
try {
await cacheDataLoaded

const listener = (event: MessageEvent) => {
const data = JSON.parse(event.data)
if (!isMessage(data) || data.channel !== arg) return

updateCachedData((draft) => {
messagesAdapter.upsertOne(draft, data)
})
}

ws.addEventListener('message', listener)
} catch {}
await cacheEntryRemoved
ws.close()
},
}),
}),
})

export const { useGetMessagesQuery } = api

この例では、前の例を、キャッシュにデータを追加するときに応答形状を変換できるように変更する方法を示します。

たとえば、データは次の形状から変換されます。

[
{
id: 0
channel: 'redux'
userName: 'Mark'
text: 'Welcome to #redux!'
},
{
id: 1
channel: 'redux'
userName: 'Lenz'
text: 'Glad to be here!'
},
]

以下のように変換されます。

{
// The unique IDs of each item. Must be strings or numbers
ids: [0, 1],
// A lookup table mapping entity IDs to the corresponding entity objects
entities: {
0: {
id: 0,
channel: "redux",
userName: "Mark",
text: "Welcome to #redux!",
},
1: {
id: 1,
channel: "redux",
userName: "Lenz",
text: "Glad to be here!",
},
},
};

留意すべき重要な点は、onCacheEntryAddedコールバック内のキャッシュされたデータへの更新は、キャッシュされたデータに存在する変換されたデータ形状を尊重する必要があることです。この例では、createEntityAdapterを最初のtransformResponseに使用し、正規化された状態構造を維持しながら、ストリーミング更新を受信したときに受信したアイテムをキャッシュデータにupsertするために使用する方法を示します。