> 文章列表 > 分享一个极好的go高并发服务练习题,能够串起几乎所有知识点。

分享一个极好的go高并发服务练习题,能够串起几乎所有知识点。

分享一个极好的go高并发服务练习题,能够串起几乎所有知识点。

​ 起因是在学到rabbitmq的用法时,看到官方教程的最后一篇:用rabbitmq来模拟一个rpc调用教程链接。感觉demo过于简陋,于是动了将其改造为高并发形态的念头。过程中越来越觉得对于巩固golang高并发的实践应用非常有益,是一个很好的练手方向,故此分享出来。
其具体代码如下——

服务端

package mainimport ("context""log""strconv""time"amqp "github.com/rabbitmq/amqp091-go"
)func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}func fib(n int) int {if n == 0 {return 0} else if n == 1 {return 1} else {return fib(n-1) + fib(n-2)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("rpc_queue", // namefalse,       // durablefalse,       // delete when unusedfalse,       // exclusivefalse,       // no-waitnil,         // arguments)failOnError(err, "Failed to declare a queue")err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global)failOnError(err, "Failed to set QoS")msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")var forever chan struct{}go func() {ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()for d := range msgs {n, err := strconv.Atoi(string(d.Body))failOnError(err, "Failed to convert body to integer")log.Printf(" [.] fib(%d)", n)response := fib(n)err = ch.PublishWithContext(ctx,"",        // exchanged.ReplyTo, // routing keyfalse,     // mandatoryfalse,     // immediateamqp.Publishing{ContentType:   "text/plain",CorrelationId: d.CorrelationId,Body:          []byte(strconv.Itoa(response)),})failOnError(err, "Failed to publish a message")d.Ack(false)}}()log.Printf(" [*] Awaiting RPC requests")<-forever
}


客户端

package mainimport ("context""log""math/rand""os""strconv""strings""time"amqp "github.com/rabbitmq/amqp091-go"
)func failOnError(err error, msg string) {if err != nil {log.Panicf("%s: %s", msg, err)}
}func randomString(l int) string {bytes := make([]byte, l)for i := 0; i < l; i++ {bytes[i] = byte(randInt(65, 90))}return string(bytes)
}func randInt(min int, max int) int {return min + rand.Intn(max-min)
}func fibonacciRPC(n int) (res int, err error) {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // noWaitnil,   // arguments)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")corrId := randomString(32)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()err = ch.PublishWithContext(ctx,"",          // exchange"rpc_queue", // routing keyfalse,       // mandatoryfalse,       // immediateamqp.Publishing{ContentType:   "text/plain",CorrelationId: corrId,ReplyTo:       q.Name,Body:          []byte(strconv.Itoa(n)),})failOnError(err, "Failed to publish a message")for d := range msgs {if corrId == d.CorrelationId {res, err = strconv.Atoi(string(d.Body))failOnError(err, "Failed to convert body to integer")break}}return
}func main() {rand.Seed(time.Now().UTC().UnixNano())n := bodyFrom(os.Args)log.Printf(" [x] Requesting fib(%d)", n)res, err := fibonacciRPC(n)failOnError(err, "Failed to handle RPC request")log.Printf(" [.] Got %d", res)
}func bodyFrom(args []string) int {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "30"} else {s = strings.Join(args[1:], " ")}n, err := strconv.Atoi(s)failOnError(err, "Failed to convert arg to integer")return n
}

这一套服务的大致思路为:服务端注册一个名为“rpc_queue”的队列,用以接收客户端发来的,计算斐波那契数列对应值的计算请求;客户端在发送请求前,注册一个随机名称的队列,用以接收计算结果的返回值(以下简称回应队列),该队列名会在发送计算请求到“rpc_queue”时附带在请求信息的“ReplyTo”属性中,而服务端在计算完毕后向该队列名发送计算结果即可。

原代码中感觉过于简陋的地方主要在fibonacciRPC方法:fibonacciRPC方法在每次被调用时,都完全重新生成一个新的mq连接,包括之后的注册回应队列、注册消费者、监听并处理返回值、关闭连接等一系列操作,都仅仅服务于某一次的方法调用。在现实生产中,这属于典型的资源浪费。
当然这种简陋、浪费本身也是教程有意为之,正如教程末尾所言:例子只是为了示范用法而存在,会过度简化一些东西以便突出重点,并且也声明了该例子不适合直接用于生产环境。

所以我们的改造方向也就因此诞生了:将以上例子改造为更适合生产环境的高并发服务,在保证fibonacciRPC功能不变且可被高并发调用的基础上,令其中conn、chan、回应队列等资源得到最大程度的复用。
这个改造方向几乎能把多线程并发的知识点串个遍,诸如chan的应用、select+context控制线程自尽、多线程并存时如何保证每条数据各回各家等等。
我个人也在连接复用、支持高并发的基础上,加入了客户端中断报错、服务端中断报错等各种花里胡哨的玩法,总之可玩性很高。大家可以自行练手。
服务端计算的延迟我是用斐波那契数列计算结果的位数来模拟,也即计算结果是n位的数字时,服务端线程休眠n秒再返回,以此方便更直观的测试中断、负载均衡等功能。

附:我自己写的例子。