NAME
    EV::Pg - asynchronous PostgreSQL client using libpq and EV

SYNOPSIS
        use v5.10;
        use EV;
        use EV::Pg;

        my $pg = EV::Pg->new(
            conninfo => 'dbname=mydb',
            on_error => sub { die "PG error: $_[0]\n" },
        );
        $pg->on_connect(sub {
            $pg->query_params(
                'select $1::int + $2::int',
                [10, 20],
                sub {
                    my ($rows, $err) = @_;
                    die $err if $err;
                    say $rows->[0][0];  # 30
                    EV::break;
                },
            );
        });
        EV::run;

DESCRIPTION
    EV::Pg is a non-blocking PostgreSQL client built on top of libpq and the
    EV event loop. It drives the libpq async API ("PQsendQuery",
    "PQconsumeInput", "PQgetResult") through "ev_io" watchers on the libpq
    socket, so the event loop never blocks on database I/O.

    Features: parameterized queries, prepared statements, pipeline mode,
    single-row and chunked rows (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY,
    async cancel (libpq >= 17), structured error fields, protocol tracing,
    and notice handling.

CALLBACKS
    Query callbacks always receive a single positional argument on success
    and "(undef, $error_message)" on error, so

        my ($result, $err) = @_;

    works for every shape: $result is the success payload, $err is defined
    only on error. The shape of $result depends on the query:

    SELECT (or single-row / chunked mode)
        "\@rows" -- arrayref of rows; each row is an arrayref of column
        values with SQL NULL mapping to Perl "undef".

    INSERT / UPDATE / DELETE
        $cmd_tuples -- the string from "PQcmdTuples" (e.g. "1", "0").

    PREPARE / close_prepared / close_portal
        "" -- always an empty string (these commands return no row count).

    describe_prepared / describe_portal
        "\%meta" -- hashref with "nfields", "nparams", and (when non-zero)
        "fields" (arrayref of "{name, type}" hashes) and "paramtypes"
        (arrayref of OIDs).

    COPY
        "COPY_IN", "COPY_OUT", or "COPY_BOTH" -- a string tag identifying
        the COPY direction.

    pipeline_sync
        1.

    Exceptions thrown inside callbacks are caught and reported via "warn"
    so that one bad callback does not derail the rest of the queue.
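
    The result shapes listed under CALLBACKS can be told apart with ordinary
    Perl checks. The following is a minimal sketch, not part of EV::Pg:
    "classify_result" is a hypothetical helper name, and the labels it
    returns are invented for illustration. A bare 1 from "pipeline_sync"
    cannot be distinguished from a "PQcmdTuples" string of "1", so both are
    reported as a count here.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by EV::Pg): name the shape of the
# arguments a query callback received, following the CALLBACKS list.
sub classify_result {
    my ($result, $err) = @_;

    return "error: $err" if defined $err;             # (undef, $error_message)
    return 'undefined'   if !defined $result;         # neither payload nor error
    return 'rows'        if ref $result eq 'ARRAY';   # SELECT, single-row, chunked
    return 'meta'        if ref $result eq 'HASH';    # describe_prepared / describe_portal
    return 'no-count'    if $result eq '';            # PREPARE, close_prepared, close_portal
    return 'copy'        if $result =~ /\ACOPY_(?:IN|OUT|BOTH)\z/;
    return 'count'       if $result =~ /\A\d+\z/;     # cmd tuples, or pipeline_sync's 1
    return 'unknown';
}
```

    Inside a callback this would be called as "classify_result(@_)". The
    error check comes first because $result is "undef" whenever $err is set.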

CONSTRUCTOR
  new
        my $pg = EV::Pg->new(%args);

    Returns a new EV::Pg object. If "conninfo" or "conninfo_params" is
    supplied, an asynchronous connect starts immediately; otherwise call
    "connect" later.

    Recognized arguments:

    conninfo
        libpq connection string passed to "connect".

    conninfo_params
        Hashref of connection parameters (e.g. "{ host => 'localhost',
        dbname => 'mydb', port => 5432 }"), passed to "connect_params".
        Mutually exclusive with "conninfo".

    expand_dbname
        When true together with "conninfo_params", the "dbname" value is
        itself parsed as a connection string -- so "dbname =>
        'postgresql://host/db?sslmode=require'" works.

    on_connect
        Fires once with no arguments when the handshake completes.

    on_error
        Fires as "($error_message)" on connection-level errors. Defaults to
        "sub { die @_ }"; pass an explicit handler to keep the loop alive.

    on_notify
        Fires as "($channel, $payload, $backend_pid)" for LISTEN/NOTIFY
        messages.

    on_notice
        Fires as "($message)" for server NOTICE/WARNING messages.

    on_drain
        Fires with no arguments when the libpq send buffer has been fully
        flushed during a COPY -- use it to resume sending after
        "put_copy_data" returned 0.

    keep_alive
        When true, the connection keeps "EV::run" alive even with an empty
        callback queue. See "keep_alive".

    loop
        An EV loop object. Defaults to "EV::default_loop".

    Unknown arguments produce a "carp" warning and are otherwise ignored.

CONNECTION METHODS
  connect
        $pg->connect($conninfo);

    Starts an asynchronous connection from a libpq connection string.
    "on_connect" fires on success, "on_error" on failure.

  connect_params
        $pg->connect_params(\%params);
        $pg->connect_params(\%params, $expand_dbname);

    Like "connect" but takes a hashref of keyword/value parameters. When
    $expand_dbname is true, the "dbname" entry may itself be a connection
    string or URI.

  reset
        $pg->reset;

    Drops the current connection and reconnects with the same parameters.
    Pending callbacks fire with "(undef, "connection reset")" first.
    Alias: "reconnect".
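
    As an illustration of the constructor arguments described under "new",
    the sketch below builds an argument list that connects from a URI via
    "conninfo_params" plus "expand_dbname" and replaces the default dying
    "on_error" with one that only records the message. "pg_args_for_uri" is
    a hypothetical helper, not part of EV::Pg, and the URI in the usage
    comment is a placeholder.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by EV::Pg): build arguments for
# EV::Pg->new from a connection URI, collecting connection-level errors
# into the caller's array instead of dying.
sub pg_args_for_uri {
    my ($uri, $errors) = @_;
    return (
        conninfo_params => { dbname => $uri },   # URI goes in the dbname slot
        expand_dbname   => 1,                    # so dbname is parsed as a connection string
        on_error        => sub { push @$errors, $_[0] },
    );
}

# Usage (needs EV::Pg and a reachable server, so it is left commented out):
#   my @errors;
#   my $pg = EV::Pg->new(
#       pg_args_for_uri('postgresql://host/db?sslmode=require', \@errors),
#   );
```

    "conninfo" is deliberately absent from the list, since it is mutually
    exclusive with "conninfo_params".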

  finish
        $pg->finish;

    Closes the connection. Pending callbacks fire with "(undef, "connection
    finished")". Alias: "disconnect".

  is_connected
        my $bool = $pg->is_connected;

    True if the handshake has completed and the connection is ready for
    queries. False during connect, after "finish", and after a fatal error.

  status
        my $st = $pg->status;

    libpq connection status: "CONNECTION_OK" or "CONNECTION_BAD". Returns
    "CONNECTION_BAD" when not connected.

QUERY METHODS
  query
        $pg->query($sql, sub { my ($result, $err) = @_; });

    Sends a simple query. Multi-statement strings (e.g. "SELECT 1; SELECT
    2") are accepted, but only the final result reaches the callback --
    intermediate results are silently discarded, and because PostgreSQL
    stops at the first error, errors always arrive as that final result.
    Not allowed in pipeline mode -- use "query_params" there. Alias: "q".

  query_params
        $pg->query_params($sql, \@params, sub { my ($result, $err) = @_; });

    Sends a parameterized query. Parameters are referenced in SQL as $1,
    $2, etc.; "undef" elements become SQL NULL. Values are sent in
    PostgreSQL's text format. Embedded NUL bytes cause the call to croak
    (text-format params cannot legally contain NULs) -- pass binary data
    through "escape_bytea" if you need a "bytea" column. Alias: "qp".

  prepare
        $pg->prepare($name, $sql, sub { my ($result, $err) = @_; });

    Creates a prepared statement at the protocol level (no SQL "PREPARE"
    parsing). The callback receives "" on success. Alias: "prep".

  query_prepared
        $pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; });

    Executes a prepared statement created by "prepare". Same param rules as
    "query_params". Alias: "qx".

  describe_prepared
        $pg->describe_prepared($name, sub { my ($meta, $err) = @_; });

    Describes a prepared statement. The callback receives a hashref with
    keys "nfields" and "nparams". When "nfields" is non-zero, a "fields"
    key is also present (arrayref of "{name, type}" hashes).
    When "nparams" is non-zero, a "paramtypes" key is also present
    (arrayref of OIDs).

  describe_portal
        $pg->describe_portal($name, sub { my ($meta, $err) = @_; });

    Describes a portal. The callback receives the same hashref structure as
    "describe_prepared".

  set_single_row_mode
        my $ok = $pg->set_single_row_mode;

    Switches the most recently sent query into single-row mode. Must be
    called immediately after a send method ("query", "query_params", ...)
    and before the event loop delivers any results -- a 0 return means no
    query was in the right async state and should be treated as a
    programmer error rather than a runtime condition.

    The query callback then fires once per row with a single-row "\@rows"
    (e.g. "[[$col0, $col1, ...]]"), and once more at the end with an empty
    "\@rows" as the completion sentinel.

  set_chunked_rows_mode
        my $ok = $pg->set_chunked_rows_mode($chunk_size);

    Like "set_single_row_mode" but delivers up to $chunk_size rows per
    callback (requires libpq >= 17), reducing per-callback overhead for
    large result sets. Same call-timing constraint and same trailing
    empty-rows completion sentinel.

  close_prepared
        $pg->close_prepared($name, sub { my ($result, $err) = @_; });

    Closes (deallocates) a prepared statement at protocol level (requires
    libpq >= 17). The callback receives an empty string ("") on success.
    Works in pipeline mode, unlike "DEALLOCATE" SQL.

  close_portal
        $pg->close_portal($name, sub { my ($result, $err) = @_; });

    Closes a portal at protocol level (requires libpq >= 17). The callback
    receives an empty string ("") on success.

  cancel
        my $err = $pg->cancel;

    Sends a cancel request using the legacy "PQcancel" API. Blocks the
    event loop for one network round trip; prefer "cancel_async" on libpq
    >= 17. Returns "undef" on success or an error string on failure.

  cancel_async
        $pg->cancel_async(sub { my ($r, $err) = @_; });

    Sends a non-blocking cancel request using the "PGcancelConn" API
    (requires libpq >= 17).
    The callback receives "(1)" on success or "(undef, $errmsg)" on
    failure. Croaks if a cancel is already in progress.

  pending_count
        my $n = $pg->pending_count;

    Number of callbacks currently in the queue (queries sent but not yet
    delivered).

  keep_alive
        $pg->keep_alive(1);
        my $bool = $pg->keep_alive;

    When true, the read watcher keeps "EV::run" alive even when the
    callback queue is empty. Required when waiting for server-side "NOTIFY"
    events via "on_notify" -- without this flag the loop would exit as soon
    as the "LISTEN" query completes. Getter/setter.

  skip_pending
        $pg->skip_pending;

    Drops every queued callback, invoking each with "(undef, "skipped")".
    Any in-flight server results are drained and discarded; the connection
    remains usable for new queries.

PIPELINE METHODS
    Pipeline mode lets you send multiple queries without waiting for
    individual results, then receive the results in order after a sync
    point. Inside a pipeline you must use "query_params" or
    "query_prepared" -- "query" is rejected.

  enter_pipeline
        $pg->enter_pipeline;

    Switches the connection into pipeline mode. Croaks if there are
    unfinished results outstanding.

  exit_pipeline
        $pg->exit_pipeline;

    Returns to normal mode. Croaks if the pipeline is not idle.

  pipeline_sync
        $pg->pipeline_sync(sub { my ($r, $err) = @_; });

    Sends a pipeline sync point. The callback fires with "(1)" after all
    preceding queries in the batch have completed, or "(undef, $errmsg)" if
    the connection drops first. Alias: "sync".

  send_pipeline_sync
        $pg->send_pipeline_sync(sub { my ($r, $err) = @_; });

    Like "pipeline_sync" but does not flush the send buffer (requires libpq
    >= 17). Useful for batching multiple sync points before a single manual
    flush via "send_flush_request".

  send_flush_request
        $pg->send_flush_request;

    Asks the server to deliver results for queries sent so far -- the
    manual companion to "send_pipeline_sync". Alias: "flush".
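
    The enter / queue / sync sequence described under PIPELINE METHODS can
    be sketched as a small helper. This is an illustration under
    assumptions, not code from the distribution: "run_pipeline" is a
    hypothetical name, and $pg is assumed to be a connected EV::Pg object
    (or anything offering "enter_pipeline", "query_params" and
    "pipeline_sync" with the documented signatures). Leaving pipeline mode
    is left to the caller, because "exit_pipeline" croaks unless the
    pipeline is idle.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by EV::Pg): queue a batch of
# parameterized queries in pipeline mode and report once the sync point
# has been reached. $batch is an arrayref of [$sql, \@params] pairs.
sub run_pipeline {
    my ($pg, $batch, $done) = @_;
    my (@results, @errors);

    $pg->enter_pipeline;

    my $i = 0;
    for my $item (@$batch) {
        my ($sql, $params) = @$item;
        my $slot = $i++;                       # remember the query's position
        $pg->query_params($sql, $params, sub {
            my ($result, $err) = @_;
            if (defined $err) { $errors[$slot]  = $err }
            else              { $results[$slot] = $result }
        });
    }

    # Fires with (1) after every query queued above has completed,
    # or with (undef, $errmsg) if the connection drops first.
    $pg->pipeline_sync(sub {
        my ($ok, $err) = @_;
        $done->(\@results, \@errors, $err);
    });
    return;
}
```

    A caller would pass a completion callback such as "sub { my ($results,
    $errors, $sync_err) = @_; ... }" and call "exit_pipeline" once the
    connection is idle again.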

  pipeline_status
        my $st = $pg->pipeline_status;

    One of "PQ_PIPELINE_OFF", "PQ_PIPELINE_ON", or "PQ_PIPELINE_ABORTED".

COPY METHODS
    A "COPY" command runs in two phases: the query callback first fires
    with a string tag ("COPY_IN" / "COPY_OUT" / "COPY_BOTH") to signal that
    streaming has started, then fires a second time with the final command
    result (or error) when the stream ends. See eg/copy_in.pl and
    eg/copy_out.pl.

  put_copy_data
        my $rc = $pg->put_copy_data($data);

    Sends a chunk during COPY IN. Returns 1 on success (data buffered or
    flushed), 0 if the send buffer is full (wait for writability via
    "on_drain", then retry), or -1 on error.

  put_copy_end
        my $rc = $pg->put_copy_end;
        my $rc = $pg->put_copy_end($errmsg);

    Ends a COPY IN. With $errmsg aborts the COPY server-side. Same return
    convention as "put_copy_data".

  get_copy_data
        my $row = $pg->get_copy_data;

    Retrieves the next row during COPY OUT. Returns the row bytes, the
    integer -1 when the stream is complete, or "undef" if nothing is
    currently buffered (call again after the next read).

HANDLER METHODS
    Each handler is a getter/setter: pass a coderef to install it
    (returning the new value), pass "undef" to clear it, or call without
    arguments to read the current handler.

  on_connect
    Fires once with no arguments after the handshake completes.

  on_error
    Fires as "($error_message)" for connection-level errors (handshake
    failure, lost socket, libpq protocol errors). Per-query errors come
    through the query callback, not here.

  on_notify
    Fires as "($channel, $payload, $backend_pid)" for each LISTEN/NOTIFY
    message.

  on_notice
    Fires as "($message)" for server NOTICE/WARNING messages.

  on_drain
    Fires with no arguments when the libpq send buffer has been fully
    flushed during a COPY -- use it to resume "put_copy_data" after a 0
    return.

CONNECTION INFO
    String accessors ("db", "user", "host", "hostaddr", "port",
    "error_message", "parameter_status", "ssl_attribute") return "undef"
    when not connected.
    Integer accessors return a default value (typically 0 or -1). Methods
    that require an active connection ("client_encoding",
    "set_client_encoding", "set_error_verbosity",
    "set_error_context_visibility", "conninfo") croak otherwise.

  error_message
        my $msg = $pg->error_message;

    Last error message from libpq. Alias: "errstr".

  transaction_status
        my $st = $pg->transaction_status;

    One of "PQTRANS_IDLE", "PQTRANS_ACTIVE", "PQTRANS_INTRANS",
    "PQTRANS_INERROR", "PQTRANS_UNKNOWN". Alias: "txn_status".

  parameter_status
        my $val = $pg->parameter_status($name);

    Server parameter value (e.g. "server_version", "client_encoding",
    "server_encoding").

  backend_pid
        my $pid = $pg->backend_pid;

    Backend process ID. Alias: "pid".

  server_version
        my $ver = $pg->server_version;

    Server version as a packed integer: "MMmmpp" (major * 10000 + minor *
    100 + patch) for releases before 10, "MM0000 + minor" (major * 10000 +
    minor) from 10 onwards. PostgreSQL 18.0 returns 180000; 17.5 returns
    170005.

  protocol_version
        my $ver = $pg->protocol_version;

    Frontend/backend protocol version (typically 3).

  db
        my $dbname = $pg->db;

    Database name.

  user
        my $user = $pg->user;

    Connected user name.

  host
        my $host = $pg->host;

    Server host as supplied to "connect" (may be a hostname or socket dir).

  hostaddr
        my $addr = $pg->hostaddr;

    Server IP address.

  port
        my $port = $pg->port;

    Server port.

  socket
        my $fd = $pg->socket;

    Underlying socket file descriptor (for advanced uses such as installing
    your own watcher). Returns -1 when not connected.

  ssl_in_use
        my $bool = $pg->ssl_in_use;

    True if the connection is encrypted with SSL.

  ssl_attribute
        my $val = $pg->ssl_attribute($name);

    SSL attribute (e.g. "protocol", "cipher", "key_bits").

  ssl_attribute_names
        my $names = $pg->ssl_attribute_names;

    Arrayref of available SSL attribute names, or "undef" if the connection
    does not use SSL.

  client_encoding
        my $enc = $pg->client_encoding;

    Current client encoding name (e.g. "UTF8").
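
    The packed integer returned by "server_version" (and by "lib_version",
    which uses the same encoding) can be unpacked with plain arithmetic.
    The sketch below is a hypothetical helper, "format_pg_version", not
    something EV::Pg provides. It follows the encoding described under
    "server_version": three components before PostgreSQL 10, two from 10
    onwards.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by EV::Pg): turn the packed integer
# from server_version / lib_version into a dotted version string.
sub format_pg_version {
    my ($packed) = @_;
    my $major = int($packed / 10000);

    # PostgreSQL 10 and later: major * 10000 + minor
    return sprintf('%d.%d', $major, $packed % 10000) if $major >= 10;

    # Before 10: major * 10000 + minor * 100 + patch
    return sprintf('%d.%d.%d', $major, int($packed / 100) % 100, $packed % 100);
}

print format_pg_version(180000), "\n";   # 18.0
print format_pg_version(170005), "\n";   # 17.5
print format_pg_version(90603),  "\n";   # 9.6.3
```

    A value of 0 (the kind of default an integer accessor may return when
    not connected) comes out as "0.0.0", so callers may want to check
    "is_connected" first.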

  set_client_encoding
        $pg->set_client_encoding($encoding);

    Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a
    synchronous (blocking) call that stalls the event loop for one server
    round trip, so it is best invoked right after "on_connect" fires and
    before any queries are dispatched. Croaks if there are pending queries
    or on failure.

  set_error_verbosity
        my $old = $pg->set_error_verbosity($level);

    Sets error verbosity. $level is one of "PQERRORS_TERSE",
    "PQERRORS_DEFAULT", "PQERRORS_VERBOSE", or "PQERRORS_SQLSTATE". Returns
    the previous setting.

  set_error_context_visibility
        my $old = $pg->set_error_context_visibility($level);

    Sets error context visibility. $level is one of "PQSHOW_CONTEXT_NEVER",
    "PQSHOW_CONTEXT_ERRORS" (default), or "PQSHOW_CONTEXT_ALWAYS". Returns
    the previous setting.

  error_fields
        my $fields = $pg->error_fields;

    Returns a hashref of structured error fields from the most recent
    "PGRES_FATAL_ERROR" result, or "undef" if no fatal error has been seen.
    Persists until the next fatal error; successful queries do not clear
    it. Each key is present only when the corresponding field is non-NULL
    in the server response:

        sqlstate
        severity
        primary
        detail
        hint
        position
        context
        schema
        table
        column
        datatype
        constraint
        internal_position
        internal_query
        source_file
        source_line
        source_function

  result_meta
        my $meta = $pg->result_meta;

    Returns a hashref of metadata for the most recent query result, or
    "undef" if no result has been delivered. Refreshed by every successful
    result (including commands with no columns) but not by errors, COPY, or
    pipeline sync results -- so after an error this returns metadata for
    the last successful query and you should check $err before relying on
    it. Cleared by "reset"/"finish". Keys:

        nfields        number of columns
        cmd_status     command status string (e.g. "SELECT 3", "INSERT 0 1")
        inserted_oid   OID of inserted row -- only present for single-row
                       INSERTs that generated an OID (legacy WITH OIDS
                       tables); absent for normal INSERTs and other commands
        fields         arrayref of column metadata hashrefs: name, type (OID),
                       ftable (OID), ftablecol, fformat (0=text, 1=binary),
                       fsize, fmod

  conninfo
        my $info = $pg->conninfo;

    Returns a hashref of the connection parameters actually used by the
    live connection (keyword => value pairs).

  connection_used_password
        my $bool = $pg->connection_used_password;

    Returns 1 if the connection authenticated with a password.

  connection_used_gssapi
        my $bool = $pg->connection_used_gssapi;

    Returns 1 if the connection used GSSAPI authentication.

  connection_needs_password
        my $bool = $pg->connection_needs_password;

    Returns 1 if the server requested a password during authentication.

  trace
        $pg->trace($filename);

    Enables libpq protocol tracing, writing the wire-level frontend/backend
    exchange to $filename. Croaks if the file cannot be opened.

  untrace
        $pg->untrace;

    Stops tracing and closes the trace file. Safe to call when tracing is
    not active.

  set_trace_flags
        $pg->set_trace_flags($flags);

    Sets the trace output style. $flags is a bitmask of
    "PQTRACE_SUPPRESS_TIMESTAMPS" and/or "PQTRACE_REGRESS_MODE" (handy when
    diffing traces).

UTILITY METHODS
  escape_literal
        my $quoted = $pg->escape_literal($string);

    Quotes and escapes a string for safe interpolation into SQL (wraps the
    value in single quotes and doubles internal quotes). Alias: "quote".

  escape_identifier
        my $quoted = $pg->escape_identifier($string);

    Quotes and escapes an identifier for safe interpolation into SQL (wraps
    in double quotes). Alias: "quote_id".

  escape_bytea
        my $escaped = $pg->escape_bytea($binary);

    Escapes binary bytes into the textual "bytea" form expected by the
    server (the "\x..." hex notation). Pair with "unescape_bytea" to go the
    other way.

  encrypt_password
        my $hashed = $pg->encrypt_password($password, $user);
        my $hashed = $pg->encrypt_password($password, $user, $algorithm);

    Hashes a password client-side (so the cleartext never reaches the
    server) ready to be passed to "ALTER ROLE ... PASSWORD". $algorithm is
    optional; when omitted the server's "password_encryption" setting
    decides (typically "scram-sha-256").

  unescape_bytea
        my $binary = EV::Pg->unescape_bytea($escaped);

    Class method. Decodes the textual "bytea" form back to raw bytes.

  lib_version
        my $ver = EV::Pg->lib_version;

    Class method. Returns the libpq version as an integer (same encoding as
    "server_version"; e.g. 170000 for libpq 17.0).

  conninfo_parse
        my $params = EV::Pg->conninfo_parse($conninfo);

    Class method. Parses a connection string and returns a hashref of the
    recognized keyword/value pairs, croaking if the string is malformed.
    Handy for validating connection strings before connecting.

ALIASES
    Short aliases for common methods:

        q            query
        qp           query_params
        qx           query_prepared
        prep         prepare
        reconnect    reset
        disconnect   finish
        flush        send_flush_request (libpq >= 17)
        sync         pipeline_sync
        quote        escape_literal
        quote_id     escape_identifier
        errstr       error_message
        txn_status   transaction_status
        pid          backend_pid

EXPORT TAGS
        :status        PGRES_* result status constants
        :conn          CONNECTION_OK, CONNECTION_BAD
        :transaction   PQTRANS_* transaction status constants
        :pipeline      PQ_PIPELINE_* pipeline status constants
        :verbosity     PQERRORS_* verbosity constants
        :context       PQSHOW_CONTEXT_* context visibility constants
        :trace         PQTRACE_* trace flag constants
        :all           all of the above

BENCHMARK
    500k queries over Unix socket, PostgreSQL 18, libpq 18:

        Workload   EV::Pg sequential   EV::Pg pipeline   DBD::Pg sync   DBD::Pg async+EV
        SELECT     83,998 q/s          144,939 q/s       73,195 q/s     65,966 q/s
        INSERT     67,053 q/s          85,701 q/s        60,127 q/s     58,329 q/s
        UPSERT     37,360 q/s          43,019 q/s        40,278 q/s     40,173 q/s

    Sequential mode uses prepared statements (parse once, bind+execute per
    call).
    Pipeline mode batches queries with "pipeline_sync" every 1000 queries.
    See bench/bench.pl to reproduce.

REQUIREMENTS
    libpq >= 14 (PostgreSQL client library) and EV.

    A handful of features -- chunked rows mode,
    "close_prepared"/"close_portal",
    "send_pipeline_sync"/"send_flush_request", and "cancel_async" --
    require libpq >= 17 and degrade gracefully when not available (the
    methods are simply not defined).

SEE ALSO
    EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg

LICENSE
    This is free software; you can redistribute it and/or modify it under
    the same terms as the Perl 5 programming language system itself.