你好,游客 登录
背景:
阅读新闻

8.3 Scala多线程示例

[日期:2021-09-27] 来源:  作者: [字体: ]

本文来自艾叔编著的《零基础快速入门Scala》免费电子书,添加文末艾叔微信,获取完整版的PDF电子书

8.3  Scala多线程示例

我们把程序运行过程中,所执行的代码序列,称为一条执行路径。多线程技术可以使得一个程序内出现多条执行路径,同一时间内,可能会有多条执行路径上的代码,同时运行。这样,可以充分利用多核、多处理器的资源,提高程序处理速度。

传统的单机版程序,从main开始,向下顺序执行,到最后退出,任何时候都只有一条代码在执行,因此,它只有一条执行路径,通常把这条执行路径称为主线,多线程程序,除了主线外,还有其它的线程同时执行。

简单的说,如果处理器有多个、或者多个核、或者支持超线程,那么使用多线程编程,其程序可以实现CPU利用率100%

此外,如果一个程序既要接受用户输入,又要完成其它的工作,那么最好设计成多线程,这样可以避免程序执行任务时,无法响应。

多线程需要处理器、操作系统、应用程序三方面支持。对于开发者来说,就是要使用多线程的接口和方法来编写程序。Scala提供了多线程编程接口,它实质上使用的是Java所提供的多线程接口,下面通过具体的示例来说明在Scala中如何进行多线程编程。

8.3.1  最简单的多线程编程:Runnable/Callable

本示例提供了一个最简单的多线程编程示例,使用的是Java所提供的Runnable/Callable接口,Runnable中定义了一个run的抽象方法,我们需要重写此run方法,线程执行时,就会执行此run方法。

本例启动了6个线程,也就是说,当此程序运行时,最多可能有6条执行路径在同时执行(前提是,CPU要有6个执行单元,例如6CPU、或者6个核、或者6个超线程,这样才能做到真正的并行,如果没有6个执行单元,操作系统会将时间片分配给各个线程来执行,因此,即使只有1个执行单元,也不会影响程序执行,但此时就不是6个线程真正的并行了)。

1行,定义变量num,用于线程编号;

2行,定义变量threadNum,赋值6,用于确定此次程序启动的线程格式;

4行,引入ArrayBuffer所在的package

5行,创建一个Thread类型的ArrayBuffer,赋值给threadList,每个Thread对象就代表一个线程;

6~24行,循环创建Runnable对象,每个对象有一个id成员变量,id=num,作为Runnable对象的编号,依次将其加入threadList中;

10~20行,重写Runnablerun方法,此方法会循环3遍,每一遍,打印I am thread加上id编号和循环编号,等待随机的时间,进入到下轮,每个线程启动后,就会执行run方法;

22行,将创建好的Runable对象传入Thread的构造函数,创建Thread对象,并将其对象引用加入到threadList中;

26~29行,所有Thread对象构建完后,使用for循环,调用start方法依次启动threadList中的线程,线程执行的代码是run方法,这样,会有6个线程同时在执行run方法。

      1 var num=0

      2 val threadNum=6

      3

      4 import scala.collection.mutable.ArrayBuffer

      5 val threadList = new ArrayBuffer[Thread]()

      6 while (num < threadNum) {

      7

      8   val threadRun = new Runnable {

      9     val id = num

     10     def run() = {

     11       var i=0

     12       while(i<3){

     13         println("I am thread " + id + " i=" + i)

     14         i+=1

     15         import scala.util.Random

     16         val rd = new Random()

     17         val sleepTime = Math.abs(rd.nextLong()) % 1000

     18         Thread.sleep(sleepTime)

     19       }

     20     }

     21   }

     22   threadList += new Thread(threadRun)

     23   num+=1

     24 }

     25

     26 for(t <- threadList){

     27   t.start()

     28   Thread.sleep(50)

     29 }

由执行结果可以看到,每个线程的输出是交错的,这说明这些线程是在并行执行的,它们的输出顺序和当时CPU的调度有关。

注意

  • 线程间是共享变量的,闭包特性对线程方法run同样适用,本例中,run方法共享第7行以上的变量(包括本级以及上级函数中所定义的变量);
  • 9num是外部变量,利用闭包特性可以访问,如果不使用闭包,可以直接扩展Runnable特质,在扩展子类的构造函数中设置参数列表,传参即可,后面会有具体的例子;
  • 特别注意第9num,如果把第9行放到第11行,即run方法的后面,会有怎样的结果?为什么?;
  • 本例中,待所有线程都创建好后,才统一启动线程,这样做的好处是,防止先启动的线程对后续还在创建的线程造成影响,因为线程间是共享变量的;

8.3.2  Executors实现线程池

线程池,顾名思义就是一个线程资源池,在这个资源池中,有事先创建好的多个线程,当有任务提交时,就从线程池中选择一个线程来执行此任务,任务完成后,再将此线程回收到线程池。

3.10.1中线程即时申请、退出销毁的机制相比,线程池有很多好处。

1)线程池的系统开销更小,每个线程的创建和销毁都需要系统开销,频繁地操作会导致系统开销增大,而线程池中的线程是事先创建好的,一旦创建,后续新任务的执行都不需要再创建,任务完成后,只需要回收线程,不需要销毁线程,因此,系统开销更小;

2)线程池可以控制并发规模,如果给每个任务创建一个线程,在任务量多的情况下,会同时创建很多线程,例如1000个并发任务,就会创建1000个线程,由于硬件限制,比如4CPU,真正并发的线程只有4个,CPU会在这1000个线程间来回切换,真正用于执行程序的CPU时间并不多,导致效率低下,而线程池在一开始创建就固定了线程的数量,不管任务有多少,都是由线程池中的线程去完成,因此,并发规模从一开始就是可控的,而且,由于线程并不多,不需要频繁切换,真正用于执行任务的CPU时间相对更多,效率更高。

  1. 线程池中使用没有返回值的线程

