Monday, May 31, 2010

Real World Ruby and Cassandra

Introduction

At OtherInbox I recently built a QA system using the Cassandra datastore. I really like this technology and so far I would recommend it, but the learning curve for Rubyists is still pretty high. There are some good examples online (especially the canonical article by Evan Weaver) but nothing showing more intermediate, real-world usage. Hence, this article.

The system requires us to log millions of events per day. I could have built it using a traditional relational database like MySQL (which we use for the main application), but these factors led me to consider a NoSQL database:
  • We're only interested in large patterns in data, so we don't need 100% ACID assurance that every single write will succeed. The system would be useful to us even if it only caught 80% of the events.
  • Since we perform these actions millions of times per day, write speed is the prime consideration.
  • The QA reports are generated offline, once per day. We don't mind if reads happen more slowly, or if we need to do some extra programming to build reports because we can't use SQL.
  • The shear volume of events made me less excited about punishing a MySQL table. We already do a lot of extra work to keep MySQL healthy performing OtherInbox's main functions via sharding.
  • I was curious to see how a schema-less datastore would change the way I solved programming problems.
I will assume you have read Evan's article as well as the very useful 'WTF is a supercolumn?'. You may also want to read through the Twissandra Python code as well as the tests for Evan's cassandra gem.

I've been playing with the technology only for a few months so I'm sure I'll need to correct some parts of this article as I learn more - please comment if something is unclear or incorrect.

It's Sorta Like an Ordered Multi-Dimensional Hash

Rubyists can think of Cassandra like a hash of ordered hashes, or a hash of ordered hashes of ordered hashes, requiring up-front planning to use. You don't have to specify your schema, but you do need to tell Cassandra how your keys and columns will be organized. That affects how the data is stored on disk and how you'll read the data later.

