At Stack Builders, we're constantly thinking out of the box while looking for pragmatic software solutions. We're about to see how CDC is important and can help you interact with data change much more smoothly, reducing this practice's friction (might take architectural aspects into account). It hasn't always been clear how straightforward it can be to use within the Elixir ecosystem, and we're going to demonstrate one of the simplest ways we can easily achieve it.
To experiment with this, we will see how a simple TODO would be done on top of CDC in conjunction with the Phoenix ecosystem. The main goal is to visualize how this approach fits in many other scenarios and introduce you to these powerful technologies.
Setup Phoenix
Assuming you already have Elixir and PostgreSQL installed on your machine, you might consider using the below docker-compose structure used for this Demo:
version: "3.4"
volumes:
cdc_demo:
services:
postgres:
container_name: cdc_demo_postgresql
image: postgres:12.0-alpine
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=cdc_demo_dev
ports:
- "5432:5432"
volumes:
- cdc_demo:/var/lib/postgresql/data
logging:
options:
max-file: "5"
max-size: "10m"
Let's install Phoenix:
mix archive.install hex phx_new
Now, after choosing your projects directory, we will generate a new Phoenix (Live) project:
- When asked if you want to install the dependencies type Y
# In this case, I choose cdc_demo but feel free to choose any other name you'd like to.
mix phx.new cdc_demo --live
Following the next instructions suggested by the generator, let's configure the PostgreSQL connection in
config/dev.exs
using your own Database credentials:
# Configure your database
# If you choose to use the Docker compose suggested above, no changes are needed.
config: cdc_demo, CdcDemo.Repo,
username: "postgres",
password: "postgres",
database: "cdc_demo_dev",
hostname: "localhost",
show_sensitive_data_on_connection_error: true,
pool_size: 10
To save us time, we're going to use a Phoenix generator to generate the TODO scaffolding, which is going to generate most pieces we need to have the TODO CRUD:
mix phx.gen.html Todos Todo todos todo:text marked_as_done:boolean --table todos --web Todos
As suggested by the output, we should run the migration:
mix ecto.migrate
And update our router lib/cdc_demo_web/router.ex
scope "/todos", CdcDemoWeb.Todos, as: :todos do
pipe_through :browser
resources "/todos", TodoController
end
If everything went well, you should be able to find the todos table on your database, and you should be able to see all tests passing:
mix test
...........
Finished in 0.5 seconds
19 tests, 0 failures
Randomized with seed 345370
Try starting your Phoenix server using mix:
mix phx.server
Then, access http://localhost:4000/ on your preferred web browser.
Do not proceed if you can't see the Phoenix Welcome page.
We should also be able to access the TODOs page http://localhost:4000/todos/todos.
Try playing with it to see how things work.
CDC
"...change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data.” - Wikipedia.
CDC has been applied in an assortment of different and complex scenarios, for example, auditing, data-replication, analytics systems, and many others. However, we're going to use this simple TODO example to learn a bit more about this technology.
Capturing changes
This post will be using the Cainophile library to simplify the setup on how the events are exposed from PostgreSQL to the app layer.
First, let's prepare our Database. For that, we need to create a PostgreSQL publication only for the todos table:
mix ecto.gen.migration create_todos_publication
Replace the file content with the following:
priv/repo/migrations/20210314133342_create_todos_publication.exs
The timestamp at the beginning will be different
defmodule CdcDemo.Repo.Migrations.CreateTodosPublication do
use Ecto.Migration
def up do
execute("CREATE PUBLICATION todos_publication FOR TABLE todos;")
end
def down do
execute("DROP PUBLICATION todos_publication;")
end
end
Now, let's set todos table replica identity to FULL:
mix ecto.gen.migration change_todos_replica_identity_to_full
We should also replace its content:
priv/repo/migrations/20210314133715_change_todos_replica_identity_to_full.exs
The timestamp at the beginning will be different
defmodule CdcDemo.Repo.Migrations.ChangeTodosReplicaIdentityToFull do
use Ecto.Migration
def up do
execute("ALTER TABLE todos REPLICA IDENTITY FULL;")
end
def down do
execute("ALTER TABLE todos REPLICA IDENTITY DEFAULT;")
end
Finally, we need to enable PostgreSQL logical replication.
mix ecto.gen.migration alter_system_wal_level_to_logical
defmodule CdcDemo.Repo.Migrations.AlterSystemWalLevelToLogical do
use Ecto.Migration
@disable_ddl_transaction true # needed to avoid ALTER SYSTEM cannot run inside a transaction block
def change do
execute("ALTER SYSTEM SET wal_level = 'logical';")
end
end
Let's apply the new migrations:
mix ecto.migrate
You should be able to find the todos_publication and the todos table replica identity on your own PostgreSQL instance:
SELECT * FROM pg_publication WHERE pubname = 'todos_publication';
SELECT relreplident FROM pg_class WHERE oid = 'todos'::regclass;
We need to restart the PostgreSQL server to apply the wal_level. If you're using the suggested Docker compose, simply run:
docker-compose restart
After restarting the PostgreSQL server, you should be able to see the new value of wal_level.
show "wal_level"; -- logical
Now, PostgreSQL is prepared and ready for logical replication, so let's set up our App.
First, we need to add Cainophile to our project dependencies:
mix.exs
{:cainophile, "~> 0.1.0"}
And install it by running:
mix deps.get
A quick break to explain the strategy we're going to use in the post with Cainophile to listen to events. As described on the Cainophile doc page, there are several ways we could approach this within a Phoenix app. We’re going to use a simple version of a GenServer (no worries if you're not familiar with the GenServer concept; for now, just think about it as an Event Subscriber - for this case).
We already have our TODO CRUD working on Phoenix, and now we will build an isolated
Phoenix Live View screen for the reading part to
demonstrate how we'd use PostgreSQL CDC in this case. First, let's add the required Cainophile config to our config/dev.exs
.
config :cdc_demo, Cainophile.Adapters.Postgres,
register: Cainophile.CdcDemoPublisher, # name this process will be registered globally as, for usage with Cainophile.Adapters.Postgres.subscribe/2
epgsql: %{
host: 'localhost',
username: "postgres",
database: "cdc_demo_dev",
password: "postgres"
},
slot: :temporary,
wal_position: {"0", "0"},
publications: ["todos_publication"]
Then, we will add the subscriber (GenServer) that will subscribe to the registered Cainophile process (which will be added to the App Supervision tree very shortly) and will be listening to Database changes on the todos table:
lib/cdc_demo/todos/subscriber.ex
defmodule CdcDemo.Todos.Subscriber do
use GenServer
alias Cainophile.{Adapters, Changes}
def start_link(_) do
GenServer.start_link(__MODULE__, %{})
end
@impl true
def init(state) do
Adapters.Postgres.subscribe(register(), self()) # subscribe to the Cainophile process in order to receive "todos" events (see handle_info bellow)
{:ok, state}
end
@impl true
def handle_info(
%Changes.Transaction{
changes: changes
},
state
) do
IO.inspect changes # temporary debug code
{:noreply, state}
end
@impl true
def handle_info(_, state), do: {:noreply, state}
defp register do
:cdc_demo
|> Application.fetch_env!(Cainophile.Adapters.Postgres)
|> Keyword.fetch!(:register) # this is the register we set above in config/dev.exs
end
end
Next, we need to add the GenServer under our application's supervision tree:
lib/application.ex
children = [
# ...
CdcDemoWeb.Endpoint,
{
Cainophile.Adapters.Postgres,
Application.fetch_env!(:cdc_demo, Cainophile.Adapters.Postgres)
},
CdcDemo.Todos.Subscriber
]
The server should be running fine, and if you add the Cainophile config to config/test.exs as we've done for dev,
the tests should also be passing (Note you need to set the correct database for the test, e.g.,
"cdc_demo_test#{System.get_env("MIX_TEST_PARTITION")}")
So, let's see it in action!
Start the Phoenix server via IEX:
iex -S mix phx.server
Once it starts, try creating a TODO in the IEX prompt:
iex(3)> CdcDemo.Todos.create_todo(%{todo: "testing"})
If the TODO was successfully created, you should be able to see the inspect we added on our GenServer:
[
%Cainophile.Changes.NewRecord{
record: %{
"id" => "1",
"inserted_at" => "2021-03-14 14:54:26",
"marked_as_done" => "f",
"todo" => "testing",
"updated_at" => "2021-03-14 14:54:26"
},
relation: {"public", "todos"}
}
]
Let's try to update an existing TODO:
iex(4)> 1 |> CdcDemo.Todos.get_todo!() |> CdcDemo.Todos.update_todo(%{todo: "testing again"})
As expected, we should also see something similar to:
[
%Cainophile.Changes.UpdatedRecord{
old_record: %{
"id" => "1",
"inserted_at" => "2021-03-14 14:54:26",
"marked_as_done" => "f",
"todo" => "testing",
"updated_at" => "2021-03-14 14:54:26"
},
record: %{
"id" => "1",
"inserted_at" => "2021-03-14 14:54:26",
"marked_as_done" => "f",
"todo" => "testing again",
"updated_at" => "2021-03-14 14:59:23"
},
relation: {"public", "todos"}
}
]
As we saw above, the struct Cainophile.Changes.NewRecord represents an inserted record and the struct Cainophile.Changes.UpdatedRecord represents an updated record. Both are Cainophile internal data structures.
To make things even simpler, we're going to use the existing LiveView components that Phoenix generated.
Replace lib/cdc_demo_web/live/page_live.html.leex content
with the following:
<%= for todo <- @todos do %>
<h1><%= todo.todo %></h1>
<% end %>
We're just looping through a variable @todos and displaying its todo property.
Now, let's replace the lib/cdc_demo_web/live/page_live.ex
with the following:
defmodule CdcDemoWeb.PageLive do
use CdcDemoWeb, :live_view
alias Phoenix.Socket.Broadcast
@impl true
def mount(_params, _session, socket) do
if connected?(socket) do
Phoenix.PubSub.subscribe(CdcDemo.PubSub, "todos")
end
todos = CdcDemo.Todos.list_todos()
{:ok, assign(socket, todos: todos)}
end
@impl true
def handle_info(%Cainophile.Changes.Transaction{
changes: [
%Cainophile.Changes.NewRecord{
record: new_todo,
relation: {"public", "todos"}
}
]
}, state) do
CdcDemoWeb.Endpoint.broadcast_from!(self(), "todos", "new_todo", %{todo: new_todo})
{:noreply, state}
end
end
As you probably guessed, we are subscribing to a topic todos in the Phoenix PubSub and initializing the view todos state with all existing todos. Then, we handle a Broadcast that matches the event "new_todo" to append the current Socket "todos" assignment with the incoming new todo.
Running the server using IEX (as already done above) we can access our project home page at http://localhost:4000/ and test the live view by sending a new todo by broadcasting a fake message:
It should be something similar to:
Finally, we're going to connect our GenServer Subscriber to our Live View, by replacing its handle_info method with the following:
@impl true
def handle_info(%Cainophile.Changes.Transaction{
changes: [
%Cainophile.Changes.NewRecord{
record: new_todo,
relation: {"public", "todos"}
}
]
}, state) do
CdcDemoWeb.Endpoint.broadcast_from!(self(), "todos", "new_todo", %{todo: new_todo})
{:noreply, state}
end
As you see, now we match the NewRecord
struct, expecting only inserted todos
and we broadcast the incoming one to the live view topic.
*Side Note: You might ask yourself why we need this entire Cainophile structure at all. Why not just broadcast it right after persisting the TODO on the Todos module? So, this approach gives us some benefits like better isolation. It reduces the complexity while testing a business module. It’s a much more agnostic architecture, and it allows us to listen for Database changes produced by a third-party system. Imagine an existing app with lots of modules dealing with the TODO table so that this approach would speed up the process, and you wouldn't even need to change the existing modules. In general, the standalone approach proposed here is just another way of approaching the problem.*
Restart the server and try creating another todo, after accessing the home page:
(1)> CdcDemo.Todos.create_todo(%{todo: "magic!"})
I want to challenge you to add the remaining event to our Subscriber and the Page Live View to handle update and deletion operations, then access simultaneously (side-by-side) the home (view page) and the new todo page (http://localhost:4000/ - http://localhost:4000/todos/todos/new).
Optional
If you want all tests to be passing, simply replace test/cdc_demo_web/live/page_live_test.exs with the following:
defmodule CdcDemoWeb.PageLiveTest do
use CdcDemoWeb.ConnCase
import Phoenix.LiveViewTest
test "disconnected and connected render", %{conn: conn} do
{:ok, page_live, disconnected_html} = live(conn, "/")
assert disconnected_html =~ "Get Started"
refute render(page_live) =~ "Get Started"
end
test "render todos", %{conn: conn} do
{:ok, todo1} = CdcDemo.Todos.create_todo(%{todo: "first todo"})
{:ok, todo2} = CdcDemo.Todos.create_todo(%{todo: "second todo"})
{:ok, page_live, _} = live(conn, "/")
render_content = render(page_live)
assert render_content =~ todo1.todo
assert render_content =~ todo2.todo
end
end
Conclusion
This tutorial leveraged a simple TODO as an experiment for powerful tools like PostgreSQL logical replication and the Phoenix ecosystem. Today, we saw how simple the setup can be for such powerful technology and how great Phoenix is. With this simple example, we can now extend the usage of both to any scenario we find helpful.
There are several other similar options on PostgreSQL; if you get interested, here are some options that might be nice to check: Triggers, PG Notify, Pg Subscription, PgAudit, and the native pg_statement.
The code used in this post can be found at cdc_demo.
For more content check our other blogs.
Thanks for reading this tutorial! If you have any feedback, please feel free to drop us a line on Twitter, LinkedIn or Facebook. You could also open issues and pull requests on GitHub.