与我们在 10 月 8-10 日于纽约市相聚,了解 GraphQL 联邦和 API 平台工程的最新技巧、趋势和新闻。与我们相约 GraphQL Summit 2024 纽约站
文档
免费开始

Apollo Server 中的订阅

持久的 GraphQL 读取操作


Apollo Server 并不提供内建的订阅支持。您可以通过支持,如以下所述

本文使用graphql-ws库为 subscriptions Apollo Server 功能提供支持。我们不再推荐使用此前记录的subscriptions-transport-ws,因为这个库没有得到积极的维护。有关两个库之间差异的更多信息,请参阅subscriptions-transport-ws的迁移。

订阅是持续时间较长的 GraphQL读取操作,它可以在特定服务器端事件发生时更新其结果。最常见的是,更新后的结果将由服务器将新收到的消息推送到特定聊天室中的所有客户端。

由于 subscription更新通常是由服务器推送的(而不是客户端轮询),WebSocket 协议而不是 HTTP。

重要:与查询和相比,订阅实现起来要复杂得多。在开始之前,请确认您的用例需要订阅

模式定义

您的模式Subscription类型定义了客户端可以订阅的顶级 fields

type Subscription {
postCreated: Post
}

每当在后台创建新的 Post 数据时,会更新其值,从而将 Post 发送到订阅的客户端。

客户端可以通过以下GraphQL字符串订阅到 postCreated 字段:

subscription PostFeed {
postCreated {
author
comment
}
}

每个 subscription 操作只能订阅 一个 Subscription 类型中的字段。

启用订阅

由于 Apollo Server 4 的 支持通过 Apollo Server 4的 startStandaloneServer 函数。要启用 subscriptions,您必须首先 切换到使用 expressMiddleware 函数

以下步骤假定您已经切换到 expressMiddleware

为了同时运行一个Express应用 以及一个用于订阅的独立的WebSocket服务器,我们需要创建一个 http.Server 实例,它将这两个应用 wrap 在一起并成为我们的新 listener

  1. 安装 graphql-ws, ws, 和 @graphql-tools/schema:

    npm install graphql-ws ws @graphql-tools/schema
  2. 将以下导入添加到初始化你的 ApolloServer 实例的文件中(我们将在后续步骤中使用这些导入):

    index.ts
    import { createServer } from 'http';
    import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
    import { makeExecutableSchema } from '@graphql-tools/schema';
    import { WebSocketServer } from 'ws';
    import { useServer } from 'graphql-ws/lib/use/ws';
  3. 接下来,为了设置HTTP和 subscription 服务器,我们首先需要创建一个 http.Server 实例。通过将你的Express app 传递给从 http 模块导入的 createServer 函数来这样做:

    index.ts
    // This `app` is the returned value from `express()`.
    const httpServer = createServer(app);
  4. (如果尚未创建)创建一个 GraphQLSchema 实例。

    如果你已经将 schema 选项传递到 ApolloServer 构造函数(而不是 typeDefsresolvers),你就可以跳过这一步骤。

    (我们将在下一步中实例化) subscription 服务器不接收 typeDefsresolvers 选项。相反,它接收一个可执行的 GraphQLSchema。我们可以将此 schema 对象传递给 subscription 服务器和 ApolloServer。这样,我们确保两个地方都在使用相同的模式。

    index.ts
    const schema = makeExecutableSchema({ typeDefs, resolvers });
    // ...
    const server = new ApolloServer({
    schema,
    });
  5. 创建一个 WebSocketServer 作为你的 subscription 服务器。

    index.ts
    // Creating the WebSocket server
    const wsServer = new WebSocketServer({
    // This is the `httpServer` we created in a previous step.
    server: httpServer,
    // Pass a different path here if app.use
    // serves expressMiddleware at a different path
    path: '/subscriptions',
    });
    // Hand in the schema we just created and have the
    // WebSocketServer start listening.
    const serverCleanup = useServer({ schema }, wsServer);
  6. plugins 添加到你的 ApolloServer 构造函数以关闭HTTP服务器和 WebSocketServer

    index.ts
    const server = new ApolloServer({
    schema,
    plugins: [
    // Proper shutdown for the HTTP server.
    ApolloServerPluginDrainHttpServer({ httpServer }),
    // Proper shutdown for the WebSocket server.
    {
    async serverWillStart() {
    return {
    async drainServer() {
    await serverCleanup.dispose();
    },
    };
    },
    },
    ],
    });
  7. 最后,请确认您正在 listen 监听您的 httpServer

    大多数 Express 应用程序调用 app.listen(...),但对于您的配置,请将其更改为使用相同的参数 httpServer.listen(...)。这样,服务器将同时监听 HTTP 和 WebSocket 传输。

