This time we modified the ThriftServer source code and added some services. Along the way we ran into a problem. We wanted to submit tasks asynchronously, on multiple threads. At first we used Scala's Actor and passed it the sqlContext and the SQL. We then found that the sparkSessionId kept changing: every submission that triggered an action produced a different sessionId. What was going on? It turned out to be a multithreading problem: passing the sqlContext to another thread and executing the task there triggers a new session. What can we do about it? The only approach that worked for us is the following:
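import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// df and rb are variables that already exist in the enclosing method
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(new Callable<Void>() {
    @Override
    public Void call() {
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3", com.hadoop.compression.lzo.LzopCodec.class);
        return null;
    }
});

executorService.submit(new Callable<Void>() {
    @Override
    public Void call() {
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4", com.hadoop.compression.lzo.LzopCodec.class);
        return null;
    }
});

// shutdown() stops accepting new tasks; the two submitted tasks still run to completion
executorService.shutdown();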
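In other words, keep the context variable in the current method and use it from the tasks submitted to the thread pool, instead of passing the sqlContext to another actor.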
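To make the idea concrete without a Spark cluster, here is a minimal Java sketch. It assumes a hypothetical `FakeContext` class as a stand-in for the shared context (the `df` in the snippet above), and `runAll` is an illustrative helper, not part of Spark. It only shows that every task captures the same object from the enclosing method, and that `invokeAll` waits for all tasks before the pool is shut down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SharedContextDemo {

    // Hypothetical stand-in for the shared context (the DataFrame `df` above).
    static final class FakeContext {
        final String sessionId;

        FakeContext(String sessionId) {
            this.sessionId = sessionId;
        }
    }

    // Runs one task per path on a fixed pool and returns what each task saw.
    static List<String> runAll(FakeContext ctx, List<String> paths) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String path : paths) {
                // The lambda captures ctx from the enclosing method;
                // no new context is created on the worker thread.
                tasks.add(() -> ctx.sessionId + ":" + path);
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has finished and
            // returns the futures in the same order as the tasks.
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get()); // a task failure surfaces here as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeContext ctx = new FakeContext("session-1");
        System.out.println(runAll(ctx, Arrays.asList("/file3", "/file4")));
    }
}
```

Both results carry the same session id because both tasks read the one `ctx` object created in `main`. Whether a real Spark session behaves the same way depends on how the context is obtained, so treat this only as an illustration of the capture pattern.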
For reference, here is another common way to write a thread pool, this time in Scala:
import java.util.concurrent.{Executors, ExecutorService}

object Test {
  def main(args: Array[String]): Unit = {
    // Create the thread pool
    val threadPool: ExecutorService = Executors.newFixedThreadPool(5)
    try {
      // Submit 5 tasks
      for (i <- 1 to 5) {
        // threadPool.submit(new ThreadDemo("thread" + i))
        threadPool.execute(new ThreadDemo("thread" + i))
      }
    } finally {
      threadPool.shutdown()
    }
  }

  // Define the task class; it sleeps for 100 ms after each print
  class ThreadDemo(threadName: String) extends Runnable {
    override def run(): Unit = {
      for (i <- 1 to 10) {
        println(threadName + "|" + i)
        Thread.sleep(100)
      }
    }
  }
}
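The commented-out `submit` call in the example above points at a difference worth knowing. With `execute`, an exception thrown by the task ends up in the worker thread's uncaught exception handler. With `submit`, it is stored in the returned `Future` and only surfaces when `get()` is called. The following Java sketch (the class name `SubmitVsExecute` and the method `failureMessage` are made up for illustration) shows the `submit` side:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitVsExecute {

    // Submits a failing task and returns the message of the captured exception.
    static String failureMessage() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The cast picks the submit(Runnable) overload explicitly.
            Future<?> future = pool.submit((Runnable) () -> {
                throw new IllegalStateException("boom");
            });
            try {
                future.get(); // blocks until the task is done
                return "no failure";
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                return e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(failureMessage());
    }
}
```

If a task submitted with `submit` can fail, keeping the `Future` and calling `get()` is the simplest way to avoid losing the error silently.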
Callable example
import java.util.concurrent.{Callable, FutureTask, Executors, ExecutorService}

object Test {
  def main(args: Array[String]): Unit = {
    val threadPool: ExecutorService = Executors.newFixedThreadPool(3)
    try {
      val future = new FutureTask[String](new Callable[String] {
        override def call(): String = {
          Thread.sleep(100)
          "im result"
        }
      })
      threadPool.execute(future)
      // get() blocks until the task has finished
      println(future.get())
    } finally {
      threadPool.shutdown()
    }
  }
}
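The same Callable pattern can be written in Java without constructing a `FutureTask` by hand, because `submit(Callable)` already returns a `Future`. The sketch below is one possible variant, not the only way to do it. The class name `CallableDemo`, the method `compute`, and the 5-second timeout are choices made for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CallableDemo {

    // Runs a Callable on the pool and waits for its result.
    static String compute() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // The lambda returns a value, so it is treated as a Callable<String>.
            Future<String> future = pool.submit(() -> {
                Thread.sleep(100);
                return "im result";
            });
            // Wait at most 5 seconds instead of blocking indefinitely.
            return future.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute());
    }
}
```

Using the timed `get` means a stuck task raises a `TimeoutException` rather than hanging the caller forever.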