spring-cloud/spring-cloud-stream-binder-aws-kinesis

PartitionKeyExpression can't find payload properties

rsercano opened this issue · 3 comments

Hi,

I'm using the configuration below:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          autoCreateStream: false
          kplKclEnabled: true
          autoAddShards: false
      bindings:
        producer-out-0:
          destination: ${KINESIS_DESTINATION:producer}
          content-type: application/json
          producer:
            partitionKeyExpression: payload.id
            headerMode: none

With the producer bean below:

@Component
@Slf4j
public class KinesisProducer {

    private final BlockingQueue<MyBean> event = new LinkedBlockingQueue<>();

    @Bean
    public Supplier<MyBean> gpsProducer() {
        return this.event::poll;
    }

    public void sendGps(MyBean event) {
        if (this.event.offer(event)) {
            log.info("Event sent: " + event);
        } else {
            log.error("Couldn't manage to send to Kinesis");
        }
    }
}

And it throws the following exception:

Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'id' cannot be found on object of type 'byte[]' - maybe not public or not valid?

My bean does have an id property. Could this be a timing issue in the binder itself?

As a workaround, I'm currently building the aws_partitionKey header myself before putting the message into the BlockingQueue:

        Message<UserDeviceDto> message = MessageBuilder.withPayload(event)
                .setHeader("aws_partitionKey", event.getId().toString())
                .build();

I understand your struggle, but evaluating the expression against the payload is not possible: the payload has already been serialized to JSON as a byte[] by the time it reaches partitionKeyExpression, so there is no 'id' property left to resolve.
So the recommended way is indeed to pass such information via a header.

At least that is how it works in the old version of Spring Cloud Stream.
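
To illustrate the point above, here is a minimal sketch in plain Java, with no Spring on the classpath. It is not the binder's or SpEL's actual code: readProperty is a hypothetical, simplified stand-in for property resolution (it only looks for a public getter), MyBean is a stand-in for the class from the question, and toJsonBytes is a hand-rolled replacement for the real JSON conversion. It only shows why a lookup of 'id' succeeds on the bean, fails on the serialized byte[], and why a value copied into a header beforehand is still available.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

public class PartitionKeyDemo {

    // Hypothetical stand-in for the MyBean class from the question.
    public static class MyBean {
        private final long id;

        public MyBean(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }
    }

    // Simplified stand-in for expression property resolution (an assumption,
    // not SpEL itself): look for a public getter named after the property.
    public static Optional<Object> readProperty(Object target, String name) {
        String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method method = target.getClass().getMethod(getter);
            return Optional.ofNullable(method.invoke(target));
        } catch (ReflectiveOperationException e) {
            // No such getter on this type, e.g. when the target is a byte[].
            return Optional.empty();
        }
    }

    // Hand-rolled JSON serialization, standing in for the real message conversion.
    public static byte[] toJsonBytes(MyBean bean) {
        return ("{\"id\":" + bean.getId() + "}").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MyBean bean = new MyBean(42L);

        // Copy the key into a header while the payload is still an object.
        Map<String, Object> headers = Map.of("aws_partitionKey", Long.toString(bean.getId()));

        // After conversion the payload is just bytes.
        byte[] payload = toJsonBytes(bean);

        System.out.println(readProperty(bean, "id").isPresent());    // true
        System.out.println(readProperty(payload, "id").isPresent()); // false
        System.out.println(headers.get("aws_partitionKey"));         // 42
    }
}
```

The second lookup fails for the same reason as the EL1008E error in the report: the target object is a byte[], which has no 'id' property.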

Try with recently released Kinesis Binder 4.0.0.

Closing as Works as Designed

I see, thanks. To be honest, it still feels odd, since there is a configuration property named partitionKeyExpression that doesn't do what its name suggests. I will try the new version as soon as possible.

Thanks anyway