利用akka模拟Spark的Master与Worker通信
利用akka模拟Spark的Master与Worker通信
文章目录
- 利用akka模拟Spark的Master与Worker通信
-
- Master与Worker通信过程
- 消息类
- Master实现
- Worker实现
- 总结
Spark是一个基于内存计算的大数据处理框架,它提供了一个独立部署模式(Standalone),可以在自己的集群中运行Spark应用程序。在这种模式下,Spark有两种角色:Master和Worker。Master是集群的控制者,负责管理Worker的注册、注销、状态变更等,以及调度Driver和Executor的运行。Worker是集群的工作者,负责运行Driver和Executor,并向Master报告自己的状态和资源信息。
在本文中,我们将使用Akka框架来模拟Spark的Master与Worker通信过程。Akka是一个基于Actor模型的并发和分布式编程框架,它提供了一种简单而强大的抽象,让我们可以用异步消息来构建高性能、可扩展、容错的系统。
Master与Worker通信过程
我们将使用Akka来模拟以下几个步骤:
- 启动Master和Worker,并建立连接。
- Worker向Master注册自己的信息(内存、核数等)。
- Master收到Worker的注册信息后,回复注册成功的消息。
- Worker收到注册成功的消息后,启动一个定时任务,定期向Master发送心跳包(3秒)。
- Master收到心跳包后,更新Worker的状态信息。
- Master启动一个定时任务,检查Worker是否超时(30秒),如果超时,则删除Worker的信息。
消息类
首先,我们需要定义一些消息类型,用于在Master和Worker之间传递数据:
//使用样例类是因为方便序列化,模拟是在一台机器上,但多台机器时不行,所以要序列化
/*样例类,表示从节点的注册信息* @param slave_id 从节点的Id* @param lastUpdateTime 上次更新心跳的时间* @param cores 从节点的核数* @param memory 从节点的内存大小*/
case class RegisterClass(val slave_id:String,var lastUpdateTime:Long,val cores:Int,val memory:String)/* 样例对象,表示注册成功*/
case object RegisterSuccess/* 样例类,表示心跳* @param slave_id 发送心跳的从节点ID*/
case class HeartBeat(val slave_id:String)/* 样例对象,表示主节点检测从节点是否超时发送心跳*/
case object CheckTimeOut
Master实现
然后,我们需要定义一个Master类,继承自Actor特质,并实现receive方法,用于处理收到的消息。Master类还需要一个preStart方法,在启动前执行一些初始化操作,例如启动定时任务检查Worker是否超时。
需要导入的包
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.Duration
//import scala.collection.mutable.ListBuffer
class Master
class Master extends Actor {//val buffer: ListBuffer[RegisterClass] = ListBuffer[RegisterClass]()// 定义一个可变的映射变量,名为slaves ,初始化这个映射为空var slaves: mutable.Map[String, RegisterClass] = mutable.Map[String, RegisterClass]()// 重写receive方法,定义Actor可以处理的消息类型override def receive: Receive = {// 如果收到一个RegisterClass类型的消息,将其赋值给xcase x: RegisterClass => {// 将x中的slave_id和RegisterClass对象放入slaves映射中slaves.put(x.slave_id, x)// 打印出当前注册的worker的数量println(s"worker:${x.slave_id} is registering................................................ current workers ${slaves.size}")// 向发送者回复一个RegisterSuccess消息sender() ! RegisterSuccess}// 如果收到HeartBeat类型的消息case x: HeartBeat => {// 从Map中获取slave actor的信息slaves.get(x.slave_id) match {// 如果存在该slave actor的信息case Some(value) => {// 更新该slave actor的最后更新时间value.lastUpdateTime = System.currentTimeMillis()// 将更新后的信息存储到Map中slaves.put(x.slave_id, value)// 打印一句话,表示该slave actor正常运行println(s"${x.slave_id} is responsive and functional. ")// 打印一句话,表示当前在线的slave actor数量println(s"Current online workers is ${slaves.size}")}// 如果不存在该slave actor的信息case None => {// 打印一句话,表示该slave actor不存在println(s"${x.slave_id} does not exist !!!")}}}// 如果收到CheckTimeOut消息case CheckTimeOut => {// 如果Map不为空if (slaves.nonEmpty) {// 过滤Map中的信息slaves = slaves.filter(tuple => {// 如果该slave actor的最后更新时间距离当前时间超过30秒if (System.currentTimeMillis() - tuple._2.lastUpdateTime > 30000) {// 打印一句话,表示该slave actor已经超时,并从Map中删除该信息println(s"${tuple._1} is timeout , removed from mater !!!!!!!!!!!!!!!!!!!!!!!!!!!!!")false} else true})}}}// 重写preStart方法,该方法在Actor启动时被调用override def preStart(): Unit = {// 导入context中的dispatcher,用于执行定时任务import context.dispatcher// 使用context中的system调用scheduler方法,创建一个定时任务context.system.scheduler.schedule(// 定时任务的初始延迟时间为10秒Duration(10, TimeUnit.SECONDS),// 定时任务的执行间隔为10秒Duration(10, TimeUnit.SECONDS),// 定时任务的接收者为self,即当前Actorself,// 定时任务发送的消息为CheckTimeOut,用于检查超时CheckTimeOut )}
}
最后,我们需要定义一个Master对象,用于启动Master并创建Master实例。我们需要配置Master的地址和端口,并根据这些参数创建一个配置对象。然后,我们需要使用这个配置对象创建一个ActorSystem,并使用它创建一个Master实例。
object Master
object Master {def main(args: Array[String]): Unit = {// 定义一个配置字符串,包含actor的提供者,主机名和端口号val conf ="""|akka.actor.provider = akka.remote.RemoteActorRefProvider|akka.remote.netty.tcp.hostname = localhost|akka.remote.netty.tcp.port = 8888|""".stripMargin// 解析配置字符串,得到一个配置对象val config = ConfigFactory.parseString(conf)// 根据配置对象创建一个actor系统,命名为Hadoopval actorSystem = ActorSystem("Hadoop",config)// 在actor系统中创建一个slave actor,命名为masteractorSystem.actorOf(Props(new Master),"master")}
}
Worker实现
同样地,我们也需要定义一个Worker类,继承自Actor特质,并实现receive方法,用于处理收到的消息,并启动定时任务发送心跳的方法。所以还有写一个启动定时任务发送心跳的方法。Worker类也需要一个preStart方法,在启动前执行一些初始化操作,例如向Master注册自己的信息,并启动定时任务发送心跳包。
需要导入的包
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
class Worker
class Slave extends Actor {// 定义一个slave_id变量,赋值为"slave1",表示从节点的idval slave_id = "slave1"// 定义一个cores变量,赋值为30,表示从节点的核心数val cores = 30// 定义一个memory变量,赋值为"64G",表示从节点的内存大小val memory = "64G"// 重写receive方法,它返回一个Receive类型的偏函数override def receive: Receive = {// 匹配RegisterSuccess消息,表示注册成功case RegisterSuccess =>// 打印一条信息,表示从节点注册成功println(s"${slave_id} register successfully!!!!!!!!!!")// 调用sendHeartBeat方法,向sender发送心跳消息sendHeartBeat()}// 这是一个sendHeartBeat方法,它没有参数和返回值def sendHeartBeat():Unit={/ initialDelay: FiniteDuration, // 这是一个有限的持续时间,表示第一次发送心跳的延迟时间interval: FiniteDuration, // 这是一个有限的持续时间,表示发送心跳的间隔时间receiver: ActorRef, // 这是一个actor引用,表示接收心跳的actormessage: Any) // 这是一个任意类型的值,表示心跳的内容*/// 导入context.dispatcher,它是一个执行器,用于执行定时任务import context.dispatcher// 调用context.system.scheduler.schedule方法,创建一个定时任务context.system.scheduler.schedule(// 设置第一次发送心跳的延迟时间为3秒Duration(3,TimeUnit.SECONDS),// 设置发送心跳的间隔时间为3秒Duration(3,TimeUnit.SECONDS),// 设置接收心跳的actor为sender,即调用该方法的actorsender(),// 设置心跳的内容为HeartBeat(slave_id),其中slave_id是一个变量,表示从节点的idHeartBeat(slave_id))}// 重写preStart方法,它没有参数和返回值override def preStart(): Unit = {// 创建一个RegisterClass对象,它包含从节点的id,注册时间,核心数和内存大小val register = RegisterClass(slave_id, System.currentTimeMillis(), cores, memory)// 创建一个proxy对象,它是一个actor选择器,用于选择远程的master actorval proxy = context.actorSelection("akka.tcp://Hadoop@localhost:8888/user/master")// 通过proxy向master actor发送register消息,表示请求注册proxy ! register}
}
最后,我们需要定义一个Worker对象,用于启动Worker并创建Worker实例。我们配置Worker和Master的地址和端口,以及Worker的核数和内存等参数,并根据这些参数创建一个配置对象。然后,我们需要使用这个配置对象创建一个ActorSystem,并使用它创建一个Worker实例。
object Worker
object Slave {def main(args: Array[String]): Unit = {// 定义一个配置字符串,包含actor的提供者,主机名和端口号val conf ="""|akka.actor.provider = akka.remote.RemoteActorRefProvider|akka.remote.netty.tcp.hostname = localhost|akka.remote.netty.tcp.port = 6666|""".stripMargin// 解析配置字符串,得到一个配置对象val config = ConfigFactory.parseString(conf)// 根据配置对象创建一个actor系统,命名为Hadoop2val actorSystem = ActorSystem("Hadoop2",config)// 在actor系统中创建一个slave actor,命名为slave1actorSystem.actorOf(Props(new Slave),"slave1")}
}
运行master
运行第一个slave
查看master
运行第二个slave
点击Edit Configurations
点击Modify options
选择Allow
然后点击apply
修改为slave2,端口改为6662
查看master
运行三个slave
修改同slave2
查看master
关闭slave3
等待30秒查看结果
运行结果
我们可以分别运行Master和Slave对象,观察控制台输出。我们可以看到以下结果:
Master启动,Slave1启动后
,
Master的控制台
打印出"worker:slave1 is registering… current workers 1"
Slave1的控制台
打印出"slave1 register successfully!!!"
表明slave1注册成功
,随后master控制台打印出
"slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
…"
表明slave1按时发送心跳检测
slave2启动
Master的控制台
打印出"worker:slave2 is registering… current workers 2"
Slave2的控制台
打印出"slave2 register successfully!!!"
表明slave2注册成功
,随后master控制台打印出
"slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…"
表明slave1和slave2按时发送心跳检测
slave3启动
Master的控制台
打印出"worker:slave3 is registering… current workers 3"
Slave3的控制台
打印出"slave3 register successfully!!!"
表明slave3注册成功
,随后master控制台打印出
“slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
…”
表明slave1,slave2和slave3按时发送心跳检测
slave3关闭
Master的控制台
打印出"slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
…"
可以看出此时只有slave1和slave2在发送心跳
,但Current online workers is 3
,说明master还没有检测出slave3已经关闭
.
待master心跳机制检测
出来后,master控制台
打印出
“slave3 is timeout , removed from mater !!!
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…”
表明slave1,slave2按时发送心跳检测,slave3已经超时
,删除
总结
本文介绍了如何使用Akka模拟Spark的Master与Worker通信过程。首先定义了一些消息类型,然后分别实现了Master和Worker类,并使用ActorSystem来创建和管理它们。我们还使用ActorRef、ActorSelection、Props等对象来发送和接收消息。我们模拟了Worker向Master注册、Master回复注册成功、Worker发送心跳、Master更新状态信息、Master检查Worker是否超时等步骤。Akka是一个强大的框架,可以帮助我们构建高并发、分布式和容错的系统。