Feature request: Parser integration for Batch Processing
Closed this issue · 9 comments
Use case
As a customer, when working with Batch Processing, I want to have the utility parse and validate payloads before they're passed to my record handler¹, so that I can focus on my business logic.
Today, assuming I'm working with SQS - but this applies to all the sources supported by the Batch Processing utility - I need to manually parse/validate payloads against my schemas in the body of the record handler, like this:
```ts
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parse } from '@aws-lambda-powertools/parser';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import {
  SqsRecordSchema,
  SqsSchema,
} from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const customSchema = z.object({
  name: z.string(),
  age: z.number(),
});

// extend the built-in record schema so the JSON-stringified body is
// validated against the custom schema
const sqsRecordExtendedSchema = SqsRecordSchema.extend({
  body: JSONStringified(customSchema),
});

// event-level schema, extended the same way (useful when parsing the whole event)
const SqsExtendedSchema = SqsSchema.extend({
  Records: z.array(sqsRecordExtendedSchema),
});

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord) => {
  // each record is parsed individually, so we use the record-level schema here
  const payload = parse(record, undefined, sqsRecordExtendedSchema);
  const { body: { name, age } } = payload; // this is safe to use because it's parsed
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

We could simplify this experience quite a bit by making some assumptions (see below).
Solution/User Experience
Because we always know the EventType at the time of instantiation of the BatchProcessor, we can allow customers to simply pass us a schema for the record/item, like so:
```ts
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { z } from 'zod';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const customSchema = z.object({
  name: z.string(),
  age: z.number(),
});

const processor = new BatchProcessor(EventType.SQS, customSchema);

const recordHandler = async (
  { body: { name, age } }: SQSRecord & { body: z.infer<typeof customSchema> }
) => {
  // this is safe to use because it's parsed
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```

Under the hood, we can then:
- dynamically require the `parse` function, `JSONStringified` helper, and `SqsSchema` schema
- extend the `SqsSchema` with the customer-provided schema
- call `parse(<data>, undefined, <customer-provided-schema>)`
- mark the item as failed if parsing fails, or pass it to the record handler if parsing succeeds (a rough sketch of this flow follows the list)
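To make this concrete, here is a minimal sketch of what that internal flow could look like for SQS; the `parseSqsRecord` helper and its shape are illustrative, not part of the existing API:

```ts
import type { SQSRecord } from 'aws-lambda';
import type { ZodType } from 'zod';

// Hypothetical internal helper: dynamically import the Parser pieces so they
// stay optional for customers who don't use schema validation, extend the
// built-in record schema, and parse a single record.
const parseSqsRecord = async (record: SQSRecord, schema: ZodType) => {
  const { parse } = await import('@aws-lambda-powertools/parser');
  const { JSONStringified } = await import('@aws-lambda-powertools/parser/helpers');
  const { SqsRecordSchema } = await import('@aws-lambda-powertools/parser/schemas/sqs');

  const extendedRecordSchema = SqsRecordSchema.extend({
    body: JSONStringified(schema),
  });

  try {
    return parse(record, undefined, extendedRecordSchema);
  } catch {
    // Returning null lets the caller mark the item as failed without
    // invoking the record handler.
    return null;
  }
};
```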
Open questions:
Note that this will require us to use dynamic imports - this should be fine since the record handler is meant to be async, so that code path is already async. Alternatively, we'll need to make the customer pass us the parser function and the schema already extended, which basically saves one line.
Alternative solutions
Acknowledgment
- This feature request meets Powertools for AWS Lambda (TypeScript) Tenets
- Should this be considered in other Powertools for AWS Lambda languages? i.e. Python, Java, and .NET
Future readers
Please react with 👍 and your use case to help us understand customer demand.
Footnotes
1. A record handler is the function designated to be called by the Batch Processing utility for each item. ↩
I think it's more intuitive for the customer to pass just the schema instead of making them pass the parser function and extend the schema themselves. I guess the dynamic import wouldn't add too much overhead.
Should this second parameter be an options object here, in case we want to extend it in the future?

```ts
const processor = new BatchProcessor(EventType.SQS, { schema: customSchema });
```

I have a weak preference for passing in the parser as an argument: 1) it's explicit and 2) it's easier to test. Is this what it would look like?

```ts
const processor = new BatchProcessor(EventType.SQS, { schema: customSchema, parser: myParser });
```

But I am happy to go the dynamic import route too if that's what we decide.
> Should this second parameter be an options object here, in case we want to extend it in the future?
>
> ```ts
> const processor = new BatchProcessor(EventType.SQS, { schema: customSchema });
> ```
Yeah, sounds good to me
> I have a weak preference for passing in the parser as an argument: 1) it's explicit and 2) it's easier to test. Is this what it would look like?
>
> ```ts
> const processor = new BatchProcessor(EventType.SQS, { schema: customSchema, parser: myParser });
> ```
Here, I think the schema needs to be extended based on the event type by the customer, who would then just pass the parser in to the BatchProcessor, if I'm understanding this correctly.
```ts
const SqsExtendedSchema = SqsSchema.extend({
  Records: z.array(
    SqsRecordSchema.extend({
      body: JSONStringified(customSchema),
    })
  ),
});

const myParser = parser({ schema: SqsExtendedSchema });
const processor = new BatchProcessor(EventType.SQS, { parser: myParser });
```

Ah, I see. Yeah, I guess it does feel more natural to just pass in the schema now that I think about it.
After reading through the discussion, the options object approach that @svozza suggested makes perfect sense for extensibility, and I agree with @sdangol that just passing the schema is much more intuitive than requiring customers to set up the parser themselves.
Building on the Discussion
I've been digging through the current BatchProcessor and Parser implementations to understand how they work, and I think we can make this work really cleanly while following the consensus that's emerging here.
Supporting the Options Object Approach
@svozza's suggestion for the options object is spot on:
```ts
const processor = new BatchProcessor(EventType.SQS, { schema: customSchema });
```

This gives us room to grow and keeps the API clean. Here's how I think we can implement this:
Enhanced Constructor:
```ts
interface BatchProcessorOptions {
  schema?: z.ZodSchema | StandardSchema;
  // Future options can go here
}

constructor(
  eventType: EventType,
  options?: BatchProcessorOptions
)
```

Type-Safe Integration:
```ts
// The handler signature would change based on schema presence
type RecordHandler<T> = T extends undefined
  ? (record: EventRecord) => Promise<void>
  : (record: ParsedRecord<EventRecord, T>) => Promise<void>;
```

Implementation Ideas
- Envelope Auto-Detection: We can automatically map `EventType` to the right envelopes:

  ```ts
  const ENVELOPE_MAP = {
    [EventType.SQS]: SqsEnvelope,
    [EventType.KinesisDataStreams]: KinesisEnvelope,
    [EventType.DynamoDBStreams]: DynamoDBEnvelope,
  } as const;
  ```
- Parse-then-Process Flow:
  - Parse record → success? call handler → handler fails? mark as failed
  - Parse record → fails? mark as failed (skip handler)
  - This keeps the existing failure behavior while adding validation
- Zero Breaking Changes: All existing code keeps working, the new behavior is opt-in
Addressing the Implementation Details
Dynamic Imports: Totally agree with the team that this is the right approach. The batch processing path is already async, and it keeps the Parser dependency optional for users who don't need it.
Schema Handling: Building on the discussion, here's how I think we can handle the automatic envelope detection:
```ts
private getEnvelopeForEventType(eventType: EventType) {
  const envelopeMap = {
    [EventType.SQS]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.SqsEnvelope),
    [EventType.KinesisDataStreams]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.KinesisEnvelope),
    [EventType.DynamoDBStreams]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.DynamoDBEnvelope),
  };
  return envelopeMap[eventType];
}
```

This way we automatically pick the right envelope based on the EventType, which addresses @dreamorosi's original goal of simplifying the experience.
Type Safety: For the parsed records, we can use conditional types to make the handler signature change based on whether a schema is provided:
```ts
type RecordHandler<T extends BatchProcessorOptions> = T['schema'] extends undefined
  ? (record: EventRecord) => Promise<void>
  : (record: ParsedRecord<EventRecord, T['schema']>) => Promise<void>;
```

Example Implementation Sketch
```ts
class BatchProcessor<TSchema extends Schema | undefined = undefined> {
  private parseRecord(record: EventRecord): ParsedRecord | null {
    if (!this.schema) return record as ParsedRecord;
    try {
      const envelope = ENVELOPE_MAP[this.eventType];
      return parse(record, envelope, this.schema);
    } catch (error) {
      // Log parsing error, return null to mark as failed
      return null;
    }
  }

  async processRecord(record: EventRecord): Promise<void> {
    const parsedRecord = this.parseRecord(record);
    if (!parsedRecord) {
      throw new Error('Parsing failed'); // Will be caught by existing error handling
    }
    return this.handler(parsedRecord);
  }
}
```

How This Fits Together
This feels consistent with how the @parser decorator works, just applied to batch processing. Anyone already using the Parser utility should find this pretty intuitive.
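For reference, this is roughly what using the existing @parser decorator looks like today - a sketch based on the Parser utility's decorator API, where orderSchema and the class wiring are illustrative:

```ts
import { parser } from '@aws-lambda-powertools/parser';
import { SqsEnvelope } from '@aws-lambda-powertools/parser/envelopes/sqs';
import { z } from 'zod';
import type { Context } from 'aws-lambda';

const orderSchema = z.object({
  name: z.string(),
  age: z.number(),
});

class Lambda {
  // The decorator parses the whole SQS event and hands the handler the
  // payloads extracted from each record's body, validated against the schema.
  @parser({ schema: orderSchema, envelope: SqsEnvelope })
  public async handler(
    event: Array<z.infer<typeof orderSchema>>,
    _context: Context
  ): Promise<void> {
    for (const { name, age } of event) {
      // name and age are already validated here
    }
  }
}

const lambda = new Lambda();
export const handler = lambda.handler.bind(lambda);
```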
Implementation Approach
Based on the discussion, here's how I see this working:
Phase 1: Basic Schema Integration
- Options-based constructor (following @svozza's suggestion)
- Dynamic import of parser utilities (as @sdangol agreed)
- Automatic envelope detection based on EventType
- Graceful parsing failures (mark the record as failed and let batch processing handle it)
Phase 2: Polish & Edge Cases
- Full type safety with conditional types
- Error handling refinements
- Integration with existing parser options (safeParse, etc.; a small sketch follows this list)
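As an illustration of that last point, here is a minimal sketch of how graceful failures could lean on the parser's safeParse option, assuming parse() accepts a safeParse flag as its final argument; the tryParseRecord helper is hypothetical:

```ts
import { parse } from '@aws-lambda-powertools/parser';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import type { SQSRecord } from 'aws-lambda';

// Hypothetical helper: with safeParse, parse() returns a result object
// instead of throwing, so the processor can decide what to do with failures.
const tryParseRecord = (record: SQSRecord) => {
  const result = parse(record, undefined, SqsRecordSchema, true);
  if (result.success) {
    return result.data; // hand the parsed record to the record handler
  }
  // result.error holds the validation details; the processor would mark this
  // item as failed instead of calling the record handler.
  return null;
};
```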
I'd Love to Help Build This
Since @sdangol is assigned to this, I'd be happy to collaborate or help in any way! I could:
- Draft the TypeScript interfaces based on our discussion
- Create the parsing integration logic with the dynamic imports
- Handle the envelope auto-detection for different event types
- Write comprehensive tests for all the scenarios
Would you be open to me putting together some initial code based on this discussion? I'm thinking I'd start with the basic integration following the consensus here, then iterate on the details.
Hi @dcabib - thank you for offering your help - once @sdangol is done with the implementation and the PR is merged please feel free to pull the main branch and run some tests. If anything interesting comes up we can definitely open a new issue and address it there.
Regarding the suggestions, I just wanted to leave a couple comments/questions
> ```ts
> private getEnvelopeForEventType(eventType: EventType) {
>   const envelopeMap = {
>     [EventType.SQS]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.SqsEnvelope),
>     [EventType.KinesisDataStreams]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.KinesisEnvelope),
>     [EventType.DynamoDBStreams]: () => import('@aws-lambda-powertools/parser/envelopes').then(m => m.DynamoDBEnvelope),
>   };
>   return envelopeMap[eventType];
> }
> ```
I don't think we should implement it this way.
You correctly pointed out that we need to rely on async behavior, but the getEnvelopeForEventType you suggested is synchronous, which could put us in a situation where we try to use the envelope before the Promise has resolved.
In practice, getEnvelopeForEventType should be marked as async and the import should be awaited, and ideally centralized and wrapped in a try/catch rather than repeated inline three times:
```ts
private async getEnvelopeForEventType(eventType: EventType) { // type is wrong
  try {
    const envelopes = await import('@aws-lambda-powertools/parser/envelopes');
    if (eventType === EventType.SQS) {
      return envelopes.SqsEnvelope;
    } else if (eventType === EventType.DynamoDBStreams) {
      return envelopes.DynamoDBEnvelope;
    }
    return envelopes.KinesisEnvelope;
  } catch (error) {
    // log an error that informs the customer they didn't install the Parser package
  }
}
```

Also, in your code you're using EventType as a type in the function signature, while in reality it's a constant object, which means it should've been used as keyof typeof EventType.
In practice however, since we expose scoped sub-path exports (i.e. @aws-lambda-powertools/parser/envelopes/sqs) instead of having a generic function like the above, we'll likely end up with either three loader functions (one for each event type) or some additional if/else statements at the top to build the import path.
> ```ts
> type RecordHandler<T extends BatchProcessorOptions> = T['schema'] extends undefined
>   ? (record: EventRecord) => Promise<void>
>   : (record: ParsedRecord<EventRecord, T['schema']>) => Promise<void>;
> ```
I am not sure this is possible with the current implementation.
The record handler is currently typed simply as CallableFunction (here) and the module's architecture has basically no relationship with the BatchProcessor class.
This means that it's going to be tricky to dynamically assign the type of the record handler - which belongs to the processPartialResponse function (here) - based on a parameter passed to the BasePartialBatchProcessor class constructor (here).
Am I missing something?
Hey @dreamorosi! Thanks for the detailed review and the technical insights.
On the Async Import Approach
For the envelope handling, I was thinking of a factory pattern where we lazily load the right envelope based on the event type. The idea was to keep the import resolution separate from the actual usage:
```ts
private async getEnvelopeForEventType(eventType: keyof typeof EventType) {
  try {
    const envelopes = await import('@aws-lambda-powertools/parser/envelopes');
    const envelopeMap = {
      [EventType.SQS]: envelopes.SqsEnvelope,
      [EventType.KinesisDataStreams]: envelopes.KinesisEnvelope,
      [EventType.DynamoDBStreams]: envelopes.DynamoDBEnvelope,
    };
    return envelopeMap[eventType];
  } catch (error) {
    // Handle parser package not installed
    throw new Error('Parser package required for schema validation');
  }
}
```

Since you mentioned the scoped sub-path exports, we could also go with individual imports per event type, which might be cleaner:
```ts
private async loadSqsEnvelope() {
  const { SqsEnvelope } = await import('@aws-lambda-powertools/parser/envelopes/sqs');
  return SqsEnvelope;
}
```

From my point of view, both approaches work; it just depends on whether we want centralized or distributed import logic. We can follow your approach with no issues at all.
Type Safety Strategy
For the type safety piece, I was exploring conditional types to make the handler signature change based on whether a schema is provided. The challenge you pointed out about the CallableFunction typing is real, but there might be a path forward.
What if we approach it differently - instead of trying to change the handler signature dynamically, we could:
- Keep the existing `CallableFunction` for backward compatibility
- Add an optional generic overload for typed scenarios
- Use the schema information internally for parsing, but let TypeScript inference handle the rest
Something like:
// Existing signature (unchanged)
processPartialResponse(event, recordHandler, processor, options?)
// New typed overload
processPartialResponse<TSchema>(event, recordHandler: TypedHandler<TSchema>, processor, options?)The parsing would happen transparently in the processor, and developers who want type safety can opt into the typed version.
Implementation Thoughts
The core idea still feels solid - taking the schema at the BatchProcessor level and handling the envelope detection automatically. The implementation details you raised are good catches, but they're more about execution than the overall approach.
Looking forward to seeing how @sdangol tackles this! I'll definitely test it out once it's ready.
Warning
This issue is now closed. Please be mindful that future comments are hard for our team to see.
If you need more assistance, please either reopen the issue, or open a new issue referencing this one.
If you wish to keep having a conversation with other community members under this issue feel free to do so.
This is now released in v2.26.0!