Skip to content

Commit e4f0ff6

Browse files
Merge pull request #280 from microsoft/notebook_workshop
chore: Added Notebook to load tables instead of API
2 parents 4b5e170 + dce305c commit e4f0ff6

File tree

1 file changed

+170
-31
lines changed

1 file changed

+170
-31
lines changed

scripts/03_load_fabric_data.py

Lines changed: 170 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import sys
2222
import json
2323
import time
24+
import base64
2425

2526
# Load environment from azd + project .env
2627
from load_env import load_all_env, get_data_folder
@@ -146,7 +147,7 @@ def wait_for_lro(operation_url, operation_name="Operation", timeout=300):
146147
if resp.status_code == 200:
147148
result = resp.json()
148149
status = result.get("status", "Unknown")
149-
if status in ["Succeeded", "succeeded"]:
150+
if status in ["Succeeded", "succeeded", "Completed", "completed"]:
150151
print(f" [OK] {operation_name} completed")
151152
return result
152153
elif status in ["Failed", "failed"]:
@@ -156,6 +157,14 @@ def wait_for_lro(operation_url, operation_name="Operation", timeout=300):
156157
print(f" [FAIL] {operation_name} timed out")
157158
return None
158159

160+
def b64encode(content):
    """Encode *content* to a base64 string for a Fabric InlineBase64 payload.

    Non-string values (dicts, lists, numbers, ...) are first serialized to
    JSON; ``str`` is encoded as UTF-8; ``bytes``/``bytearray`` are used as-is.

    Args:
        content: Payload to encode — bytes, str, or any JSON-serializable
            object (the notebook definition is a dict).

    Returns:
        str: The base64-encoded representation of ``content``.
    """
    if not isinstance(content, (str, bytes, bytearray)):
        # Fabric item definitions are JSON documents; serialize before encoding.
        content = json.dumps(content)
    if isinstance(content, str):
        content = content.encode("utf-8")
    return base64.b64encode(content).decode("utf-8")
167+
159168
# ============================================================================
160169
# Step 1: Get Workspace Name (needed for OneLake path)
161170
# ============================================================================
@@ -216,41 +225,173 @@ def wait_for_lro(operation_url, operation_name="Operation", timeout=300):
216225
# ============================================================================
# Step 3: Load CSV files as Delta tables.
# Instead of the lakehouse /tables/{name}/load REST API, this builds a small
# PySpark notebook, uploads it as a Fabric Notebook item, and runs it as an
# on-demand job. (NOTE(review): the old API-based implementation that used to
# be kept here as a commented-out copy has been removed as dead code.)
# ============================================================================
if args.skip_tables:
    print(f"\n[3/3] Skipping table load (--skip-tables flag)")
