|
| 1 | +package postgres |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + |
| 6 | + "github.com/jackc/pgx/v5" |
| 7 | + "github.com/jackc/pgx/v5/pgxpool" |
| 8 | +) |
| 9 | + |
| 10 | +// ----------------------------------------------------------------------------- |
| 11 | + |
| 12 | +// Conn encloses a single connection object. |
| 13 | +type Conn struct { |
| 14 | + db *Database |
| 15 | + conn *pgxpool.Conn |
| 16 | +} |
| 17 | + |
| 18 | +// ----------------------------------------------------------------------------- |
| 19 | + |
| 20 | +// DB returns the underlying database driver. |
| 21 | +func (c *Conn) DB() *Database { |
| 22 | + return c.db |
| 23 | +} |
| 24 | + |
| 25 | +// Exec executes an SQL statement within the single connection. |
| 26 | +func (c *Conn) Exec(ctx context.Context, sql string, args ...interface{}) (int64, error) { |
| 27 | + affectedRows := int64(0) |
| 28 | + ct, err := c.conn.Exec(ctx, sql, args...) |
| 29 | + if err == nil { |
| 30 | + affectedRows = ct.RowsAffected() |
| 31 | + } |
| 32 | + return affectedRows, c.db.processError(err) |
| 33 | +} |
| 34 | + |
| 35 | +// QueryRow executes a SQL query within the single connection. |
| 36 | +func (c *Conn) QueryRow(ctx context.Context, sql string, args ...interface{}) Row { |
| 37 | + return &rowGetter{ |
| 38 | + db: c.db, |
| 39 | + row: c.conn.QueryRow(ctx, sql, args...), |
| 40 | + } |
| 41 | +} |
| 42 | + |
| 43 | +// QueryRows executes a SQL query within the single connection. |
| 44 | +func (c *Conn) QueryRows(ctx context.Context, sql string, args ...interface{}) Rows { |
| 45 | + rows, err := c.conn.Query(ctx, sql, args...) |
| 46 | + return &rowsGetter{ |
| 47 | + db: c.db, |
| 48 | + ctx: ctx, |
| 49 | + rows: rows, |
| 50 | + err: err, |
| 51 | + } |
| 52 | +} |
| 53 | + |
| 54 | +// Copy executes a SQL copy query within the single connection. |
| 55 | +func (c *Conn) Copy(ctx context.Context, tableName string, columnNames []string, callback CopyCallback) (int64, error) { |
| 56 | + n, err := c.conn.CopyFrom( |
| 57 | + ctx, |
| 58 | + pgx.Identifier{tableName}, |
| 59 | + columnNames, |
| 60 | + ©WithCallback{ |
| 61 | + ctx: ctx, |
| 62 | + callback: callback, |
| 63 | + }, |
| 64 | + ) |
| 65 | + |
| 66 | + // Done |
| 67 | + return n, c.db.processError(err) |
| 68 | +} |
| 69 | + |
| 70 | +// WithinTx executes a callback function within the context of a single connection. |
| 71 | +func (c *Conn) WithinTx(ctx context.Context, cb WithinTxCallback) error { |
| 72 | + innerTx, err := c.conn.BeginTx(ctx, pgx.TxOptions{ |
| 73 | + IsoLevel: pgx.ReadCommitted, //pgx.Serializable, |
| 74 | + AccessMode: pgx.ReadWrite, |
| 75 | + DeferrableMode: pgx.NotDeferrable, |
| 76 | + }) |
| 77 | + if err == nil { |
| 78 | + err = cb(ctx, Tx{ |
| 79 | + db: c.db, |
| 80 | + tx: innerTx, |
| 81 | + }) |
| 82 | + if err == nil { |
| 83 | + err = innerTx.Commit(ctx) |
| 84 | + if err != nil { |
| 85 | + err = newError(err, "unable to commit db transaction") |
| 86 | + } |
| 87 | + } |
| 88 | + if err != nil { |
| 89 | + _ = innerTx.Rollback(context.Background()) // Using context.Background() on purpose |
| 90 | + } |
| 91 | + } else { |
| 92 | + err = newError(err, "unable to start transaction") |
| 93 | + } |
| 94 | + return c.db.processError(err) |
| 95 | +} |
0 commit comments