博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Scala使用Akka模拟RPC机制代码2
阅读量:6329 次
发布时间:2019-06-22

本文共 5941 字,大约阅读时间需要 19 分钟。

RemoteMessage.scala

//对象要序列化才能通过网络传输   这个地方没有大括号....这有这个extends声明trait RemoteMessage extends Serializable//Worker ->(发送给) Master  Worker给Master节点发送注册消息case class RegisterWorker(id:String, memory:Int, scores:Int ) extends RemoteMessage//接收已经在Master注册过的Worker定时发送来的心跳  Woker->Mastercase class Heartbeat(id:String) extends RemoteMessage//Master -> Worker   Master反馈给Worker已经注册的Worker有哪些case class RegisteredWorker(masterUrl:String) extends RemoteMessage//Worker -> self  //Worker先向自己发送一下心跳,这个地方可以做一下判断,根据实际情况,定义什么时候向Master发送消息//在这个样例类中发送给Master的心跳  case class Heartbeatcase object SendHeartbeat//Master -> self  Master自检查看Worker的状态case object CheckTimeOutWorker

 

WorkerInfo.scala

class WorkerInfo(val id:String,val memory:Int,val scores:Int) {  //记录上一次心跳  var lastHeartbeatTime : Long = _}

 

Worker.scala

import java.util.UUIDimport com.typesafe.config.ConfigFactoryimport akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorSelectionimport scala.concurrent.duration._  //如果不引入这个包 在 context.system.scheduler.schedule 中的mills 地方会报错//使用Akka框架来实现RCP不同进程之间的方法调用   导入Actor是导入的akka包下的//在Worker类上定义一个主构造器class Worker (val masterHost:String ,val masterPort:Int ,val memory:Int ,val cores: Int) extends Actor {  var master : ActorSelection = _  //记录Worker从节点的id  val workerId = UUID.randomUUID().toString()  //记录Worker从节点的发送心跳的时间间隔  val HEART_INTERVAL = 10000    //preStart()方法在构造器之后,在receive之前执行  actorOf()方法执行 就会执行生命周期方法  override def preStart() : Unit={    //跟Master建立连接    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")    //向Master发送注册消息    master ! RegisterWorker(workerId,memory,cores)  }    override def receive: Receive = {    //Master对注册的Worker存储注册之后需要向Worker发送消息 来说明该Woker注册成功    //此处是Worker节点接收到Master节点的反馈消息    case RegisteredWorker(masterUrl) => {      println(masterUrl)      //启动定时器发送心跳      import context.dispatcher      //多长时间后执行 单位 ,多长时间执行一次 单位  ,消息的接受者(直接给master发不好,先给自己发送消息,以后可以做下判断,什么情况下再发送消息), 消息      context.system.scheduler.schedule(0 millis, HEART_INTERVAL millis, self, SendHeartbeat)    }    //Worker先向自己self发送一下心跳,这个地方可以做一下判断,根据实际情况,定义什么时候向Master发送消息    case SendHeartbeat =>{      println("send heartbeat to Master")      master ! Heartbeat(workerId)    }  }}object Worker{  def main(args: Array[String]): Unit = {    //从传入的参数列表中取得    val host = args(0)    val port = args(1).toInt    val masterHost = args(2)    val masterPort = args(3).toInt    val memory = args(4).toInt    val cores = args(5).toInt    //为了通过ActorSystem老大来获得Actor  准备配置    val configStr =       s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$host"         |akka.remote.netty.tcp.port = "$port"        """.stripMargin    val config = ConfigFactory.parseString(configStr)    //ActorSystem是所有Actor的老大,辅助创建和监控下面的Actor, 该老大是单例的.    //得到该老大的对象需要一个名字和配置对象config    val actorSystem = ActorSystem("WorkerSystem",config)     //actorOf方法执行,就会执行生命周期方法    actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)),"Worker")    actorSystem.awaitTermination()  }}

 

Master.scala

import com.typesafe.config.ConfigFactoryimport akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport scala.collection.mutableimport scala.concurrent.duration._class Master(val host:String,val port:Int) extends Actor{    //定义一个HashMap用来存储注册Worker的信息  val idToWorkerHashMap = new mutable.HashMap[String,WorkerInfo]()  //定义一个HashSet用于存储 注册的Worker对象 WorkerInfo  val workersSet = new mutable.HashSet[WorkerInfo]() //使用set删除快, 也可用linkList  //超时检查的间隔  这个时间间隔一定要大于Worker向Master发送心跳的时间间隔.  val CHECK_INTERVAL = 15000    override def preStart() :Unit = {    println("preStart invoked")    //导入隐式转换    //使用timer太low了,可以使用akka的定时器,需要导入这个包    import context.dispatcher     context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)  }  //接收Worker发送来的消息  可以有注册RegisterWorker 心跳Heartbeat  override def receive :Receive = {    case RegisterWorker(id,memory,cores) =>{      //判断该Worker是否已经注册过       if(!idToWorkerHashMap.contains(id)){        //把Worker的信息封装起来保存到内存中        val workerInfo = new WorkerInfo(id,memory,cores)        //以id为键  worker对象为值 添加到该idToWorker的HashMap对象中        idToWorkerHashMap(id) = workerInfo         //把workerInfo对象放到存储Woker的HashSet中        workersSet += workerInfo        //对注册的Worker存储注册之后需要向Worker发送消息 来说明该Woker注册成功        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")//通知worker注册      }    }    //接收已经在Master注册过的Worker定时发送来的心跳     case Heartbeat(id) =>{      if(idToWorkerHashMap.contains(id)){        val workerInfo = idToWorkerHashMap(id)        //报活  把该Worker给Master心跳的时间记录下来        val currentTime = System.currentTimeMillis()        workerInfo.lastHeartbeatTime = currentTime      }    }    //Master节点主动检测在Master自己这里注册Worker节点的存活状态    case CheckTimeOutWorker =>{      val currentTime = System.currentTimeMillis();      //如果当前时间和最近一次心跳时间的差值大于检测的时间间隔 就要干掉这个Worker      val toRemove = workersSet.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)      for(w <- toRemove){        workersSet -= w        idToWorkerHashMap -= w.id      }      println(workersSet.size)          }  }}object Master{  def main(args: Array[String]): Unit = {    val host = args(0)    val port = args(1).toInt    //准备配置    val configStr =       s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$host"         |akka.remote.netty.tcp.port = "$port"                """.stripMargin    //ConfigFactory 既有parseString 也有parseFile     val config = ConfigFactory.parseString(configStr)    //ActorSystem是所有Actor的老大,辅助创建和监控下面的所有Actor,它是单例的    val actorSystem = ActorSystem("MasterSystem",config)    val master = actorSystem.actorOf(Props(new Master(host,port)),"Master")    actorSystem.awaitTermination()  }}

另外一个版本:

转载于:https://www.cnblogs.com/DreamDrive/p/6740440.html

你可能感兴趣的文章
新形势下初创B2B行业网站如何经营
查看>>
初心大陆-----python宝典 第五章之列表
查看>>
java基础学习2
查看>>
sysbench使用笔记
查看>>
有关电子商务信息的介绍
查看>>
NFC·(近距离无线通讯技术)
查看>>
nginx 禁止某个IP访问立网站的设置方法
查看>>
多线程基础(三)NSThread基础
查看>>
PHP的学习--Traits新特性
查看>>
ubuntu下,py2,py3共存,/usr/bin/python: No module named virtualenvwrapper错误解决方法
查看>>
Ext.form.field.Number numberfield
查看>>
异地多活数据中心项目
查看>>
Linux文件夹分析
查看>>
解决部分月份绩效无法显示的问题:timestamp\union al\autocommit等的用法
查看>>
CRT + lrzsz 进行远程linux系统服务器文件上传下载
查看>>
nginx 域名跳转 Nginx跳转自动到带www域名规则配置、nginx多域名向主域名跳转
查看>>
man openstack >>1.txt
查看>>
linux几大服务器版本大比拼
查看>>
在BT5系统中安装postgresQL
查看>>
Can't connect to MySQL server on 'localhost'
查看>>