
Read files from S3 using PySpark every 15 minutes

I have my IoT device data in an S3 bucket, batched and converted from JSON to Parquet by Kinesis Firehose. I now want to read only the data that arrived in the S3 bucket in the last 15 minutes, because I want to calculate the average of all parameters that arrived in S3 during that window. The time frame goes like this:

  1. 00:00 - 00:15 (first block)
  2. 00:15 - 00:30 (second block)
  3. 00:30 - 00:45 (third block)
  4. 00:45 - 01:00 (fourth block)

and so on; each day has 96 blocks:

1440 minutes in a day / 15 minutes = 96 blocks
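
For illustration, a minimal sketch (plain Python, the helper name is made up) of how a timestamp maps to its 15-minute block:

from datetime import datetime, timedelta

def block_bounds(ts: datetime):
    """Return the start and end of the 15-minute block that contains ts."""
    start = ts.replace(minute=(ts.minute // 15) * 15, second=0, microsecond=0)
    return start, start + timedelta(minutes=15)

# e.g. 00:37:12 falls into the third block, 00:30 - 00:45
print(block_bounds(datetime(2023, 1, 17, 0, 37, 12)))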

I want to load the data in my PySpark application, compute the average of every column for the block, store those averages as described above, and then write them to another S3 bucket. Here is the code I have tried so far to read and retrieve data from the S3 bucket:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t

S3_PATH = "s3a://<my-bucket-name>/<sitename>/year=2023/month=01/day=17/*.parquet"

spark = (
    SparkSession.builder
    .appName("practice_spark_s3_data_firehose")
    .config("spark.pyspark.python", "python")
    .getOrCreate()
)

df = spark.read.parquet(S3_PATH)
df = df.withColumn("time", f.to_timestamp("time", "dd-MM-yyyy HH:mm:ss"))  # convert string timestamp to TimestampType
df.orderBy("time").show(100, truncate=False)

Here I'm getting the entire day's data. Any solution to work this out would be helpful, thanks.
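
To make the goal concrete, here is a rough sketch of the filtering and averaging step I'm aiming for, assuming the job is triggered right after a block closes and that event times are in UTC; the output bucket name and the avg_ column prefix below are just placeholders:

from datetime import datetime, timedelta

# Assume the job is kicked off right after a block closes (e.g. by cron/Lambda),
# so the last fully closed block is [window_start, window_end).
run_time = datetime.utcnow()
window_end = run_time.replace(minute=(run_time.minute // 15) * 15, second=0, microsecond=0)
window_start = window_end - timedelta(minutes=15)

# Keep only rows that fall inside that block.
window_df = df.filter((f.col("time") >= f.lit(window_start)) & (f.col("time") < f.lit(window_end)))

# Average every numeric column; "time" itself is excluded.
numeric_types = ("double", "float", "int", "bigint")
numeric_cols = [c for c, dtype in window_df.dtypes if c != "time" and dtype in numeric_types]
averages = window_df.agg(*[f.avg(c).alias(f"avg_{c}") for c in numeric_cols])

# <output-bucket-name> is a placeholder for the second S3 bucket.
averages.write.mode("append").parquet(
    f"s3a://<output-bucket-name>/<sitename>/year={window_start:%Y}/month={window_start:%m}/day={window_start:%d}/"
)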

Comments:
2023-01-21 00:30:11
Maybe just a question around saving money and making this easier, but is it time-critical that this runs every 15 minutes, or could it be hourly?
2023-01-21 00:30:11
Running the PySpark script and calculating averages every 15 minutes is the business requirement, and it is time-critical too, so it's not just a matter of saving money or making things easier.
2023-01-21 00:30:11
Glue is not a tool for this. Take a look at Kinesis Data Analytics, where you can use Firehose data as an input and, with KDA window functions, calculate those averages easily with SQL and have the data written to the second S3 bucket through Firehose.
2023-01-21 00:30:11
I was not intending to use Glue for it, but rather maybe EMR Serverless. Thanks for the Kinesis Data Analytics approach, though.
2023-01-21 00:30:11
Is there any way to set the start time for the tumbling window in KDA? For instance, I start the window at 00:00 and it then tumbles every 15 minutes from there onwards. I'm not using a sliding window because it can have overlapping data, which in this case is not acceptable.
2023-01-21 00:30:11
You can add a simple filter to your Spark SQL query for the most recent 15-minute window from when your code started, but you shouldn't hard-code year/month/day... Those should be arguments from the Lambda/cron execution.
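For illustration, a rough sketch of what that could look like (the --window-end argument name is made up; spark and f are the objects from the question's code):

import argparse
from datetime import datetime, timedelta

# The trigger (cron / Lambda / EventBridge) passes the block end in, instead of
# the script hard-coding year/month/day.
parser = argparse.ArgumentParser()
parser.add_argument("--window-end", required=True,
                    help="block end as 'YYYY-MM-DD HH:MM', e.g. 2023-01-17 00:45")
args = parser.parse_args()

window_end = datetime.strptime(args.window_end, "%Y-%m-%d %H:%M")
window_start = window_end - timedelta(minutes=15)

# Build the partition path from the window instead of a fixed date.
S3_PATH = (f"s3a://<my-bucket-name>/<sitename>/"
           f"year={window_start:%Y}/month={window_start:%m}/day={window_start:%d}/*.parquet")

df = spark.read.parquet(S3_PATH)
df = df.withColumn("time", f.to_timestamp("time", "dd-MM-yyyy HH:mm:ss"))
df = df.filter((f.col("time") >= f.lit(window_start)) & (f.col("time") < f.lit(window_end)))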
Answers (0):