以下是一个设置 订阅 的完整示例:

⚠️ 遇到错误吗? 如果您正在使用 graphql-ws 库,您的指定 订阅 协议必须在您的后端、前端以及 您使用的每个其他工具(包括 Apollo 测试环境)中保持一致。有关更多信息,请参阅 subscriptions-transport-ws 切换

解决订阅问题

对于 Subscription 字段 与其他类型的字段解析器不同。具体来说,Subscription 字段 解析器是 对象,定义了一个 subscribe 函数:

index.ts
const resolvers = {
Subscription: {
hello: {
// Example using an async generator
subscribe: async function* () {
for await (const word of ['Hello', 'Bonjour', 'Ciao']) {
yield { hello: word };
}
},
},
postCreated: {
// More on pubsub below
subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
},
},
// ...other resolvers...
};

subscribe 函数 必须 返回一个类型为 AsyncIterator 的对象,这是一个迭代异步结果的标准接口。在上面的 postCreated.subscribe 字段中,一个 AsyncIteratorpubsub.asyncIterator(关于这一点将在下面说明)生成。

PubSub

The PubSub类不建议在生产环境中使用,因为它是一个只支持单个服务器实例的内存事件系统。在开发环境中使subscriptions正常运行后,我们强烈建议将其更换为抽象类PubSubEngine的其它子类。推荐使用的子类在生产环境 PubSub 库中列出。

您可以使用发布-订阅pub/sub)模型来跟踪更新活动subscriptions的事件。提供了graphql-subscriptions,它提供了PubSub类作为基本内存事件总线,帮助您开始:

要使用graphql-subscriptions包,首先按照以下方式安装它:

npm install graphql-subscriptions

一个PubSub实例允许您的服务器代码既可以将事件发布到特定的标签,也可以监听与特定标签相关的事件。我们可以这样创建一个PubSub实例:

import { PubSub } from 'graphql-subscriptions';
const pubsub = new PubSub();

发布事件

您可以使用PubSub实例的publish方法来发布事件:

pubsub.publish('POST_CREATED', {
postCreated: {
author: 'Ali Baba',
comment: 'Open sesame',
},
});
  • 第一个参数是要发布的标签名称,作为一个字符串。
    • 在发布到它之前,不需要注册标签名称。
  • 第二个参数是与事件相关的有效载荷。
    • 有效载荷应包括您的解析器填充相关Subscription字段及其子字段所需的所有数据。

在使用GraphQL订阅时,每当订阅的返回值应该更新时,您可以发布一个事件。这种情况的一个常见原因是,但任何后端逻辑导致的变化都应该发布

例如,假设我们的GraphQL API支持一个GraphQLcreatePost mutation:

type Mutation {
createPost(author: String, comment: String): Post
}

一个基本的createPost可能如下所示:

const resolvers = {
Mutation: {
createPost(parent, args, { postController }) {
// Datastore logic lives in postController
return postController.createPost(args);
},
},
// ...other resolvers...
};

在我们将新的帖子细节持久化到我们的数据存储之前,我们可以发布一个事件,该事件也包含了这些细节:

const resolvers = {
Mutation: {
createPost(parent, args, { postController }) {
pubsub.publish('POST_CREATED', { postCreated: args });
return postController.createPost(args);
},
},
// ...other resolvers...
};

然后,我们可以在Subscription字段 resolver中监听此事件。

监听事件

一个 AsyncIterator 对象监听与特定标签(或一组标签)相关的事件,并将它们添加到队列进行处理。

您可以通过调用 PubSubasyncIterator 方法并传递一个包含该 AsyncIterator 应该监听的事件标签名称的数组来创建一个 AsyncIterator

pubsub.asyncIterator(['POST_CREATED']);

每个 Subscription 字段解析器 subscribe 函数必须返回一个 AsyncIterator 对象。

这又把我们带回了文档顶部的代码示例 处理订阅

index.ts
const resolvers = {
Subscription: {
postCreated: {
subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
},
},
// ...other resolvers...
};

在这个 subscribe 函数设置下,Apollo Server 使用 POST_CREATED 事件的负载来推送 postCreated 字段的更新值。

事件过滤

有时,客户端应该只接收符合某些条件的数据的更新订阅。为了支持这一点,您可以在您的 Subscription 字段解析器中调用 withFilter 辅助函数。

