introduction
Message flow
JobRequest flow
JobRequest
/** Request message describing a job to be run.
  *
  * @param taskMsg    the job payload
  * @param replyTo    actor reference carried with the request
  *                   (NOTE(review): presumably the actor that receives the result; confirm against the driver)
  * @param tranActor  second actor reference carried with the request
  *                   (NOTE(review): role is not visible in this document; confirm)
  * @param priority   optional priority; defaults to `None`
  * @param jobMetaOpt optional job metadata; defaults to `None`
  */
case class JobRequest(
    taskMsg: JobMsg,
    replyTo: ActorRef,
    tranActor: ActorRef,
    priority: Option[Int] = None,
    jobMetaOpt: Option[JobMeta] = None
)
ReceiveActor
AskProcessResult uses a ReceiveActor to provide an API that triggers a JobRequest call: (jobRequest: JobRequest, driver: ActorRef, receiveActor: ActorRef, timeout: Timeout) => Future[JobResult]. The ReceiveActor is the “RequestActor” in the JobRequest workflow chart.
- AskProcessResult
-
package doracore.api

import akka.actor.{ActorRef, PoisonPill}
import akka.event.slf4j.Logger
import akka.pattern.ask
import akka.util.Timeout
import doracore.core.msg.Job.{JobRequest, JobResult, JobStatus}
import doracore.tool.receive.ReceiveActor.{FetchResult, ProxyControlMsg}
import doracore.util.ProcessService.ProcessResult

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

/** Submits a [[JobRequest]] through a driver actor and exposes the outcome as a `Future[JobResult]`.
  *
  * Must be mixed into a type that provides `getBlockDispatcher()` (see the self-type below);
  * that dispatcher runs every callback attached to the returned future.
  */
trait AskProcessResult {
  this: GetBlockIOExecutor =>

  /** Sends `jobRequest` to `driver` with `receiveActor` as the sender, then asks
    * `receiveActor` for the result.
    *
    * @param jobRequest   the request forwarded to the driver
    * @param driver       actor that receives the request
    * @param receiveActor actor used as the sender of the request and queried for the result
    * @param timeout      ask timeout applied while waiting for the result
    * @return the job result, or a `JobStatus.TimeOut` result if the ask fails
    */
  def getProcessCommandFutureResult(
      jobRequest: JobRequest,
      driver: ActorRef,
      receiveActor: ActorRef,
      timeout: Timeout
  ): Future[JobResult] = {
    driver.tell(jobRequest, receiveActor)
    getResult(receiveActor, timeout)
  }

  /** Asks `receiveActor` for its [[JobResult]] and stops it once an outcome is known.
    *
    * Any failure of the ask (timeout, unexpected reply type, ...) is converted into a
    * `JobResult` with status `JobStatus.TimeOut` wrapping the cause, and that result is
    * also relayed through `receiveActor` as a `ProxyControlMsg`. In every case a
    * `ProxyControlMsg(PoisonPill)` followed by a `PoisonPill` is sent to `receiveActor`
    * before the future completes.
    *
    * @param receiveActor actor that answers `FetchResult`
    * @param timeout      ask timeout
    * @return a future that always completes successfully with a `JobResult`
    */
  def getResult(receiveActor: ActorRef, timeout: Timeout): Future[JobResult] = {
    implicit val executor: ExecutionContext = getBlockDispatcher()
    implicit val askTimeout: Timeout = timeout

    (receiveActor ? FetchResult())
      // A reply of the wrong type fails the future and is handled by `recover` below.
      .mapTo[JobResult]
      .recover { case NonFatal(cause) =>
        val threadName = Thread.currentThread.getName
        // Pass the cause along: not every failure reaching this point is a timeout.
        Logger
          .apply(this.getClass.getName)
          .error(s"$threadName=> Job timeout after $timeout", cause)
        val timedOut = JobResult(JobStatus.TimeOut, ProcessResult(JobStatus.Failed, cause))
        receiveActor ! ProxyControlMsg(timedOut)
        // NOTE(review): presumably gives the proxy time to handle the timeout result before
        // the stop messages below are sent; kept to preserve existing timing. Confirm whether
        // it is still needed.
        Thread.sleep(100)
        timedOut
      }
      .map { jobResult =>
        receiveActor ! ProxyControlMsg(PoisonPill)
        receiveActor ! PoisonPill
        jobResult
      }
  }
}
- ReceiveActor
-
package doracore.tool.receive

