awslabs/amazon-redshift-utils

sp_check_primary_key procedure not working

bgmello opened this issue · 0 comments

I tried to create the procedure in my cluster and noticed it has some issues:

  1. It doesn't work with primary keys made up of multiple columns;
  2. It doesn't work for schema-qualified tables (e.g. `my_schema.my_table`);
  3. It raises an error when the action is FIX but the number of duplicates is 0.

I created my own version:

-- sp_check_primary_key
--
-- Purpose:
--   Checks a table for rows that violate its declared PRIMARY KEY, writes the
--   outcome to a log table and, optionally, de-duplicates the offending rows.
--
-- Parameters:
--   batch_time    Timestamp written to the log row to identify the calling batch.
--   check_table   Table to check; either "table" or "schema.table".
--   check_action  'LOG' to only record the result, 'FIX' to also attempt a repair.
--   log_table     Table that receives one row per run. The INSERT supplies five
--                 values positionally: batch_time, check_table, SYSDATE, message,
--                 duplicate count.
--   max_fix_rows  With 'FIX', the repair is attempted only while the duplicate
--                 row count is strictly below this value.
--
-- Raises:
--   An exception when a parameter is missing/invalid (messages unchanged).
--
-- Notes:
--   * `check_table` and `log_table` are interpolated into dynamic SQL as
--     identifiers and cannot be bound as values; callers must pass trusted names.
--   * A fix is only applied when the duplicated rows are identical in every
--     column; otherwise the failure is logged and the table is left untouched.
--   * The successful FIX path issues a COMMIT.
create or replace procedure sp_check_primary_key(batch_time timestamp without time zone, check_table character varying, check_action character varying, log_table character varying, max_fix_rows integer)
    language plpgsql
