data:image/s3,"s3://crabby-images/c4689/c46892093c6e5f07e3f03da5e2b5f90a2bddc210" alt="Search icon"
Businesses need real-time insights to stay competitive and make well-informed decisions in today's data-driven environment. Because they add latency and restrict the ability to respond to events as they occur, traditional batch processing techniques are no longer adequate. Streaming data pipelines are useful in this situation. Organizations can ingest, process, store and visualize data in almost real-time by utilizing AWS services like Kinesis, Glue Streaming, S3 and QuickSight. This potent combination encourages agility and innovation by allowing companies to track consumer behavior, identify fraud and streamline operations quickly.
Consider an e-commerce platform that wishes to improve user experience and increase sales by tracking customer interactions in real-time. Through monitoring activities such as page views, clicks and purchases the platform can discover popular products, learn a lot about user behavior and spot irregularities like unsuccessful transactions. This constant data flow is made possible by a streaming data pipeline which gives the business the ability to see trends in QuickSight dashboards and make data-driven decisions immediately. This increases customer satisfaction through tailored experiences in addition to improving operational efficiency.
The below architecture diagram uses AWS Analytics Services components to process and visualize real time data. Below are the components and its uses.
AWS Kinesis: Ingests real-time data streams.
AWS Glue Streaming: Processes and transforms streaming data.
Amazon S3: Stores transformed data in a cost-effective, scalable manner.
Amazon QuickSight: Provides real-time dashboards and visualizations.
The suggested architecture involves real-time applications ingesting data from Clickstream and Telemetry Hubs and writing it to Amazon Kinesis Data Streams. These streams guarantee continuous and scalable data ingestion by storing the data in shards for the specified retention period. After that AWS Glue Streaming a service that is entirely managed by AWS and comparable to Spark Streaming uses the data. The incoming data is efficiently read processed and subjected to ETL transformations by Glue Streaming. Following processing the converted data is written in a partitioned well-structured format to Amazon S3 (e. g. A. year/month/day/hour) making it possible to store and query data effectively. The data is set up as a data source for Amazon QuickSight after it has been stored in S3 allowing for real-time dashboarding and reporting. Based on the most recent data trends these dashboards enable business teams to make well-informed decisions. The architecture also incorporates AWS CloudTrail and Amazon CloudWatch for monitoring and logging. CloudTrail offers comprehensive API call logging for auditing and CloudWatch keeps an eye on the pipelines overall health and performance guaranteeing real-time visibility and problem-alerting.
Sample Codes for below Step by Step implementation of the above architecture.
import boto3
region = 'us-east-1'
stream_name = 'clickstream-data-stream'
shard_count = 2 # Adjust based on expected data volume
kinesis_client = boto3.client('kinesis', region_name=region)
response = kinesis_client.create_stream(
StreamName=stream_name,
ShardCount=shard_count
)
print(f"Created Kinesis Data Stream: {stream_name}")
# Enable KMS Encryption
kinesis_client.start_stream_encryption(
StreamName=stream_name,
EncryptionType='KMS',
KeyId='alias/aws/kinesis' # Use custom KMS Key if needed
)
# Enable Enhanced Monitoring
kinesis_client.enable_enhanced_monitoring(
StreamName=stream_name,
ShardLevelMetrics=['IncomingBytes', 'OutgoingBytes', 'ReadProvisionedThroughputExceeded', 'WriteProvisionedThroughputExceeded']
)
import boto3
import json
import random
import time
import logging
from datetime import datetime
from botocore.exceptions import ClientError
# Configuration
region = 'us-east-1'
stream_name = 'clickstream-data-stream'
partition_key = 'user_id'
# Initialize Logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Kinesis Client
kinesis_client = boto3.client('kinesis', region_name=region)
def generate_clickstream_event():
event = {
'event_type': 'page_view',
'user_id': f"user_{random.randint(1, 100)}",
'timestamp': datetime.utcnow().isoformat(),
'page': random.choice(['home', 'product', 'cart', 'checkout']),
'session_id': f"session_{random.randint(1000, 9999)}"
}
return event
def put_records_to_stream():
while True:
try:
event = generate_clickstream_event()
response = kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=event[partition_key]
)
logger.info(f"Sent data: {event}")
except ClientError as e:
logger.error(f"Failed to send data: {e}")
time.sleep(0.5) # Control the event rate
if __name__ == '__main__':
put_records_to_stream()
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, from_json, to_date, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
# Initialize Glue Context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Configuration
stream_name = "clickstream-data-stream"
region = "us-east-1"
endpoint_url = f"https://kinesis.{region}.amazonaws.com"
output_path = "s3://your-bucket-name/clickstream-data/"
# Define Schema
schema = StructType([
StructField("event_type", StringType(), True),
StructField("user_id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("page", StringType(), True),
StructField("session_id", StringType(), True)
])
# Read from Kinesis Stream
df = spark \
.readStream \
.format("kinesis") \
.option("streamName", stream_name) \
.option("region", region) \
.option("startingPosition", "LATEST") \
.option("endpointUrl", endpoint_url) \
.load()
# Deserialize JSON and Apply Schema
df = df.selectExpr("CAST(data AS STRING)")
df = df.select(from_json(col("data"), schema).alias("event_data"))
df = df.select("event_data.*")
# Transformations
df = df.withColumn("event_date", to_date(col("timestamp")))
df = df.withColumn("event_time", to_timestamp(col("timestamp")))
# Write to S3 in Parquet Format
query = df.writeStream \
.format("parquet") \
.option("checkpointLocation", "s3://your-bucket-name/checkpoints/") \
.option("path", output_path) \
.partitionBy("event_date") \
.outputMode("append") \
.start()
query.awaitTermination()
job.commit()
Glue Streaming, S3 and AWS Kinesis can be used to build a streaming data pipeline that allows businesses to process and analyze real-time data at scale. This architecture enables organizations to make data-driven decisions more quickly by offering low-latency data ingestion seamless transformation and economical storage. The system guarantees high performance and scalability by utilizing Parquet format for optimal storage and PySpark for intricate transformations. The ability to make decisions is further improved by integrating QuickSight for real-time dashboards. In addition to addressing today’s data challenges this end-to-end pipeline provides a strong basis for predictive insights and advanced analytics.
https://docs.aws.amazon.com/glue/latest/dg/streaming-chapter.html
https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html
https://docs.aws.amazon.com/streams/latest/dev/introduction.html
https://docs.aws.amazon.com/quicksight/latest/user/welcome.html