> 文章列表 > go之基于rabbitmq的火山云服务器弹性伸缩管理程序

go之基于rabbitmq的火山云服务器弹性伸缩管理程序

go之基于rabbitmq的火山云服务器弹性伸缩管理程序

Author: wencoo
Blog:https://wencoo.blog.csdn.net/
Date: 18/04/2023
Details:

文章目录

  • 项目背景
  • 项目功能
  • 模块实现
    • configMq.json
    • configECS.json
    • configDB.json
  • 完整代码
  • 打赏

项目背景

项目服务器不够用了,需要弹性伸缩服务器,准备使用火山的弹性伸缩功能,但是在配置的过程中发现,弹性伸缩的指标都是针对于本机的负载等等进行监控,从而实现弹性伸缩。我的项目是使用的GPU服务器,在程序启动的时候,相关GPU资源都已经申请好了,达到了最大值,即使没有任务,也是资源占用掉了,任务的来源是中间件rabbitmq,所以在本机上监控,并不合理。

项目功能

因为使用的rabbitmq,任务都存储在队列中,先获取队列任务数量,根据队列任务数量,按照策略,决定是否创建弹性伸缩的服务器,创建之后,待任务消费减少,再删除之前创建好的弹性伸缩的服务器。

模块实现

创建配置mq的相关信息的配置文件configMq.json,创建配置服务器的相关信息的配置文件configECS.json,创建配置数据库的相关信息的配置文件configDB.json,这里内容很简单,也没必要整个数据库啦,所以直接使用配置文件解决

configMq.json

{"port": 5672,"hostName": "106.15.0.159","passwd": "xxx","userName": "admin","maxCounts": 3000,"interval": 3000
}

configECS.json

{"ImageId":            "image-ycct00qocb4qwlkufab4","InstanceName":       "instance-test","InstanceTypeId":     "ecs.g1tl.4xlarge","Password":           "xxx",	"ZoneId":             "cn-beijing-b"
}

configDB.json

{"min":2,"max":3,"base":["i-yc9mljz3vo2ud9z787ky","i-ycbpj0u2pz8rx7cc8n06"],"elasticity": [],"rule":1
}

读取相关配置信息

//读取configDB.jsonjsonFile, err := os.Open("configDB.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err := ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var dbS DbConfStructjson.Unmarshal(jsonData, &dbS)fmt.Println(dbS.Max)//读取configECS.jsonjsonFile, err = os.Open("configECS.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err = ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var ecsS EcsConfStructjson.Unmarshal(jsonData, &ecsS)fmt.Println(ecsS.ImageId)//读取configMq.jsonjsonFile, err = os.Open("configMq.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err = ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var mqS MqConfStructjson.Unmarshal(jsonData, &mqS)fmt.Println(mqS.Port)

获取mq队列里面的任务数量

//获取mq队列任务数量msgCounts := getQueueMsgCounts("material.analysis.new", "material.exchange","material.analysis.new", mqS.HostName, mqS.UserName, mqS.Passwd, mqS.Port)

创建弹性伸缩的机器的规则,并创建新的服务器

