Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - How to print out Structured Stream in Console format

I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.

My program:

  • Simulates the streaming arrival of files in the folder "monitoring_dir" (one new file is copied from "source_dir" every 10 seconds).
  • Uses a DataStreamReader to populate the Unbounded DataFrame "inputUDF" with the content of each new file.
  • Uses a DataStreamWriter to output the new rows of "inputUDF" to a valid sink.

The program works with a File sink (the batches are appended to text-format files in "result_dir"), but nothing is displayed when I choose the Console sink.

Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.

My question is:

  • How can I make this program output to the Console sink and display the results when using Databricks?

Thank you very much in advance!

Best regards, Nacho


My Program: myTest.py

import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------ 
def get_source_dir_file_names(source_dir):

    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the FileInfo objects, to get the name of each file
    for item in fileInfo_objects:
        # 3.1. FileInfo exposes the file name directly, so there is no need
        # to parse its string representation
        res.append(item.name)

    # 4. We sort the list in alphabetic order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------ 
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We wait briefly before starting, then record the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter in the amount of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    # (i.e., the files are copied one by one, one per time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait until the next time slot (never a negative duration,
        # which would make time.sleep raise ValueError if a copy overran its slot).
        time.sleep(max(0.0, (start + (count * time_step_interval)) - time.time()))

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------ 
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)  

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)    

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')    

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = (spark.readStream.format("text")
                                .load(monitoring_dir))

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...

    # 4.1. To either save to result_dir in append mode
    if not console_sink:
        myDSW = (inputUDF.writeStream.format("text")
                                     .option("path", result_dir)
                                     .option("checkpointLocation", checkpoint_dir)
                                     .trigger(processingTime="10 seconds")
                                     .outputMode("append"))
    # 4.2. Or to display by console in append mode
    else:
        myDSW = (inputUDF.writeStream.format("console")
                                     .trigger(processingTime="10 seconds")
                                     .outputMode("append"))

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()    

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()

My Dataset: f1.txt

First sentence.

Second sentence.


My Dataset: f2.txt

Third sentence.

Fourth sentence.


My Dataset: f3.txt

Fifth sentence.

Sixth sentence.
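
The pacing used in streaming_simulation, where each wait targets an absolute deadline (start + count * time_step_interval) instead of a fixed sleep, can be sketched in isolation. This is only an illustration and not part of the original program: the helper name seconds_until_slot and its now parameter are hypothetical, and the clamp to zero is an added safeguard, since time.sleep rejects negative values.

```python
import time


def seconds_until_slot(start, count, time_step_interval, now=None):
    """Seconds left until slot number `count`, measured from `start`.

    Hypothetical helper: waiting against an absolute deadline keeps the
    time spent copying files from accumulating as drift.
    """
    if now is None:
        now = time.time()
    deadline = start + count * time_step_interval
    # Clamp to zero: time.sleep raises ValueError for negative durations.
    return max(0.0, deadline - now)


# First file copied 3 s after the start, with 10 s slots.
print(seconds_until_slot(start=100.0, count=1, time_step_interval=10, now=103.0))
# A copy that overran its slot yields no wait instead of an error.
print(seconds_until_slot(start=100.0, count=1, time_step_interval=10, now=112.5))
```

Inside the loop this would be used as time.sleep(seconds_until_slot(start, count, time_step_interval)).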


1 Reply


"How can I make this program to output to Console sink and display the results when using Databricks?"

The easiest way is to use the display function that Databricks provides. You can use it as shown below:

# Cell 1
rateDf = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load())

# Cell 2
display(rateDf, streamName="rate_stream")

The Console sink does not work in a Databricks notebook the way it does in your IDE or when you submit the application to a cluster. Instead, you can use the memory format and query the data with a %sql query:

(inputUDF.writeStream
  .format("memory")
  .trigger(processingTime="10 seconds")
  .queryName("inputUDF_console")
  .outputMode("append")
  .start())

In another Databricks cell you can look at the data by querying the table whose name was set with queryName:

%sql select * from inputUDF_console
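
The same table can also be read from a Python cell, which may be handy for checking it repeatedly while the stream runs. A minimal sketch, assuming the memory-sink query has been started with the query name inputUDF_console and that spark is the active SparkSession (it is predefined in Databricks notebooks); the helper peek_memory_table and its limit parameter are hypothetical names introduced only for illustration:

```python
def peek_memory_table(spark, table_name, limit=20):
    """Return up to `limit` rows currently held in a memory-sink table.

    Hypothetical helper: `spark` is the active SparkSession and
    `table_name` is the value that was passed to queryName().
    """
    # The memory sink registers an in-memory table under the query name,
    # so it can be read with plain Spark SQL.
    query = "SELECT * FROM {} LIMIT {}".format(table_name, int(limit))
    return spark.sql(query).collect()


# Usage in a notebook cell (assumes the streaming query is running):
# for row in peek_memory_table(spark, "inputUDF_console"):
#     print(row.value)  # the text source exposes each line as column "value"
```

Keep in mind that the memory sink holds its output in driver memory, so it is best suited to small test streams like this one.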
