sp_check_primary_key procedure not working
bgmello opened this issue · 0 comments
bgmello commented
I tried to create the procedure in my cluster and noticed it has some issues:
- It doesn't work with composite (multi-column) primary keys;
- It doesn't work for schema-qualified tables (e.g. `my_schema.my_table`);
- It raises an error when `check_action` is `FIX` but the number of duplicates is 0.
I created my own version:
/*
 * sp_check_primary_key
 *
 * Checks a table's declared PRIMARY KEY for duplicate rows, writes the outcome
 * to a log table and, when requested, removes exact-duplicate rows.
 *
 * Parameters:
 *   batch_time    Timestamp written as the first value of every log row.
 *   check_table   Table to check; either `table` or `schema.table`.
 *   check_action  'LOG' to only record duplicates, 'FIX' to also de-duplicate.
 *   log_table     Table that receives one row per run with five values:
 *                 batch_time, check_table, SYSDATE, status message, duplicate count.
 *   max_fix_rows  Upper bound on the number of duplicated rows 'FIX' may touch;
 *                 must be > 0 when check_action = 'FIX'.
 *
 * Raises:
 *   EXCEPTION when an argument fails validation.
 *
 * Notes:
 *   - The duplicate count is the total number of rows that belong to a
 *     primary-key value occurring more than once (SUM of group sizes).
 *   - 'FIX' only succeeds when all rows sharing a key are fully identical;
 *     otherwise the source table is left untouched and the failure is logged.
 *   - `check_table` and `log_table` are interpolated as identifiers into dynamic
 *     SQL and must therefore come from a trusted caller.
 *   - When `check_table` has no schema prefix the catalog lookup matches on the
 *     table name alone, in whatever schema it is found first.
 */
create procedure sp_check_primary_key(batch_time timestamp without time zone, check_table character varying, check_action character varying, log_table character varying, max_fix_rows integer)
language plpgsql
as
$$
DECLARE
    dyn_sql    VARCHAR(MAX) := '';
    log_prefix VARCHAR(MAX) := '';
    pk_columns VARCHAR(512) := '';
    dupe_count INTEGER := 0;
    temp_count INTEGER := 0;
