Small problems of Scala multithreading in spark process

This time, we changed the source code of ThriftServer and added some services. In the middle of the change, we encountered such a problem. When we submitted tasks asynchronously, we wanted to make them multithreaded. At the beginning, we used scala's Actor, which passed sqlcontext and sql. We found that every sparkSessionId changed all the time, and the sessionId generated after each submission and triggering Action was inconsistent. What's the matter? Later, we found that it was multiple For the problem of thread asynchrony, passing sqlcontext to execute tasks on the other side of the thread will trigger a session again. What can we do? It can only be implemented in the following ways

java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
        return null;
}
});
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
        return null;
}
});

executorService.shutdown();

Use the context variable inside the current method

Of course, in order to facilitate your study, another common way of writing is as follows:

import java.util.concurrent.{Executors, ExecutorService}

 object Test {
     def main(args: Array[String]) {
//Create thread pool
val threadPool:ExecutorService=Executors.newFixedThreadPool(5)
try {
//Commit 5 threads
for(i <- 1 to 5){
//threadPool.submit(new ThreadDemo("thread"+i))
threadPool.execute(new ThreadDemo("thread"+i))
             }
         }finally {
             threadPool.shutdown()
         }
     }

//Define thread class, sleep for 100ms every time printing
class ThreadDemo(threadName:String) extends Runnable{
         override def run(){
for(i <- 1 to 10){
                 println(threadName+"|"+i)
                 Thread.sleep(100)
             }
         }
     }
 }

Callable example

import java.util.concurrent.{Callable, FutureTask, Executors, ExecutorService}

object Test {
  def main(args: Array[String]) {
    val threadPool:ExecutorService=Executors.newFixedThreadPool(3)
    try {
      val future=new FutureTask[String](new Callable[String] {
        override def call(): String = {
          Thread.sleep(100)
          return "im result"
        }
      })
      threadPool.execute(future)
      println(future.get())
    }finally {
      threadPool.shutdown()
    }
  }
}

 

Keywords: Big Data Java Hadoop Scala SQL

Added by 88fingers on Wed, 11 Dec 2019 17:15:14 +0200