Go 示例: 有状态的 Goroutine

在之前的示例中,我们使用显式锁定与 互斥锁 来同步多个 Goroutine 对共享状态的访问。另一种选择是使用 Goroutine 和通道的内置同步功能来实现相同的结果。这种基于通道的方法符合 Go 的理念,即通过通信来共享内存,并让每个数据块由一个 Goroutine 独占。

package main
import (
    "fmt"
    "math/rand"
    "sync/atomic"
    "time"
)

在本示例中,我们的状态将由单个 Goroutine 拥有。这将保证数据永远不会因并发访问而损坏。为了读取或写入该状态,其他 Goroutine 将向拥有 Goroutine 发送消息并接收相应的回复。这些 readOpwriteOp struct 封装了这些请求,以及拥有 Goroutine 响应的方式。

type readOp struct {
    key  int
    resp chan int
}
type writeOp struct {
    key  int
    val  int
    resp chan bool
}
func main() {

与之前一样,我们将计算执行的操作数量。

    var readOps uint64
    var writeOps uint64

readswrites 通道将被其他 Goroutine 用于分别发出读取和写入请求。

    reads := make(chan readOp)
    writes := make(chan writeOp)

以下是拥有 state 的 Goroutine,它是一个与之前示例中相同的映射,但现在对有状态的 Goroutine 是私有的。此 Goroutine 反复地在 readswrites 通道上进行选择,并在请求到达时进行响应。响应通过首先执行请求的操作,然后在响应通道 resp 上发送一个值来执行,以指示成功(以及在 reads 的情况下所需的价值)。

    go func() {
        var state = make(map[int]int)
        for {
            select {
            case read := <-reads:
                read.resp <- state[read.key]
            case write := <-writes:
                state[write.key] = write.val
                write.resp <- true
            }
        }
    }()

这将启动 100 个 Goroutine,通过 reads 通道向拥有状态的 Goroutine 发出读取请求。每次读取都需要构造一个 readOp,将其发送到 reads 通道,然后通过提供的 resp 通道接收结果。

    for r := 0; r < 100; r++ {
        go func() {
            for {
                read := readOp{
                    key:  rand.Intn(5),
                    resp: make(chan int)}
                reads <- read
                <-read.resp
                atomic.AddUint64(&readOps, 1)
                time.Sleep(time.Millisecond)
            }
        }()
    }

我们也启动了 10 次写入,使用类似的方法。

    for w := 0; w < 10; w++ {
        go func() {
            for {
                write := writeOp{
                    key:  rand.Intn(5),
                    val:  rand.Intn(100),
                    resp: make(chan bool)}
                writes <- write
                <-write.resp
                atomic.AddUint64(&writeOps, 1)
                time.Sleep(time.Millisecond)
            }
        }()
    }

让 Goroutine 工作一秒钟。

    time.Sleep(time.Second)

最后,捕获并报告操作计数。

    readOpsFinal := atomic.LoadUint64(&readOps)
    fmt.Println("readOps:", readOpsFinal)
    writeOpsFinal := atomic.LoadUint64(&writeOps)
    fmt.Println("writeOps:", writeOpsFinal)
}

运行我们的程序表明,基于 Goroutine 的状态管理示例完成了大约 80,000 次总操作。

$ go run stateful-goroutines.go
readOps: 71708
writeOps: 7177

对于这种情况,基于 Goroutine 的方法比基于互斥锁的方法稍微复杂一些。但是,它在某些情况下可能有用,例如,当您有其他通道参与或管理多个此类互斥锁会容易出错时。您应该使用感觉最自然的方法,尤其是在理解程序正确性方面。

下一个示例:排序.