Apollo Server 中的订阅
持久的 GraphQL 读取操作
Apollo Server 并不提供内建的订阅支持。您可以通过订阅支持,如以下所述。
本文使用graphql-ws
库为 subscriptions向 Apollo Server 订阅功能提供支持。我们不再推荐使用此前记录的subscriptions-transport-ws
,因为这个库没有得到积极的维护。有关两个库之间差异的更多信息,请参阅从 subscriptions-transport-ws
的迁移。
订阅是持续时间较长的 GraphQL读取操作操作,它可以在特定服务器端事件发生时更新其结果。最常见的是,更新后的结果将由服务器推送到订阅客户端。例如,一个聊天应用程序的服务器可能会使用订阅将新收到的消息推送到特定聊天室中的所有客户端。
由于 subscription更新通常是由服务器推送的(而不是客户端轮询),WebSocket 协议而不是 HTTP。
重要:与查询和变更相比,订阅实现起来要复杂得多。在开始之前,请确认您的用例需要订阅。
模式定义
您的模式Subscription
类型定义了客户端可以订阅的顶级 fields:
type Subscription {postCreated: Post}
每当在后台创建新的 Post
数据时,会更新其值,从而将 Post
发送到订阅的客户端。
客户端可以通过以下GraphQL字符串订阅到 postCreated
字段:
subscription PostFeed {postCreated {authorcomment}}
每个 subscription 操作只能订阅 一个 Subscription 类型中的字段。
启用订阅
由于 Apollo Server 4 的 不 支持通过 Apollo Server 4的 startStandaloneServer
函数。要启用 subscriptions,您必须首先 切换到使用 expressMiddleware
函数
以下步骤假定您已经切换到 expressMiddleware
。
为了同时运行一个Express应用 以及一个用于订阅的独立的WebSocket服务器,我们需要创建一个 http.Server
实例,它将这两个应用 wrap 在一起并成为我们的新 listener
。
安装
graphql-ws
,ws
, 和@graphql-tools/schema
:npm install graphql-ws ws @graphql-tools/schema将以下导入添加到初始化你的
ApolloServer
实例的文件中(我们将在后续步骤中使用这些导入):index.tsimport { createServer } from 'http';import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';import { makeExecutableSchema } from '@graphql-tools/schema';import { WebSocketServer } from 'ws';import { useServer } from 'graphql-ws/lib/use/ws';接下来,为了设置HTTP和 subscription 服务器,我们首先需要创建一个
http.Server
实例。通过将你的Expressapp
传递给从http
模块导入的createServer
函数来这样做:index.ts// This `app` is the returned value from `express()`.const httpServer = createServer(app);(如果尚未创建)创建一个
GraphQLSchema
实例。如果你已经将
schema
选项传递到ApolloServer
构造函数(而不是typeDefs
和resolvers
),你就可以跳过这一步骤。(我们将在下一步中实例化) subscription 服务器不接收
typeDefs
和resolvers
选项。相反,它接收一个可执行的GraphQLSchema
。我们可以将此schema
对象传递给 subscription 服务器和ApolloServer
。这样,我们确保两个地方都在使用相同的模式。index.tsconst schema = makeExecutableSchema({ typeDefs, resolvers });// ...const server = new ApolloServer({schema,});创建一个
WebSocketServer
作为你的 subscription 服务器。index.ts// Creating the WebSocket serverconst wsServer = new WebSocketServer({// This is the `httpServer` we created in a previous step.server: httpServer,// Pass a different path here if app.use// serves expressMiddleware at a different pathpath: '/subscriptions',});// Hand in the schema we just created and have the// WebSocketServer start listening.const serverCleanup = useServer({ schema }, wsServer);将 plugins 添加到你的
ApolloServer
构造函数以关闭HTTP服务器和WebSocketServer
:index.tsconst server = new ApolloServer({schema,plugins: [// Proper shutdown for the HTTP server.ApolloServerPluginDrainHttpServer({ httpServer }),// Proper shutdown for the WebSocket server.{async serverWillStart() {return {async drainServer() {await serverCleanup.dispose();},};},},],});最后,请确认您正在
listen
监听您的httpServer
。大多数 Express 应用程序调用
app.listen(...)
,但对于您的配置,请将其更改为使用相同的参数httpServer.listen(...)
。这样,服务器将同时监听 HTTP 和 WebSocket 传输。
以下是一个设置 订阅 的完整示例:
⚠️ 遇到错误吗? 如果您正在使用 graphql-ws
库,您的指定 订阅 协议必须在您的后端、前端以及 您使用的每个其他工具(包括 Apollo 测试环境)中保持一致。有关更多信息,请参阅 从 subscriptions-transport-ws
切换
解决订阅问题
解析器 对于 Subscription
字段 与其他类型的字段解析器不同。具体来说,Subscription
字段 解析器是 对象,定义了一个 subscribe
函数:
const resolvers = {Subscription: {hello: {// Example using an async generatorsubscribe: async function* () {for await (const word of ['Hello', 'Bonjour', 'Ciao']) {yield { hello: word };}},},postCreated: {// More on pubsub belowsubscribe: () => pubsub.asyncIterator(['POST_CREATED']),},},// ...other resolvers...};
该 subscribe
函数 必须 返回一个类型为 AsyncIterator
的对象,这是一个迭代异步结果的标准接口。在上面的 postCreated.subscribe
字段中,一个 AsyncIterator
由 pubsub.asyncIterator
(关于这一点将在下面说明)生成。
PubSub
类
The PubSub
类不建议在生产环境中使用,因为它是一个只支持单个服务器实例的内存事件系统。在开发环境中使subscriptions正常运行后,我们强烈建议将其更换为抽象类PubSubEngine
类的其它子类。推荐使用的子类在生产环境 PubSub 库中列出。
您可以使用发布-订阅(pub/sub)模型来跟踪更新活动subscriptions的事件。提供了graphql-subscriptions
库,它提供了PubSub
类作为基本内存事件总线,帮助您开始:
要使用graphql-subscriptions
包,首先按照以下方式安装它:
npm install graphql-subscriptions
一个PubSub
实例允许您的服务器代码既可以将事件发布到特定的标签,也可以监听与特定标签相关的事件。我们可以这样创建一个PubSub
实例:
import { PubSub } from 'graphql-subscriptions';const pubsub = new PubSub();
发布事件
您可以使用PubSub
实例的publish
方法来发布事件:
pubsub.publish('POST_CREATED', {postCreated: {author: 'Ali Baba',comment: 'Open sesame',},});
- 第一个参数是要发布的标签名称,作为一个字符串。
- 在发布到它之前,不需要注册标签名称。
- 第二个参数是与事件相关的有效载荷。
- 有效载荷应包括您的解析器填充相关
Subscription
字段及其子字段所需的所有数据。
- 有效载荷应包括您的解析器填充相关
在使用GraphQL订阅时,每当订阅的返回值应该更新时,您可以发布一个事件。这种情况的一个常见原因是mutation,但任何后端逻辑导致的变化都应该发布。
例如,假设我们的GraphQL API支持一个GraphQLcreatePost
mutation:
type Mutation {createPost(author: String, comment: String): Post}
一个基本的resolver为createPost
可能如下所示:
const resolvers = {Mutation: {createPost(parent, args, { postController }) {// Datastore logic lives in postControllerreturn postController.createPost(args);},},// ...other resolvers...};
在我们将新的帖子细节持久化到我们的数据存储之前,我们可以发布
一个事件,该事件也包含了这些细节:
const resolvers = {Mutation: {createPost(parent, args, { postController }) {pubsub.publish('POST_CREATED', { postCreated: args });return postController.createPost(args);},},// ...other resolvers...};
然后,我们可以在Subscription
字段 resolver中监听此事件。
监听事件
一个 AsyncIterator
对象监听与特定标签(或一组标签)相关的事件,并将它们添加到队列进行处理。
您可以通过调用 PubSub
的 asyncIterator
方法并传递一个包含该 AsyncIterator
应该监听的事件标签名称的数组来创建一个 AsyncIterator
。
pubsub.asyncIterator(['POST_CREATED']);
每个 Subscription
字段解析器 的 subscribe 函数必须返回一个 AsyncIterator
对象。
这又把我们带回了文档顶部的代码示例 处理订阅:
const resolvers = {Subscription: {postCreated: {subscribe: () => pubsub.asyncIterator(['POST_CREATED']),},},// ...other resolvers...};
在这个 subscribe
函数设置下,Apollo Server 使用 POST_CREATED
事件的负载来推送 postCreated
字段的更新值。
事件过滤
有时,客户端应该只接收符合某些条件的数据的更新订阅。为了支持这一点,您可以在您的 Subscription
字段解析器中调用 withFilter
辅助函数。
示例
比如,我们的服务器提供一个 commentAdded
订阅,每当指定代码仓库中添加评论时,就通知客户端。客户端可以执行看起来像这样的订阅:
subscription ($repoName: String!) {commentAdded(repoFullName: $repoName) {idcontent}}
这提出了一个问题:我们的服务器可能在 发布COMMENT_ADDED
事件时,无论添加到哪个仓库,都发布。这意味着任何新的评论都会执行 commentAdded
解析器。结果,订阅客户端可能会接收到他们不想要(或者甚至没有访问权限)的数据。
为了解决这个问题,我们可以使用 withFilter
辅助函数来按客户端控制更新。
以下是一个使用 withFilter
函数的 commentAdded
解析器示例:
import { withFilter } from 'graphql-subscriptions';const resolvers = {Subscription: {commentAdded: {subscribe: withFilter(() => pubsub.asyncIterator('COMMENT_ADDED'),(payload, variables) => {// Only push an update if the comment is on// the correct repository for this operationreturn (payload.commentAdded.repository_name === variables.repoFullName);},),},},// ...other resolvers...};
withFilter
函数接受两个参数:
- 第一个参数是在不应用过滤器的情况下,你会用于
subscribe
的相同函数。 - 第二个参数是一个 过滤器函数,当订阅更新应该发送给特定客户端时返回
true
,否则返回false
(也允许返回Promise<boolean>
)。payload
是已发布事件的负载。variables
是一个对象,包含客户端在初始化订阅时提供的所有 参数。
使用 withFilter
确保客户端获得他们想要(并且允许接收)的精确订阅更新。
基本可运行示例
一个示例服务器可以在 GitHub 和 CodeSandbox:
该服务器公开一个 subscription(numberIncremented
),服务器每秒增加一个整数。以下是一个可以针对您的服务器运行的示例 subscription:
subscription IncrementingNumber {numberIncremented}
如果您没有看到您订阅操作的结果,您可能需要 调整沙盒设置以使用 graphql-ws
协议。
在 CodeSandbox 中启动服务器后,按照浏览器中的说明测试在 numberIncremented
subscription 中执行 Apollo 沙盒。您应该看到订阅值每秒更新。
操作上下文
当为 查询 或 mutation 初始化上下文时,通常从传递给 context
函数的 req
对象中提取 HTTP 头部和其他请求元数据。
对于订阅,您可以通过向传递给 options 函数的第一个 argument 添加信息。
例如,您可以为 context
属性 绑定到您的 GraphQL operation 以添加值到您的 contextValue
:
// ...useServer({// Our GraphQL schema.schema,// Adding a context property lets you add data to your GraphQL operation contextValuecontext: async (ctx, msg, args) => {// You can define your own function for setting a dynamic context// or provide a static valuereturn getDynamicContext(ctx, msg, args);},},wsServer,);
请注意,上面传入到 context
函数的第一个参数是 ctx
. 这个 ctx 对象
代表了你的 订阅服务器的上下文,而不是传递到你的解析器中的 GraphQL 操作 contextValue
。
你可以通过 ctx.connectionParams
属性访问客户端对你的 WebSocket 服务器的 subscription
请求的参数。
下面是一个常见的用例,从客户端的 subscription
请求中提取身份验证令牌,并使用它来查找当前用户:
const getDynamicContext = async (ctx, msg, args) => {// ctx is the graphql-ws Context where connectionParams liveif (ctx.connectionParams.authentication) {const currentUser = await findUser(ctx.connectionParams.authentication);return { currentUser };}// Otherwise let our resolvers know we don't have a current userreturn { currentUser: null };};useServer({schema,context: async (ctx, msg, args) => {// Returning an object will add that information to// contextValue, which all of our resolvers have access to.return getDynamicContext(ctx, msg, args);},},wsServer,);
将所有这些放在一起,useServer.context
函数返回一个 对象,即 contextValue
,它可以在你的解析器中使用。
请注意,context
选项在每个 subscription
请求上只调用一次,而不是在每个事件发出时调用一次。这意味着在上述示例中,每次客户端发送一个 subscription
操作时,我们都会检查他们的身份验证令牌。
onConnect
和 onDisconnect
你可以配置 subscription 服务器在客户端连接(onConnect
)或断开连接(onDisconnect
)时的行为。
定义一个 onConnect
函数,你可以通过返回 false
或抛出异常来拒绝特定的传入连接。这在你想要在客户端首次连接到你的 subscription 服务器时检查身份验证时非常有用。
你将这些函数作为选项传递给 useServer
的第一个参数,如下所示:
useServer({schema,// As before, ctx is the graphql-ws Context where connectionParams live.onConnect: async (ctx) => {// Check authentication every time a client connects.if (tokenIsNotValid(ctx.connectionParams)) {// You can return false to close the connection or throw an explicit errorthrow new Error('Auth token missing!');}},onDisconnect(ctx, code, reason) {console.log('Disconnected!');},},wsServer,);
有关使用 onConnect
和 onDisconnect
的更多示例,请参阅 graphql-ws
菜单文档。
示例:与 Apollo Client 进行身份验证
如果你计划使用 subscriptions 与 Apollo Client,请确保你的客户端和服务器订阅协议与你要使用的订阅库(此示例使用 graphql-ws 库)一致。
在 Apollo 客户端 中,GraphQLWsLink
构造函数支持向 connectionParams
(示例)添加信息。这些 connectionParams
在连接时发送到您的服务器,允许您通过访问 Context.connectionParams
从客户端请求中提取信息。
假设我们创建我们的 subscription 客户端如下所示:
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';import { createClient } from 'graphql-ws';const wsLink = new GraphQLWsLink(createClient({url: 'ws://127.0.0.1:4000/subscriptions',connectionParams: {authentication: user.authToken,},}),);
包含客户端提供信息的 connectionParams
参数()被传递到您的服务器,使您能够通过验证用户凭据。
从那里您可以使用 useServer.context
属性 验证用户,返回一个对象,在执行过程中将该对象作为 context
参数传递给您的 resolvers。
在示例中,我们可以使用客户端提供的 connectionParams.authentication
值查找相关用户,然后再将用户传递给我们的 resolvers:
const findUser = async (authToken) => {// Find a user by their auth token};const getDynamicContext = async (ctx, msg, args) => {if (ctx.connectionParams.authentication) {const currentUser = await findUser(ctx.connectionParams.authentication);return { currentUser };}// Let the resolvers know we don't have a current user so they can// throw the appropriate errorreturn { currentUser: null };};// ...useServer({// Our GraphQL schema.schema,context: async (ctx, msg, args) => {// This will be run every time the client sends a subscription requestreturn getDynamicContext(ctx, msg, args);},},wsServer,);
总的来说,上面示例中基于每个 subscription 请求的认证令牌查找用户,在将用户对象返回给我们的 resolvers 使用之前。如果不存在用户或查找失败,我们的 resolvers 可以抛出错误,并且相应的 subscription
操作不会执行。
生产 PubSub
库
如上所述,PubSub
类不建议用于生产环境,因为其事件发布系统是内存中的。这意味着 一个 实例发布的 GraphQL服务器 的事件不会被由 其他 实例处理的 subscriptions 收到。
相反,您应该使用可以将外部数据存储(例如 Redis 或 Kafka)作为后端的 PubSubEngine
抽象类 的子类。
以下是一些社区创建的针对流行事件发布系统的 PubSub
库:
- Solace
- Redis
- Google PubSub
- MQTT支持代理
- RabbitMQ
- AMQP (RabbitMQ)
- Kafka
- Postgres
- Postgres与Typescript兼容
- Google Cloud Firestore
- Ably 实时服务
- Google Firebase 实时数据库
- Azure SignalR 服务
- Azure 服务总线
- MongoDB
如果没有这些库符合您的用途,您也可以创建自己的PubSubEngine
子类。如果您创建了一个新的开源库,请点击GitHub上的编辑让我们知道!
从subscriptions-transport-ws
切换
如果您使用 subscriptions 与 Apollo 客户端,您必须确保 您使用的客户端和服务器的订阅协议保持一致性。
本文先前演示了使用 subscriptions-transport-ws
库来设置订阅。但是,此库现已经不再活跃维护。您仍然可以与 Apollo Server 使用它,但我们强烈建议使用 graphql-ws
取代。
有关如何从 subscriptions-transport-ws
转到 graphql-ws
的详细信息,请参阅 Apollo Server 3 文档 中的步骤。
更新订阅客户端
如果您打算 将 subscriptions-transport-ws
切换到 graphql-ws
,则需要更新以下客户端:
客户端名称 | 使用 graphql-ws (推荐) | 使用 subscriptions-transport-ws |
---|---|---|
使用 | ||
| ||
|