Message flow

msgflow

JobRequest flow

JobRequest

  /** Request message that carries a job together with the actors involved in handling it.
    *
    * NOTE(review): the field descriptions below are inferred from the field names and from
    * how `AskProcessResult` sends this message to a driver actor — confirm against the
    * driver implementation.
    *
    * @param taskMsg    the job message to be processed
    * @param replyTo    presumably the actor that should receive the job's result — confirm
    * @param tranActor  presumably an actor taking part in processing the job — confirm its role
    * @param priority   optional priority of the request; defaults to `None`
    * @param jobMetaOpt optional metadata attached to the job; defaults to `None`
    */
  case class JobRequest(
      taskMsg: JobMsg,
      replyTo: ActorRef,
      tranActor: ActorRef,
      priority: Option[Int] = None,
      jobMetaOpt: Option[JobMeta] = None
  )

JobMessage flow

ReceiveActor

AskProcessResult uses ReceiveActor to provide an API that triggers a JobRequest call, with the signature (jobRequest: JobRequest, driver: ActorRef, receiveActor: ActorRef, timeout: Timeout) => Future[JobResult]. ReceiveActor plays the role of “RequestActor” in the JobRequest workflow chart.

AskProcessResult
package doracore.api

import akka.actor.{ActorRef, PoisonPill}
import akka.event.slf4j.Logger
import akka.util.Timeout
import doracore.core.msg.Job.{JobRequest, JobResult, JobStatus}
import doracore.tool.receive.ReceiveActor.{FetchResult, ProxyControlMsg}

import scala.concurrent.{ExecutionContext, Future}
import akka.pattern.ask
import doracore.util.ProcessService.ProcessResult

/** Mixin that submits a job to a driver actor and exposes the outcome as a `Future`.
  *
  * Requires `GetBlockIOExecutor`; its `getBlockDispatcher()` supplies the execution context
  * on which the future callbacks below run.
  */
trait AskProcessResult {
  this: GetBlockIOExecutor =>

  /** Sends `jobRequest` to `driver` with `receiveActor` as the sender, then waits for the
    * result collected by `receiveActor`.
    *
    * @param jobRequest   the request forwarded to the driver
    * @param driver       the actor that receives the request
    * @param receiveActor the actor used as sender of the request and queried for the result
    * @param timeout      how long to wait for the result
    * @return the job result, or a `JobStatus.TimeOut` result if the ask fails
    */
  def getProcessCommandFutureResult(
      jobRequest: JobRequest,
      driver: ActorRef,
      receiveActor: ActorRef,
      timeout: Timeout
  ): Future[JobResult] = {
    driver.tell(jobRequest, receiveActor)
    getResult(receiveActor, timeout)
  }

  /** Asks `receiveActor` for its job result and stops it afterwards.
    *
    * If the ask fails (typically because it timed out), a `JobStatus.TimeOut` result wrapping
    * the failure is produced, relayed through `receiveActor` as a `ProxyControlMsg`, and
    * returned. In every case `receiveActor` is finally sent a `ProxyControlMsg(PoisonPill)`
    * followed by a `PoisonPill`.
    *
    * @param receiveActor the actor holding (or about to receive) the job result
    * @param timeout      how long to wait for the reply to `FetchResult`
    * @return a future completed with the fetched result or with the timeout result
    */
  def getResult(receiveActor: ActorRef, timeout: Timeout): Future[JobResult] = {
    implicit val ec: ExecutionContext  = getBlockDispatcher()
    implicit val timeoutValue: Timeout = timeout
    (receiveActor ? FetchResult())
      // A reply of any other type fails the future with a ClassCastException,
      // which is then handled by the recovery branch below.
      .mapTo[JobResult]
      .recover { case scala.util.control.NonFatal(cause) =>
        val tName = Thread.currentThread.getName
        // Pass the cause along so the log shows why the ask failed.
        Logger.apply(this.getClass.getName).error(s"$tName=> Job timeout after $timeout", cause)
        val timeoutResult = JobResult(JobStatus.TimeOut, ProcessResult(JobStatus.Failed, cause))
        receiveActor ! ProxyControlMsg(timeoutResult)
        // NOTE(review): presumably gives the proxy time to react to the timeout result before
        // the stop messages below are sent; it blocks a dispatcher thread — confirm it is
        // still required.
        Thread.sleep(100)
        timeoutResult
      }
      // andThen runs for success and failure alike, so the actors are stopped even if the
      // recovery branch itself fails; the future's value is passed through unchanged.
      .andThen { case _ =>
        receiveActor ! ProxyControlMsg(PoisonPill)
        receiveActor ! PoisonPill
      }
  }
}
ReceiveActor
package doracore.tool.receive

