Control main thread with multiple Future jobs in Scala


```scala
import scala.collection.mutable.MutableList
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}

def fixture = new {
  val xyz = new Xyz(spark)
}

// mutable list of Futures, i.e. a MutableList[Future[Dataset[Row]]]
val fList: MutableList[Future[Dataset[Row]]] = MutableList[Future[Dataset[Row]]]()

test("test case") {
  for (i <- 1 to 10) {
    fList ++= MutableList(Future {
      println("executing task " + i)
      val ds = read(fixture.etlSparkLayer, i)
      ds
    })
  }

  Thread.sleep(1000 * 4200)
  val futureOfList = Future.sequence(fList) // turn the list of Futures into a Future of a list
  println(Await.ready(futureOfList, Duration.Inf))
  val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
  println("squares: " + await_result)

  futureOfList.onComplete {
    case Success(x)  => println("Success!!! " + x)
    case Failure(ex) => println("Failed !!! " + ex)
  }
}
```

I am executing one test case with a sequence of Futures collected in a list; I am trying to execute the same function multiple times in parallel using Futures in Scala. On my system, 4 jobs start at a time; only after those 4 jobs complete do the next 4 jobs start. How can I start more than 4 jobs at a time, and how can I make the main thread wait until the Future threads complete? I tried Await.result and Await.ready, but I was not able to control the main thread; the only way I could was with Thread.sleep. This program reads RDBMS tables and writes to Elasticsearch. How to control the main thread is the main issue.

Assuming you use the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, you can tune the number of threads as described here:

https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L100

Specifically, the following system properties: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads, scala.concurrent.context.maxThreads, and scala.concurrent.context.maxExtraThreads.
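For example, these properties are passed as ordinary JVM `-D` flags when launching the application. This is a hypothetical invocation: `MainApp` and the value 8 are placeholders, not from the original post.

```shell
# Pin the global ExecutionContext to exactly 8 threads (illustrative values).
scala -Dscala.concurrent.context.minThreads=8 \
      -Dscala.concurrent.context.numThreads=8 \
      -Dscala.concurrent.context.maxThreads=8 \
      MainApp
```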

Otherwise, you can rewrite your code like this:

```scala
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors

test("test case") {
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numberOfThreadsYouWant))
  val aFuture = Future.traverse(1 to 10) { i =>
    Future {
      println("executing task " + i)
      read(fixture.etlSparkLayer, i) // if this is a blocking operation you may want to consider wrapping it in a `blocking {}`-block
    }
  }
  aFuture.onComplete(_ => ec.shutdownNow()) // only for this test, to make sure the pool gets cleaned up
  val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // or some other timeout
  println("squares: " + await_result)
}
```
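To see the pattern in isolation, here is a minimal, self-contained sketch of the same idea: a fixed thread pool caps parallelism, `Future.traverse` fans out the work, and `Await.result` blocks the main thread until everything finishes (no `Thread.sleep` needed). `BlockingDemo`, `squares`, the pool size 4, and the `Thread.sleep(10)` stand-in for a blocking RDBMS read are all illustrative assumptions, not part of the original post.

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object BlockingDemo {
  // A fixed pool of 4 threads caps how many futures run at once
  // (4 is an illustrative choice).
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // Fan out `n` simulated blocking reads with Future.traverse, then block
  // the main thread with Await.result until all of them have finished.
  def squares(n: Int): Seq[Int] = {
    val fut = Future.traverse(1 to n) { i =>
      Future {
        // `blocking` marks a section as potentially blocking; the global
        // fork-join pool can spawn extra threads for it (a fixed pool
        // ignores the hint, but it documents intent and is harmless).
        blocking { Thread.sleep(10) } // stand-in for a blocking RDBMS read
        i * i
      }
    }
    Await.result(fut, 1.minute)
  }

  def main(args: Array[String]): Unit = {
    println(squares(10))
    ec.shutdownNow() // clean up the pool so the JVM can exit
  }
}
```

Because the pool is sized explicitly, raising the `newFixedThreadPool` argument is all it takes to run more than 4 jobs at a time.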
