Taking On the Microsoft Fabric Ignite Edition Challenge

Microsoft Learn is running a challenge made up of several modules that are well worth completing to build knowledge of Microsoft Fabric.

Documenting my progress through the Microsoft Learn challenge on Fabric.

Ignite Edition Challenge: Prepare for the next generation of data analytics with Microsoft Fabric

Microsoft Fabric - Unified data analytics platform

Introduction to end-to-end analytics using Microsoft Fabric : Link

Note: while following this challenge, I will also create the resources myself and attach screenshots for better documentation.

Microsoft Fabric workspace: https://app.fabric.microsoft.com/home

(Screenshots: signed up for Fabric, activated the free trial, and created a new workspace.)

(Screenshot: Introduction to Microsoft Fabric.)

Get started with lakehouses in Microsoft Fabric : Link

(Screenshots: lakehouse creation and exercise steps; module completed.)

Use Apache Spark in Microsoft Fabric : Link

Session-level Spark settings can be applied with the %%configure magic; this snippet enables the native execution engine and its columnar shuffle manager:

%%configure
{
    "conf": {
        "spark.native.enabled": "true",
        "spark.shuffle.manager": "org.apache.spark.shuffle.sort.ColumnarShuffleManager"
    }
}

(Screenshot: Spark dataframe output.)

The code blocks below all work with the sales dataset: sales

Running the read with PySpark (the %%pyspark magic selects the cell language, so it must be the first line of the cell):

%%pyspark
df = spark.read.load('Files/data/sales.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Running the same read in Scala:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/sales.csv")
display(df.limit(10))
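If declaring every column is not needed, Spark can also infer the types by sampling the data; a convenience sketch (inference costs an extra pass over the file, so an explicit schema is preferable for large data):

# Let Spark infer column types from the data (extra read pass)
df = spark.read.load('Files/data/sales.csv',
    format='csv',
    header=True,
    inferSchema=True
)
df.printSchema()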

# Defining a schema for loading. Note that CustomerName must be a string, and the
# numeric columns (Quantity, UnitPrice, TaxAmount) should be typed as numbers so
# that later aggregations work without casting.

from pyspark.sql.types import *
from pyspark.sql.functions import *

salesSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("EmailAddress", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("TaxAmount", FloatType())
    ])

# With an explicit schema, the column names come from the schema; header=True
# still tells Spark to skip the header row in the file
df = spark.read.load('Files/data/sales.csv',
    format='csv',
    schema=salesSchema,
    header=True)
display(df.limit(10))
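Because the numeric columns now have numeric types, arithmetic works directly on them; a small illustrative sketch (the GrossTotal column name is mine, not from the module):

# Arithmetic on properly typed columns; GrossTotal is an illustrative name
totals_df = df.withColumn("GrossTotal", col("Quantity") * col("UnitPrice") + col("TaxAmount"))
display(totals_df.select("SalesOrderNumber", "GrossTotal").limit(10))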

# Selecting required columns
pricelist_df = df.select("OrderDate", "EmailAddress")

# Filtering with some conditions
bikes_df = df.select("OrderDate", "EmailAddress", "UnitPrice").where((df["UnitPrice"] >= 3200) & (df["OrderDate"] >= "2019-07-05"))

# Group by with aggregation sum
counts_df = df.select("OrderDate", "EmailAddress", "UnitPrice").groupBy("OrderDate").agg(sum("UnitPrice").alias("Total_Price"))
display(counts_df)

# Just getting group by with count
counts_df = df.select("OrderDate", "EmailAddress", "UnitPrice").groupBy("OrderDate").count()
display(counts_df)
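Several aggregations can also be combined in a single agg call; a sketch over the same columns:

# Combining multiple aggregations in one pass over the data
summary_df = df.groupBy("OrderDate").agg(
    sum("UnitPrice").alias("Total_Price"),
    avg("UnitPrice").alias("Avg_Price"),
    count("SalesOrderNumber").alias("Order_Count")
)
display(summary_df)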

# Writing to parquet
bikes_df.write.mode("overwrite").parquet('Files/data/bikes.parquet')

# Writing to a partition
bikes_df.write.partitionBy("OrderDate").mode("overwrite").parquet("Files/bike_data")

# Reading from a partition 
road_bikes_df = spark.read.parquet('Files/bike_data/OrderDate=2019-07-05')
display(road_bikes_df.limit(5))

# Observation: when a single partition folder is read, Spark omits the partition column (OrderDate) from the results
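Reading from the root of the partitioned folder instead restores OrderDate as a regular column, derived from the directory names:

# Reading the partition root; Spark re-derives OrderDate from the folder names
all_bikes_df = spark.read.parquet('Files/bike_data')
display(all_bikes_df.limit(5))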

(Screenshot: Spark notebook run.)

# Registering the dataframe as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("sales_view")
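The view lives only for the current Spark session, but it can be queried right away; a quick check:

# Querying the temporary view (session-scoped)
display(spark.sql("SELECT OrderDate, COUNT(*) AS Orders FROM sales_view GROUP BY OrderDate"))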

# Overwriting the table (saved as a managed Delta table in the lakehouse, here named "challenge")
df.write.format("delta").mode('overwrite').saveAsTable("products")


df_products = spark.sql("SELECT * FROM challenge.products")
display(df_products.count())


# Partitioning the Delta table for better query performance

df.write.format("delta").mode("overwrite").partitionBy("OrderDate").saveAsTable("sales_partitioned")


sales_df = spark.sql("SELECT * \
                      FROM challenge.sales_partitioned ")
display(sales_df.count())
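To confirm that the partitioning was applied, Delta's DESCRIBE DETAIL command exposes the partition columns and file layout; a quick check:

# Inspecting Delta table metadata; partitionColumns should list OrderDate
detail_df = spark.sql("DESCRIBE DETAIL challenge.sales_partitioned")
display(detail_df.select("partitionColumns", "numFiles"))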


%%sql

SELECT OrderDate, COUNT(SalesOrderNumber) AS ProductCount
FROM sales_partitioned
GROUP BY OrderDate
ORDER BY OrderDate

(Screenshots: temporary view and table query results.)

from matplotlib import pyplot as plt

# Get the data as a Pandas dataframe
data = spark.sql("SELECT OrderDate, COUNT(SalesOrderNumber) AS ProductCount \
FROM sales_partitioned \
GROUP BY OrderDate \
ORDER BY OrderDate").toPandas()

# Clear the plot area
plt.clf()

# Create a Figure
fig = plt.figure(figsize=(12,8))

# Create a bar plot of product counts by category
plt.bar(x=data['OrderDate'], height=data['ProductCount'], color='orange')

# Customize the chart
plt.title('Product Counts by OrderDate')
plt.xlabel('OrderDate')
plt.ylabel('Products')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=70)

# Show the plot area
plt.show()
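The same aggregate can be rendered with less manual styling using seaborn, which is typically preinstalled in the Fabric Spark runtime (a sketch, assuming that availability):

import seaborn as sns
import matplotlib.pyplot as plt

# Bar plot of the same Pandas dataframe with seaborn's default styling
plt.clf()
plt.figure(figsize=(12, 8))
ax = sns.barplot(x='OrderDate', y='ProductCount', data=data, color='orange')
ax.set_title('Product Counts by OrderDate')
plt.xticks(rotation=70)
plt.show()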

(Screenshots: plot output; module 3 completed.)

Orchestrate processes and data movement with Microsoft Fabric : link

(Screenshots: Copy Data activity; pipeline execution and monitoring; pipeline copy results.)

table_name = "sales_copy"

from pyspark.sql.functions import *

# Read the new sales data
df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv")

# Add Year and Month columns
df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Derive FirstName and LastName columns
df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Filter and reorder columns
df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]

# Load the data into a table
df.write.format("delta").mode("append").saveAsTable(table_name)
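A quick sanity check that the append landed, using the table_name variable set above:

# Verify the load: row counts per Year in the appended table
display(spark.sql(f"SELECT Year, COUNT(*) AS RowCount FROM {table_name} GROUP BY Year ORDER BY Year"))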

# Checking the date-parsing syntax: extract the year from OrderDate
display(df.select(df['OrderDate'], year(to_date(df['OrderDate'], 'yyyy-MM-dd'))))

(Screenshots: delete activity in the copy pipeline; module 4 completed.)

Ingest Data with Dataflows Gen2 in Microsoft Fabric : link

(Screenshots: Dataflow Gen2 steps; module 5 completed.)

Get started with data warehouses in Microsoft Fabric : link

(Screenshots: data warehouse creation; module completed.)

Load data into a Microsoft Fabric data warehouse : link

(Screenshots: Fabric workspace view; ETL into the data warehouse.)

IF EXISTS (SELECT 1 FROM Dim_Products WHERE SourceKey = @ProductID AND IsActive = 'True')
BEGIN
    -- Existing product record: close it out by stamping ValidTo and flagging it inactive
    UPDATE Dim_Products
    SET ValidTo = GETDATE(), IsActive = 'False'
    WHERE SourceKey = @ProductID
        AND IsActive = 'True';
END
ELSE
BEGIN
    -- New product record: insert an open-ended active row
    -- (column names ValidFrom/ValidTo kept consistent with the UPDATE above)
    INSERT INTO Dim_Products (SourceKey, ProductName, ValidFrom, ValidTo, IsActive)
    VALUES (@ProductID, @ProductName, GETDATE(), '9999-12-31', 'True');
END

(Screenshot: slowly changing dimension handling in the warehouse.)

While loading into the warehouse, the copy failed because TaxAmount arrives as a string in the staged Parquet file while the target column is typed REAL:

ErrorCode=DWCopyCommandOperationFailed,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message='DataWarehouse' Copy Command operation failed with error ''Column 'TaxAmount' of type 'REAL' is not compatible with external data type 'Parquet physical type: BYTE_ARRAY, logical type: UTF8', please try with 'VARCHAR(8000)'. Underlying data description: file 'https://olspn5q0h1kvg2htgcb6p.dfs.core.windows.net/93ba4f12-bd72-422f-b6f8-869e5ee2c177/_system/services/DI/pipelines/6906c723-8f93-4afa-a121-4ab78f070661/MSSQLImportCommand/6f2ec288-dfaa-450b-ad61-5fb9b4f90c79.parquet'.
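One way around the mismatch is to cast the column to a numeric type before the data is written to Parquet; a hedged PySpark sketch (the alternative, as the error suggests, is making the warehouse column VARCHAR):

from pyspark.sql.functions import col

# Cast TaxAmount from string to float so the Parquet type matches the REAL column
df = df.withColumn("TaxAmount", col("TaxAmount").cast("float"))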

(Screenshots: querying from the workspace; data pipeline ingest with staging.)

Load data using Dataflow

(Screenshot: data warehouse load module completed.)

Secure a Microsoft Fabric data warehouse : link

continued...