medzin/beam-postgres

How to update values from a PCollection.


Hey, I have been trying to use your library (which is very useful, by the way) to update a few records in my database. I am providing a snippet of code here which should give some insight into what I am trying to achieve.

import apache_beam as beam
from beam_postgres.io import WriteToPostgres
from dataclasses import dataclass

DB_USERNAME = "<db user name>"
DB_PASSWORD = "<db pass>"
DB_HOST = "<db host>"
DB_PORT = "<db port>"
DB_DATABASENAME = "<db name>"

@dataclass
class DocId:
    document_id: str

class ProcessVals(beam.DoFn):
    def process(self, value):
        if value is not None:
            yield DocId(None)
        else:
            yield DocId(value)

def check_if_none(value):
    if value.document_id is None:
        return False
    else:
        return True
    
with beam.Pipeline() as p:
    data = p | "Creating" >> beam.Create(
        ['02267a6d-0a9f-40bf-9051-4971961cb0ac', 
         '05db919e-adda-41c2-9197-54623dff6d1a', 
         '04c607ec-64e8-420e-b5a5-bb63e035ba2f', 
         None, 
         None]
       )
    
    data2 = data | "Molding" >> beam.ParDo(ProcessVals()) | "Filter" >> beam.Filter(check_if_none)
    
    data2 | "Writing example records to database" >> WriteToPostgres(
        conninfo=f"host={DB_HOST} dbname={DB_DATABASENAME} user={DB_USERNAME} password={DB_PASSWORD}",
        statement="UPDATE documents SET is_available = true WHERE document_id = %s",
        )

Unfortunately this does not seem to be working: the pipeline runs fine, but I do not see the changes reflected in my database. I am really new to the Apache Beam space and I am having trouble troubleshooting this. Could you help?

Hi, thanks for using the library! I would start by adding error logging, just like in this example: https://github.com/medzin/beam-postgres/blob/main/examples/write_error.py#L31
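
With your snippet, that would look roughly like this (based on that example, the write transform's output is a PCollection of the failed (record, exception) pairs, which you can print or log):

errors = data2 | "Writing example records to database" >> WriteToPostgres(
    conninfo=f"host={DB_HOST} dbname={DB_DATABASENAME} user={DB_USERNAME} password={DB_PASSWORD}",
    statement="UPDATE documents SET is_available = true WHERE document_id = %s",
)
errors | "Logging errors" >> beam.Map(print)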

Also, it looks like there is a typo in the process method of the ProcessVals class:

if value is not None:
    yield DocId(None)

This DoFn will always create DocId objects with document_id set to None, so filtering will always return an empty collection.
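
The fixed version presumably reduces to just wrapping the value:

class ProcessVals(beam.DoFn):
    def process(self, value):
        # Wrap the raw value, None or not; the downstream Filter
        # step then drops the entries without a document id.
        yield DocId(value)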

Thank you for your prompt reply. Yes, that seems to have been the case. As you might guess, this is just an imitation of the actual code I am trying to run. Let me recheck whether I have made the same mistake in my main code.

I tried to log the error, and here is what I got:

(DocId(document_id='0230d273-ca7f-42d4-bf2d-5552d8ec9efa'), ProgrammingError('the query has 1 placeholders but 0 parameters were passed'))

It seems like I am passing the correct object, and yet it still looks like the query does not recognise the parameter.
For reference, the query I am using is this:

UPDATE documents SET is_available = true WHERE document_id = %s

The error message suggests that document_id is set to None. You could try printing data classes before writing with beam.Map(print).
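
For example, right before the write step:

data2 | "Printing records" >> beam.Map(print)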

But the error message shows the dataclass value, right? The document id is clearly set to a valid value.

What runner are you using to run the pipeline (e.g. DataflowRunner)?

Right now I'm testing using the direct runner, but eventually I'll be using the Dataflow runner.

I experienced a few problems with data class serialization on Dataflow: fields appeared to be set, but some class metadata was lost after deserialization, and the function call here returned an empty tuple. Data classes only worked without problems when I put my models in a dedicated Python package outside the pipeline code. I use apache-beam[gcp]==2.42.0 and Dataflow Flex Templates.

The issue is that I am not even using Dataflow yet, just the plain old direct runner. I see that the code example above actually updates the required documents. And in my main code, I am using the exact same environment (apache-beam==2.46.0) and Python version (3.10.8), plus the same dataclass and query. So I am not exactly sure why the two are behaving differently.

Well, I'm using Python 3.9.16, which could be the difference causing the problem. Still, I suggest you put the data class in a dedicated Python package and check whether that removes the problematic behavior.
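
Something like this, as a sketch (the module name is just an illustration):

# models.py - a regular module in its own package, outside the pipeline file
from dataclasses import dataclass

@dataclass
class DocId:
    document_id: str

# pipeline.py
from models import DocId

The point is that DocId is imported from an installable module rather than defined in the __main__ session, so the workers can re-import it after deserialization.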

Okay, let me try that. I'll report the results.

Hey, so what I did was use tuples instead of dataclasses. It seems to work now, although I am not entirely sure what was causing the line you mentioned to fail.
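
Roughly, the change was to emit one-element tuples from the DoFn (a sketch, not the exact code):

class ProcessVals(beam.DoFn):
    def process(self, value):
        # Emit a plain tuple instead of a dataclass; its single
        # element lines up with the single %s placeholder in the
        # UPDATE statement.
        yield (value,)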

The problem is that astuple returns an empty tuple for data classes defined in the main session. It looks like the __dataclass_fields__ attribute (which is used inside the astuple implementation) is somehow corrupted after the serialization/deserialization process.
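
A minimal sketch of the failure mode, simulating the corrupted metadata by hand:

import dataclasses
from dataclasses import dataclass

@dataclass
class DocId:
    document_id: str

doc = DocId("abc")
print(dataclasses.astuple(doc))  # ('abc',)

# astuple iterates dataclasses.fields(), which reads the class's
# __dataclass_fields__ mapping; if that mapping comes back empty
# after deserialization, there is nothing to iterate over.
DocId.__dataclass_fields__ = {}
print(dataclasses.astuple(doc))  # ()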