NoBrainer

A Ruby ORM for RethinkDB

Real Time Tutorial

This tutorial uses RethinkDB, NoBrainer, EventMachine, Goliath and Ruby fibers to demonstrate RethinkDB's real-time features through a simple HTTP interface.

Our application contains 3 files: Gemfile, init.rb and app.rb, which respectively specify the gems we’ll be using, the application configuration, and the application logic.

Gemfile

source 'https://rubygems.org'
gem 'goliath', '~> 1.0.4'
gem 'nobrainer', '~> 0.29.0'

init.rb

# First we load our gems.
require 'bundler'
Bundler.require

# Then we configure NoBrainer.
NoBrainer.configure do |config|
  config.app_name = "goliath"
  config.environment = Goliath.env
  config.driver = :em
  config.logger = Logger.new(STDERR).tap { |log| log.level = Logger::DEBUG }
end

# Next, we define a helper method `stream(env, &block)' that immediately
# returns HTTP headers to the client, and schedules the passed block to be run
# in a Fiber at the next EventMachine tick. An exception bubbling up from the
# fiber would kill the EventMachine loop, so we catch all exceptions and
# handle them in `guard_async_response()'.

module StreamFiber
  def stream(env, &block)
    EM.next_tick { Fiber.new { guard_async_response(env, &block) }.resume }
    chunked_streaming_response
  end

  def guard_async_response(env, &block)
    block.call(env)
  rescue Exception => e
    begin
      msg = {:error => "#{e.class}: #{e.message.split("\n").first}"}
      STDERR.puts msg
      env.chunked_stream_send("#{msg.to_json}\n")
    rescue
    end
  ensure
    env.chunked_stream_close
  end
end

# Then, we introduce a helper to cancel outstanding requests made to the
# database when an HTTP client disconnects.
# `bind_cursor_to_connection(env, cursor)' binds a NoBrainer cursor to an
# HTTP connection, meaning that when the connection gets closed through the
# `on_close()' Goliath callback, `close()' is called on all registered
# cursors.

module BindCursor
  def bind_cursor_to_connection(env, cursor)
    if env['connection_closed']
      cursor.close
    else
      env['cursors'] ||= []
      env['cursors'] << cursor
    end
  end

  def on_close(env)
    env['connection_closed'] = true
    env['cursors'].to_a.each(&:close)
  end
end
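
To see why `bind_cursor_to_connection` checks `env['connection_closed']` first, here is a minimal sketch that exercises both orderings: the client disconnecting after the cursor is registered, and the client disconnecting before the fiber has opened its cursor. `FakeCursor` and `Handler` are hypothetical stand-ins introduced for illustration only; the module body repeats the one from init.rb so the snippet runs without Goliath or RethinkDB.

```ruby
# Hypothetical stand-in for a NoBrainer cursor: it only records whether
# close was called.
class FakeCursor
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Same logic as the BindCursor module in init.rb, repeated here so the
# sketch is self-contained.
module BindCursor
  def bind_cursor_to_connection(env, cursor)
    if env['connection_closed']
      cursor.close
    else
      env['cursors'] ||= []
      env['cursors'] << cursor
    end
  end

  def on_close(env)
    env['connection_closed'] = true
    env['cursors'].to_a.each(&:close)
  end
end

# Hypothetical host object; in the tutorial the module is included in App.
class Handler
  include BindCursor
end

handler = Handler.new

# Case 1: the cursor is registered first, then the client disconnects.
env = {}
early = FakeCursor.new
handler.bind_cursor_to_connection(env, early)
handler.on_close(env)

# Case 2: the client disconnects before the fiber opens its cursor.
late_env = {}
handler.on_close(late_env)
late = FakeCursor.new
handler.bind_cursor_to_connection(late_env, late)

puts "early cursor closed: #{early.closed}"
puts "late cursor closed:  #{late.closed}"
```

In both cases the cursor ends up closed, so a changefeed is not left open for a client that has already gone away.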

app.rb

require './init'

# We define a simple Item model with two fields: an SKU with a uniqueness
# constraint, and a name.

class Item
  include NoBrainer::Document
  field :sku,  :type => String, :required => true, :uniq => true
  field :name, :type => String
end

# We define our Goliath application which responds to the /upsert and /changes
# endpoints. /upsert simply upserts Items (insert if not found, update if
# found), and /changes opens a firehose emitting changes on the items table.

class App < Goliath::API
  use Goliath::Rack::Params
  include StreamFiber
  include BindCursor

  def upsert(env)
    item = Item.upsert!(env['params'])
    [200, {}, item.to_json]
  end

  def changes(env)
    stream(env) do
      Item.where(env['params']).raw.changes(:include_states => true)
        .tap { |cursor| bind_cursor_to_connection(env, cursor) }
        .each { |changes| env.chunked_stream_send("#{changes.to_json}\n") }
    end
  end

  def response(env)
    case [env['REQUEST_METHOD'].downcase.to_sym, env['PATH_INFO']]
      when [:post, '/upsert']  then upsert(env)
      when [:get,  '/changes'] then changes(env)
      else raise Goliath::Validation::NotFoundError
    end
  end
end
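
The upsert semantics described above (insert if not found, update if found) can be illustrated without a database. The following is only a sketch of the behavior, not how NoBrainer implements `upsert!`: `InMemoryItems` is a hypothetical class, it keys rows by `sku` on the assumption that the uniqueness constraint is what identifies an existing Item, and it generates ids with `SecureRandom` purely for illustration.

```ruby
require 'securerandom'

# Hypothetical in-memory stand-in for the items table, keyed by sku.
class InMemoryItems
  def initialize
    @rows = {}
  end

  # Insert a new row when the sku is unknown; otherwise merge the given
  # attributes into the existing row. Returns a copy of the stored row.
  def upsert!(attrs)
    sku = attrs['sku']
    raise ArgumentError, "Sku can't be blank" if sku.nil? || sku.empty?

    row = (@rows[sku] ||= { 'id' => SecureRandom.hex(7) })
    row.merge!(attrs).dup
  end
end

items  = InMemoryItems.new
first  = items.upsert!('sku' => '456', 'name' => 'hello') # inserts
second = items.upsert!('sku' => '456', 'name' => 'ohai')  # updates name
third  = items.upsert!('sku' => '456')                    # changes nothing

puts first.inspect
puts second.inspect
puts third.inspect
```

All three calls return the same id, and the third call leaves the name untouched because it supplies no new value for it.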

Running the Example

With the server running as shown below, we can issue requests against it.

$ ruby app.rb -sv
[28846:INFO] 2015-08-17 02:13:51 :: Starting server on 0.0.0.0:9000 in development mode. Watch out for stones.

Example 1: Creating a valid Item

$ curl -X POST localhost:9000/upsert?sku=123\&name=hello
{"id":"2J3EyCBX5JyjIX","name":"hello","sku":"123"}

Example 2: Creating an invalid Item

$ curl -X POST localhost:9000/upsert
[:error, "#<Item id: \"blah\"> is invalid: Sku can't be blank"]

Example 3: Modifying items while listening for changes

First we listen for changes:

$ curl localhost:9000/changes
{"state":"ready"}

Then we open a new shell and run:

$ curl -X POST localhost:9000/upsert?sku=456\&name=hello
{"id":"2J3K0C71Nn0RQ6","name":"hello","sku":"456"}
$ curl -X POST localhost:9000/upsert?sku=456\&name=ohai
{"id":"2J3K0C71Nn0RQ6","name":"ohai","sku":"456"}
$ curl -X POST localhost:9000/upsert?sku=456
{"id":"2J3K0C71Nn0RQ6","name":"ohai","sku":"456"}

The following then appears in the first shell:

$ curl localhost:9000/changes
{"state":"ready"}
{"new_val":{"id":"2J3K0C71Nn0RQ6","name":"hello","sku":"456"},"old_val":null}
{"new_val":{"id":"2J3K0C71Nn0RQ6","name":"ohai","sku":"456"},"old_val":{"id":"2J3K0C71Nn0RQ6","name":"hello","sku":"456"}}
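
The /changes endpoint emits one JSON document per line, so a client other than curl has to split the chunked stream on newlines before parsing. Below is a minimal sketch of such a client-side buffer; `ChangeLineBuffer` is a hypothetical helper, not part of NoBrainer or Goliath, and the sample chunks are shortened versions of the output above, deliberately split mid-document to show that chunk boundaries need not line up with line boundaries.

```ruby
require 'json'

# Hypothetical helper: accumulates raw chunks and returns one parsed JSON
# document per complete line.
class ChangeLineBuffer
  def initialize
    @buffer = +''
  end

  # Append a chunk and return the documents completed by it. An incomplete
  # trailing line stays in the buffer until a later chunk finishes it.
  def feed(chunk)
    @buffer << chunk
    docs = []
    while (newline = @buffer.index("\n"))
      line = @buffer.slice!(0..newline).strip
      docs << JSON.parse(line) unless line.empty?
    end
    docs
  end
end

buffer = ChangeLineBuffer.new
events = []

# The second document is split across two chunks.
events.concat(buffer.feed(%({"state":"ready"}\n{"new_val":{"sku":"456",)))
events.concat(buffer.feed(%("name":"hello"},"old_val":null}\n)))

events.each { |event| puts event.inspect }
```

Against the running server, the chunks could come from `Net::HTTPResponse#read_body` called with a block, with each chunk passed to `feed`.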

Example 4: Listening for changes on a specific Item

$ curl localhost:9000/changes?sku=222
$ curl -X POST localhost:9000/upsert?sku=111
$ curl -X POST localhost:9000/upsert?sku=222

We only see the changes of the second Item, not the first one.

Example 5: Running many clients

$ for i in `seq 10`; do curl -N localhost:9000/changes & done

We see {"state":"ready"} printed 10 times.

$ curl -X POST localhost:9000/upsert?sku=333

We see {"new_val":{"id":"2J3MDIpNLPchjS","sku":"333"},"old_val":null} printed 10 times.

This demonstrates that our server can handle many clients simultaneously.

Example 6: Handling connection failures

If we kill the RethinkDB server while a /changes call is in progress, we see the following:

$ curl localhost:9000/changes
{"state":"ready"}
<-- kill the RethinkDB server at this point -->
{"error":"RethinkDB::RqlDriverError: Connection closed by server."}
$ 

If we re-issue the curl command, our web server immediately rejects our request.

$ curl localhost:9000/changes
{"error":"RethinkDB::RqlRuntimeError: Connection is closed."}
$

Once we restart the RethinkDB server, we can reissue requests immediately:

$ curl localhost:9000/changes
{"state":"ready"}
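
The error lines in this example are produced by `guard_async_response` from init.rb: the exception is turned into a one-line JSON message, sent to the client, and the stream is closed in every case. Here is a minimal sketch of that behavior without EventMachine or RethinkDB. `FakeStream` is a hypothetical stand-in for the Goliath env, and the `IOError` raised in the block is only a placeholder for the driver error shown above.

```ruby
require 'json'

# Hypothetical stand-in for the Goliath env: records what would be sent to
# the client and whether the stream was closed.
class FakeStream
  attr_reader :sent, :closed

  def initialize
    @sent = []
    @closed = false
  end

  def chunked_stream_send(data)
    @sent << data
  end

  def chunked_stream_close
    @closed = true
  end
end

# Same shape as guard_async_response in init.rb, minus the STDERR logging.
def guard_async_response(env, &block)
  block.call(env)
rescue Exception => e
  begin
    # Only the first line of the exception message reaches the client.
    msg = { :error => "#{e.class}: #{e.message.split("\n").first}" }
    env.chunked_stream_send("#{msg.to_json}\n")
  rescue
    # The client may already be gone; nothing more to do.
  end
ensure
  env.chunked_stream_close
end

env = FakeStream.new
Fiber.new do
  guard_async_response(env) do |stream|
    stream.chunked_stream_send(%({"state":"ready"}\n))
    # Placeholder for a database failure in the middle of the stream.
    raise IOError, "Connection closed by server.\nextra detail"
  end
end.resume

env.sent.each { |line| print line }
puts "stream closed: #{env.closed}"
```

The exception never leaves the fiber, which is what keeps the event loop alive in the real application.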