Skip to content

Add BYTEA support #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/5-bytea.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env php
<?php

require \dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Postgres\ByteA;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresConnectionPool;

$config = PostgresConfig::fromString('host=localhost user=postgres');
$pool = new PostgresConnectionPool($config);

$pool->query('DROP TABLE IF EXISTS test');

$transaction = $pool->beginTransaction();

$transaction->query('CREATE TABLE test (value BYTEA)');

$statement = $transaction->prepare('INSERT INTO test VALUES (?)');

$statement->execute([new ByteA($a = \random_bytes(10))]);
$statement->execute([new ByteA($b = \random_bytes(10))]);
$statement->execute([new ByteA($c = \random_bytes(10))]);

$result = $transaction->execute('SELECT * FROM test WHERE value = :value', ['value' => new ByteA($a)]);

foreach ($result as $row) {
assert($row['value'] === $a);
var_dump(bin2hex($row['value']));
}

$transaction->rollback();
16 changes: 16 additions & 0 deletions src/ByteA.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php declare(strict_types=1);

namespace Amp\Postgres;

final class ByteA
{
public function __construct(
private readonly string $data,
) {
}

public function getData(): string
{
return $this->data;
}
}
10 changes: 10 additions & 0 deletions src/Internal/AbstractHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Amp\DeferredFuture;
use Amp\Pipeline\Queue;
use Amp\Postgres\ByteA;
use Amp\Postgres\PostgresHandle;
use Amp\Postgres\PostgresResult;
use Amp\Postgres\PostgresStatement;
Expand Down Expand Up @@ -85,4 +86,13 @@ protected static function shutdown(
$onClose->complete();
}
}

protected function escapeParams(array $params): array
{
return \array_map(fn (mixed $param) => match (true) {
$param instanceof ByteA => $this->escapeByteA($param->getData()),
\is_array($param) => $this->escapeParams($param),
default => $param,
}, $params);
}
}
20 changes: 18 additions & 2 deletions src/Internal/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ private function fetchNextResult(string $sql): ?PostgresResult
public function statementExecute(string $name, array $params): PostgresResult
{
\assert(isset($this->statements[$name]), "Named statement not found when executing");
return $this->createResult($this->send(\pg_send_execute(...), $name, $params), $this->statements[$name]->sql);
$result = $this->send(\pg_send_execute(...), $name, \array_map(cast(...), $this->escapeParams($params)));
return $this->createResult($result, $this->statements[$name]->sql);
}

/**
Expand Down Expand Up @@ -318,6 +319,15 @@ public function statementDeallocate(string $name): void
$storage->future->ignore();
}

public function escapeByteA(string $data): string
{
if ($this->handle === null) {
throw new \Error("The connection to the database has been closed");
}

return \pg_escape_bytea($this->handle, $data);
}

public function query(string $sql): PostgresResult
{
if ($this->handle === null) {
Expand All @@ -336,7 +346,13 @@ public function execute(string $sql, array $params = []): PostgresResult
$sql = parseNamedParams($sql, $names);
$params = replaceNamedParams($params, $names);

return $this->createResult($this->send(\pg_send_query_params(...), $sql, $params), $sql);
$result = $this->send(
pg_send_query_params(...),
$sql,
\array_map(cast(...), $this->escapeParams($params))
);

return $this->createResult($result, $sql);
}

public function prepare(string $sql): PostgresStatement
Expand Down
5 changes: 4 additions & 1 deletion src/Internal/PgSqlResultIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ private function cast(int $oid, ?string $value): array|bool|int|float|string|nul
790 => $value, // money includes currency symbol as string
default => (int) $value, // All other numeric types cast to an integer
},
default => $value, // Return a string for all other types
default => match ($oid) { // String
17 => \pg_unescape_bytea($value),
default => $value, // Return a string for all other types
},
};
}
}
9 changes: 9 additions & 0 deletions src/Internal/PostgresConnectionTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,13 @@ public function quoteName(string $name): string
$this->assertOpen();
return $this->handle->quoteName($name);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function escapeByteA(string $data): string
{
$this->assertOpen();
return $this->handle->escapeByteA($data);
}
}
5 changes: 5 additions & 0 deletions src/Internal/PostgresPooledTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ public function quoteName(string $name): string
{
return $this->transaction->quoteName($name);
}

public function escapeByteA(string $data): string
{
return $this->transaction->escapeByteA($data);
}
}
2 changes: 1 addition & 1 deletion src/Internal/PqBufferedResultSet.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private static function generate(pq\Result $result): \Generator
$position = 0;

while (++$position <= $result->numRows) {
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY | pq\Result::CONV_BYTEA;
yield $result->fetchRow(pq\Result::FETCH_ASSOC);
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/Internal/PqHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ public function statementExecute(string $name, array $params): PostgresResult
throw new SqlException('Statement unexpectedly closed before being executed');
}

return $this->send($storage->sql, $statement->execAsync(...), $params);
return $this->send(
$storage->sql,
$statement->execAsync(...),
\array_map(cast(...), $this->escapeParams($params)),
);
}

/**
Expand Down Expand Up @@ -332,6 +336,15 @@ public function statementDeallocate(string $name): void
});
}

public function escapeByteA(string $data): string
{
if (!$this->handle) {
throw new \Error("The connection to the database has been closed");
}

return $this->handle->escapeBytea($data);
}

public function query(string $sql): PostgresResult
{
if (!$this->handle) {
Expand All @@ -350,7 +363,12 @@ public function execute(string $sql, array $params = []): PostgresResult
$sql = parseNamedParams($sql, $names);
$params = replaceNamedParams($params, $names);

return $this->send($sql, $this->handle->execParamsAsync(...), $sql, $params);
return $this->send(
$sql,
$this->handle->execParamsAsync(...),
$sql,
\array_map(cast(...), $this->escapeParams($params)),
);
}

public function prepare(string $sql): PostgresStatement
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/PqUnbufferedResultSet.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function __construct(
private static function generate(\Closure $fetch, pq\Result $result): \Generator
{
do {
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY | pq\Result::CONV_BYTEA;
yield $result->fetchRow(pq\Result::FETCH_ASSOC);
$result = $fetch();
} while ($result instanceof pq\Result);
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function parseNamedParams(string $sql, ?array &$names): string
* @param array $params User-provided array of statement parameters.
* @param list<int|string> $names Array generated by the $names param of {@see parseNamedParams()}.
*
* @return list<int|float|string|null>
* @return list<mixed>
*
* @throws \Error If the $param array does not contain a key corresponding to a named parameter.
*/
Expand All @@ -85,7 +85,7 @@ function replaceNamedParams(array $params, array $names): array
throw new \Error($message);
}

