Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
89 views
in Technique[技术] by (71.8m points)

spark-streaming and connection pool implementation

The spark-streaming website at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams mentions the following code:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

I have tried to implement this using org.apache.commons.pool2 but running the application fails with the expected java.io.NotSerializableException:

15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 ...

I am wondering how realistic it is to implement a connection pool that is serializable. Has anyone succeeded in doing this ?

Thank you.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

To address this "local resource" problem what's needed is a singleton object - i.e. an object that's warranted to be instantiated once and only once in the JVM. Luckily, Scala object provides this functionality out of the box.

The second thing to consider is that this singleton will provide a service to all tasks running on the same JVM where it's hosted, so, it MUST take care of concurrency and resource management.

Let's try to sketch(*) such service:

class ManagedSocket(private val pool: ObjectPool, val socket:Socket) {
   def release() = pool.returnObject(socket)
}

// singleton object 
object SocketPool {
    var hostPortPool:Map[(String, Int),ObjectPool] = Map()
    sys.addShutdownHook{
        hostPortPool.values.foreach{ // terminate each pool } 
    }

    // factory method
    def apply(host:String, port:String): ManagedSocket = {
        val pool = hostPortPool.getOrElse{(host,port), {
            val p = ??? // create new pool for (host, port)
            hostPortPool += (host,port) -> p
            p
        }
        new ManagedSocket(pool, pool.borrowObject)
    }
}

Then usage becomes:

val host = ???
val port = ???
stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition => 
        val mSocket = SocketPool(host, port)
        partition.foreach{elem => 
            val os = mSocket.socket.getOutputStream()
            // do stuff with os + elem
        }
        mSocket.release()
    }
}

I'm assuming that the GenericObjectPool used in the question is taking care of concurrency. Otherwise, access to each pool instance need to be guarded with some form of synchronization.

(*) code provided to illustrate the idea on how to design such object - needs additional effort to be converted into a working version.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...