
Exploring Apache Iceberg using PyIceberg – Part 1


Apache Iceberg is an open-source table format that has become the industry standard for data sharing in modern data architectures. In a previous post I explored the core features of Apache Iceberg and compared it with related technologies such as Apache Hudi and Delta Lake.

In this post we’ll look at some of the initial steps to set up and explore Iceberg tables using Python. I’ll have follow-on posts which will explore more advanced features of Apache Iceberg, again using Python. In this post, we’ll explore the following:

  • Environmental setup
  • Create an Iceberg Table from a Pandas dataframe
  • Explore the Iceberg Table and file system
  • Appending data and Time Travel
  • Read an Iceberg Table into a Pandas dataframe
  • Filtered scans with push-down predicates

Check out the link at the bottom of this post to download the Notebook containing all the PyIceberg code.

Environmental setup

Before we can get started with Apache Iceberg, we need to install it in our environment. I’m using Python for these blog posts, which means we need to install PyIceberg. In addition to this package, we also need to install pyiceberg-core. This is needed for some additional features and optimisations of Iceberg.

pip install "pyiceberg[pyiceberg-core]"

This is a very quick install.

Next we need to do some environmental setup: importing the various packages used in the example code, setting up some directories on the OS where the Iceberg files will be stored, and creating a Catalog and a Namespace.

# Import other packages for this Demo Notebook
import os
from datetime import date

import pandas as pd
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

# Define the location of the WAREHOUSE, where the Iceberg files will be stored
WAREHOUSE = "/Users/brendan.tierney/Dropbox/Iceberg-Demo"

# Create the directory; exist_ok=True means no error if it already exists
os.makedirs(WAREHOUSE, exist_ok=True)

# Create a local Catalog
catalog = SqlCatalog(
    "local",
    **{
        "uri": f"sqlite:///{WAREHOUSE}/catalog.db",
        "warehouse": f"file://{WAREHOUSE}",
    },
)

# Create a namespace (a bit like a database schema)
NAMESPACE = "sales_db"
if NAMESPACE not in [ns[0] for ns in catalog.list_namespaces()]:
    catalog.create_namespace(NAMESPACE)

That’s the initial setup complete.

Create an Iceberg Table from a Pandas dataframe

We can now start creating tables in Iceberg. To do this, the following code examples will initially create a Pandas dataframe, convert it to columnar format (as the data will be stored in Parquet format in the Iceberg table), and then create and populate the Iceberg table.

# Create a sample sales DataFrame with some basic data
df = pd.DataFrame({
    "order_id": [1001, 1002, 1003, 1004, 1005],
    "customer": ["Alice", "Bob", "Carol", "Dave", "Eve"],
    "product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"],
    "quantity": [1, 2, 1, 3, 5],
    "unit_price": [1299.99, 899.00, 549.50, 399.00, 79.99],
    "order_date": [
        date(2024, 1, 15), date(2024, 1, 16), date(2024, 2, 3),
        date(2024, 2, 20), date(2024, 3, 5),
    ],
    "region": ["EU", "US", "EU", "APAC", "US"],
})

# Compute total revenue per order
df["revenue"] = df["quantity"] * df["unit_price"]

print(df)
print(df.dtypes)
   order_id customer   product  quantity  unit_price  order_date region  \
0      1001    Alice    Laptop         1     1299.99  2024-01-15     EU
1      1002      Bob     Phone         2      899.00  2024-01-16     US
2      1003    Carol    Tablet         1      549.50  2024-02-03     EU
3      1004     Dave   Monitor         3      399.00  2024-02-20   APAC
4      1005      Eve  Keyboard         5       79.99  2024-03-05     US

   revenue
0  1299.99
1  1798.00
2   549.50
3  1197.00
4   399.95
order_id        int64
customer       object
product        object
quantity        int64
unit_price    float64
order_date     object
region         object
revenue       float64
dtype: object

That’s the Pandas dataframe created. Now we can convert it to columnar format using PyArrow.

#Convert pandas DataFrame → PyArrow Table
# PyIceberg writes via Arrow (columnar format), so this step is required
arrow_table = pa.Table.from_pandas(df)
print("Arrow schema:")
print(arrow_table.schema)
Arrow schema:
order_id: int64
customer: string
product: string
quantity: int64
unit_price: double
order_date: date32[day]
region: string
revenue: double
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 1180

Now we can define the Iceberg table along with the namespace for it.

# Create an Iceberg table from the Arrow schema
TABLE_NAME = (NAMESPACE, "orders")

table = catalog.create_table(
    TABLE_NAME,
    schema=arrow_table.schema,
)
table
orders(
1: order_id: optional long,
2: customer: optional string,
3: product: optional string,
4: quantity: optional long,
5: unit_price: optional double,
6: order_date: optional date,
7: region: optional string,
8: revenue: optional double
),
partition by: [],
sort order: [],
snapshot: null

