ホームページ > バックエンド開発 > Golang > Golang 接続プールに関するいくつかの実装事例

Golang 接続プールに関するいくつかの実装事例

藏色散人
リリース: 2020-12-14 14:42:41
転載
2529 人が閲覧しました

次の golang チュートリアル コラムでは、Golang 接続プールのいくつかの実装事例を紹介します。必要!

Golang 接続プールに関するいくつかの実装事例

TCP の 3 ウェイ ハンドシェイクやその他の理由により、接続の確立は比較的コストのかかる行為です。したがって、特定のエンティティと複数回対話する必要があるプログラムでは、再利用可能な接続を備えた接続プールを再利用のために維持する必要があります。

接続プールを維持するための最も基本的な要件は、特に goroutine を特徴とする Golang のような言語では、スレッド セーフ (スレッド セーフ) を達成することです。

単純な接続プールの実装

type Pool struct {
    m sync.Mutex //保证多个goroutine访问时候,closed的线程安全
    res chan io.Closer //连接存储的chan
    factory func() (io.Closer,error) //新建连接的工厂方法
    closed bool //连接池关闭标志
}
ログイン後にコピー

この単純な接続プールでは、chan を使用して接続をプールに保存します。新しい構造を作成する方法は比較的簡単です:

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
ログイン後にコピー

対応するファクトリ関数と接続プールのサイズを指定するだけです。

接続を取得する

それでは、接続からリソースを取得するにはどうすればよいでしょうか?内部ストレージ接続の構造は chan であるため、スレッドの安全性を確保するには、単純な select だけが必要です。

//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享资源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成资源")
        return p.factory()
    }
}
ログイン後にコピー

まず、接続プールの res chan から取得します (そうでない場合)。接続を構築するためにすでに準備されているファクトリー関数を使用します。同時に、res から接続を取得するときに、ok を使用して、まず接続プールが閉じられているかどうかを判断します。すでに接続が閉じられている場合は、すでに準備されている接続が閉じられたエラーを返します。

接続プールを閉じる

接続プールを閉じると述べたので、接続プールを閉じるにはどうすればよいでしょうか?

//关闭资源池,释放资源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true
    //关闭通道,不让写入了
    close(p.res)
    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }
}
ログイン後にコピー

ここでは、最初に p.m.Lock()* ロック操作を実行する必要があります。これは、構造体の *closed を読み書きする必要があるためです。最初にこのフラグを設定してから res chan を閉じ、Acquire メソッドが新しい接続を取得できないようにする必要があります。次に、res チャネルの接続に対して Close 操作を実行します。

接続を解放します

接続を解放するには前提条件が必要です。つまり、接続プールが閉じられていない必要があります。接続プールが閉じられ、接続が res に送信されると、パニックがトリガーされます。

func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()
    //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()
        return
    }
    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}
ログイン後にコピー

上記は、シンプルでスレッドセーフな接続プールの実装です。ここでわかるのは、接続プールは実装されましたが、まだいくつかの小さな欠点があるということです:

  1. 最大接続数に制限はありません。スレッド プールが空の場合、デフォルトでは新しい接続を直接作成して返します。同時実行の量が多くなると、新しい接続が作成され続けるため、too manyconnections エラー (特に MySQL) が発生しやすくなります。

  2. 取得可能な接続の最大数を確保する必要があるため、数を厳密にしすぎないようにしてください。アイドル時に一定数のアイドル接続 idleNum を維持できることを期待していますが、取得可能な接続の最大数 maxNum を制限できることも期待しています。

  3. 最初の状況は同時実行が多すぎる場合ですが、同時実行の量が少なすぎる場合はどうなるでしょうか?新しい接続を作成して返した後は、この接続を長期間使用しなくなります。この場合、この接続は数時間以上前に確立されている可能性があります。長期間アイドル状態だった接続の可用性を保証する方法はありません。次回取得する接続は期限切れの接続である可能性があります。

次に、成熟して使用されている MySQL 接続プール ライブラリと Redis 接続プール ライブラリから、これらの問題をどのように解決するかを見ていきます。

Golang 標準ライブラリの SQL 接続プール

Golang の接続プールは、標準ライブラリ database/sql/sql.go の下に実装されています。

db, err := sql.Open("mysql", "xxxx")
ログイン後にコピー

を実行すると、接続プールが開きます。返された db の構造を見てみましょう:

type DB struct {
    waitDuration int64 // Total time waited for new connections.
    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}
    closed            bool
    maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}
    waitCount         int64 // Total number of connections waited for.
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}
ログイン後にコピー

