Iceberg
Exploring Apache Icebery using PyIceberg – Part 1
Apache Iceberg, an open-source table format that has become the industry standard for data sharing in modern data architectures. In a previous post I explored the core feature of Apache Iceberg and compared it with related technologies such as Apache Hudi and Delta Lake.
In this post we’ll look at some of the inital steps to setup and explore Iceberg tables using Python. I’ll have follow-on posts which will explore more advanced features of Apache Iceberg, again using Python. In this post, we’ll explore the following:
- Environmental setup
- Create an Iceberg Table from a Pandas dataframae
- Explore the Iceberg Table and file system
- Appending data and Time Travel
- Read an Iceberg Table into a Pandas dataframe
- Filtered scans with push-down predicates
Check out the link at the bottom of this post to download the Notebook containing all the PyIceberg code.
Environmental setup
Before we can get started with Apache Iceberg, we need to install it in our environment. I’m going with using Python for these blog posts and that means we need to install PyIceberg. In addition to this package, we also need to install pyiceberg-core. This is needed for some additional feature and optimisations of Iceberg.
pip install "pyiceberg[pyiceberg-core]"
This is a very quick install.
Next we need to do some environmental setup, like importing various packages used in the example code, setuping up some directories on the OS where the Iceberg files will be stored, creating a Catalog and a Namespace.
# Import other packages for this Demo Notebookimport pyarrow as paimport pandas as pdfrom datetime import dateimport osfrom pyiceberg.catalog.sql import SqlCatalog#define location for the WAREHOUSE, where the Iceberg files will be locatedWAREHOUSE = "/Users/brendan.tierney/Dropbox/Iceberg-Demo"#create the directory, True = if already exists, then don't report an erroros.makedirs(WAREHOUSE, exist_ok=True)#create a local Catalogcatalog = SqlCatalog( "local", **{ "uri": f"sqlite:///{WAREHOUSE}/catalog.db", "warehouse": f"file://{WAREHOUSE}", })#create a namespace (a bit like a database schema)NAMESPACE = "sales_db"if NAMESPACE not in [ns[0] for ns in catalog.list_namespaces()]: catalog.create_namespace(NAMESPACE)
That’s the initial setup complete.
Create an Iceberg Table from a Pandas dataframe
We can not start creating tables in Iceberg. To do this, the following code examples will initially create a Pandas dataframe, will convert it from table to columnar format (as the data will be stored in Parquet format in the Iceberg table), and then create and populate the Iceberg table.
#create a Pandas DF with some basic data# Create a sample sales DataFramedf = pd.DataFrame({ "order_id": [1001, 1002, 1003, 1004, 1005], "customer": ["Alice", "Bob", "Carol", "Dave", "Eve"], "product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"], "quantity": [1, 2, 1, 3, 5], "unit_price": [1299.99, 899.00, 549.50, 399.00, 79.99], "order_date": [ date(2024, 1, 15), date(2024, 1, 16), date(2024, 2, 3), date(2024, 2, 20), date(2024, 3, 5)], "region": ["EU", "US", "EU", "APAC", "US"]})# Compute total revenue per orderdf["revenue"] = df["quantity"] * df["unit_price"]print(df)print(df.dtypes) order_id customer product quantity unit_price order_date region \0 1001 Alice Laptop 1 1299.99 2024-01-15 EU 1 1002 Bob Phone 2 899.00 2024-01-16 US 2 1003 Carol Tablet 1 549.50 2024-02-03 EU 3 1004 Dave Monitor 3 399.00 2024-02-20 APAC 4 1005 Eve Keyboard 5 79.99 2024-03-05 US revenue 0 1299.99 1 1798.00 2 549.50 3 1197.00 4 399.95 order_id int64customer objectproduct objectquantity int64unit_price float64order_date objectregion objectrevenue float64dtype: object
That’s the Pandas dataframe created. Now we can convert it to columnar format using PyArrow.
#Convert pandas DataFrame → PyArrow Table # PyIceberg writes via Arrow (columnar format), so this step is requiredarrow_table = pa.Table.from_pandas(df)print("Arrow schema:")print(arrow_table.schema)Arrow schema:order_id: int64customer: stringproduct: stringquantity: int64unit_price: doubleorder_date: date32[day]region: stringrevenue: double-- schema metadata --pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 1180
Now we can define the Iceberg table along with the namespace for it.
#Create an Iceberg table from the Arrow schemaTABLE_NAME = (NAMESPACE, "orders")table = catalog.create_table( TABLE_NAME, schema=arrow_table.schema,)tableorders( 1: order_id: optional long, 2: customer: optional string, 3: product: optional string, 4: quantity: optional long, 5: unit_price: optional double, 6: order_date: optional date, 7: region: optional string, 8: revenue: optional double),partition by: [],sort order: [],snapshot: null
The table has been defined in Iceberg and we can see there are no partitions, snapshots, etc. The Iceberg table doesn’t have any data. We can Append the Arrow table data to the Iceberg table.
#add the data to the tabletable.append(arrow_table)#table.append() adds new data files without overwriting existing ones. #Use table.overwrite() to replace all data in a single atomic operation.
We can look at the file system to see what has beeb written.
print(f"Table written to: {WAREHOUSE}/sales_db/orders/")print(f"Snapshot ID: {table.current_snapshot().snapshot_id}")Table written to: /Users/brendan.tierney/Dropbox/Iceberg-Demo/sales_db/orders/Snapshot ID: 3939796261890602539
Explore the Iceberg Table and File System
And Iceberg Table is just a collection of files on the file system, organised into a set of folders. You can look at these using your file system app, or use a terminal window, or in the examples below are from exploring those directories and files from a Jupyter Notebook.
Let’s start at the Warehouse level. This is the topic level that was declared back when the Environment was being setup. Check that out above in the first section.
!ls -l {WAREHOUSE}-rw-r--r--@ 1 brendan.tierney staff 20480 28 Feb 15:26 catalog.dbdrwxr-xr-x@ 3 brendan.tierney staff 96 28 Feb 12:59 sales_db
We can see the catalog file and a directiory for our ‘sales_db’ namespace. When you explore the contents of this file you will find two directorys. These contain the ‘metadata’ and the ‘data’ files. The following list the files found in the ‘data’ directory containing the data and these are stored in parquet format.
!ls -l {WAREHOUSE}/sales_db/orders/data-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:22 00000-0-357c029e-b420-459b-8248-b1caf3c030ce.parquet-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:23 00000-0-3fae55d7-1229-448c-9ffb-ae33c77003a3.parquet-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:03 00000-0-4ec1ef74-cd24-412e-a35f-bcf3d745bf42.parquet-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:23 00000-0-984ed6d2-4067-43c4-8c11-5f5a96febd24.parquet-rw-r--r-- 1 brendan.tierney staff 3307 28 Feb 15:26 00000-0-a61264e8-b361-490e-90f7-105a33f20dec.parquet-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:22 00000-0-ac913dfe-548c-4cb3-99aa-4f1332e02248.parquet-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 13:00 00000-0-b3fa23ec-79c6-48da-ba81-ba35f25aa7ad.parquet-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:25 00000-0-d534a298-adab-4744-baa1-198395cc93bd.parquet-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:21 00000-0-ef5dd6d8-84c0-4860-828e-86e4e175a9eb.parquet-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:21 00000-0-f108db1c-39f9-4e2b-b825-3e580cccc808.parquet
I’ll leave you to explore the ‘metadata’ directory.
Read the Iceberg Table back into our Environment
To load an Iceberg table into your environment, you’ll need to load the Catalog and then load the table. We have already have the Catalog setup from a previous step, but tht might not be the case in your typical scenario. The following sets up the Catalog and loads the Iceberg table.
#Re-load the catalog and table (as you would in a new session)catalog = SqlCatalog( "local", **{ "uri": f"sqlite:///{WAREHOUSE}/catalog.db", "warehouse": f"file://{WAREHOUSE}", })table2 = catalog.load_table(("sales_db", "orders"))
When we inspect the structure of the Iceberg table we get the names of the columns and the datatypes.
print("--- Iceberg Schema ---")print(table2.schema())--- Iceberg Schema ---table { 1: order_id: optional long 2: customer: optional string 3: product: optional string 4: quantity: optional long 5: unit_price: optional double 6: order_date: optional date 7: region: optional string 8: revenue: optional double}
An Iceberg table can have many snapshots for version control. As we have only added data to the Iceberg table, we should only have one snapshot.
#Snapshot history print("--- Snapshot History ---")for snap in table2.history(): print(snap)--- Snapshot History ---snapshot_id=3939796261890602539 timestamp_ms=1772292384231
We can also inspect the details of the snapshot.
#Current snapshot metadata snap = table2.current_snapshot()print("--- Current Snapshot ---")print(f" ID: {snap.snapshot_id}")print(f" Operation: {snap.summary.operation}")print(f" Records: {snap.summary.get('total-records')}")print(f" Data files: {snap.summary.get('total-data-files')}")print(f" Size bytes: {snap.summary.get('total-files-size')}")--- Current Snapshot --- ID: 3939796261890602539 Operation: Operation.APPEND Records: 5 Data files: 1 Size bytes: 3307
The above shows use there was 5 records added using an Append operation.
An Iceberg table can be partitioned. When we created this table we didn’t specify a partition key, but in an example in another post I’ll give an example of partitioning this table
#Partition spec & sort order print("--- Partition Spec ---")print(table.spec()) # unpartitioned by default--- Partition Spec ---[]
We can also list the files that contain the data for our Iceberg table.
#List physical data files via scanprint("--- Data Files ---")for task in table.scan().plan_files(): print(f" {task.file.file_path}") print(f" record_count={task.file.record_count}, " f"file_size={task.file.file_size_in_bytes} bytes")--- Data Files --- file:///Users/brendan.tierney/Dropbox/Iceberg-Demo/sales_db/orders/data/00000-0-a61264e8-b361-490e-90f7-105a33f20dec.parquet record_count=5, file_size=3307 bytes
Appending Data and Time Travel
Iceberg tables facilitates changes to the schema and data, and to be able to view the data at different points in time. This is refered to as Time Travel. Let’s have a look at an example of this by adding some additional data to the Iceberg table.
# Get and Save the first snapshot id before writing moresnap_v1 = table.current_snapshot().snapshot_id# New batch of orders - 2 new ordersdf2 = pd.DataFrame({ "order_id": [1006, 1007], "customer": ["Frank", "Grace"], "product": ["Headphones", "Webcam"], "quantity": [2, 1], "unit_price": [249.00, 129.00], "order_date": [date(2024, 4, 1), date(2024, 4, 5)], "region": ["EU", "APAC"], "revenue": [498.00, 129.00],})#Add the datatable.append(pa.Table.from_pandas(df2))
We can list the snapshots.
#Get the new snapshot id and check if different to previoussnap_v2 = table.current_snapshot().snapshot_idprint(f"v1 snapshot: {snap_v1}")print(f"v2 snapshot: {snap_v2}")v1 snapshot: 3939796261890602539v2 snapshot: 8666063993760292894
and we can see how see how many records are in each Snapshot, using Time Travel.
#Time travel: read the ORIGINAL 5-row tabledf_v1 = table.scan(snapshot_id=snap_v1).to_pandas()print(f"Snapshot v1 — {len(df_v1)} rows")#Current snapshot has all 7 rowsdf_v2 = table.scan().to_pandas()print(f"Snapshot v2 — {len(df_v2)} rows")Snapshot v1 — 5 rowsSnapshot v2 — 7 rows
If you inspect the file system, in the data and metadata dirctories, you will notices some additional files.
!ls -l {WAREHOUSE}/sales_db/orders/data!ls -l {WAREHOUSE}/sales_db/orders/metadata
Read an Iceberg Table into a Pandas dataframe
To load the Iceberg table into a Pandas dataframe we can
pd_df = table2.scan().to_pandas()
or we can use the Pandas package fuction
df = pd.read_iceberg("orders", "catalog")
Filtered scans with push-down predicates
PyIceberg provides a fluent scan API. You can read the full table or push down filters, column projections, and row limits — all evaluated at the file level.
Filtered Scan with Push Down Predicates
from pyiceberg.expressions import ( EqualTo, GreaterThanOrEqual, And)# Only EU orders with revenue above €1000df_filtered = ( table2.scan( row_filter=And( EqualTo("region", "EU"), GreaterThanOrEqual("revenue", 1000.0), ) ).to_pandas() )print(df_filtered) order_id customer product quantity unit_price order_date region revenue0 1001 Alice Laptop 1 1299.99 2024-01-15 EU 1299.99
Column Projection – select specific columns
# Only fetch the columns you need — saves I/Odf_slim = ( table2.scan(selected_fields=("order_id", "customer", "revenue")) .to_pandas() )print(df_slim) order_id customer revenue0 1001 Alice 1299.991 1002 Bob 1798.002 1003 Carol 549.503 1004 Dave 1197.004 1005 Eve 399.95
We can also use Arrow for more control.
arrow_result = table2.scan().to_arrow()print(arrow_result.schema)df_from_arrow = arrow_result.to_pandas(timestamp_as_object=True)print(df_from_arrow.head())
order_id: int64 customer: string product: string quantity: int64 unit_price: double order_date: date32[day] region: string revenue: double order_id customer product quantity unit_price order_date region \ 0 1001 Alice Laptop 1 1299.99 2024-01-15 EU 1 1002 Bob Phone 2 899.00 2024-01-16 US 2 1003 Carol Tablet 1 549.50 2024-02-03 EU 3 1004 Dave Monitor 3 399.00 2024-02-20 APAC 4 1005 Eve Keyboard 5 79.99 2024-03-05 US revenue 0 1299.99 1 1798.00 2 549.50 3 1197.00 4 399.95
I’ve put all of the above into a Juputer Notebook. You can download this from here, and you can use it for your explorations of Apache Iceberg.
Check out my next post of Apache Iceberg to see my Python code on explore some additional, and advanced features of Apache Iceberg.