To address this "local resource" problem what's needed is a singleton object - i.e. an object that's warranted to be instantiated once and only once in the JVM. Luckily, Scala object
provides this functionality out of the box.
The second thing to consider is that this singleton will provide a service to all tasks running on the same JVM where it's hosted, so, it MUST take care of concurrency and resource management.
Let's try to sketch(*) such service:
class ManagedSocket(private val pool: ObjectPool, val socket:Socket) {
def release() = pool.returnObject(socket)
}
// singleton object
object SocketPool {
var hostPortPool:Map[(String, Int),ObjectPool] = Map()
sys.addShutdownHook{
hostPortPool.values.foreach{ // terminate each pool }
}
// factory method
def apply(host:String, port:String): ManagedSocket = {
val pool = hostPortPool.getOrElse{(host,port), {
val p = ??? // create new pool for (host, port)
hostPortPool += (host,port) -> p
p
}
new ManagedSocket(pool, pool.borrowObject)
}
}
Then usage becomes:
val host = ???
val port = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val mSocket = SocketPool(host, port)
partition.foreach{elem =>
val os = mSocket.socket.getOutputStream()
// do stuff with os + elem
}
mSocket.release()
}
}
I'm assuming that the GenericObjectPool
used in the question is taking care of concurrency. Otherwise, access to each pool
instance need to be guarded with some form of synchronization.
(*) code provided to illustrate the idea on how to design such object - needs additional effort to be converted into a working version.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…