473 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			473 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
package zkhelper
 | 
						|
 | 
						|
import (
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"path"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	etcderr "github.com/coreos/etcd/error"
 | 
						|
	"github.com/coreos/go-etcd/etcd"
 | 
						|
	zk "github.com/ngaut/go-zookeeper/zk"
 | 
						|
	"github.com/ngaut/log"
 | 
						|
	"github.com/ngaut/pools"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	singleInstanceLock sync.Mutex
 | 
						|
	etcdInstance       *etcdImpl
 | 
						|
)
 | 
						|
 | 
						|
type PooledEtcdClient struct {
 | 
						|
	c *etcd.Client
 | 
						|
}
 | 
						|
 | 
						|
func (c *PooledEtcdClient) Close() {
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Seq2Str(seq int64) string {
 | 
						|
	return fmt.Sprintf("%d", seq)
 | 
						|
}
 | 
						|
 | 
						|
type etcdImpl struct {
 | 
						|
	sync.Mutex
 | 
						|
	cluster  string
 | 
						|
	pool     *pools.ResourcePool
 | 
						|
	indexMap map[string]uint64
 | 
						|
}
 | 
						|
 | 
						|
func convertToZkError(err error) error {
 | 
						|
	//todo: convert other errors
 | 
						|
	if ec, ok := err.(*etcd.EtcdError); ok {
 | 
						|
		switch ec.ErrorCode {
 | 
						|
		case etcderr.EcodeKeyNotFound:
 | 
						|
			return zk.ErrNoNode
 | 
						|
		case etcderr.EcodeNotFile:
 | 
						|
		case etcderr.EcodeNotDir:
 | 
						|
		case etcderr.EcodeNodeExist:
 | 
						|
			return zk.ErrNodeExists
 | 
						|
		case etcderr.EcodeDirNotEmpty:
 | 
						|
			return zk.ErrNotEmpty
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func convertToZkEvent(watchPath string, resp *etcd.Response, err error) zk.Event {
 | 
						|
	//log.Infof("convert event from path:%s, %+v, %+v", watchPath, resp, resp.Node.Key)
 | 
						|
	var e zk.Event
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		e.Err = convertToZkError(err)
 | 
						|
		e.State = zk.StateDisconnected
 | 
						|
		return e
 | 
						|
	}
 | 
						|
 | 
						|
	e.State = zk.StateConnected
 | 
						|
 | 
						|
	e.Path = resp.Node.Key
 | 
						|
	if len(resp.Node.Key) > len(watchPath) {
 | 
						|
		e.Type = zk.EventNodeChildrenChanged
 | 
						|
		return e
 | 
						|
	}
 | 
						|
 | 
						|
	switch resp.Action {
 | 
						|
	case "set":
 | 
						|
		e.Type = zk.EventNodeDataChanged
 | 
						|
	case "delete":
 | 
						|
		e.Type = zk.EventNodeDeleted
 | 
						|
	case "update":
 | 
						|
		e.Type = zk.EventNodeDataChanged
 | 
						|
	case "create":
 | 
						|
		e.Type = zk.EventNodeCreated
 | 
						|
	case "expire":
 | 
						|
		e.Type = zk.EventNotWatching
 | 
						|
	}
 | 
						|
 | 
						|
	return e
 | 
						|
}
 | 
						|
 | 
						|
func NewEtcdConn(zkAddr string) (Conn, error) {
 | 
						|
	singleInstanceLock.Lock()
 | 
						|
	defer singleInstanceLock.Unlock()
 | 
						|
	if etcdInstance != nil {
 | 
						|
		return etcdInstance, nil
 | 
						|
	}
 | 
						|
 | 
						|
	p := pools.NewResourcePool(func() (pools.Resource, error) {
 | 
						|
		cluster := strings.Split(zkAddr, ",")
 | 
						|
		for i, addr := range cluster {
 | 
						|
			if !strings.HasPrefix(addr, "http://") {
 | 
						|
				cluster[i] = "http://" + addr
 | 
						|
			}
 | 
						|
		}
 | 
						|
		newClient := etcd.NewClient(cluster)
 | 
						|
		newClient.SetConsistency(etcd.STRONG_CONSISTENCY)
 | 
						|
		return &PooledEtcdClient{c: newClient}, nil
 | 
						|
	}, 10, 10, 0)
 | 
						|
 | 
						|
	etcdInstance = &etcdImpl{
 | 
						|
		cluster:  zkAddr,
 | 
						|
		pool:     p,
 | 
						|
		indexMap: make(map[string]uint64),
 | 
						|
	}
 | 
						|
 | 
						|
	log.Infof("new etcd %s", zkAddr)
 | 
						|
	if etcdInstance == nil {
 | 
						|
		return nil, errors.New("unknown error")
 | 
						|
	}
 | 
						|
 | 
						|
	return etcdInstance, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Get(key string) (data []byte, stat zk.Stat, err error) {
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	resp, err := c.Get(key, true, false)
 | 
						|
	if resp == nil {
 | 
						|
		return nil, nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	return []byte(resp.Node.Value), nil, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) setIndex(key string, index uint64) {
 | 
						|
	e.Lock()
 | 
						|
	defer e.Unlock()
 | 
						|
 | 
						|
	e.indexMap[key] = index
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) getIndex(key string) uint64 {
 | 
						|
	e.Lock()
 | 
						|
	defer e.Unlock()
 | 
						|
 | 
						|
	index := e.indexMap[key]
 | 
						|
 | 
						|
	return index
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) watch(key string, children bool) (resp *etcd.Response, stat zk.Stat, watch <-chan zk.Event, err error) {
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
	index := e.getIndex(key)
 | 
						|
	resp, err = c.Get(key, true, true)
 | 
						|
	if resp == nil {
 | 
						|
		return nil, nil, nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	if index < resp.Node.ModifiedIndex {
 | 
						|
		index = resp.Node.ModifiedIndex
 | 
						|
	}
 | 
						|
 | 
						|
	for _, n := range resp.Node.Nodes {
 | 
						|
		if n.ModifiedIndex > index {
 | 
						|
			index = n.ModifiedIndex
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	log.Info("try watch", key)
 | 
						|
	ch := make(chan zk.Event, 100)
 | 
						|
	originVal := resp.Node.Value
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer func() {
 | 
						|
			e.setIndex(key, index)
 | 
						|
		}()
 | 
						|
 | 
						|
		for {
 | 
						|
			conn, err := e.pool.Get()
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
			resp, err := c.Watch(key, index, children, nil, nil)
 | 
						|
			e.pool.Put(conn)
 | 
						|
 | 
						|
			if err != nil {
 | 
						|
				if ec, ok := err.(*etcd.EtcdError); ok {
 | 
						|
					if ec.ErrorCode == etcderr.EcodeEventIndexCleared {
 | 
						|
						index++
 | 
						|
						continue
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				log.Warning("watch", err)
 | 
						|
				ch <- convertToZkEvent(key, resp, err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			if key == resp.Node.Key && originVal == string(resp.Node.Value) { //keep alive event
 | 
						|
				index++
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			ch <- convertToZkEvent(key, resp, err)
 | 
						|
			//update index
 | 
						|
			if index <= resp.Node.ModifiedIndex {
 | 
						|
				index = resp.Node.ModifiedIndex + 1
 | 
						|
			} else {
 | 
						|
				index++
 | 
						|
			}
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	return resp, nil, ch, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) GetW(key string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) {
 | 
						|
	resp, stat, watch, err := e.watch(key, false)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	return []byte(resp.Node.Value), stat, watch, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Children(key string) (children []string, stat zk.Stat, err error) {
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	resp, err := c.Get(key, true, false)
 | 
						|
	if resp == nil {
 | 
						|
		return nil, nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, c := range resp.Node.Nodes {
 | 
						|
		children = append(children, path.Base(c.Key))
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) ChildrenW(key string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) {
 | 
						|
	resp, stat, watch, err := e.watch(key, true)
 | 
						|
	if err != nil {
 | 
						|
		return nil, stat, nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	for _, c := range resp.Node.Nodes {
 | 
						|
		children = append(children, path.Base(c.Key))
 | 
						|
	}
 | 
						|
 | 
						|
	return children, stat, watch, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Exists(key string) (exist bool, stat zk.Stat, err error) {
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return false, nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	_, err = c.Get(key, true, false)
 | 
						|
	if err == nil {
 | 
						|
		return true, nil, nil
 | 
						|
	}
 | 
						|
 | 
						|
	if ec, ok := err.(*etcd.EtcdError); ok {
 | 
						|
		if ec.ErrorCode == etcderr.EcodeKeyNotFound {
 | 
						|
			return false, nil, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return false, nil, convertToZkError(err)
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) ExistsW(key string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) {
 | 
						|
	_, stat, watch, err = e.watch(key, false)
 | 
						|
	if err != nil {
 | 
						|
		return false, nil, nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	return true, nil, watch, nil
 | 
						|
}
 | 
						|
 | 
						|
const MAX_TTL = 365 * 24 * 60 * 60
 | 
						|
 | 
						|
func (e *etcdImpl) doKeepAlive(key string, ttl uint64) error {
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	resp, err := c.Get(key, false, false)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	if resp.Node.Dir {
 | 
						|
		return fmt.Errorf("can not set ttl to directory", key)
 | 
						|
	}
 | 
						|
 | 
						|
	//log.Info("keep alive ", key)
 | 
						|
	resp, err = c.CompareAndSwap(key, resp.Node.Value, ttl, resp.Node.Value, resp.Node.ModifiedIndex)
 | 
						|
	if err == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if ec, ok := err.(*etcd.EtcdError); ok && ec.ErrorCode == etcderr.EcodeTestFailed {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
//todo:add test for keepAlive
 | 
						|
func (e *etcdImpl) keepAlive(key string, ttl uint64) {
 | 
						|
	go func() {
 | 
						|
		for {
 | 
						|
			time.Sleep(1 * time.Second)
 | 
						|
			err := e.doKeepAlive(key, ttl)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Create(wholekey string, value []byte, flags int32, aclv []zk.ACL) (keyCreated string, err error) {
 | 
						|
	seq := (flags & zk.FlagSequence) != 0
 | 
						|
	tmp := (flags & zk.FlagEphemeral) != 0
 | 
						|
	ttl := uint64(MAX_TTL)
 | 
						|
	if tmp {
 | 
						|
		ttl = 5
 | 
						|
	}
 | 
						|
 | 
						|
	var resp *etcd.Response
 | 
						|
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	fn := c.Create
 | 
						|
	log.Info("create", wholekey)
 | 
						|
 | 
						|
	if seq {
 | 
						|
		wholekey = path.Dir(wholekey)
 | 
						|
		fn = c.CreateInOrder
 | 
						|
	} else {
 | 
						|
		for _, v := range aclv {
 | 
						|
			if v.Perms == PERM_DIRECTORY {
 | 
						|
				log.Info("etcdImpl:create directory", wholekey)
 | 
						|
				fn = nil
 | 
						|
				resp, err = c.CreateDir(wholekey, uint64(ttl))
 | 
						|
				if err != nil {
 | 
						|
					return "", convertToZkError(err)
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if fn == nil {
 | 
						|
		if tmp {
 | 
						|
			e.keepAlive(wholekey, ttl)
 | 
						|
		}
 | 
						|
		return resp.Node.Key, nil
 | 
						|
	}
 | 
						|
 | 
						|
	resp, err = fn(wholekey, string(value), uint64(ttl))
 | 
						|
	if err != nil {
 | 
						|
		return "", convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	if tmp {
 | 
						|
		e.keepAlive(resp.Node.Key, ttl)
 | 
						|
	}
 | 
						|
 | 
						|
	return resp.Node.Key, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Set(key string, value []byte, version int32) (stat zk.Stat, err error) {
 | 
						|
	if version == 0 {
 | 
						|
		return nil, errors.New("invalid version")
 | 
						|
	}
 | 
						|
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	resp, err := c.Get(key, true, false)
 | 
						|
	if resp == nil {
 | 
						|
		return nil, convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	_, err = c.Set(key, string(value), uint64(resp.Node.TTL))
 | 
						|
	return nil, convertToZkError(err)
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Delete(key string, version int32) (err error) {
 | 
						|
	//todo: handle version
 | 
						|
	conn, err := e.pool.Get()
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	defer e.pool.Put(conn)
 | 
						|
	c := conn.(*PooledEtcdClient).c
 | 
						|
 | 
						|
	resp, err := c.Get(key, true, false)
 | 
						|
	if resp == nil {
 | 
						|
		return convertToZkError(err)
 | 
						|
	}
 | 
						|
 | 
						|
	if resp.Node.Dir {
 | 
						|
		_, err = c.DeleteDir(key)
 | 
						|
	} else {
 | 
						|
		_, err = c.Delete(key, false)
 | 
						|
	}
 | 
						|
 | 
						|
	return convertToZkError(err)
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) GetACL(key string) ([]zk.ACL, zk.Stat, error) {
 | 
						|
	return nil, nil, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) SetACL(key string, aclv []zk.ACL, version int32) (zk.Stat, error) {
 | 
						|
	return nil, nil
 | 
						|
}
 | 
						|
 | 
						|
func (e *etcdImpl) Close() {
 | 
						|
	//how to implement this
 | 
						|
}
 |