
Unable to connect to MongoDB with SSL using PySpark

PySpark version: 2.3.4, MongoDB version: 4.2

I have set up SSL for my MongoDB server, and now I am trying to connect to it over SSL using PySpark.

My sample code:

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("mySparkMongoJob") \
    .config("spark.mongodb.input.uri", "mongodb://admin:<password>@www.mongod.com:27017/db_name.collection?authSource=admin&ssl=true") \
    .getOrCreate()

df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

df.show()

Then I imported the MongoDB certificate into the JVM's default truststore (cacerts):

keytool -import -file /opt/certs/mdb.crt -alias mongodb -keystore /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/security/cacerts

Then I followed the steps in this link (How to generate keystore and truststore) to generate the keystore and truststore.
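
A quick way to check this certificate setup outside of Spark is to attempt a raw TLS handshake against the server. The following is only a minimal sketch using Python's standard ssl module; the host and port are the ones that appear in the error log below, and /opt/certs/mdb.crt is assumed to be the PEM-encoded certificate imported with keytool above:

# Minimal TLS handshake check against the MongoDB host, independent of Spark.
# Assumes /opt/certs/mdb.crt is the PEM-encoded certificate imported above
# and that the server name matches the certificate's CN/SAN.
import socket
import ssl

HOST, PORT = "www.mongod.com", 27017   # host/port taken from the error log
CA_FILE = "/opt/certs/mdb.crt"         # certificate imported with keytool above

context = ssl.create_default_context(cafile=CA_FILE)

with socket.create_connection((HOST, PORT), timeout=10) as sock:
    with context.wrap_socket(sock, server_hostname=HOST) as tls:
        # If the handshake succeeds, the server really is speaking TLS and the
        # certificate chain validates against CA_FILE.
        print("TLS OK:", tls.version(), tls.getpeercert()["subject"])

If the server is not actually serving TLS on that port, or the certificate does not validate, this fails immediately with a much more specific error than the Spark timeout.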

When I run the script with:

spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 \
--conf spark.executor.extraJavaOptions="-Djavax.net.ssl.trustStore=/home/svr_data_analytic/trustStore.jks -Djavax.net.ssl.trustStorePassword=changeit" \
--conf spark.driver.extraJavaOptions="-Djavax.net.ssl.trustStore=/home/svr_data_analytic/trustStore.jks -Djavax.net.ssl.trustStorePassword=changeit" \
--conf spark.executor.extraJavaOptions="-Djavax.net.ssl.keyStore=/home/svr_data_analytic/KeyStore.jks -Djavax.net.ssl.keyStorePassword=changeit" \
test.py
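
These javax.net.ssl.* values only help if the driver and executor JVMs actually receive them. A minimal debugging sketch that reads them back through py4j (note that SparkContext._jvm is an internal handle, used here only for inspection):

# Debugging sketch: print the SSL-related system properties the driver JVM sees.
# SparkContext._jvm is an internal py4j handle - used here only for inspection.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sslPropsCheck").getOrCreate()
jvm_system = spark.sparkContext._jvm.java.lang.System

for prop in ("javax.net.ssl.trustStore", "javax.net.ssl.keyStore"):
    # getProperty returns None when the property was never set on this JVM.
    print(prop, "=", jvm_system.getProperty(prop))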

I am getting this error

2021-01-29 14:06:50 INFO  SparkContext:54 - Created broadcast 0 from broadcast at MongoSpark.scala:542
2021-01-29 14:06:50 INFO  cluster:71 - Cluster created with settings {hosts=[www.mongod.com:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
2021-01-29 14:06:50 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
2021-01-29 14:06:50 INFO  cluster:76 - Exception in monitor thread while connecting to server www.mongod.com:27017
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:92)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:554)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:425)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:289)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:106)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:63)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 62, in <module>
    main()
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 50, in main
    df = processing(spark)
  File "/home/svr_data_analytic/hmis-analytics-data-processing/src/main/python/scripts/test.py", line 12, in processing
    .option('uri', '{}/{}.{}?authSource={}&ssl=true'.format('mongodb://admin:[email protected]:27017', 'smarthis_prod', 'IpAppointment', 'admin')).load()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o39.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=www.mongod.com:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
    at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:179)
    at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
    at com.mongodb.client.internal.MongoClientDelegate.getServerAddressList(MongoClientDelegate.java:116)
    at com.mongodb.Mongo.getServerAddressList(Mongo.java:401)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
    at com.mongodb.spark.LoggingTrait$class.logInfo(LoggingTrait.scala:48)
    at com.mongodb.spark.Logging.logInfo(Logging.scala:24)
    at com.mongodb.spark.connection.MongoClientCache.logClient(MongoClientCache.scala:161)
    at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:56)
    at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:152)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:217)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:217)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

2021-01-29 14:07:20 INFO  SparkContext:54 - Invoking stop() from shutdown hook

Can anyone tell me where I am going wrong? I am new to this; I put all of the above together after some research on Google. Thanks in advance.

Question from: https://stackoverflow.com/questions/65951266/unable-to-connect-to-mongodb-with-ssl-using-pyspark


1 Reply


Review the server log for the reason why connections are getting closed.

Most likely reasons:

  • You haven't provided a client certificate and the server is configured to validate client certificates
  • You provided a certificate but the server can't validate it due to not having the right CA cert
  • You aren't actually using TLS
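
One way to narrow these down from the client side is to take Spark out of the picture and connect with pymongo using the same certificate files. Below is a minimal sketch, assuming pymongo 3.9+ (for the tls* option names); the host, CA file, and credentials are the ones from the question, and the commented-out client-certificate path is hypothetical, needed only if mongod is configured to validate client certificates:

# Standalone connectivity check with pymongo, bypassing Spark entirely.
# Assumes pymongo >= 3.9; paths and credentials reuse the ones from the question.
from pymongo import MongoClient
from pymongo.errors import ServerSelectionTimeoutError

client = MongoClient(
    "www.mongod.com",
    27017,
    username="admin",
    password="<password>",            # placeholder - use the real password
    authSource="admin",
    tls=True,
    tlsCAFile="/opt/certs/mdb.crt",   # CA that signed the server certificate
    # tlsCertificateKeyFile="/opt/certs/client.pem",  # hypothetical client cert+key (PEM),
    #                                                 # only if mongod validates client certs
    serverSelectionTimeoutMS=10000,
)

try:
    print(client.admin.command("ping"))   # succeeds only if TLS and auth both work
except ServerSelectionTimeoutError as exc:
    # If TLS (or the network path) is broken, server selection times out and the
    # wrapped error text names the underlying failure (handshake, cert validation, ...).
    print("connection failed:", exc)

If this standalone check fails in the same way, the problem is in the TLS setup itself rather than in the Spark configuration.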
