Tag: Microsoft

  • dbt – A quick fix to Unit Test feature with models having CTEs in DBT Fabric adapter

    TL;DR

    The DBT Fabric adapter supports the unit test feature, but not for models that use Common Table Expressions (CTEs). I set up a test case and tracked down the cause. My fix uses two regular expressions: the first detects whether a model has CTEs, and the second extracts the final SELECT statement so that the CTE definitions can be placed on top of the SQL generated by the two internal macros behind the unit test feature.

    You can find my code in this zip file on GitHub:

    https://github.com/user-attachments/files/16408951/macros.zip

    The issue is raised in the DBT Fabric adapter repository on GitHub:

    https://github.com/microsoft/dbt-fabric/issues/185

    The details of my findings are presented in the sections below.

    DBT Unit Test

    DBT has had a test feature for a long time; from version 1.8 onwards it is classified as a data test. This is sensible, since a data test checks the quality and uniqueness of columns in a model, or statistics of the whole model. The unit test was introduced for these reasons:

    • It lets you test complex transformation logic in a data model under multiple scenarios without actually deploying the model to the warehouse.
    • A failed data test could be caused by a bug in an internal CTE or in the transformation logic.

    With such benefits, I was eager to get hands-on with the feature using the test scenario below.

    Note: if you are not familiar with unit testing, I suggest having a look at PyTest at https://docs.pytest.org/en/stable/. You will find that DBT’s unit test feature uses some of the same terms.

    Scenario

    Test Model

    I developed a conform model for Vacancy Referrals with 4 CTEs to find out which staff member referred which vacancy to which customer, and when. You can see from the screenshot that the SQL code is fairly long and has some complex logic in the collapsed sections.

    Vacancy Referral Model Structure

    Fixtures

    Fixtures are self-prepared resources for unit tests. They can be configurations, environment variables, or mock data. In DBT unit tests, they are mock data. DBT supports mock data inline, in a CSV file, or in a SQL file. However, if you wish to test a model that does not exist yet and that depends on models/views/seeds that do not exist in the warehouse either, you have to choose SQL.

    According to the DBT docs, all fixtures must be put into a fixtures folder inside the tests folder; the location is configurable if you don’t want to stick with the standard name.

    Unit Test YML

    I created a YML file declaring all the unit tests that apply to the model above. Every relation macro used in the model, such as source and ref, needs to be overridden with mock data. Even the expectation, i.e. the expected rows, is declared in a SQL file.

    My tip is to prepare the mock data as CSV in a user-friendly editor, e.g., Excel, then convert it to SQL SELECT statements with an online tool; there are plenty of them on the internet.

    A fixture’s mock data in SQL format
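
    If you would rather not paste data into an online tool, the conversion can also be scripted. The following is a minimal sketch, not part of the original setup: csv_to_sql_fixture is a hypothetical helper name, and it assumes the CSV has a header row and that emitting every value as a quoted string literal (empty cells as null) is acceptable, so casts for non-text columns would still have to be added by hand.

```python
import csv
import io


def csv_to_sql_fixture(csv_text: str) -> str:
    """Turn CSV text (with a header row) into 'select ... union all select ...'.

    Assumption: every value is emitted as a string literal and empty cells
    become null; add casts yourself where the model needs other types.
    """
    selects = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        cols = []
        for name, value in row.items():
            if value == "":
                literal = "null"
            else:
                # Double single quotes so the literal stays valid SQL
                literal = "'" + value.replace("'", "''") + "'"
            cols.append(f"{literal} as {name}")
        selects.append("select " + ", ".join(cols))
    return "\nunion all\n".join(selects)


if __name__ == "__main__":
    print(csv_to_sql_fixture("id,name\n1,O'Neil\n2,\n"))
```

    Each CSV row becomes one SELECT, and the rows are chained with UNION ALL, which is the shape a SQL fixture file needs.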

    Execution

    Now run this command to execute the test.

    dbt test -s Stg_Vacancy_Referrals test_type:unit

    Issue

    The DBT log stated that there was an error around the WITH statement, but I was pretty sure there was nothing wrong with it. After checking the log for a while and looking around for the implementation of the unit test feature in DBT Fabric, I found this.

    Path: .venv/Lib/site-packages/dbt/include/fabric/macros/materializations/tests/helpers.sql

    Unit test SQL code generator

    The Fabric adapter puts main_sql, which is the actual code of my model, inside a subquery. Since my model uses CTEs, and T-SQL does not allow a WITH clause inside a subquery, the generated SQL code won’t work.

    Solution

    My idea is to use a regular expression to extract the final SELECT in my model and put all the CTEs on top of that macro’s SQL query. Fortunately, the idea is feasible because DBT exposes some Python modules, including re for regular expressions, in its Jinja template engine. This means I can compose an expression and test it in Python before putting it into DBT’s Jinja macros.

    Disclaimer: I am not a regex expert, so please do not expect my expressions to work with every coding style. You can enforce a style with a SQL linter to minimize the effort of maintaining the expressions.

    My regular expression looks like the one below. First, I use it to check whether main_query has any CTEs. If it has none, the default code is used.

    CTE Regex Pattern

    When main_query has CTEs, another regex extracts the final SELECT statement. Removing the final SELECT from main_query leaves the CTE definition part. That part goes on top, followed by the internal unit test CTEs. The implementation is below.

    The fix’s actual code
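
    The screenshots carry the real patterns, so they are not reproduced here. To illustrate the approach in plain Python, where such expressions can be tried out before moving them into Jinja, here is a rough sketch. Both patterns, the function names, and the __sbq alias are hypothetical, and they assume a linted style: the model starts with WITH, CTE bodies are indented, and the final SELECT starts at the beginning of a line.

```python
import re

# Hypothetical pattern 1: does the model start with a CTE definition?
CTE_RE = re.compile(r"^\s*with\s+\w+\s+as\s*\(", re.IGNORECASE)

# Hypothetical pattern 2: a closing parenthesis followed by a SELECT that
# starts at column 0; everything from that SELECT to the end is captured.
FINAL_SELECT_RE = re.compile(
    r"\)\s*^(select\b.*)\Z", re.IGNORECASE | re.DOTALL | re.MULTILINE
)


def split_ctes(main_sql: str) -> tuple[str, str]:
    """Return (cte_part, final_select); cte_part is '' when there are no CTEs."""
    if not CTE_RE.match(main_sql):
        return "", main_sql
    m = FINAL_SELECT_RE.search(main_sql)
    if m is None:
        # The style assumption did not hold; fall back to the default path
        return "", main_sql
    return main_sql[: m.start(1)].rstrip(), m.group(1)


def wrap_in_subquery(main_sql: str) -> str:
    """Hoist the CTEs above a subquery that wraps only the final SELECT."""
    cte_part, final_select = split_ctes(main_sql)
    wrapped = f"select * from (\n{final_select}\n) as __sbq"
    return f"{cte_part}\n{wrapped}" if cte_part else wrapped


if __name__ == "__main__":
    model = "with a as (\n    select 1 as id\n)\nselect id from a"
    print(wrap_in_subquery(model))
```

    In a macro, the same two steps would run through the re module that DBT exposes to Jinja, with the wrapper replaced by whatever SQL the adapter generates.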

    I thought it was over and that I could run the unit test again without any issues. I was wrong. Another part of the process had a similar issue: the get-column-names macro, which is used to find the column names of any SQL query.

    Path: .venv/Lib/site-packages/dbt/include/fabric/macros/adapters/columns.sql

    get_columns_in_query macro

    I can see the same issue here. The fix is the same as the one applied to the unit test code generator.

    The fixed version

    Result

    After applying those fixes, the unit test now runs against my model. The result is below.

    Unit Test result

    My model was not deployed, but the SQL code was actually executed in the Fabric Warehouse.

    Summary

    I find the unit test feature super helpful. During physical data modeling, I can quickly test models using mock data without creating or deploying all the related models. This becomes handy as the DBT project grows over time and models build on other models, and it saves a lot of time when measuring the impact of a change in a complex model on its downstream models.

    Hope you find this article helpful and informative.

  • fsspec – Remove the footer of a CSV file in Azure Synapse and Fabric

    The CSV format is very common, and files sometimes carry a footer. A common reason is that the system producing the CSV files adds auditing properties for cross-checking, or it is simply the default format by design. In my case, the CSV looks like the screenshot below. The footer is separated from the body by an empty line, followed by a separator and then some properties.

    a CSV with footer

    There are many ways to get rid of the footer without even modifying the file. For example, I used to declare an External Table over a CSV with a footer and remove the footer in a view with a filter condition like the one below.

    WHERE TRY_CONVERT(<Actual Type>, <First Column in the CSV>) IS NOT NULL

    This doesn’t work when the first column’s data type is textual, e.g., CHAR or VARCHAR. The solution was also annoying because I had to use TRY_CONVERT/TRY_CAST on non-text columns to get their original types back in the view. It also shifted my schema validation from the raw files to the view level.

    Once a Spark runtime became available to my team, many potential solutions were unlocked. One of them is to use fsspec, a filesystem interface in Python, and remove the footer with the help of its tail function.

    FSSPEC

    fsspec is the backend of some utilities in Microsoft Spark Runtime. If you import fsspec in a notebook, you will find the implementation classes are AzureBlobFileSystem and OnelakeFileSystem for Azure Synapse Analytics and Fabric respectively. Those classes are internal and available in Microsoft Spark Runtime for their services.

    fsspec implementation in Azure Synapse
    fsspec implementation in Fabric

    A quick note: if we import pandas, a popular data analysis/processing library, before importing fsspec, the actual implementation will be adlfs instead of the internal one. More information can be found at the link below. In short, the adlfs implementation does not work with Managed Identity in Synapse or the default connection in Fabric.

    https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-file-mount-api

    fsspec implementation for pandas package in both Synapse and Fabric

    Furthermore, mssparkutils.fs only has the head function, which shows the first n bytes (or rows, if you parse them), but not tail, which is declared in fsspec. This means mssparkutils.fs is not the solution I am looking for, and I have to use fsspec directly.

    The tail function in fsspec

    https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.tail

    Solution

    The idea is simple and straightforward. I read the last 1024 bytes of the CSV file and look for the last empty line, if there is one. Once the index of the empty line is located, I write the file from the start up to that index to a new file. The final result is a CSV file without a footer, whose schema can be validated easily.

    Step 1 – Read the tail

    # fs is an fsspec filesystem instance; file_path points to the CSV file in Azure Blob Storage / OneLake.
    # In this case, I only read 1 KB from the tail. If you know your footer is longer, use a bigger number.

    tail: bytes = fs.tail(file_path, size=1024)
    total = fs.size(file_path)  # total size of the file in bytes

    Step 2 – Find the empty line/separator

    import re
    from collections import deque

    # Two consecutive line breaks mean an empty line, which is the separator
    match_it = re.finditer(rb'(\r?\n){2}', tail)

    # There can be several empty lines in the tail. A deque with maxlen=1
    # keeps only the last match, which is the one I want.
    q = deque(match_it, maxlen=1)

    # -1 means no empty line was found, i.e. the file has no footer
    separator_idx = -1 if len(q) == 0 else q.pop().start()

    Step 3 – Extract properties in the footer (Optional)

    if separator_idx > -1:
        # Raw byte patterns avoid invalid-escape warnings for \d
        record_count_m = re.findall(rb'Record Count: (\d+)', tail)
        some_date_m = re.findall(rb'Some Date: (\d{2}/\d{2}/\d{4})', tail)
        record_count = int(record_count_m[0])
        some_date = some_date_m[0].decode()

    Step 4 – Store the CSV body in a new file

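    # CSV body's cutoff index = total size - number of tail bytes read + separator index.
    # len(tail) is used instead of a hard-coded 1024 so the arithmetic still holds
    # if fewer bytes were returned (e.g. a file smaller than 1 KB).
    csv_cutoff = total if separator_idx == -1 else total - len(tail) + separator_idx

    csv_body = fs.read_block(file_path, 0, csv_cutoff)

    # Store the CSV body in a new file somewhere
    fs.write_bytes(new_file_path, csv_body)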
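
    The index arithmetic in steps 2 and 4 is easy to get wrong, so it can help to isolate it in a small function that can be tested without a storage account. A minimal sketch, assuming the same empty-line separator as above; find_csv_cutoff is a hypothetical name, and the function only does the arithmetic, leaving fs.tail, fs.size, and fs.read_block to the caller.

```python
import re
from collections import deque

# Two consecutive line breaks, i.e. an empty line
EMPTY_LINE = re.compile(rb"(\r?\n){2}")


def find_csv_cutoff(tail: bytes, total: int) -> int:
    """Return the byte offset at which the CSV body ends.

    tail  -- the last bytes of the file, e.g. the result of fs.tail(path, size=1024)
    total -- the full file size in bytes, e.g. the result of fs.size(path)
    """
    # Keep only the last empty line found in the tail
    last = deque(EMPTY_LINE.finditer(tail), maxlen=1)
    if not last:
        # No empty line in the tail: treat the file as having no footer
        return total
    # Offset of the tail within the file, plus the separator's offset in the tail
    return total - len(tail) + last.pop().start()


if __name__ == "__main__":
    data = b"id,name\r\n1,a\r\n2,b\r\n\r\nRecord Count: 2\r\n"
    cutoff = find_csv_cutoff(data[-1024:], len(data))
    print(data[:cutoff])
```

    Because the cutoff is the start of the empty line, the extracted body ends without a trailing line break; append one when writing the new file if downstream readers expect it.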

    Summary

    Sometimes being curious pays off, as in this case, where I checked the abstract functions of fsspec. I really hope Microsoft will add the tail function to mssparkutils.fs in the future, as it is handy for things like checking the last entries of a log file or, as in this scenario, the footer of a CSV file. Hope you find this article helpful.