Control results flow with reactive streams
In a reactive flow, consumers dictate the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server.
An example use-case is an application fetching records from a Neo4j server and doing some very time-consuming post-processing on each one. If the server were allowed to push records to the client as soon as it has them available, the client could be overflown with a lot of entries while its processing is still lagging behind. The Reactive API ensures that the receiving side is not forced to buffer arbitrary amounts of data.
The Reactive API is recommended for applications that already work in a reactive programming style, and which have needs that only Reactive workflows can address. |
Install dependencies
To use reactive features, you need to add the relevant dependencies to your project first.
The driver’s reactive implementation lives in the Neo4j.Driver.Reactive
NuGet package (although symbols are defined in the regular Neo4j.Driver
namespace).
dotnet add package Neo4j.Driver.Reactive
Reactive query examples
The basic driver’s concepts are the same as the synchronous case, except that queries are run through a RxSession
, and the objects related to querying have a reactive counterpart and prefix.
Managed transaction with reactive sessions
.executeRead()
exampleusing Neo4j.Driver;
const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";
await using var driver = GraphDatabase.Driver(
dbUri,
AuthTokens.Basic(dbUser, dbPassword));
var rxSession = driver.RxSession(SessionConfigBuilder.ForDatabase("neo4j"));
var observable = rxSession.ExecuteRead(
tx => {
return tx
.Run("UNWIND range (1, 5) AS x RETURN x")
.Records();
}
);
var tcs = new TaskCompletionSource();
observable.Subscribe(
i => {
Console.WriteLine(i.Get<string>("x"));
},
ex => {
Console.WriteLine($"Error Occurred: {ex.Message}");
tcs.SetResult();
},
() => {
Console.WriteLine("Finished receiving records");
tcs.SetResult();
}
);
await tcs.Task;
Implicit transaction with reactive sessions
The following example is very similar to the previous one, except it uses an implicit transaction.
.run()
exampleusing Neo4j.Driver;
const string dbUri = "<database-uri>";
const string dbUser = "<username>";
const string dbPassword = "<password>";
await using var driver = GraphDatabase.Driver(
dbUri,
AuthTokens.Basic(dbUser, dbPassword));
var rxSession = driver.RxSession(SessionConfigBuilder.ForDatabase("neo4j"));
var observable = rxSession
.Run("UNWIND range (1, 5) AS x RETURN x")
.Records();
var tcs = new TaskCompletionSource();
observable.Subscribe(
i => {
Console.WriteLine(i.Get<string>("x"));
},
ex => {
Console.WriteLine($"Error Occurred: {ex.Message}");
tcs.SetResult();
},
() => {
Console.WriteLine("Finished receiving records");
tcs.SetResult();
}
);
await tcs.Task;
Always defer session creation
Remember that in reactive programming a Publisher doesn’t come to life until a Subscriber attaches to it. A Publisher is just an abstract description of your asynchronous process, but it’s only the act of subscribing that triggers the flow of data in the whole chain.
For this reason, always be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain. Doing so may result in many open sessions, none doing work and all waiting for a Publisher to use them, potentially exhausting the number of available sessions for your application.
Glossary
- LTS
-
A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.
- Aura
-
Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.
- Cypher
-
Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.
- APOC
-
Awesome Procedures On Cypher (APOC) is a library of (many) functions that can not be easily expressed in Cypher itself.
- Bolt
-
Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.
- ACID
-
Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.
- eventual consistency
-
A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.
- causal consistency
-
A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.
- NULL
-
The null marker is not a type but a placeholder for absence of value. For more information, see Cypher → Working with
null
. - transaction
-
A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.
- backpressure
-
Backpressure is a force opposing the flow of data. It ensures that the client is not being overwhelmed by data faster than it can handle.
- transaction function
-
A transaction function is a callback executed by an
.ExecuteReadAsync()
or.ExecuteWriteAsync()
call. The driver automatically re-executes the callback in case of server failure. - IDriver
-
A
IDriver
object holds the details required to establish connections with a Neo4j database.