This sample outlines multiple ways to accomplish the following set of requirements using Azure Serverless technologies. One way uses the "traditional" serverless approach, another Logic Apps, and another Azure Functions' Durable Functions feature.
Given a set of customers, assume each customer uploads data to our backend for historical record keeping and analysis. This data arrives in the form of a set of .csv files, with each file containing different data. Think of them almost as SQL table dumps in CSV format.
When the customer uploads the files, we have two primary objectives:
- Ensure that all the files required for the customer are present for a particular "set" (aka "batch") of data
- Only when we have all the files for a set, continue on to validate the structure of each file, ensuring a handful of requirements:
  - Each file must be UTF-8 encoded
  - Depending on the file (type1, type2, etc.), ensure the correct number of columns is present in the CSV file
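The two structural rules above can be sketched in a few lines of Python. This is only an illustration, not the sample's own validation code: the `EXPECTED_COLUMNS` mapping, its values, and the `validate_csv` name are assumptions made up for the example.

```python
import csv
import io

# Hypothetical mapping of file type -> expected column count (illustrative values only).
EXPECTED_COLUMNS = {"type1": 3, "type2": 5}


def validate_csv(raw: bytes, file_type: str) -> list[str]:
    """Return a list of validation errors; an empty list means the file passed."""
    try:
        text = raw.decode("utf-8")  # strict decoding raises on invalid UTF-8
    except UnicodeDecodeError as exc:
        return [f"not valid UTF-8: {exc.reason}"]

    expected = EXPECTED_COLUMNS[file_type]
    errors = []
    for line_no, row in enumerate(csv.reader(io.StringIO(text)), start=1):
        # Blank lines come back as empty rows and are skipped.
        if row and len(row) != expected:
            errors.append(f"line {line_no}: expected {expected} columns, found {len(row)}")
    return errors
```

Returning a list of errors rather than a single boolean makes it easier to see which line of which file caused a batch to be rejected.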
To accomplish this sample, you'll need to set up a few things:
1. Azure General Purpose Storage
   - For the Functions SDK to store its dashboard info, and for Durable Functions to store their state data
2. Azure Blob Storage
   - For the customer files to be uploaded into
3. Azure Event Grid (with Storage Events)
4. ngrok, to enable local Azure Function triggering from Event Grid (see this blog post for more)
5. Visual Studio 2019
6. Azure Storage Explorer (makes testing easier)
For the Python version of this sample (folder AzureFunctions.Python), follow the instructions in its dedicated readme.
Pull down the code.
Copy sample.local.settings.json in the AzureFunctions.v3 project to a new file called local.settings.json.
This file will be used across the functions, durable or otherwise.
Next, run any of the Function apps in this solution. You can use either the v1 (.NET Framework) or the v3 (.NET Core) version; it's only needed for Event Grid validation. With the function running, add an Event Grid Subscription to the Blob Storage account (from step 2), pointing to the ngrok-piped endpoint you created in step 4. The URL should look something like this:
- Normal Functions: https://b3252cc3.ngrok.io/api/EnsureAllFiles
- Durable Functions: https://b3252cc3.ngrok.io/api/Orchestrator
Upon saving this subscription, you'll see your locally-running Function get hit with a request and return HTTP OK, then the Subscription will go green in Azure and you're set.
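The request your function receives when the subscription is saved is Event Grid's webhook validation handshake. The following Python sketch shows roughly what an endpoint has to do to answer it: echo the `validationCode` from the `Microsoft.EventGrid.SubscriptionValidationEvent` back as `validationResponse`. The function name and the `(status, body)` return shape are assumptions for illustration; the sample's functions handle this in their own code.

```python
import json

VALIDATION_EVENT = "Microsoft.EventGrid.SubscriptionValidationEvent"


def handle_event_grid_post(body: str) -> tuple[int, str]:
    """Return (status_code, response_body) for an Event Grid webhook POST."""
    events = json.loads(body)  # Event Grid delivers a JSON array of events
    for event in events:
        if event.get("eventType") == VALIDATION_EVENT:
            # Handshake: echo the validation code back to prove endpoint ownership.
            code = event["data"]["validationCode"]
            return 200, json.dumps({"validationResponse": code})
    # Not a handshake: a real function would process the storage events here.
    return 200, ""
```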
Now, open Azure Storage Explorer and connect to the Blob Storage Account you've created. In here, create a container named cust1. Inside the container, create a new folder called inbound.
Take one of the .csv files from the sampledata folder of this repo, and drop it into the inbound folder.
You'll see the endpoint you defined as your Event Grid webhook subscription get hit.
- Determine the "batch prefix" of the file that was dropped. This consists of the customer name (cust1) and a datetime stamp in the format YYYYMMDD_HHMM, so the batch prefix for the first batch in sampledata is cust1_20171010_1112.
- Check to see if a sub-orchestration for this batch already exists.
  - If not, spin one up and pass along the Event Grid data that triggered this execution.
  - If so, use RaiseEvent to pass the filename along to the instance.
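As a rough illustration of the first step, the batch prefix can be pulled out of the blob URL carried in the Event Grid payload. The sketch below assumes file names of the form `<customer>_<YYYYMMDD>_<HHMM>_<type>.csv`; that shape, the example URL, and the `parse_blob_url` name are assumptions, not something this readme specifies.

```python
import re
from urllib.parse import urlparse

# Assumed file-name shape: <customer>_<YYYYMMDD>_<HHMM>_<type>.csv
_NAME = re.compile(
    r"^(?P<prefix>(?P<customer>[^_/]+)_\d{8}_\d{4})_(?P<type>[^_/]+)\.csv$"
)


def parse_blob_url(url: str) -> dict:
    """Split a blob URL into customer, batch prefix, and file type."""
    file_name = urlparse(url).path.rsplit("/", 1)[-1]
    match = _NAME.match(file_name)
    if match is None:
        raise ValueError(f"unexpected file name: {file_name!r}")
    return match.groupdict()
```

With a deterministic prefix in hand, it can double as the lookup key for the batch's sub-orchestration.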
In the EnsureAllFiles sub-orchestration, we look up which files we need for this customer (cust1) and check which files have come through so far. As long as we do not have all the files we need, we loop within the orchestration, each time waiting for an external newfile event to let us know that a new file has come through and should be processed.
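The waiting loop can be pictured with a plain-Python stand-in, where an ordinary iterable plays the role of the stream of external newfile events. This is not Durable Functions code, and the `REQUIRED_TYPES` table and its contents are hypothetical; the real list of required files lives in the sample's code.

```python
# Hypothetical per-customer requirements, for illustration only.
REQUIRED_TYPES = {"cust1": {"type1", "type2", "type3"}}


def files_until_complete(customer: str, incoming_types):
    """Consume file-type notifications until every required type has been seen.

    Returns the number of notifications consumed, or None if the stream
    ended before the batch was complete.
    """
    missing = set(REQUIRED_TYPES[customer])
    for count, file_type in enumerate(incoming_types, start=1):
        missing.discard(file_type)  # duplicates and unknown types are harmless
        if not missing:
            return count  # batch complete: validation can start
    return None
```

In the orchestration itself, each pass through the loop would suspend until the next external event arrives instead of pulling from an iterable.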
When we find we have all the files that constitute a "batch" for the customer, we call the ValidateFileSet activity function to process each file in the set and validate its structure according to our rules.
When validation completes successfully, all files from the batch are moved to a valid-set subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to invalid-set.
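The key point is that the batch moves as a unit: one bad file sends every file in the set to invalid-set. A small sketch of that decision, assuming the target folders sit next to inbound at the container root (the `plan_moves` helper is hypothetical and only computes destination paths; it does not touch storage):

```python
def plan_moves(blob_paths, all_valid: bool) -> dict:
    """Map each source blob path to its destination for the whole batch."""
    target = "valid-set" if all_valid else "invalid-set"
    # Every file in the batch goes to the same folder, keeping its file name.
    return {path: f"{target}/{path.rsplit('/', 1)[-1]}" for path in blob_paths}
```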
Because Durable Functions persist their state, resetting the execution after something goes wrong is not as simple as re-running the function. To do this properly, you must:
- Delete the DurableFunctionsHubHistory table in the "General Purpose" Storage Account you created in Step 1 above.
- Delete any files you uploaded to the /inbound directory of the blob storage container triggering the Functions.
Note: after doing these steps you'll have to wait a minute or so before running either of the Durable Function implementations as the storage table creation will error with 409 CONFLICT while deletion takes place.
- Determine the "batch prefix" of the file that was dropped. This consists of the customer name (cust1) and a datetime stamp in the format YYYYMMDD_HHMM, so the batch prefix for the first batch in sampledata is cust1_20171010_1112.
- Check to see if we have all necessary files in blob storage with this prefix.
- If we do, check to see if there's a lock entry in the FileProcessingLocks table of the General Purpose Storage Account containing this prefix. If so, bail. If not, create one, then call the ValidateFunctionUrl endpoint with the batch prefix as payload.
- The Validate function gets the request and checks to see if the lock is marked as 'in progress'. If so, bail. If not, mark it as such and continue validating the files in the Blob Storage account which match the prefix passed in.
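The two lock checks in the steps above can be sketched with an in-memory dictionary standing in for the FileProcessingLocks table. The `LockState` values and both function names are assumptions for illustration. A plain dictionary is also not safe across concurrent function instances, so a real table-backed version would need the create step to fail when the entry already exists.

```python
from enum import Enum


class LockState(Enum):
    WAITING = "waiting"
    IN_PROGRESS = "in progress"


def try_create_lock(locks: dict, prefix: str) -> bool:
    """Trigger side: True if this caller created the lock and should call Validate."""
    if prefix in locks:
        return False  # another invocation already claimed this batch: bail
    locks[prefix] = LockState.WAITING
    return True


def try_begin_validation(locks: dict, prefix: str) -> bool:
    """Validate side: True if this caller may go ahead and validate the batch."""
    if locks.get(prefix) is LockState.IN_PROGRESS:
        return False  # validation is already running for this batch: bail
    locks[prefix] = LockState.IN_PROGRESS
    return True
```

Only the first caller for a given prefix gets `True` from each function, which is what keeps a batch from being validated more than once.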
When validation completes successfully, all files from the batch are moved to a valid-set subfolder in the blob storage container. If validation fails (try removing a column in one of the lines in one of the files), the whole set gets moved to invalid-set.
- Delete the FileProcessingLocks table from the General Purpose Storage Account.
- Delete any files you uploaded to the /inbound directory of the blob storage container triggering the Functions.
Note: after doing these steps you'll have to wait a minute or so before running either of the Durable Function implementations as the storage table creation will error with 409 CONFLICT while deletion takes place.
Although they do not behave identically, this repo also contains deployment scripts for two Logic App instances which perform roughly the same flow.
This Logic App receives Storage Events from Event Grid, pulls off the full prefix of the file (also containing the URL), and sends this on to...
This Logic App receives events from the Processor and waits for 3 events containing the same prefix to arrive before sending the batch on to the next step (you can change this to be whatever you want after deployment).
- If you drop all the files in at once, a race condition exists when the events fired from Event Grid hit the top-level Orchestrator endpoint: it doesn't execute StartNewAsync fast enough, and instead of one instance per batch, you'll end up with multiple instances for the same prefix (even though we want one instance per batch, acting like a singleton).
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.
When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.