PartitionKeyExpression can't find paylod properties
rsercano opened this issue · 3 comments
Hi,
I'm using below configuration
spring:
cloud:
stream:
kinesis:
binder:
autoCreateStream: false
kplKclEnabled: true
autoAddShards: false
bindings:
producer-out-0:
destination: ${KINESIS_DESTINATION:producer}
content-type: application/json
producer:
partitionKeyExpression: payload.id
headerMode: none
With below producer bean
@Component
@Slf4j
public class KinesisProducer {
private final BlockingQueue<MyBean> event = new LinkedBlockingQueue<>();
@Bean
public Supplier<MyBean> gpsProducer() {
return this.event::poll;
}
public void sendGps(MyBean event) {
if (this.event.offer(event)) {
log.info("Event sent: " + event);
} else {
log.error("Couldn't manage to send to Kinesis");
}
}
}
And it throws below
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'id' cannot be found on object of type 'byte[]' - maybe not public or not valid?
Actually my bean has an id property, but can this be a timing issue in the binder itself?
I'm currently building aws_partitionKey
header by myself before putting it into blockingqueue.
Message<UserDeviceDto> message = MessageBuilder.withPayload(event)
.setHeader("aws_partitionKey", event.getId().toString())
.build();
I understand your struggle, but that is not possible against payload
since it is serialized as a JSON into byte[]
before it reaches that partitionKeyExpression
.
So, the recommended way is indeed to transfer such an info via header.
At least that is how it works in the old version of Spring Cloud Stream.
Try with recently released Kinesis Binder 4.0.0
.
Closing as Works as Designed
I see, thanks. To be honest it still feels weird this way since there's a configuration namely partitionKeyExpression
but it doesn't do what the name suggests, will try new version asap
Thanks anyway