import akka.actor.{ActorRef, Props}
import doracore.base.BaseActor
import doracore.core.driver.DriverActor.ProxyActorMsg
import doracore.core.msg.Job.JobResult
import doracore.tool.receive.ReceiveActor.{FetchResult, ProxyControlMsg, QueryResult}

/** For doradilla.tool.receive in Doradilla
  * Created by whereby[Tao Zhou](187225577@qq.com) on 2019/4/13
  *
  * Actor that stores a single [[JobResult]] and delivers it to the sender of a `FetchResult`
  * as soon as both the result and the requester are known. It also remembers a proxy actor
  * announced via `ProxyActorMsg` and relays `ProxyControlMsg` payloads to it.
  */
class ReceiveActor extends BaseActor {
  // Sender of the most recent FetchResult, if any.
  var retriverActorOpt: Option[ActorRef] = None
  // Most recently received job result, if any.
  var jobResultOpt: Option[JobResult] = None
  // Proxy actor announced by the most recent ProxyActorMsg, if any.
  var proxyActorOpt: Option[ActorRef] = None

  /** Sends the stored result to the stored requester; both must already be set. */
  def sendBackReuslt() = {
    val requester = retriverActorOpt.get
    val outcome   = jobResultOpt.get
    requester ! outcome
  }

  /** Records the current sender as requester and replies at once if a result is stored. */
  def handleFetchMsg() = {
    retriverActorOpt = Some(sender())
    if (jobResultOpt.isDefined) sendBackReuslt()
  }

  /** Stores the result and delivers it at once if a requester is already known. */
  def handleJobResult(jobResult: JobResult) = {
    jobResultOpt = Some(jobResult)
    if (retriverActorOpt.isDefined) sendBackReuslt()
  }

  /** Remembers the proxy actor carried by `msg`. */
  def handleProxyActorMsg(msg: ProxyActorMsg) = {
    val announcedProxy = msg.proxyActor
    proxyActorOpt = Some(announcedProxy)
  }

  /** Relays the wrapped payload to the proxy actor, when one has been announced. */
  def handleProxyControlMsg(proxyControlMsg: ProxyControlMsg) = {
    for (proxy <- proxyActorOpt) yield proxy ! proxyControlMsg.proxyControlMsg
  }

  /** Replies to the sender with the current, possibly empty, result option. */
  def handleQueryResult() = {
    val snapshot = jobResultOpt
    sender() ! snapshot
  }

  override def receive: Receive = {
    case _: FetchResult           => handleFetchMsg()
    case result: JobResult        => handleJobResult(result)
    case announce: ProxyActorMsg  => handleProxyActorMsg(announce)
    case _: QueryResult           => handleQueryResult()
    case control: ProxyControlMsg => handleProxyControlMsg(control)
  }
}

/** Companion of the `ReceiveActor` class: its `Props` and the messages it understands. */
object ReceiveActor {
  /** `Props` that construct a new `ReceiveActor` instance. */
  val receiveActorProps = Props(new ReceiveActor())
  /** Request for the job result; the actor remembers the sender and replies with the
    * `JobResult` once one is available.
    */
  case class FetchResult()
  /** NOTE(review): not handled by `ReceiveActor.receive` in this file — confirm whether it is
    * used elsewhere.
    */
  case class StopProxy()
  /** Request for the current state; the actor replies immediately with its
    * `Option[JobResult]`, which may be `None`.
    */
  case class QueryResult()
  /** Envelope whose payload the actor forwards to its proxy actor, if one has been registered.
    *
    * @param proxyControlMsg the payload to forward
    */
  case class ProxyControlMsg(proxyControlMsg: Any)
}