Event Streaming
Event Streaming notifies a client application of database changes as they occur, so a subscribing application can track changes without running ongoing polling queries. With streaming, client applications subscribe to changes on a target set of documents in the database, identified by an FQL query.
Some use cases where event streaming is recommended:
- Maintain "live" application state: client apps or browsers are made aware immediately as data changes. Users can track live e-commerce inventory, auction status, and productivity workflow state.
- Brokered communications: documents and collections can be used as message channels between multiple parties. A chat application is a good example of such a case.
- Publish-subscribe (pub-sub) use cases, including updating external services and data stores, and asynchronous work queuing. For example, an application client can insert event notifications from Fauna into a Kafka topic that is consumed by several downstream consumers or data stores, such as a full-text search engine. A sketch of this pattern follows this list.
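As an illustration of the pub-sub pattern, here is a minimal sketch that forwards Fauna stream events to a Kafka topic. It uses the fauna driver and the kafkajs package, assumes the Product collection defined later in this guide, and treats the broker address and topic name as hypothetical; it is a sketch, not a production integration.
import { Client, fql } from "fauna";
import { Kafka } from "kafkajs";
const client = new Client();
const kafka = new Kafka({ clientId: "fauna-bridge", brokers: ["localhost:9092"] }); // hypothetical broker
const producer = kafka.producer();
await producer.connect();
// Subscribe to changes on the Product collection and forward each event downstream.
const stream = client.stream(fql`Product.all().toStream()`);
for await (const event of stream) {
  await producer.send({
    topic: "product-events", // hypothetical topic name
    messages: [{ value: JSON.stringify({ type: event.type, data: event.data }) }],
  });
}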
Create a stream
Define a stream
To create a stream for a set of documents, append the toStream() method to a query that returns the set.
For example, an e-commerce application manages a catalog of products in a collection named Product. The definition of this collection, which is used in this guide, is as follows:
collection Product {
  index byType {
    terms [ .type ]
    values [ .name, .price ]
  }
}
A business requirement of the application may be to take some administrative
action when a change occurs in the Product
collection. In other words, we
want to get a notification when a document is added, removed, or changed in
the Product
collection. Fauna streaming is based on FQL. So, creating a
stream that captures these notifications is simply a matter of initializing the
stream with a query that captures the target set.
Product.all().toStream()
Notice that the base query Product.all()
is a full collection scan, which when
issued as a normal query returns all documents in a collection. That same query
may be used to define the set of documents to be subscribed to. To create a
stream, append the toStream()
function to the query that defines the set to be
streamed. The toStream()
function returns a stream token, which is used to
start the stream subscription.
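As a minimal sketch with the JavaScript driver (covered in more detail later in this guide), the token returned by toStream() can be fetched with an ordinary query and then passed to the driver's stream method:
import { Client, fql } from "fauna";
const client = new Client();
// The query returns a stream token rather than documents.
const response = await client.query(fql`Product.all().toStream()`);
// Pass the token to client.stream() to start the subscription.
const stream = client.stream(response.data);
for await (const event of stream) {
  console.log(event.type, event.data);
}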
Filter
Some use cases only require notifications for changes to a specific subset of documents in a collection. Because Fauna streaming is based on FQL queries, it is easy to filter, server-side, for that set of desired documents. Targeting your desired set of documents with a stream query reduces data on the wire and eliminates the need to filter out undesired events in your client application.
Consider the example requirement of our e-commerce application. It’s necessary to receive notifications on products of the category book that are less than 100 in price. The following stream query satisfies that requirement:
Product.where( .type == 'book' && .price < 100).toStream()
The stream query specifies the exact set of documents that will be subscribed to. Event notifications will be sent on the stream only when a change occurs in the set of documents defined by the stream query, such as when a document is added, deleted, or changed. For example, each of the following database operations would trigger a notification on the stream defined above.
Product.create({
  name: "Music as History",
  type: "book",
  price: 16.89,
  quantity: 20
})
let p = Product.byType('book')
.where(.name == "Music as History" ).first()!
p.update({ quantity: p.quantity + 10 })
Product.byType('book')
.where(.name == "Music as History" ).first()!
.update({ price: 110 })
Notice that the last example write operation, which increased the price of the book titled Music as History to 110, still triggers a notification event on the stream. This is because the change caused the book to leave the set of documents the stream is subscribed to. Although a notification is sent when the document leaves the set, any further changes to that document will no longer trigger an event notification, because it no longer meets the criteria of the stream query.
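As a small driver-side sketch (the driver API is covered later in this guide), the departure shows up as a single remove event on the stream:
import { Client, fql } from "fauna";
const client = new Client();
const stream = client.stream(fql`Product.where(.type == 'book' && .price < 100).toStream()`);
for await (const event of stream) {
  if (event.type === "remove") {
    // Fired once when a document leaves the set, for example when the
    // price of "Music as History" is raised to 110.
    console.log("Left the set:", event.data);
  }
}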
Track changes on specific fields
In addition to specifying the set of documents a stream is subscribed to, users may also define the set of fields in a document that trigger a notification. Consider the case where the application needs to receive a notification whenever the price of a product in a target category changes. An example scenario: a user is browsing products of type electronics, and the UI of the web application needs to be updated when the price of a product the user is browsing changes. The following stream query serves this requirement:
Product.byType('electronics').changesOn(.price)
With this stream query, the subscribing client will only receive notifications whenever the price of a Product of type electronics changes.
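A minimal sketch of consuming this stream with the JavaScript driver; the console output stands in for whatever UI update your application performs:
import { Client, fql } from "fauna";
const client = new Client();
// Only changes to the price field of electronics products produce events here.
const stream = client.stream(fql`Product.byType('electronics').changesOn(.price)`);
for await (const event of stream) {
  if (event.type === "update") {
    // Refresh the displayed price for the affected product.
    console.log(`${event.data.name} is now priced at ${event.data.price}`);
  }
}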
Stream changes on a single document
In some use cases, streaming on a single document is appropriate. For instance, consider a user who has placed a given product in their shopping cart. While the product is in the cart, the quantity may drop as other users purchase the same product. A requirement of the application is that the user be notified when the quantity of a product changes while it is in their cart. The following query would meet this requirement but is inefficient.
Product.where(.id == "390896421523423744").toStream()
The above stream is highly inefficient, as it watches for changes across the entire Product collection but only produces events for a single document. Users can optimize this use case by creating a singleton set containing just that product.
Set.single(Product.byId("390896421523423744")).toStream()
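The singleton set can then be consumed like any other stream. A minimal sketch, assuming the document ID from the example above:
import { Client, fql } from "fauna";
const client = new Client();
// Watch a single product document while it sits in the user's cart.
const stream = client.stream(fql`Set.single(Product.byId("390896421523423744")).toStream()`);
for await (const event of stream) {
  if (event.type === "update") {
    console.log("Quantity is now", event.data.quantity);
  }
}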
Optimize stream performance with indexes
Just as we strongly recommend the use of indexed queries for production workloads, we also recommend the use of indexes to increase the performance of stream queries. While an unindexed streaming query is allowed, your stream queries will be more performant and cost-effective when an index is used to serve them.
Notice that many of the preceding stream query examples used the byType index in our Product collection definition at the top of this page. This index uses the type field as the index term, and the name and price fields as the index values. The index has been designed to suit the query patterns we’re streaming on. Recall the earlier stream definition:
Product.where( .type == 'book' && .price < 100).toStream()
It would be far more efficient in cost and time if it were modified, as follows, to use the byType index:
Product.byType('book').where( .price < 100 ).toStream()
There is a trade-off between streams on a collection versus streams on an index: index streams only produce events when fields in their configured terms or values change. So, while the use of an index increases the efficiency of a stream query, users need to design their indexes such that they fit the change events they need to be subscribed to. For example, given the stream query above, the following write operation would not trigger a notification.
let p = Product.byType('book')
.where(.name == "Music as History" ).first()!
p.update({ quantity: p.quantity - 1 })
In this example, even though the book Music as History meets the query criteria of the indexed stream query, none of the fields covered by the index changed as a result of the write operation. The quantity field did change, but it is not covered by the index.
The need to have fields covered by the index extends to the use of changesOn() as well. Any field users wish to trigger a notification on in an index stream query must be covered by the index. The following example query would not be accepted by Fauna as a legal stream request, because the quantity field is neither a term nor a value of the byType index.
Product.byType('book').where(.price < 100 ).changesOn(.quantity)
Project only the data you need into the stream
By default, add and update events project the entire document that was modified, even when an index was used. In cases where it’s unnecessary to stream the entire document, users may add a projection to the stream query to suit their needs. Consider this example stream query, which projects only the name and price fields:
Product.byType('book').where( .price < 100 ).toStream(){name, price}
Only the document’s name and price fields will be returned in events on this stream:
{
  type: 'add',
  data: {
    name: 'The Fraud',
    price: 80
  },
  txn_ts: 1710456655930000,
  stats: {
    ...
  }
}
Consume streams using Fauna’s client drivers
Applications consume streams with the use of a Fauna client driver. Use the
Client.stream()
function to initialize the stream. Take the following example,
where our earlier stream query is used to subscribe to the stream.
import { Client, fql } from "fauna";
const client = new Client();
const booksQuery = fql`Product.where(.type == 'book' && .price < 100).toStream()`;
const stream = client.stream(booksQuery);
try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Event types
Write events that occur in the database generate a set of notification types in the stream, based on the type of write that occurred. You can see in the preceding JavaScript example that the various streaming events are handled with a switch statement. The event types are:
- Add: occurs whenever a document enters the set defined by the stream query, either because a new document was created that matches the query’s criteria, or because an existing document was updated so that it now matches them.
- Remove: occurs when a document leaves the set defined by the stream query. This can happen when a document that meets the stream query’s criteria is deleted, or when a document is updated so that it no longer meets the criteria.
- Update: occurs when a field changes in a document that meets the stream query’s criteria.
- Status: occurs periodically and serves to update the subscribing client, similar to a keep-alive, during periods when events are either not occurring on the database or are filtered out of the stream server-side.
Your application code will need to handle the occurrence of any of these events.
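A minimal dispatch sketch covering all four event types is shown below. It assumes events arrive as objects with the type and data fields described in the next section; whether status events surface in your driver's event loop depends on the driver.
// Minimal event dispatcher covering the four event types.
function handleEvent(event) {
  switch (event.type) {
    case "add":
      console.log("Entered the set:", event.data);
      break;
    case "update":
      console.log("Changed within the set:", event.data);
      break;
    case "remove":
      console.log("Left the set:", event.data);
      break;
    case "status":
      // Keep-alive; no document data to process.
      break;
  }
}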
Event shapes
Events sent through the stream include a field named type, which identifies the type of event that has occurred. This is a string with a value of status, add, remove, or update, as described above. The data field includes the entire document by default. If the stream query includes a projection, the data field includes only the fields defined in the projection.
For example, take the stream query Product.byType('book').where(.price < 100).toStream(), and create the following document:
Product.createData({
  type: 'book',
  name: 'The Fraud',
  quantity: "55",
  price: 23.0
})
This write generates the following notification:
{
  type: 'add',
  data: Document {
    coll: Module { name: 'Product' },
    id: '392372948586987553',
    ts: TimeStub { isoString: '2024-03-14T22:20:53.520Z' },
    type: 'book',
    name: 'The Fraud',
    quantity: '55',
    price: 23
  },
  txn_ts: 1710454853520000,
  stats: {
    read_ops: 1,
    storage_bytes_read: 109,
    compute_ops: 1,
    processing_time_ms: 1,
    rate_limits_hit: []
  }
}
Had the stream query included a projection of the name and price fields, Product.byType('book').where(.price < 100).toStream(){ name, price }, the add event would have had the following shape:
{
  type: 'add',
  data: { name: 'The Fraud', price: 23 },
  txn_ts: 1710454853520000,
  stats: {
    read_ops: 1,
    storage_bytes_read: 109,
    compute_ops: 1,
    processing_time_ms: 0,
    rate_limits_hit: []
  }
}
Stream snapshots
In some use cases, it’s necessary to first load a set of documents into the application and then keep them updated with changes occurring in the database. Take the example of presenting a leaderboard in a multiplayer game: as each player logs on, the leaderboard should be loaded into their UI and kept up to date with player rankings and positions as scores change.
To serve such use cases, Fauna streaming allows you to both query for a set of documents and open a stream on that same set, all in the same operation. Consider the following code snippet, which builds on our earlier examples. It queries for the set of products of type book that are less than 100 in price and starts an event stream on that same set.
import { Client, fql } from "fauna";
const client = new Client();
const booksQuery = fql`
  let products = Product.where( .type == 'book' && .price < 100 )
  {
    products: products,
    streamToken: products.toStream()
  }`;
const response = await client.query(booksQuery);
const { products, streamToken } = response.data;
// Paginate through all matching products
for await (const product of client.paginate(products)) {
console.log(product);
}
const stream = client.stream(streamToken);
try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Notice in the FQL query string that the set of Product documents returned by the query is assigned to the variable products. That set is returned to the client in the products field of the object returned by the query. The toStream() function is called on the same products set, and the resulting stream token is returned in the streamToken field of the query result. The application code then iterates over the initial seed set of documents returned by the query and awaits new events on that set.