How to read new records

What you thought you knew about copying new records from a table with a serial id is all wrong once you have multiple writes happening to your database inside transactions. We ran into a situation with MS SQL where we missed records from an append-only table. The table has a serial primary key, and we would grab any new serials since the last grab of data. We found that we missed records because the inserts were being performed by more than one thread. So our developers asked how we could prevent this problem with PostgreSQL. This article is my response: what caused the problem, and how to avoid it in PostgreSQL.

Setting up the problem

Let's create our table. I know oids are off by default, but I have included them on purpose so we can see whether they make any difference in the long run.

CREATE TABLE public.testing (
  id SERIAL,
  val TEXT,
  PRIMARY KEY(id)
) 
WITH (oids = true);

Now we need two threads (database sessions), each inserting some data.

-- Thread 1
BEGIN;
INSERT INTO testing (val) VALUES ('test');

Please note that we are leaving the above thread "idle in transaction" on purpose.

-- Thread 2
BEGIN;
INSERT INTO testing (val) VALUES ('testing');
COMMIT;

Let's make a copy of our data, grabbing the new records.

CREATE TABLE test_copy AS SELECT * FROM testing;

In Thread 1 we will finish our transaction.

INSERT INTO testing (val) VALUES ('tested');
COMMIT;

And now finish copying our new records.

INSERT INTO test_copy SELECT * FROM testing WHERE id > (SELECT max(id) FROM test_copy);

The data in our main table is as we expect.

SELECT * FROM testing;
public.testing
id val
1 test
2 testing
3 tested

Let's look at our copied data.

SELECT * FROM test_copy;
public.test_copy
id val
2 testing
3 tested

We find that the copied data is missing record one because its transaction had not yet committed when the first set of data was copied. With a single-threaded application you will never run into this issue, but as you can see, with a multi-threaded application you can end up missing records.

Looking for a solution

Let's take a look at the system columns to see if they can help.

SELECT oid, tableoid, xmin, cmin, xmax, cmax, ctid, * FROM public.testing;
public.testing
oid tableoid xmin cmin xmax cmax ctid id val
OID Table OID INSERT/UPDATE Transaction ID INSERT/UPDATE Transaction Line # DELETE Transaction ID DELETE Transaction Line # (Page, Record)
494969335 494969326 293897773 0 0 0 (0,1) 1 test
494969336 494969326 293897776 0 0 0 (0,2) 2 testing
494969337 494969326 293897773 1 0 1 (0,3) 3 tested

The xmin is the inserting transaction id. We can't use it because transaction ids are assigned when a transaction starts, not when it commits: record 1's xmin, "293897773", is numerically before record 2's, "293897776", even though record 1 committed later. The ctid is our unique record locator, but it is in the format (page, line on page), and since record 1 was written before record 2, it is also no help to us. The cmin tells us the command number within the inserting transaction, but that is no help either. xmax and cmax are the deleting transaction id and command number respectively, and no help to us. If you turn oids on, the oid has the same issue as our serial id and transaction id, and just reaches the problem faster because it is shared across many tables. This means that there is nothing in the system columns that will help us figure out that we still need to grab record 1.
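
As an illustration (just a sketch; the double cast is only needed because the xid type has no ordering of its own, and it ignores xid wraparound), sorting by xmin puts record 1 first even though it committed last:

SELECT id, val, xmin
FROM public.testing
ORDER BY xmin::text::bigint;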

Let's take a look at the page item attribute details. This requires superuser permissions, but it has been useful in the past for debugging issues.

CREATE EXTENSION pageinspect;
CREATE OR REPLACE FUNCTION public.heap_page_item_attrs_details (
  table_name pg_catalog.regclass
)
RETURNS TABLE (
  p integer,
  lp smallint,
  lp_off smallint,
  lp_flags smallint,
  lp_len smallint,
  t_xmin pg_catalog.xid,
  t_xmax pg_catalog.xid,
  t_field3 integer,
  t_ctid pg_catalog.tid,
  heap_hasnull boolean,
  heap_hasvarwidth boolean,
  heap_hasexternal boolean,
  heap_hasoid boolean,
  heap_xmax_keyshr_lock boolean,
  heap_combocid boolean,
  heap_xmax_excl_lock boolean,
  heap_xmax_lock_only boolean,
  heap_xmax_shr_lock boolean,
  heap_lock_mask boolean,
  heap_xmin_committed boolean,
  heap_xmin_invalid boolean,
  heap_xmax_committed boolean,
  heap_xmax_invalid boolean,
  heap_xmin_frozen boolean,
  heap_xmax_is_multi boolean,
  heap_updated boolean,
  heap_moved_off boolean,
  heap_moved_in boolean,
  heap_moved boolean,
  heap_xact_mask boolean,
  heap_natts_mask integer,
  heap_keys_updated boolean,
  heap_hot_updated boolean,
  heap_only_tuple boolean,
  heap2_xact_mask boolean,
  t_hoff smallint,
  t_bits text,
  t_oid oid,
  t_attrs bytea []
) AS
$body$
WITH RECURSIVE t(n) AS (
  SELECT (pg_relation_size / 8192 - 1)::integer AS int4
  FROM pg_relation_size($1::regclass)
  UNION ALL
  SELECT t_1.n - 1
  FROM t t_1
  WHERE t_1.n > 0)
SELECT 
             n AS p,
             lp,
             lp_off,
             lp_flags,
             lp_len,
             t_xmin, /* inserting xact ID */
             t_xmax, /* deleting or locking xact ID */
             t_field3, /* Union of inserting or deleting command ID, or both AND old-style VACUUM FULL xact ID */
             t_ctid, /*(Page Number,Tuple Number within Page) current TID of this or newer tuple (or a speculative insertion token) */ 
             ((t_infomask) & x'0001'::integer)::boolean AS HEAP_HASNULL,      /* has null attribute(s) */
             ((t_infomask) & x'0002'::integer)::boolean AS HEAP_HASVARWIDTH,      /* has variable-width attribute(s) */
             ((t_infomask) & x'0004'::integer)::boolean AS HEAP_HASEXTERNAL,      /* has external stored attribute(s) */
             ((t_infomask) & x'0008'::integer)::boolean AS HEAP_HASOID,      /* has an object-id field */
             ((t_infomask) & x'0010'::integer)::boolean AS HEAP_XMAX_KEYSHR_LOCK,      /* xmax is a key-shared locker */
             ((t_infomask) & x'0020'::integer)::boolean AS HEAP_COMBOCID,      /* t_cid is a combo cid */
             ((t_infomask) & x'0040'::integer)::boolean AS HEAP_XMAX_EXCL_LOCK,      /* xmax is exclusive locker */
             ((t_infomask) & x'0080'::integer)::boolean AS HEAP_XMAX_LOCK_ONLY,      /* xmax, if valid, is only a locker */
       
             ((t_infomask) & (x'0040' | x'0010')::integer)::boolean AS HEAP_XMAX_SHR_LOCK, /* xmax is a shared locker #define HEAP_XMAX_SHR_LOCK  (HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_KEYSHR_LOCK) */
             ((t_infomask) & (x'0040' | x'0010')::integer)::boolean AS HEAP_LOCK_MASK, /* HEAP_LOCK_MASK (HEAP_XMAX_SHR_LOCK | HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_KEYSHR_LOCK) */
       
             ((t_infomask) & x'0100'::integer)::boolean AS HEAP_XMIN_COMMITTED,    /* t_xmin committed */
             ((t_infomask) & x'0200'::integer)::boolean AS HEAP_XMIN_INVALID,    /* t_xmin invalid/aborted */
             ((t_infomask) & x'0400'::integer)::boolean AS HEAP_XMAX_COMMITTED,    /* t_xmax committed */
             ((t_infomask) & x'0800'::integer)::boolean AS HEAP_XMAX_INVALID,  /* t_xmax invalid/aborted aka xmax_rolled_back */
             ((t_infomask) & (x'0100' | x'0200')::integer)::boolean AS HEAP_XMIN_FROZEN,  /* (HEAP_XMIN_COMMITTED|HEAP_XMIN_INVALID) */
       
             ((t_infomask) & x'1000'::integer)::boolean AS HEAP_XMAX_IS_MULTI,    /* t_xmax is a MultiXactId */
             ((t_infomask) & x'2000'::integer)::boolean AS HEAP_UPDATED,    /* this is UPDATEd version of row */
             ((t_infomask) & x'4000'::integer)::boolean AS HEAP_MOVED_OFF,    /* moved to another place by pre-9.0
                                         * VACUUM FULL; kept for binary
                                         * upgrade support */
             ((t_infomask) & x'8000'::integer)::boolean AS HEAP_MOVED_IN,    /* moved from another place by pre-9.0
                                         * VACUUM FULL; kept for binary
                                         * upgrade support */
             ((t_infomask) & (x'4000' | x'8000')::integer)::boolean AS HEAP_MOVED,    /* HEAP_MOVED (HEAP_MOVED_OFF | HEAP_MOVED_IN) */
             ((t_infomask) & x'FFF0'::integer)::boolean AS HEAP_XACT_MASK,    /* visibility-related bits */

             ((t_infomask2) & x'07FF'::integer) AS HEAP_NATTS_MASK,    /* 11 bits for number of attributes */
             ((t_infomask2) & x'2000'::integer)::boolean AS HEAP_KEYS_UPDATED,    /* tuple was updated and key cols
                                         * modified, or tuple deleted */
             ((t_infomask2) & x'4000'::integer)::boolean AS HEAP_HOT_UPDATED,    /* tuple was HOT-updated */
             ((t_infomask2) & x'8000'::integer)::boolean AS HEAP_ONLY_TUPLE,    /* this is heap-only tuple */
             ((t_infomask2) & x'E000'::integer)::boolean AS HEAP2_XACT_MASK,    /* visibility-related bits */
             t_hoff, /* sizeof header incl. bitmap, padding */
             t_bits, /* bitmap of NULLs */
             t_oid,
             t_attrs
FROM 
(
    SELECT t.n, (heap_page_item_attrs(get_raw_page($1::text, n), $1::regclass, true)).* 
    FROM t
) a;
$body$
LANGUAGE sql
VOLATILE
RETURNS NULL ON NULL INPUT
SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION public.heap_page_item_attrs_details (table_name regclass) TO PUBLIC;

SELECT * FROM public.heap_page_item_attrs_details('public.testing');

Now when we look at the records, we will see all the information that PostgreSQL tracks internally for each record, including deleted records and, for updated records, both the old and new versions.

public.testing
p lp lp_off lp_flags lp_len t_xmin t_xmax t_field3 t_ctid heap_hasnull heap_hasvarwidth heap_hasexternal heap_hasoid heap_xmax_keyshr_lock heap_combocid heap_xmax_excl_lock heap_xmax_lock_only heap_xmax_shr_lock heap_lock_mask heap_xmin_committed heap_xmin_invalid heap_xmax_committed heap_xmax_invalid heap_xmin_frozen heap_xmax_is_multi heap_updated heap_moved_off heap_moved_in heap_moved heap_xact_mask heap_natts_mask heap_keys_updated heap_hot_updated heap_only_tuple heap2_xact_mask t_hoff t_bits t_oid t_attrs
0 1 8152 1 41 293897773 0 0 (0,1) False True False False False False False False False False True False False True True False False False False False True 2 False False False False 32 Null 494969335 {"\x01000000","\x74657374"}
0 2 8112 1 44 293897776 0 0 (0,2) False True False False False False False False False False True False False True True False False False False False True 2 False False False False 32 Null 494969336 {"\x02000000","\x74657374696e67"}
0 3 8072 1 43 293897773 0 1 (0,3) False True False False False False False False False False True False False True True False False False False False True 2 False False False False 32 Null 494969337 {"\x03000000","\x746573746564"}

While this is a lot of great information for debugging issues with bloat and the like, none of it is actually useful in this case.

Let's take a look at our transaction commit timestamps.

SELECT pg_catalog.pg_xact_commit_timestamp(xmin), * FROM public.testing;
-- ERROR:  could not get commit timestamp data
-- HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.

Oops, we don't have that turned on in our config. This means we need to change "track_commit_timestamp" from off to on, either in postgresql.conf or with ALTER SYSTEM as shown below. Once you have done this, you will need to restart PostgreSQL. Every new transaction will then have a timestamp recorded for its commit. There is a disk space price to pay for this feature, but for most people it is a small price to pay.

ALTER SYSTEM SET track_commit_timestamp TO 'on';
-- Now restart postgres
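
After the restart, a quick sanity check that the setting took effect:

SHOW track_commit_timestamp;
-- Should return "on"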

Because the feature was off while we ran the earlier inserts, we need to restart our testing from the beginning: drop our two test tables and start over with the tests at the top of this article.

DROP FUNCTION public.heap_page_item_attrs_details (table_name regclass);
DROP EXTENSION pageinspect;
DROP TABLE public.test_copy;
DROP TABLE public.testing;
-- Go to the top of this article and start over with the testing.

Now, we should get results that look like this.

public.testing
pg_xact_commit_timestamp id val
2019-01-07 13:22:59.257975-08 1 test
2019-01-07 13:21:55.770918-08 2 testing
2019-01-07 13:22:59.257975-08 3 tested

Since the pg_xact_commit_timestamp of record 2 is before that of record 1, commit timestamps reflect the actual commit order, which means a solution is possible. Let's reset our tests and then test our solution.

DROP TABLE public.test_copy;
TRUNCATE TABLE public.testing;

The solution

-- Thread 1
BEGIN;
INSERT INTO testing (val) VALUES ('test');

Please note that we are leaving the above thread "idle in transaction" on purpose.

-- Thread 2
BEGIN;
INSERT INTO testing (val) VALUES ('testing');
COMMIT;

Let's make a copy of our data, grabbing the new records.

-- Prepping for our copy
CREATE TABLE test_copy (LIKE public.testing);
CREATE TABLE test_copy_last_record (
  t_type TEXT,
  t_time TIMESTAMP WITH TIME ZONE,
  PRIMARY KEY(t_type)
);
INSERT INTO test_copy_last_record VALUES ('next', NULL), ('last', NULL);

-- Grabbing the latest transaction timestamp
INSERT INTO test_copy_last_record (t_type, t_time) 
SELECT 'next', max(pg_catalog.pg_xact_commit_timestamp(xmin)) FROM public.testing
ON CONFLICT (t_type) DO UPDATE SET t_time = EXCLUDED.t_time;

-- Insert the new data after last timestamp up to and including next timestamp
INSERT INTO test_copy SELECT * FROM public.testing 
WHERE pg_catalog.pg_xact_commit_timestamp(xmin) <= (SELECT t_time FROM test_copy_last_record WHERE t_type = 'next')
AND (pg_catalog.pg_xact_commit_timestamp(xmin) > (SELECT t_time FROM test_copy_last_record WHERE t_type = 'last')
OR (SELECT t_time FROM test_copy_last_record WHERE t_type = 'last') IS NULL);

-- Update the last timestamp
INSERT INTO test_copy_last_record (t_type, t_time) 
SELECT 'last', t_time FROM public.test_copy_last_record WHERE t_type = 'next'
ON CONFLICT (t_type) DO UPDATE SET t_time = EXCLUDED.t_time;

In Thread 1 we will finish our transaction.

INSERT INTO testing (val) VALUES ('tested');
COMMIT;

And now finish copying our new records.

-- Grabbing the latest transaction timestamp
INSERT INTO test_copy_last_record (t_type, t_time) 
SELECT 'next', max(pg_catalog.pg_xact_commit_timestamp(xmin)) FROM public.testing
ON CONFLICT (t_type) DO UPDATE SET t_time = EXCLUDED.t_time;

-- Insert the new data after last timestamp up to and including next timestamp
INSERT INTO test_copy SELECT * FROM public.testing 
WHERE pg_catalog.pg_xact_commit_timestamp(xmin) <= (SELECT t_time FROM test_copy_last_record WHERE t_type = 'next')
AND (pg_catalog.pg_xact_commit_timestamp(xmin) > (SELECT t_time FROM test_copy_last_record WHERE t_type = 'last')
OR (SELECT t_time FROM test_copy_last_record WHERE t_type = 'last') IS NULL);

-- Update the last timestamp
INSERT INTO test_copy_last_record (t_type, t_time) 
SELECT 'last', t_time FROM public.test_copy_last_record WHERE t_type = 'next'
ON CONFLICT (t_type) DO UPDATE SET t_time = EXCLUDED.t_time;

Let's check our table

SELECT * FROM test_copy;
public.test_copy
id val
1 test
2 testing
3 tested

IT WORKS!!!

Note: Because pg_xact_commit_timestamp is a stable function, any time you call it with the same transaction id within a transaction it will return the same result, so PostgreSQL does not have to re-compute it. This means that if you have many rows inserted by a single transaction, the value only needs to be looked up once.
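
For example, grouping on the commit timestamp shows one timestamp per transaction, no matter how many rows each transaction inserted:

SELECT pg_xact_commit_timestamp(xmin) AS commit_ts, count(*) AS rows_inserted
FROM public.testing
GROUP BY pg_xact_commit_timestamp(xmin);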

Doing it faster

If the transaction id and its commit timestamp were exposed by PostgreSQL as a system table or view, we could left join to it, which might be faster: the table would be scanned once instead of the function being called once per row.
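
No such view exists, but as a rough sketch of the shape such a query could take, a derived table can stand in for it. (The text cast is only there because the xid type has no native grouping support, and as written the function is still called per row inside the subquery; the real win would only come from a true system view.)

SELECT t.*
FROM public.testing AS t
JOIN (
    SELECT DISTINCT xmin::text AS xid,
           pg_xact_commit_timestamp(xmin) AS commit_ts
    FROM public.testing
) AS x ON x.xid = t.xmin::text
WHERE x.commit_ts > (SELECT t_time FROM test_copy_last_record WHERE t_type = 'last');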

Universal Solutions aka Non-PostgreSQL Solutions

Yes, there is another solution that is usable across all platforms. Let's reset our test.

DROP TABLE public.test_copy;
TRUNCATE TABLE public.testing;

-- Thread 1
BEGIN;
INSERT INTO testing (val) VALUES ('test');

Please note that we are leaving the above thread "idle in transaction" on purpose.

-- Thread 2
BEGIN;
INSERT INTO testing (val) VALUES ('testing');
COMMIT;

Let's make a copy of our data, grabbing the new records.

BEGIN;
-- Setup our two tables
CREATE TABLE public.copied_ids (
  id INTEGER,
  CONSTRAINT copied_ids_idx UNIQUE(id)
);
CREATE TABLE test_copy (LIKE public.testing);

-- Add new records
INSERT INTO test_copy
SELECT testing.* FROM public.testing LEFT JOIN copied_ids ON testing.id = copied_ids.id WHERE copied_ids.id IS NULL;

-- Add new ids that were copied, so that we don't copy them again.
INSERT INTO copied_ids
SELECT test_copy.id FROM public.test_copy LEFT JOIN copied_ids ON test_copy.id = copied_ids.id WHERE copied_ids.id IS NULL;

COMMIT;

In Thread 1 we will finish our transaction.

INSERT INTO testing (val) VALUES ('tested');
COMMIT;

And now finish copying our new records.

-- Add new records
INSERT INTO test_copy
SELECT testing.* FROM public.testing LEFT JOIN copied_ids ON testing.id = copied_ids.id WHERE copied_ids.id IS NULL;

-- Add new ids that were copied, so that we don't copy them again.
INSERT INTO copied_ids
SELECT test_copy.id FROM public.test_copy LEFT JOIN copied_ids ON test_copy.id = copied_ids.id WHERE copied_ids.id IS NULL;

Let's check our table

SELECT * FROM test_copy;
public.test_copy
id val
1 test
2 testing
3 tested

WORKS PERFECTLY!!!

If we wanted to do this with PostgreSQL-specific code, we could shorten the joins with USING.

-- Add new records
INSERT INTO test_copy
SELECT testing.* FROM public.testing LEFT JOIN copied_ids USING(id) WHERE copied_ids.id IS NULL;

-- Add new ids that were copied, so that we don't copy them again.
INSERT INTO copied_ids
SELECT test_copy.id FROM public.test_copy LEFT JOIN copied_ids USING (id) WHERE copied_ids.id IS NULL;

This next variant, using EXCEPT, appears faster in an EXPLAIN, but I have not yet tested it with a large table.

-- Add new records
INSERT INTO test_copy 
SELECT testing.* FROM 
(SELECT id FROM public.testing EXCEPT SELECT id FROM public.copied_ids) AS new_ids
LEFT JOIN public.testing USING (id);

-- Add new ids that were copied, so that we don't copy them again.
INSERT INTO copied_ids
SELECT test_copy.id FROM public.test_copy LEFT JOIN copied_ids USING (id) WHERE copied_ids.id IS NULL;

Which is faster for large tables?

This needs to be tested.
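
A minimal sketch of such a test (the row count is arbitrary): load a large batch of rows, then compare the variants with EXPLAIN ANALYZE.

-- Load a large batch of rows in one transaction
INSERT INTO public.testing (val)
SELECT 'row ' || g FROM generate_series(1, 1000000) AS g;

-- Variant 1: anti-join via LEFT JOIN
EXPLAIN ANALYZE
SELECT testing.* FROM public.testing
LEFT JOIN copied_ids USING (id) WHERE copied_ids.id IS NULL;

-- Variant 2: EXCEPT, then join back for the full rows
EXPLAIN ANALYZE
SELECT testing.* FROM
(SELECT id FROM public.testing EXCEPT SELECT id FROM public.copied_ids) AS new_ids
LEFT JOIN public.testing USING (id);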

Microsoft SQL Server

MS SQL has the ability to track changes. This includes INSERT, UPDATE and DELETE. Change Tracking is turned off by default. You need to turn it on for both the database and each table you wish to track.

-- SQL Server (starting with 2008) 
-- Azure SQL Database 

-- Turn on Database change tracking
ALTER DATABASE testing 
SET CHANGE_TRACKING = ON 
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); 

-- Create our table for testing 
CREATE TABLE dbo.testing ( 
  id int NOT NULL, 
  val text NULL, 
  PRIMARY KEY CLUSTERED (id) 
) 
GO

-- Turn on Table change tracking
ALTER TABLE dbo.testing 
ENABLE CHANGE_TRACKING 
WITH (TRACK_COLUMNS_UPDATED = ON);

Once you have done this, any new transaction can be found in the commit table.

-- SQL Server (starting with 2008)
-- Azure SQL Database
SELECT * FROM sys.dm_tran_commit_table;

-- Azure SQL Data Warehouse
-- Parallel Data Warehouse
SELECT * FROM sys.dm_pdw_nodes_tran_commit_table;
sys.dm_tran_commit_table & sys.dm_pdw_nodes_tran_commit_table
commit_ts xdes_id commit_lbn commit_csn commit_time pdw_node_id

Use one of the views above to look at commit_ts, which is assigned when the transaction commits and tells you the order in which transactions were committed. The commit_ts field is a sequence number that acts as a timestamp and is guaranteed to be unique.

The xdes_id is the transaction id that you will need to match up to the transaction that committed each record.
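
For example, using only the columns shown above, we can list commits in the order they committed:

SELECT commit_ts, xdes_id, commit_time
FROM sys.dm_tran_commit_table
ORDER BY commit_ts;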

You may view a list of what databases have change tracking turned on.

-- What databases have change tracking turned on 
SELECT 
  ctd.database_id, 
  d.name, 
  ctd.is_auto_cleanup_on, 
  ctd.retention_period, 
  ctd.retention_period_units, 
  ctd.retention_period_units_desc, 
  ctd.max_cleanup_version 
FROM sys.change_tracking_databases ctd 
LEFT JOIN sys.databases d ON ctd.database_id = d.database_id;
database_id name is_auto_cleanup_on retention_period retention_period_units retention_period_units_desc max_cleanup_version
5 testing 1 2 3 DAYS Null

The same permission checks are made for sys.change_tracking_databases as are made for sys.databases. If the caller of sys.change_tracking_databases is not the owner of the database, the minimum permissions that are required to see the corresponding row are ALTER ANY DATABASE or VIEW ANY DATABASE server-level permission, or CREATE DATABASE permission in the master database or current database.

You may view a list of what tables have change tracking turned on.

-- What tables have change tracking turned on 
SELECT 
  ctt.object_id, 
  t.name, 
  ctt.is_track_columns_updated_on, 
  ctt.min_valid_version, 
  ctt.begin_version, 
  ctt.cleanup_version 
FROM sys.change_tracking_tables ctt 
LEFT JOIN sys.tables t ON ctt.object_id = t.object_id;
object_id name is_track_columns_updated_on min_valid_version begin_version cleanup_version
1093578934 testing True 5 5 0

The visibility of the metadata in catalog views is limited to securables that a user either owns or on which the user has been granted some permission.

Let's insert our test data. Note that we leave Thread 1 "idle in transaction" on purpose.

-- Thread 1
BEGIN TRANSACTION; 
INSERT INTO testing (id, val) VALUES ('1', 'test'); 

-- Thread 2
BEGIN TRANSACTION; 
INSERT INTO testing (id, val) VALUES ('2', 'testing'); 
COMMIT; 

We can now use CHANGETABLE, which allows us to view the changes since our last synchronization version.

-- View New Records 
DECLARE @synchronization_version BIGINT; 
DECLARE @last_synchronization_version BIGINT; 
SET @synchronization_version = CHANGE_TRACKING_CURRENT_VERSION(); 
SET @last_synchronization_version = null;

SELECT t.* 
FROM CHANGETABLE(CHANGES dbo.testing, @last_synchronization_version) 
  AS CT 
LEFT JOIN dbo.testing t ON t.id = CT.id; 

SET @last_synchronization_version = @synchronization_version; 
id val
2 testing

Let's finish transaction 1 and then check for new records again.

-- Thread 1
INSERT INTO testing (id, val) VALUES ('3', 'tested'); 
COMMIT;

-- View New Records 
SET @synchronization_version = CHANGE_TRACKING_CURRENT_VERSION(); 

SELECT t.* 
FROM CHANGETABLE(CHANGES dbo.testing, @last_synchronization_version) 
  AS CT 
LEFT JOIN dbo.testing t ON t.id = CT.id; 

SET @last_synchronization_version = @synchronization_version; 
id val
1 test
3 tested

While I was just looking for new records, it is also possible to check for updated and deleted records.

BEGIN TRANSACTION;
INSERT INTO testing (id, val) VALUES ('4', 'tester'); 
UPDATE testing SET val = 'more testing' WHERE id = '2'; 
DELETE FROM testing WHERE id = '1'; 
COMMIT;

The SYS_CHANGE_OPERATION column lets us know whether a row was Inserted, Updated, or Deleted. In addition to outputting the SYS_CHANGE_OPERATION column, we also output the key columns of our testing table from CHANGETABLE; that way we still have key values for deleted rows.

-- View New Records 
SET @synchronization_version = CHANGE_TRACKING_CURRENT_VERSION(); 

SELECT CT.SYS_CHANGE_OPERATION, CT.id, t.val 
FROM CHANGETABLE(CHANGES dbo.testing, @last_synchronization_version) 
  AS CT 
LEFT JOIN dbo.testing t ON t.id = CT.id; 

SET @last_synchronization_version = @synchronization_version; 
SYS_CHANGE_OPERATION id val
D 1 Null
U 2 more testing
I 4 tester