当面は注意する必要のないいくつかのフィールドは省略されています。 DB 接続プール内に接続を保存するために使用される構造 freeConn は、以前に使用した chan ではなく、接続スライスである []*driverConn であることがわかります。同時に、アイドル接続の数を制御するための maxIdle などの関連変数があることもわかります。 DB 初期化関数 Open 関数は新しいデータベース接続を作成しないことに注意してください。新しい接続を作成するにはどの関数が使用されますか? Query メソッドをずっと前に戻ると、関数 func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error) が表示されます。 ###。接続プールから接続を取得する方法はここから始まります:

获取连接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码
ログイン後にコピー

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:

...
    // 如果已经超过最大打开数量了,就不需要在回归pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 删除这个在排队的请求。
        if err == nil {
            dc.inUse = true
        }
        // 把连接给这个正在排队的连接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...
ログイン後にコピー

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

redis Golang实现的Redis客户端

这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。

而它的连接池结构如下

type ConnPool struct {
    ...
    queue chan struct{}
    connsMu      sync.Mutex
    conns        []*Conn
    idleConns    []*Conn
    poolSize     int
    idleConnsLen int
    stats Stats
    _closed  uint32 // atomic
    closedCh chan struct{}
}
ログイン後にコピー

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

func NewConnPool(opt *Options) *ConnPool {
    ....
    p.checkMinIdleConns()
    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }
    ....
}
ログイン後にコピー

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。

  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }
    //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }
    //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
    for {
        p.connsMu.Lock()
        //从空闲连接里面先获取一个空闲连接。
        cn := p.popIdle()
        p.connsMu.Unlock()
        if cn == nil {
            // 没有空闲连接时候直接跳出循环。
            break
        }
        // 判断是否已经过时,是的话close掉了然后继续取出。
        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }
        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }
    atomic.AddUint32(&p.stats.Misses, 1)
    // 如果没有空闲连接的话,这边就直接新建连接了。
    newcn, err := p.newConn(ctx, true)
    if err != nil {
        // 归还令牌。
        p.freeTurn()
        return nil, err
    }
    return newcn, nil
}
ログイン後にコピー

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()这句话可以看出,获取连接这个动作,是从 idleConns里面获取的,而里面的函数也证明了这一点。同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。

  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

func (p *ConnPool) freeTurn() {
    <-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
    case p.queue <- struct{}{}:
        return nil
...
}
ログイン後にコピー

就是在靠queue这个chan来维持令牌数量。

那么 conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }
    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // 如果连接池满了,会在后面移除。
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}
ログイン後にコピー

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns里面,而是先放 conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }
    //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }
    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    //我们可以看到很明显的这个归还号码牌的动作。
    p.freeTurn()
}
ログイン後にコピー

其实归还的过程,就是从 conns转移到 idleConns的过程。当然了,如果新建这个连接时候发现已经 超卖了,后面归还时候就不转移,直接删除了。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. 先 waitTurn,拿到令牌。而令牌数量是根据pool里面的 queue决定的。

  2. 拿到令牌了,去库房 idleConns里面拿空闲的连接。没有的话就自己 newConn一个,并且把他记录到 conns里面。

  3. が使い果たされたら、put を呼び出して戻ります。つまり、conns から idleConns に転送します。返品する際は、newConn で売られすぎとマークされているかどうかを確認してください。その場合、idleConns には転送されません。

ずっと困惑していたのですが、接続するには必ずトークンを取得する必要があるため、トークンの数は決まっています。なぜ未だに売られ過ぎているのでしょうか?ソース コードを調べた後の私の答えは次のとおりです。

接続を取得する Get メソッドは newConn ですが、このプライベート メソッドはトークン制御の対象であり、売られすぎの原因にはなりません。ただし、このメソッドはパラメータ pooledbool を受け入れます。したがって、他の人がこのメソッドを呼び出すと、状況に関係なくtrueが渡され、poolSizeがどんどん大きくなるのではないかと心配していると思います。

一般に、redis 接続プール内の接続の数は、トークンと呼ばれる queue chan によって制御されます。

概要

上記からわかるように、接続プールの最も基本的な保証は、接続を取得するときのスレッド セーフです。ただし、多くの追加機能を実装する場合、それらはさまざまな角度から実装されます。まだ非常に興味深いです。ただし、ストレージ構造がチャンであるかスライスであるかに関係なく、これは非常にうまく実現できます。 SQL や Redis などの接続を保存するためにスライスを使用する場合は、キューの効果を表す構造を維持する必要があります。

この記事の著者: Xiao Ling は幸運を求めています 日付: 2020-02-28 元のリンク: https://juejin.im/post/5e58e3b7f265da57537eb7ed


以上がGolang 接続プールに関するいくつかの実装事例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:csdn.net
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート