Make WrappedQueues and PersistableChannelUniqueQueues Pausable (#18393)
Implements the Pausable interface on WrappedQueues and PersistableChannelUniqueQueues Reference #15928 Signed-off-by: Andrew Thornton art27@cantab.net
This commit is contained in:
		
							parent
							
								
									43c6b27716
								
							
						
					
					
						commit
						ab7f701671
					
				| 
						 | 
					@ -196,7 +196,7 @@ func RegisteredTypesAsString() []string {
 | 
				
			||||||
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
 | 
					func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
 | 
				
			||||||
	newFn, ok := queuesMap[queueType]
 | 
						newFn, ok := queuesMap[queueType]
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
		return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
 | 
							return nil, fmt.Errorf("unsupported queue type: %v", queueType)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return newFn(handlerFunc, opts, exemplar)
 | 
						return newFn(handlerFunc, opts, exemplar)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -92,7 +92,7 @@ func (q *ByteFIFOQueue) Push(data Data) error {
 | 
				
			||||||
// PushBack pushes data to the fifo
 | 
					// PushBack pushes data to the fifo
 | 
				
			||||||
func (q *ByteFIFOQueue) PushBack(data Data) error {
 | 
					func (q *ByteFIFOQueue) PushBack(data Data) error {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
							return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	bs, err := json.Marshal(data)
 | 
						bs, err := json.Marshal(data)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -110,7 +110,7 @@ func (q *ByteFIFOQueue) PushBack(data Data) error {
 | 
				
			||||||
// PushFunc pushes data to the fifo
 | 
					// PushFunc pushes data to the fifo
 | 
				
			||||||
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
 | 
					func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
							return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	bs, err := json.Marshal(data)
 | 
						bs, err := json.Marshal(data)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -398,7 +398,7 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
 | 
				
			||||||
// Has checks if the provided data is in the queue
 | 
					// Has checks if the provided data is in the queue
 | 
				
			||||||
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
 | 
					func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
							return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	bs, err := json.Marshal(data)
 | 
						bs, err := json.Marshal(data)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -93,7 +93,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
 | 
				
			||||||
// Push will push data into the queue
 | 
					// Push will push data into the queue
 | 
				
			||||||
func (q *ChannelQueue) Push(data Data) error {
 | 
					func (q *ChannelQueue) Push(data Data) error {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
 | 
							return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	q.WorkerPool.Push(data)
 | 
						q.WorkerPool.Push(data)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
 | 
				
			||||||
			if s, ok := cfg.([]byte); ok {
 | 
								if s, ok := cfg.([]byte); ok {
 | 
				
			||||||
				cfg = string(s)
 | 
									cfg = string(s)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
 | 
								return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
			queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
 | 
								queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
 | 
				
			||||||
			if err == nil {
 | 
								if err == nil {
 | 
				
			||||||
| 
						 | 
					@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
 | 
				
			||||||
			i++
 | 
								i++
 | 
				
			||||||
			if q.maxAttempts > 0 && i > q.maxAttempts {
 | 
								if q.maxAttempts > 0 && i > q.maxAttempts {
 | 
				
			||||||
				if bs, ok := q.cfg.([]byte); ok {
 | 
									if bs, ok := q.cfg.([]byte); ok {
 | 
				
			||||||
					return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
 | 
										return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
 | 
									return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			sleepTime := 100 * time.Millisecond
 | 
								sleepTime := 100 * time.Millisecond
 | 
				
			||||||
			if q.timeout > 0 && q.maxAttempts > 0 {
 | 
								if q.timeout > 0 && q.maxAttempts > 0 {
 | 
				
			||||||
| 
						 | 
					@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
 | 
				
			||||||
	log.Debug("WrappedQueue: %s Terminated", q.name)
 | 
						log.Debug("WrappedQueue: %s Terminated", q.name)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsPaused will return if the pool or queue is paused
 | 
				
			||||||
 | 
					func (q *WrappedQueue) IsPaused() bool {
 | 
				
			||||||
 | 
						q.lock.Lock()
 | 
				
			||||||
 | 
						defer q.lock.Unlock()
 | 
				
			||||||
 | 
						pausable, ok := q.internal.(Pausable)
 | 
				
			||||||
 | 
						return ok && pausable.IsPaused()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Pause will pause the pool or queue
 | 
				
			||||||
 | 
					func (q *WrappedQueue) Pause() {
 | 
				
			||||||
 | 
						q.lock.Lock()
 | 
				
			||||||
 | 
						defer q.lock.Unlock()
 | 
				
			||||||
 | 
						if pausable, ok := q.internal.(Pausable); ok {
 | 
				
			||||||
 | 
							pausable.Pause()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Resume will resume the pool or queue
 | 
				
			||||||
 | 
					func (q *WrappedQueue) Resume() {
 | 
				
			||||||
 | 
						q.lock.Lock()
 | 
				
			||||||
 | 
						defer q.lock.Unlock()
 | 
				
			||||||
 | 
						if pausable, ok := q.internal.(Pausable); ok {
 | 
				
			||||||
 | 
							pausable.Resume()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
 | 
				
			||||||
 | 
					func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
 | 
				
			||||||
 | 
						q.lock.Lock()
 | 
				
			||||||
 | 
						defer q.lock.Unlock()
 | 
				
			||||||
 | 
						if pausable, ok := q.internal.(Pausable); ok {
 | 
				
			||||||
 | 
							return pausable.IsPausedIsResumed()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return context.Background().Done(), closedChan
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var closedChan chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
	queuesMap[WrappedQueueType] = NewWrappedQueue
 | 
						queuesMap[WrappedQueueType] = NewWrappedQueue
 | 
				
			||||||
 | 
						closedChan = make(chan struct{})
 | 
				
			||||||
 | 
						close(closedChan)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -22,7 +22,7 @@ func validType(t string) (Type, error) {
 | 
				
			||||||
			return typ, nil
 | 
								return typ, nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
 | 
						return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getQueueSettings(name string) (setting.QueueSettings, []byte) {
 | 
					func getQueueSettings(name string) (setting.QueueSettings, []byte) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -111,7 +111,7 @@ func (q *ChannelUniqueQueue) Push(data Data) error {
 | 
				
			||||||
// PushFunc will push data into the queue
 | 
					// PushFunc will push data into the queue
 | 
				
			||||||
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
 | 
					func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
 | 
							return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	bs, err := json.Marshal(data)
 | 
						bs, err := json.Marshal(data)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -239,6 +239,26 @@ func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
 | 
				
			||||||
	return q.channelQueue.IsEmpty()
 | 
						return q.channelQueue.IsEmpty()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsPaused will return if the pool or queue is paused
 | 
				
			||||||
 | 
					func (q *PersistableChannelUniqueQueue) IsPaused() bool {
 | 
				
			||||||
 | 
						return q.channelQueue.IsPaused()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Pause will pause the pool or queue
 | 
				
			||||||
 | 
					func (q *PersistableChannelUniqueQueue) Pause() {
 | 
				
			||||||
 | 
						q.channelQueue.Pause()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Resume will resume the pool or queue
 | 
				
			||||||
 | 
					func (q *PersistableChannelUniqueQueue) Resume() {
 | 
				
			||||||
 | 
						q.channelQueue.Resume()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
 | 
				
			||||||
 | 
					func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
 | 
				
			||||||
 | 
						return q.channelQueue.IsPausedIsResumed()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Shutdown processing this queue
 | 
					// Shutdown processing this queue
 | 
				
			||||||
func (q *PersistableChannelUniqueQueue) Shutdown() {
 | 
					func (q *PersistableChannelUniqueQueue) Shutdown() {
 | 
				
			||||||
	log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
 | 
						log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -105,7 +105,7 @@ func (q *WrappedUniqueQueue) Push(data Data) error {
 | 
				
			||||||
// PushFunc will push the data to the internal channel checking it against the exemplar
 | 
					// PushFunc will push the data to the internal channel checking it against the exemplar
 | 
				
			||||||
func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
 | 
					func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
 | 
				
			||||||
	if !assignableTo(data, q.exemplar) {
 | 
						if !assignableTo(data, q.exemplar) {
 | 
				
			||||||
		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
							return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	q.tlock.Lock()
 | 
						q.tlock.Lock()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -57,14 +57,12 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
 | 
				
			||||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	dataChan := make(chan Data, config.QueueLength)
 | 
						dataChan := make(chan Data, config.QueueLength)
 | 
				
			||||||
	resumed := make(chan struct{})
 | 
					 | 
				
			||||||
	close(resumed)
 | 
					 | 
				
			||||||
	pool := &WorkerPool{
 | 
						pool := &WorkerPool{
 | 
				
			||||||
		baseCtx:            ctx,
 | 
							baseCtx:            ctx,
 | 
				
			||||||
		baseCtxCancel:      cancel,
 | 
							baseCtxCancel:      cancel,
 | 
				
			||||||
		batchLength:        config.BatchLength,
 | 
							batchLength:        config.BatchLength,
 | 
				
			||||||
		dataChan:           dataChan,
 | 
							dataChan:           dataChan,
 | 
				
			||||||
		resumed:            resumed,
 | 
							resumed:            closedChan,
 | 
				
			||||||
		paused:             make(chan struct{}),
 | 
							paused:             make(chan struct{}),
 | 
				
			||||||
		handle:             handle,
 | 
							handle:             handle,
 | 
				
			||||||
		blockTimeout:       config.BlockTimeout,
 | 
							blockTimeout:       config.BlockTimeout,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue