Azure/azure-functions-kafka-extension

Exception: AttributeError: 'str' object has no attribute 'type' python confluent kafka trigger

sramayanam opened this issue · 20 comments

I am trying to process many records at once from Confluent Kafka and I get the 'str' object error somewhere in the trigger code. I tried both Python 3.8 and 3.9 venvs. The stack trace follows. Processing record by record works without any issue.

[2022-09-02T21:27:58.776Z] Exception: AttributeError: 'str' object has no attribute 'type'
[2022-09-02T21:27:58.776Z] Stack: File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure_functions_worker/dispatcher.py", line 388, in _handle__invocation_request
[2022-09-02T21:27:58.776Z] args[pb.name] = bindings.from_incoming_proto(
[2022-09-02T21:27:58.776Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure_functions_worker/bindings/meta.py", line 88, in from_incoming_proto
[2022-09-02T21:27:58.776Z] return binding.decode(datum, trigger_metadata=metadata)
[2022-09-02T21:27:58.776Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/kafka.py", line 176, in decode
[2022-09-02T21:27:58.776Z] return cls.decode_multiple_events(data, trigger_metadata)
[2022-09-02T21:27:58.776Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/kafka.py", line 265, in decode_multiple_events
[2022-09-02T21:27:58.776Z] key=cls._decode_typed_data(
[2022-09-02T21:27:58.776Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/meta.py", line 126, in _decode_typed_data
[2022-09-02T21:27:58.776Z] data_type = data.type
[2022-09-02T21:27:58.776Z] .
[2022-09-02T21:27:58.777Z] Failed to executed function with 64 items in topic_0 / 3 / 2990
[2022-09-02T21:27:58.777Z] System.Private.CoreLib: Exception while executing function: Functions.processeventsconfkafka. System.Private.CoreLib: Result: Failure
[2022-09-02T21:27:58.777Z] Exception: AttributeError: 'str' object has no attribute 'type'
[2022-09-02T21:27:58.777Z] Stack: File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure_functions_worker/dispatcher.py", line 388, in _handle__invocation_request
[2022-09-02T21:27:58.777Z] args[pb.name] = bindings.from_incoming_proto(
[2022-09-02T21:27:58.777Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure_functions_worker/bindings/meta.py", line 88, in from_incoming_proto
[2022-09-02T21:27:58.777Z] return binding.decode(datum, trigger_metadata=metadata)
[2022-09-02T21:27:58.777Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/kafka.py", line 176, in decode
[2022-09-02T21:27:58.777Z] return cls.decode_multiple_events(data, trigger_metadata)
[2022-09-02T21:27:58.777Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/kafka.py", line 265, in decode_multiple_events
[2022-09-02T21:27:58.777Z] key=cls._decode_typed_data(
[2022-09-02T21:27:58.777Z] File "/usr/local/Cellar/azure-functions-core-tools@4/4.0.4736/workers/python/3.8/OSX/X64/azure/functions/meta.py", line 126, in _decode_typed_data
[2022-09-02T21:27:58.777Z] data_type = data.type
[2022-09-02T21:27:58.777Z] .

Hi @sramayanam, can you please share the function body for further debugging?

We have a samples folder in our repo where you can find examples for trigger and output binding for all languages. Adding reference for python samples - https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/python

Hi,
I am facing the same error. Here is the code I am using:

import json
import logging
import os
import uuid
import pandas as pd
import traceback
from datetime import date
from azure.functions import KafkaEvent
from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string(
    os.environ['AZURE_STORAGE_CONNECTION_STRING'])


def transform(a: KafkaEvent):
    return json.loads(json.loads(a.get_body())['Value'])


def writeToBlob(df: pd.DataFrame):
    uniqueClients = df.client.unique()
    for client in uniqueClients:
        blob_client = blob_service_client.get_blob_client(
            container=os.environ['BLOB_CONTAINER_NAME'],
            blob='aaa/{}/aaa/{}/{}.csv'.format(
                client, date.today().strftime("%d-%m-%Y"), uuid.uuid4())
        )
        tempDF = df[df['client'] == client]
        blob_client.upload_blob(tempDF.astype(str)
                                .drop(['_rid', '_self', '_ts', '_etag'], axis=1)
                                .replace('undefined', '')
                                .replace('nan', '').to_parquet(index=False)
                                )


def main(kevents):
    try:
        logging.info("Batch size {}".format(len(kevents)))
        df = pd.DataFrame(map(transform, kevents))
        writeToBlob(df=df)
    except Exception:
        print("----------------------------------------------------")
        traceback.print_exc()
        print("----------------------------------------------------")
        raise Exception("Some Error")

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "kafkaTrigger",
      "direction": "in",
      "name": "kevents",
      "protocol": "SASLSSL",
      "password": "KAFKA_PASSWORD",
      "topic": "xxx",
      "authenticationMode": "PLAIN",
      "cardinality": "MANY",
      "dataType": "string",
      "consumerGroup": "KAFKA_CONSUMER_GROUP",
      "username": "KAFKA_USERNAME",
      "BrokerList": "KAFKA_BROKER_LIST"
    }
  ]
}

Dockerfile

# To enable ssh & remote debugging on app service change the base image to the one below
# FROM mcr.microsoft.com/azure-functions/python:3.0-python3.9-appservice
FROM mcr.microsoft.com/azure-functions/python:3.0-python3.9

ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
    AzureFunctionsJobHost__Logging__Console__IsEnabled=true

COPY requirements.txt /
RUN pip install -r /requirements.txt
ENV LD_LIBRARY_PATH=/home/site/wwwroot/bin/runtimes/linux-x64/native
COPY . /home/site/wwwroot

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  },
  "extensions": {
    "kafka":{
      "maxBatchSize":1000,
      "subscriberIntervalInSeconds":300
    }
  }
}

I tried following this and removing the type annotations, but it didn't help:
Azure/azure-functions-python-worker#670

Also having the same issue. Any update on this?

I am running the following example from this repo

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "xxxx",
            "topic" : "xxxx",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "xxxx",
            "BrokerList" : "xxxx"    
        }
    ]
}

main.py

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

I get the following error:

[2022-09-26T13:16:36.183Z] System.Private.CoreLib: Exception while executing function: Functions.kafkatrigger. System.Private.CoreLib: Result: Failure
Exception: AttributeError: 'str' object has no attribute 'type'
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 429, in _handle__invocation_request
args[pb.name] = bindings.from_incoming_proto(
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure_functions_worker\bindings\meta.py", line 88, in from_incoming_proto
return binding.decode(datum, trigger_metadata=metadata)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure\functions\kafka.py", line 176, in decode
return cls.decode_multiple_events(data, trigger_metadata)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure\functions\kafka.py", line 265, in decode_multiple_events
key=cls._decode_typed_data(
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.9/WINDOWS/X64\azure\functions\meta.py", line 126, in _decode_typed_data
data_type = data.type

@mortenbnftw,
It seems to be working fine for me.
I am using func core tools version - 4.0.4785.
If you are using an older version of func, can you upgrade it and check?

@krishna-kariya, Yes. Please see the attached screenshot.

Screenshot 2022-09-27 125548

I have tried Python 3.8 and 3.9 venvs and several versions of Core Tools, including 4.0.4785 and 4.0.4829.

I used the code from the KafkaTriggerMany sample in the Azure Functions Kafka extension repo: https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/python/KafkaTriggerMany

I have been trying for quite a few days. Any help would be much appreciated.

I have been facing the same issue for the last three weeks. Before that it was working fine and I was able to consume the Kafka messages. I am not sure what has changed, but the Kafka source team confirmed that they haven't made any changes on their end.

Link to the issue: https://learn.microsoft.com/en-us/answers/questions/1046604/python-azure-function-issue-for-kafka-trigger.html

I have tried the following to resolve this issue, with no luck:

  1. Installed Core Tools version 4.0.785 as well as older versions.
  2. Tried multiple older versions of the Azure Functions extension in Visual Studio Code.
  3. Reinstalled Visual Studio Code and Python on my system.

Can anyone suggest the root cause of this issue and were you guys able to resolve it? @mortenbnftw @sramayanam @isaacnikon93 @krishna-kariya

@chandrabisen I haven't been able to solve it. We are still facing the same issue.

@isaacnikon93: Have you received any official confirmation from Microsoft support about whether there is an internal issue going on?

No, I haven't received any official communication. We had to temporarily switch to a Node.js Azure function, which seems to be working fine.

I am looking into this issue.

Hi @JyothsnaGoalla / @mortenbnftw, could you please share a sample payload that triggers the crash? That would help with further analysis.

Hi @shrohilla, I am connecting to Confluent Cloud using the kafkaTrigger binding, and I receive the payload below from the topic:
{'Timestamp': '10/13/2022 10:54:33', 'Headers': '[]', 'Key': '4', 'Partition': '1', 'Topic': 'order', 'Value': '{"ordertime":1235,"orderid":5,"itemid":"Item_184","address":{"city":"Mountain View","state":"CA","zipcode":94041}}', 'Offset': '1'}

When I want to receive multiple events from the same topic, I use the List[KafkaEvent] signature from the sample at "https://github.com/Azure/azure-functions-kafka-extension/tree/dev/samples/python/KafkaTriggerMany". This is where the error occurs.
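
For reference, the Value field in the payload above is itself a JSON string, so reading an order field takes a second parse. A minimal sketch using only the standard library, with the payload copied from the message above (the variable names are only illustrative):

```python
import json

# Payload as posted above: every field, including 'Value', arrives as a string.
payload = {
    'Timestamp': '10/13/2022 10:54:33',
    'Headers': '[]',
    'Key': '4',
    'Partition': '1',
    'Topic': 'order',
    'Value': '{"ordertime":1235,"orderid":5,"itemid":"Item_184",'
             '"address":{"city":"Mountain View","state":"CA","zipcode":94041}}',
    'Offset': '1',
}

# 'Value' holds the actual order as JSON text, so it needs its own json.loads.
order = json.loads(payload['Value'])
city = order['address']['city']

print(payload['Key'], order['orderid'], city)
```

Note that Key is already a plain string here ('4'), which is relevant to the 'str' object error in the traces above.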

@shrohilla: Thanks for looking into it. As described in your pull request, I changed the code in kafka.py to key=parsed_key_props[i], in place of key=cls._decode_typed_data(parsed_key_props[i], python_type=str),. It works for me: I can consume the Kafka messages on my local system. But the same issue still persists in the Azure function app in the Azure portal, because the old code is still used there and we cannot modify it (the file system is read-only). Could you please let us know when we can expect this change to be released by Microsoft?
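
To illustrate why that one-line change removes the error, here is a minimal sketch. It does not reproduce the real azure.functions internals: Datum, decode_typed and decode_key are hypothetical stand-ins. The only thing taken from the thread is that the failing line reads data.type and that the key handed to it was evidently already a plain str.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Datum:
    """Hypothetical stand-in for a typed wrapper: a value plus a type tag."""
    value: Any
    type: str


def decode_typed(data: Any) -> str:
    """Hypothetical decoder that reads data.type first, as the traceback line does."""
    data_type = data.type  # raises AttributeError when data is a plain str
    if data_type == "string":
        return str(data.value)
    raise ValueError(f"unsupported type tag: {data_type}")


def decode_key(key: Any) -> str:
    """Pass plain strings through untouched; decode only wrapped values."""
    return key if isinstance(key, str) else decode_typed(key)


# A wrapped key decodes normally.
print(decode_typed(Datum(value="4", type="string")))

# A key that is already a str reproduces the error from the thread.
try:
    decode_typed("4")
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)

# The pass-through variant accepts both shapes.
print(decode_key("4"), decode_key(Datum(value="4", type="string")))
```

The change described above is the simpler of the two options: it assumes the parsed keys are always strings and skips decoding entirely, whereas decode_key here tolerates either shape.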

Hi @chandrabisen, thanks a lot for the very quick response. We are working hard to deliver this as soon as possible.

Hi @chandrabisen / @JyothsnaGoalla / @isaacnikon93 / @mortenbnftw / @sramayanam
Please use version 1.13.0b1 of the Azure Functions library.
Add this dependency to your requirements.txt file: azure-functions==1.13.0b1

This is a beta released by the partner team; the 1.13 release of the Python library will be live soon.
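
After pinning the dependency, it can help to confirm which version pip actually installed in the environment. A minimal sketch using the standard library's importlib.metadata (Python 3.8+); installed_version is a hypothetical helper name, not part of any Azure package:

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the pinned version if the requirements.txt entry took effect
    # in this environment, or None if the package is not installed here.
    print("azure-functions:", installed_version("azure-functions"))
```

This only reports what is installed in the current environment; it does not show which copy of the library the Functions worker imports at run time.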

Closing the issue for now, feel free to reopen if this issue persists

Hi,
I'm getting a similar error when running my Azure function with a Kafka batch trigger. I tried changing the azure-functions version to 1.13.0b1, but it didn't work for me. I also tried older versions with no luck: the same error. Is there a workaround to avoid the error and run the function successfully?

Result: Failure
Exception: UnboundLocalError: local variable 'parsed_headers_props' referenced before assignment
Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 387, in _handle__invocation_request
args[pb.name] = bindings.from_incoming_proto(
File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/bindings/meta.py", line 88, in from_incoming_proto
return binding.decode(datum, trigger_metadata=metadata)
File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure/functions/kafka.py", line 176, in decode
return cls.decode_multiple_events(data, trigger_metadata)
File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure/functions/kafka.py", line 270, in decode_multiple_events
headers=parsed_headers_props[i],

cc @shrohilla

@hovhanns the two errors are not related. Please raise a new issue, share the sample Kafka payload where it crashed, and mention whether headers are present. I'll look into it and respond as soon as possible.
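
As general background on this second error: an UnboundLocalError usually means a local variable was assigned only inside a branch that did not run. The sketch below shows that pattern in isolation. It is not the library's code; the function names and the 'headers' metadata key are hypothetical, chosen only to mirror the question about whether headers are present.

```python
from typing import Any, Dict, List


def headers_fragile(metadata: Dict[str, Any]) -> List[Any]:
    """Assigns the local only when the (hypothetical) 'headers' key exists."""
    if "headers" in metadata:
        parsed_headers = list(metadata["headers"])
    return parsed_headers  # UnboundLocalError if the branch above was skipped


def headers_safe(metadata: Dict[str, Any]) -> List[Any]:
    """Same logic, but the local always has a value before it is read."""
    parsed_headers: List[Any] = []
    if "headers" in metadata:
        parsed_headers = list(metadata["headers"])
    return parsed_headers


try:
    headers_fragile({})
    fragile_failed = False
except UnboundLocalError:
    fragile_failed = True

print(fragile_failed)      # True
print(headers_safe({}))    # []
```

If the real cause follows this pattern, a payload without headers would be enough to trigger it, which is why the sample payload matters for the new issue.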

I am still getting the following error even after using the beta version of azure-functions:

2022-12-10T16:12:05.69592 Successfully Connected to container: 'swan-data-datalake' [Revision: 'swan-data-datalake--08t59dx', Replica: 'swan-data-datalake--08t59dx-6d54b9cccd-7nkrj']
2022-12-10T16:10:40.498879649Z args[pb.name] = bindings.from_incoming_proto(
2022-12-10T16:10:40.498881649Z File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/bindings/meta.py", line 88, in from_incoming_proto
2022-12-10T16:10:40.498885249Z return binding.decode(datum, trigger_metadata=metadata)
2022-12-10T16:10:40.498889949Z File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure/functions/kafka.py", line 176, in decode
2022-12-10T16:10:40.498893949Z return cls.decode_multiple_events(data, trigger_metadata)
2022-12-10T16:10:40.498898249Z File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure/functions/kafka.py", line 265, in decode_multiple_events
2022-12-10T16:10:40.498902949Z key=cls._decode_typed_data(
2022-12-10T16:10:40.498907149Z File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure/functions/meta.py", line 126, in _decode_typed_data
2022-12-10T16:10:40.498911549Z data_type = data.type
2022-12-10T16:10:40.498915549Z
2022-12-10T16:10:40.498920449Z at Microsoft.Azure.WebJobs.Script.Description.WorkerFunctionInvoker.InvokeCore(Object[] parameters, FunctionInvocationContext context) in /src/azure-functions-host/src/WebJobs.Script/Description/Workers/WorkerFunctionInvoker.cs:line 96
2022-12-10T16:10:40.498932149Z at Microsoft.Azure.WebJobs.Script.Description.FunctionInvokerBase.Invoke(Object[] parameters) in /src/azure-functions-host/src/WebJobs.Script/Description/FunctionInvokerBase.cs:line 82
2022-12-10T16:10:40.498937249Z at Microsoft.Azure.WebJobs.Host.Executors.VoidTaskMethodInvoker2.InvokeAsync(TReflected instance, Object[] arguments) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\VoidTaskMethodInvoker.cs:line 20
2022-12-10T16:10:40.498940149Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionInvoker2.InvokeAsync(Object instance, Object[] arguments) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionInvoker.cs:line 52
2022-12-10T16:10:40.498942849Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.InvokeWithTimeoutAsync(IFunctionInvoker invoker, ParameterHelper parameterHelper, CancellationTokenSource timeoutTokenSource, CancellationTokenSource functionCancellationTokenSource, Boolean throwOnTimeout, TimeSpan timerInterval, IFunctionInstance instance) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 581
2022-12-10T16:10:40.498945949Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithWatchersAsync(IFunctionInstanceEx instance, ParameterHelper parameterHelper, ILogger logger, CancellationTokenSource functionCancellationTokenSource) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 527
2022-12-10T16:10:40.498948449Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithLoggingAsync(IFunctionInstanceEx instance, FunctionStartedMessage message, FunctionInstanceLogEntry instanceLogEntry, ParameterHelper parameterHelper, ILogger logger, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 306
2022-12-10T16:10:40.498950349Z --- End of inner exception stack trace ---
2022-12-10T16:10:40.498952349Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.ExecuteWithLoggingAsync(IFunctionInstanceEx instance, FunctionStartedMessage message, FunctionInstanceLogEntry instanceLogEntry, ParameterHelper parameterHelper, ILogger logger, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 352
2022-12-10T16:10:40.498964749Z at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.TryExecuteAsync(IFunctionInstance functionInstance, CancellationToken cancellationToken) in C:\projects\azure-webjobs-sdk-rqm4t\src\Microsoft.Azure.WebJobs.Host\Executors\FunctionExecutor.cs:line 108
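
The Python frames in the trace above are under /azure-functions-host/workers/python/..., and kafka.py line 265 still shows the key=cls._decode_typed_data( call. That suggests, though it does not prove, that the copy of the library bundled with the worker is being loaded instead of the version pinned in requirements.txt. One way to check is to log where a module was imported from. A minimal sketch using only the standard library; module_origin is a hypothetical helper name:

```python
import importlib
from typing import Optional, Tuple


def module_origin(name: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (file path, version) for an importable module.

    Either element is None when the module does not expose that attribute.
    """
    module = importlib.import_module(name)
    return getattr(module, "__file__", None), getattr(module, "__version__", None)


if __name__ == "__main__":
    # Demonstrated on a standard-library module so the sketch runs anywhere.
    path, version = module_origin("json")
    print(path, version)
```

Inside a function app, calling module_origin("azure.functions") and logging the result would show whether the path points into the worker directory or into the app's own site-packages.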