Summary | Files | Setup | Usage | Issues | Version History
This Azure Function App exports large volumes of data (10M+ records per hour) from Azure Log Analytics to Azure Blob Storage using the Azure Python SDKs, FastAPI, and Azure API Management. In testing, 50M records with 10 columns were exported in approximately 1 hour on a Consumption hosting plan.
Inputs and Outputs:
- Input: Log Analytics workspace table(s), columns, and date range
- Output: JSON (newline delimited), CSV, or Parquet files
Azure FastAPI HTTP Functions:
- azure_ingest_test_data(): creates and ingests test data (optional)
- azure_submit_query(): submits a single query, which is split into smaller queries/jobs and sent to a storage queue
- azure_submit_query_parallel(): breaks up the initial query and submits the resulting queries in parallel
- azure_get_status(): returns the high-level status of a query (number of subqueries, successes, failures, row counts, file sizes)
Azure Queue Functions (a declaration sketch follows this list):
- azure_queue_query(): processes the split query messages
- azure_queue_process(): processes subqueries and saves output to storage blobs
- azure_queue_query_poison(): processes invalid messages in the query queue and logs them to table storage
- azure_queue_process_poison(): processes invalid messages in the process queue and logs them to table storage
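For orientation, here is a minimal, hypothetical sketch of how one of these queue triggers could be declared with the Azure Functions Python v2 programming model; the queue name and connection setting are illustrative, not copied from function_app.py:

import azure.functions as func

app = func.FunctionApp()

# Sketch only: "storageAccountConnectionString" resolves to the
# storageAccountConnectionString__queueServiceUri / __credential environment
# variables described in the Setup section (identity-based connection).
@app.queue_trigger(arg_name="msg",
                   queue_name="<STORAGE_QUEUE_NAME_FOR_QUERIES>",
                   connection="storageAccountConnectionString")
def azure_queue_query(msg: func.QueueMessage) -> None:
    payload = msg.get_body().decode("utf-8")  # message created by azure_submit_query()
    # ...split the query into subqueries and enqueue them for processing...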
Files:
- azure-log-analytics-data-export.ipynb: Python notebook for development, testing, or interactive use
- function_app.py: Azure Function App Python source code
- host.json: Azure Function App settings
- requirements.txt: Python package requirements file
Setup:
You will need access to, or to provision, the following Azure resources:
- Log Analytics Workspace (data source)
- Storage Account
- Container (data output destination)
- Queues (temp storage for split query messages/jobs)
- Tables (logging)
- Azure Function App (Python 3.11+, Consumption or Premium plan)
- Clone this repo, use VS Code, install Azure Functions tools/extension, deploy to Azure subscription
- Reference: Create a function in Azure with Python using Visual Studio Code
- Azure API Management (Consumption tier; do not use the Developer tier)
Authentication (Managed Identity or Service Principal) Roles Setup (a verification sketch follows this list):
- Azure Portal -> Function App -> Identity -> System Assigned -> On -> Add Azure Role Assignments
- Monitoring Metrics Publisher: Ingest to Log Analytics (optional)
- Log Analytics Contributor: Query Log Analytics
- Storage Queue Data Contributor: Storage Queue Send/Get/Delete
- Storage Queue Data Message Processor: Storage Queue Trigger for Azure Function
- Storage Blob Data Contributor: Upload to Blob Storage
- Storage Table Data Contributor: Logging
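To confirm that the Log Analytics role assignment took effect, a small test query with the azure-monitor-query package is a quick check. This is a hedged sketch; the workspace ID and table name are placeholders:

from datetime import timedelta
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

# Placeholders: supply your own workspace ID and table name.
credential = DefaultAzureCredential()
client = LogsQueryClient(credential)
response = client.query_workspace(
    workspace_id="<LOG_ANALYTICS_WORKSPACE_ID>",
    query="<TABLE_NAME> | count",
    timespan=timedelta(days=1),
)
print(response.status)  # LogsQueryStatus.SUCCESS if the role assignment works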
Environment Variables for Queue Triggers via Managed Identity:
- Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
- storageAccountConnectionString__queueServiceUri -> https://<STORAGE_ACCOUNT>.queue.core.windows.net/
- storageAccountConnectionString__credential -> managedidentity
- QueueQueryName -> <STORAGE_QUEUE_NAME_FOR_QUERIES>
- QueueProcessName -> <STORAGE_QUEUE_NAME_FOR_PROCESSING>
Azure Storage Setup (a scripted alternative follows this list):
- Create 1 container for data output files
- <STORAGE_CONTAINER_NAME>
- Create 4 queues for messages/jobs
- <STORAGE_QUEUE_NAME_FOR_QUERIES>
- <STORAGE_QUEUE_NAME_FOR_PROCESSING>
- <STORAGE_QUEUE_NAME_FOR_QUERIES>-poison for failed messages
- <STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison for failed messages
- Create 3 tables for logging (e.g. ingestlog, querylog, and processlog)
- <STORAGE_TABLE_INGEST_LOG_NAME>
- <STORAGE_TABLE_QUERY_LOG_NAME>
- <STORAGE_TABLE_PROCESS_LOG_NAME>
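If you prefer scripting this setup to clicking through the portal, something along these lines should work with the Azure Storage and Data Tables SDKs; the account name and the container/queue/table names are placeholders:

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueServiceClient
from azure.data.tables import TableServiceClient

credential = DefaultAzureCredential()
account = "<STORAGE_ACCOUNT>"  # placeholder

# 1 container for data output files
blob_service = BlobServiceClient(f"https://{account}.blob.core.windows.net/", credential=credential)
blob_service.create_container("<STORAGE_CONTAINER_NAME>")

# 4 queues; the -poison queues receive failed messages
queue_service = QueueServiceClient(f"https://{account}.queue.core.windows.net/", credential=credential)
for queue_name in ("<STORAGE_QUEUE_NAME_FOR_QUERIES>",
                   "<STORAGE_QUEUE_NAME_FOR_PROCESSING>",
                   "<STORAGE_QUEUE_NAME_FOR_QUERIES>-poison",
                   "<STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison"):
    queue_service.create_queue(queue_name)

# 3 tables for logging
table_service = TableServiceClient(endpoint=f"https://{account}.table.core.windows.net/", credential=credential)
for table_name in ("ingestlog", "querylog", "processlog"):
    table_service.create_table_if_not_exists(table_name)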
API Management (APIM) Setup:
- Note: APIM is used to access the FastAPI Swagger/OpenAPI docs
- Create APIM Service -> Consumption pricing tier (do not select the Developer tier)
- Add new API -> Function App
- Function App: <YOUR_FUNCTION>
- Display Name: Protected API Calls
- Name: protected-api-calls
- Suffix: api
- Remove all operations besides POST
- Edit POST operation
- Display name: azure_ingest_test_data
- URL: POST /azure_ingest_test_data
- Clone and Edit new POST operation
- Display name: azure_submit_query
- URL: POST /azure_submit_query
- Clone and Edit new POST operation
- Display name: azure_submit_query_parallel
- URL: POST /azure_submit_query_parallel
- Clone and Edit new POST operation
- Display name: azure_get_status_post
- URL: POST /azure_get_status
- Clone the azure_get_status_post operation
- Change from POST to GET
- Display name: azure_get_status
- URL: GET /azure_get_status
- Edit the OpenAPI spec JSON operation IDs to match the display names above
- Add new API -> Function App
- Function App: <YOUR_FUNCTION>
- Display Name: Public Docs
- Name: public-docs
- Suffix: public
- Remove all operations besides GET
- Settings -> uncheck 'subscription required'
- Edit GET operation
- Display name: Documentation
- URL: GET /docs
- Clone and Edit new GET operation
- Display name: OpenAPI Schema
- URL: GET /openapi.json
- Edit the OpenAPI spec JSON operation IDs to match the display names above
- Test at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs
Queue Trigger Setup:
- To fix message encoding errors (the default encoding is base64), add "extensions": {"queues": {"messageEncoding": "none"}} to host.json, as shown in the snippet below
- Note: Failed messages/jobs are sent to <QUEUE_NAME>-poison
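A minimal host.json with this setting applied looks like the following; keep any other entries (such as the extension bundle and logging settings) that your project already has:

{
  "version": "2.0",
  "extensions": {
    "queues": {
      "messageEncoding": "none"
    }
  }
}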
Optional Data Collection Endpoint and Rule Setup for Log Analytics Ingest (an ingestion sketch follows this list):
- Azure Portal -> Monitor -> Data Collection Endpoints -> Create
- Azure Portal -> Log Analytics -> Table -> Create New Custom Table
- Reference: Tutorial: Send data to Azure Monitor Logs with Logs ingestion API (Azure portal)
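Once the Data Collection Endpoint and Rule exist, data can be ingested from Python with the azure-monitor-ingestion package. This is a hedged sketch; the endpoint URI, rule ID, stream name, and column names are placeholders from your own DCE/DCR setup:

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

credential = DefaultAzureCredential()
client = LogsIngestionClient(endpoint="<DATA_COLLECTION_ENDPOINT_URI>", credential=credential)

# rule_id is the DCR immutable ID; the stream name is typically "Custom-<TABLE_NAME>_CL"
client.upload(
    rule_id="<DATA_COLLECTION_RULE_IMMUTABLE_ID>",
    stream_name="Custom-<TABLE_NAME>_CL",
    logs=[{"TimeGenerated": "2024-03-19T00:00:00Z", "DataColumn1": "value"}],
)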
Optional Environment Variables (reduces the number of parameters required in requests):
- Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
- QueueURL -> <STORAGE_QUEUE_URL>
- TableURL -> <STORAGE_TABLE_URL>
- TableIngestName -> <STORAGE_TABLE_INGEST_LOG_NAME>
- TableQueryName -> <STORAGE_TABLE_QUERY_LOG_NAME>
- TableProcessName -> <STORAGE_TABLE_PROCESS_LOG_NAME>
Optional Security Settings:
- Restrict Azure Function App and APIM to specific IP address range(s)
- Networking -> Public Access -> Select Virtual Networks or IPs
Usage:
Swagger UI docs at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs
- API calls require an APIM subscription key
- APIM -> Subscriptions -> Create Subscription -> Copy Key
- Paste the key into the "Ocp-Apim-Subscription-Key" header field
- azure_submit_query() or azure_submit_query_parallel():
- HTTP POST Example (a Python snippet follows the JSON body):
{
"subscription_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"resource_group_name" : "XXXXXXXXXXXXXXXXXXXXXXX",
"log_analytics_worksapce_name" : "XXXXXXXXXXXXXXXX",
"log_analytics_workspace_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"storage_blob_url" : "https://XXXXXXXXXXXXXXXXXXXXX.blob.core.windows.net/",
"storage_blob_container_name" : "XXXXXXXXXXXXX",
"table_names_and_columns" : { "XXXXXXXXXXXXXXX_CL": ["TimeGenerated","DataColumn1","DataColumn2","DataColumn3","DataColumn4","DataColumn5","DataColumn6","DataColumn7","DataColumn8","DataColumn9"]},
"start_datetime" : "2024-03-19 00:00:00",
"end_datetime" : "2024-03-20 00:00:00"
}
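This request can be sent with the Python requests package; the APIM endpoint and subscription key are placeholders, and the body is the JSON example above:

import requests

# Placeholders: substitute your APIM endpoint and subscription key.
url = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api/azure_submit_query"
headers = {"Ocp-Apim-Subscription-Key": "<APIM_SUBSCRIPTION_KEY>"}
body = {
    "subscription_id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    # ...remaining fields exactly as in the JSON example above...
    "start_datetime": "2024-03-19 00:00:00",
    "end_datetime": "2024-03-20 00:00:00",
}

response = requests.post(url, headers=headers, json=body, timeout=600)
response.raise_for_status()
print(response.json())  # query_uuid, submit status, row counts, etc.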
- HTTP Response Examples:
- azure_submit_query():
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"submit_status": "Success",
"table_names": "XXXXXXXXXXX_CL",
"start_datetime": "2024-03-19 00:00:00.000000",
"end_datetime": "2024-03-20 00:00:00.000000",
"total_row_count": 23000000,
"subqueries_generated": 95,
"subqueries_sent_to_queue": 95,
"runtime_seconds": 92.1,
"submit_datetime": "2024-03-26 16:24:38.771336"
}
- azure_submit_query_parallel():
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"split_status": "Success",
"table_names": "XXXXXXXXXXX_CL",
"start_datetime": "2024-04-04 00:00:00.000000",
"end_datetime": "2024-04-10 00:00:00.000000",
"number_of_messages_generated": 6,
"number_of_messages_sent": 6,
"total_row_count": 2010000,
"runtime_seconds": 0.9,
"split_datetime": "2024-04-12 14:06:41.688752"
}
- azure_get_status():
- HTTP POST Request Example:
{
"query_uuid" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
- HTTP Response Example:
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"query_partitions" : 1,
"submit_status": "Success",
"processing_status": "Partial",
"percent_complete": 29.5,
"runtime_since_submit_seconds": 463.6
"estimated_time_remaining_seconds": 1107.9,
"number_of_subqueries": 95,
"number_of_subqueries_success": 28,
"number_of_subqueries_failed": 0,
"query_row_count": 23000000,
"output_row_count": 6972002,
"output_file_size": 2.05,
"output_file_units" : "GB"
}
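Because processing is asynchronous, clients typically poll azure_get_status() until the export finishes. A minimal sketch using the response fields above; the endpoint, key, and the assumption that "Partial" is the only non-terminal status are placeholders:

import time
import requests

url = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api/azure_get_status"
headers = {"Ocp-Apim-Subscription-Key": "<APIM_SUBSCRIPTION_KEY>"}

while True:
    status = requests.post(url, headers=headers,
                           json={"query_uuid": "<QUERY_UUID>"}, timeout=60).json()
    print(f"{status['processing_status']}: {status['percent_complete']}% complete")
    if status["processing_status"] != "Partial":  # assumed: "Partial" means still running
        break
    time.sleep(60)  # poll once per minute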
Issues:
Azure Function App stops processing subqueries (queue trigger stops picking up messages from the queue):
- Manually restart Azure Function App in Azure Portal
- Use Premium or Dedicated Plan
Query submission exceeds the 10 minute Azure Function limit and fails:
- Use azure_submit_query_parallel() function
- Reduce the datetime range of the query (fewer than 100M records is recommended)
- Decrease break_up_query_freq value in azure_submit_query()
- Decrease parallel_process_break_up_query_freq value in azure_submit_query_parallel()
- Use Premium or Dedicated Plan with no time limit
Table row count values exceed 2,147,483,647 (int32 overflow):
- Change the type from int32 to int64
Version History:
2.0.0:
- Changed Azure Function code to use FastAPI in order to use Swagger UI
- Added pydantic input/output JSON schemas
- Updated documentation
1.5.0:
- Added azure_submit_queries() function for larger datetime ranges and parallel processing
1.4.0:
- Refactored code and made pylint edits
- Changed logging from f-strings to % formatting
1.3.1:
- Fixed UTC time zone bug
- Added estimated time remaining to get_status() response
- Added option to put storage queue and table params in env variables
1.3.0:
- Added pydantic input validation
- Added Open API yaml file for Azure API Management
1.2.0:
- Added get_status() azure function
1.1.0:
- Added logging to Azure Table Storage
- Added row count checks
1.0.0:
- Initial release