Initial attempt at ConditionVariable implementation.
This commit is contained in:
		
							parent
							
								
									a1f881f59c
								
							
						
					
					
						commit
						d65979cc2e
					
				| 
						 | 
				
			
			@ -1,14 +1,16 @@
 | 
			
		|||
require 'process_shared/semaphore'
 | 
			
		||||
 | 
			
		||||
module ProcessShared
 | 
			
		||||
  # TODO: implement this
 | 
			
		||||
  class ConditionVariable
 | 
			
		||||
    def initialize
 | 
			
		||||
      @sem = Semaphore.new
 | 
			
		||||
      @internal = Semaphore.new(1)
 | 
			
		||||
      @waiting = SharedMemory.new(:int)
 | 
			
		||||
      @waiting.write_int(0)
 | 
			
		||||
      @sem = Semaphore.new(0)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    def broadcast
 | 
			
		||||
      @sem.post
 | 
			
		||||
      waiting.times { @sem.post }
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    def signal
 | 
			
		||||
| 
						 | 
				
			
			@ -18,10 +20,38 @@ module ProcessShared
 | 
			
		|||
    def wait(mutex, timeout = nil)
 | 
			
		||||
      mutex.unlock
 | 
			
		||||
      begin
 | 
			
		||||
        @sem.wait
 | 
			
		||||
        inc_waiting
 | 
			
		||||
        if timeout
 | 
			
		||||
          begin
 | 
			
		||||
            @sem.try_wait(timeout)
 | 
			
		||||
          rescue Errno::EAGAIN, Errno::ETIMEDOUT
 | 
			
		||||
            # success!
 | 
			
		||||
          end
 | 
			
		||||
        else
 | 
			
		||||
          @sem.wait
 | 
			
		||||
        end
 | 
			
		||||
        dec_waiting
 | 
			
		||||
      ensure
 | 
			
		||||
        mutex.lock
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    private
 | 
			
		||||
 | 
			
		||||
    def waiting
 | 
			
		||||
      @internal.synchronize do
 | 
			
		||||
        @waiting.read_int
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    def inc_waiting(val = 1)
 | 
			
		||||
      @internal.synchronize do
 | 
			
		||||
        @waiting.write_int(@waiting.read_int + val)
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    def dec_waiting
 | 
			
		||||
      inc_waiting(-1)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,67 @@
 | 
			
		|||
require 'spec_helper'
 | 
			
		||||
 | 
			
		||||
require 'process_shared/condition_variable'
 | 
			
		||||
 | 
			
		||||
module ProcessShared
 | 
			
		||||
  describe ConditionVariable do
 | 
			
		||||
    it 'runs the example of Ruby Stdlib ConditionVariable' do
 | 
			
		||||
      mutex = Mutex.new
 | 
			
		||||
      resource = ConditionVariable.new
 | 
			
		||||
 | 
			
		||||
      a = fork {
 | 
			
		||||
        mutex.synchronize {
 | 
			
		||||
          resource.wait(mutex)
 | 
			
		||||
        }
 | 
			
		||||
        Kernel.exit!
 | 
			
		||||
      }
 | 
			
		||||
      
 | 
			
		||||
      b = fork {
 | 
			
		||||
        mutex.synchronize {
 | 
			
		||||
          resource.signal
 | 
			
		||||
        }
 | 
			
		||||
        Kernel.exit!
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      ::Process.wait(a)
 | 
			
		||||
      ::Process.wait(b)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'broadcasts to multiple processes' do
 | 
			
		||||
      mutex = Mutex.new
 | 
			
		||||
      cond = ConditionVariable.new
 | 
			
		||||
      mem = SharedMemory.new(:int)
 | 
			
		||||
      mem.write_int(0)
 | 
			
		||||
 | 
			
		||||
      pids = []
 | 
			
		||||
      10.times do
 | 
			
		||||
        pids << fork do
 | 
			
		||||
          mutex.synchronize {
 | 
			
		||||
            cond.wait(mutex)
 | 
			
		||||
            mem.write_int(mem.read_int + 1)
 | 
			
		||||
          }
 | 
			
		||||
          Kernel.exit!
 | 
			
		||||
        end
 | 
			
		||||
      end
 | 
			
		||||
 | 
			
		||||
      sleep 0.2               # hopefully they are all waiting...
 | 
			
		||||
      mutex.synchronize {
 | 
			
		||||
        cond.broadcast
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      pids.each { |p| ::Process.wait(p) }
 | 
			
		||||
 | 
			
		||||
      mem.read_int.must_equal(10)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    it 'stops waiting after timeout' do
 | 
			
		||||
      mutex = Mutex.new
 | 
			
		||||
      cond = ConditionVariable.new
 | 
			
		||||
 | 
			
		||||
      mutex.synchronize {
 | 
			
		||||
        start = Time.now.to_f
 | 
			
		||||
        cond.wait(mutex, 0.1)
 | 
			
		||||
        (Time.now.to_f - start).must be_gte(0.1)
 | 
			
		||||
      }
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
		Loading…
	
		Reference in New Issue