Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
632 views
in Technique[技术] by (71.8m points)

python - How to run multiple jobs in one Sparkcontext from separate threads in PySpark?

It is understood from Spark documentation about Scheduling Within an Application:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

I could found few example code of the same in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I was running into the same issue, so I created a tiny self-contained example. I create multiple threads using python's threading module and submit multiple spark jobs simultaneously.

Note that by default, spark will run the jobs in First-In First-Out (FIFO): http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I change it to FAIR scheduling

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
  spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print sc.parallelize(range(i*10000)).count()

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print 'spark task', i, 'has started'


run_multiple_jobs()

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...