BEGIN
    -- Validate arguments. NULLs are rejected explicitly because a NULL makes the
    -- comparisons below evaluate to NULL, which would silently skip the checks.
    IF check_action IS NULL OR check_action NOT IN ('LOG', 'FIX') THEN
        RAISE EXCEPTION 'The `check_action` parameter must be "LOG" or "FIX".';
    END IF;
    IF check_table IS NULL OR log_table IS NULL OR check_table='' OR log_table='' THEN
        RAISE EXCEPTION 'Parameters `check_table` and `log_table` cannot be empty.';
    END IF;
    IF check_action='FIX' AND (max_fix_rows IS NULL OR max_fix_rows<=0) THEN
        RAISE EXCEPTION 'Parameter `max_fix_rows` must be >0 when `check_action` = "FIX".';
    END IF;

    -- Retrieve the primary key column(s) for the table.
    -- pg_get_constraintdef returns 'PRIMARY KEY (a, b)'; the REPLACE calls strip
    -- the wrapper so that only the comma-separated column list remains.
    dyn_sql := 'SELECT REPLACE(REPLACE(pg_get_constraintdef(con.oid),''PRIMARY KEY ('',''''),'')'','''')'||
               ' FROM pg_constraint AS con '||
               ' JOIN pg_class AS tbl ON tbl.relnamespace = con.connamespace AND tbl.oid = con.conrelid'||
               ' JOIN pg_namespace AS sch ON sch.oid = tbl.relnamespace'||
               ' WHERE tbl.relkind = ''r'' AND con.contype = ''p''';
    -- The names are compared as string values, so they are quoted as literals
    -- (not as identifiers) before being embedded in the statement.
    IF CHARINDEX('.',check_table) > 0 THEN
        dyn_sql := dyn_sql||' AND sch.nspname = '||quote_literal(SPLIT_PART(check_table,'.',1))
                          ||' AND tbl.relname = '||quote_literal(SPLIT_PART(check_table,'.',2));
    ELSE
        dyn_sql := dyn_sql||' AND tbl.relname = '||quote_literal(check_table);
    END IF;
    EXECUTE dyn_sql||';' INTO pk_columns;

    -- When the lookup returns no row, pk_columns is NULL rather than ''.
    IF pk_columns IS NULL OR pk_columns = '' THEN
        RAISE INFO 'No PRIMARY KEY found for table "%".',check_table;
    ELSE
        -- Common head of every log insert: batch time, table name and run time.
        log_prefix := 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','||quote_literal(check_table)||','''||SYSDATE||''',';

        -- Count the number of duplicated rows in the PRIMARY KEY.
        -- SUM over zero groups is NULL, hence the COALESCE.
        dyn_sql := 'SELECT COALESCE(SUM(dupes),0) FROM (SELECT '||pk_columns||', COUNT(*) dupes'||
                   ' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1) AS d';
        EXECUTE dyn_sql||';' INTO dupe_count;
        IF dupe_count IS NULL THEN
            dupe_count := 0;
        END IF;

        IF dupe_count = 0 THEN
            EXECUTE log_prefix||'''OK - No duplicates found'',0;';
            RAISE INFO 'OK - No duplicates found';
        ELSIF check_action = 'LOG' THEN
            EXECUTE log_prefix||'''ERROR (LOG) - Duplicates found'','||dupe_count||';';
            RAISE INFO 'ERROR (LOG) - % Duplicates found',dupe_count;
        ELSIF check_action = 'FIX' THEN
            IF dupe_count > max_fix_rows THEN --Too many dupes, cannot fix
                EXECUTE log_prefix||'''ERROR (FIX) - Duplicate count exceeds `max_fix_rows` value.'','||dupe_count||';';
                RAISE INFO 'ERROR (FIX) - Duplicate count % exceeds `max_fix_rows` %',dupe_count,max_fix_rows;
            ELSE --Attempt to correct the PK
                EXECUTE 'DROP TABLE IF EXISTS tmp_sp_fix_pk;';
                EXECUTE 'CREATE TEMPORARY TABLE tmp_sp_fix_pk (LIKE '||check_table||' );';
                -- Insert distinct rows for PK duplicates into the temp table.
                EXECUTE 'INSERT INTO tmp_sp_fix_pk'||
                        ' SELECT DISTINCT * FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||
                        ' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1)';
                -- Check that PK duplicates are removed in the temp table. Any key
                -- still occurring more than once means the duplicated rows differ
                -- in some non-key column and cannot be collapsed safely.
                EXECUTE 'SELECT COUNT(*) FROM (SELECT '||pk_columns||
                        ' FROM tmp_sp_fix_pk GROUP BY '||pk_columns||' HAVING COUNT(*) > 1) AS t' INTO temp_count;
                IF temp_count > 0 THEN
                    EXECUTE log_prefix||'''ERROR (FIX) - Failed. Duplicate PK rows are not identical.'','||dupe_count||';';
                    RAISE INFO 'ERROR (FIX) - Failed. Duplicate PK rows are not identical';
                ELSE
                    -- Delete all rows for the PK duplicates from the source table.
                    EXECUTE 'DELETE FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||' FROM tmp_sp_fix_pk);';
                    -- Insert the deduped rows from the temp table into the source.
                    EXECUTE 'INSERT INTO '||check_table||' SELECT * FROM tmp_sp_fix_pk;';
                    -- Update the log for the fix.
                    EXECUTE log_prefix||'''SUCCESS (FIX) - Duplicates corrected.'','||dupe_count||';';
                    -- Commit the fixed data together with its log row.
                    COMMIT;
                    RAISE INFO 'SUCCESS (FIX) - % duplicates corrected.',dupe_count;
                END IF;
            END IF;
        ELSE
            -- Defensive guard: not reachable while the validation above holds.
            RAISE EXCEPTION 'ERROR in `sp_check_primary_key` execution';
        END IF;
    END IF;
END
$$;