Spark Asynchronous Jobs
Published: 2019-06-13


Spark actions are synchronous by default: if we perform two actions one after the other, they always execute sequentially. But what if we want to execute two actions concurrently on different RDDs?

Let's look at an example:

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collect().map{ x => println("Items in the lists:" + x) }
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
println("Number of items in the list" + rddCount.count())

In the above example two actions, collect and count, are performed one after the other, and both execute synchronously. So count will always run only after collect has finished. The output of the above code is as follows:

Now the question is: what if we want to run Spark jobs concurrently, in an asynchronous fashion?

The answer is simple: Apache Spark also provides asynchronous actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are listed below; a short usage sketch follows the list.

collectAsync() -> Returns a future for retrieving all elements of this RDD.

countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements of this RDD.
foreachPartitionAsync(scala.Function1<scala.collection.Iterator,scala.runtime.BoxedUnit> f) -> Applies a function f to each partition of this RDD.
takeAsync(int num) -> Returns a future for retrieving the first num elements of the RDD.
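As a quick illustration of two of the actions listed above, here is a minimal sketch (not from the original post) using takeAsync and foreachAsync. It assumes an existing SparkContext named sc; the RDD nums is made up for the example, and an implicit ExecutionContext is needed for the callback on the returned future.

import scala.concurrent.ExecutionContext.Implicits.global // callbacks on the returned futures need an ExecutionContext

val nums = sc.parallelize(1 to 100, 4)

// takeAsync returns a FutureAction[Seq[Int]] immediately; the job runs in the background
val firstTen = nums.takeAsync(10)
firstTen.foreach(seq => println("First ten elements: " + seq.mkString(", ")))

// foreachAsync kicks off another job without blocking the driver; the println runs on the executors
val printAll = nums.foreachAsync(x => println("Element: " + x))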

Now let us see what happens when we use async actions.

import scala.concurrent.ExecutionContext.Implicits.global // the map callbacks on the returned futures need an ExecutionContext

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collectAsync().map{ x => x.map{ x => println("Items in the list:" + x) } }
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
rddCount.countAsync().map { x => println("Number of items in the list: " + x) }

The output of the above code is as follows:

You can see in the above output that the result of the second job comes first, because the first action returns a future immediately and the second action is submitted right away. Still, if you look closely, the jobs themselves execute one after the other: a job uses all the resources of the cluster, so the other job is delayed.
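Also note that in a standalone application (as opposed to the shell) the driver can reach the end of main before these background jobs finish, so you typically want to wait on the returned futures at some point. A minimal sketch, assuming the rdd and rddCount values from the example above (the timeout is an arbitrary choice for illustration):

import scala.concurrent.Await
import scala.concurrent.duration._

val itemsFuture = rdd.collectAsync()    // FutureAction[Seq[Int]]
val countFuture = rddCount.countAsync() // FutureAction[Long]

// Block the driver until both background jobs complete
val items = Await.result(itemsFuture, 5.minutes)
val count = Await.result(countFuture, 5.minutes)
println(s"Collected ${items.size} items; the other list has $count items")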

Coming back to the resource problem: to take full advantage of asynchronous jobs we need to configure the job scheduler.

Job Scheduling

By default the Spark scheduler runs jobs in FIFO (First In, First Out) fashion. In the FIFO scheduler, priority is given to the first job, then the second, and so on. If the first job does not need the whole cluster the second job can also run in parallel, but if the first job is too big the second job will wait a long time even if it itself would take very little time to execute. As a solution Spark provides a fair scheduler, under which jobs execute in a round-robin fashion.

To configure the scheduler we set the scheduling mode on the SparkConf as follows:

val conf = new SparkConf().setAppName("spark_auth").setMaster("local[*]").set("spark.scheduler.mode", "FAIR")

After configuring FAIR scheduling you can see that both jobs run concurrently and share the resources of the Spark cluster.

With this setting, the output of the above code is as follows:

You can see in the above result that both jobs run concurrently; neither action waits for the other.
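If you need finer-grained control than a single FAIR queue, Spark also lets you submit jobs into named scheduler pools. The following is a minimal sketch not covered in the original post; it assumes a pool called "pool1" has been defined in conf/fairscheduler.xml and reuses rddCount from above.

// Jobs submitted from a thread that sets this local property go into the named pool
sc.setLocalProperty("spark.scheduler.pool", "pool1")
rddCount.countAsync()

// Clearing the property sends subsequent jobs back to the default pool
sc.setLocalProperty("spark.scheduler.pool", null)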

For the above code you can check out:

Reposted from: https://www.cnblogs.com/shexinwei/p/5148573.html