$values[] = cast($params[$name]);
$values[] = $params[$name];
}

return $values;
Expand Down
5 changes: 5 additions & 0 deletions src/PostgresConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ final public function quoteName(string $name): string
{
return $this->handle->quoteName($name);
}

final public function escapeByteA(string $data): string
{
return $this->handle->escapeByteA($data);
}
}
2 changes: 1 addition & 1 deletion src/PostgresHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ interface PostgresHandle extends PostgresReceiver, PostgresQuoter
/**
* Execute the statement with the given name and parameters.
*
* @param list<int|float|string|null> $params List of statement parameters, indexed starting at 0.
* @param list<mixed> $params List of statement parameters, indexed starting at 0.
*/
public function statementExecute(string $name, array $params): PostgresResult;

Expand Down
5 changes: 5 additions & 0 deletions src/PostgresQuoter.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ public function quoteString(string $data): string;
* @throws \Error If the connection to the database has been closed.
*/
public function quoteName(string $name): string;

/**
* Escapes a binary string to be used as BYTEA data.
*/
public function escapeByteA(string $data): string;
}
53 changes: 4 additions & 49 deletions test/AbstractConnectTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,14 @@
namespace Amp\Postgres\Test;

use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\PHPUnit\AsyncTestCase;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresConnection;
use Amp\Sql\SqlException;
use Amp\TimeoutCancellation;

abstract class AbstractConnectTest extends AsyncTestCase
{
abstract public function connect(PostgresConfig $connectionConfig, Cancellation $cancellation = null): PostgresConnection;

public function testConnect()
{
$connection = $this->connect(
PostgresConfig::fromString('host=localhost user=postgres password=postgres'),
new TimeoutCancellation(1)
);
$this->assertInstanceOf(PostgresConnection::class, $connection);
}

/**
* @depends testConnect
*/
public function testConnectCancellationBeforeConnect()
{
$this->expectException(CancelledException::class);

$source = new DeferredCancellation;
$cancellation = $source->getCancellation();
$source->cancel();
$this->connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres'), $cancellation);
}

/**
* @depends testConnectCancellationBeforeConnect
*/
public function testConnectCancellationAfterConnect()
{
$source = new DeferredCancellation;
$cancellation = $source->getCancellation();
$connection = $this->connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres'), $cancellation);
$this->assertInstanceOf(PostgresConnection::class, $connection);
$source->cancel();
}

/**
* @depends testConnectCancellationBeforeConnect
*/
public function testConnectInvalidUser()
{
$this->expectException(SqlException::class);

$this->connect(PostgresConfig::fromString('host=localhost user=invalid password=invalid'), new TimeoutCancellation(100));
}
abstract public function connect(
PostgresConfig $connectionConfig,
?Cancellation $cancellation = null
): PostgresConnection;
}
63 changes: 63 additions & 0 deletions test/AbstractCreateConnectionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace Amp\Postgres\Test;

use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\Postgres\PostgresConfig;
use Amp\Sql\SqlException;
use Amp\TimeoutCancellation;

abstract class AbstractCreateConnectionTest extends AbstractConnectTest
{
public function testConnect()
{
$connection = $this->connect(
PostgresConfig::fromString('host=localhost user=postgres password=postgres'),
new TimeoutCancellation(1)
);

$this->assertFalse($connection->isClosed());

$connection->onClose($this->createCallback(1));
$connection->close();

$this->assertTrue($connection->isClosed());
}

/**
* @depends testConnect
*/
public function testConnectCancellationBeforeConnect()
{
$this->expectException(CancelledException::class);

$source = new DeferredCancellation;
$cancellation = $source->getCancellation();
$source->cancel();
$this->connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres'), $cancellation);
}

/**
* @depends testConnectCancellationBeforeConnect
*/
public function testConnectCancellationAfterConnect()
{
$source = new DeferredCancellation;
$cancellation = $source->getCancellation();
$connection = $this->connect(PostgresConfig::fromString('host=localhost user=postgres password=postgres'), $cancellation);
$source->cancel();

$this->assertFalse($connection->isClosed());
}

/**
* @depends testConnectCancellationBeforeConnect
*/
public function testConnectInvalidUser()
{
$this->expectException(SqlException::class);

$this->connect(PostgresConfig::fromString('host=localhost user=invalid password=invalid'), new TimeoutCancellation(100));
}
}
Loading