示例

比如,我们的服务器提供一个 commentAdded 订阅,每当指定代码仓库中添加评论时,就通知客户端。客户端可以执行看起来像这样的订阅:

subscription ($repoName: String!) {
commentAdded(repoFullName: $repoName) {
id
content
}
}

这提出了一个问题:我们的服务器可能在 发布COMMENT_ADDED 事件时,无论添加到哪个仓库,都发布。这意味着任何新的评论都会执行 commentAdded 解析器。结果,订阅客户端可能会接收到他们不想要(或者甚至没有访问权限)的数据。

为了解决这个问题,我们可以使用 withFilter 辅助函数来按客户端控制更新。

以下是一个使用 withFilter 函数的 commentAdded 解析器示例:

import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
commentAdded: {
subscribe: withFilter(
() => pubsub.asyncIterator('COMMENT_ADDED'),
(payload, variables) => {
// Only push an update if the comment is on
// the correct repository for this operation
return (
payload.commentAdded.repository_name === variables.repoFullName
);
},
),
},
},
// ...other resolvers...
};

withFilter 函数接受两个参数:

  • 第一个参数是在不应用过滤器的情况下,你会用于 subscribe 的相同函数。
  • 第二个参数是一个 过滤器函数,当订阅更新应该发送给特定客户端时返回 true,否则返回 false(也允许返回 Promise<boolean>)。
    • payload 是已发布事件的负载。
    • variables 是一个对象,包含客户端在初始化订阅时提供的所有 参数

使用 withFilter 确保客户端获得他们想要(并且允许接收)的精确订阅更新。

基本可运行示例

一个示例服务器可以在 GitHub 和 CodeSandbox:

Edit server-subscriptions-as4

该服务器公开一个 subscriptionnumberIncremented),服务器每秒增加一个整数。以下是一个可以针对您的服务器运行的示例 subscription

subscription IncrementingNumber {
numberIncremented
}

如果您没有看到您订阅操作的结果,您可能需要 调整沙盒设置以使用 graphql-ws 协议

在 CodeSandbox 中启动服务器后,按照浏览器中的说明测试在 numberIncremented subscription 中执行 。您应该看到订阅值每秒更新。

操作上下文

当为 查询mutation 初始化上下文时,通常从传递给 context 函数的 req 对象中提取 HTTP 头部和其他请求元数据。

对于订阅,您可以通过向传递给 options 函数的第一个 添加信息。

例如,您可以为 context 属性 绑定到您的 GraphQL operation 以添加值到您的 contextValue

// ...
useServer(
{
// Our GraphQL schema.
schema,
// Adding a context property lets you add data to your GraphQL operation contextValue
context: async (ctx, msg, args) => {
// You can define your own function for setting a dynamic context
// or provide a static value
return getDynamicContext(ctx, msg, args);
},
},
wsServer,
);

请注意,上面传入到 context 函数的第一个参数是 ctx. 这个 ctx 对象代表了你的 订阅服务器的上下文,而不是传递到你的解析器中的 GraphQL 操作 contextValue

你可以通过 ctx.connectionParams 属性访问客户端对你的 WebSocket 服务器的 subscription 请求的参数。

下面是一个常见的用例,从客户端的 subscription 请求中提取身份验证令牌,并使用它来查找当前用户:

const getDynamicContext = async (ctx, msg, args) => {
// ctx is the graphql-ws Context where connectionParams live
if (ctx.connectionParams.authentication) {
const currentUser = await findUser(ctx.connectionParams.authentication);
return { currentUser };
}
// Otherwise let our resolvers know we don't have a current user
return { currentUser: null };
};
useServer(
{
schema,
context: async (ctx, msg, args) => {
// Returning an object will add that information to
// contextValue, which all of our resolvers have access to.
return getDynamicContext(ctx, msg, args);
},
},
wsServer,
);

将所有这些放在一起,useServer.context 函数返回一个 对象,即 contextValue,它可以在你的解析器中使用。

请注意,context 选项在每个 subscription 请求上只调用一次,而不是在每个事件发出时调用一次。这意味着在上述示例中,每次客户端发送一个 subscription 操作时,我们都会检查他们的身份验证令牌。

onConnectonDisconnect

你可以配置 subscription 服务器在客户端连接(onConnect)或断开连接(onDisconnect)时的行为。

定义一个 onConnect 函数,你可以通过返回 false 或抛出异常来拒绝特定的传入连接。这在你想要在客户端首次连接到你的 subscription 服务器时检查身份验证时非常有用。

