[GRAPHQL] Subscriptions (ok)

https://www.apollographql.com/docs/apollo-server/data/subscriptions/

Đăng ký là hoạt động đọc GraphQL lâu dài có thể cập nhật kết quả của chúng bất cứ khi nào một sự kiện phía máy chủ cụ thể xảy ra. Thông thường nhất, các kết quả cập nhật được đẩy từ máy chủ đến các khách hàng đăng ký. Ví dụ: máy chủ của ứng dụng trò chuyện có thể sử dụng đăng ký để đẩy các tin nhắn mới nhận được đến tất cả các khách hàng trong một phòng trò chuyện cụ thể.

Vì các bản cập nhật đăng ký thường được đẩy bởi máy chủ (thay vì được khách hàng thăm dò), chúng thường sử dụng giao thức WebSocket thay vì HTTP. Để hỗ trợ điều này, Apollo Server cho phép bạn đặt điểm cuối dành riêng cho đăng ký tách biệt với điểm cuối mặc định cho các truy vấn và đột biến.

Bạn có thể sử dụng đăng ký với thư viện apollo-server cốt lõi hoặc với bất kỳ tích hợp phần mềm trung gian nào được hỗ trợ của Apollo Server.

Quan trọng: So với truy vấn và đột biến, đăng ký phức tạp hơn đáng kể để triển khai. Trước khi bạn bắt đầu, hãy xác nhận rằng trường hợp sử dụng của bạn yêu cầu đăng ký.

Loại Đăng ký trong lược đồ của bạn xác định các trường cấp cao nhất mà khách hàng có thể đăng ký:

type Subscription {
  postCreated: Post
}

Trường postCreate sẽ cập nhật giá trị của nó bất cứ khi nào một Bài đăng mới được tạo trên chương trình phụ trợ, do đó đẩy Bài đăng đến các khách hàng đăng ký.

Clients can subscribe to the postCreated field with a GraphQL string like this:

subscription PostFeed {
  postCreated {
    author
    comment
  }
}

Mỗi thao tác đăng ký chỉ có thể đăng ký một trường của loại Đăng ký.

Bởi vì đăng ký sử dụng WebSocket thay vì HTTP, Apollo Server sử dụng điểm cuối GraphQL thứ hai dành riêng cho đăng ký. Điểm cuối này sử dụng giao thức ws thay vì http.

Theo mặc định, đường dẫn của điểm cuối đăng ký khớp với đường dẫn của điểm cuối GraphQL chính của bạn (/ graphql nếu chưa được đặt). Bạn có thể chỉ định một đường dẫn khác cho điểm cuối đăng ký của mình như sau:

const server = new ApolloServer({
  subscriptions: {
    path: '/subscriptions'
  },
  // ...other options...
}));

Trình phân giải cho các trường Đăng ký khác với trình phân giải cho các trường thuộc loại khác. Cụ thể, trình phân giải trường Đăng ký là các đối tượng xác định chức năng đăng ký:

index.js