import akka.actor.{ActorRef, Props}
import doracore.base.BaseActor
import doracore.core.driver.DriverActor.ProxyActorMsg
import doracore.core.msg.Job.JobResult
import doracore.tool.receive.ReceiveActor.{FetchResult, ProxyControlMsg, QueryResult}

/** For doradilla.tool.receive in Doradilla
  * Created by whereby[Tao Zhou](187225577@qq.com) on 2019/4/13
  *
  * Holds the latest [[JobResult]] it receives and hands it to whichever actor sent a
  * `FetchResult`, regardless of which of the two messages arrives first. It also remembers
  * the proxy actor announced by a `ProxyActorMsg` and relays `ProxyControlMsg` payloads to it.
  */
class ReceiveActor extends BaseActor {
  // Sender of the most recent FetchResult, if any.
  var retriverActorOpt: Option[ActorRef] = None
  // Most recent JobResult received, if any.
  var jobResultOpt: Option[JobResult] = None
  // Proxy actor announced by the most recent ProxyActorMsg, if any.
  var proxyActorOpt: Option[ActorRef] = None

  /** Sends the stored result to the stored retriever; does nothing unless both are present. */
  def sendBackReuslt(): Unit =
    for {
      retriever <- retriverActorOpt
      jobResult <- jobResultOpt
    } retriever ! jobResult

  /** Records the sender as the retriever and replies at once if a result is already stored. */
  def handleFetchMsg(): Unit = {
    retriverActorOpt = Some(sender())
    if (jobResultOpt.isDefined) sendBackReuslt()
  }

  /** Stores `jobResult` and forwards it at once if a retriever is already waiting.
    *
    * @param jobResult the result to remember
    */
  def handleJobResult(jobResult: JobResult): Unit = {
    jobResultOpt = Some(jobResult)
    if (retriverActorOpt.isDefined) sendBackReuslt()
  }

  /** Remembers the proxy actor carried by `msg`. */
  def handleProxyActorMsg(msg: ProxyActorMsg): Unit =
    proxyActorOpt = Some(msg.proxyActor)

  /** Relays the wrapped payload to the proxy actor, if one is known.
    *
    * The `Option[Unit]` result type is kept so existing callers still compile; the value
    * carries no information beyond whether a proxy actor was known.
    */
  def handleProxyControlMsg(proxyControlMsg: ProxyControlMsg): Option[Unit] =
    proxyActorOpt.map(_ ! proxyControlMsg.proxyControlMsg)

  /** Replies to the sender with the stored result as an `Option[JobResult]`. */
  def handleQueryResult(): Unit =
    sender() ! jobResultOpt

  override def receive: Receive = {
    case _: FetchResult                   => handleFetchMsg()
    case jobResult: JobResult             => handleJobResult(jobResult)
    case proxyActorMsg: ProxyActorMsg     => handleProxyActorMsg(proxyActorMsg)
    case _: QueryResult                   => handleQueryResult()
    case proxyControlMsg: ProxyControlMsg => handleProxyControlMsg(proxyControlMsg)
  }
}

/** Props and message protocol for [[ReceiveActor]]. */
object ReceiveActor {
  val receiveActorProps: Props = Props(new ReceiveActor())

  /** Asks the actor to send its `JobResult` to the sender once one is available. */
  case class FetchResult()

  // Not handled by ReceiveActor.receive in this file.
  case class StopProxy()

  /** Asks the actor to reply immediately with its current `Option[JobResult]`. */
  case class QueryResult()

  /** Wraps a message that the actor relays to its proxy actor. */
  case class ProxyControlMsg(proxyControlMsg: Any)
}
Cluster setup
The cluster feature is not applied in the dora library.
1.8.0.7.006+7-a57987e5+20230629-1448*