else:
    print(f"\n[3/3] Loading CSV files as Delta tables via Fabric Notebook...")

    # --- Build PySpark notebook code to load all CSVs as Delta tables ---
    table_names = list(ontology_config["tables"].keys())
    spark_code_lines = [
        "import os",
        "",
        f"lakehouse_name = '{LAKEHOUSE_NAME}'",
        f"table_names = {table_names}",
        "",
        "for table_name in table_names:",
        "    csv_path = f'Files/{table_name}.csv'",
        "    print(f'Loading {table_name} from {csv_path}...')",
        "    df = spark.read.option('header', 'true').option('inferSchema', 'true').csv(csv_path)",
        "    df.write.mode('overwrite').format('delta').saveAsTable(table_name)",
        "    print(f'  [OK] {table_name}: {df.count()} rows')",
        "",
        "print('All tables loaded successfully.')",
    ]
    spark_code = "\n".join(spark_code_lines)

    notebook_name = f"load_tables_{LAKEHOUSE_NAME}"

    # Build notebook payload with the Fabric notebook definition format.
    # The "trident" metadata attaches the default lakehouse so that relative
    # Files/ paths resolve inside the notebook's Spark session.
    notebook_metadata = {
        "language_info": {"name": "python"},
        "trident": {
            "lakehouse": {
                "default_lakehouse": LAKEHOUSE_ID,
                "default_lakehouse_name": LAKEHOUSE_NAME,
                "default_lakehouse_workspace_id": WORKSPACE_ID,
                "known_lakehouses": [
                    {
                        "id": LAKEHOUSE_ID
                    }
                ]
            }
        }
    }

    notebook_payload_content = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": notebook_metadata,
        "cells": [
            {
                "cell_type": "code",
                "source": [spark_code],
                "metadata": {},
                "outputs": []
            }
        ]
    }

    # Check if a notebook with this name already exists; delete it so the
    # freshly generated definition is the one that runs.
    print(f"  Creating notebook '{notebook_name}'...")
    existing_nb_url = f"{FABRIC_API}/workspaces/{WORKSPACE_ID}/items?type=Notebook"
    existing_nb_resp = make_request("GET", existing_nb_url)
    if existing_nb_resp.status_code == 200:
        for item in existing_nb_resp.json().get("value", []):
            if item["displayName"] == notebook_name:
                del_url = f"{FABRIC_API}/workspaces/{WORKSPACE_ID}/items/{item['id']}"
                make_request("DELETE", del_url)
                print(f"  Deleted existing notebook '{notebook_name}'")
                time.sleep(10)  # Wait for deletion to propagate
                break

    # Create the notebook as a workspace item with an inline-base64 ipynb part.
    create_nb_payload = {
        "displayName": notebook_name,
        "type": "Notebook",
        "definition": {
            "format": "ipynb",
            "parts": [
                {
                    "path": "artifact.content.ipynb",
                    "payload": b64encode(notebook_payload_content),
                    "payloadType": "InlineBase64"
                }
            ]
        }
    }

    create_url = f"{FABRIC_API}/workspaces/{WORKSPACE_ID}/items"
    resp = make_request("POST", create_url, json=create_nb_payload)

    if resp.status_code == 201:
        # Synchronous creation: the response body carries the new item id.
        notebook_id = resp.json()["id"]
        print(f"  [OK] Created notebook: {notebook_name} ({notebook_id})")
    elif resp.status_code == 202:
        # Async creation: wait for the LRO, then look the item up by name
        # because the LRO result does not reliably include the id.
        operation_url = resp.headers.get("Location")
        wait_for_lro(operation_url, "Notebook creation")
        nb_resp = make_request("GET", existing_nb_url)
        notebook_id = None
        if nb_resp.status_code == 200:
            for item in nb_resp.json().get("value", []):
                if item["displayName"] == notebook_name:
                    notebook_id = item["id"]
                    break
        if not notebook_id:
            print(f"  [FAIL] Could not find created notebook")
            sys.exit(1)
        print(f"  [OK] Created notebook: {notebook_name} ({notebook_id})")
    else:
        print(f"  [FAIL] Failed to create notebook: {resp.status_code} {resp.text}")
        sys.exit(1)

    # Run the notebook as an on-demand job to load the tables.
    print(f"  Running notebook to load tables...")
    run_url = f"{FABRIC_API}/workspaces/{WORKSPACE_ID}/items/{notebook_id}/jobs/instances?jobType=RunNotebook"
    run_resp = make_request("POST", run_url)

    if run_resp.status_code in [200, 202]:
        operation_url = run_resp.headers.get("Location")
        if operation_url:
            result = wait_for_lro(operation_url, "Notebook execution", timeout=600)
            if result is None:
                print(f"  [FAIL] Notebook execution failed or timed out")
                sys.exit(1)
        else:
            # No Location header to poll; best-effort fixed wait.
            # NOTE(review): this assumes the run finishes within 60s — confirm.
            print("  Waiting for notebook execution...")
            time.sleep(60)
            print(f"  [OK] Notebook execution completed - all tables loaded")
    else:
        print(f"  [FAIL] Failed to run notebook: {run_resp.status_code} {run_resp.text}")
        sys.exit(1)

    # Wait for tables to be indexed
    print("  Waiting for tables to be indexed...")
    time.sleep(30)
254395
# ============================================================================
255396
# Step 4: Trigger Ontology Materialization
256397
# ============================================================================
@@ -309,5 +450,3 @@ def wait_for_lro(operation_url, operation_name="Operation", timeout=300):
309450
Next step - Generate schema prompt:
310451
python scripts/04_generate_agent_prompt.py
311452
""")
312-
313-

0 commit comments

Comments
 (0)