const resolvers = {
  Subscription: {
    postCreated: {
      // More on pubsub below
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
  // ...other resolvers...
};

The subscribe function must return an object of type AsyncIterator, a standard interface for iterating over asynchronous results. In the example above, an AsyncIterator is generated by pubsub.asyncIterator (more on this below).

Hàm đăng ký phải trả về một đối tượng kiểu AsyncIterator, một giao diện tiêu chuẩn để lặp qua các kết quả không đồng bộ. Trong ví dụ trên, một AsyncIterator được tạo bởi pubsub.asyncIterator (thêm về điều này bên dưới).

Lớp PubSub không được khuyến nghị cho môi trường sản xuất, vì nó là hệ thống sự kiện trong bộ nhớ chỉ hỗ trợ một phiên bản máy chủ duy nhất. Sau khi bạn nhận được các đăng ký đang hoạt động trong quá trình phát triển, chúng tôi thực sự khuyên bạn nên chuyển nó sang một lớp con khác của lớp PubSubEngine trừu tượng. Các lớp con được đề xuất được liệt kê trong thư viện Production PubSub.

Apollo Server uses a publish-subscribe (pub/sub) model to track events that update active subscriptions. The graphql-subscriptions library (included in every apollo-server package) provides the PubSub class as a basic in-memory event bus to help you get started:

Apollo Server sử dụng mô hình đăng ký xuất bản (pub / sub) để theo dõi các sự kiện cập nhật các đăng ký đang hoạt động. Thư viện graphql-subscribe (bao gồm trong mọi gói apollo-server) cung cấp lớp PubSub như một bus sự kiện trong bộ nhớ cơ bản để giúp bạn bắt đầu:

const { PubSub } = require('apollo-server');
const pubsub = new PubSub();

Một phiên bản PubSub cho phép mã máy chủ của bạn vừa xuất bản các sự kiện lên một nhãn cụ thể vừa lắng nghe các sự kiện được liên kết với một nhãn cụ thể.

Bạn xuất bản một sự kiện bằng phương pháp xuất bản của một phiên bản PubSub:

pubsub.publish('POST_CREATED', {
  postCreated: {
    author: 'Ali Baba',
    comment: 'Open sesame'
  }
});
  • The first parameter is the name of the event label you're publishing to, as a string.

    • You don't need to register a label name before publishing to it.

  • The second parameter is the payload associated with the event.

    • The payload should include whatever data is necessary for your resolvers to populate the associated Subscription field and its subfields

  • Tham số đầu tiên là tên của nhãn sự kiện mà bạn đang xuất bản, dưới dạng một chuỗi.

    • Bạn không cần phải đăng ký tên nhãn trước khi xuất bản lên nó.

  • Tham số thứ hai là trọng tải được liên kết với sự kiện.

    • Tải trọng phải bao gồm bất kỳ dữ liệu nào cần thiết để trình phân giải của bạn điền vào trường Đăng ký được liên kết và các trường con của nó.

Khi làm việc với đăng ký GraphQL, bạn xuất bản một sự kiện bất cứ khi nào giá trị trả về của đăng ký được cập nhật. Một nguyên nhân phổ biến của bản cập nhật như vậy là một đột biến, nhưng bất kỳ logic back-end nào cũng có thể dẫn đến các thay đổi cần được xuất bản.

As an example, let's say our GraphQL API supports a createPost mutation:

type Mutation {
  createPost(author: String, comment: String): Post
}

A basic resolver for createPost might look like this:

const resolvers = {
  Mutation: {
    createPost(parent, args, context) {
      // Datastore logic lives in postController
      return postController.createPost(args);
    },
  },
  // ...other resolvers...
};

Trước khi chúng tôi duy trì thông tin chi tiết của bài đăng mới trong kho dữ liệu của mình, chúng tôi có thể xuất bản một sự kiện cũng bao gồm các chi tiết đó:

const resolvers = {
  Mutation: {
    createPost(parent, args, context) {
      pubsub.publish('POST_CREATED', { postCreated: args });
      return postController.createPost(args);
    },
  },
  // ...other resolvers...
};

Next, we can listen for this event in our Subscription field's resolver.

An AsyncIterator object listens for events that are associated with a particular label (or set of labels) and adds them to a queue for processing. You create an AsyncIterator by calling the asyncIterator method of PubSub:

Đối tượng AsyncIterator lắng nghe các sự kiện được liên kết với một nhãn cụ thể (hoặc tập hợp các nhãn) và thêm chúng vào một hàng đợi để xử lý. Bạn tạo một AsyncIterator bằng cách gọi phương thức asyncIterator của PubSub:

pubsub.asyncIterator(['POST_CREATED']);

You pass this method an array containing the names of all event labels that the AsyncIterator should listen for.

Bạn truyền cho phương thức này một mảng chứa tên của tất cả các nhãn sự kiện mà AsyncIterator sẽ lắng nghe.

Every Subscription field resolver's subscribe function must return an AsyncIterator object. This brings us back to the code sample at the top of Resolving a subscription:

Mọi chức năng đăng ký của trình phân giải trường Đăng ký phải trả về một đối tượng AsyncIterator. Điều này đưa chúng ta trở lại mẫu mã ở đầu Giải quyết đăng ký:

index.js

const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
  // ...other resolvers...
};

With this subscribe function set, Apollo Server uses the payloads of POST_CREATED events to push updated values for the postCreated field.

Với bộ chức năng đăng ký này, Apollo Server sử dụng trọng tải của các sự kiện POST_CREATED để đẩy các giá trị cập nhật cho trường postCreate.

Đôi khi, khách hàng sẽ chỉ nhận được dữ liệu đăng ký cập nhật nếu dữ liệu đó đáp ứng các tiêu chí nhất định. Để hỗ trợ điều này, bạn có thể gọi hàm trợ giúp withFilter trong trình phân giải của trường Đăng ký của bạn.

Example

Let's say our server provides a commentAdded subscription, which should notify clients whenever a comment is added to a specified code repository. A client can execute a subscription that looks like this:

Giả sử máy chủ của chúng tôi cung cấp đăng ký commentAdded, đăng ký này sẽ thông báo cho khách hàng bất cứ khi nào nhận xét được thêm vào kho mã được chỉ định. Khách hàng có thể thực hiện một đăng ký giống như sau:

subscription($repoName: String!){
  commentAdded(repoFullName: $repoName) {
    id
    content
  }
}

This presents a potential issue: our server probably publishes a COMMENT_ADDED event whenever a comment is added to any repository. This means that the commentAdded resolver executes for every new comment, regardless of which repository it's added to. As a result, subscribing clients might receive data they don't want (or shouldn't even have access to).

To fix this, we can use the withFilter helper function to control updates on a per-client basis.

Here's an example resolver for commentAdded that uses the withFilter function:

Điều này cho thấy một vấn đề tiềm ẩn: máy chủ của chúng tôi có thể xuất bản sự kiện COMMENT_ADDED bất cứ khi nào nhận xét được thêm vào bất kỳ kho lưu trữ nào. Điều này có nghĩa là trình phân giải commentAdded thực thi mọi nhận xét mới, bất kể nó được thêm vào kho lưu trữ nào. Do đó, khách hàng đăng ký có thể nhận được dữ liệu mà họ không muốn (hoặc thậm chí không nên có quyền truy cập).

Để khắc phục điều này, chúng ta có thể sử dụng chức năng trợ giúp withFilter để kiểm soát các bản cập nhật trên cơ sở từng máy khách.

Đây là một ví dụ về trình phân giải cho commentAdded sử dụng hàm withFilter:

const { withFilter } = require('apollo-server');

const resolvers = {
  Subscription: {

    commentAdded: {
      subscribe: withFilter(
        () => pubsub.asyncIterator('COMMENT_ADDED'),
        (payload, variables) => {
          // Only push an update if the comment is on
          // the correct repository for this operation
          return (payload.commentAdded.repository_name === variables.repoFullName);
        },
      ),
    }
  },
    // ...other resolvers...
};

The withFilter function takes two parameters:

  • The first parameter is exactly the function you would use for subscribe if you weren't applying a filter.

  • The second parameter is a filter function that returns true if a subscription update should be sent to a particular client, and false otherwise (Promise<boolean> is also allowed). This function takes two parameters of its own:

    • payload is the payload of the event that was published.

    • variables is an object containing all arguments the client provided when initiating their subscription.

Use withFilter to make sure clients get exactly the subscription updates they want (and are allowed to receive).

Hàm withFilter nhận hai tham số:

  • Tham số đầu tiên chính xác là hàm bạn sẽ sử dụng để đăng ký nếu bạn không áp dụng bộ lọc.

  • Tham số thứ hai là một hàm bộ lọc trả về true nếu một bản cập nhật đăng ký được gửi đến một ứng dụng khách cụ thể và false nếu không thì (Promise cũng được phép). Hàm này nhận hai tham số của riêng nó:

    • payloadlà trọng tải của sự kiện đã được xuất bản.

    • variables là một đối tượng chứa tất cả các đối số mà khách hàng cung cấp khi bắt đầu đăng ký của họ.

Sử dụng withFilter để đảm bảo khách hàng nhận được chính xác các bản cập nhật đăng ký mà họ muốn (và được phép nhận).

Máy chủ mẫu này hiển thị một đăng ký (numberIncremented) trả về một số nguyên được tăng lên trên máy chủ mỗi giây. Ví dụ này chỉ yêu cầu thư viện apollo-server.

Sau khi khởi động máy chủ này, bạn có thể chạy thử đăng ký với Apollo Studio Explorer hoặc GraphQL Playground, như được mô tả trong hướng dẫn toàn ngăn xếp. Bạn sẽ thấy cập nhật giá trị của đăng ký mỗi giây.

Trong Apollo Studio Explorer, bạn phải chỉ định điểm cuối đăng ký của máy chủ của mình (ws: // localhost: 4000 / subscribe) trong tab Cài đặt Explorer.

const { ApolloServer, PubSub, gql } = require('apollo-server');
const pubsub = new PubSub();
const PORT = 4000;

// Schema definition
const typeDefs = gql`
  type Query {
    currentNumber: Int
  }

  type Subscription {
    numberIncremented: Int
  }
`;

// Resolver map
const resolvers = {
  Query: {
    currentNumber() {
      return currentNumber;
    }
  },
  Subscription: {
    numberIncremented: {
      subscribe: () => pubsub.asyncIterator(['NUMBER_INCREMENTED']),
    },
  }
};

const server = new ApolloServer({
  typeDefs,
  resolvers,
  subscriptions: {
    path: '/subscriptions',
    onConnect: (connectionParams, webSocket, context) => {
      console.log('Client connected');
    },
    onDisconnect: (webSocket, context) => {
      console.log('Client disconnected')
    },
  },
});

let currentNumber = 0;
function incrementNumber() {
  currentNumber++;
  pubsub.publish('NUMBER_INCREMENTED', { numberIncremented: currentNumber });
  setTimeout(incrementNumber, 1000);
}

server.listen().then(({ url }) => {
  console.log(`🚀 Subscription endpoint ready at ws://localhost:${PORT}${server.subscriptionsPath}`)
  console.log('Query at studio.apollographql.com/dev')
});

// Start incrementing
incrementNumber();

When initializing context for a query or mutation, you usually extract HTTP headers and other request metadata from the req object provided to the context function.

For subscriptions, you extract this metadata from the connection object instead. This object adheres to the ExecutionParams interface.

Because all operation types use the same context initialization function, you should check which of req or connection is present for each incoming request:

Khi khởi tạo ngữ cảnh cho một truy vấn hoặc đột biến, bạn thường trích xuất tiêu đề HTTP và siêu dữ liệu yêu cầu khác từ đối tượng yêu cầu được cung cấp cho hàm ngữ cảnh.

Đối với đăng ký, bạn trích xuất siêu dữ liệu này từ đối tượng kết nối. Đối tượng này tuân theo giao diện ExecutionParams.

Bởi vì tất cả các loại hoạt động sử dụng cùng một chức năng khởi tạo ngữ cảnh, bạn nên kiểm tra yêu cầu hoặc kết nối nào hiện có cho mỗi yêu cầu đến:

const server = new ApolloServer({
  context: ({ req, connection }) => {
    if (connection) { // Operation is a Subscription
      // Obtain connectionParams-provided token from connection.context
      const token = connection.context.authorization || "";
      return { token };
    } else { // Operation is a Query/Mutation
      // Obtain header-provided token from req.headers
      const token = req.headers.authorization || "";
      return { token };
    }
  },
});

Điều này đặc biệt quan trọng vì siêu dữ liệu như mã thông báo xác thực được gửi khác nhau tùy thuộc vào quá trình vận chuyển.

You can define functions that Apollo Server executes whenever a subscription request connects (onConnect) or disconnects (onDisconnect).

Defining an onConnect function provides the following benefits:

  • You can reject a particular incoming connection by throwing an exception or returning false in onConnect.

  • If onConnect returns an object, that object's fields are added to the WebSocket connection's context object.

    • This is not the operation context that's passed between resolvers. However, you can transfer these values from the connection's context when you initialize operation context.

Bạn có thể xác định các chức năng mà Apollo Server thực thi bất cứ khi nào yêu cầu đăng ký kết nối (onConnect) hoặc ngắt kết nối (onDisconnect).

Việc xác định chức năng onConnect cung cấp các lợi ích sau:

  • Bạn có thể từ chối một kết nối đến cụ thể bằng cách ném một ngoại lệ hoặc trả về false trong onConnect.

    • Điều này đặc biệt hữu ích cho việc xác thực.

  • Nếu onConnect trả về một đối tượng, các trường của đối tượng đó sẽ được thêm vào đối tượng ngữ cảnh của kết nối WebSocket.

    • Đây không phải là ngữ cảnh hoạt động được truyền giữa các trình phân giải. Tuy nhiên, bạn có thể chuyển các giá trị này từ ngữ cảnh của kết nối khi bạn khởi tạo ngữ cảnh hoạt động.

Bạn cung cấp các định nghĩa hàm này cho phương thức khởi tạo của ApolloServer, như sau:

const server = new ApolloServer(
  subscriptions: {
    onConnect: (connectionParams, webSocket, context) => {
      console.log('Connected!')
    },
    onDisconnect: (webSocket, context) => {
      console.log('Disconnected!')
    },
    // ...other options...
  },
);

Các hàm này được truyền các tham số sau:

connectionParams

Object

Passed to onConnect only.

An object containing parameters included in the request, such as an authentication token.

For details, see Authenticate over WebSocket in the Apollo Client documentation.

webSocket

WebSocket

The connecting or disconnecting WebSocket.

context

ConnectionContext

Context object for the WebSocket connection. This is not the context object for the associated subscription operation.

See fields

connectionParams

Object

Chỉ được chuyển cho onConnect.

Một đối tượng chứa các tham số được bao gồm trong yêu cầu, chẳng hạn như mã thông báo xác thực.

Để biết chi tiết, hãy xem Xác thực qua WebSocket trong tài liệu Apollo Client.

webSocket

WebSocket

Kết nối hoặc ngắt kết nối WebSocket.

context

ConnectionContext

Đối tượng ngữ cảnh cho kết nối WebSocket. Đây không phải là đối tượng ngữ cảnh cho hoạt động đăng ký được liên kết. Xem các lĩnh vực

On the client, SubscriptionsClient supports adding token information to connectionParams (example) that will be sent with the first WebSocket message. In the server, all GraphQL subscriptions are delayed until the connection has been fully authenticated and the onConnect callback returns a truthy value.

The connectionParams argument in the onConnect callback contains the information passed by the client and can be used to validate user credentials. The GraphQL context can also be extended with the authenticated user data to enable fine grain authorization.

Trên máy khách, SubscriberClient hỗ trợ thêm thông tin mã thông báo vào connectionParams (ví dụ) sẽ được gửi cùng với thông báo WebSocket đầu tiên. Trong máy chủ, tất cả các đăng ký GraphQL bị trì hoãn cho đến khi kết nối được xác thực hoàn toàn và lệnh gọi lại onConnect trả về giá trị trung thực.

Đối số connectionParams trong lệnh gọi lại onConnect chứa thông tin do máy khách chuyển và có thể được sử dụng để xác thực thông tin xác thực của người dùng. Ngữ cảnh GraphQL cũng có thể được mở rộng với dữ liệu người dùng đã xác thực để cho phép ủy quyền hạt mịn.

const { ApolloServer } = require('apollo-server');
const { resolvers, typeDefs } = require('./schema');

const validateToken = authToken => {
  // ... validate token and return a Promise, rejects in case of an error
};

const findUser = authToken => {
  return tokenValidationResult => {
    // ... finds user by auth token and return a Promise, rejects in case of an error
  };
};

const server = new ApolloServer({
  typeDefs,
  resolvers,
  subscriptions: {
    onConnect: (connectionParams, webSocket) => {
      if (connectionParams.authToken) {
        return validateToken(connectionParams.authToken)
          .then(findUser(connectionParams.authToken))
          .then(user => {
            return {
              currentUser: user,
            };
          });
      }

      throw new Error('Missing auth token!');
    },
  },
});

server.listen().then(({ url, subscriptionsUrl }) => {
  console.log(`🚀 Server ready at ${url}`);
  console.log(`🚀 Subscriptions ready at ${subscriptionsUrl}`);
});

Ví dụ trên xác thực mã thông báo của người dùng được gửi cùng với thông báo khởi tạo đầu tiên trên phương tiện truyền tải, sau đó nó tra cứu người dùng và trả về đối tượng người dùng dưới dạng Lời hứa. Đối tượng người dùng được tìm thấy sẽ có sẵn dưới dạng context.currentUser trong các trình phân giải GraphQL của bạn.

Trong trường hợp xảy ra lỗi xác thực, Lời hứa sẽ bị từ chối, điều này ngăn cản kết nối của khách hàng.

Bạn có thể sử dụng đăng ký với bất kỳ tích hợp phần mềm trung gian nào được hỗ trợ của Apollo Server. Để làm như vậy, bạn gọi installSubscriptionHandlers trên phiên bản ApolloServer của mình.

Ví dụ này cho phép đăng ký một máy chủ Express sử dụng apollo-server-express:

const http = require('http');
const { ApolloServer } = require('apollo-server-express');
const express = require('express');

const PORT = 4000;
const app = express();
const server = new ApolloServer({ typeDefs, resolvers });

server.applyMiddleware({app})

const httpServer = http.createServer(app);

server.installSubscriptionHandlers(httpServer);

// Make sure to call listen on httpServer, NOT on app.
httpServer.listen(PORT, () => {
  console.log(`🚀 Server ready at http://localhost:${PORT}${server.graphqlPath}`)
  console.log(`🚀 Subscriptions ready at ws://localhost:${PORT}${server.subscriptionsPath}`)
})

As mentioned above, the PubSub class is not recommended for production environments, because its event-publishing system is in-memory. This means that events published by one instance of your GraphQL server are not received by subscriptions that are handled by other instances.

Instead, you should use a subclass of the PubSubEngine abstract class that you can back with an external datastore such as Redis or Kafka.

Như đã đề cập ở trên, lớp PubSub không được khuyến nghị cho môi trường sản xuất vì hệ thống xuất bản sự kiện của nó nằm trong bộ nhớ. Điều này có nghĩa là các sự kiện được xuất bản bởi một phiên bản của máy chủ GraphQL của bạn sẽ không nhận được các đăng ký do các phiên bản khác xử lý.

Thay vào đó, bạn nên sử dụng một lớp con của lớp trừu tượng PubSubEngine mà bạn có thể quay lại với một kho dữ liệu bên ngoài như Redis hoặc Kafka.

Sau đây là các thư viện PubSub do cộng đồng tạo cho các hệ thống xuất bản sự kiện phổ biến:

Last updated

Navigation

Lionel

@Copyright 2023