Skip to content

JDBC batch updates #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ To get the Java connector for Tarantool 1.6.9, visit

## Table of contents
* [Getting started](#getting-started)
* [JDBC](#JDBC)
* [Cluster support](#cluster-support)
* [Where to get help](#where-to-get-help)

Expand Down Expand Up @@ -170,6 +171,67 @@ System.out.println(template.query("select * from hello_world where hello=:id", C

For more implementation details, see [API documentation](http://tarantool.github.io/tarantool-java/apidocs/index.html).

## JDBC

### Batch updates

`Statement` and `PreparedStatement` objects can be used to submit batch
updates.

For instance, using `Statement` object:

```java
Statement statement = connection.createStatement();
statement.addBatch("INSERT INTO student VALUES (30, 'Joe Jones')");
statement.addBatch("INSERT INTO faculty VALUES (2, 'Faculty of Chemistry')");
statement.addBatch("INSERT INTO student_faculty VALUES (30, 2)");

int[] updateCounts = stmt.executeBatch();
```

or using `PreparedStatement`:

```java
PreparedStatement stmt = con.prepareStatement("INSERT INTO student VALUES (?, ?)");
stmt.setInt(1, 30);
stmt.setString(2, "Michael Korj");
stmt.addBatch();
stmt.setInt(1, 40);
stmt.setString(2, "Linda Simpson");
stmt.addBatch();

int[] updateCounts = stmt.executeBatch();
```

The connector uses a pipeliing when it performs a batch request. It means
each query is asynchronously sent one-by-one in order they were specified
in the batch.

There are a couple of caveats:

- JDBC spec recommends that *auto-commit* mode should be turned off
to prevent the driver from committing a transaction when a batch request
is called. The connector is not support transactions and *auto-commit* is
always enabled, so each statement from the batch is executed in its own
transaction.

- DDL operations aren't transactional in Tarantool. In this way, a batch
like this can produce an undefined behaviour (i.e. second statement can fail
with an error that `student` table does not exist).

```java
statement.addBatch("CREATE TABLE student (id INT PRIMARY KEY, name VARCHAR(100))");
statement.addBatch("INSERT INTO student VALUES (1, 'Alex Smith')");
```

- If `vinyl` storage engine is used an execution order of batch statements is
not specified. __NOTE:__ This behaviour is incompatible with JDBC spec in the
sentence "Batch commands are executed serially (at least logically) in the
order in which they were added to the batch"

- The driver continues processing the remaining commands in a batch once execution
of a command fails.

## Cluster support

To be more fault-tolerant the connector provides cluster extensions. In
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/org/tarantool/jdbc/SQLBatchResultHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.tarantool.jdbc;

import java.util.List;

/**
* Wrapper for batch SQL query results.
*/
public class SQLBatchResultHolder {

private final List<SQLResultHolder> results;
private final Exception error;

public SQLBatchResultHolder(List<SQLResultHolder> results, Exception error) {
this.results = results;
this.error = error;
}

public List<SQLResultHolder> getResults() {
return results;
}

public Exception getError() {
return error;
}

}
117 changes: 91 additions & 26 deletions src/main/java/org/tarantool/jdbc/SQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
* Tarantool {@link Connection} implementation.
Expand Down Expand Up @@ -525,46 +527,59 @@ public int getNetworkTimeout() throws SQLException {
return (int) client.getOperationTimeout();
}

protected SQLResultHolder execute(long timeout, String sql, Object... args) throws SQLException {
protected SQLResultHolder execute(long timeout, SQLQueryHolder query) throws SQLException {
checkNotClosed();
return (useNetworkTimeout(timeout))
? executeWithNetworkTimeout(query)
: executeWithQueryTimeout(timeout, query);
}

protected SQLBatchResultHolder executeBatch(long timeout, List<SQLQueryHolder> queries) throws SQLException {
checkNotClosed();
SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps();
SQLBatchResultHolder batchResult = useNetworkTimeout(timeout)
? sqlOps.executeBatch(queries)
: sqlOps.executeBatch(timeout, queries);

return batchResult;
}

private boolean useNetworkTimeout(long timeout) throws SQLException {
int networkTimeout = getNetworkTimeout();
return (timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout))
? executeWithNetworkTimeout(sql, args)
: executeWithStatementTimeout(timeout, sql, args);
return timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout);
}

private SQLResultHolder executeWithNetworkTimeout(String sql, Object... args) throws SQLException {
private SQLResultHolder executeWithNetworkTimeout(SQLQueryHolder query) throws SQLException {
try {
return client.sqlRawOps().execute(sql, args);
return client.sqlRawOps().execute(query);
} catch (Exception e) {
handleException(e);
throw new SQLException(formatError(sql, args), e);
throw new SQLException(formatError(query), e);
}
}

/**
* Executes a query using a custom timeout.
*
* @param timeout query timeout
* @param sql query
* @param args query bindings
* @param query query
*
* @return SQL result holder
*
* @throws StatementTimeoutException if query execution took more than query timeout
* @throws SQLException if any other errors occurred
*/
private SQLResultHolder executeWithStatementTimeout(long timeout, String sql, Object... args) throws SQLException {
private SQLResultHolder executeWithQueryTimeout(long timeout, SQLQueryHolder query) throws SQLException {
try {
return client.sqlRawOps().execute(timeout, sql, args);
return client.sqlRawOps().execute(timeout, query);
} catch (Exception e) {
// statement timeout should not affect the current connection
// but can be handled by the caller side
if (e.getCause() instanceof TimeoutException) {
throw new StatementTimeoutException(formatError(sql, args), e.getCause());
throw new StatementTimeoutException(formatError(query), e.getCause());
}
handleException(e);
throw new SQLException(formatError(sql, args), e);
throw new SQLException(formatError(query), e);
}
}

Expand Down Expand Up @@ -708,28 +723,74 @@ private void checkHoldabilitySupport(int holdability) throws SQLException {
/**
* Provides error message that contains parameters of failed SQL statement.
*
* @param sql SQL Text.
* @param params Parameters of the SQL statement.
* @param query SQL query
*
* @return Formatted error message.
*/
private static String formatError(String sql, Object... params) {
return "Failed to execute SQL: " + sql + ", params: " + Arrays.deepToString(params);
private static String formatError(SQLQueryHolder query) {
return "Failed to execute SQL: " + query.getQuery() + ", params: " + query.getParams();
}

static class SQLTarantoolClientImpl extends TarantoolClientImpl {

private Future<?> executeQuery(SQLQueryHolder queryHolder) {
return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams());
}

private Future<?> executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) {
return exec(
timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()
);
}

final SQLRawOps sqlRawOps = new SQLRawOps() {
@Override
public SQLResultHolder execute(String sql, Object... binds) {
return (SQLResultHolder) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds));
public SQLResultHolder execute(SQLQueryHolder query) {
return (SQLResultHolder) syncGet(executeQuery(query));
}

@Override
public SQLResultHolder execute(long timeoutMillis, String sql, Object... binds) {
return (SQLResultHolder) syncGet(
exec(timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds)
);
public SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query) {
return (SQLResultHolder) syncGet(executeQuery(query, timeoutMillis));
}

@Override
public SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries) {
return executeInternal(queries, (query) -> executeQuery(query));
}

@Override
public SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries) {
return executeInternal(queries, (query) -> executeQuery(query, timeoutMillis));
}

private SQLBatchResultHolder executeInternal(List<SQLQueryHolder> queries,
Function<SQLQueryHolder, Future<?>> fetcher) {
List<Future<?>> sqlFutures = new ArrayList<>();
// using queries pipelining to emulate a batch request
for (SQLQueryHolder query : queries) {
sqlFutures.add(fetcher.apply(query));
}
// wait for all the results
Exception lastError = null;
List<SQLResultHolder> items = new ArrayList<>(queries.size());
for (Future<?> future : sqlFutures) {
try {
SQLResultHolder result = (SQLResultHolder) syncGet(future);
if (result.isQueryResult()) {
lastError = new SQLException(
"Result set is not allowed in the batch response",
SQLStates.TOO_MANY_RESULTS.getSqlState()
);
}
items.add(result);
} catch (RuntimeException e) {
// empty result set will be treated as a wrong result
items.add(SQLResultHolder.ofEmptyQuery());
lastError = e;
}
}
return new SQLBatchResultHolder(items, lastError);
}
};

Expand Down Expand Up @@ -758,9 +819,13 @@ protected void completeSql(TarantoolOp<?> future, TarantoolPacket pack) {

interface SQLRawOps {

SQLResultHolder execute(String sql, Object... binds);
SQLResultHolder execute(SQLQueryHolder query);

SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query);

SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries);

SQLResultHolder execute(long timeoutMillis, String sql, Object... binds);
SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries);

}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tarantool/jdbc/SQLDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ public boolean insertsAreDetected(int type) throws SQLException {

@Override
public boolean supportsBatchUpdates() throws SQLException {
return false;
return true;
}

@Override
Expand Down
Loading