{"id":8968,"date":"2024-02-19T14:33:06","date_gmt":"2024-02-19T14:33:06","guid":{"rendered":"https:\/\/craftydba.com\/?p=8968"},"modified":"2024-12-26T21:03:29","modified_gmt":"2024-12-26T21:03:29","slug":"thread-02-data-engineering-with-fabric","status":"publish","type":"post","link":"https:\/\/craftydba.com\/?p=8968","title":{"rendered":"Thread 02 &#8211; Data Engineering with Fabric"},"content":{"rendered":"<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/microsoft-fabric-logo.png\" alt=\"Microsoft Fabric Logo\" width=\"100\" height=\"100\" align=\"left\" hspace=\"50\" vspace=\"50\" \/><\/p>\n<h2 style=\"color: green;\">Managing Files and Folders<\/h2>\n<p>What is a data lake?  It is just a bunch of files organized into folders.  Keeping these files organized prevents your <font style=\"color: green;\">data lake<\/font> from becoming a <font style=\"color: brown;\">data swamp<\/font>.  Today, we are going to learn about a Python library that can help you.<\/p>\n<h4 style=\"color: brown;\">Business Problem<\/h4>\n<p>Our manager has given us weather data to load into Microsoft Fabric. We need to create folders in the landing zone to organize these files by both full and incremental loads.  How can we accomplish this task?<\/p>\n<h4 style=\"color: brown;\">Technical Solution<\/h4>\n<p>This use case allows data engineers to learn how to programmatically manage files and folders with the Microsoft Spark Utilities <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/synapse-analytics\/spark\/microsoft-spark-utilities?pivots=programming-language-python\">library<\/a>. 
The following topics will be explored in this article (thread).<\/p>\n<ol>\n<li>drop hive tables<\/li>\n<li>remove folders<\/li>\n<li>create folders<\/li>\n<li>move files<\/li>\n<li>copy files<\/li>\n<li>write files<\/li>\n<li>full load strategy<\/li>\n<li>incremental load strategy<\/li>\n<\/ol>\n<h4 style=\"color: brown;\">Architectural Overview<\/h4>\n<p>The architectural diagram shows how folders are used to organize the data into quality zones.  This is sometimes referred to as a <a href=\"https:\/\/learn.microsoft.com\/en-us\/fabric\/onelake\/onelake-medallion-lakehouse-architecture\">medallion<\/a> lakehouse architecture.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/onelake-medallion-lakehouse-architecture-example.png\" alt=\"Fabric Data Engineering\" width=\"600\" height=\"600\" ><\/p>\n<p>In practice, I have seen an additional quality zone called <b>raw<\/b> being used to stage files in their native format before converting them to the delta file format.  Please note that the lakehouse uses either shortcuts or pipelines to get files into the lake.  We will talk more about the bronze, silver and gold zones when I cover full and incremental loading later in this article.<\/p>\n<h4 style=\"color: brown;\">Data Lake Cleanup<\/h4>\n<p>One great property of a data lake is the fact that it can always be <b>torn down<\/b> from a current state and <b>rebuilt<\/b> from the source files.  We are going to remove the tables and folders from the prior lesson at this time.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-01.png\" width=\"600\" height=\"600\" ><\/p>\n<p>The above image shows the PySpark notebook for lesson 2 loaded into the lakehouse explorer.  
The first two tasks involve the execution of Spark SQL to drop tables from the hive catalog.<\/p>\n<pre class=\"lang:TSQL theme:familiar mark:1,2-4\" title=\"drop unmanaged table\">\n%%sql\n--\n--  drop unmanaged (external) table\n--\n\ndrop table if exists unmanaged_weather;\n\n<\/pre>\n<p>The above snippet drops the unmanaged table while the below snippet drops the managed table.<\/p>\n<pre class=\"lang:TSQL theme:familiar mark:1,2-4\" title=\"drop managed table\">\n%%sql\n--\n--  drop managed (internal) table\n--\n\ndrop table if exists managed_weather;\n\n<\/pre>\n<p>The last task is to remove the folder named bronze.  We can use the file system remove command with the recursive flag set to true.  I will talk about the Microsoft Spark Utilities library in detail shortly.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"delete bronze folder\">\n#\n#  Remove bronze directory\n# \n\n# location\npath = \"Files\/bronze\"\n\n# remove folder\nmssparkutils.fs.rm(path, True)\n<\/pre>\n<p>The image below shows that both tables and one folder have been deleted.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-05.png\" width=\"600\" height=\"600\" ><\/p>\n<p>We are now ready to re-organize the raw (staging) folder.<\/p>\n<h4 style=\"color: brown;\">Managing Folders<\/h4>\n<p>Microsoft has supplied the data lake engineer with the Spark Utilities library.  One can use the help method to list all the functions that can be called.  Please see the image below for details on the file system methods.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-40.png\" width=\"600\" height=\"600\" ><\/p>\n<p>In this section, we are going to focus on the remove and create directory functions.  
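<\/p>
<p>For readers following along without a Fabric workspace, the create and remove semantics can be sketched locally with the Python standard library.  This is just a local analogue for experimentation; it is not the Spark Utilities API itself.<\/p>

```python
# Local analogue of the mkdirs and rm helpers using only the Python
# standard library (illustration only -- not the mssparkutils API).
import shutil
from pathlib import Path

base = Path("Files")

# like mssparkutils.fs.mkdirs(path): create nested folders in one call
(base / "raw" / "weather" / "baseline").mkdir(parents=True, exist_ok=True)
created = (base / "raw" / "weather" / "baseline").is_dir()

# like mssparkutils.fs.rm(path, True): recursive delete
shutil.rmtree(base / "raw", ignore_errors=True)
removed = not (base / "raw").exists()

print(created, removed)   # True True
```

<p>Inside a Fabric notebook, the <b>mssparkutils<\/b> calls shown in this article work against OneLake paths instead of the local file system.<\/p>
<p>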
We used the <b>rm<\/b> function to delete the <b>bronze<\/b> directory in the prior sections.<\/p>\n<p>We want to organize and\/or create data in the following folders.<\/p>\n<ul>\n<li>raw\/baseline &#8211; the original weather files<\/li>\n<li>raw\/full &#8211; one week of full weather files<\/li>\n<li>raw\/incremental &#8211; high\/low temperature files by date folder<\/li>\n<\/ul>\n<p>The image below shows the final folder structure.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-10.png\" width=\"600\" height=\"600\" ><\/p>\n<p>Snippet for the baseline folder.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"create baseline folder\">\n#\n#  create baseline directory\n# \n\n# location\npath = \"Files\/raw\/weather\/baseline\"\n\n# create folder\nmssparkutils.fs.mkdirs(path)\n<\/pre>\n<p>Snippet for the full folder.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"create full folder\">\n#\n#  create full directory\n# \n\n# location\npath = \"Files\/raw\/weather\/full\"\n\n# create folder\nmssparkutils.fs.mkdirs(path)\n<\/pre>\n<p>Snippet for the incremental folder.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"create incremental folder\">\n#\n#  create incremental directory\n# \n\n# location\npath = \"Files\/raw\/weather\/incremental\"\n\n# create folder\nmssparkutils.fs.mkdirs(path)\n<\/pre>\n<p>Creating, renaming and deleting folders can be easily accomplished with the Microsoft Spark Utilities library.<\/p>\n<h4 style=\"color: brown;\">Moving Files<\/h4>\n<p>In lesson one, there were three files in the weather directory:  high temperature, low temperature and read me.  We want to move these files to a sub-folder named <b>baseline<\/b>.<\/p>\n<p>The code below gets a file listing of the <b>weather<\/b> directory.  Then, it iterates through all files and folders using a for loop.  For just the files, it moves them to the <b>baseline<\/b> folder.  
Please see the <b>ls<\/b> and <b>mv<\/b> functions for details.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"move files\">\n#\n#  move files\n# \n\n# locations\nsrc_path = \"Files\/raw\/weather\"\ndst_path = \"Files\/raw\/weather\/baseline\"\n\n# get dir listing\nfiles = mssparkutils.fs.ls(src_path)\n\n# for each file\nfor file in files:\n\n    # not dirs\n    if not file.isDir:\n        \n        # src\/dst file\n        src_file = src_path + '\/' + file.name\n        dst_file = dst_path + '\/' + file.name\n\n        # debug\n        print(src_file)\n        print(dst_file)\n        print(\"\")\n\n        # move files\n        mssparkutils.fs.mv(src_file, dst_file)\n<\/pre>\n<p>This is a lot of code to move just three files.  The power of programmatic file actions shows when you have to deal with hundreds, if not thousands, of files.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-15.png\" width=\"600\" height=\"600\" ><\/p>\n<p>The above image shows the originally uploaded data files moved to the baseline directory.<\/p>\n<h4 style=\"color: brown;\">Full Load<\/h4>\n<p>In lesson 3, I am going to talk about how to load the bronze and silver tables using a full load pattern.  For instance, the product list for a company is usually finite and static.  How often does a drug company come up with a new drug?<\/p>\n<p>To simulate this pattern with the weather data set, we want to create one week of folders.  I am using the last week in the data set, which represents September 24th to the 30th.  
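<\/p>
<p>As a side note, the seven sortable folder names can also be derived with the <b>datetime<\/b> module instead of manual string padding.  The snippet below is a pure Python sketch for illustration only.<\/p>

```python
# Build the sortable folder names (YYYYMMDD) for 2018-09-24 through
# 2018-09-30 with datetime arithmetic (illustration only).
from datetime import date, timedelta

start = date(2018, 9, 24)
folders = [(start + timedelta(days=i)).strftime("%Y%m%d") for i in range(7)]

print(folders[0], folders[-1])   # 20180924 20180930
```

<p>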
Each folder will have the complete data file for both high and low temperatures.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"copy files\">\n#\n#  create full copies\n# \n\n# source path\nsrc_path = \"Files\/raw\/weather\/baseline\"\n\n# destination path\ndst_path = \"Files\/raw\/weather\/full\"\n\n# one week\ni = 1\nwhile i < 8:\n\n    # get day as int\n    d = i + 23\n\n    # get date as sortable string\n    s = '201809' + str(d).rjust(2, \"0\")\n\n    # debug\n    # print(dst_path + '\/' + s)\n\n    # make daily sub dir\n    mssparkutils.fs.mkdirs(dst_path + '\/' + s)\n \n    # for each file\n    files = ['high_temps.csv', 'low_temps.csv']\n    for file in files:\n\n        # src\/dst files\n        dst_file = dst_path + '\/' + s + '\/' + file\n        src_file = src_path + '\/' + file\n\n        # copy file\n        mssparkutils.fs.cp(src_file, dst_file)\n\n        # debug\n        print(src_file)\n        print(dst_file)\n        print('')\n\n    # increment day\n    i += 1\n<\/pre>\n<p>The above code uses both the <b>mkdirs<\/b> method to create a sub-folder for each day and the <b>cp<\/b> method to copy the data files from the baseline directory to the daily folder.  Additionally, I chose to use two different forms of iteration:  a <b>while loop<\/b> and a <b>for loop<\/b>.<\/p>\n<p>The image below shows the completed task.  Seven new folders have been created and fourteen files have been copied.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-20.png\" width=\"600\" height=\"600\" ><\/p>\n<h4 style=\"color: brown;\">Incremental Load<\/h4>\n<p>In lesson 3, I am going to talk about how to load the bronze and silver tables using an incremental load pattern.  This is a very common pattern for large datasets.  Let's make believe we are getting the orders from the Amazon website on a given day.  That is a very large number of records.  
If we had a business requirement to have two years of data in the lake at a given point in time, a full load pattern would require moving stale data over and over again.<\/p>\n<p>Just remember, with an incremental load pattern, there is also a historical load component.  For instance, we might create 104 data files and each file would contain a week of orders.  This would seed our data lake with historical data.  Going forward, we would just upload a single day's worth of orders.<\/p>\n<p>Now that we have the concept, let's work on breaking down the high temperature and low temperature data files into a single file per day.  Each file will have a single row.  We would never do this in real life, but it is a good example of working with a large set of folders and files.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"dataframe - high temps\">\n#\n#  load high temp data\n#\n\n# location\npath = \"Files\/raw\/weather\/baseline\/high_temps.csv\"\n\n# read file\ndf_high = spark.read.format(\"csv\").option(\"header\",\"true\").load(path)\n<\/pre>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"dataframe - low temps\">\n#\n#  load low temp data\n#\n\n# location\npath = \"Files\/raw\/weather\/baseline\/low_temps.csv\"\n\n# read file\ndf_low = spark.read.format(\"csv\").option(\"header\",\"true\").load(path)\n<\/pre>\n<p>The above code creates two DataFrames, one for high temps and one for low temps.  The below code creates our incremental files.  It relies on the fact that the two data files have matching rows by position, keyed on the date value.  
Thus, for a given row regardless of file, we have the same date.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"create files\">\n#\n#  create incremental copies\n# \n\n# base of path\nroot = \"Files\/raw\/weather\/incremental\"\n\n# record count\nc = df_low.count()\n\n# low temps as a list\nl = df_low.collect()\n\n# high temps as a list\nh = df_high.collect()\n\n# static\nheader = 'date,temp'\n\n# for each row\nfor r in range(c):\n\n    # low temp - date, temp, folder, contents\n    d = l[r][0]\n    t = l[r][1]\n    f = d.replace(\"-\", \"\")\n    contents = f'{header}\\r\\n{d},{t}\\r\\n'\n\n    # make sub dir\n    path = root + '\/' + f\n    mssparkutils.fs.mkdirs(path)\n\n    # write low temp file\n    print(contents)\n    path = root + '\/' + f + '\/low_temps.csv'\n    mssparkutils.fs.put(path, contents, True)\n\n    # high temp - date, temp, folder, contents\n    d = h[r][0]\n    t = h[r][1]\n    f = d.replace(\"-\", \"\")\n    contents = f'{header}\\r\\n{d},{t}\\r\\n'\n\n    # write high temp file\n    print(contents)\n    path = root + '\/' + f + '\/high_temps.csv'\n    mssparkutils.fs.put(path, contents, True)\n<\/pre>\n<p>The devil is in the details.  The <b>collect()<\/b> method converts the DataFrame to a list of Row objects.  For each row, we extract the date and remove any delimiters.  In the end, we have a sortable folder name that represents year, month and day.  The <b>mkdirs<\/b> method is used to create a directory.  The header line for the files is static and the row of data is retrieved from our lists.  
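<\/p>
<p>The per-row transformation is easier to see in isolation.  Below is a self-contained sketch with a hard-coded sample row; the date and temperature values are made up for illustration.<\/p>

```python
# Derive the daily folder name and the file contents from one
# (date, temp) pair, mirroring the notebook logic (sample values only).
header = 'date,temp'

d, t = '2018-09-30', '62'      # hypothetical row values
f = d.replace("-", "")         # sortable folder name
contents = f'{header}\r\n{d},{t}\r\n'

print(f)   # 20180930
```

<p>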
We write out the daily file using the <b>put<\/b> method from the file system class.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-25.png\" width=\"600\" height=\"600\" ><\/p>\n<p>The above image shows the start of the daily folders for incremental data.<\/p>\n<h4 style=\"color: brown;\">Testing<\/h4>\n<p>The most important part of any coding assignment is testing.<\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"display - full load\">\n#\n#  show full load data\n#\n\ndf = spark.read.format(\"csv\").option(\"header\",\"true\").load(\"Files\/raw\/weather\/full\/20180930\/high_temps.csv\")\ndisplay(df)\n<\/pre>\n<p>The above code reads in the latest full load file for high temperatures.  The below image shows the output data is scrollable.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-30.png\" width=\"600\" height=\"600\" ><\/p>\n<pre class=\"lang:Python theme:familiar mark:1,2-4\" title=\"display - incremental load\">\n#\n#  show incremental data, load two files at once\n#\n\n# library\nfrom pyspark.sql.functions import *\n\n# high temp record\ndf1 = spark.read.format(\"csv\").option(\"header\",\"true\") \\\n  .load(\"Files\/raw\/weather\/incremental\/20180930\/high_temps.csv\") \\\n  .withColumn('desc', lit('high'))\n\n# low temp record\ndf2 = spark.read.format(\"csv\").option(\"header\",\"true\") \\\n  .load(\"Files\/raw\/weather\/incremental\/20180930\/low_temps.csv\") \\\n  .withColumn('desc', lit('low'))\n\n# combine records\ndf3 = df1.unionAll(df2)\ndisplay(df3)\n<\/pre>\n<p>The screen shot shown below captures the output of this code snippet.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/thread-002-35.png\" width=\"600\" height=\"600\" ><\/p>\n<h4 style=\"color: brown;\">Issues<\/h4>\n<p>It is very important to test.  
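<\/p>
<p>One simple sanity check is to verify that every daily folder contains both temperature files.  The sketch below builds a throw-away local copy of the layout with the standard library; it is an illustration, not the notebook code.<\/p>

```python
# Verify a folder-per-day layout: every daily folder should hold
# exactly the two expected files (local sketch, illustration only).
import tempfile
from pathlib import Path

root = Path(tempfile.mkdtemp()) / "incremental"
expected = {"high_temps.csv", "low_temps.csv"}

# build two sample daily folders with both files
for day in ("20180929", "20180930"):
    (root / day).mkdir(parents=True)
    for name in expected:
        (root / day / name).write_text("date,temp\r\n")

# flag any folder missing a file or holding extras
bad = [d.name for d in sorted(root.iterdir())
       if {p.name for p in d.iterdir()} != expected]

print(bad)   # []
```

<p>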
I am now going to go over two issues that I have found with Microsoft Fabric.<\/p>\n<p>First, when creating a large number of folders and files, the lakehouse explorer stops refreshing.  The image below shows that the last incremental folder is dated 20180324.  However, we can list the last directory we created using the <b>ls<\/b> command.  It was dated 20180930.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/timing-issue-01.png\" width=\"600\" height=\"600\" ><\/p>\n<p>Since Fabric has many components that were in Azure Synapse, I tried reproducing this bug in that environment.  Fabric uses a scrollable list and Synapse uses a paging method.  We can see that Synapse does not have this issue.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/timing-issue-02.png\" width=\"600\" height=\"600\" ><\/p>\n<p>In fact, I think it is some type of refresh or timing bug.  Much later, I logged out and logged back in.  Lo and behold, the folders showed up.<\/p>\n<p>If you look hard enough at the <b>Fabric Graphical User Interface<\/b> (GUI), you can see parts of <b>Office 365<\/b> and <b>Power BI<\/b>.  This historical foundation might be the root cause of the second issue.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/timing-issue-04.png\" width=\"600\" height=\"600\" ><\/p>\n<p>The above image shows that I saved the notebook for lesson-02 as lesson-03 and lesson-04.  
The below image shows the deletion of the notebook named lesson-03.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/timing-issue-05.png\" width=\"600\" height=\"600\" ><\/p>\n<p>If I try to rename lesson-04 as lesson-03, I get an error message stating the notebook already exists.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/timing-issue-06.png\" width=\"600\" height=\"600\" ><\/p>\n<p>This is a feature in Power BI in which an object is soft deleted.  Unfortunately, the system has not caught up with the fact that the file is now gone and the name can be re-used.  Again, if you log into the system minutes later, you will be able to rename the file.<\/p>\n<h4 style=\"color: brown;\">Summary<\/h4>\n<p>Fabric supplies the data engineer with the <font style=\"color: green;\">Microsoft Spark Utilities<\/font> library.  Today, we focused on some of the functions that work with both folders and files.  Writing code for large-scale folder or file changes is the best way to go.  Who wants to create and upload a couple thousand files?<\/p>\n<p>Just remember that the Fabric service went to General Availability in November 2023.  Engineers are starting to use the system and suggest changes to make it better.  In general, I love the <font style=\"color: brown;\">OneLake concept<\/font> of Fabric.  Enclosed is the <a href=\"https:\/\/craftydba.com\/wp-content\/uploads\/2024\/02\/bundle-02.zip\">zip file<\/a> with the data files and Spark notebook.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Managing Files and Folders What is a data lake? It is just a bunch of files organized by folders. Keeping these files organized prevents your data lake from becoming a data swamp. Today, we are going to learn about a python library that can help you. Business Problem Our manager has given us weather data to load into Microsoft Fabric. 
We need to create folders in the landing zone to organize these files by both full and incremental loads. How can we accomplish this task? Technical Solution This use case&hellip;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1132],"tags":[1134],"class_list":["post-8968","post","type-post","status-publish","format-standard","hentry","category-microsoft-fabric","tag-john-f-miner-iii-microsoft-fabric-data-engineering-apache-spark-microsoft-spark-utilities-folders-files-full-load-concept"],"_links":{"self":[{"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/posts\/8968","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/craftydba.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=8968"}],"version-history":[{"count":132,"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/posts\/8968\/revisions"}],"predecessor-version":[{"id":9186,"href":"https:\/\/craftydba.com\/index.php?rest_route=\/wp\/v2\/posts\/8968\/revisions\/9186"}],"wp:attachment":[{"href":"https:\/\/craftydba.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=8968"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/craftydba.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=8968"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/craftydba.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=8968"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}