Wednesday, February 17, 2010

Using Ruby's Queue class to manage inter-thread communication

Bob Potter just showed me an awesome way to use Ruby's Queue class to communicate between two threads. I try to avoid multi-threaded programming as much as possible since I don't feel super confident with concurrency (at least with a language like Ruby). It can add a lot of complexity and headaches even when you know what you're doing, but there are several cases in OtherInbox where it makes a lot of sense.

In this case, I needed to log a very frequently occurring event to SimpleDB. We don't need these logs to be 100% accurate, and the data gets processed offline for reports, so there's no need to keep the data in our main relational database. My first naive version of this code pushed a new item to SimpleDB each time the event occurred, but this ended up chewing up a lot of memory. To avoid slowing down the main process, I had been handing these one-off SimpleDB calls to an EventMachine deferred block. Each block held references to all the objects I was using to generate the SimpleDB item, so those objects stuck around for too long.

That's when Bob suggested I use a Queue to hand the work to a separate standard Ruby thread, which batches it into chunks of 25 (the SimpleDB BatchPutAttributes limit). The resulting code looks like this (simplified for readability):
require 'thread' # Queue lives here on older Rubies

class Tracker
  def track_event(a, b, c)
    @queue ||= Queue.new
    start_pushing_thread
    hash = { :a => a, :b => b, :c => c }
    @queue << hash
  end

  private

  def start_pushing_thread
    return if @pushing_thread
    @pushing_thread = Thread.new do
      items = []

      loop do
        # pop will block until there are queue items available
        items << @queue.pop

        # flush once a full batch of 25 has accumulated
        if items.size > 24
          SdbWrapper.batch_log_to_sdb(items, 'tracking_domain')
          items.clear
        end
      end
    end
  end
end

Once 25 items have been popped off the queue into the array, the thread calls the code we wrote to handle batch puts, then clears the array and waits for more.
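If you want to try the pattern without SimpleDB, here is a minimal runnable sketch. The names FakeSdb, BatchingTracker and shutdown are hypothetical and made up for this example; FakeSdb simply records each batch in memory in place of our real wrapper. The sketch also adds a stop sentinel, which our production code doesn't have, so that the example can flush the last partial batch and finish deterministically.

```ruby
require 'thread' # Queue lives here on older Rubies; harmless on newer ones

# Hypothetical stand-in for the SimpleDB wrapper: it only records batches.
class FakeSdb
  attr_reader :batches

  def initialize
    @batches = []
  end

  def batch_log(items, domain)
    @batches << [domain, items.dup]
  end
end

class BatchingTracker
  BATCH_SIZE = 25
  STOP = Object.new # sentinel telling the worker to flush and exit

  def initialize(sink, domain = 'tracking_domain')
    @sink = sink
    @domain = domain
    @queue = Queue.new
    @worker = Thread.new { run }
  end

  def track_event(a, b, c)
    @queue << { :a => a, :b => b, :c => c }
  end

  # Ask the worker to flush whatever is left, then wait for it to finish.
  def shutdown
    @queue << STOP
    @worker.join
  end

  private

  def run
    items = []
    loop do
      item = @queue.pop # blocks until something is available
      break if item.equal?(STOP)
      items << item
      if items.size >= BATCH_SIZE
        @sink.batch_log(items, @domain)
        items = []
      end
    end
    # Send the final partial batch, if any.
    @sink.batch_log(items, @domain) unless items.empty?
  end
end

sink = FakeSdb.new
tracker = BatchingTracker.new(sink)
60.times { |i| tracker.track_event(i, i * 2, i * 3) }
tracker.shutdown
batch_sizes = sink.batches.map { |_domain, items| items.size }
```

With 60 events, the sink should end up with two full batches of 25 and one final batch of 10. Whether a final flush is worth the extra code depends on how much you care about the last few events; for us, losing a partial batch on shutdown was acceptable.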

We use an inter-process/inter-machine version of this paradigm all the time at OtherInbox: we make heavy use of SQS queues to hand off work from one box to another, but that wouldn't have helped here because then we'd just be introducing yet another web service dependency. So if you're in a situation where you need to keep a high-volume process running quickly while handing off less important work in chunks, check out Queues!

Comments:

cremes said...

If you look at the code for EM.defer, you would see it is essentially doing the exact same thing as your example. The difference here is that you are batching up your items and doing a single write.

Just FYI.

Mike Subelsky said...

Hi Cremes,

The first technique I tried was EM.defer, but I needed a bunch of objects to create the SimpleDB item, which were bound to the block, which slowed down garbage collection and led to bloated memory over time. I don't think this is better than EM.defer necessarily but it worked better in this case -- most other places in the code we do use EM.defer. I also like the cleanliness of the style that Bob showed me.

Anonymous said...

if you're calling track_event on the same object from multiple threads you might want to synchronize these lines:
@queue ||= Queue.new
start_pushing_thread
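
A minimal sketch of what that synchronization might look like, assuming a Mutex created in the constructor. SafeTracker, @setup_lock and threads_started are hypothetical names used only for illustration, and the batching logic from the post is left out to keep the example short.

```ruby
require 'thread' # Queue and Mutex on older Rubies; harmless on newer ones

class SafeTracker
  attr_reader :threads_started

  def initialize
    @setup_lock = Mutex.new
    @threads_started = 0 # only here so the example can be checked
  end

  def track_event(a, b, c)
    # Only one thread at a time may create the queue and the worker,
    # so two callers can never each build their own.
    @setup_lock.synchronize do
      @queue ||= Queue.new
      start_pushing_thread
    end
    @queue << { :a => a, :b => b, :c => c }
  end

  private

  def start_pushing_thread
    return if @pushing_thread
    @threads_started += 1
    @pushing_thread = Thread.new do
      loop { @queue.pop } # batching omitted; the worker just drains the queue
    end
  end
end

tracker = SafeTracker.new
callers = 10.times.map do |n|
  Thread.new { 100.times { |i| tracker.track_event(n, i, nil) } }
end
callers.each(&:join)
```

However the ten caller threads interleave, only one worker thread should ever be started. Another option is to create the queue and the thread in initialize, which avoids taking a lock on every call.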