baseCounts := len(dbS.Base)elasticityCounts := len(dbS.Elasticity)nextMaxCounts := mqS.MaxCounts + (elasticityCounts+1)*mqS.Intervalif (baseCounts + elasticityCounts) >= dbS.Max {logger.Println("启动服务器数量已达到最大值...不再继续启动新服务器")goto Sleep}if msgCounts >= mqS.MaxCounts && msgCounts < nextMaxCounts {//资源申请serverIdList, err := RunInstances()var serverId *stringif err == nil {if len(serverIdList) >= 1 {serverId = serverIdList[0]logger.Println("创建伸缩服务器成功,serverId:", serverId)fmt.Println("create server id:", serverId)}} else {logger.Println("创建伸缩服务器失败,err:", err.Error())goto Sleep}...

删除服务器资源

//资源释放elasticityCount := len(dbS.Elasticity)if 0 == elasticityCount {fmt.Println("没有伸缩资源,继续休眠")goto Sleep} else {eServerId := dbS.Elasticity[len(dbS.Elasticity)-1].EServerIdDeleteInstance(eServerId)fmt.Println("删除伸缩资源:", eServerId)}

完整代码

package mainimport ("encoding/json""fmt""io/ioutil""log""os""time"
)var logger *log.Loggerfunc init() {//指定路径的文件,无则创建logFile, err := os.OpenFile("./log.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0666)if err != nil {panic(err)}logger = log.New(logFile, "", log.Lshortfile|log.Lmicroseconds)
}func main() {logger.Println("application start ...")//读取configDB.jsonjsonFile, err := os.Open("configDB.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err := ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var dbS DbConfStructjson.Unmarshal(jsonData, &dbS)fmt.Println(dbS.Max)//读取configECS.jsonjsonFile, err = os.Open("configECS.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err = ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var ecsS EcsConfStructjson.Unmarshal(jsonData, &ecsS)fmt.Println(ecsS.ImageId)//读取configMq.jsonjsonFile, err = os.Open("configMq.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()jsonData, err = ioutil.ReadAll(jsonFile)if err != nil {fmt.Println("error reading json file")return}var mqS MqConfStructjson.Unmarshal(jsonData, &mqS)fmt.Println(mqS.Port)for {//获取mq队列任务数量msgCounts := getQueueMsgCounts("material.analysis.new", "material.exchange","material.analysis.new", mqS.HostName, mqS.UserName, mqS.Passwd, mqS.Port)fmt.Println("get queue counts is: ", msgCounts)baseCounts := len(dbS.Base)elasticityCounts := len(dbS.Elasticity)nextMaxCounts := mqS.MaxCounts + (elasticityCounts+1)*mqS.Intervalif (baseCounts + elasticityCounts) >= dbS.Max {logger.Println("启动服务器数量已达到最大值...不再继续启动新服务器")goto Sleep}if msgCounts >= mqS.MaxCounts && msgCounts < nextMaxCounts {//资源申请serverIdList, err := RunInstances()var serverId *stringif err == nil {if len(serverIdList) >= 1 {serverId = serverIdList[0]logger.Println("创建伸缩服务器成功,serverId:", serverId)fmt.Println("create server id:", serverId)}} else {logger.Println("创建伸缩服务器失败,err:", err.Error())goto Sleep}//写入jsonvar eesId ElasticityMachineeesId.EServerId = *serverIddbS.Elasticity = append(dbS.Elasticity, eesId)jsonFile, err = os.Open("configDB.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()data, err := json.MarshalIndent(dbS, "", "	") // 第二个表示每行的前缀,这里不用,第三个是缩进符号,这里用tabif err != nil {panic(err)}err = ioutil.WriteFile("configDB.json", data, 0777)if err != nil {panic(err)}} else {fmt.Println("队列任务较少,尝试释放资源")//查询弹性设备创建时间,不用查,直接删除任意一个,反正值伸缩出来的服务// for _, serverId := range dbS.Elasticity {// 	sid := serverId.EServerId// }//资源释放elasticityCount := len(dbS.Elasticity)if 0 == elasticityCount {fmt.Println("没有伸缩资源,继续休眠")goto Sleep} else {eServerId := dbS.Elasticity[len(dbS.Elasticity)-1].EServerIdDeleteInstance(eServerId)fmt.Println("删除伸缩资源:", eServerId)}//删除数据写入jsondbS.Elasticity = dbS.Elasticity[:len(dbS.Elasticity)-1]jsonFile, err = os.Open("configDB.json")if err != nil {fmt.Println("error opening json file")return}defer jsonFile.Close()data, err := json.MarshalIndent(dbS, "", "   ") // 第二个表示每行的前缀,这里不用,第三个是缩进符号,这里用tabif err != nil {panic(err)}err = ioutil.WriteFile("configDB.json", data, 0777)if err != nil {panic(err)}}Sleep:time.Sleep(time.Second * 300)}}

项目代码已上传至gitee,传送门 volcano_ecs_manage

打赏

如果该文章对您有帮助,可以小小的打上一下哈,您的支持,是对原创的最大支持。
在这里插入图片描述