Skip to content

Instantly share code, notes, and snippets.

@brandur
Created May 1, 2026 17:13
Show Gist options
  • Select an option

  • Save brandur/4962c085d1a790bf70703a5048918e2e to your computer and use it in GitHub Desktop.

Select an option

Save brandur/4962c085d1a790bf70703a5048918e2e to your computer and use it in GitHub Desktop.
River readonly check
package river_test
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"time"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
)
// Example_readOnlyCheck demonstrates how to run a background goroutine
// alongside a River client that periodically verifies the database still
// supports write operations. If a write probe fails with PostgreSQL error
// 25006 (read_only_sql_transaction), the process exits immediately. This is
// useful for detecting failover to a read replica that hasn't been promoted.
//
// Any other probe failure is reported on stderr and the probe is simply
// retried on the next tick; only the read-only error is treated as fatal.
func Example_readOnlyCheck() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &NoOpWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		// Schema is only necessary for the example test.
		Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil),
		// TestOnly is suitable only for use in tests; remove for live
		// environments.
		TestOnly: true,
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Cancelling this context (on return) is what stops the probe goroutine.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Start a background goroutine that periodically checks whether the
	// database is still writable. In a real application, the check interval
	// would be longer (e.g. 30 seconds), and the goroutine would run for the
	// lifetime of the process.
	go func() {
		const (
			writeProbeInterval = 10 * time.Second

			// Use a lightweight write operation to verify the database
			// accepts writes. A temporary table scoped to the transaction
			// avoids any lasting side effects.
			writeProbeSQL = "CREATE TEMP TABLE river_write_check () ON COMMIT DROP"
		)

		ticker := time.NewTicker(writeProbeInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return

			case <-ticker.C:
				_, err := dbPool.Exec(ctx, writeProbeSQL)
				if err == nil {
					continue
				}

				// A read-only error means this process is talking to a
				// database that can't accept writes, so bail out and let the
				// supervisor restart it. Note that os.Exit skips deferred
				// functions.
				var pgErr *pgconn.PgError
				if errors.As(err, &pgErr) && pgErr.Code == pgerrcode.ReadOnlySQLTransaction {
					fmt.Fprintf(os.Stderr, "river: read-only transaction error, exiting: %s\n", err)
					os.Exit(1)
				}

				// The probe was interrupted by shutdown; that's not a
				// database problem, so stop without reporting anything.
				if ctx.Err() != nil {
					return
				}

				// Any other failure isn't fatal, but don't swallow it
				// silently: surface it so that a persistently broken probe
				// is visible to operators. The next tick retries.
				fmt.Fprintf(os.Stderr, "river: write probe failed: %s\n", err)
			}
		}
	}()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	_, err = riverClient.Insert(ctx, NoOpArgs{}, nil)
	if err != nil {
		panic(err)
	}

	// Wait for jobs to complete. Only needed for purposes of the example test.
	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

	fmt.Printf("Client stopped\n")

	// Output:
	// NoOpWorker.Work ran
	// Client stopped
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment