
Ingestion layer

The ingestion layer mainly consists of two services: a Kinesis Data Stream, which consumes the real-time logs emitted by CloudFront, and a Kinesis Data Firehose Delivery Stream, which backs up the raw data to S3 and also stores the data as partitioned Parquet files in another S3 bucket. Both S3 buckets are part of the storage layer.

The Kinesis Data Stream (one shard in provisioned mode) provides an ingest capacity of 1 MB/second or 1,000 records/second, at a price of $0.015/hour in us-east-1, plus $0.014 per 1M PUT payload units. It forwards the incoming data to the Kinesis Data Firehose Delivery Stream, whose pricing is more complex: ingestion costs $0.029/GB, format conversion $0.018/GB, and dynamic partitioning $0.02/GB. That sums to $0.067/GB ingested and written to S3, plus S3 costs of $0.005 per 1,000 PUT requests.
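To get a feel for these numbers, a back-of-the-envelope monthly estimate can be sketched as below. The prices are the us-east-1 figures quoted above (subject to change); the 50 GB / 10M PUT payload units volume is a made-up example, and S3 PUT costs are left out:

```python
# Back-of-the-envelope cost estimate for the ingestion layer,
# using the us-east-1 prices quoted above.
SHARD_HOUR_USD = 0.015           # one provisioned Kinesis shard
PUT_PAYLOAD_UNIT_USD = 0.014     # per 1M PUT payload units
FIREHOSE_INGEST_USD = 0.029      # per GB
FORMAT_CONVERSION_USD = 0.018    # per GB
DYNAMIC_PARTITIONING_USD = 0.02  # per GB

def monthly_ingestion_cost(gb_per_month: float, put_units_millions: float,
                           hours: float = 730) -> float:
    """Estimated monthly USD cost for one shard plus Firehose processing."""
    shard = SHARD_HOUR_USD * hours
    puts = PUT_PAYLOAD_UNIT_USD * put_units_millions
    firehose = (FIREHOSE_INGEST_USD + FORMAT_CONVERSION_USD
                + DYNAMIC_PARTITIONING_USD) * gb_per_month
    return shard + puts + firehose

# e.g. 50 GB and 10M PUT payload units per month
print(round(monthly_ingestion_cost(50, 10), 2))  # → 14.44
```

Note that the fixed shard-hour cost dominates at low volumes; the per-GB Firehose charges only become significant once volume grows.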

The Kinesis Data Firehose Delivery Stream uses data transformation and dynamic partitioning with a Lambda function, which cleans, transforms, and enriches the data so that it can be stored in S3 as Parquet files with appropriate Hive partitions.
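The transformation Lambda itself belongs to the processing layer, but its contract with Firehose can be sketched here: each invocation receives base64-encoded records and must return them with a status and, for dynamic partitioning, the partition keys under metadata.partitionKeys. The field names (timestamp, domain_name, event_type) are assumptions based on the partition keys used further down, not the actual real-time log schema:

```python
import base64
import json
from datetime import datetime, timezone

def handler(event, context):
    """Firehose data-transformation Lambda: decode each record, derive the
    dynamic partition keys, and hand the cleaned record back to Firehose."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Hypothetical field names -- adapt to the actual log schema.
        event_date = datetime.fromtimestamp(
            float(payload["timestamp"]), tz=timezone.utc).strftime("%Y-%m-%d")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",  # or "ProcessingFailed" to route to the error prefix
            "data": base64.b64encode(
                (json.dumps(payload) + "\n").encode()).decode(),
            # These keys feed the !{partitionKeyFromLambda:...} placeholders
            # in the delivery stream's S3 prefix.
            "metadata": {"partitionKeys": {
                "domain_name": payload["domain_name"],
                "event_type": payload["event_type"],
                "event_date": event_date,
            }},
        })
    return {"records": output}
```

Records marked "ProcessingFailed" are delivered to the configured ErrorOutputPrefix instead of the partitioned output path.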

The Delivery Stream has so-called BufferingHints, which define either the size (1 to 128 MB) or the interval (60 to 900 seconds) at which buffered data is flushed to S3; whichever threshold is reached first triggers the flush. The interval therefore bounds the latency at which the data gets persisted in the data lake. The Lambda function is part of the processing layer and is discussed below.
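Which of the two thresholds fires first depends on the incoming data rate; a quick sketch (the throughput figures are made-up examples):

```python
def flush_trigger(mb_per_second: float, size_mb: int, interval_s: int):
    """Return which BufferingHint fires first at a given ingest rate,
    and after how many seconds."""
    seconds_to_fill = size_mb / mb_per_second
    if seconds_to_fill <= interval_s:
        return ("size", seconds_to_fill)
    return ("interval", interval_s)

# At 0.1 MB/s with a 64 MB / 60 s buffer, the interval wins:
print(flush_trigger(0.1, 64, 60))   # → ('interval', 60)
# At 2 MB/s the 64 MB size threshold is hit after 32 seconds:
print(flush_trigger(2.0, 64, 60))   # → ('size', 32.0)
```

Low-traffic distributions will thus flush on the interval, producing small objects; choosing a short interval trades lower latency for more, smaller Parquet files.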

The CloudFormation resource definition for the Kinesis Data Firehose Delivery Stream can be found below. It sources its variables from the serverless.yml:

AnalyticsKinesisFirehose:
  Type: 'AWS::KinesisFirehose::DeliveryStream'
  Properties:
    DeliveryStreamName: ${self:custom.kinesis.delivery.name}
    DeliveryStreamType: KinesisStreamAsSource
    # Source configuration
    KinesisStreamSourceConfiguration:
      KinesisStreamARN: !GetAtt 'AnalyticsKinesisStream.Arn'
      RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
    # Necessary configuration to transform and write data to S3 as parquet files
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt 'CleanedBucket.Arn'
      BufferingHints:
        IntervalInSeconds: ${self:custom.kinesis.delivery.limits.intervalInSeconds}
        SizeInMBs: ${self:custom.kinesis.delivery.limits.sizeInMB}
      # This enables logging to CloudWatch for better debugging possibilities
      CloudWatchLoggingOptions:
        Enabled: True
        LogGroupName: ${self:custom.logs.groupName}
        LogStreamName: ${self:custom.logs.streamName}
      DataFormatConversionConfiguration:
        Enabled: True
        # Define the input format
        InputFormatConfiguration:
          Deserializer:
            OpenXJsonSerDe:
              CaseInsensitive: True
        # Define the output format
        OutputFormatConfiguration:
          Serializer:
            ParquetSerDe:
              Compression: SNAPPY
              WriterVersion: V1
        # The schema configuration based on Glue tables
        SchemaConfiguration:
          RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
          DatabaseName: '${self:custom.glue.database}'
          TableName: 'incoming_events'
      # Enable dynamic partitioning
      DynamicPartitioningConfiguration:
        Enabled: True
      # Enable Lambda function for pre-processing the Kinesis records
      ProcessingConfiguration:
        Enabled: True
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: NumberOfRetries
                ParameterValue: 3
              - ParameterName: BufferIntervalInSeconds
                ParameterValue: 60
              - ParameterName: BufferSizeInMBs
                ParameterValue: 3
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt 'ProcessKinesisRecordsLambdaFunction.Arn'
      # Enable backups for the raw incoming data
      S3BackupMode: Enabled
      S3BackupConfiguration:
        BucketARN: !GetAtt 'RawBucket.Arn'
        BufferingHints:
          IntervalInSeconds: ${self:custom.kinesis.delivery.limits.intervalInSeconds}
          SizeInMBs: ${self:custom.kinesis.delivery.limits.sizeInMB}
        # Disable logging to CloudWatch for raw data
        CloudWatchLoggingOptions:
          Enabled: false
        CompressionFormat: GZIP
        Prefix: '${self:custom.prefixes.raw}'
        ErrorOutputPrefix: '${self:custom.prefixes.error}'
        RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
      RoleARN: !GetAtt 'AnalyticsKinesisFirehoseRole.Arn'
      # Define output S3 prefixes
      Prefix: '${self:custom.prefixes.incoming}/domain_name=!{partitionKeyFromLambda:domain_name}/event_type=!{partitionKeyFromLambda:event_type}/event_date=!{partitionKeyFromLambda:event_date}/'
      ErrorOutputPrefix: '${self:custom.prefixes.error}'
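To make the resulting layout concrete, the Prefix above expands each !{partitionKeyFromLambda:...} placeholder with the partition keys returned by the Lambda, yielding Hive-style S3 keys. A small sketch of that expansion (the prefix and object name are hypothetical; the real object names are generated by Firehose):

```python
def s3_key(prefix: str, partition_keys: dict, filename: str) -> str:
    """Expand dynamic-partitioning placeholders the way Firehose does,
    yielding a Hive-partitioned S3 key."""
    partitions = "/".join(f"{k}={v}" for k, v in partition_keys.items())
    return f"{prefix}/{partitions}/{filename}"

print(s3_key("incoming", {
    "domain_name": "example.com",
    "event_type": "pageview",
    "event_date": "2023-11-14",
}, "part-0000.snappy.parquet"))
# → incoming/domain_name=example.com/event_type=pageview/event_date=2023-11-14/part-0000.snappy.parquet
```

This key layout is what allows Glue and Athena in the later layers to prune partitions on domain_name, event_type, and event_date instead of scanning the whole bucket.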