I have Scala code that runs a Spark job. At runtime the main class uses reflection to load all implementations of an abstract class that can be run as Spark jobs, and selects the correct one based on a command-line parameter, e.g. spark-submit etl.jar job-name-i-want-to-run. I am running Spark 3.0.1 with Scala 2.12.10. It works perfectly when I run it with spark-submit, but when I wrote a unit test (using scalatest 3.2.2) it fails with a NoSuchFieldException on the job.getField("MODULE$") call, for subclasses of SparkJob that are declared in a package inside the test folder. Note that each implementation is an object, not a class. So my question is: why does this work for subclasses in the main folder, but fail for subclasses declared in the test package (which exist specifically to test this functionality)?

The launcher looks like this:
import org.reflections.Reflections

import scala.collection.JavaConverters._

object SparkJobLauncher {

  def runSparkJob(args: Array[String]): Unit = {
    val availableJobs = loadSparkJobs()
    // The first argument is the job name used to look up the implementation.
    availableJobs(args(0)).run(args)
  }

  // Scans the parent package for all subtypes of SparkJob and indexes the
  // singleton instances by their name.
  private def loadSparkJobs(): Map[String, SparkJob] = {
    new Reflections("com.parent.package")
      .getSubTypesOf(classOf[SparkJob])
      .asScala
      .map(job => {
        // Each job is an object, so grab the singleton via the static MODULE$ field.
        val instance = job.getField("MODULE$").get(null).asInstanceOf[SparkJob]
        (instance.name, instance)
      })
      .filter(_ != null)
      .toMap
  }
}
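For context, the main class handed to spark-submit is just a thin wrapper that forwards the command-line arguments to the launcher. A minimal sketch of that entry point (the object name here is a placeholder, not my actual class):

object EtlMain {
  // spark-submit etl.jar job-name-i-want-to-run ends up here;
  // the launcher resolves the job from the first argument.
  def main(args: Array[String]): Unit = {
    SparkJobLauncher.runSparkJob(args)
  }
}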
The implementation of the SparkJob abstract class looks like this:
abstract class SparkJob {
  def name: String
  def run(args: Array[String]): Unit
  // ... other helper methods ...
}
And a job might look something like this:
object MyEtlJob extends SparkJob {
  override def name = "my-etl"
  override def run(args: Array[String]): Unit = {
    // do spark stuff...
  }
}
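For reference, the launcher's lookup relies on how scalac encodes singletons: object MyEtlJob compiles to a class named MyEtlJob$ that exposes its single instance through a static MODULE$ field. A standalone check of that assumption, independent of the Reflections scan (the fully-qualified name is illustrative):

// Load the compiled singleton class directly and read its static MODULE$ field,
// which is what the launcher does with each class returned by Reflections.
val clazz = Class.forName("com.parent.package.MyEtlJob$")
val job = clazz.getField("MODULE$").get(null).asInstanceOf[SparkJob]
assert(job.name == "my-etl")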
And the unit test is pretty straightforward:
import org.scalatest.funsuite.AnyFunSuite

class SparkJobLauncherTest extends AnyFunSuite {

  test("Run job") {
    SparkJobLauncher.runSparkJob(Array("test"))
  }
}
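The test-side jobs follow the same pattern as the ones under main, just living in the test sources. A hypothetical example of one such object, placed in a package under src/test/scala (the name "test" matches what the unit test passes in):

// Hypothetical test-only job under src/test/scala, in a package inside com.parent.package.
// It exists only so the unit test has something for the launcher to find.
object TestJob extends SparkJob {
  override def name = "test"
  override def run(args: Array[String]): Unit = {
    // no-op: the test only exercises the reflective lookup
  }
}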