The table has been defined in Iceberg and we can see there are no partitions, snapshots, etc. The Iceberg table doesn’t contain any data yet. We can append the Arrow table data to the Iceberg table.

#add the data to the table
table.append(arrow_table)
#table.append() adds new data files without overwriting existing ones.
#Use table.overwrite() to replace all data in a single atomic operation.

We can look at the file system to see what has been written.

print(f"Table written to: {WAREHOUSE}/sales_db/orders/")
print(f"Snapshot ID: {table.current_snapshot().snapshot_id}")
Table written to: /Users/brendan.tierney/Dropbox/Iceberg-Demo/sales_db/orders/
Snapshot ID: 3939796261890602539

Explore the Iceberg Table and File System

An Iceberg table is just a collection of files on the file system, organised into a set of folders. You can look at these using your file system app or a terminal window; the examples below explore those directories and files from a Jupyter Notebook.

Let’s start at the Warehouse level. This is the top level that was declared back when the environment was being set up. Check that out above in the first section.

!ls -l {WAREHOUSE}
-rw-r--r--@ 1 brendan.tierney staff 20480 28 Feb 15:26 catalog.db
drwxr-xr-x@ 3 brendan.tierney staff 96 28 Feb 12:59 sales_db

We can see the catalog file and a directory for our ‘sales_db’ namespace. When you explore the contents of this directory you will find two sub-directories, containing the ‘metadata’ and the ‘data’ files. The following lists the files found in the ‘data’ directory; these contain the table data and are stored in Parquet format.

!ls -l {WAREHOUSE}/sales_db/orders/data
-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:22 00000-0-357c029e-b420-459b-8248-b1caf3c030ce.parquet
-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:23 00000-0-3fae55d7-1229-448c-9ffb-ae33c77003a3.parquet
-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:03 00000-0-4ec1ef74-cd24-412e-a35f-bcf3d745bf42.parquet
-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:23 00000-0-984ed6d2-4067-43c4-8c11-5f5a96febd24.parquet
-rw-r--r-- 1 brendan.tierney staff 3307 28 Feb 15:26 00000-0-a61264e8-b361-490e-90f7-105a33f20dec.parquet
-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:22 00000-0-ac913dfe-548c-4cb3-99aa-4f1332e02248.parquet
-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 13:00 00000-0-b3fa23ec-79c6-48da-ba81-ba35f25aa7ad.parquet
-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:25 00000-0-d534a298-adab-4744-baa1-198395cc93bd.parquet
-rw-r--r--@ 1 brendan.tierney staff 3307 28 Feb 15:21 00000-0-ef5dd6d8-84c0-4860-828e-86e4e175a9eb.parquet
-rw-r--r--@ 1 brendan.tierney staff 3179 28 Feb 15:21 00000-0-f108db1c-39f9-4e2b-b825-3e580cccc808.parquet

I’ll leave you to explore the ‘metadata’ directory.

Read the Iceberg Table back into our Environment

To load an Iceberg table into your environment, you’ll need to load the Catalog and then load the table. We already have the Catalog set up from a previous step, but that might not be the case in your typical scenario. The following sets up the Catalog and loads the Iceberg table.

# Re-load the catalog and table (as you would in a new session)
catalog = SqlCatalog(
    "local",
    **{
        "uri": f"sqlite:///{WAREHOUSE}/catalog.db",
        "warehouse": f"file://{WAREHOUSE}",
    },
)
table2 = catalog.load_table(("sales_db", "orders"))

When we inspect the structure of the Iceberg table we get the names of the columns and the datatypes.

print("--- Iceberg Schema ---")
print(table2.schema())
--- Iceberg Schema ---
table {
1: order_id: optional long
2: customer: optional string
3: product: optional string
4: quantity: optional long
5: unit_price: optional double
6: order_date: optional date
7: region: optional string
8: revenue: optional double
}

An Iceberg table can have many snapshots for version control. As we have only added data to the Iceberg table, we should only have one snapshot.

# Snapshot history
print("--- Snapshot History ---")
for snap in table2.history():
    print(snap)
--- Snapshot History ---
snapshot_id=3939796261890602539 timestamp_ms=1772292384231

We can also inspect the details of the snapshot.

#Current snapshot metadata
snap = table2.current_snapshot()
print("--- Current Snapshot ---")
print(f" ID: {snap.snapshot_id}")
print(f" Operation: {snap.summary.operation}")
print(f" Records: {snap.summary.get('total-records')}")
print(f" Data files: {snap.summary.get('total-data-files')}")
print(f" Size bytes: {snap.summary.get('total-files-size')}")
--- Current Snapshot ---
ID: 3939796261890602539
Operation: Operation.APPEND
Records: 5
Data files: 1
Size bytes: 3307

The above shows us that 5 records were added using an Append operation.

An Iceberg table can be partitioned. When we created this table we didn’t specify a partition key, but in another post I’ll give an example of partitioning this table.

#Partition spec & sort order
print("--- Partition Spec ---")
print(table.spec()) # unpartitioned by default
--- Partition Spec ---
[]

