Writing Data from Postgres to MongoDB
"Last time I had data like this, I stored the data in MongoDB." --Me, last week.
I told this to a friend while talking through some of their data problems. As Craig likes to say, Crunchy Data is "Just Postgres", but we also know there are some clear-cut use cases where Postgres isn't the perfect tool for the job. Don't get us wrong: Postgres is an amazing database, and we strongly believe it's what you should start with for most applications, but that doesn't mean you can't benefit from more than one database.
In case you missed the recent announcement, we launched support for the Mongo Foreign Data Wrapper on Crunchy Bridge. If you're wondering why, let's look at a practical use case.
The Use Case
In this use case, we used Postgres to back purchase data for carbon removal projects, and MongoDB to serve the public presentation of that data. The data flow worked like this:
In SQL, the data for a purchase was distributed across a few tables, so the query required multiple joins across tables called purchases, line items, credits, and invoices. But once a purchase was completed, it never changed. So we would take the output of the purchase query, package it up as JSON, and store it in MongoDB as a single document. Then we handed a receipt token back to the end user to fetch the purchase data later.
Why use two different databases?
By targeting different use cases to different databases, you maintain flexibility in how you work. The complex, relational data stays in a database built for heavy query processing, while the finished, read-only documents are served from a store optimized for simple lookups.
The Code
Below, we'll walk through using a trigger in Postgres to write data from Postgres to MongoDB using the MongoDB FDW.
To get started, you’ll need a Crunchy Bridge Postgres database and a MongoDB Atlas database. I’ll reference each of them below.
We have some SQL structure and data files here. Load the structure file, then the data file, and this tutorial will show how to flow data from Postgres to MongoDB.
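If you're loading the files with psql, a minimal sketch looks like the following; the file names here are assumptions (data.sql is referenced later in this post, structure.sql is a placeholder), so adjust them to match whatever you downloaded.
-- run inside psql connected to your Crunchy Bridge database
-- file names are assumptions; use the names of the files you downloaded
\i structure.sql
\i data.sql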
Load the Postgres data structures
Below are some tables we’ll use to provide a tangible example of the capabilities.
CREATE TABLE public.accounts (
id SERIAL PRIMARY KEY,
name character varying,
created_at timestamp(6) without time zone NOT NULL,
updated_at timestamp(6) without time zone NOT NULL
);
CREATE TABLE public.carbon_removal_projects (
id SERIAL PRIMARY KEY,
name character varying,
description text,
originating_organization character varying,
registry_name character varying,
registry_id character varying,
created_at timestamp(6) without time zone NOT NULL,
updated_at timestamp(6) without time zone NOT NULL
);
CREATE TABLE public.purchases (
id SERIAL PRIMARY KEY,
account_id integer,
tranche_id integer,
amount_in_cents integer,
amount_of_carbon_in_kg integer,
public_url_token character varying,
synced_to_mongo_at timestamp(6) without time zone,
created_at timestamp(6) without time zone NOT NULL,
updated_at timestamp(6) without time zone NOT NULL
);
CREATE TABLE public.tranches (
id SERIAL PRIMARY KEY,
carbon_removal_project_id integer,
vintage character varying,
cost_per_kg_in_cents integer,
carbon_available_in_kg integer,
created_at timestamp(6) without time zone NOT NULL,
updated_at timestamp(6) without time zone NOT NULL
);
To piece the tables together conceptually, here is how they relate, with a few sample rows sketched after the list:
- Carbon Removal Projects are created that contain information about the project, such as location, technology used, and the organization performing the removal.
- Tranches are created that represent blocks of carbon removed from the atmosphere. So, last month they may have removed 250kg, and the prior months 250kg and 50kg respectively.
- An account performs a purchase of a certain amount of carbon removal, which gets matched with a tranche and a carbon removal project. With this structure, the amount of carbon purchased from a tranche can never exceed the amount of carbon removed by that tranche.
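To make that concrete, here is a rough sketch of what seed rows might look like. The values are made up for illustration; the real rows come from the data.sql file.
-- illustrative values only; real data is loaded from data.sql
INSERT INTO accounts (name, created_at, updated_at)
VALUES ('Acme Co', now(), now());

INSERT INTO carbon_removal_projects (name, description, originating_organization, registry_name, registry_id, created_at, updated_at)
VALUES ('Basalt Weathering Pilot', 'Enhanced rock weathering project', 'ExampleOrg', 'ExampleRegistry', 'REG-001', now(), now());

-- assumes the project above received id 1 in an empty database
INSERT INTO tranches (carbon_removal_project_id, vintage, cost_per_kg_in_cents, carbon_available_in_kg, created_at, updated_at)
VALUES (1, '2023', 25, 250, now(), now());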
Create the query to generate JSON
With the above schema, we’ll want to join the purchases, tranches, and carbon_removal_projects data into a reasonable JSON object. Below we use some of Postgres’ JSON builder functions to create the object:
SELECT
purchases.public_url_token,
json_build_object(
'account', json_build_object('name', accounts.name),
'project', row_to_json(carbon_removal_projects.*),
'tranche', row_to_json(tranches.*),
'carbon_offset_in_kg', purchases.amount_of_carbon_in_kg
)
FROM purchases
INNER JOIN accounts ON purchases.account_id = accounts.id
INNER JOIN tranches ON purchases.tranche_id = tranches.id
INNER JOIN carbon_removal_projects ON tranches.carbon_removal_project_id = carbon_removal_projects.id
WHERE purchases.id = 1;
Run this query against that dataset, and you’ll see it builds a JSON object. We’ll write that JSON object to MongoDB.
Create foreign table
Next, we'll configure a Mongo foreign table that lets us write from Postgres to MongoDB. The following code assumes you have already run CREATE EXTENSION, CREATE SERVER, and CREATE USER MAPPING to configure the Mongo FDW. For a tutorial on connecting to Mongo, check out the first post in this series.
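If you haven't done that setup yet, here is a minimal sketch of what it looks like. The host, port, and credentials are placeholders, and an Atlas cluster will likely need additional connection options covered in the post linked above.
-- placeholders throughout; see the linked post for Atlas-specific options
CREATE EXTENSION mongo_fdw;

CREATE SERVER atlas_server
  FOREIGN DATA WRAPPER mongo_fdw
  OPTIONS (address 'cluster0.example.mongodb.net', port '27017');

CREATE USER MAPPING FOR CURRENT_USER
  SERVER atlas_server
  OPTIONS (username 'mongo_user', password 'mongo_password');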
-- see link above for connection information if this command fails
CREATE FOREIGN TABLE mongo_purchases
(
_id name,
public_url_token character varying,
purchase_data json, -- jsonb not allowed
created_at timestamp
)
SERVER atlas_server
OPTIONS (database 'carbon_removal_purchases', collection 'purchases');
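Before wiring up the trigger, it's worth a quick sanity check that the foreign table can reach Atlas; if the collection is empty, this should simply return zero rows.
-- connection smoke test; returns no rows if the collection is empty
SELECT * FROM mongo_purchases LIMIT 1;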
Create a trigger
On insert, we'll package up a document and write it to the mongo_purchases foreign table, using the same JSON-building query as above:
CREATE OR REPLACE FUNCTION purchases_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
-- Insert the new row into the mongo_purchases table
INSERT INTO mongo_purchases (public_url_token, purchase_data, created_at)
VALUES (NEW.public_url_token,
json_build_object(
'account', json_build_object('name', (SELECT name FROM accounts WHERE accounts.id = NEW.account_id LIMIT 1)),
'project', (SELECT row_to_json(carbon_removal_projects.*) FROM carbon_removal_projects INNER JOIN tranches ON tranches.carbon_removal_project_id = carbon_removal_projects.id WHERE tranches.id = NEW.tranche_id LIMIT 1),
'tranche', (SELECT row_to_json(tranches.*) FROM tranches WHERE tranches.id = NEW.tranche_id LIMIT 1),
'carbon_offset_in_kg', NEW.amount_of_carbon_in_kg
),
NEW.created_at);
UPDATE purchases SET synced_to_mongo_at = now() WHERE id = NEW.id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER purchases_insert
AFTER INSERT ON purchases
FOR EACH ROW
EXECUTE FUNCTION purchases_insert_trigger();
This function builds the JSON object based on the newly inserted purchases row.
Then, it writes that object to the mongo_purchases
table, which actually
resides in the Mongo Atlas database.
At the end, it sets the synced_to_mongo_at timestamp. That timestamp is what lets us sync all of the existing records to Mongo in the next section.
Sync existing data
We have a way to sync all new purchases rows, but what about the existing rows? Be sure to load the data.sql file from above; it'll give you some real data to play with. Run the following to insert the existing rows:
WITH unsynced_purchases AS ( -- #1
UPDATE purchases
SET synced_to_mongo_at = now()
WHERE synced_to_mongo_at < updated_at
OR synced_to_mongo_at IS NULL
RETURNING id
)
INSERT INTO mongo_purchases (public_url_token, purchase_data, created_at) -- #2
SELECT -- #3
purchases.public_url_token,
json_build_object(
'account', json_build_object('name', accounts.name),
'project', row_to_json(carbon_removal_projects.*),
'tranche', row_to_json(tranches.*),
'carbon_offset_in_kg', purchases.amount_of_carbon_in_kg
),
purchases.created_at
FROM purchases
INNER JOIN unsynced_purchases ON purchases.id = unsynced_purchases.id -- #4
INNER JOIN accounts ON purchases.account_id = accounts.id
INNER JOIN tranches ON purchases.tranche_id = tranches.id
INNER JOIN carbon_removal_projects ON tranches.carbon_removal_project_id = carbon_removal_projects.id;
What is this magic? I'm quite proud of this query, so let me unwind it for you based on the numbered comments above:
- #1: UPDATE the purchases that have not been synced since they were last updated, and return their ids in a CTE
- #2: Define an INSERT into mongo_purchases for the values returned by the SELECT
- #3: Use the SELECT statement we built previously to construct each JSON document
- #4: Filter by the unsynced_purchases ids as defined in the CTE
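If you want to confirm nothing was left behind, a query that mirrors the CTE's WHERE clause should now return zero:
-- rows still waiting to be synced; expect 0 after the backfill above
SELECT count(*)
FROM purchases
WHERE synced_to_mongo_at < updated_at
   OR synced_to_mongo_at IS NULL;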
Insert data and watch it flow
Run the following in your Mongo shell, and you'll see the latest purchase data:
db.purchases.find().sort({created_at: -1}).limit(1)
If you followed the tutorial above, you should see the same purchase in Postgres when you run the query below, which shows the data synced correctly. Row counts on both sides will match as well (a quick count check follows the query).
SELECT * FROM purchases ORDER BY created_at DESC LIMIT 1;
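For the count comparison, something like this on the Postgres side against db.purchases.countDocuments() on the Mongo side should line up, assuming every row has been synced.
-- should match db.purchases.countDocuments() on the Mongo side
SELECT count(*) FROM purchases;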
Now, to test the insert, run the following:
INSERT INTO purchases (
account_id,
tranche_id,
amount_in_cents,
amount_of_carbon_in_kg,
public_url_token,
created_at,
updated_at
) VALUES (
1,
1,
50,
2, -- whole kilograms; amount_of_carbon_in_kg is an integer column
gen_random_uuid(),
now(),
now()
) returning *;
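On the Postgres side, the trigger should also have stamped synced_to_mongo_at on that new row. Note that the RETURNING output above may still show it as NULL, since the AFTER trigger's UPDATE runs after the row is returned; a quick follow-up check:
-- confirm the trigger stamped the sync timestamp on the newest purchase
SELECT id, public_url_token, synced_to_mongo_at
FROM purchases
ORDER BY id DESC
LIMIT 1;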
Then, when querying MongoDB, you’ll see the new document:
db.purchases.find().sort({created_at: -1}).limit(1)
Summary
- Using different databases can give you flexibility and agility with your data strategy.
- In a multi-database environment, you can use a relational database and a NoSQL one, each for their specific use case
- With a tool like the Postgres MongoDB Foreign Data Wrapper, you can easily sync data from Postgres to MongoDB
- Combining the Foreign Data Wrapper with JSON creation and Postgres Triggers will allow you to keep both data stores in sync