AWS::S3 is not thread-safe. Hell, it’s not even reusable; most methods go through a class constant. To use it in threaded code, each S3 operation has to be isolated in its own memory space. Fork to the rescue!
    require 'timeout'
    require 'aws/s3'

    # KEY, SECRET and SubprocessTimedOut are defined elsewhere in the application.

    def s3(key, data, bucket, opts)
      begin
        fork_to do
          AWS::S3::Base.establish_connection!(
            :access_key_id => KEY,
            :secret_access_key => SECRET
          )
          AWS::S3::S3Object.store key, data, bucket, opts
        end
      rescue Timeout::Error
        raise SubprocessTimedOut
      end
    end

    def fork_to(timeout = 4)
      r, w, pid = nil, nil, nil
      begin
        # Open pipe
        r, w = IO.pipe

        # Start subprocess
        pid = fork do
          # Child
          begin
            r.close

            val = begin
              Timeout.timeout(timeout) do
                # Run block
                yield
              end
            rescue Exception => e
              e
            end

            w.write Marshal.dump val
            w.close
          ensure
            # YOU SHALL NOT PASS
            # Skip at_exit handlers.
            exit!
          end
        end

        # Parent
        w.close

        Timeout.timeout(timeout) do
          # Read value from pipe
          begin
            val = Marshal.load r.read
          rescue ArgumentError => e
            # Marshal data too short
            # Subprocess likely exited without writing.
            raise Timeout::Error
          end

          # Return or raise value from subprocess.
          case val
          when Exception
            raise val
          else
            return val
          end
        end
      ensure
        if pid
          Process.kill "TERM", pid rescue nil
          Process.kill "KILL", pid rescue nil
          Process.waitpid pid rescue nil
        end
        r.close rescue nil
        w.close rescue nil
      end
    end
There’s a lot of bookkeeping here. In a nutshell, we fork and run the given block in the subprocess. The block’s result (or the exception it raised) is marshaled and sent back to the parent over a pipe. The rest is just timeouts and process accounting: subprocesses have a tendency to get tied up, leaving dangling pipes or zombies floating around, so the ensure block always kills and reaps the child and closes both ends of the pipe. I know there are weak points and race conditions here, but with robust retry code this approach is suitable for production.
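Stripped of the timeouts and cleanup, the core of the pattern fits in a few lines. The sketch below is an illustration, not the production code above: `in_subprocess` is a hypothetical name, and it assumes a Unix-like platform where `fork` is available and a block whose result can be marshaled.

```ruby
# Hypothetical helper (not part of AWS::S3): run a block in a forked child
# and hand its result, or the exception it raised, back over a pipe.
def in_subprocess
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    # Child: only writes, so close the read end.
    reader.close
    result = begin
      yield
    rescue Exception => e
      e # ship the exception itself back to the parent
    end
    writer.write Marshal.dump(result)
    writer.close
    exit!(true) # skip at_exit handlers inherited from the parent
  end

  # Parent: only reads, so close the write end; read then returns at EOF.
  writer.close
  payload = reader.read
  reader.close
  Process.waitpid pid

  value = Marshal.load(payload)
  raise value if value.is_a?(Exception)
  value
end

# State changed in the child never leaks back into the parent.
$counter = 0
answer = in_subprocess { $counter += 1; 6 * 7 }
puts "answer=#{answer} counter=#{$counter}"

begin
  in_subprocess { raise ArgumentError, "boom" }
rescue ArgumentError => e
  puts "child raised: #{e.message}"
end
```

The global stays untouched in the parent because the child works on its own copy of the process memory, which is exactly the isolation a library built around class-level state needs.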
Using this approach, I can typically keep ~8 S3 uploads running concurrently (on a fairly busy 6-core HT Nehalem) and obtain ~sixfold throughput compared to locking S3 operations with a mutex.
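For illustration, here is one way the calling side might look: a small pool of threads, each pushing one job at a time through its own subprocess. This is a sketch under assumptions, not the code behind the numbers above. `run_forked` and `each_forked` are hypothetical names, the worker count of 8 simply mirrors the figure quoted, and timeouts and retries are left out.

```ruby
# Hypothetical helper: run a block in a forked child, return its result.
def run_forked
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode
  pid = fork do
    reader.close
    result = begin
      yield
    rescue Exception => e
      e
    end
    writer.write Marshal.dump(result)
    writer.close
    exit!(true)
  end
  writer.close
  payload = reader.read
  reader.close
  Process.waitpid pid
  value = Marshal.load(payload)
  raise value if value.is_a?(Exception)
  value
end

# Hypothetical driver: process every input in its own subprocess,
# with at most `workers` subprocesses alive at once.
def each_forked(inputs, workers = 8, &block)
  queue = Queue.new
  inputs.each_with_index { |input, i| queue << [input, i] }
  results = Array.new(inputs.size)

  threads = Array.new(workers) do
    Thread.new do
      loop do
        begin
          input, i = queue.pop(true) # non-blocking pop
        rescue ThreadError
          break # queue drained, this worker is done
        end
        results[i] = run_forked { block.call(input) }
      end
    end
  end

  threads.each(&:join)
  results
end

# Stand-in workload; in the post this would be the S3 upload.
squares = each_forked((1..20).to_a) { |n| n * n }
puts squares.inspect
```

The threads do nothing but wait on pipes, so the real work runs in parallel across the child processes. One caveat: a child forked while another worker's pipe is open inherits a copy of that pipe's write end, so a read can be held up until the other child exits. That is one of the reasons the full version wraps everything in timeouts.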