We can also list the files that contain the data for our Iceberg table.

# List physical data files via scan
print("--- Data Files ---")
for task in table.scan().plan_files():
    print(f"  {task.file.file_path}")
    print(f"  record_count={task.file.record_count}, "
          f"file_size={task.file.file_size_in_bytes} bytes")
--- Data Files ---
file:///Users/brendan.tierney/Dropbox/Iceberg-Demo/sales_db/orders/data/00000-0-a61264e8-b361-490e-90f7-105a33f20dec.parquet
record_count=5, file_size=3307 bytes

Appending Data and Time Travel

Iceberg tables facilitate changes to the schema and data, and allow you to view the data at different points in time. This is referred to as Time Travel. Let’s have a look at an example of this by adding some additional data to the Iceberg table.

# Get and save the first snapshot id before writing more data
snap_v1 = table.current_snapshot().snapshot_id

# New batch of orders - 2 new orders
df2 = pd.DataFrame({
    "order_id": [1006, 1007],
    "customer": ["Frank", "Grace"],
    "product": ["Headphones", "Webcam"],
    "quantity": [2, 1],
    "unit_price": [249.00, 129.00],
    "order_date": [date(2024, 4, 1), date(2024, 4, 5)],
    "region": ["EU", "APAC"],
    "revenue": [498.00, 129.00],
})

# Add the data
table.append(pa.Table.from_pandas(df2))

We can list the snapshots.

#Get the new snapshot id and check if different to previous
snap_v2 = table.current_snapshot().snapshot_id
print(f"v1 snapshot: {snap_v1}")
print(f"v2 snapshot: {snap_v2}")
v1 snapshot: 3939796261890602539
v2 snapshot: 8666063993760292894

and we can see how many records are in each Snapshot, using Time Travel.

#Time travel: read the ORIGINAL 5-row table
df_v1 = table.scan(snapshot_id=snap_v1).to_pandas()
print(f"Snapshot v1 — {len(df_v1)} rows")
#Current snapshot has all 7 rows
df_v2 = table.scan().to_pandas()
print(f"Snapshot v2 — {len(df_v2)} rows")
Snapshot v1 — 5 rows
Snapshot v2 — 7 rows

If you inspect the file system, in the data and metadata directories, you will notice some additional files.

!ls -l {WAREHOUSE}/sales_db/orders/data
!ls -l {WAREHOUSE}/sales_db/orders/metadata

Read an Iceberg Table into a Pandas dataframe

To load the Iceberg table into a Pandas dataframe we can scan the table and convert the result

pd_df = table2.scan().to_pandas()

or we can use the Pandas function read_iceberg (available from pandas 3.0 onwards), passing the table identifier and the catalog name

df = pd.read_iceberg("sales_db.orders", catalog_name="local")

Filtered scans with push-down predicates

PyIceberg provides a fluent scan API. You can read the full table or push down filters, column projections, and row limits — all evaluated at the file level.

Filtered Scan with Push-Down Predicates

from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

# Only EU orders with revenue above €1000
df_filtered = table2.scan(
    row_filter=And(
        EqualTo("region", "EU"),
        GreaterThanOrEqual("revenue", 1000.0),
    )
).to_pandas()

print(df_filtered)
print(df_filtered)
   order_id customer product  quantity  unit_price  order_date region  revenue
0      1001    Alice  Laptop         1     1299.99  2024-01-15     EU  1299.99

Column Projection – select specific columns

# Only fetch the columns you need - saves I/O
df_slim = table2.scan(
    selected_fields=("order_id", "customer", "revenue")
).to_pandas()

print(df_slim)
   order_id customer  revenue
0      1001    Alice  1299.99
1      1002      Bob  1798.00
2      1003    Carol   549.50
3      1004     Dave  1197.00
4      1005      Eve   399.95

We can also use Arrow for more control.

arrow_result = table2.scan().to_arrow()
print(arrow_result.schema)
df_from_arrow = arrow_result.to_pandas(timestamp_as_object=True)
print(df_from_arrow.head())
order_id: int64
customer: string
product: string
quantity: int64
unit_price: double
order_date: date32[day]
region: string
revenue: double
   order_id customer   product  quantity  unit_price  order_date region  \
0      1001    Alice    Laptop         1     1299.99  2024-01-15     EU   
1      1002      Bob     Phone         2      899.00  2024-01-16     US   
2      1003    Carol    Tablet         1      549.50  2024-02-03     EU   
3      1004     Dave   Monitor         3      399.00  2024-02-20   APAC   
4      1005      Eve  Keyboard         5       79.99  2024-03-05     US   

   revenue  
0  1299.99  
1  1798.00  
2   549.50  
3  1197.00  
4   399.95 

I’ve put all of the above into a Jupyter Notebook. You can download this from here, and you can use it for your explorations of Apache Iceberg.

Check out my next post on Apache Iceberg, where I’ll use Python to explore some additional, more advanced features of Apache Iceberg.