分享一个极好的go高并发服务练习题
起因是在学到rabbitmq的用法时,官方教程的最后一篇是用rabbitmq来模拟一个rpc调用。(教程链接)
其具体代码如下——
服务端:
package mainimport ("context""log""strconv""time"amqp "github.com/rabbitmq/amqp091-go"
)func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}func fib(n int) int {if n == 0 {return 0} else if n == 1 {return 1} else {return fib(n-1) + fib(n-2)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("rpc_queue", // namefalse, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, "Failed to declare a queue")err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global)failOnError(err, "Failed to set QoS")msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")var forever chan struct{}go func() {ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()for d := range msgs {n, err := strconv.Atoi(string(d.Body))failOnError(err, "Failed to convert body to integer")log.Printf(" [.] fib(%d)", n)response := fib(n)err = ch.PublishWithContext(ctx,"", // exchanged.ReplyTo, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",CorrelationId: d.CorrelationId,Body: []byte(strconv.Itoa(response)),})failOnError(err, "Failed to publish a message")d.Ack(false)}}()log.Printf(" [*] Awaiting RPC requests")<-forever
}
客户端
package mainimport ("context""log""math/rand""os""strconv""strings""time"amqp "github.com/rabbitmq/amqp091-go"
)func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}func randomString(l int) string {bytes := make([]byte, l)for i := 0; i < l; i++ {bytes[i] = byte(randInt(65, 90))}return string(bytes)
}func randInt(min int, max int) int {return min + rand.Intn(max-min)
}func fibonacciRPC(n int) (res int, err error) {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // noWaitnil, // arguments)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, "Failed to register a consumer")corrId := randomString(32)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()err = ch.PublishWithContext(ctx,"", // exchange"rpc_queue", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",CorrelationId: corrId,ReplyTo: q.Name,Body: []byte(strconv.Itoa(n)),})failOnError(err, "Failed to publish a message")for d := range msgs {if corrId == d.CorrelationId {res, err = strconv.Atoi(string(d.Body))failOnError(err, "Failed to convert body to integer")break}}return
}func main() {rand.Seed(time.Now().UTC().UnixNano())n := bodyFrom(os.Args)log.Printf(" [x] Requesting fib(%d)", n)res, err := fibonacciRPC(n)failOnError(err, "Failed to handle RPC request")log.Printf(" [.] Got %d", res)
}func bodyFrom(args []string) int {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "30"} else {s = strings.Join(args[1:], " ")}n, err := strconv.Atoi(s)failOnError(err, "Failed to convert arg to integer")return n
}
这一套服务的大致思路为:服务端注册一个名为“rpc_queue”的队列,用以接收客户端发来的,计算斐波那契数列对应值的计算请求;客户端在发送请求前,注册一个随机名称的队列,用以接收计算结果的返回值(以下简称回应队列),该队列名会在发送计算请求到“rpc_queue”时附带在请求信息的“ReplyTo”属性中,而服务端在计算完毕后向该队列名发送计算结果即可。
接下来找到客户端代码的fibonacciRPC方法的定义位置,该方法即rpc方法的本体,方法中包含了发送请求到rpc服务、阻塞线程以等待返回、获取返回结果后return给方法调用者等三个步骤,对外表现即为一个同步调用方法。
但是该方法中有个非常碍眼的地方:fibonacciRPC方法在每次被调用时,都完全重新生成一个新的mq连接,包括之后的注册回应队列、注册消费者、监听并处理返回值、关闭连接等一系列操作,都仅仅服务于一次方法调用。在网络服务高并发的情形下,这属于典型的资源浪费,所以一个好的高并发练习题就此诞生——
将以上例子改造为高并发服务,在保证fibonacciRPC功能不变且可被高并发调用的基础上,令其中conn、chan、回应队列等资源得到最大程度的复用。
这套练习几乎能把多线程并发的知识点串个遍,诸如chan的应用、select+context控制线程自尽、多线程并存时如何保证每条数据各回各家等等。
我自己完成的demo再稍做修饰,过会儿上传。