Named job
What’s named job
By default doradilla will create a cluster based JobApi to process anonymous job at initialization stage.
When received a named job, the doradilla will create a named JobApi to handled the job.
The code implementation see Named job Runner:
private def getNamedJobApi(jobName:String):JobApi={
namedJobApiMap.get(jobName) match {
case Some(jobApi) => jobApi
case _=> val jobApi: JobApi = new JobApi(Some( getActorSystem()))
namedJobApiMap +=(jobName ->jobApi)
jobApi
}
}
def runNamedProcessCommand(processJob: JobMsg,
jobName:String,
timeout: Timeout = ConstVars.longTimeOut,
priority: Option[Int] = None)(implicit ex: ExecutionContext): Future[JobResult] = {
val jobApi = getNamedJobApi(jobName)
val receiveActor = jobApi.actorSystem.actorOf(ReceiveActor.receiveActorProps, CNaming.timebasedName("Receive"))
val processJobRequest = JobRequest(processJob, receiveActor, jobApi.processTranActor, priority)
getProcessCommandFutureResult(processJobRequest, jobApi.defaultDriver, receiveActor,timeout)
}
Usage
package app
import doracore.ActorTestClass
import doracore.core.msg.Job.JobMsg
import doracore.util.{ProcessService, ProcessServiceSpec}
import doracore.vars.ConstVars
import doradilla.back.BackendServer
import doradilla.conf.TestVars
import org.scalatest.Matchers
import scala.concurrent.Await
/**
* For app in doradilla
* Created by whereby[Tao Zhou](187225577@qq.com) on 2019/12/14
*/
class NamedJobRunnerSpec extends ActorTestClass with Matchers {
ProcessService.nameToClassOpt = ProcessServiceSpec.safeProcessServiceNameToClassOpt
import scala.concurrent.ExecutionContext.Implicits.global
val timeout = ConstVars.timeout1S * 4
"Named Job Runner" should {
"start new driver when name is different" in {
val job1 = TestVars.sleepProcessJob
BackendServer.runNamedProcessCommand(job1, "job1")
val job2 = TestVars.processJob
val resultFuture = BackendServer.runNamedProcessCommand(job2, "job2")
val result = Await.ready(resultFuture, timeout)
println(result)
}
"Named Job Runner" should {
"start new driver when name is different but will failed without fsm " in {
val job1 = TestVars.sleepProcessJob
BackendServer.runNamedProcessCommand(job1, "job1")
val job2 = TestVars.processJob
BackendServer.changeFSMForNamedJob("job2", -1)
val resultFuture = BackendServer.runNamedProcessCommand(job2, "job2")
var timeOut = false
try {
val result = Await.ready(resultFuture, timeout)
println(result)
} catch {
case exception: Exception =>
timeOut = true
println(exception)
}
timeOut shouldBe (true)
}
}
"use same driver when name same" in {
val job1 = TestVars.sleepProcessJob
val result1Future = BackendServer.runNamedProcessCommand(job1, "job3")
val job2 = TestVars.processJob
val resultFuture = BackendServer.runNamedProcessCommand(job2, "job3")
var timeOut = false
try {
val result = Await.ready(resultFuture, timeout)
println(result)
} catch {
case exception: Exception =>
timeOut = true
println(exception)
}
timeOut shouldBe (true)
}
"use same driver when name same with increased fsm " in {
val job1 = TestVars.sleepProcessJob
BackendServer.changeFSMForNamedJob("job4", 1)
val result1Future = BackendServer.runNamedProcessCommand(job1, "job4")
val job2 = TestVars.processJob
val resultFuture = BackendServer.runNamedProcessCommand(job2, "job4")
var timeOut = false
try {
val result = Await.ready(resultFuture, timeout)
println(result)
} catch {
case exception: Exception =>
timeOut = true
println(exception)
}
timeOut shouldBe (false)
}
}
}
FSMActor number control for named JobApi
For named JobApi, there will create only one FSMActor, you can increase and decrease FSMActor by use api as below:
def changeFSMForNamedJob(jobName: String, num:Int)={
val jobApi = getNamedJobApi(jobName)
if(num >0){
jobApi.defaultDriver ! FSMIncrease(num)
}else{
jobApi.defaultDriver ! FSMDecrease(Math.abs(num))
}
}
The source code for this page can be found here.