diff --git a/lib/process_shared/shared_memory.rb b/lib/process_shared/shared_memory.rb index 2a2459c..fe0cd3f 100644 --- a/lib/process_shared/shared_memory.rb +++ b/lib/process_shared/shared_memory.rb @@ -1,6 +1,7 @@ require 'process_shared/rt' require 'process_shared/libc' require 'process_shared/with_self' +require 'process_shared/shared_memory_io' module ProcessShared # Memory block shared across processes. @@ -44,7 +45,9 @@ module ProcessShared LibC::PROT_READ | LibC::PROT_WRITE, LibC::MAP_SHARED, @fd, - 0) + 0). + slice(0, size) # slice to get FFI::Pointer that knows its size + # (and thus does bounds checking) @finalize = self.class.make_finalizer(@pointer.address, @size, @fd) ObjectSpace.define_finalizer(self, @finalize) @@ -52,9 +55,47 @@ module ProcessShared super(@pointer) end + # Write the serialization of +obj+ (using Marshal.dump) to this + # shared memory object at +offset+ (in bytes). + # + # Raises IndexError if there is insufficient space. + def put_object(offset, obj) + io = SharedMemoryIO.new(self) + io.seek(offset) + Marshal.dump(obj, io) + end + + # Read the serialized object at +offset+ (in bytes) using + # Marshal.load. + # + # @return [Object] + def get_object(offset) + io = to_shm_io + io.seek(offset) + Marshal.load(io) + end + + # Equivalent to {#put_object(0, obj)} + def write_object(obj) + Marshal.dump(obj, to_shm_io) + end + + # Equivalent to {#read_object(0, obj)} + # + # @return [Object] + def read_object + Marshal.load(to_shm_io) + end + def close ObjectSpace.undefine_finalizer(self) @finalize.call end + + private + + def to_shm_io + SharedMemoryIO.new(self) + end end end diff --git a/lib/process_shared/shared_memory_io.rb b/lib/process_shared/shared_memory_io.rb new file mode 100644 index 0000000..1164575 --- /dev/null +++ b/lib/process_shared/shared_memory_io.rb @@ -0,0 +1,309 @@ +module ProcessShared + # Does some bounds checking for EOF, but assumes underlying memory + # object (FFI::Pointer) will do bounds checking, in particular the + # {#_putbytes} method relies on this. + # + # Note: an unbounded FFI::Pointer may be converted into a bounded + # pointer using +ptr.slice(0, size)+. + class SharedMemoryIO + + attr_accessor :pos + attr_reader :mem + + def initialize(mem) + @mem = mem + @pos = 0 + @closed = false # TODO: actually pay attention to this + end + + def <<(*args) + raise NotImplementedError + end + + def advise(*args) + # no-op + end + + def autoclose=(*args) + raise NotImplementedError + end + + def autoclose? + raise NotImplementedError + end + + def binmode + # no-op; always in binmode + end + + def binmode? + true + end + + def bytes + if block_given? + until eof? + yield _getbyte + end + else + raise NotImplementedError + end + end + alias_method :each_byte, :bytes + + def chars + raise NotImplementedError + end + alias_method :each_char, :chars + + def close + @closed = true + end + + def close_on_exec=(bool) + raise NotImplementedError + end + + def close_on_exec? + raise NotImplementedError + end + + def close_read + raise NotImplementedError + end + + def close_write + raise NotImplementedError + end + + def closed? + @closed + end + + def codepoints + raise NotImplementedError + end + alias_method :each_codepoint, :codepoints + + def each + raise NotImplementedError + end + alias_method :each_line, :each + alias_method :lines, :each + + def eof? + pos == mem.size + end + alias_method :eof, :eof? + + def external_encoding + raise NotImplementedError + end + + def fcntl + raise NotImplementedError + end + + def fdatasync + raise NotImplementedError + end + + def fileno + raise NotImplementedError + end + alias_method :to_i, :fileno + + def flush + # no-op + end + + def fsync + raise NotImplementedError + end + + def getbyte + return nil if eof? + _getbyte + end + + def getc + raise NotImplementedError + end + + def gets + raise NotImplementedError + end + + def internal_encoding + raise NotImplementedError + end + + def ioctl + raise NotImplementedError + end + + def tty? + false + end + alias_method :isatty, :tty? + + def lineno + raise NotImplementedError + end + + def lineno= + raise NotImplementedError + end + + def lines + raise NotImplementedError + end + + def pid + raise NotImplementedError + end + + alias_method :tell, :pos + + def print(*args) + raise NotImplementedError + end + def printf(*args) + raise NotImplementedError + end + + def putc(arg) + raise NotImplementedError + end + + def puts(*args) + raise NotImplementedError + end + + # FIXME: this doesn't match IO#read exactly (corner cases about + # EOF and whether length was nil or not), but it's enough for + # {Marshal::load}. + def read(length = nil, buffer = nil) + length ||= (mem.size - pos) + buffer ||= '' + + actual_length = [(mem.size - pos), length].min + actual_length.times do + buffer << _getbyte + end + buffer + end + + def read_nonblock(*args) + raise NotImplementedError + end + + def readbyte + raise EOFError if eof? + _getbyte + end + + def readchar + raise NotImplementedError + end + + def readline + raise NotImplementedError + end + + def readlines + raise NotImplementedError + end + + def readpartial + raise NotImplementedError + end + + def reopen + raise NotImplementedError + end + + def rewind + pos = 0 + end + + def seek(amount, whence = IO::SEEK_SET) + case whence + when IO::SEEK_CUR + self.pos += amount + when IO::SEEK_END + self.pos = (mem.size + amount) + when IO::SEEK_SET + self.pos = amount + else + raise ArgumentError, "bad seek whence #{whence}" + end + end + + def set_encoding + raise NotImplementedError + end + + def stat + raise NotImplementedError + end + + def sync + true + end + + def sync= + raise NotImplementedError + end + + def sysread(*args) + raise NotImplementedError + end + + def sysseek(*args) + raise NotImplementedError + end + + def syswrite(*args) + raise NotImplementedError + end + + def to_io + raise NotImplementedError + end + + def ungetbyte + raise IOError if pos == 0 + pos -= 1 + end + + def ungetc + raise NotImplementedError + end + + def write(str) + s = str.to_s + _putbytes(s) + s.size + end + + def write_nonblock(str) + raise NotImplementedError + end + + private + + # Like {#getbyte} but does not perform eof check. + def _getbyte + b = mem.get_uchar(pos) + self.pos += 1 + b + end + + def _putbytes(str) + mem.put_bytes(pos, str, 0, str.size) + self.pos += str.size + end + + end +end diff --git a/spec/process_shared/shared_memory_spec.rb b/spec/process_shared/shared_memory_spec.rb index d635157..cc1abb5 100644 --- a/spec/process_shared/shared_memory_spec.rb +++ b/spec/process_shared/shared_memory_spec.rb @@ -32,5 +32,30 @@ module ProcessShared mem.get_int(0).must_equal(1234567) end + + describe 'Object dump/load' do + it 'writes serialized objects' do + mem = SharedMemory.new(1024) + pid = fork do + mem.write_object(['a', 'b']) + Kernel.exit! + end + ::Process.wait(pid) + mem.read_object.must_equal ['a', 'b'] + end + + it 'raises IndexError when insufficient space' do + mem = SharedMemory.new(2) + proc { mem.write_object(['a', 'b']) }.must_raise(IndexError) + end + + it 'writes with an offset' do + mem = SharedMemory.new(1024) + mem.put_object(2, 'string') + proc { mem.read_object }.must_raise(TypeError) + proc { mem.get_object(0) }.must_raise(TypeError) + mem.get_object(2).must_equal 'string' + end + end end end