Certain RethinkDB drivers support asynchronous connections by integrating with popular async libraries. This is particularly useful with changefeeds and other real-time applications.
Due to its event-driven nature, JavaScript can easily execute RethinkDB queries in an asynchronous fashion. The official RethinkDB drivers currently support integration with EventMachine for Ruby, and Tornado and Twisted for Python.
No special procedures or commands are necessary to execute RethinkDB queries asynchronously in JavaScript. Read about using callbacks and promises with RethinkDB in the documentation for the run command.
In addition, RethinkDB’s cursors and feeds implement an EventEmitter interface compatible with Node’s. This allows your application to set up listeners to receive data from queries as the data becomes available.
The RethinkDB Ruby driver adds a new ReQL command, em_run, designed to work with EventMachine. In addition, it provides a superclass, RethinkDB::Handler, with event-specific methods (e.g., on_open, on_close) that may be overridden by a class your application defines and passes to em_run.
The easiest way to use RethinkDB with EventMachine is to pass a block to em_run. If RethinkDB returns a sequence (including a stream), the block will be called once with each element of the sequence. Otherwise, the block will be called just once with the returned value.
Example: Iterate over a stream
    require 'eventmachine'
    require 'rethinkdb'
    include RethinkDB::Shortcuts

    conn = r.connect(host: 'localhost', port: 28015)

    EventMachine.run {
      r.table('test').order_by(:index => 'id').em_run(conn) { |row|
        # do something with returned row data
        p row
      }
    }
The em_run command returns a QueryHandle instance. The QueryHandle will be closed when all results have been received, or when EventMachine stops running. You can explicitly close it with the close method.
    EventMachine.run {
      printed = 0
      handle = r.table('test').order_by(:index => 'id').em_run(conn) { |row|
        printed += 1
        if printed > 3
          handle.close
        else
          p row
        end
      }
    }
In the form above, with a block that accepts a single argument, RethinkDB’s EventMachine adapter will throw errors back up to your application for you to handle in the same fashion as you would using RethinkDB without EventMachine. If the table test did not exist in the database above, you would receive the standard ReqlRuntimeError:

    RethinkDB::ReqlRuntimeError: Table `test.test` does not exist.
    Backtrace:
    r.table('test')
    ^^^^^^^^^^^^^^^
You can also choose to receive errors in the block by accepting two arguments.
    EventMachine.run {
      r.table('test').order_by(:index => 'id').em_run(conn) { |err, row|
        if err
          p [:err, err.to_s]
        else
          p [:row, row]
        end
      }
    }
In this form, the block will receive nil as the first argument if there is no error. In the case of an error, the second argument will be nil.
To gain more precise control, write a class that inherits from RethinkDB::Handler and override the event handling methods, then pass an instance of that class to em_run.
Example: Iterate over a stream using a handler
    require 'eventmachine'
    require 'rethinkdb'
    include RethinkDB::Shortcuts

    conn = r.connect(host: 'localhost', port: 28015)

    class Printer < RethinkDB::Handler
      def on_open
        p :open
      end

      def on_close
        p :closed
      end

      def on_error(err)
        p [:err, err.to_s]
      end

      def on_val(val)
        p [:val, val]
      end
    end

    EventMachine.run {
      r.table('test').order_by(:index => 'id').em_run(conn, Printer)
    }

    # Sample output
    :open
    [:val, {"id"=>1}]
    [:val, {"id"=>2}]
    [:val, {"id"=>3}]
    :closed
In addition to the simple on_val method, you can provide methods that specifically apply to arrays, streams and atoms.
    class Printer < RethinkDB::Handler
      def on_open
        p :open
      end

      def on_close
        p :closed
      end

      def on_error(err)
        p [:err, err.to_s]
      end

      # Handle arrays
      def on_array(array)
        p [:array, array]
      end

      # Handle atoms
      def on_atom(atom)
        p [:atom, atom]
      end

      # Handle individual values received from streams
      def on_stream_val(val)
        p [:stream_val, val]
      end

      def on_val(val)
        p [:val, val]
      end
    end

    EventMachine.run {
      r.table('test').order_by(:index => 'id').em_run(conn, Printer)
      # print an array
      r.expr([1, 2, 3]).em_run(conn, Printer)
      # print a single row
      r.table('test').get(1).em_run(conn, Printer)
    }

    # Sample output
    :open
    [:stream_val, {"id"=>0}]
    [:stream_val, {"id"=>1}]
    [:stream_val, {"id"=>2}]
    :closed
    :open
    [:array, [1, 2, 3]]
    :closed
    :open
    [:atom, {"id"=>0}]
    :closed
The various on_* methods provide fallbacks for one another:

An array will be passed to on_array if defined; otherwise it will be handled by on_atom. If neither of those is defined, the individual elements of the array will be handled by on_stream_val or, if that is not defined, on_val.

A value received from a stream will be passed to on_stream_val if defined; otherwise it will be handled by on_val.

An atom will be passed to on_atom if defined; otherwise it will be handled by on_val.

Thus, on_val acts as a “catch-all” for any data that is not handled by a more specific method.
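The fallback order above can be modeled in a few lines. The following is a minimal sketch in Python, not the Ruby driver's code: the `dispatch` function and the dict-of-callables "handler" are hypothetical names introduced only for illustration, and the sketch assumes `on_val` is always present as the catch-all.

```python
# Hypothetical model of the handler fallback order described above.
# Not the Ruby driver's implementation; names are assumptions.

def dispatch(handler, kind, value):
    """Route a result to the most specific callback the handler defines.

    handler maps callback names to callables; kind is "array",
    "stream_val", or "atom". Returns the names of the callbacks invoked.
    """
    called = []

    def call(name, arg):
        handler[name](arg)
        called.append(name)

    if kind == "array":
        if "on_array" in handler:
            call("on_array", value)
        elif "on_atom" in handler:
            call("on_atom", value)
        else:
            # No array-level callback: fall back element by element.
            for element in value:
                called.extend(dispatch(handler, "stream_val", element))
    elif kind == "stream_val":
        call("on_stream_val" if "on_stream_val" in handler else "on_val", value)
    elif kind == "atom":
        call("on_atom" if "on_atom" in handler else "on_val", value)
    else:
        raise ValueError("unknown kind: %r" % (kind,))
    return called


seen = []
only_on_val = {"on_val": seen.append}
print(dispatch(only_on_val, "array", [1, 2, 3]))  # ['on_val', 'on_val', 'on_val']
print(seen)                                       # [1, 2, 3]
```

With only `on_val` defined, an array is delivered one element at a time, which matches the last fallback step in the rules above.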
The order in which callbacks are called in the EventMachine.run block is not guaranteed; in the sample output above, [:array, [1, 2, 3]] might have printed first.
A changefeed is handled like any other stream; when you pass a block to em_run, the block is called with each document received on the feed. If you pass a Handler that defines on_stream_val (or on_val), those methods will be called with each document.
In addition, there are changefeed-specific methods that may be defined.
on_initial_val: if the changefeed returns initial values (include_initial has been specified as an option to changes), those values will be passed to this method.

on_uninitial_val: a changefeed that returns initial values may also return “uninitial” values to indicate a document already sent as an initial value has been changed (see the changes documentation for details); those values, if any, will be passed to this method.

on_change: changes will be passed to this method.

on_change_error: if the feed includes a document specifying errors that do not cause the feed to abort (for instance, a notification the server discarded some changes), those errors will be passed to this method.

on_state: a feed may include documents specifying the state of the stream; those documents will be passed to this method if defined.

    class FeedPrinter < RethinkDB::Handler
      def on_open
        p :open
      end

      def on_close
        p :closed
      end

      def on_error(err)
        p [:err, err.to_s]
      end

      def on_initial_val(val)
        p [:initial_val, val]
      end

      def on_state(state)
        p [:state, state]
      end

      def on_change(old, new)
        p [:change, old, new]
      end
    end

    # Subscribe to changes on the documents with the two lowest ids.
    # include_initial and include_states request the initial values and
    # state documents shown in the sample output.
    EventMachine.run {
      r.table('test').order_by(:index => 'id').limit(2)
        .changes(include_initial: true, include_states: true)
        .em_run(conn, FeedPrinter)
    }

    # Sample output
    :open
    [:state, "initializing"]
    [:initial_val, {"id"=>1}]
    [:initial_val, {"id"=>0}]
    [:state, "ready"]
    # Execute: r.table('test').insert({id: 0.5}).run(conn)
    [:change, {"id"=>1}, {"id"=>0.5}]
    # Execute: r.table_drop('test').run(conn)
    [:err, "Changefeed aborted (table unavailable).\nBacktrace..."]
    :closed
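The mapping from feed documents to the changefeed-specific methods can be sketched by looking at which fields each document carries. The Python below is an illustrative model only: `classify_feed_document` is a hypothetical helper, the field shapes (`old_val`, `new_val`, `state`, `error`) follow the changes documentation, and the Ruby driver's actual dispatch logic may differ in detail.

```python
# Hypothetical helper: name the changefeed-specific handler method a
# feed document would be routed to, based on the fields it carries.
# This is a model for illustration, not the Ruby driver's code.

def classify_feed_document(doc):
    if "error" in doc:
        # Non-fatal notification, e.g. the server discarded some changes.
        return "on_change_error"
    if "state" in doc:
        # State documents such as {"state": "initializing"}.
        return "on_state"
    has_old = "old_val" in doc
    has_new = "new_val" in doc
    if has_old and has_new:
        return "on_change"
    if has_new:
        # Initial values carry new_val but no old_val.
        return "on_initial_val"
    if has_old:
        # "Uninitial" values carry old_val but no new_val.
        return "on_uninitial_val"
    return None  # unrecognized shape; handling is left unspecified here


print(classify_feed_document({"state": "ready"}))                          # on_state
print(classify_feed_document({"new_val": {"id": 0}}))                      # on_initial_val
print(classify_feed_document({"old_val": None, "new_val": {"id": 0.5}}))   # on_change
```

Note that an insert arrives with `old_val` present but set to a null value, so the check is on key presence rather than on the value.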
You can register multiple queries with the same Handler instance. If you define Handler methods with an additional argument (two arguments instead of one, or one argument instead of zero), that argument will receive the appropriate QueryHandle instance.
    class MultiQueryPrinter < RethinkDB::Handler
      # Maps each QueryHandle to a display name
      attr_accessor :names

      def initialize
        super()
        @names = {}
        @closed = 0
      end

      def on_open(qh)
        p [:open, @names[qh]]
      end

      def on_close(qh)
        p [:close, @names[qh]]
        # Stop EventMachine once both queries have closed
        @closed += 1
        EventMachine.stop if @closed == 2
      end

      def on_val(val, qh)
        p [:val, val, @names[qh]]
      end
    end

    EventMachine.run {
      printer = MultiQueryPrinter.new
      h1 = r.expr(1).em_run(conn, printer)
      h2 = r.expr(2).em_run(conn, printer)
      printer.names = { h1 => "h1", h2 => "h2" }
    }

    # Sample output
    [:open, "h1"]
    [:val, 1, "h1"]
    [:close, "h1"]
    [:open, "h2"]
    [:val, 2, "h2"]
    [:close, "h2"]
If you call the stop method on a Handler, it will stop processing changes and open streams using that handler will be closed. Queries registered with that handler instance will not be interrupted if they are currently processing (e.g., a batch write), but will close rather than executing after the handler has been stopped.
Example: Print the first five changes to a table. After the handler has been stopped, the changefeed query will be closed on the next change to the table rather than returning a value.
    class FeedPrinter < RethinkDB::Handler
      def initialize(conn, max)
        super()
        # Keep the connection so on_open can run a query with it
        @conn = conn
        @counter = max
        stop if @counter <= 0
      end

      def on_open
        # Once the changefeed is open, insert 10 rows
        r.table('test').insert([{}] * 10).run(@conn, noreply: true)
      end

      def on_val(val)
        # Every time we print a change, decrement @counter and stop if we hit 0
        p val
        @counter -= 1
        stop if @counter <= 0
      end
    end

    EventMachine.run {
      r.table('test').changes.em_run(conn, FeedPrinter.new(conn, 5))
    }
The RethinkDB Python driver integrates with both the Tornado web framework and the Twisted networking engine. By using the set_loop_type command, you can select either the 'tornado' or 'twisted' event loop model, returning Tornado Future objects or Twisted Deferred objects respectively.
Before connect, use the set_loop_type("tornado") command to set RethinkDB to use asynchronous event loops compatible with Tornado.
    import rethinkdb as r
    from tornado import ioloop, gen
    from tornado.concurrent import Future, chain_future
    import functools

    r.set_loop_type("tornado")
    connection = r.connect(host='localhost', port=28015)
After executing set_loop_type, r.connect will return a Tornado Future, as will r.run.
Example: Simple use
    @gen.coroutine
    def single_row(connection_future):
        # Wait for the connection to be ready
        connection = yield connection_future
        # Insert some data
        yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(connection)
        # Print the first row in the table
        row = yield r.table('test').get(0).run(connection)
        print(row)

    # Output
    {u'id': 0}
Example: Using a cursor
    @gen.coroutine
    def use_cursor(connection_future):
        # Wait for the connection to be ready
        connection = yield connection_future
        # Insert some data
        yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(connection)
        # Print every row in the table.
        cursor = yield r.table('test').order_by(index="id").run(connection)
        while (yield cursor.fetch_next()):
            item = yield cursor.next()
            print(item)

    # Output
    {u'id': 0}
    {u'id': 1}
    {u'id': 2}
Note that looping over a cursor must be done with while and fetch_next, rather than using a for x in cursor loop.
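The shape of that loop can be tried without a database. The sketch below uses only the standard library's asyncio, with `await` standing in for Tornado's `yield`; `FakeCursor` and `collect` are hypothetical names and are not part of the RethinkDB driver. It illustrates why a plain for loop does not fit: each step has to wait on an asynchronous result before the loop knows whether another row exists.

```python
import asyncio


class FakeCursor:
    """Hypothetical stand-in for an asynchronous cursor (not the driver's class)."""

    def __init__(self, rows):
        self._rows = list(rows)

    async def fetch_next(self):
        # Simulate waiting on the network before reporting whether a row exists.
        await asyncio.sleep(0)
        return bool(self._rows)

    async def next(self):
        # Simulate waiting for the row itself.
        await asyncio.sleep(0)
        return self._rows.pop(0)


async def collect(cursor):
    """Drain the cursor using the while/fetch_next pattern."""
    items = []
    while (await cursor.fetch_next()):
        items.append(await cursor.next())
    return items


rows = asyncio.run(collect(FakeCursor([{"id": 0}, {"id": 1}])))
print(rows)  # [{'id': 0}, {'id': 1}]
```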
If an error occurs during an asynchronous operation, the yield
statement will throw an exception as normal. This may happen immediately (for example, you might reference a table that doesn’t exist), but your application might receive large amounts of data before the error (for example, your network might be disrupted after the connection is established).
One error in particular is notable. If you have a coroutine set to consume a changefeed indefinitely, and the connection closes, the coroutine will experience a ReqlRuntimeError.
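A consumer that runs indefinitely can wrap its loop so that data received before the failure is kept. This is a minimal sketch using the standard library's asyncio rather than Tornado; `FeedClosedError` is a hypothetical stand-in for the driver's ReqlRuntimeError, and `FakeFeed` is a hypothetical feed that fails once its items run out.

```python
import asyncio


class FeedClosedError(Exception):
    """Hypothetical stand-in for the driver's ReqlRuntimeError."""


class FakeFeed:
    """Hypothetical feed that delivers some items, then loses its connection."""

    def __init__(self, items_before_close):
        self._items = list(items_before_close)

    async def fetch_next(self):
        await asyncio.sleep(0)
        if not self._items:
            # A real changefeed never ends on its own; here the connection drops.
            raise FeedClosedError("connection closed")
        return True

    async def next(self):
        return self._items.pop(0)


async def consume(feed):
    """Return (items received, error message or None)."""
    received = []
    try:
        while (await feed.fetch_next()):
            received.append(await feed.next())
    except FeedClosedError as exc:
        return received, str(exc)
    return received, None


received, error = asyncio.run(consume(FakeFeed([1, 2])))
print(received, error)  # [1, 2] connection closed
```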
Example: Re-thrown errors
    @gen.coroutine
    def bad_table(connection):
        yield r.table('non_existent').run(connection)

    Traceback (most recent call last):
    ... elided ...
    rethinkdb.errors.ReqlRuntimeError: Table `test.non_existent` does not exist. in:
    r.table('non_existent')
    ^^^^^^^^^^^^^^^^^^^^^^^
Example: Catching errors in the coroutine
    @gen.coroutine
    def catch_bad_table(connection):
        try:
            yield r.table('non_existent').run(connection)
        except r.ReqlRuntimeError:
            print("Saw error")

    # Output
    Saw error
The asynchronous database API allows you to handle multiple changefeeds simultaneously by scheduling background coroutines. As an example, consider this changefeed handler:
    @gen.coroutine
    def print_cfeed_data(connection_future, table):
        connection = yield connection_future
        feed = yield r.table(table).changes().run(connection)
        while (yield feed.fetch_next()):
            item = yield feed.next()
            print(item)
We can schedule it on the Tornado IO loop with this code:
ioloop.IOLoop.current().add_callback(print_cfeed_data, connection, table)
Now the coroutine will run in the background, printing out changes. When we alter the table, the changes will be noticed.
Now, consider a larger example.
    class ChangefeedNoticer(object):
        def __init__(self, connection):
            self._connection = connection
            self._sentinel = object()
            self._cancel_future = Future()

        @gen.coroutine
        def print_cfeed_data(self, table):
            feed = yield r.table(table).changes().run(self._connection)
            self._feeds_ready[table].set_result(True)
            while (yield feed.fetch_next()):
                cursor = feed.next()
                chain_future(self._cancel_future, cursor)
                item = yield cursor
                if item is self._sentinel:
                    return
                print("Seen on table %s: %s" % (table, item))

        @gen.coroutine
        def table_write(self, table):
            for i in range(10):
                yield r.table(table).insert({'id': i}).run(self._connection)

        @gen.coroutine
        def exercise_changefeeds(self):
            self._feeds_ready = {'a': Future(), 'b': Future()}
            loop = ioloop.IOLoop.current()
            loop.add_callback(self.print_cfeed_data, 'a')
            loop.add_callback(self.print_cfeed_data, 'b')
            yield self._feeds_ready
            yield [self.table_write('a'), self.table_write('b')]
            self._cancel_future.set_result(self._sentinel)

        @classmethod
        @gen.coroutine
        def run(cls, connection_future):
            connection = yield connection_future
            if 'a' in (yield r.table_list().run(connection)):
                yield r.table_drop('a').run(connection)
            yield r.table_create('a').run(connection)
            if 'b' in (yield r.table_list().run(connection)):
                yield r.table_drop('b').run(connection)
            yield r.table_create('b').run(connection)
            noticer = cls(connection)
            yield noticer.exercise_changefeeds()

    # Output
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 0}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 0}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 1}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 1}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 2}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 2}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 3}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 3}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 4}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 4}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 5}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 6}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 5}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 7}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 6}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 8}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 7}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 9}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 8}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 9}}
Here, we listen for changes on multiple tables at once. We simultaneously write into the tables, and observe our writes appear in the changefeeds. We then cancel the changefeeds after we’ve written 10 items into each of the tables.
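The same listen, write, then cancel pattern can be exercised without a server. The following is a rough sketch using only the standard library's asyncio: `asyncio.Queue` objects stand in for changefeeds, a sentinel object ends each consumer, and every name in it (`consume`, `write`, `main`) is hypothetical. It makes no RethinkDB calls and is not how the driver delivers cancellation.

```python
import asyncio

_SENTINEL = object()  # marks "stop consuming", like a cancellation signal


async def consume(name, feed, seen):
    """Record every item from one feed until the sentinel arrives."""
    while True:
        item = await feed.get()
        if item is _SENTINEL:
            return
        seen.append((name, item))


async def write(feed, count):
    """Simulate writes that show up on the feed."""
    for i in range(count):
        await feed.put({"id": i})


async def main():
    feeds = {"a": asyncio.Queue(), "b": asyncio.Queue()}
    seen = []
    # Start one background consumer per feed.
    consumers = [asyncio.ensure_future(consume(name, queue, seen))
                 for name, queue in feeds.items()]
    # Write to both feeds concurrently.
    await asyncio.gather(*(write(queue, 3) for queue in feeds.values()))
    # Cancel the consumers and wait for them to exit.
    for queue in feeds.values():
        await queue.put(_SENTINEL)
    await asyncio.gather(*consumers)
    return seen


seen = asyncio.run(main())
print(len(seen))  # 6
```

Items from a single feed arrive in write order, but the interleaving between the two feeds is not guaranteed, as the sample output above also shows.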
Before connect, use the set_loop_type("twisted") command to set RethinkDB to use asynchronous event loops compatible with the Twisted reactor.
    import rethinkdb as r
    from twisted.internet import reactor, defer
    from twisted.internet.defer import inlineCallbacks, returnValue

    r.set_loop_type('twisted')
    connection = r.connect(host='localhost', port=28015)
After executing set_loop_type, r.connect will return a Twisted Deferred, as will r.run.
Example: Simple use
    @inlineCallbacks
    def single_row(conn_deferred):
        # Wait for the connection to be ready
        conn = yield conn_deferred
        # Insert some data
        yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(conn)
        # Print the first row in the table
        row = yield r.table('test').get(0).run(conn)
        print(row)

    # Output
    {u'id': 0}
Example: Using a cursor
    @inlineCallbacks
    def use_cursor(conn):
        # Insert some data
        yield r.table('test').insert([{"id": 0}, {"id": 1}, {"id": 2}]).run(conn)
        # Print every row in the table.
        cursor = yield r.table('test').order_by(index="id").run(conn)
        while (yield cursor.fetch_next()):
            item = yield cursor.next()
            print(item)

    # Output:
    {u'id': 0}
    {u'id': 1}
    {u'id': 2}
Note that looping over a cursor must be done with while and fetch_next, rather than using a for x in cursor loop.
If an error occurs during an asynchronous operation, the yield
statement will throw an exception as normal. This may happen immediately (for example, you might reference a table that doesn’t exist), but your application might receive large amounts of data before the error (for example, your network might be disrupted after the connection is established).
One error in particular is notable. If you have a task that consumes a changefeed indefinitely, and the connection closes, the task will experience a ReqlRuntimeError.
Example: Re-thrown errors
    @inlineCallbacks
    def bad_table(conn):
        yield r.table('non_existent').run(conn)

    Unhandled error in Deferred:
    Traceback (most recent call last):
    Failure: rethinkdb.errors.ReqlOpFailedError: Table `test.non_existent` does not exist in:
    r.table('non_existent')
    ^^^^^^^^^^^^^^^^^^^^^^^
Example: Catching runtime errors
    @inlineCallbacks
    def catch_bad_table(conn):
        try:
            yield r.table('non_existent').run(conn)
        except r.ReqlRuntimeError:
            print("Saw error")

    # Output
    Saw error
The asynchronous database API allows you to handle multiple changefeeds simultaneously by running multiple background tasks. As an example, consider this changefeed handler:
    @inlineCallbacks
    def print_feed(conn, table):
        feed = yield r.table(table).changes().run(conn)
        while (yield feed.fetch_next()):
            item = yield feed.next()
            print("Seen on table %s: %s" % (table, str(item)))
We can schedule it on the Twisted reactor with this code:

    reactor.callLater(0, print_feed, conn, table)
Now the task will run in the background, printing out changes. When we alter the table, the changes will be noticed.
Now consider a larger example:
    @inlineCallbacks
    def print_feed(conn, table, ready, cancel):
        def errback_feed(feed, err):
            feed.close()
            return err

        feed = yield r.table(table).changes().run(conn)
        cancel.addErrback(lambda err: errback_feed(feed, err))
        ready.callback(None)
        while (yield feed.fetch_next()):
            item = yield feed.next()
            print("Seen on table %s: %s" % (table, str(item)))

    @inlineCallbacks
    def table_write(conn, table):
        for i in range(10):
            yield r.table(table).insert({'id': i}).run(conn)

    @inlineCallbacks
    def notice_changes(conn, *tables):
        # Reset the state of the tables on the server
        if len(tables) > 0:
            table_list = yield r.table_list().run(conn)
            yield defer.DeferredList([r.table_drop(t).run(conn) for t in tables if t in table_list])
            yield defer.DeferredList([r.table_create(t).run(conn) for t in tables])

        readies = [defer.Deferred() for t in tables]
        cancel = defer.Deferred()
        feeds = [print_feed(conn, table, ready, cancel) for table, ready in zip(tables, readies)]

        # Wait for the feeds to become ready
        yield defer.gatherResults(readies)
        yield defer.gatherResults([table_write(conn, table) for table in tables])

        # Cancel the feeds and wait for them to exit
        cancel.addErrback(lambda err: None)
        cancel.cancel()
        yield defer.DeferredList(feeds)

    yield notice_changes(conn, 'a', 'b')

    # Output
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 0}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 0}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 1}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 1}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 2}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 2}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 3}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 3}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 4}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 4}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 5}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 5}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 6}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 6}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 7}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 7}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 8}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 8}}
    Seen on table a: {u'old_val': None, u'new_val': {u'id': 9}}
    Seen on table b: {u'old_val': None, u'new_val': {u'id': 9}}
Here, we listen for changes on multiple tables at once. We simultaneously write into the tables, and observe our writes appear in the changefeeds. We then cancel the changefeeds after we’ve written 10 items into each of the tables.
© RethinkDB contributors
Licensed under the Creative Commons Attribution-ShareAlike 3.0 Unported License.
https://rethinkdb.com/docs/async-connections/