list - Control main thread with multiple Future jobs in scala -
def fixture = new { val xyz = new xyz(spark) } val flist: scala.collection.mutable.mutablelist[scala.concurrent.future[dataset[row]]] = scala.collection.mutable.mutablelist[scala.concurrent.future[dataset[row]]]() //mutable list of future means list[future] test("test case") { val tasks = (i <- 1 10) { flist ++ scala.collection.mutable.mutablelist[scala.concurrent.future[dataset[row]]](future { println("executing task " + ) val ds = read(fixture.etlsparklayer,i) ds }) } thread.sleep(1000*4200) val futureoflist = future.sequence(flist)//list of future job in future sequence println(await.ready(futureoflist, duration.inf)) val await_result: seq[dataset[row]] = await.result(futureoflist, duration.inf) println("squares: " + await_result) futureoflist.oncomplete { case success(x) => println("success!!! " + x) case failure(ex) => println("failed !!! " + ex) } }
i executing 1 test case sequence of future list , list have collection of future.i trying execute same fuction multiple time parallely of using future in scala.in system 4 job start in 1 time after completion of 4 jobs next 4 job starting complete jobs. how start more 4 job @ time , how main thread wait complete future thread ? tried await.result , await.ready not able control main thread , main thread control m use thread.sleep concept.this program read rdbms table , write in elasticsearch. how control main thread main issue?
assuming use scala.concurrent.executioncontext.implicits.global
executioncontext
can tune number of threads described here:
specifically following system properties: scala.concurrent.context.minthreads
, scala.concurrent.context.numthreads
. scala.concurrent.context.maxthreads
, , scala.concurrent.context.maxextrathreads
otherwise, can rewrite code this:
import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent._ import java.util.concurrent.executors test("test case") { implicit val ec = executioncontext.fromexecutorservice(executorservice.newfixedthreadpool(numberofthreadsyouwant)) val afuture = future.traverse(1 10) { => future { println("executing task " + i) read(fixture.etlsparklayer,i) // if blocking operation may want consider wrapping in `blocking {}`-block. } } afuture.oncomplete(_ => ec.shutdownnow()) // test, , make sure pool gets cleaned val await_result: immutable.seq[dataset[row]] = await.result(afuture, 60.minutes) // or other timeout println("squares: " + await_result) }
Comments
Post a Comment