Friday, July 16, 2010

Random sampling or processing data streams in Ruby


It might sound like I'm tackling a long-solved problem here - sort_by{rand}[0, n] is a well-known idiom, and in more recent versions of Ruby you can use the even simpler shuffle[0, n] or sample(n).

They all suffer from two problems. The minor one is that quite often I want elements in the sample to be in the same relative order as in the original collection (this in no way implies sorted) - which can be dealt with by a Schwartzian transform into [index, item] space, sampling that, sorting the results, and transforming back to just item.
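Here's a minimal sketch of that transform - ordered_sample is a made-up helper name, and it leans on Array#sample from recent Rubies:

def ordered_sample(collection, n)
  # tag each item with its index, sample the pairs, then sort by index
  # to restore the original relative order and strip the indexes back off
  collection.each_with_index.map{|it, i| [i, it]}.sample(n).sort.map{|i, it| it}
end

ordered_sample(%w[a b c d e f g], 3) # e.g. ["b", "d", "g"] - original order kept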

The major problem is far worse - for any of these to work, the entire collection must be loaded into memory, and if that were possible, why even bother with random sampling? More often than not, the collection I'm interested in sampling is something disk-based that I can iterate only once with #each (or twice if I really really have to), and I'm lucky if I even know its #size in advance.

By the way - this is totally unrelated, but I really hate the #length method with a passion - collections have sizes, not "lengths" - for a few kinds of collections we can imagine them arranged in a neat ordered line, and so their size is also a length, but it's really lame to name a method after a special case instead of the far more general "size" - hashtables have sizes not lengths, sets have sizes not lengths, and so on - #length should die in a fire!

When size is known


So we have a collection we can only iterate once - for now let's assume we're really lucky and know exactly how many elements it has - this isn't all that common, but it happens every now and then. As we want n elements out of size, the probability of each element being included is n/size, so select{ n > rand(size) } will nearly do the trick - even keeping the sample in the right order... except it will only return approximately n elements.
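A tiny sketch of that approximate version - approx_sample is just an illustrative name, nothing standard:

def approx_sample(collection, n, size)
  # each element is kept independently with probability n/size,
  # so the result has roughly n elements, not exactly n
  collection.select{ n > rand(size) }
end

approx_sample(1..1000, 10, 1000).size # usually somewhere around 10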

If we're sampling 1000 out of a billion we might not really care all that much, but it turns out it's not so difficult to do better than that. Sampling n elements out of a [first, *rest] collection neatly reduces to [first, *rest.sample(n-1)] with probability n/size, or rest.sample(n) otherwise. Except Ruby doesn't have decent tail-call optimization, so we'll use counters for it.
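Spelled out as explicit recursion, that reduction would look roughly like this sketch (not something you'd actually run on big inputs - without tail-call optimization it will blow the stack):

def sample_known_size(array, n)
  return [] if n == 0 or array.empty?
  first, *rest = array
  if n > rand(array.size)
    # first is included with probability n/size
    [first, *sample_known_size(rest, n - 1)]
  else
    sample_known_size(rest, n)
  end
end

And here's the same thing rewritten with counters: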


module Enumerable
  def random_sample_known_size(wanted, remaining=size)
    if block_given?
      each{|it|
        # include this element with probability wanted/remaining
        if wanted > rand(remaining)
          yield(it)
          wanted -= 1
        end
        remaining -= 1
      }
    else
      rv = []
      random_sample_known_size(wanted, remaining){|it| rv.push(it) }
      rv
    end
  end
end
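A quick sanity check on a plain Array (the exact elements picked will obviously vary between runs, but their relative order is always preserved):

(1..20).to_a.random_sample_known_size(5) # e.g. [2, 7, 8, 13, 19]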


This way of sampling has an extra feature: it can yield samples one at a time and never needs to store any of them in memory - something you might appreciate if you want to take a couple million elements out of 10 billion or so. You will not only avoid loading them into memory, you will also be able to use the results immediately, instead of only when the entire input finishes.
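For instance, a sketch of that streaming use - the file name and the counts are made up, and it assumes the number of lines really is known up front:

line_count = 10_000_000_000 # assumed to be known in advance
File.open("huge_log.tsv"){|f|
  f.each_line.random_sample_known_size(1_000_000, line_count){|line|
    STDOUT.print line # each sampled line can be used the moment it's picked
  }
}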


This is only possible if the collection size is known - if we don't know whether there's 1 element ahead or 100 billion, there's really no way of deciding what to put in the sample.

If you cannot fit even the sample in memory at once, and don't know the collection size in advance - it might be easiest to iterate twice: first to compute the size, and then to yield random records one at a time (assuming the collection size doesn't change between iterations, at least). CPU and sequential I/O are cheap, memory and random I/O are expensive.
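A hypothetical sketch of that two-pass approach - the file name and the handle method are stand-ins, and it relies on the file staying unchanged between the passes:

path = "clickstream.tsv"        # made-up file name
size = File.foreach(path).count # first pass: just count the lines
File.foreach(path).random_sample_known_size(100_000, size){|line|
  handle(line)                  # second pass: handle stands in for whatever you do with each sampled line
}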


When size is unknown

Usually we don't know the collection size in advance, so we need to keep a running sample - initialize it with the first n elements, and then for each element that arrives, replace a random element of the sample with it with probability n / size_so_far.

The first idea would be something like this:

module Enumerable
  def random_sample(wanted)
    rv = []
    size_so_far = 0
    each{|it|
      size_so_far += 1
      j = rand(size_so_far)
      # once the sample is full, evict a random element with probability wanted/size_so_far
      rv.delete_at(j) if wanted == rv.size and wanted > j
      rv.push(it) if wanted > rv.size
    }
    rv
  end
end


It suffers from a rather annoying performance problem - we're keeping the sample in a Ruby Array, and while Arrays are optimized for adding and removing elements at both ends, deleting something from the middle is an O(size) memmove.

We could replace rv.delete_at(j); rv.push(it) with rv[j] = it to gain performance at the cost of item order in the sample... or we could do that plus a Schwartzian transform into [index, item] space to get correctly ordered results fast. This only matters once the sample size reaches tens of thousands; before that, the brute memmove is simply faster than evaluating extra Ruby code.

module Enumerable
  def random_sample(wanted)
    rv = []
    size_so_far = 0
    each{|it|
      size_so_far += 1
      # fill the sample first, then overwrite a random slot with probability wanted/size_so_far
      j = wanted > rv.size ? rv.size : rand(size_so_far)
      rv[j] = [size_so_far, it] if wanted > j
    }
    rv.sort.map{|idx, it| it}
  end
end
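This works on anything Enumerable, even when #size is unknown - for example (hypothetically) lines streaming in on standard input:

STDIN.each_line.random_sample(10) # 10 random lines, in input order
(1..1_000_000).random_sample(5)   # Ranges work too - only #each is needed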



This isn't what stream processing looks like!

The algorithms are as good as they'll get, but the API is really not what we want. When we actually do have an iterate-once collection, we usually want to do more than just collect a sample. So let's encapsulate such a continuously updated sample into a Sample class:

class Sample
  def initialize(wanted)
    @wanted = wanted
    @size_so_far = 0
    @sample = []
  end
  def add(it)
    @size_so_far += 1
    j = @wanted > @sample.size ? @sample.size : rand(@size_so_far)
    @sample[j] = [@size_so_far, it] if @wanted > j
  end
  def each
    @sample.sort.each{|idx, it| yield(it)}
  end
  def total_size
    @size_so_far
  end
  include Enumerable
end

It's a fully-featured Enumerable, so it should be really easy to use. #total_size will return the count of all elements seen so far - calling that #size would conflict with the usual meaning of the number of times #each yields. You can even access the sample nondestructively and then keep updating it - usually you wouldn't want that, but it might be useful for scripts that run forever and periodically save partial results, as sketched below.
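A hypothetical sketch of that run-forever use - the checkpoint interval and the output file name are made up:

sample = Sample.new(1000)
STDIN.each_line{|line|
  sample.add(line.chomp)
  if sample.total_size % 100_000 == 0 # made-up checkpoint interval
    File.open("partial_sample.txt", "w"){|f|
      sample.each{|it| f.puts(it) } # non-destructive - sampling just continues
    }
  end
}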

To see how it can be used, here's a very simple script which reads a possibly extremely long list of URLs and prints a sample of 3 per host. By the way, notice the autovivification of Samples inside the Hash - it's a really useful trick, and Ruby's autovivification can do a lot more than Perl's.

require "uri"
sites = Hash.new{|ht,k| ht[k] = Sample.new(3)}
STDIN.each{|url|
  url.chomp!
  host = URI.parse(url).host rescue next
  sites[host].add(url)
}
sites.sort.each{|host, url_sample|
  puts "#{host} - #{url_sample.total_size}:"
  url_sample.each{|u| puts "* #{u}"}
}


So enjoy your massive data streams.
