Using pyspark with kafka sources /u/OeroShake Python Education

Can someone help me with running pyspark with kafka? I want to take kafka data as the input for pyspark dataframes and then perform transformations on it, then write the files to the kafka sink. My kafka setup is running fine, I tried running the producer and consumer simulation on terminal and that works. But I’m not able to fetch it using the pyspark program that I’ve written. I’m attaching the program that I’ve written.

```
import json
import time
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if(__name__ == "__main__"):

    kafka_server = "localhost:9092"
    kafka_topic = "Day8"
    try:
        spark = SparkSession.builder.appName("Day8") \
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
            .getOrCreate()
    except Exception as e:
        print(f"Exception {e} raised while setting up spark session")
    else:
        try:
            kafka_df = spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kafka_server) \
                .option("subscribe", kafka_topic) \
                .load()
        except Exception as e:
            print(f"Exception {e} raised while reading from kafka")
        else:
            try:
                json_df = kafka_df.selectExpr("CAST(value AS STRING) AS output")
            except:
                print("Exception raised while selecting the value column")
            schema = StructType([
                StructField("Name", StringType(), True),
                StructField("age", IntegerType(), True),
            ])
            try:
                parsed_df = json_df.select(F.from_json(F.col("output"), schema).alias("parsed_value"))
            except:
                print("Exception raised while parsing the json")
            try:
                flattened_df = parsed_df.select("parsed_value.Name", "parsed_value.age")
            except:
                print("Exception raised while flattening the dataframe")
            try:
                flattened_df.printSchema()
            except:
                print("Exception raised while priting the schema")
            try:
                filtered_df = flattened_df.filter(F.col("age") > 18)
            except:
                print("Exception raised while filtering the dataframe")
            try:
                query = filtered_df.writeStream \
                    .outputMode("append") \
                    .format("console") \
                    .start()
            except:
                print("Exception raised while writing the stream to console")
            time.sleep(600)
            query.awaitTermination()
```

submitted by /u/OeroShake
[link] [comments]

r/learnpython Can someone help me with running pyspark with kafka? I want to take kafka data as the input for pyspark dataframes and then perform transformations on it, then write the files to the kafka sink. My kafka setup is running fine, I tried running the producer and consumer simulation on terminal and that works. But I’m not able to fetch it using the pyspark program that I’ve written. I’m attaching the program that I’ve written. ``` import json import time from pyspark.sql import SparkSession, DataFrame, functions as F from pyspark.sql.types import StructType, StructField, StringType, IntegerType if(__name__ == "__main__"): kafka_server = "localhost:9092" kafka_topic = "Day8" try: spark = SparkSession.builder.appName("Day8") .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") .getOrCreate() except Exception as e: print(f"Exception {e} raised while setting up spark session") else: try: kafka_df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", kafka_server) .option("subscribe", kafka_topic) .load() except Exception as e: print(f"Exception {e} raised while reading from kafka") else: try: json_df = kafka_df.selectExpr("CAST(value AS STRING) AS output") except: print("Exception raised while selecting the value column") schema = StructType([ StructField("Name", StringType(), True), StructField("age", IntegerType(), True), ]) try: parsed_df = json_df.select(F.from_json(F.col("output"), schema).alias("parsed_value")) except: print("Exception raised while parsing the json") try: flattened_df = parsed_df.select("parsed_value.Name", "parsed_value.age") except: print("Exception raised while flattening the dataframe") try: flattened_df.printSchema() except: print("Exception raised while priting the schema") try: filtered_df = flattened_df.filter(F.col("age") > 18) except: print("Exception raised while filtering the dataframe") try: query = filtered_df.writeStream .outputMode("append") .format("console") .start() except:
print("Exception raised while writing the stream to console") time.sleep(600) query.awaitTermination() ``` submitted by /u/OeroShake [link] [comments]

Can someone help me with running pyspark with kafka? I want to take kafka data as the input for pyspark dataframes and then perform transformations on it, then write the files to the kafka sink. My kafka setup is running fine, I tried running the producer and consumer simulation on terminal and that works. But I’m not able to fetch it using the pyspark program that I’ve written. I’m attaching the program that I’ve written.

```
import json
import time
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if(__name__ == "__main__"):

    kafka_server = "localhost:9092"
    kafka_topic = "Day8"
    try:
        spark = SparkSession.builder.appName("Day8") \
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
            .getOrCreate()
    except Exception as e:
        print(f"Exception {e} raised while setting up spark session")
    else:
        try:
            kafka_df = spark.readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kafka_server) \
                .option("subscribe", kafka_topic) \
                .load()
        except Exception as e:
            print(f"Exception {e} raised while reading from kafka")
        else:
            try:
                json_df = kafka_df.selectExpr("CAST(value AS STRING) AS output")
            except:
                print("Exception raised while selecting the value column")
            schema = StructType([
                StructField("Name", StringType(), True),
                StructField("age", IntegerType(), True),
            ])
            try:
                parsed_df = json_df.select(F.from_json(F.col("output"), schema).alias("parsed_value"))
            except:
                print("Exception raised while parsing the json")
            try:
                flattened_df = parsed_df.select("parsed_value.Name", "parsed_value.age")
            except:
                print("Exception raised while flattening the dataframe")
            try:
                flattened_df.printSchema()
            except:
                print("Exception raised while priting the schema")
            try:
                filtered_df = flattened_df.filter(F.col("age") > 18)
            except:
                print("Exception raised while filtering the dataframe")
            try:
                query = filtered_df.writeStream \
                    .outputMode("append") \
                    .format("console") \
                    .start()
            except:
                print("Exception raised while writing the stream to console")
            time.sleep(600)
            query.awaitTermination()
```

submitted by /u/OeroShake
[link] [comments] 

Leave a Reply

Your email address will not be published. Required fields are marked *