Processing layer
The processing layer consists of three parts, the Lambda function that is used for the dynamic partitioning of the incoming data, and two Lambda functions that use the COPY TO PARTITION BY feature of DuckDB to aggregate and repartition the ingested, enriched and stored page views data, on two different time aggregation levels (daily and hourly).
Data transformation & Dynamic partitioning Lambda
Data transformation is a Kinesis Data Firehose Delivery Stream feature that enables the cleaning, transformation and enrichment of incoming records in a batched manner. In combination with the dynamic partitioning feature, this provides powerful data handling capabilities with the data still being “on stream”. When writing data to S3 as parquet files, a schema configuration in the form of a Glue Table needs to be defined as well to make it work (see “Cataloging & search layer” below).
It’s necessary to define some buffer configuration for the Lambda function, meaning that you need to specify the time interval of 60 seconds (this will add a max delay of one minute to the stream data), the size in MB (between 0.2 and 3), and the number of retries (3 is the only usable default).
The input coming from the Kinesis Data Firehose Delivery Stream are a base64 encoded strings that contain the loglines coming from the CloudFront distribution:
MTY4MjA4NDI0MS40NjlcdDIwMDM6ZTE6YmYxZjo3YzAwOjhlYjoxOGY4OmExZmI6OWRhZFx0MzA0XHQvaGVsbG8uZ2lmP3Q9cHYmdHM9MTY4MjA4MzgwNDc2OCZ1PWh0dHBzJTI1M0ElMjUyRiUyNTJGbXlkb21haW4udGxkJTI1MkYmaG49bXlkb21haW4udGxkJnBhPSUyNTJGJnVhPU1vemlsbGElMjUyRjUuMCUyNTIwKE1hY2ludG9zaCUyNTNCJTI1MjBJbnRlbCUyNTIwTWFjJTI1MjBPUyUyNTIwWCUyNTIwMTBfMTVfNyklMjUyMEFwcGxlV2ViS2l0JTI1MkY1MzcuMzYlMjUyMChLSFRNTCUyNTJDJTI1MjBsaWtlJTI1MjBHZWNrbyklMjUyMENocm9tZSUyNTJGMTEyLjAuMC4wJTI1MjBTYWZhcmklMjUyRjUzNy4zNiZpdz0xMjkyJmloPTkyNiZ0aT1NeSUyNTIwRG9tYWluJnc9MzQ0MCZoPTE0NDAmZD0yNCZsPWRlLURFJnA9TWFjSW50ZWwmbT04JmM9OCZ0ej1FdXJvcGUlMjUyRkJlcmxpblx0Nzg5XHRIQU01MC1QMlx0MC4wMDFcdE1vemlsbGEvNS4wJTIwKE1hY2ludG9zaDslMjBJbnRlbCUyME1hYyUyME9TJTIwWCUyMDEwXzE1XzcpJTIwQXBwbGVXZWJLaXQvNTM3LjM2JTIwKEtIVE1MLCUyMGxpa2UlMjBHZWNrbyklMjBDaHJvbWUvMTEyLjAuMC4wJTIwU2FmYXJpLzUzNy4zNlx0LVx0dD1wdiZ0cz0xNjgyMDgzODA0NzY4JnU9aHR0cHMlMjUzQSUyNTJGJTI1MkZteWRvbWFpbi50bGQlMjUyRiZobj1teWRvbWFpbi50bGQmcGE9JTI1MkYmdWE9TW96aWxsYSUyNTJGNS4wJTI1MjAoTWFjaW50b3NoJTI1M0IlMjUyMEludGVsJTI1MjBNYWMlMjUyME9TJTI1MjBYJTI1MjAxMF8xNV83KSUyNTIwQXBwbGVXZWJLaXQlMjUyRjUzNy4zNiUyNTIwKEtIVE1MJTI1MkMlMjUyMGxpa2UlMjUyMEdlY2tvKSUyNTIwQ2hyb21lJTI1MkYxMTIuMC4wLjAlMjUyMFNhZmFyaSUyNTJGNTM3LjM2Jml3PTEyOTImaWg9OTI2JnRpPU15JTI1MjBEb21haW4mdz0zNDQwJmg9MTQ0MCZkPTI0Jmw9ZGUtREUmcD1NYWNJbnRlbCZtPTgmYz04JnR6PUV1cm9wZSUyNTJGQmVybGluXHRIaXRcdDMzMjBcbg==
After decoding, the logline is visible and contains the info from the real-time log fields, which are tab-separated and contain newlines:
1682084241.469\t2003:e1:bf1f:7c00:8eb:18f8:a1fb:9dad\t304\t/hello.gif?t=pv&ts=1682083804768&u=https%253A%252F%252Fmydomain.tld%252F&hn=mydomain.tld&pa=%252F&ua=Mozilla%252F5.0%2520(Macintosh%253B%2520Intel%2520Mac%2520OS%2520X%252010_15_7)%2520AppleWebKit%252F537.36%2520(KHTML%252C%2520like%2520Gecko)%2520Chrome%252F112.0.0.0%2520Safari%252F537.36&iw=1292&ih=926&ti=My%2520Domain&w=3440&h=1440&d=24&l=de-DE&p=MacIntel&m=8&c=8&tz=Europe%252FBerlin\t789\tHAM50-P2\t0.001\tMozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/112.0.0.0%20Safari/537.36\t-\tt=pv&ts=1682083804768&u=https%253A%252F%252Fmydomain.tld%252F&hn=mydomain.tld&pa=%252F&ua=Mozilla%252F5.0%2520(Macintosh%253B%2520Intel%2520Mac%2520OS%2520X%252010_15_7)%2520AppleWebKit%252F537.36%2520(KHTML%252C%2520like%2520Gecko)%2520Chrome%252F112.0.0.0%2520Safari%252F537.36&iw=1292&ih=926&ti=My%2520Domain&w=3440&h=1440&d=24&l=de-DE&p=MacIntel&m=8&c=8&tz=Europe%252FBerlin\tHit\t3320\n
During transformation and enrichment, the following steps are followed:
- Validating the source record
- Enriching the browser and device data from the user agent string
- Determine whether the record was generated by a bot (by user agent string)
- Add nearest geographical information based on edge locations
- Compute referer
- Derive requested URI
- Compute UTM data
- Get the event type (either a page view or a tracking event)
- Build the time hierarchy (year, month, day, event timestamp)
- Compute data arrival delays (data/process metrics)
- Generate hashes for page view, daily page view and daily visitor ids (later used to calculate page views and visits)
- Add metadata with the partition key values (in our case, the partition keys are domain_name, event_date, and event_type), to be able to use the dynamic partitioning feature
The generated JSON looks like this:
{ "result": "Ok", "error": null, "data": { "event_year": 2023, "event_month": 4, "event_day": 21, "event_timestamp": "2023-04-21T13:30:04.768Z", "arrival_timestamp": "2023-04-21T13:37:21.000Z", "arrival_delay_ms": -436232, "edge_city": "Hamburg", "edge_state": null, "edge_country": "Germany", "edge_country_code": "DE", "edge_latitude": 53.630401611328, "edge_longitude": 9.9882297515869, "edge_id": "HAM", "referer": null, "referer_domain_name": "Direct / None", "browser_name": "Chrome", "browser_version": "112.0.0.0", "browser_os_name": "Mac OS", "browser_os_version": "10.15.7", "browser_timezone": "Europe/Berlin", "browser_language": "de-DE", "device_type": "Desktop", "device_vendor": "Apple", "device_outer_resolution": "3440x1440", "device_inner_resolution": "1292x926", "device_color_depth": 24, "device_platform": "MacIntel", "device_memory": 8, "device_cores": 8, "utm_source": null, "utm_campaign": null, "utm_medium": null, "utm_content": null, "utm_term": null, "request_url": "https://mydomain.tld/", "request_path": "/", "request_query_string": "t=pv&ts=1682083804768&u=https%253A%252F%252Fmydomain.tld%252F&hn=mydomain.tld&pa=%252F&ua=Mozilla%252F5.0%2520(Macintosh%253B%2520Intel%2520Mac%2520OS%2520X%252010_15_7)%2520AppleWebKit%252F537.36%2520(KHTML%252C%2520like%2520Gecko)%2520Chrome%252F112.0.0.0%2520Safari%252F537.36&iw=1292&ih=926&ti=My%2520Domain&w=3440&h=1440&d=24&l=de-DE&p=MacIntel&m=8&c=8&tz=Europe%252FBerlin\t789\tHAM50-P2\t0.001\tMozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_7)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/112.0.0.0%20Safari/537.36\t-\tt=pv&ts=1682083804768&u=https%253A%252F%252Fmydomain.tld%252F&hn=mydomain.tld&pa=%252F&ua=Mozilla%252F5.0%2520(Macintosh%253B%2520Intel%2520Mac%2520OS%2520X%252010_15_7)%2520AppleWebKit%252F537.36%2520(KHTML%252C%2520like%2520Gecko)%2520Chrome%252F112.0.0.0%2520Safari%252F537.36&iw=1292&ih=926&ti=My%2520Domain&w=3440&h=1440&d=24&l=de-DE&p=MacIntel&m=8&c=8&tz=Europe%252FBerlin", "request_bytes": 789, "request_status_code": 304, "request_cache_status": "Hit", "request_delivery_time_ms": 1, "request_asn": 3320, "request_is_bot": 0, "event_name": null, "event_data": null, "page_view_id": "f4e1939bc259131659b00cd5f73e55a5bed04fbfa63f095b561fd87009d0a228", "daily_page_view_id": "7c82d13036aa2cfe04720e0388bb8645eb90de084bd50cf69356fa8ec9d8b407", "daily_visitor_id": "9f0ac3a2560cfa6d5c3494e1891d284225e15f088414390a40fece320021a658", "domain_name": "mydomain.tld", "event_date": "2023-04-21", "event_type": "pageview" }, "metadata": { "partitionKeys": { "domain_name": "mydomain.tld", "event_date": "2023-04-21", "event_type": "pageview" } }}
Then, the following steps are done by the Lambda function:
- Encode the JSON stringified records in base64 again
- Return them to the Kinesis Data Firehose Delivery Stream, which will then persist the data based on the defined prefix in the S3 bucket for incoming data.
Daily Aggregation Lambda
As the ingested data contains information on a single request level, it makes sense to aggregate the data so that queries can be run optimally, and query response times are reduced.
The aggregation Lambda function is based on tobilg/serverless-parquet-repartitioner, which also has an accompanying blog post that explains in more detail how the DuckDB Lambda Layer can be used to repartition or aggregate existing data in S3.
The Lambda function is scheduled to run each night at 00:16AM UTC, which makes sure that all the Kinesis Firehose Delivery Stream output files of the last day have been written to S3 (this is because the maximum buffer time is 15 minutes).
It also writes the historical data to a DuckDB database file in the duckdb/data.duckdb
key of the curated data S3 Bucket.
When it runs, it does three things:
- Create a session aggregation, that derives the session information and whether single requests were bounces or not
- Calculate the pageviews and visitor numbers, broken down by several dimensions which are later needed for querying (see
stats
table below) - Store the extraction of the event data separately, newly partitioned by
event_name
to speed up queries
Hourly Aggregation Lambda
The hourly aggregation Lambda function is scheduled to run at every 16th minute of an hour, which makes sure that all the Kinesis Firehose Delivery Stream output files of the last hour have been written to S3 (maximum buffer time is 15 minutes).
It computes the same data as the daily aggregation Lambda function, but for the last hour.