Data pipelines on cloud (Storage)
Storage
Goal: persisting data
What storage do we choose?
Taxonomy of storage models (Mansouri, Toosi, and Buyya 2017)
| Data structure | Data abstraction | Data access |
|---|---|---|
| Structured | Database | Relational |
Relational database
| Data structure | Data abstraction | Data access |
|---|---|---|
| Semi/unstructured | Database | * |


| Data structure | Data abstraction | Data access |
|---|---|---|
| Unstructured | File/Database | Key-value |
File system (EFS), object storage (S3), or a key-value database (e.g., DynamoDB)
They differ in the features they support
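As a rough sketch of the key-value access model of object storage (bucket and object keys below are made up), using boto3:

```python
import boto3

s3 = boto3.client("s3")

# Write an object: the key looks like a path, but S3 has no real directories
s3.put_object(
    Bucket="my-data-lake",                      # hypothetical bucket
    Key="landing/customers/batch-001.json",     # hypothetical key
    Body=b'{"id": 1, "name": "ACME"}',
)

# Read it back by key
obj = s3.get_object(Bucket="my-data-lake", Key="landing/customers/batch-001.json")
print(obj["Body"].read())
```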
Simple Storage Service (S3)
Benefits
AWS S3: storage classes
Two types of actions:


Having consistent principles on how to organize your data is important
Landing area (LA)
Staging area (SA)
Archive area (AA)
Production area (PA)
Pass-through job
(Cloud) data warehouse (DWH)
Failed area (FA)
| Area | Permissions | Tier |
|---|---|---|
| Landing | Ingestion applications can write; scheduled pipelines can read; data consumers can't access | Hot |
| Staging | Scheduled pipelines can read/write; selected data consumers can read | Hot |
| Production | Scheduled pipelines can read/write; selected data consumers can read | Hot |
| Archive | Scheduled pipelines can write; dedicated data reprocessing pipelines can read | Cold/Archive |
| Failed | Scheduled pipelines can write; dedicated data reprocessing pipelines can read; data consumers don't have access | Hot |
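To make the areas and tiers concrete, here is a minimal sketch (hypothetical bucket, prefixes, and file names) of a pipeline writing into the hot production area and archiving the raw input on a cold storage class:

```python
import boto3

s3 = boto3.client("s3")
bucket = "company-data-lake"  # hypothetical bucket

# Scheduled pipeline: write processed data into the (hot) production area
with open("customers_2024-01.parquet", "rb") as f:  # hypothetical local file
    s3.put_object(Bucket=bucket,
                  Key="production/sales/customers/2024-01.parquet",
                  Body=f.read())

# Archival step: keep the raw input in the archive area on a cold tier
s3.copy_object(Bucket=bucket,
               Key="archive/sales/customers/2024-01.parquet",
               CopySource={"Bucket": bucket,
                           "Key": "landing/sales/customers/2024-01.parquet"},
               StorageClass="GLACIER")
```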
Alternative organizations are available
“A data lake is a central repository system for storage, processing, and analysis of raw data, in which the data is kept in its original format and is processed to be queried only when needed. It can store a varied amount of formats in big data ecosystems, from unstructured, semi-structured, to structured data sources.”
Use folders to organize data inside areas into a logical structure
Different areas will have slightly different folder structures
/landing/ETL/sales_oracle_ingest/customers/01DFTFX89YDFAXREPJTR94
Combine the key benefits of data lakes and DWHs
Key question: can we effectively combine these benefits?
Data independence: modify the schema at one level of the database system without altering the schema at the next higher level

From DWH to Data Lakehouse
1st generation systems: data warehousing started with helping business decision-makers get analytical insights
2nd generation: offloading all the raw data into data lakes
A two-tier architecture is highly complex for users
(Some) main problems:


The market is pushing for the adoption of the Lakehouse as a de facto standard
Is there a real need for many unstructured and integrated datasets?
Juliana Freire, keynote @ EDBT 2023
Idea
Lakehouse
Delta Lake uses a transaction log and stores data in Apache Parquet for fast metadata operations
The transaction log lives in the _delta_log subdirectory; changes are recorded there as ordered, atomic commits.
Each commit is written as a JSON file with an increasing version number (000000.json, 000001.json, …, 000003.json, …) and contains an array of actions to apply to the previous table version. Whenever a user modifies a table (such as an INSERT, UPDATE, or DELETE), Delta Lake breaks that operation down into a series of discrete steps, each composed of one or more actions (e.g., add file, remove file, update metadata, commit info).
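The commit history can be inspected from Spark; a small sketch, assuming the Delta Lake libraries and a table named suppliers (the one created below):

```python
from delta.tables import DeltaTable

# Each row corresponds to one commit, i.e., one JSON file in _delta_log
history = DeltaTable.forName(spark, "suppliers").history()
history.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

# Equivalent SQL
spark.sql("DESCRIBE HISTORY suppliers").show(truncate=False)
```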
You own your data: we are decoupling the data from database engines!
CREATE TABLE suppliers(id INT, name STRING, age INT)
TBLPROPERTIES ('foo'='bar')
COMMENT 'this is a comment'
LOCATION 's3://...';
Delta tables are stored in S3 (simple files in a data lake), and they can be read using different compute engines:
… and languages
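For instance, the same table can be read from Spark or from a plain Python process; a sketch assuming the delta-spark and deltalake (delta-rs) packages and an abbreviated, hypothetical S3 path:

```python
# With Spark (delta-spark)
df = spark.read.format("delta").load("s3://my-bucket/suppliers")
df.show(5)

# Without Spark, via the delta-rs Python bindings (package `deltalake`)
from deltalake import DeltaTable
dt = DeltaTable("s3://my-bucket/suppliers")
print(dt.to_pandas().head())
```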
Create a table of suppliers; the content of 00000000000000000000.json is:
{
"commitInfo":{
"timestamp":1709133408152,
"userId":"8355321721036096",
"userName":"user1@foo.bar",
"operation":"CREATE TABLE AS SELECT",
"operationParameters":{
"partitionBy":"[]",
"description":null,
"isManaged":"false",
"properties":"{}",
"statsOnLoad":false
},
"notebook":{
"notebookId":"68312033830310"
},
"clusterId":"0112-095737-cgwksnoz",
"isolationLevel":"WriteSerializable",
"isBlindAppend":true,
"operationMetrics":{
"numFiles":"4",
"numOutputRows":"1000000",
"numOutputBytes":"79811576"
},
"tags":{
"restoresDeletedRows":"false"
},
"engineInfo":"Databricks-Runtime/13.3.x-scala2.12",
"txnId":"afc094e5-7096-40cb-b4f7-33e98c5d3a4b"
}
}
{
"add":{
"path":"part-00000-d7654bfc-8169-41a7-a7fc-28586c8f73f9-c000.snappy.parquet",
"partitionValues":{},
"size":20588082,
"modificationTime":1709133407000,
"dataChange":true,
"stats":"{\"numRecords\":257994,\"minValues\":{\"s_suppkey\":1,\"s_name\":\"Supplier#000000001\",\"s_address\":\" , Jd6qNPDAgz\",\"s_nationkey\":0,\"s_phone\":\"10-100-166-6237\",\"s_acctbal\":-999.94,\"s_comment\":\" Customer blithely regular pint\"},\"maxValues\":{\"s_suppkey\":257994,\"s_name\":\"Supplier#000257994\",\"s_address\":\"zzyv9d9xGUF QcjHQG8gDjuLo pLBxBZ�\",\"s_nationkey\":24,\"s_phone\":\"34-999-987-5257\",\"s_acctbal\":9999.93,\"s_comment\":\"zzle. sometimes bold pinto beans�\"},\"nullCount\":{\"s_suppkey\":0,\"s_name\":0,\"s_address\":0,\"s_nationkey\":0,\"s_phone\":0,\"s_acctbal\":0,\"s_comment\":0}}",
"tags":{
"INSERTION_TIME":"1709133407000000",
"MIN_INSERTION_TIME":"1709133407000000",
"MAX_INSERTION_TIME":"1709133407000000",
"OPTIMIZE_TARGET_SIZE":"268435456"
}
}
}
{
"add":{
"path":"part-00001-758ed86b-1400-46b8-b73f-50c6ad4324f1-c000.snappy.parquet",
"partitionValues":{},
"size":20516343,
"modificationTime":1709133408000,
"dataChange":true,
"stats":"{\"numRecords\":257111,\"minValues\":{\"s_suppkey\":257995,\"s_name\":\"Supplier#000257995\",\"s_address\":\" t2HGWJzQQcWUyx\",\"s_nationkey\":0,\"s_phone\":\"10-100-154-1322\",\"s_acctbal\":-999.96,\"s_comment\":\" Customer blithe requesComplain\"},\"maxValues\":{\"s_suppkey\":515105,\"s_name\":\"Supplier#000515105\",\"s_address\":\"zzyvSACyGWpp5gCaZbUL7lKRUnhe7m6p�\",\"s_nationkey\":24,\"s_phone\":\"34-999-802-1817\",\"s_acctbal\":9999.95,\"s_comment\":\"zzle. regular foxes are ironic p�\"},\"nullCount\":{\"s_suppkey\":0,\"s_name\":0,\"s_address\":0,\"s_nationkey\":0,\"s_phone\":0,\"s_acctbal\":0,\"s_comment\":0}}",
"tags":{
"INSERTION_TIME":"1709133407000001",
"MIN_INSERTION_TIME":"1709133407000001",
"MAX_INSERTION_TIME":"1709133407000001",
"OPTIMIZE_TARGET_SIZE":"268435456"
}
}
}
Add a new supplier; the content of 00000000000000000009.json is:
{
"commitInfo":{
"timestamp":1709134798441,
"userId":"8047431628735957",
"userName":"user2@foo.bar",
"operation":"WRITE",
"operationParameters":{
"mode":"Append",
"statsOnLoad":false,
"partitionBy":"[]"
},
"notebook":{
"notebookId":"4471242088384584"
},
"clusterId":"0112-095737-cgwksnoz",
"readVersion":8,
"isolationLevel":"WriteSerializable",
"isBlindAppend":true,
"operationMetrics":{
"numFiles":"1",
"numOutputRows":"1",
"numOutputBytes":"2675"
},
"tags":{
"restoresDeletedRows":"false"
},
"engineInfo":"Databricks-Runtime/13.3.x-scala2.12",
"txnId":"45786330-12ee-4e73-85ff-38cdd2caffcf"
}
}
{
"add":{
"path":"part-00000-7b0e114f-e86f-4952-a030-b877001f8074-c000.snappy.parquet",
"partitionValues":{},
"size":2675,
"modificationTime":1709134799000,
"dataChange":true,
"stats":"{\"numRecords\":1,\"minValues\":{\"s_suppkey\":1,\"s_name\":\"Supplier#000000001\",\"s_address\":\" N kD4on9OM Ipw3,gf0JBoQDd7tgrzr\",\"s_nationkey\":17,\"s_phone\":\"27-918-335-1736\",\"s_acctbal\":5755.94,\"s_comment\":\"each slyly above the careful\"},\"maxValues\":{\"s_suppkey\":1,\"s_name\":\"Supplier#000000001\",\"s_address\":\" N kD4on9OM Ipw3,gf0JBoQDd7tgrzr�\",\"s_nationkey\":17,\"s_phone\":\"27-918-335-1736\",\"s_acctbal\":5755.94,\"s_comment\":\"each slyly above the careful\"},\"nullCount\":{\"s_suppkey\":0,\"s_name\":0,\"s_address\":0,\"s_nationkey\":0,\"s_phone\":0,\"s_acctbal\":0,\"s_comment\":0}}",
"tags":{
"INSERTION_TIME":"1709134799000000",
"MIN_INSERTION_TIME":"1709134799000000",
"MAX_INSERTION_TIME":"1709134799000000",
"OPTIMIZE_TARGET_SIZE":"268435456"
}
}
}
Once we have made several commits to the transaction log, Delta Lake saves a checkpoint file in Parquet format in _delta_log

Checkpoints save the entire state of the table at a point in time.
They are a “shortcut” for reproducing a table’s state without reprocessing what could be thousands of tiny, inefficient JSON files.
Spark runs a listFrom(v) operation to list all files in the transaction log, starting from version v

Imagine that we have created commits up to 000007.json and that Spark has cached this version of the table in memory.
In the meantime, new commits have been added up to 0000012.json. Spark runs a listFrom version 7 operation to see the new changes to the table; rather than replaying every intermediate JSON file, it skips ahead to the most recent checkpoint file (checkpoints are stored as Parquet files named like 00000000000000000002.checkpoint.parquet) and applies only the commits that follow it.
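A toy sketch of this replay logic in plain Python (no Spark; directory layout and helper name are illustrative): start from the latest checkpoint, then apply the add/remove actions of every newer commit to obtain the current set of data files.

```python
import json, os, re

def active_files(delta_log_dir):
    """Toy reconstruction of the current set of data files of a Delta table."""
    entries = sorted(os.listdir(delta_log_dir))
    commits = [e for e in entries if re.fullmatch(r"\d{20}\.json", e)]
    checkpoints = [e for e in entries if e.endswith(".checkpoint.parquet")]

    files = set()
    start_version = 0
    if checkpoints:
        last_cp = checkpoints[-1]
        start_version = int(last_cp.split(".")[0]) + 1
        # A real reader would load the checkpoint Parquet here (e.g., with pyarrow)
        # and initialize `files` with the 'add' entries it contains.

    # Replay only the commits after the checkpoint (the "listFrom" behaviour)
    for commit in commits:
        if int(commit.split(".")[0]) < start_version:
            continue
        with open(os.path.join(delta_log_dir, commit)) as fh:
            for line in fh:                       # one action per line
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return files
```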

Time Travel
select * from suppliers; -- read the last version of the table
delete from suppliers; -- delete all data from the table!
select * from suppliers version as of 3; -- read from a specific version of the table
restore table suppliers to version as of 3; -- restore to a specific version
Data Lineage and Debugging
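Time travel is also what enables this kind of debugging from the DataFrame API; a sketch assuming the suppliers table and a hypothetical S3 path:

```python
# Current state of the table
current = spark.table("suppliers")

# State of the table at version 3 (before the DELETE above)
old = spark.read.format("delta").option("versionAsOf", 3).load("s3://my-bucket/suppliers")

# Rows that existed in version 3 but are gone in the current version
old.exceptAll(current).show()
```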
Optimize: Delta Lake can improve the speed of read queries from a table by coalescing small files into larger ones.
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # For path-based tables
# For Hive metastore-based tables: deltaTable = DeltaTable.forName(spark, tableName)
deltaTable.optimize().executeCompaction()
# If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using `where`
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()
Auto compaction automatically reduces small file problems.
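Auto compaction is typically enabled through a table property or a session configuration; the names below follow the Databricks/Delta Lake documentation and their availability depends on the runtime:

```python
# Enable auto compaction for one table
spark.sql("ALTER TABLE suppliers SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")

# Or enable it for all tables written in this session
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
```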
Example of a write transaction:
1. Read the data at table version r; if required, combine the previous checkpoint and the subsequent log records.
2. Attempt to write the new log record as the r+1.json log object, but only if no other client has written this object (the write must be atomic).
3. Optionally, write a new .parquet checkpoint for log record r+1.
Not all large-scale storage systems have an atomic put operation
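The critical point is step 2: r+1.json must be written atomically and only if nobody else has written it (put-if-absent). A toy sketch on a local file system, where O_CREAT | O_EXCL gives exactly this semantics (on S3, Delta Lake delegates this to a LogStore implementation):

```python
import json, os

def try_commit(delta_log_dir, version, actions):
    """Attempt to write the commit file for `version`; fail if another writer got there first."""
    path = os.path.join(delta_log_dir, f"{version:020d}.json")
    try:
        # O_CREAT | O_EXCL: create the file only if it does not exist yet (atomic)
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # lost the race: re-read the log, rebase, and retry with version+1
    with os.fdopen(fd, "w") as fh:
        for action in actions:
            fh.write(json.dumps(action) + "\n")  # one action per line, as in _delta_log
    return True
```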
Check the scalability with respect to the length of the log
i = 0
while i < 20000:
    if i % 10 == 0:
        spark.sql("select sum(quantity) from lineitem")  # Read the whole fact
    spark.sql("insert (500K tuples) into lineitem")  # Append new tuples (pseudocode)
    i += 1
Scalability
Check the scalability with respect to the length of the log
i = 0
while i < 20000:
    if i % 10 == 0:
        spark.sql("select sum(quantity) from lineitem")  # Read the whole fact
    spark.sql("insert (500K tuples) into lineitem")  # Append new tuples (pseudocode)
    if i % 100 == 0:
        OPTIMIZE  # Optimize (pseudocode)
    i += 1
Scalability
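A runnable variant of the loop above, under the simplifying assumption of a lineitem Delta table with a single quantity column (synthetic data; sizes and iteration counts would be tuned to the cluster):

```python
from pyspark.sql import functions as F

spark.sql("CREATE TABLE IF NOT EXISTS lineitem (quantity INT) USING delta")

for i in range(20000):
    if i % 10 == 0:
        spark.sql("SELECT SUM(quantity) FROM lineitem").collect()  # read the whole fact table
    # Append a batch of 500K synthetic tuples
    batch = spark.range(500_000).select((F.rand() * 50).cast("int").alias("quantity"))
    batch.write.format("delta").mode("append").saveAsTable("lineitem")
    if i % 100 == 0:
        spark.sql("OPTIMIZE lineitem")                              # compact small files
```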

Format-independent optimizations are
See also “I spent 5 hours understanding more about the delta lake table format”
A medallion architecture is a data design pattern used to logically organize data in a lakehouse, with the goal of incrementally and progressively improving the structure and quality of data as it flows through each layer of the architecture
Medallion architecture
Example of usage
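A minimal bronze → silver → gold sketch with Delta tables (paths, table names, and the cleaning/aggregation logic are hypothetical):

```python
from pyspark.sql import functions as F

# Bronze: ingest the raw files as-is
raw = spark.read.json("s3://company-data-lake/landing/sales/orders/")
raw.write.format("delta").mode("append").saveAsTable("bronze_orders")

# Silver: clean and conform (drop malformed rows, fix types, deduplicate)
silver = (spark.table("bronze_orders")
          .filter(F.col("order_id").isNotNull())
          .withColumn("amount", F.col("amount").cast("double"))
          .dropDuplicates(["order_id"]))
silver.write.format("delta").mode("overwrite").saveAsTable("silver_orders")

# Gold: business-level aggregates ready for consumers
gold = silver.groupBy("customer_id").agg(F.sum("amount").alias("total_spent"))
gold.write.format("delta").mode("overwrite").saveAsTable("gold_customer_spend")
```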
Matteo Francia - Big Data and Cloud Platforms (Module 2) - A.Y. 2025/26