Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
477 views
in Technique[技术] by (71.8m points)

r - Transfer data from database to Spark using sparklyr

I have some data in a database, and I want to work with it in Spark, using sparklyr.

I can use a DBI-based package to import the data from the database into R

dbconn <- dbConnect(<some connection args>)
data_in_r <- dbReadTable(dbconn, "a table") 

then copy the data from R to Spark using

sconn <- spark_connect(<some connection args>)
data_ptr <- copy_to(sconn, data_in_r)

Copying twice is slow for big datasets.

How can I copy data directly from the database into Spark?

sparklyr has several spark_read_*() functions for import, but nothing database related. sdf_import() looks like a possibility, but it isn't clear how to use it in this context.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Sparklyr >= 0.6.0

You can use spark_read_jdbc.

Sparklyr < 0.6.0

I hope there is a more elegant solution out there but here is a minimal example using low level API:

  • Make sure that Spark has access to the required JDBC driver, for example by adding its coordinates to spark.jars.packages. For example with PostgreSQL (adjust for current version) you could add:

    spark.jars.packages org.postgresql:postgresql:9.4.1212
    

    to SPARK_HOME/conf/spark-defaults.conf

  • Load data and register as temporary view:

    name <- "foo"
    
    spark_session(sc) %>% 
      invoke("read") %>% 
      # JDBC URL and table name
      invoke("option", "url", "jdbc:postgresql://host/database") %>% 
      invoke("option", "dbtable", "table") %>% 
      # Add optional credentials
      invoke("option", "user", "scott") %>%
      invoke("option", "password", "tiger") %>% 
      # Driver class, here for PostgreSQL
      invoke("option", "driver", "org.postgresql.Driver") %>% 
      # Read and register as a temporary view
      invoke("format", "jdbc") %>% 
      invoke("load") %>% 
      # Spark 2.x, registerTempTable in 1.x
      invoke("createOrReplaceTempView", name)
    

    You can pass multiple options at once using an environment:

    invoke("options", as.environment(list(
      user="scott", password="tiger", url="jdbc:..."
    )))
    
  • Load temporary view with dplyr:

    dplyr::tbl(sc, name)
    
  • Be sure to read about further JDBC options, with focus on partitionColumn, *Bound and numPartitions.

  • For additional details see for example How to use JDBC source to write and read data in (Py)Spark? and How to improve performance for slow Spark jobs using DataFrame and JDBC connection?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...