Scala线程池例子代码如下。

1行,设置线程池中线程数量poolSize4

2行,引入创建线程池所需要的packagejava.util.concurrent.Executors

3行,创建线程池,赋值给threadPool,它的类型是:ExecutorService

5~14行,创建任务MyRunnable,每个MyRunnable在构造函数传入i作为任务id,调用threadPool.execute将任务提交到线程池上执行,一共有8个任务(taskNum),而threadPool只有4个线程,因此,先执行4个任务(每个线程对应一个任务,这4个任务也不一定是真正的并行,要看能够并行处理的部件数),一旦有一个任务完成,就再递补一个任务执行,直到最后所有的任务完成;

15行,如果在执行过程中,出现异常,将进入到第15行处理,即关闭整个线程池;

16行,使用while循环,每等待500毫秒,然后使用threadPool.isTermintated来判断线程池中所有的任务是否完成,一旦完成,退出循环。

      1 val poolSize=4

      2 import java.util.concurrent.Executors

      3 val threadPool = Executors.newFixedThreadPool(poolSize)

      4

      5 var i = 0

      6 val taskNum=8

      7 try {

      8   while (i < taskNum) {

      9     threadPool.execute(new MyRunnable(i))

     10     i += 1

     11   }

     12 } finally {

     13   threadPool.shutdown()

     14 }

     15

     16 while(!threadPool.isTerminated){

     17   Thread.sleep(500)

     18 }

线程池中线程数和任务数不是同一个概念,可以分开设置。

线程函数体,即MyRunnable对象的run方法代码如下。

1行,定义Runnable的扩展类MyRunnable,并扩展构造函数的参数列表,用于传参idid代表每个任务的编号;

2~12行,重写run方法,循环3次,每次等待随机时间,打印任务编号和循环次数的编号;

8行,创建Random对象,它可以产生随机数,随机数的种子是系统时间,因此,每次调用,随机数种子都会变;

9行,调用rd.nextLong产生随机数,由于随机数可能会是负数,因此,使用Math.abs来取绝对值,然后对1000取余,这样产生的数字会在0~999之间,这样做的目的,主要是使得每个任务的运行时间不一样,这样,可以更好地看到线程间并发效果。

      1 class MyRunnable(id: Int) extends Runnable {

      2   def run() = {

      3     var i = 0

      4     while (i < 3) {

      5       println("I am thread " + id + " i=" + i)

      6       i += 1

      7       import scala.util.Random

      8       val rd = new Random()

      9       val sleepTime = Math.abs(rd.nextLong()) % 1000

     10       Thread.sleep(sleepTime)

     11     }

     12   }

     13 }

  1. 线程池中使用有返回值的线程

Runnable对象的run方法是没有返回值的,如果要使得线程执行完,将结果带回来,可以实现Callable对象,例子代码如下。

1~3行,创建线程池;

5行,设置任务编号变量i,初始值为0

6行,设置任务数taskNum=8

7行,引入Future所在的package,它用于线程返回值;

8行,创建线程返回值数组,元素类型是Future[String],其中String是泛型,可以自己修改,每个线程对应数组的一个元素;

9~16行,创建任务对象MyTaskMyTask扩展了Callable接口,并增加了构造函数的参数,用于传参;

11行,调用threadPool.submit提交任务,返回值是Future[String]

15行,如果在执行过程中,出现异常,将进入到第15行处理,即关闭整个线程池;

18~20行,依次等待每个线程执行完毕,打印其返回值。

      1 val poolSize=4

      2 import java.util.concurrent.Executors

      3 val threadPool = Executors.newFixedThreadPool(poolSize)

      4

      5 var i = 0

      6 val taskNum=8

      7 import java.util.concurrent.Future

      8 val taskAr = new Array[Future[String]](taskNum)

      9 try {

     10   while (i < taskNum) {

     11     taskAr(i) = threadPool.submit(new MyTask(i))

     12     i += 1

     13   }

     14 } finally {

     15   threadPool.shutdown()

     16 }

     17

     18 for(task <- taskAr){

     19   val rs = task.get()

     20   println(rs)

     21 }

MyTask扩展了Callable的代码如下。

1行,MyTask扩展Callable,它设置了一个构造函数参数id,作为任务编号,区分不同的任务,Callable后面的[String]是泛型,用于线程返回值类型;

2行,重写call方法,call是线程的函数体,其返回值类型由Callable的泛型决定;

3行,str为返回值;

4~10行,循环3次,每次等待随机时间,这样可以保证每个任务执行时间不一样;

12行,返回str,作为线程返回值,也就是前面代码中task.get()的结果。

      1 class MyTask(id: Int) extends Callable [String]{

      2   override def call() = {

      3     val str = "I am thread " + id

      4     var i = 0

      5     while (i < 3) {

      6       i += 1

      7       import scala.util.Random

      8       val rd = new Random()

      9       val sleepTime = Math.abs(rd.nextLong()) % 1000

     10       Thread.sleep(sleepTime)

     11     }

     12     str

     13   }

     14 }

 

加艾叔微信,加入Linux(Shell+Zabbix)、大数据(Spark+Hadoop)、云原生(Docker+Kubernetes)技术交流群

 

 

关注艾叔公众号,获取更多一手信息

 

 

 

 

收藏 推荐 打印 | 阅读:
相关新闻