Azure Log Analytics Data Export

A Python implementation of a queue for exporting large volumes of data from Azure Log Analytics to a storage account.

Summary | Files | Setup | Usage | Issues | Changelog | References

Summary

This Azure Function App exports large volumes of data (10M+ records per hour) from Azure Log Analytics to Blob Storage using Python SDKs, FastAPI, and API Management. In testing, 50M records with 10 columns were exported in approximately 1 hour on a Consumption hosting plan.

Inputs and Outputs:

  • Input: log analytics workspace table(s), columns, and date range
  • Output: JSON (line delimited), CSV, or PARQUET files

Azure FastAPI HTTP Functions:

  1. azure_ingest_test_data(): creates and ingests test data (optional)
  2. azure_submit_query(): submits a single query that is split into smaller queries/jobs and sent to the queue (see the sketch after these lists)
  3. azure_submit_query_parallel(): breaks up the initial query and submits multiple queries in parallel
  4. azure_get_status(): returns a high-level status for a query (number of sub-queries, successes, failures, row counts, file sizes)

Azure Queue Functions:

  1. azure_queue_query(): processes split queries
  2. azure_queue_process(): processes subqueries and saves output to storage blobs
  3. azure_queue_query_poison(): processes invalid messages in the query queue and saves them to the table log
  4. azure_queue_process_poison(): processes invalid messages in the process queue and saves them to the table log
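
Conceptually, the submit functions divide the requested datetime range into fixed-frequency windows, each of which becomes one queue message. The sketch below is illustrative only; split_query and freq_hours are hypothetical names, not the actual function_app.py implementation (the real frequency parameters, break_up_query_freq and parallel_process_break_up_query_freq, are described under Issues):

from datetime import datetime, timedelta

def split_query(start: datetime, end: datetime, freq_hours: int = 1):
    """Yield (window_start, window_end) pairs covering [start, end)."""
    window = timedelta(hours=freq_hours)
    current = start
    while current < end:
        yield (current, min(current + window, end))
        current += window

# 24 one-hour subqueries; each would become one queue message
jobs = list(split_query(datetime(2024, 3, 19), datetime(2024, 3, 20)))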


Files

  • azure-log-analytics-data-export.ipynb: Python notebook for development, testing, or interactive use
  • function_app.py: Azure Function App Python source code
  • host.json: Azure Function App settings
  • requirements.txt: Python package requirements file

Setup

You will need to have access to or provision the following Azure Resources:

  1. Log Analytics Workspace (data source)
  2. Storage Account
    • Container (data output destination)
    • Queues (temp storage for split query messages/jobs)
    • Tables (logging)
  3. Azure Function App (Python 3.11+, Consumption or Premium plan)
  4. Azure API Management (Consumption tier; do not use Developer)

Authentication (Managed Identity or Service Principal) Roles Setup:

  • Azure Portal -> Function App -> Identity -> System Assigned -> On -> Add Azure Role Assignments
  1. Monitoring Metrics Publisher: Ingest to Log Analytics (optional)
  2. Log Analytics Contributor: Query Log Analytics
  3. Storage Queue Data Contributor: Storage Queue Send/Get/Delete
  4. Storage Queue Data Message Processor: Storage Queue Trigger for Azure Function
  5. Storage Blob Data Contributor: Upload to Blob Storage
  6. Storage Table Data Contributor: Logging
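
With these roles assigned, the app's code can authenticate everywhere with a single DefaultAzureCredential. A minimal sketch (the account URL is a placeholder; the exact clients in function_app.py may differ):

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient
from azure.storage.blob import BlobServiceClient

# Resolves to the system-assigned managed identity when running in
# Azure, and to local developer credentials (e.g. az login) otherwise
credential = DefaultAzureCredential()

logs_client = LogsQueryClient(credential)
blob_service = BlobServiceClient(
    "https://<STORAGE_ACCOUNT>.blob.core.windows.net/",
    credential=credential,
)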


Environment Variables for Queue Triggers via Managed Identity:

  • Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
  1. storageAccountConnectionString__queueServiceUri -> https://<STORAGE_ACCOUNT>.queue.core.windows.net/
  2. storageAccountConnectionString__credential -> managedidentity
  3. QueueQueryName -> <STORAGE_QUEUE_NAME_FOR_QUERIES>
  4. QueueProcessName -> <STORAGE_QUEUE_NAME_FOR_PROCESSING>
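
Assuming the v2 Python programming model (which function_app.py implies), a sketch of how a queue trigger can consume these settings; the function body here is illustrative:

import azure.functions as func

app = func.FunctionApp()

# "%QueueQueryName%" is resolved from app settings at runtime;
# "storageAccountConnectionString" matches the prefix of the
# __queueServiceUri and __credential settings above, so the trigger
# authenticates with the managed identity, not a connection string.
@app.queue_trigger(arg_name="msg",
                   queue_name="%QueueQueryName%",
                   connection="storageAccountConnectionString")
def azure_queue_query(msg: func.QueueMessage) -> None:
    job = msg.get_body().decode("utf-8")  # one split-query job per message
    # ... split into subqueries and enqueue to the process queue ...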


Azure Storage Setup:

  1. Create 1 container for data output files
    • <STORAGE_CONTAINER_NAME>
  2. Create 4 queues for messages/jobs
    • <STORAGE_QUEUE_NAME_FOR_QUERIES>
    • <STORAGE_QUEUE_NAME_FOR_PROCESSING>
    • <STORAGE_QUEUE_NAME_FOR_QUERIES>-poison for failed messages
    • <STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison for failed messages
  3. Create 3 tables for logging (e.g. ingestlog, querylog, and processlog)
    • <STORAGE_TABLE_INGEST_LOG_NAME>
    • <STORAGE_TABLE_QUERY_LOG_NAME>
    • <STORAGE_TABLE_PROCESS_LOG_NAME>
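
These resources can be created in the Azure Portal, or scripted; a minimal sketch with the Azure Storage SDKs (all names are placeholders, and each create_* call raises ResourceExistsError if the resource already exists):

from azure.identity import DefaultAzureCredential
from azure.data.tables import TableServiceClient
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueServiceClient

credential = DefaultAzureCredential()
account = "<STORAGE_ACCOUNT>"

# 1 container for data output files
blobs = BlobServiceClient(f"https://{account}.blob.core.windows.net/", credential=credential)
blobs.create_container("<STORAGE_CONTAINER_NAME>")

# 4 queues: two active, two poison
queues = QueueServiceClient(f"https://{account}.queue.core.windows.net/", credential=credential)
for name in ("<STORAGE_QUEUE_NAME_FOR_QUERIES>",
             "<STORAGE_QUEUE_NAME_FOR_PROCESSING>",
             "<STORAGE_QUEUE_NAME_FOR_QUERIES>-poison",
             "<STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison"):
    queues.create_queue(name)

# 3 tables for logging
tables = TableServiceClient(f"https://{account}.table.core.windows.net/", credential=credential)
for name in ("ingestlog", "querylog", "processlog"):
    tables.create_table(name)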

API Management (APIM) Setup:

  • Note: APIM is used to access the FastAPI Swagger/OpenAPI docs
  1. Create APIM Service -> Consumption Pricing Tier (do not select developer)
  2. Add new API -> Function App
    • Function App: <YOUR_FUNCTION>
    • Display Name: Protected API Calls
    • Name: protected-api-calls
    • Suffix: api
  3. Remove all operations besides POST
    • Edit POST operation
      • Display name: azure_ingest_test_data
      • URL: POST /azure_ingest_test_data
    • Clone and Edit new POST operation
      • Display name: azure_submit_query
      • URL: POST /azure_submit_query
    • Clone and Edit new POST operation
      • Display name: azure_submit_query_parallel
      • URL: POST /azure_submit_query_parallel
    • Clone and Edit new POST operation
      • Display name: azure_get_status_post
      • URL: POST /azure_get_status
    • Clone the azure_get_status_post operation
      • Change from POST to GET
      • Display name: azure_get_status
      • URL: GET /azure_get_status
    • Edit the OpenAPI spec JSON operation IDs to match the operations above
  4. Add new API -> Function App
    • Function App: <YOUR_FUNCTION>
    • Display Name: Public Docs
    • Name: public-docs
    • Suffix: public
  5. Remove all operations besides GET
    • Settings -> uncheck 'subscription required'
    • Edit GET operation
      • Display name: Documentation
      • URL: GET /docs
    • Clone and Edit new GET operation
      • Display name: OpenAPI Schema
      • URL: GET /openapi.json
    • Edit the OpenAPI spec JSON operation IDs to match the operations above
    • Test at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs


Queue Trigger Setup:

  • To fix message encoding errors (the default encoding is base64), add "extensions": {"queues": {"messageEncoding": "none"}} to host.json, as in the example below
  • Note: Failed messages/jobs are sent to <QUEUE_NAME>-poison
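
For example, a minimal host.json with the encoding override:

{
    "version": "2.0",
    "extensions": {
        "queues": {
            "messageEncoding": "none"
        }
    }
}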

Optional Data Collection Endpoint and Rule Setup for Log Analytics Ingest:

  1. Azure Portal -> Monitor -> Data Collection Endpoints -> Create
  2. Azure Portal -> Log Analytics -> Table -> Create New Custom Table
  3. Reference: Tutorial: Send data to Azure Monitor Logs with Logs ingestion API (Azure portal)
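
Once the endpoint and rule exist, data can be pushed through the Logs Ingestion API with the azure-monitor-ingestion package, as the referenced tutorial walks through; a minimal sketch (endpoint, rule ID, stream name, and columns are placeholders for your custom table):

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

client = LogsIngestionClient(
    endpoint="https://<DATA_COLLECTION_ENDPOINT>.ingest.monitor.azure.com",
    credential=DefaultAzureCredential(),
)

# rule_id is the DCR immutable ID; the stream name is defined by the DCR
client.upload(
    rule_id="<DATA_COLLECTION_RULE_IMMUTABLE_ID>",
    stream_name="Custom-<TABLE_NAME>_CL",
    logs=[{"TimeGenerated": "2024-03-19T00:00:00Z", "DataColumn1": "test"}],
)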

Optional Environment Variables (reduces number of params in requests):

  • Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
  1. QueueURL -> <STORAGE_QUEUE_URL>
  2. TableURL -> <STORAGE_TABLE_URL>
  3. TableIngestName -> <STORAGE_TABLE_INGEST_LOG_NAME>
  4. TableQueryName -> <STORAGE_TABLE_QUERY_LOG_NAME>
  5. TableProcessName -> <STORAGE_TABLE_PROCESS_LOG_NAME>


Optional Security Settings:

  1. Restrict Azure Function App and APIM to specific IP address range(s)
  • Networking -> Public Access -> Select Virtual Networks or IPs


Usage

Swagger UI Docs at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs

  • API calls require an APIM subscription key
  • APIM -> Subscription -> Create Subscription -> Copy Key
  • Paste it into the "Ocp-Apim-Subscription-Key" header field

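Calls can also be scripted; a minimal sketch with the requests package that submits a query through APIM (all values are placeholders; the body mirrors the HTTP POST example below, and the /api suffix comes from the APIM setup above):

import requests

url = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api/azure_submit_query"
headers = {"Ocp-Apim-Subscription-Key": "<YOUR_SUBSCRIPTION_KEY>"}
payload = {
    "subscription_id": "<SUBSCRIPTION_ID>",
    "resource_group_name": "<RESOURCE_GROUP>",
    "log_analytics_workspace_name": "<WORKSPACE_NAME>",
    "log_analytics_workspace_id": "<WORKSPACE_ID>",
    "storage_blob_url": "https://<STORAGE_ACCOUNT>.blob.core.windows.net/",
    "storage_blob_container_name": "<CONTAINER_NAME>",
    "table_names_and_columns": {"<TABLE_NAME>_CL": ["TimeGenerated", "DataColumn1"]},
    "start_datetime": "2024-03-19 00:00:00",
    "end_datetime": "2024-03-20 00:00:00",
}

response = requests.post(url, json=payload, headers=headers)
print(response.json())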

  1. azure_submit_query() or azure_submit_query_parallel():
  • HTTP POST Example:
{
    "subscription_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "resource_group_name" : "XXXXXXXXXXXXXXXXXXXXXXX",
    "log_analytics_worksapce_name" : "XXXXXXXXXXXXXXXX",
    "log_analytics_workspace_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "storage_blob_url" : "https://XXXXXXXXXXXXXXXXXXXXX.blob.core.windows.net/",
    "storage_blob_container_name" : "XXXXXXXXXXXXX",
    "table_names_and_columns" : { "XXXXXXXXXXXXXXX_CL": ["TimeGenerated","DataColumn1","DataColumn2","DataColumn3","DataColumn4","DataColumn5","DataColumn6","DataColumn7","DataColumn8","DataColumn9"]},
    "start_datetime" : "2024-03-19 00:00:00",
    "end_datetime" : "2024-03-20 00:00:00"
}
  • HTTP Response Examples:
    • azure_submit_query():
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "submit_status": "Success",
    "table_names": "XXXXXXXXXXX_CL",
    "start_datetime": "2024-03-19 00:00:00.000000",
    "end_datetime": "2024-03-20 00:00:00.000000",
    "total_row_count": 23000000,
    "subqueries_generated": 95,
    "subqueries_sent_to_queue": 95,
    "runtime_seconds": 92.1,
    "submit_datetime": "2024-03-26 16:24:38.771336"
}
    • azure_submit_query_parallel():
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "split_status": "Success",
    "table_names": "XXXXXXXXXXX_CL",
    "start_datetime": "2024-04-04 00:00:00.000000",
    "end_datetime": "2024-04-10 00:00:00.000000",
    "number_of_messages_generated": 6,
    "number_of_messages_sent": 6,
    "total_row_count": 2010000,
    "runtime_seconds": 0.9,
    "split_datetime": "2024-04-12 14:06:41.688752"
}
  2. azure_get_status():
  • HTTP POST Request Example:
{
    "query_uuid" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
  • HTTP Response Example:
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "query_partitions" : 1,
    "submit_status": "Success",
    "processing_status": "Partial",
    "percent_complete": 29.5,
    "runtime_since_submit_seconds": 463.6
    "estimated_time_remaining_seconds": 1107.9,
    "number_of_subqueries": 95,
    "number_of_subqueries_success": 28,
    "number_of_subqueries_failed": 0,
    "query_row_count": 23000000,
    "output_row_count": 6972002,
    "output_file_size": 2.05,
    "output_file_units" : "GB"
}
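
A status check can be scripted the same way; a sketch assuming the GET operation accepts query_uuid as a query parameter (the POST variant takes the same field in a JSON body, as above):

import requests

url = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api/azure_get_status"
headers = {"Ocp-Apim-Subscription-Key": "<YOUR_SUBSCRIPTION_KEY>"}

response = requests.get(url, params={"query_uuid": "<QUERY_UUID>"}, headers=headers)
print(response.json())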

Issues

  1. Azure Function App stops processing sub-queries (queue trigger is not picking up messages in the queue):

    • Manually restart Azure Function App in Azure Portal
    • Use Premium or Dedicated Plan
  2. Query submission exceeds the 10-minute Azure Function timeout and fails:

    • Use azure_submit_query_parallel() function
    • Reduce the datetime range of the query (recommend less than 100M records)
    • Decrease break_up_query_freq value in azure_submit_query()
    • Decrease parallel_process_break_up_query_freq value in azure_submit_query_parallel()
    • Use Premium or Dedicated Plan with no time limit
  3. Table row count values exceed 2,147,483,647 (the int32 maximum):

    • Change the type from int32 to int64

Changelog

2.0.0:

  • Changed Azure Function code to use FastAPI, enabling the Swagger UI
  • Added pydantic input/output JSON schemas
  • Updated documentation

1.5.0:

  • Added azure_submit_queries() function for larger datetime ranges and parallel processing

1.4.0:

  • Refactored code and made pylint edits
  • Changed logging to % formatting from f-strings

1.3.1:

  • Fixed UTC time zone bug
  • Added estimated time remaining to get_status() response
  • Added option to put storage queue and table params in env variables

1.3.0:

  • Added pydantic input validation
  • Added Open API yaml file for Azure API Management

1.2.0:

  • Added get_status() azure function

1.1.0:

  • Added logging to Azure Table Storage
  • Added row count checks

1.0.0:

  • Initial release

References

  1. How to use Logic Apps to handle large amounts of data from Log Analytics
  2. FastAPI on Azure Functions