你将这些函数作为选项传递给 useServer 的第一个参数,如下所示:

useServer(
{
schema,
// As before, ctx is the graphql-ws Context where connectionParams live.
onConnect: async (ctx) => {
// Check authentication every time a client connects.
if (tokenIsNotValid(ctx.connectionParams)) {
// You can return false to close the connection or throw an explicit error
throw new Error('Auth token missing!');
}
},
onDisconnect(ctx, code, reason) {
console.log('Disconnected!');
},
},
wsServer,
);

有关使用 onConnectonDisconnect 的更多示例,请参阅 graphql-ws 菜单文档

示例:与 Apollo Client 进行身份验证

如果你计划使用 subscriptions,请确保你的客户端和服务器订阅协议与你要使用的订阅库(此示例使用 graphql-ws 库)一致。

Apollo 客户端 中,GraphQLWsLink 构造函数支持向 connectionParams示例)添加信息。这些 connectionParams 在连接时发送到您的服务器,允许您通过访问 Context.connectionParams 从客户端请求中提取信息。

假设我们创建我们的 subscription 客户端如下所示:

import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
const wsLink = new GraphQLWsLink(
createClient({
url: 'ws://127.0.0.1:4000/subscriptions',
connectionParams: {
authentication: user.authToken,
},
}),
);

包含客户端提供信息的 connectionParams 参数()被传递到您的服务器,使您能够通过验证用户凭据。

从那里您可以使用 useServer.context 属性 验证用户,返回一个对象,在执行过程中将该对象作为 context 参数传递给您的 resolvers

在示例中,我们可以使用客户端提供的 connectionParams.authentication 值查找相关用户,然后再将用户传递给我们的 resolvers

const findUser = async (authToken) => {
// Find a user by their auth token
};
const getDynamicContext = async (ctx, msg, args) => {
if (ctx.connectionParams.authentication) {
const currentUser = await findUser(ctx.connectionParams.authentication);
return { currentUser };
}
// Let the resolvers know we don't have a current user so they can
// throw the appropriate error
return { currentUser: null };
};
// ...
useServer(
{
// Our GraphQL schema.
schema,
context: async (ctx, msg, args) => {
// This will be run every time the client sends a subscription request
return getDynamicContext(ctx, msg, args);
},
},
wsServer,
);

总的来说,上面示例中基于每个 subscription 请求的认证令牌查找用户,在将用户对象返回给我们的 resolvers 使用之前。如果不存在用户或查找失败,我们的 resolvers 可以抛出错误,并且相应的 subscription 操作不会执行。

生产 PubSub

如上所述PubSub 类不建议用于生产环境,因为其事件发布系统是内存中的。这意味着 一个 实例发布的 的事件不会被由 其他 实例处理的 subscriptions 收到。

相反,您应该使用可以将外部数据存储(例如 Redis 或 Kafka)作为后端的 PubSubEngine 抽象类 的子类。

以下是一些社区创建的针对流行事件发布系统的 PubSub 库:

如果没有这些库符合您的用途,您也可以创建自己的PubSubEngine子类。如果您创建了一个新的开源库,请点击GitHub上的编辑让我们知道!

subscriptions-transport-ws切换

如果您使用 subscriptions 与 Apollo 客户端,您必须确保 您使用的客户端和服务器的订阅协议保持一致性

本文先前演示了使用 subscriptions-transport-ws库来设置订阅。但是,此库现已经不再活跃维护。您仍然可以与 Apollo Server 使用它,但我们强烈建议使用 graphql-ws 取代。

有关如何从 subscriptions-transport-ws转到 graphql-ws的详细信息,请参阅 Apollo Server 3 文档 中的步骤。

更新订阅客户端

如果您打算 subscriptions-transport-ws 切换到 graphql-ws,则需要更新以下客户端:

客户端名称使用 graphql-ws (推荐)使用 subscriptions-transport-ws

Apollo Studio Explorer

graphql-ws

subscriptions-transport-ws

Apollo Client Web

使用 GraphQLWsLink
(需要 v3.5.10 或更高版本)

使用 WebSocketLink

Apollo iOS

graphql_transport_ws
(需要 v0.51.0 或更高版本)

graphql_ws

Apollo Kotlin

GraphQLWsProtocol
(需要 v3.0.0 或更高版本)

SubscriptionWsProtocol

上一页
错误处理
下一页
概述
评分文章评分在GitHub上编辑Edit论坛Discord

©2024Apollo Graph Inc.,经营名称 Apollo GraphQL。

隐私政策

公司