as
$$
DECLARE
      sql        VARCHAR(MAX) := '';
      pk_columns VARCHAR(MAX) := '';   -- comma separated PK column list, e.g. "a, b"
      log_prefix VARCHAR(MAX) := '';   -- shared head of every log INSERT statement
      dupe_count INTEGER := 0;
      temp_count INTEGER := 0;
  BEGIN
      -- Parameter validation. NULLs are rejected explicitly because a comparison
      -- with NULL is never true and would otherwise slip past these checks.
      IF check_action IS NULL OR check_action NOT IN ('LOG', 'FIX') THEN
           RAISE EXCEPTION 'The `check_action` parameter must be "LOG" or "FIX".';
      END IF;
      IF check_table IS NULL OR log_table IS NULL OR check_table='' OR log_table='' THEN
           RAISE EXCEPTION 'Parameters `check_table` and `log_table` cannot be empty.';
      END IF;
      IF check_action='FIX' AND (max_fix_rows IS NULL OR max_fix_rows<=0) THEN
           RAISE EXCEPTION 'Parameter `max_fix_rows` must be >0 when `check_action` = "FIX".';
      END IF;

      --Retrieve the primary key column(s) for the table.
      --pg_get_constraintdef returns e.g. "PRIMARY KEY (a, b)"; the REPLACEs strip
      --the wrapper so only "a, b" remains.
      sql := 'SELECT REPLACE(REPLACE(pg_get_constraintdef(con.oid),''PRIMARY KEY ('',''''),'')'','''')'||
             ' FROM pg_constraint AS con '||
             ' JOIN pg_class      AS tbl ON tbl.relnamespace = con.connamespace AND tbl.oid = con.conrelid'||
             ' JOIN pg_namespace  AS sch ON sch.oid = tbl.relnamespace'||
             ' WHERE tbl.relkind = ''r'' AND con.contype = ''p''';
      --The names are compared as string VALUES against the catalog, so they are
      --quoted with quote_literal (not quote_ident, which would add double quotes
      --to the value being compared).
      IF CHARINDEX('.',check_table) > 0 THEN
          sql := sql||' AND sch.nspname = '||quote_literal(SPLIT_PART(check_table,'.',1))
                    ||' AND tbl.relname = '||quote_literal(SPLIT_PART(check_table,'.',2));
      ELSE
          sql := sql||' AND tbl.relname = '||quote_literal(check_table);
      END IF;
      EXECUTE sql INTO pk_columns;

      --When the lookup returns no row, pk_columns is NULL rather than ''.
      IF pk_columns IS NULL OR pk_columns = '' THEN
           RAISE INFO 'No PRIMARY KEY found for table "%".',check_table;
      ELSE
          --Count the number of rows taking part in a PRIMARY KEY duplicate.
          --SUM over zero groups is NULL, hence the COALESCE.
          sql := 'SELECT COALESCE(SUM(dupes),0) FROM (SELECT '||pk_columns||', COUNT(*) AS dupes'||
                  ' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1) AS d';
          EXECUTE sql INTO dupe_count;

          --Every log row starts with the same three values; only the message and
          --the count differ per outcome.
          log_prefix := 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','||quote_literal(check_table)||','''||SYSDATE||''',';

          IF dupe_count = 0 THEN
               EXECUTE log_prefix||'''OK - No duplicates found'',0';
               RAISE INFO 'OK - No duplicates found';
          ELSIF check_action = 'LOG' THEN
               EXECUTE log_prefix||'''ERROR (LOG) - Duplicates found'','||dupe_count;
               RAISE INFO 'ERROR (LOG) - % Duplicates found',dupe_count;
          ELSIF check_action = 'FIX' THEN
              --NOTE(review): a duplicate count equal to max_fix_rows is also refused,
              --although the message says "exceeds"; kept as-is for compatibility.
              IF max_fix_rows <= dupe_count THEN --Too many dupes, cannot fix
                   EXECUTE log_prefix||'''ERROR (FIX) - Duplicate count exceeds `max_fix_rows` value.'','||dupe_count;
                   RAISE INFO 'ERROR (FIX) - Duplicate count % exceeds `max_fix_rows` %',dupe_count,max_fix_rows;
              ELSE --Attempt to correct the PK
                  EXECUTE 'DROP TABLE IF EXISTS tmp_sp_fix_pk';
                  EXECUTE 'CREATE TEMPORARY TABLE tmp_sp_fix_pk (LIKE '||check_table||' )';
                  --Insert distinct rows for PK duplicates into temp table
                  EXECUTE 'INSERT INTO tmp_sp_fix_pk'||
                         ' SELECT DISTINCT * FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||
                           ' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1)';
                  --Check that PK duplicates are removed in the temp table; any key
                  --still appearing twice means the duplicate rows differ elsewhere.
                  EXECUTE 'SELECT COUNT(*) FROM (SELECT '||pk_columns||
                          ' FROM tmp_sp_fix_pk GROUP BY '||pk_columns||' HAVING COUNT(*) > 1) AS d' INTO temp_count;
                  IF temp_count > 0 THEN
                      EXECUTE log_prefix||'''ERROR (FIX) - Failed. Duplicate PK rows are not identical.'','||dupe_count;
                      RAISE INFO 'ERROR (FIX) - Failed. Duplicate PK rows are not identical';
                  ELSE
                      --Delete all rows for the PK duplicates from the source table
                      EXECUTE 'DELETE FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||' FROM tmp_sp_fix_pk)';
                      --Insert the deduped rows from the temp table into the source
                      EXECUTE 'INSERT INTO '||check_table||' SELECT * FROM tmp_sp_fix_pk';
                      --Update the log for the fix
                      EXECUTE log_prefix||'''SUCCESS (FIX) - Duplicates corrected.'','||dupe_count;
                      --Commit the fixed data
                      COMMIT;
                      RAISE INFO 'SUCCESS (FIX) - % duplicates corrected.',dupe_count;
                  END IF;
              END IF;
          ELSE
              --Defensive: not reachable while the validation above holds.
              RAISE EXCEPTION 'ERROR in `sp_check_primary_key` execution';
          END IF;
      END IF;
  END
$$;