Since the columns are stored in sorted order, Cassandra can answer queries very quickly (which is why it's in use at sites like Facebook and Digg). I had to change the way I built keys and column names several times before I got it right. Anytime you change how the data is stored on disk you need to restart Cassandra.

Columns and ColumnFamilies

ColumFamilies store a set of columns (which you can think of as key-value pairs) partitioned by a row key. The column names can be arbitrary strings, long integers, or UUIDs; at start time you have to tell Cassandra how to sort the column names but beyond that you have complete freedom to create column names that will be useful to you.

 If each row has the same data, you might think of it like this:

{ user_id => {'email'=>'sarah@example.com', 'last_name'=>'Jones' }}

Where user_id is the row key (which Cassandra hashes and uses to determine which nodes should store the columns for this piece of data), and 'email', and 'last_name' are column names. Using the gem your code would look like:

@cassandra.insert(:Users,user_id,{ 'email'=>'sarah@example.com', 'last_name' => 'Jones'})
@cassandra.get(:Users,user_id)

But you can also store useful data in the column names. This is useful when there are many columns and you want to be able to select a particular range of columns. For the QA system, we page through a large range of columns within each key, and assigning smart column names helps this go faster. The data might look like this:

{ user_id => { UUID => 'Hey Sarah here's a question for you..', UUID2 => 'When are we going to meet up?' }}
@cassandra.insert(:Users,user_id, { UUID.new => 'Hey Sarah here's a question for you..'})
@cassandra.insert(:Users,user_id, { UUID.new => 'When are we going to meet up?' })

In this case we are storing messages for a particular user, and we're using unique identifiers for columns that we can query later in ranges. If your data has a temporal component you might use time-based UUIDs (where the most significant bits are a timestamp and the less significant bits are entropy) so that you query only columns that fall within a particular range of times.

You do need to tell Cassandra how your column names should be sorted on disk, which happens in the configuration file for each ColumnFamily:

<keyspaces>
  <keyspace name="OtherInbox">
    <columnfamily comparewith="LexicalUUIDType" name="Users">
  </keyspace>
</keyspaces>

In the first example, I'd use "LongType" since user_id is probably an integer. In the second example I'd use "LexicalUUIDType", as shown, or "TimeUUIDType".

SuperColumnFamilies

For more structured, nested data, you should consider using a SuperColumnFamily, which let you store columns of columns. Examples:

{ user_id => { 'details' => { 'email' => 'sarah@example.com', 'last_name' => 'Jones'}, 
                    'preferences' => { 'expert_controls' => 'true' }}}
@cassandra.insert(:Users,user_id,  { 'details' => { 'email' => 'sarah@example.com', 'last_name' => 'Jones'}, 
                    'preferences' => { 'expert_controls' => 'true' }}})
@cassandra.get(:Users,user_id,'details')
@cassandra.get(:Users,user_id,'expert_controls')

'details' and 'preferences' are super columns containing columns 'email', 'last_name', and 'expert_controls'. Just as with regular column families, you can encode arbitrary data in the column names, or just set them to UUIDs. When you define a SuperColumnFamily, you tell Cassandra how to sort and store the column names and the subcolumn names:

<columnfamily columntype="Super" comparesubcolumnswith="UTF8Type" comparewith="LongType" name="Users">

One key consideration: as of this writing the current version of Cassandra (0.6.2) does not do any indexing of the subcolumns, which means when you load a supercolumn, all of its subcolumns are loaded into memory. If you expect to have more than a few thousand subcolumns, you would be better off using a regular column family, and overloading the row keys and columnnames with your nested data. In our example, your column names could be something like "sarah@example.com/Jones/true", and it would be up to you to split the data on retrieval.

There is an open ticket to address this in a future release.

Key Names vs. Column Names

For the QA system, everything we keep track of is associated with a timestamp. The most natural partitioning of the data seems to be by day, hour, and whether we synced the message or not. The reporting system runs once per day, iterating over each hour and each synced state for the previous day. This gives us rows that are small enough for Cassandra to easily distribute across nodes without loading up too many columns in any one row. Our keys look like this:

key = "#{time.strftime("%Y-%m-%d-%H")}*#{is_synced}"

Since I only need to track 4 or 5 properties about each sync/nosync decision, I decided to use supercolumns. Actually, I first used the composite column approach described above, but I found supercolumns made for better-looking, slightly more-efficient code. The columns looks like this:
{ 'example.com*6f1ed002ab5595859014ebf0951522d9' => { 'from_address' => 'marketing@example.com', 'is_system_merchant' => true }}

Each supercolumn name is a composite of the domain name of the message we examined and an MD5 hash of the message header. This ensures we don't store duplicate records if the same message gets processed twice. It also means I can drill down on specific senders in the future if needed by using range queries with partial column names, as shown next.

Range Queries

I don't know the optimal number of columns that Cassandra can serve up in one request, but in our system one row (meaning one hour's worth of sync and nonsync events) could comprise tens of thousands of columns, more than we would want to request at once. But since the columns are stored in sorted order, it's easy to fetch them with a range query. Here is a super simplified version of what we do:
  • I divide up the previous day into 48 keys (synced events for each hour and nonsynced events for each hour).
  • I then thread these requests, 24 at a time. According to the docs, "a good rule of thumb is 2 concurrent reads per processor core", so 3 machines times four cores times 2 reads per core = 24 concurrent reads. Each node has its ConcurrentRead property set to 8. I may not be doing the math correctly so feel free to chime in with a correcting comment.
  • Each thread executes the following (simplified) code:


    start = ''
        loop do
          # count is completely arbitrary, need to experiment with what's best
          columns = cass.get(:MessageSyncing,key,:start => start,:count => 2500)
          break if columns.empty?
    
          columns.each do |column_name,column_values|
            fqdn = column_name.split(/\//)[0]
            from_address = column_values['from_address']
            is_system_merchant = column_values['is_system_merchant']
            # increment counters/manipulate data here
          end
    
          start = columns.keys.last.succ!
        end

    This code uses a range query to page through all the columns within one row. Since the columns are stored by UTF8Type, I can just increment the key and know that I'll get the next chunk of columns. You can also query with partial range keys, so that if I wanted to see all of the data for a domain, I could range query with the column start as "example.com*". I also have some code that aggregates the results of those 48 queries.

    I also have some pretty complicated code that collates the resuls of those 48 threads, which are themselves composites of all the range queries I ran within each row. I realized after writing it that I had essential re-implemented my own half-assed map-reduce.

    Happily, while I was implementing this code, Cassandra 0.6 came out, which includes built-in support for Hadoop. Cassandra has a Pig load function, so it should eventually be possible to replace the above code and my half-assed map-reduce with something much more elegant, maybe just a few lines of Pig. For now, this works great. Of course you don't need any of this if you aren't using your datastore for reporting.

    Notes on EC2

    I don't have enough experience yet to recommend whether you should use EC2 or not. I originally built this to use one xlarge instance, but I found that it could not keep up with the network load. There were a lot of timeouts from the nodes reporting to Cassandra. As soon as I split it into three smaller Cassandra nodes, the timeouts went way down. It might even make more sense to split into six small nodes.

    Each node has two EBS volumes, a smaller one for the commit log and a large one for the data. The commit log is append-only and is used to replay writes in case Cassandra crashes before the data in memory can be written to the data disk. Keeping them separate improves throughput so one operation doesn't block the other. It might make more sense to use an ephemeral store for the commitlog; I haven't had time to explore.

    I definitely recommend following the recommendation in the documentation: use at least three nodes in production.

    Notes on Adding Nodes

    Adding each node was easy, and that's one of Cassandra's key features. All you have to do is tell the new node the address of at least one other node and set its AutoBootstrap value to true.

    The only problem I had was that the first server was getting hammered so hard by all these requests, many of which were timing-out, that it took awhile for the second node to complete the gossiping with the first node to start bootstrapping.

    CassandraObject


    Michael Koziarski wrote a cool ActiveModel interface to Cassandra called CassandraObject which I haven't played with yet, but offers a higher level abstraction for accessing data beyond just "a hash of hashes".  He's presenting it at RailsConf this year and I'll definitely be in attendance for that talk.


    Further Reading

    I found these articles/sites particularly helpful:


Incidentally, Stackoverflow is starting to become the site I go to first when I'm searching for the solution to a technical question.  Google searches return at least 50% garbage or duplicate mailing list content for a lot of technical topics.  That makes me more interested in the future of niche/vertical search engines. It is definitely possible to out-Google Google within a niche.

    8 comments:

    David Balatero said...

    Cool article. Have you considered using throwing your data at Splunk, which also supports distributed setups, and has a bunch of tools that let you query the data and build out reports?

    I'm currently looking into Splunk to do heavy log crunching from my Rails app, and it's looking pretty nice. Less raw coding, more results?

    Mike Subelsky said...

    Hi David,

    Thanks! That's not a bad alternative to consider for this situation, except you have to pay for Splunk in a distributed setup or if you have > 500 MB. We also use Cassandra for a couple of other things (more secret sauce than this, but comparable use cases) so that's why I was favoring this approach.

    -Mike

    dboek said...

    Thanks for your long post and sharing your experiences with Cassandra.
    In your example a key and later a column name consists of a domain name and a timestamp. Is it possible to tell Cassandra to get alle keys starting with the domain name? Similar to the Like statement in MySQL?

    - Daniel

    Mike Subelsky said...

    dboek, yes, you can do range slices based on partial keys. This works very efficiently if you are using the Order Preserving Partitioner (since Cassandra can just go around the ring one node at a time finding the keys). If you have the Random Partitioner it should still work but more slowly.

    Matthew McEachen said...

    Thanks for taking the time to write this down!

    Mark D. Blackwell said...
    This comment has been removed by the author.
    Mark D. Blackwell said...
    This comment has been removed by the author.
    Mark D. Blackwell said...

    The canonical Evan Weaver article, 'Up and running with cassandra' (to which you link in your first paragraph, as well as in your 'further reading' section) has moved to http://blog.evanweaver.com/2009/07/06/up-and-running-with-cassandra/

    Arin Sarkissian's blog post, 'WTF is a SuperColumn? An Intro to the Cassandra Data Model' (to which you link as 'WTF is a SuperColumn?' in your third paragraph, as well as in your 'further reading' section) has moved to: http://arin.me/post/40054651676/wtf-is-a-supercolumn-cassandra-data-model

    Eric Evans' article, 'Cassandra By Example'
    (to which you link in your 'further reading' section) has moved to http://www.rackspace.com/blog/cassandra-by-example/

    The article, '4 Months with Cassandra, a love story' by Team Cloudkick (to which you link as 'Cloudkick Use Case' in your 'further reading' section) has been reposted by Chris Adams at http://chris.improbable.org/2010/3/2/4-months-with-cassandra-a-love-story/