From dd1f29fd340b2150213712fcfe3b67988ed787db Mon Sep 17 00:00:00 2001 From: nicktorwald Date: Thu, 19 Sep 2019 01:02:51 +0700 Subject: [PATCH] Soft automatic schema reload Now client keeps actual schema metadata and sends schemaId header to be checked against current Tarantool schema version. If client version mismatches DB version client does schema reloading in the background. Client operation interface was reworked in scope of support not only number identifiers for spaces and indexes but also their string names. Closes: #7, #137 --- README.md | 51 +- .../org/tarantool/AbstractTarantoolOps.java | 192 ++++++- src/main/java/org/tarantool/Iterator.java | 10 + .../java/org/tarantool/TarantoolBase.java | 13 +- .../java/org/tarantool/TarantoolClient.java | 4 + .../org/tarantool/TarantoolClientImpl.java | 493 ++++++++++-------- .../org/tarantool/TarantoolClientOps.java | 37 +- .../org/tarantool/TarantoolClusterClient.java | 89 ++-- .../org/tarantool/TarantoolConnection.java | 14 +- .../org/tarantool/TarantoolException.java | 12 +- .../org/tarantool/TarantoolOperation.java | 183 +++++++ .../java/org/tarantool/TarantoolRequest.java | 86 +++ .../tarantool/TarantoolRequestArgument.java | 23 + .../TarantoolRequestArgumentFactory.java | 79 +++ ...antoolClusterStoredFunctionDiscoverer.java | 18 +- .../org/tarantool/jdbc/SQLConnection.java | 23 +- .../tarantool/protocol/ProtoConstants.java | 19 + .../org/tarantool/protocol/ProtoUtils.java | 18 +- .../tarantool/protocol/TarantoolPacket.java | 15 +- .../tarantool/schema/TarantoolIndexMeta.java | 128 +++++ .../TarantoolIndexNotFoundException.java | 16 + .../schema/TarantoolMetaSpacesCache.java | 122 +++++ .../schema/TarantoolSchemaException.java | 15 + .../tarantool/schema/TarantoolSchemaMeta.java | 48 ++ .../tarantool/schema/TarantoolSpaceMeta.java | 99 ++++ .../TarantoolSpaceNotFoundException.java | 9 + .../java/org/tarantool/util/TupleTwo.java | 4 +- .../tarantool/ClientAsyncOperationsIT.java | 310 ++++++++++- .../org/tarantool/ClientOperationsIT.java | 185 +++++++ .../tarantool/ClientReconnectClusterIT.java | 38 +- .../java/org/tarantool/ClientReconnectIT.java | 19 +- .../FireAndForgetClientOperationsIT.java | 99 +++- src/test/java/org/tarantool/IteratorTest.java | 35 -- .../org/tarantool/TarantoolClientOpsIT.java | 10 +- src/test/java/org/tarantool/TestUtils.java | 2 +- ...sterServiceStoredFunctionDiscovererIT.java | 3 +- .../jdbc/JdbcConnectionTimeoutIT.java | 3 +- .../schema/ClientReconnectSchemaIT.java | 94 ++++ .../org/tarantool/schema/ClientSchemaIT.java | 247 +++++++++ .../schema/ClientThreadSafeSchemaIT.java | 105 ++++ 40 files changed, 2527 insertions(+), 443 deletions(-) create mode 100644 src/main/java/org/tarantool/TarantoolOperation.java create mode 100644 src/main/java/org/tarantool/TarantoolRequest.java create mode 100644 src/main/java/org/tarantool/TarantoolRequestArgument.java create mode 100644 src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java create mode 100644 src/main/java/org/tarantool/protocol/ProtoConstants.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolIndexMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSchemaException.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java create mode 100644 src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java delete mode 100644 src/test/java/org/tarantool/IteratorTest.java create mode 100644 src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java create mode 100644 src/test/java/org/tarantool/schema/ClientSchemaIT.java create mode 100644 src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java diff --git a/README.md b/README.md index 0e5f2da4..b1e90500 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ Feel free to override any method of `TarantoolClientImpl`. For example, to hook all the results, you could override this: ```java -protected void complete(TarantoolPacket packet, TarantoolOp future); +protected void complete(TarantoolPacket packet, CompletableFuture future); ``` ### Client config options @@ -181,6 +181,55 @@ Supported options are follow: 14. `operationExpiryTimeMillis` is a default request timeout in ms. Default value is `1000` (1 second). +## String space/index resolution + +Each operation that requires space or index to be executed, can work with +number ID as well as string name of a space or an index. +Assume, we have `my_space` space with space ID `512` and its primary index +`primary` with index ID `0`. Then, for instance, `select` operations can be +performed using their names: + +```java +client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ); +// or using more convenient way +client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ); +``` + +Because _iproto_ has not yet supported string spaces and indexes, a client caches current server +schema in memory. The client relies on protocol SCHEMA_ID and sends each request with respect to +cached schema version. The schema is used primarily to resolve string names of spaces or indexes +against its integer IDs. + +### Schema update + +1. Just after a (re-)connection to the Tarantool instance. + The client cannot guarantee that new instance is the same and has same schema, + thus, the client drops the cached schema and fetches new one. +2. Receiving a schema version error as a response to our request. + It's possible some request can be rejected by server because of schema + mismatching between client and server. In this case the schema will be + reloaded and the refused request will be resent using the updated schema + version. +3. Sending a DDL request and receiving a new version in a response. +4. Sending a request against a non-existent space/index name. + The client cannot exactly know whether name was not found because of + it does not exist or it has not the latest schema version. A ping request + is sent in the case to check a schema version and then a client will reload + it if needed. The original request will be retried if a space / an index + name will be found in a new schema. + +### Schema support caveats + +1. Each schema reloading requires at least two extra requests to fetch spaces and + indexes metadata respectively. There is also a ping request followed by reloading + of the schema to check whether the client has outdated version (see point 4 in + [Schema update](#schema-update)). +2. In some circumstance, requests can be rejected several times until both client's + and server's versions matches. It may take significant amount of time or even be + a cause of request timeout. +3. The client guarantees an order of synchronous requests per thread. Other cases such + as asynchronous or multi-threaded requests may be out of order before the execution. + ## Spring NamedParameterJdbcTemplate usage example The JDBC driver uses `TarantoolClient` implementation to provide a communication with server. diff --git a/src/main/java/org/tarantool/AbstractTarantoolOps.java b/src/main/java/org/tarantool/AbstractTarantoolOps.java index 8cefb379..bdef668d 100644 --- a/src/main/java/org/tarantool/AbstractTarantoolOps.java +++ b/src/main/java/org/tarantool/AbstractTarantoolOps.java @@ -1,62 +1,204 @@ package org.tarantool; +import static org.tarantool.TarantoolRequestArgumentFactory.cacheLookupValue; +import static org.tarantool.TarantoolRequestArgumentFactory.value; -public abstract class AbstractTarantoolOps - implements TarantoolClientOps { +import org.tarantool.schema.TarantoolSchemaMeta; + +import java.util.List; + +public abstract class AbstractTarantoolOps + implements TarantoolClientOps, Object, Result> { private Code callCode = Code.CALL; - protected abstract Result exec(Code code, Object... args); + protected abstract Result exec(TarantoolRequest request); + + protected abstract TarantoolSchemaMeta getSchemaMeta(); + + public Result select(Integer space, Integer index, List key, int offset, int limit, Iterator iterator) { + return select(space, index, key, offset, limit, iterator.getValue()); + } - public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) { + @Override + public Result select(String space, String index, List key, int offset, int limit, Iterator iterator) { return select(space, index, key, offset, limit, iterator.getValue()); } - public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) { + @Override + public Result select(Integer space, Integer index, List key, int offset, int limit, int iterator) { + return exec( + new TarantoolRequest( + Code.SELECT, + value(Key.SPACE), value(space), + value(Key.INDEX), value(index), + value(Key.KEY), value(key), + value(Key.ITERATOR), value(iterator), + value(Key.LIMIT), value(limit), + value(Key.OFFSET), value(offset) + ) + ); + } + + @Override + public Result select(String space, String index, List key, int offset, int limit, int iterator) { + return exec( + new TarantoolRequest( + Code.SELECT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.INDEX), cacheLookupValue(() -> getSchemaMeta().getSpaceIndex(space, index).getId()), + value(Key.KEY), value(key), + value(Key.ITERATOR), value(iterator), + value(Key.LIMIT), value(limit), + value(Key.OFFSET), value(offset) + ) + ); + } + + @Override + public Result insert(Integer space, List tuple) { + return exec(new TarantoolRequest( + Code.INSERT, + value(Key.SPACE), value(space), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result insert(String space, List tuple) { + return exec( + new TarantoolRequest( + Code.INSERT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result replace(Integer space, List tuple) { + return exec( + new TarantoolRequest( + Code.REPLACE, + value(Key.SPACE), value(space), + value(Key.TUPLE), value(tuple) + ) + ); + } + + @Override + public Result replace(String space, List tuple) { return exec( - Code.SELECT, - Key.SPACE, space, - Key.INDEX, index, - Key.KEY, key, - Key.ITERATOR, iterator, - Key.LIMIT, limit, - Key.OFFSET, offset + new TarantoolRequest( + Code.REPLACE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.TUPLE), value(tuple) + ) ); } - public Result insert(Space space, Tuple tuple) { - return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result update(Integer space, List key, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPDATE, + value(Key.SPACE), value(space), + value(Key.KEY), value(key), + value(Key.TUPLE), value(operations) + ) + ); } - public Result replace(Space space, Tuple tuple) { - return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple); + @Override + public Result update(String space, List key, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPDATE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key), + value(Key.TUPLE), value(operations) + ) + ); + } + + @Override + public Result upsert(Integer space, List key, List defTuple, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPSERT, + value(Key.SPACE), value(space), + value(Key.KEY), value(key), + value(Key.TUPLE), value(defTuple), + value(Key.UPSERT_OPS), value(operations) + ) + ); } - public Result update(Space space, Tuple key, Operation... args) { - return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args); + @Override + public Result upsert(String space, List key, List defTuple, Object... operations) { + return exec( + new TarantoolRequest( + Code.UPSERT, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key), + value(Key.TUPLE), value(defTuple), + value(Key.UPSERT_OPS), value(operations) + ) + ); } - public Result upsert(Space space, Tuple key, Tuple def, Operation... args) { - return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args); + @Override + public Result delete(Integer space, List key) { + return exec( + new TarantoolRequest( + Code.DELETE, + value(Key.SPACE), value(space), + value(Key.KEY), value(key) + ) + ); } - public Result delete(Space space, Tuple key) { - return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key); + @Override + public Result delete(String space, List key) { + return exec( + new TarantoolRequest( + Code.DELETE, + value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()), + value(Key.KEY), value(key) + ) + ); } + @Override public Result call(String function, Object... args) { - return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args); + return exec( + new TarantoolRequest( + callCode, + value(Key.FUNCTION), value(function), + value(Key.TUPLE), value(args) + ) + ); } + @Override public Result eval(String expression, Object... args) { - return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args); + return exec( + new TarantoolRequest( + Code.EVAL, + value(Key.EXPRESSION), value(expression), + value(Key.TUPLE), value(args) + ) + ); } + @Override public void ping() { - exec(Code.PING); + exec(new TarantoolRequest(Code.PING)); } public void setCallCode(Code callCode) { this.callCode = callCode; } + } diff --git a/src/main/java/org/tarantool/Iterator.java b/src/main/java/org/tarantool/Iterator.java index 4452a744..c013530a 100644 --- a/src/main/java/org/tarantool/Iterator.java +++ b/src/main/java/org/tarantool/Iterator.java @@ -1,5 +1,7 @@ package org.tarantool; +import java.util.Arrays; + // Iterator info was taken from here https://github.com/tarantool/tarantool/blob/f66584c3bcdffe61d6d99a4868a9b72d62338a11/src/box/iterator_type.h#L62 public enum Iterator { EQ(0), // key == x ASC order @@ -24,4 +26,12 @@ public enum Iterator { public int getValue() { return value; } + + public static Iterator valueOf(int value) { + return Arrays.stream(Iterator.values()) + .filter(v -> value == v.getValue()) + .findFirst() + .orElseThrow(IllegalArgumentException::new); + } + } diff --git a/src/main/java/org/tarantool/TarantoolBase.java b/src/main/java/org/tarantool/TarantoolBase.java index c74647ae..26656112 100644 --- a/src/main/java/org/tarantool/TarantoolBase.java +++ b/src/main/java/org/tarantool/TarantoolBase.java @@ -6,10 +6,9 @@ import java.io.IOException; import java.net.Socket; import java.nio.channels.SocketChannel; -import java.util.List; import java.util.concurrent.atomic.AtomicLong; -public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { +public abstract class TarantoolBase extends AbstractTarantoolOps { protected String serverVersion; protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE; protected AtomicLong syncId = new AtomicLong(); @@ -42,16 +41,6 @@ protected void closeChannel(SocketChannel channel) { } } - protected void validateArgs(Object[] args) { - if (args != null) { - for (int i = 0; i < args.length; i += 2) { - if (args[i + 1] == null) { - throw new NullPointerException(((Key) args[i]).name() + " should not be null"); - } - } - } - } - public void setInitialRequestSize(int initialRequestSize) { this.initialRequestSize = initialRequestSize; } diff --git a/src/main/java/org/tarantool/TarantoolClient.java b/src/main/java/org/tarantool/TarantoolClient.java index 2ad0c84c..d560d682 100644 --- a/src/main/java/org/tarantool/TarantoolClient.java +++ b/src/main/java/org/tarantool/TarantoolClient.java @@ -1,5 +1,7 @@ package org.tarantool; +import org.tarantool.schema.TarantoolSchemaMeta; + import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; @@ -29,4 +31,6 @@ public interface TarantoolClient { boolean waitAlive(long timeout, TimeUnit unit) throws InterruptedException; + TarantoolSchemaMeta getSchemaMeta(); + } diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index 22922733..ae1d7360 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -2,40 +2,50 @@ import org.tarantool.logging.Logger; import org.tarantool.logging.LoggerFactory; +import org.tarantool.protocol.ProtoConstants; import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.ReadableViaSelectorChannel; import org.tarantool.protocol.TarantoolGreeting; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolMetaSpacesCache; +import org.tarantool.schema.TarantoolSchemaException; +import org.tarantool.schema.TarantoolSchemaMeta; import org.tarantool.util.StringUtils; +import org.tarantool.util.TupleTwo; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.time.Duration; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.StampedLock; public class TarantoolClientImpl extends TarantoolBase> implements TarantoolClient { private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClientImpl.class); protected TarantoolClientConfig config; - protected long operationTimeout; + protected Duration operationTimeout; /** * External. @@ -46,7 +56,12 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected volatile Exception thumbstone; - protected Map> futures; + protected ScheduledExecutorService workExecutor; + + protected StampedLock schemaLock = new StampedLock(); + protected BlockingQueue delayedOperationsQueue; + + protected Map futures; protected AtomicInteger pendingResponsesCount = new AtomicInteger(); /** @@ -66,6 +81,7 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected SyncOps syncOps; protected FireAndForgetOps fireAndForgetOps; protected ComposableAsyncOps composableAsyncOps; + protected UnsafeSchemaOps unsafeSchemaOps; /** * Inner. @@ -75,6 +91,8 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar protected Thread reader; protected Thread writer; + protected TarantoolSchemaMeta schemaMeta = new TarantoolMetaSpacesCache(this); + protected Thread connector = new Thread(new Runnable() { @Override public void run() { @@ -106,10 +124,13 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) { this.config = config; this.initialRequestSize = config.defaultRequestSize; - this.operationTimeout = config.operationExpiryTimeMillis; + this.operationTimeout = Duration.ofMillis(config.operationExpiryTimeMillis); this.socketProvider = socketProvider; this.stats = new TarantoolClientStats(); this.futures = new ConcurrentHashMap<>(config.predictedFutures); + this.delayedOperationsQueue = new PriorityBlockingQueue<>(128); + this.workExecutor = + Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantool-worker")); this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize); this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity()); this.connector.setDaemon(true); @@ -117,6 +138,7 @@ private void initClient(SocketChannelProvider socketProvider, TarantoolClientCon this.syncOps = new SyncOps(); this.composableAsyncOps = new ComposableAsyncOps(); this.fireAndForgetOps = new FireAndForgetOps(); + this.unsafeSchemaOps = new UnsafeSchemaOps(); if (!config.useNewCall) { setCallCode(Code.OLD_CALL); this.syncOps.setCallCode(Code.OLD_CALL); @@ -203,6 +225,7 @@ protected void connect(final SocketChannel channel) throws Exception { } this.thumbstone = null; startThreads(channel.socket().getRemoteSocketAddress().toString()); + updateSchema(); } protected void startThreads(String threadName) throws InterruptedException { @@ -214,7 +237,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { readThread(); } finally { - state.release(StateHelper.READING); + state.release(StateHelper.READING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -228,7 +251,7 @@ protected void startThreads(String threadName) throws InterruptedException { try { writeThread(); } finally { - state.release(StateHelper.WRITING); + state.release(StateHelper.WRITING | StateHelper.SCHEMA_UPDATING); // only last of two IO-threads can signal for reconnection if (leftIoThreads.decrementAndGet() == 0) { state.trySignalForReconnection(); @@ -251,61 +274,86 @@ protected void configureThreads(String threadName) { reader.setPriority(config.readerThreadPriority); } + @Override + public TarantoolSchemaMeta getSchemaMeta() { + return schemaMeta; + } + /** * Executes an operation with default timeout. * - * @param code operation code - * @param args operation arguments + * @param request operation data * * @return deferred result * * @see #setOperationTimeout(long) */ - protected Future exec(Code code, Object... args) { - return doExec(operationTimeout, code, args); + @Override + protected Future exec(TarantoolRequest request) { + return doExec(request).getResult(); + } + + protected TarantoolOperation doExec(TarantoolRequest request) { + long stamp = schemaLock.readLock(); + try { + if (request.getTimeout() == null) { + request.setTimeout(operationTimeout); + } + TarantoolOperation operation = request.toOperation(syncId.incrementAndGet(), schemaMeta.getSchemaVersion()); + // space or index names could not be found in the cache + if (!operation.isSerializable()) { + delayedOperationsQueue.add(operation); + // It's possible the client keeps the outdated schema. + // Send a preflight ping request to check the schema + // version and refresh it if one is obsolete + if (isSchemaLoaded()) { + TarantoolOperation ping = new TarantoolRequest(Code.PING) + .toPreflightOperation(syncId.incrementAndGet(), schemaMeta.getSchemaVersion(), operation); + registerOperation(ping); + } + return operation; + } + // postpone operation if the schema is not ready + if (!isSchemaLoaded()) { + delayedOperationsQueue.add(operation); + return operation; + } + return registerOperation(operation); + } finally { + schemaLock.unlockRead(stamp); + } } /** - * Executes an operation with the given timeout. - * {@code timeoutMillis} will override the default - * timeout. 0 means the limitless operation. + * Checks whether the schema is fully cached. * - * @param timeoutMillis operation timeout - * @param code operation code - * @param args operation arguments - * - * @return deferred result + * @return {@literal true} if the schema is loaded */ - protected Future exec(long timeoutMillis, Code code, Object... args) { - return doExec(timeoutMillis, code, args); + private boolean isSchemaLoaded() { + return schemaMeta.isInitialized() && !state.isStateSet(StateHelper.SCHEMA_UPDATING); } - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - - if (isDead(future)) { - return future; + protected TarantoolOperation registerOperation(TarantoolOperation operation) { + if (isDead(operation)) { + return operation; } - futures.put(sid, future); - if (isDead(future)) { - futures.remove(sid); - return future; + futures.put(operation.getId(), operation); + if (isDead(operation)) { + futures.remove(operation.getId()); + return operation; } try { - write(code, sid, null, args); + write( + operation.getCode(), + operation.getId(), + operation.getSentSchemaId(), + operation.getArguments().toArray() + ); } catch (Exception e) { - futures.remove(sid); - fail(future, e); + futures.remove(operation.getId()); + fail(operation, e); } - return future; - } - - protected TarantoolOp makeNewOperation(long timeoutMillis, long sid, Code code, Object[] args) { - return new TarantoolOp<>(sid, code, args) - .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + return operation; } protected synchronized void die(String message, Exception cause) { @@ -315,16 +363,22 @@ protected synchronized void die(String message, Exception cause) { final CommunicationException error = new CommunicationException(message, cause); this.thumbstone = error; while (!futures.isEmpty()) { - Iterator>> iterator = futures.entrySet().iterator(); + Iterator> iterator = futures.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry> elem = iterator.next(); + Map.Entry elem = iterator.next(); if (elem != null) { - TarantoolOp future = elem.getValue(); - fail(future, error); + TarantoolOperation operation = elem.getValue(); + fail(operation, error); } iterator.remove(); } } + + TarantoolOperation operation; + while ((operation = delayedOperationsQueue.poll()) != null) { + fail(operation, error); + } + pendingResponsesCount.set(0); bufferLock.lock(); @@ -337,8 +391,9 @@ protected synchronized void die(String message, Exception cause) { stopIO(); } + @Override public void ping() { - syncGet(exec(Code.PING)); + syncGet(exec(new TarantoolRequest(Code.PING))); } protected void write(Code code, Long syncId, Long schemaId, Object... args) @@ -430,10 +485,10 @@ protected void readThread() { Map headers = packet.getHeaders(); Long syncId = (Long) headers.get(Key.SYNC.getId()); - TarantoolOp future = futures.remove(syncId); + TarantoolOperation request = futures.remove(syncId); stats.received++; pendingResponsesCount.decrementAndGet(); - complete(packet, future); + complete(packet, request); } catch (Exception e) { die("Cant read answer", e); return; @@ -473,33 +528,119 @@ protected void writeThread() { } } - protected void fail(TarantoolOp future, Exception e) { - future.completeExceptionally(e); + protected void fail(TarantoolOperation operation, Exception e) { + operation.getResult().completeExceptionally(e); } - protected void complete(TarantoolPacket packet, TarantoolOp future) { - if (future != null) { - long code = packet.getCode(); - if (code == 0) { - if (future.getCode() == Code.EXECUTE) { - completeSql(future, packet); - } else { - ((TarantoolOp) future).complete(packet.getBody().get(Key.DATA.getId())); + protected void complete(TarantoolPacket packet, TarantoolOperation operation) { + CompletableFuture result = operation.getResult(); + if (result.isDone()) { + return; + } + + long code = packet.getCode(); + long schemaId = packet.getSchemaId(); + boolean isPreflightPing = operation.getDependedOperation() != null; + if (code == ProtoConstants.SUCCESS) { + operation.setCompletedSchemaId(schemaId); + if (isPreflightPing) { + // the schema wasn't changed + // try to evaluate an unserializable target operation + // in order to complete the operation exceptionally. + TarantoolOperation target = operation.getDependedOperation(); + delayedOperationsQueue.remove(target); + try { + target.getArguments(); + } catch (TarantoolSchemaException cause) { + fail(target, cause); } + } else if (operation.getCode() == Code.EXECUTE) { + completeSql(operation, packet); } else { - Object error = packet.getBody().get(Key.ERROR.getId()); - fail(future, serverError(code, error)); + ((CompletableFuture) result).complete(packet.getData()); + } + } else if (code == ProtoConstants.ERR_WRONG_SCHEMA_VERSION) { + if (schemaId > schemaMeta.getSchemaVersion()) { + delayedOperationsQueue.add(operation); + } else { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + registerOperation(operation); + } + } else { + Object error = packet.getError(); + fail(operation, serverError(code, error)); + } + + if (operation.getSentSchemaId() == 0) { + return; + } + // it's possible to receive bigger version than current + // i.e. after DDL operation or wrong schema version response + if (schemaId > schemaMeta.getSchemaVersion()) { + updateSchema(); + } + } + + private void updateSchema() { + performSchemaAction(() -> { + if (state.acquire(StateHelper.SCHEMA_UPDATING)) { + workExecutor.execute(createUpdateSchemaTask()); + } + }); + } + + private Runnable createUpdateSchemaTask() { + return () -> { + try { + schemaMeta.refresh(); + } catch (Exception cause) { + workExecutor.schedule(createUpdateSchemaTask(), 300L, TimeUnit.MILLISECONDS); + return; + } + performSchemaAction(() -> { + try { + rescheduleDelayedOperations(); + } finally { + state.release(StateHelper.SCHEMA_UPDATING); + } + }); + }; + } + + private void rescheduleDelayedOperations() { + TarantoolOperation operation; + while ((operation = delayedOperationsQueue.poll()) != null) { + CompletableFuture result = operation.getResult(); + if (!result.isDone()) { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + registerOperation(operation); } } } - protected void completeSql(TarantoolOp future, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { Long rowCount = SqlProtoUtils.getSQLRowCount(pack); + CompletableFuture result = operation.getResult(); if (rowCount != null) { - ((TarantoolOp) future).complete(rowCount); + ((CompletableFuture) result).complete(rowCount); } else { List> values = SqlProtoUtils.readSqlResult(pack); - ((TarantoolOp) future).complete(values); + ((CompletableFuture) result).complete(values); + } + } + + /** + * Convenient guard scope that executes given runnable + * inside schema write lock. + * + * @param action to be executed + */ + protected void performSchemaAction(Runnable action) { + long stamp = schemaLock.writeLock(); + try { + action.run(); + } finally { + schemaLock.unlockWrite(stamp); } } @@ -535,6 +676,9 @@ public void close() { protected void close(Exception e) { if (state.close()) { + if (workExecutor != null) { + workExecutor.shutdownNow(); + } connector.interrupt(); die(e.getMessage(), e); } @@ -563,7 +707,7 @@ protected void stopIO() { * @return timeout in millis */ public long getOperationTimeout() { - return operationTimeout; + return operationTimeout.toMillis(); } /** @@ -572,17 +716,17 @@ public long getOperationTimeout() { * @param operationTimeout timeout in millis */ public void setOperationTimeout(long operationTimeout) { - this.operationTimeout = operationTimeout; + this.operationTimeout = Duration.ofMillis(operationTimeout); } @Override public boolean isAlive() { - return state.getState() == StateHelper.ALIVE && thumbstone == null; + return state.isStateSet(StateHelper.ALIVE) && thumbstone == null; } @Override public boolean isClosed() { - return state.getState() == StateHelper.CLOSED; + return state.isStateSet(StateHelper.CLOSED); } @Override @@ -615,17 +759,31 @@ public TarantoolClientOps, Object, Long> fireAndForgetOps() { return fireAndForgetOps; } + public TarantoolClientOps, Object, TupleTwo, Long>> unsafeSchemaOps() { + return unsafeSchemaOps; + } + + protected TarantoolRequest makeSqlRequest(String sql, List bind) { + return new TarantoolRequest( + Code.EXECUTE, + TarantoolRequestArgumentFactory.value(Key.SQL_TEXT), + TarantoolRequestArgumentFactory.value(sql), + TarantoolRequestArgumentFactory.value(Key.SQL_BIND), + TarantoolRequestArgumentFactory.value(bind) + ); + } + @Override public TarantoolSQLOps>> sqlSyncOps() { return new TarantoolSQLOps>>() { @Override public Long update(String sql, Object... bind) { - return (Long) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind)); + return (Long) syncGet(exec(makeSqlRequest(sql, Arrays.asList(bind)))); } @Override public List> query(String sql, Object... bind) { - return (List>) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind)); + return (List>) syncGet(exec(makeSqlRequest(sql, Arrays.asList(bind)))); } }; } @@ -635,39 +793,32 @@ public TarantoolSQLOps, Future>>> return new TarantoolSQLOps, Future>>>() { @Override public Future update(String sql, Object... bind) { - return (Future) exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); + return (Future) exec(makeSqlRequest(sql, Arrays.asList(bind))); } @Override public Future>> query(String sql, Object... bind) { - return (Future>>) exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); + return (Future>>) exec(makeSqlRequest(sql, Arrays.asList(bind))); } }; } - protected class SyncOps extends AbstractTarantoolOps, Object, List> { + protected class SyncOps extends BaseClientOps> { @Override - public List exec(Code code, Object... args) { - return (List) syncGet(TarantoolClientImpl.this.exec(code, args)); - } - - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); + protected List exec(TarantoolRequest request) { + return (List) syncGet(TarantoolClientImpl.this.exec(request)); } } - protected class FireAndForgetOps extends AbstractTarantoolOps, Object, Long> { + protected class FireAndForgetOps extends BaseClientOps { @Override - public Long exec(Code code, Object... args) { + protected Long exec(TarantoolRequest request) { if (thumbstone == null) { try { - long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); - write(code, syncId, null, args); - return syncId; + return doExec(request).getId(); } catch (Exception e) { throw new CommunicationException("Execute failed", e); } @@ -676,117 +827,16 @@ public Long exec(Code code, Object... args) { } } - @Override - public void close() { - throw new IllegalStateException("You should close TarantoolClient instead."); - } - } - protected boolean isDead(TarantoolOp future) { + protected boolean isDead(TarantoolOperation operation) { if (this.thumbstone != null) { - fail(future, new CommunicationException("Connection is dead", thumbstone)); + fail(operation, new CommunicationException("Connection is dead", thumbstone)); return true; } return false; } - protected static class TarantoolOp extends CompletableFuture { - - /** - * A task identifier used in {@link TarantoolClientImpl#futures}. - */ - private final long id; - - /** - * Tarantool binary protocol operation code. - */ - private final Code code; - - /** - * Arguments of operation. - */ - private final Object[] args; - - public TarantoolOp(long id, Code code, Object[] args) { - this.id = id; - this.code = code; - this.args = args; - } - - public long getId() { - return id; - } - - public Code getCode() { - return code; - } - - public Object[] getArgs() { - return args; - } - - @Override - public String toString() { - return "TarantoolOp{" + - "id=" + id + - ", code=" + code + - '}'; - } - - /** - * Missed in jdk8 CompletableFuture operator to limit execution - * by time. - * - * @param timeout execution timeout - * @param unit measurement unit for given timeout value - * - * @return a future on which the method is called - */ - public TarantoolOp orTimeout(long timeout, TimeUnit unit) { - if (timeout < 0) { - throw new IllegalArgumentException("Timeout cannot be negative"); - } - if (unit == null) { - throw new IllegalArgumentException("Time unit cannot be null"); - } - if (timeout == 0 || isDone()) { - return this; - } - ScheduledFuture abandonByTimeoutAction = TimeoutScheduler.EXECUTOR.schedule( - () -> { - if (!this.isDone()) { - this.completeExceptionally(new TimeoutException()); - } - }, - timeout, unit - ); - whenComplete( - (ignored, error) -> { - if (error == null && !abandonByTimeoutAction.isDone()) { - abandonByTimeoutAction.cancel(false); - } - } - ); - return this; - } - - /** - * Runs timeout operation as a delayed task. - */ - static class TimeoutScheduler { - - static final ScheduledThreadPoolExecutor EXECUTOR; - - static { - EXECUTOR = - new ScheduledThreadPoolExecutor(1, new TarantoolThreadDaemonFactory("tarantoolTimeout")); - EXECUTOR.setRemoveOnCancelPolicy(true); - } - } - - } - /** * A subclass may use this as a trigger to start retries. * This method is called when state becomes ALIVE. @@ -810,10 +860,11 @@ protected final class StateHelper { static final int UNINITIALIZED = 0; static final int READING = 1; - static final int WRITING = 2; + static final int WRITING = 1 << 1; static final int ALIVE = READING | WRITING; - static final int RECONNECT = 4; - static final int CLOSED = 8; + static final int SCHEMA_UPDATING = 1 << 2; + static final int RECONNECT = 1 << 3; + static final int CLOSED = 1 << 4; private final AtomicInteger state; @@ -842,6 +893,10 @@ protected int getState() { return state.get(); } + boolean isStateSet(int mask) { + return (getState() & mask) == mask; + } + /** * Set CLOSED state, drop RECONNECT state. * @@ -852,12 +907,12 @@ protected boolean close() { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if (isStateSet(CLOSED)) { return false; } - /* Drop RECONNECT, set CLOSED. */ - if (compareAndSet(currentState, (currentState & ~RECONNECT) | CLOSED)) { + /* Clear all states and set CLOSED. */ + if (compareAndSet(currentState, CLOSED)) { return true; } } @@ -877,7 +932,7 @@ protected boolean acquire(int mask) { int currentState = getState(); /* CLOSED is the terminal state. */ - if ((currentState & CLOSED) == CLOSED) { + if ((isStateSet(CLOSED))) { return false; } @@ -887,8 +942,8 @@ protected boolean acquire(int mask) { } /* Cannot move from a state to the same state. */ - if ((currentState & mask) != 0) { - throw new IllegalStateException("State is already " + mask); + if (isStateSet(mask)) { + return false; } /* Set acquired state. */ @@ -912,7 +967,8 @@ protected boolean compareAndSet(int expect, int update) { return false; } - if (update == ALIVE) { + boolean wasAlreadyAlive = (expect & ALIVE) == ALIVE; + if (!wasAlreadyAlive && (update & ALIVE) == ALIVE) { CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1)); latch.countDown(); onReconnect(); @@ -951,13 +1007,13 @@ private CountDownLatch getStateLatch(int state) { return closedLatch; } if (state == ALIVE) { - if (getState() == CLOSED) { + if (isStateSet(CLOSED)) { throw new IllegalStateException("State is CLOSED."); } CountDownLatch latch = nextAliveLatch.get(); /* It may happen so that an error is detected but the state is still alive. Wait for the 'next' alive state in such cases. */ - return (getState() == ALIVE && thumbstone == null) ? null : latch; + return (isStateSet(ALIVE) && thumbstone == null) ? null : latch; } return null; } @@ -970,7 +1026,7 @@ private CountDownLatch getStateLatch(int state) { private void awaitReconnection() throws InterruptedException { connectorLock.lock(); try { - while (getState() != StateHelper.RECONNECT) { + while (!isStateSet(RECONNECT)) { reconnectRequired.await(); } } finally { @@ -996,12 +1052,11 @@ private void trySignalForReconnection() { } - protected class ComposableAsyncOps - extends AbstractTarantoolOps, Object, CompletionStage>> { + protected class ComposableAsyncOps extends BaseClientOps>> { @Override - public CompletionStage> exec(Code code, Object... args) { - return (CompletionStage>) TarantoolClientImpl.this.exec(code, args); + protected CompletionStage> exec(TarantoolRequest request) { + return (CompletionStage>) TarantoolClientImpl.this.exec(request); } @Override @@ -1011,4 +1066,32 @@ public void close() { } + /** + * Used by internal services to ignore schema ID issues. + */ + protected class UnsafeSchemaOps extends BaseClientOps, Long>> { + + protected TupleTwo, Long> exec(TarantoolRequest request) { + long syncId = TarantoolClientImpl.this.syncId.incrementAndGet(); + TarantoolOperation operation = request.toOperation(syncId, 0L); + List result = (List) syncGet(registerOperation(operation).getResult()); + return TupleTwo.of(result, operation.getCompletedSchemaId()); + } + + } + + protected abstract class BaseClientOps extends AbstractTarantoolOps { + + @Override + protected TarantoolSchemaMeta getSchemaMeta() { + return TarantoolClientImpl.this.getSchemaMeta(); + } + + @Override + public void close() { + throw new IllegalStateException("You should close TarantoolClient instead."); + } + + } + } diff --git a/src/main/java/org/tarantool/TarantoolClientOps.java b/src/main/java/org/tarantool/TarantoolClientOps.java index 69ab3a9e..9a466c44 100644 --- a/src/main/java/org/tarantool/TarantoolClientOps.java +++ b/src/main/java/org/tarantool/TarantoolClientOps.java @@ -1,20 +1,41 @@ package org.tarantool; +/** + * Provides a set of typical operations with data in Tarantool. + * + * @param represents space/index identifiers (not used anymore) + * @param represents tuple keys and/or tuples + * @param

represents tuples and/or update operations + * @param represents an operation result + */ +public interface TarantoolClientOps { + R select(Integer space, Integer index, O key, int offset, int limit, int iterator); -public interface TarantoolClientOps { - R select(T space, T index, O key, int offset, int limit, int iterator); + R select(String space, String index, O key, int offset, int limit, int iterator); - R select(T space, T index, O key, int offset, int limit, Iterator iterator); + R select(Integer space, Integer index, O key, int offset, int limit, Iterator iterator); - R insert(T space, O tuple); + R select(String space, String index, O key, int offset, int limit, Iterator iterator); - R replace(T space, O tuple); + R insert(Integer space, O tuple); - R update(T space, O key, P... tuple); + R insert(String space, O tuple); - R upsert(T space, O key, O defTuple, P... ops); + R replace(Integer space, O tuple); - R delete(T space, O key); + R replace(String space, O tuple); + + R update(Integer space, O key, P... tuple); + + R update(String space, O key, P... tuple); + + R upsert(Integer space, O key, O defTuple, P... ops); + + R upsert(String space, O key, O defTuple, P... ops); + + R delete(Integer space, O key); + + R delete(String space, O key); R call(String function, Object... args); diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index e3697a91..fff54e95 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -16,7 +16,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; @@ -39,14 +38,13 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /** * Discovery activity. */ - private ScheduledExecutorService instancesDiscoveryExecutor; private Runnable instancesDiscovererTask; private StampedLock discoveryLock = new StampedLock(); /** * Collection of operations to be retried. */ - private ConcurrentHashMap> retries = new ConcurrentHashMap<>(); + private ConcurrentHashMap retries = new ConcurrentHashMap<>(); /** * Constructs a new cluster client. @@ -74,14 +72,12 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) { this.instancesDiscovererTask = createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this)); - this.instancesDiscoveryExecutor - = Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer")); int delay = config.clusterDiscoveryDelayMillis > 0 ? config.clusterDiscoveryDelayMillis : TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS; // todo: it's better to start a job later (out of ctor) - this.instancesDiscoveryExecutor.scheduleWithFixedDelay( + this.workExecutor.scheduleWithFixedDelay( this.instancesDiscovererTask, 0, delay, @@ -91,73 +87,50 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannel } @Override - protected boolean isDead(TarantoolOp future) { + protected boolean isDead(TarantoolOperation operation) { if ((state.getState() & StateHelper.CLOSED) != 0) { - future.completeExceptionally(new CommunicationException("Connection is dead", thumbstone)); + operation.getResult().completeExceptionally(new CommunicationException("Connection is dead", thumbstone)); return true; } Exception err = thumbstone; if (err != null) { - return checkFail(future, err); + return checkFail(operation, err); } return false; } - @Override - protected TarantoolOp doExec(long timeoutMillis, Code code, Object[] args) { - validateArgs(args); - long sid = syncId.incrementAndGet(); - TarantoolOp future = makeNewOperation(timeoutMillis, sid, code, args); - return registerOperation(future); - } - /** * Registers a new async operation which will be resolved later. * Registration is discovery-aware in term of synchronization and * it may be blocked util the discovery finishes its work. * - * @param future operation to be performed + * @param operation operation to be performed * * @return registered operation */ - private TarantoolOp registerOperation(TarantoolOp future) { + @Override + protected TarantoolOperation registerOperation(TarantoolOperation operation) { long stamp = discoveryLock.readLock(); try { - if (isDead(future)) { - return future; - } - futures.put(future.getId(), future); - if (isDead(future)) { - futures.remove(future.getId()); - return future; - } - - try { - write(future.getCode(), future.getId(), null, future.getArgs()); - } catch (Exception e) { - futures.remove(future.getId()); - fail(future, e); - } - - return future; + return super.registerOperation(operation); } finally { discoveryLock.unlock(stamp); } } @Override - protected void fail(TarantoolOp future, Exception e) { - checkFail(future, e); + protected void fail(TarantoolOperation operation, Exception cause) { + checkFail(operation, cause); } - protected boolean checkFail(TarantoolOp future, Exception e) { - if (!isTransientError(e)) { - future.completeExceptionally(e); + protected boolean checkFail(TarantoolOperation operation, Exception cause) { + if (!isTransientError(cause)) { + operation.getResult().completeExceptionally(cause); return true; } else { assert retries != null; - retries.put(future.getId(), future); - LOGGER.trace("Request {0} was delayed because of {1}", future, e); + retries.put(operation.getId(), operation); + LOGGER.trace("Request {0} was delayed because of {1}", operation, cause); return false; } } @@ -166,17 +139,13 @@ protected boolean checkFail(TarantoolOp future, Exception e) { protected void close(Exception e) { super.close(e); - if (instancesDiscoveryExecutor != null) { - instancesDiscoveryExecutor.shutdownNow(); - } - if (retries == null) { // May happen within constructor. return; } - for (TarantoolOp op : retries.values()) { - op.completeExceptionally(e); + for (TarantoolOperation operation : retries.values()) { + operation.getResult().completeExceptionally(e); } } @@ -199,23 +168,24 @@ protected void onReconnect() { // First call is before the constructor finished. Skip it. return; } - Collection> delayed = new ArrayList<>(retries.values()); - Collection> reissued = new ArrayList<>(retries.size()); + Collection delayed = new ArrayList<>(retries.values()); + Collection reissued = new ArrayList<>(retries.size()); retries.clear(); - for (final TarantoolOp future : delayed) { - if (!future.isDone()) { - executor.execute(() -> registerOperation(future)); - reissued.add(future); + for (final TarantoolOperation operation : delayed) { + if (!operation.getResult().isDone()) { + operation.setSentSchemaId(schemaMeta.getSchemaVersion()); + executor.execute(() -> registerOperation(operation)); + reissued.add(operation); } } - for (final TarantoolOp future : reissued) { - LOGGER.trace("{0} was re-issued after reconnection", future); + for (final TarantoolOperation operation : reissued) { + LOGGER.trace("{0} was re-issued after reconnection", operation); } } @Override - protected void complete(TarantoolPacket packet, TarantoolOp future) { - super.complete(packet, future); + protected void complete(TarantoolPacket packet, TarantoolOperation operation) { + super.complete(packet, operation); RefreshableSocketProvider provider = getRefreshableSocketProvider(); if (provider != null) { renewConnectionIfRequired(provider.getAddresses()); @@ -288,7 +258,6 @@ public synchronized void run() { onInstancesRefreshed(lastInstances); } } catch (Exception ignored) { - ignored.getCause(); // no-op } } diff --git a/src/main/java/org/tarantool/TarantoolConnection.java b/src/main/java/org/tarantool/TarantoolConnection.java index 09883bc0..0bdca220 100644 --- a/src/main/java/org/tarantool/TarantoolConnection.java +++ b/src/main/java/org/tarantool/TarantoolConnection.java @@ -2,6 +2,8 @@ import org.tarantool.protocol.ProtoUtils; import org.tarantool.protocol.TarantoolPacket; +import org.tarantool.schema.TarantoolSchemaException; +import org.tarantool.schema.TarantoolSchemaMeta; import java.io.IOException; import java.io.InputStream; @@ -27,11 +29,17 @@ public TarantoolConnection(String username, String password, Socket socket) thro } @Override - protected List exec(Code code, Object... args) { - TarantoolPacket responsePacket = writeAndRead(code, args); + protected List exec(TarantoolRequest request) { + Object[] args = request.getArguments().toArray(); + TarantoolPacket responsePacket = writeAndRead(request.getCode(), args); return (List) responsePacket.getBody().get(Key.DATA.getId()); } + @Override + protected TarantoolSchemaMeta getSchemaMeta() { + throw new TarantoolSchemaException("Schema operations are not supported."); + } + protected TarantoolPacket writeAndRead(Code code, Object... args) { try { ByteBuffer packet = ProtoUtils.createPacket(initialRequestSize, msgPackLite, @@ -44,7 +52,7 @@ protected TarantoolPacket writeAndRead(Code code, Object... args) { Long c = responsePacket.getCode(); if (c != 0) { - throw serverError(c, responsePacket.getBody().get(Key.ERROR.getId())); + throw serverError(c, responsePacket.getError()); } return responsePacket; diff --git a/src/main/java/org/tarantool/TarantoolException.java b/src/main/java/org/tarantool/TarantoolException.java index e7d38e82..10afdfb8 100644 --- a/src/main/java/org/tarantool/TarantoolException.java +++ b/src/main/java/org/tarantool/TarantoolException.java @@ -1,5 +1,11 @@ package org.tarantool; +import static org.tarantool.protocol.ProtoConstants.ERR_LOADING; +import static org.tarantool.protocol.ProtoConstants.ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_READONLY; +import static org.tarantool.protocol.ProtoConstants.ERR_TIMEOUT; +import static org.tarantool.protocol.ProtoConstants.ERR_WRONG_SCHEMA_VERSION; + /** * A remote server error with error code and message. * @@ -7,11 +13,6 @@ * @version $Id: $ */ public class TarantoolException extends RuntimeException { - /* taken from src/box/errcode.h */ - public static final int ERR_READONLY = 7; - public static final int ERR_TIMEOUT = 78; - public static final int ERR_LOADING = 116; - public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; private static final long serialVersionUID = 1L; long code; @@ -60,6 +61,7 @@ public boolean isTransient() { switch ((int) code) { case ERR_READONLY: case ERR_TIMEOUT: + case ERR_WRONG_SCHEMA_VERSION: case ERR_LOADING: case ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY: return true; diff --git a/src/main/java/org/tarantool/TarantoolOperation.java b/src/main/java/org/tarantool/TarantoolOperation.java new file mode 100644 index 00000000..0707d920 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolOperation.java @@ -0,0 +1,183 @@ +package org.tarantool; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Describes an internal state of a registered request. + */ +public class TarantoolOperation implements Comparable { + + /** + * A operation identifier. + */ + private final long id; + + /** + * Tarantool binary protocol operation code. + */ + private final Code code; + + /** + * Schema ID when this operation was registered. + */ + private long sentSchemaId; + + /** + * Schema ID when this operation was completed. + */ + private long completedSchemaId; + + /** + * Arguments of operation. + */ + private final List arguments; + + /** + * Future request result. + */ + private final CompletableFuture result = new CompletableFuture<>(); + + /** + * Operation timeout. + */ + private final Duration timeout; + + /** + * Optional operation which is used for + * schema synchronization purposes. + */ + private TarantoolOperation dependedOperation; + + public TarantoolOperation(Code code, + List arguments, + long id, + long schemaId, + Duration timeout) { + this.id = id; + this.sentSchemaId = schemaId; + this.code = Objects.requireNonNull(code); + this.arguments = new ArrayList<>(arguments); + this.timeout = timeout; + setupTimeout(timeout); + } + + public TarantoolOperation(Code code, + List arguments, + long id, + long schemaId, + Duration timeout, + TarantoolOperation dependedOperation) { + this.id = id; + this.sentSchemaId = schemaId; + this.code = Objects.requireNonNull(code); + this.arguments = new ArrayList<>(arguments); + this.timeout = timeout; + this.dependedOperation = dependedOperation; + setupTimeout(timeout); + } + + public long getId() { + return id; + } + + public long getSentSchemaId() { + return sentSchemaId; + } + + public void setSentSchemaId(long sentSchemaId) { + this.sentSchemaId = sentSchemaId; + } + + public long getCompletedSchemaId() { + return completedSchemaId; + } + + public void setCompletedSchemaId(long completedSchemaId) { + this.completedSchemaId = completedSchemaId; + } + + public CompletableFuture getResult() { + return result; + } + + public Code getCode() { + return code; + } + + public TarantoolOperation getDependedOperation() { + return dependedOperation; + } + + public Duration getTimeout() { + return timeout; + } + + /** + * Serializability means this requests is capable being + * translated in a binary packet according to {@code iproto} + * protocol. + * + * @return {@literal true} if this request is serializable + */ + public boolean isSerializable() { + return arguments.stream().allMatch(TarantoolRequestArgument::isSerializable); + } + + public List getArguments() { + return arguments.stream().map(TarantoolRequestArgument::getValue).collect(Collectors.toList()); + } + + @Override + public int compareTo(TarantoolOperation other) { + return Long.compareUnsigned(this.id, other.id); + } + + private void setupTimeout(Duration duration) { + if (duration == null) { + return; + } + if (duration.isNegative()) { + throw new IllegalArgumentException("Timeout cannot be negative"); + } + if (duration.isZero() || result.isDone()) { + return; + } + ScheduledFuture abandonByTimeoutAction = TimeoutScheduler.EXECUTOR.schedule( + () -> { + if (!result.isDone()) { + result.completeExceptionally(new TimeoutException()); + } + }, + duration.toMillis(), TimeUnit.MILLISECONDS + ); + result.whenComplete((ignored, error) -> { + if (error == null && !abandonByTimeoutAction.isDone()) { + abandonByTimeoutAction.cancel(false); + } + }); + } + + /** + * Runs timeout operation as a delayed task. + */ + static class TimeoutScheduler { + static final ScheduledThreadPoolExecutor EXECUTOR; + + static { + EXECUTOR = new ScheduledThreadPoolExecutor( + 1, new TarantoolThreadDaemonFactory("tarantoolTimeout") + ); + EXECUTOR.setRemoveOnCancelPolicy(true); + } + } + +} diff --git a/src/main/java/org/tarantool/TarantoolRequest.java b/src/main/java/org/tarantool/TarantoolRequest.java new file mode 100644 index 00000000..190f7dae --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequest.java @@ -0,0 +1,86 @@ +package org.tarantool; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Describes a static request parameters. + */ +public class TarantoolRequest { + + /** + * Tarantool binary protocol operation code. + */ + private Code code; + + /** + * Arguments of operation. + */ + private List arguments; + + /** + * Request timeout start just after initialization. + */ + private Duration timeout; + + public TarantoolRequest(Code code) { + this.code = code; + this.arguments = new ArrayList<>(); + } + + public TarantoolRequest(Code code, TarantoolRequestArgument... arguments) { + this.code = code; + this.arguments = Arrays.asList(arguments); + } + + /** + * Initializes an operation and starts its timer. + * + * @param sid internal request id + * @param schemaId schema version + */ + TarantoolOperation toOperation(long sid, long schemaId) { + return new TarantoolOperation(code, arguments, sid, schemaId, timeout); + } + + /** + * Initializes a preflight operation that + * will be processed before the dependent. + * + * @param sid internal request id + * @param schemaId schema version + * @param operation depended operation + */ + TarantoolOperation toPreflightOperation(long sid, long schemaId, TarantoolOperation operation) { + return new TarantoolOperation(code, arguments, sid, schemaId, timeout, operation); + } + + + public Code getCode() { + return code; + } + + public void setCode(Code code) { + this.code = code; + } + + public Duration getTimeout() { + return timeout; + } + + public void setTimeout(Duration timeout) { + this.timeout = timeout; + } + + public List getArguments() { + return arguments.stream().map(TarantoolRequestArgument::getValue).collect(Collectors.toList()); + } + + public void addArguments(TarantoolRequestArgument... arguments) { + this.arguments.addAll(Arrays.asList(arguments)); + } + +} diff --git a/src/main/java/org/tarantool/TarantoolRequestArgument.java b/src/main/java/org/tarantool/TarantoolRequestArgument.java new file mode 100644 index 00000000..6ff11c21 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequestArgument.java @@ -0,0 +1,23 @@ +package org.tarantool; + +/** + * Holds a request argument value. + */ +public interface TarantoolRequestArgument { + + /** + * Flag indicating that held value can be + * represented as bytes supported by iproto. + * + * @return {@literal true} if value is {@code iproto} serializable + */ + boolean isSerializable(); + + /** + * Gets a held value. + * + * @return wrapped value + */ + Object getValue(); + +} diff --git a/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java b/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java new file mode 100644 index 00000000..8d602e79 --- /dev/null +++ b/src/main/java/org/tarantool/TarantoolRequestArgumentFactory.java @@ -0,0 +1,79 @@ +package org.tarantool; + +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Request argument factory. + * + * @see TarantoolRequestArgument + */ +public class TarantoolRequestArgumentFactory { + + private TarantoolRequestArgumentFactory() { + } + + public static TarantoolRequestArgument value(Object value) { + return new SimpleArgument(value); + } + + public static TarantoolRequestArgument cacheLookupValue(Supplier supplier) { + return new LookupArgument(supplier); + } + + /** + * Simple wrapper that holds the original value. + */ + private static class SimpleArgument implements TarantoolRequestArgument { + + private Object value; + + SimpleArgument(Object value) { + Objects.requireNonNull(value); + this.value = value; + } + + @Override + public boolean isSerializable() { + return true; + } + + @Override + public Object getValue() { + return value; + } + + } + + /** + * Wrapper that evaluates the value each time + * it is requested. + *

+ * It works like a function, where {@code argument = f(key)}. + */ + private static class LookupArgument implements TarantoolRequestArgument { + + Supplier lookup; + + LookupArgument(Supplier lookup) { + this.lookup = Objects.requireNonNull(lookup); + } + + @Override + public boolean isSerializable() { + try { + lookup.get(); + } catch (Exception ignored) { + return false; + } + return true; + } + + @Override + public synchronized Object getValue() { + return lookup.get(); + } + + } + +} diff --git a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java index 3b79819e..052ff4ba 100644 --- a/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java +++ b/src/main/java/org/tarantool/cluster/TarantoolClusterStoredFunctionDiscoverer.java @@ -1,11 +1,12 @@ package org.tarantool.cluster; -import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClientOps; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.logging.Logger; import org.tarantool.logging.LoggerFactory; import org.tarantool.util.StringUtils; +import org.tarantool.util.TupleTwo; import java.util.LinkedHashSet; import java.util.List; @@ -14,34 +15,37 @@ /** * A cluster nodes discoverer based on calling a predefined function * which returns list of nodes. - * + *

* The function has to have no arguments and return list of * the strings which follow host[:port] format + *

+ * This class is not a part of public API. */ public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer { private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClusterStoredFunctionDiscoverer.class); - private TarantoolClient client; + private TarantoolClientImpl client; private String entryFunction; - public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) { + public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, + TarantoolClientImpl client) { this.client = client; this.entryFunction = clientConfig.clusterDiscoveryEntryFunction; } @Override public Set getInstances() { - TarantoolClientOps, Object, List> syncOperations = client.syncOps(); + TarantoolClientOps, Object, TupleTwo, Long>> syncOperations = client.unsafeSchemaOps(); - List list = syncOperations.call(entryFunction); + TupleTwo, Long> result = syncOperations.call(entryFunction); // discoverer expects a single array result from the function now; // in order to protect this contract the discoverer does a strict // validation against the data returned; // this strict-mode allows us to extend the contract in a non-breaking // way for old clients just reserve an extra return value in // terms of Lua multi-result support.; - return checkAndFilterAddresses(list); + return checkAndFilterAddresses(result.getFirst()); } /** diff --git a/src/main/java/org/tarantool/jdbc/SQLConnection.java b/src/main/java/org/tarantool/jdbc/SQLConnection.java index 15f3258a..327d0d69 100644 --- a/src/main/java/org/tarantool/jdbc/SQLConnection.java +++ b/src/main/java/org/tarantool/jdbc/SQLConnection.java @@ -1,12 +1,12 @@ package org.tarantool.jdbc; -import org.tarantool.Code; import org.tarantool.CommunicationException; -import org.tarantool.Key; import org.tarantool.SocketChannelProvider; import org.tarantool.SqlProtoUtils; import org.tarantool.TarantoolClientConfig; import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolOperation; +import org.tarantool.TarantoolRequest; import org.tarantool.protocol.TarantoolPacket; import org.tarantool.util.JdbcConstants; import org.tarantool.util.SQLStates; @@ -32,6 +32,8 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -538,8 +541,8 @@ public SQLBatchResultHolder executeBatch(long timeout, List quer checkNotClosed(); SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps(); SQLBatchResultHolder batchResult = useNetworkTimeout(timeout) - ? sqlOps.executeBatch(queries) - : sqlOps.executeBatch(timeout, queries); + ? sqlOps.executeBatch(queries) + : sqlOps.executeBatch(timeout, queries); return batchResult; } @@ -734,13 +737,13 @@ private static String formatError(SQLQueryHolder query) { static class SQLTarantoolClientImpl extends TarantoolClientImpl { private Future executeQuery(SQLQueryHolder queryHolder) { - return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()); + return exec(makeSqlRequest(queryHolder.getQuery(), queryHolder.getParams())); } private Future executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) { - return exec( - timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams() - ); + TarantoolRequest request = makeSqlRequest(queryHolder.getQuery(), queryHolder.getParams()); + request.setTimeout(Duration.of(timeoutMillis, ChronoUnit.MILLIS)); + return exec(request); } final SQLRawOps sqlRawOps = new SQLRawOps() { @@ -809,12 +812,12 @@ SQLRawOps sqlRawOps() { } @Override - protected void completeSql(TarantoolOp future, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { Long rowCount = SqlProtoUtils.getSQLRowCount(pack); SQLResultHolder result = (rowCount == null) ? SQLResultHolder.ofQuery(SqlProtoUtils.getSQLMetadata(pack), SqlProtoUtils.getSQLData(pack)) : SQLResultHolder.ofUpdate(rowCount.intValue(), SqlProtoUtils.getSQLAutoIncrementIds(pack)); - ((TarantoolOp) future).complete(result); + ((CompletableFuture) operation.getResult()).complete(result); } interface SQLRawOps { diff --git a/src/main/java/org/tarantool/protocol/ProtoConstants.java b/src/main/java/org/tarantool/protocol/ProtoConstants.java new file mode 100644 index 00000000..daee4dd7 --- /dev/null +++ b/src/main/java/org/tarantool/protocol/ProtoConstants.java @@ -0,0 +1,19 @@ +package org.tarantool.protocol; + +public class ProtoConstants { + + private ProtoConstants() { + } + + public static final long ERROR_TYPE_MARKER = 0x8000; + + public static final long SUCCESS = 0x0; + + /* taken from src/box/errcode.h */ + public static final int ERR_READONLY = 7; + public static final int ERR_TIMEOUT = 78; + public static final int ERR_WRONG_SCHEMA_VERSION = 109; + public static final int ERR_LOADING = 116; + public static final int ERR_LOCAL_INSTANCE_ID_IS_READ_ONLY = 128; + +} diff --git a/src/main/java/org/tarantool/protocol/ProtoUtils.java b/src/main/java/org/tarantool/protocol/ProtoUtils.java index 7b1a12be..51481d38 100644 --- a/src/main/java/org/tarantool/protocol/ProtoUtils.java +++ b/src/main/java/org/tarantool/protocol/ProtoUtils.java @@ -213,7 +213,7 @@ private static void assertCorrectWelcome(String firstLine, SocketAddress remoteA private static void assertNoErrCode(TarantoolPacket authResponse) { Long code = (Long) authResponse.getHeaders().get(Key.CODE.getId()); if (code != 0) { - Object error = authResponse.getBody().get(Key.ERROR.getId()); + Object error = authResponse.getError(); String errorMsg = error instanceof String ? (String) error : new String((byte[]) error); throw new TarantoolException(code, errorMsg); } @@ -305,7 +305,22 @@ public static ByteBuffer createPacket(int initialRequestSize, return buffer; } + /** + * Extracts an error code. + * + * @param code in 0x8XXX format + * + * @return actual error code (which is a XXX part) + */ + public static long extractErrorCode(long code) { + if ((code & ProtoConstants.ERROR_TYPE_MARKER) == 0) { + throw new IllegalArgumentException(String.format("Code %h does not follow 0x8XXX format", code)); + } + return (~ProtoConstants.ERROR_TYPE_MARKER & code); + } + private static class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { + public ByteArrayOutputStream(int size) { super(size); } @@ -313,6 +328,7 @@ public ByteArrayOutputStream(int size) { ByteBuffer toByteBuffer() { return ByteBuffer.wrap(buf, 0, count); } + } } diff --git a/src/main/java/org/tarantool/protocol/TarantoolPacket.java b/src/main/java/org/tarantool/protocol/TarantoolPacket.java index ca131177..ca661ed9 100644 --- a/src/main/java/org/tarantool/protocol/TarantoolPacket.java +++ b/src/main/java/org/tarantool/protocol/TarantoolPacket.java @@ -29,8 +29,9 @@ public Long getCode() { potenticalCode != null ? potenticalCode.getClass().toString() : "null" ); } + Long code = (Long) potenticalCode; - return (Long) potenticalCode; + return code == 0 ? code : ProtoUtils.extractErrorCode(code); } public Long getSync() { @@ -48,4 +49,16 @@ public Map getBody() { public boolean hasBody() { return body != null && body.size() > 0; } + + public long getSchemaId() { + return (Long) headers.get(Key.SCHEMA_ID.getId()); + } + + public Object getData() { + return hasBody() ? body.get(Key.DATA.getId()) : null; + } + + public Object getError() { + return hasBody() ? body.get(Key.ERROR.getId()) : null; + } } diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java new file mode 100644 index 00000000..cb468d7e --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexMeta.java @@ -0,0 +1,128 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Keeps a space index metadata. + */ +public class TarantoolIndexMeta { + + public static final int VINDEX_IID_FIELD_NUMBER = 1; + public static final int VINDEX_NAME_FIELD_NUMBER = 2; + public static final int VINDEX_TYPE_FIELD_NUMBER = 3; + public static final int VINDEX_OPTIONS_FIELD_NUMBER = 4; + public static final int VINDEX_PARTS_FIELD_NUMBER = 5; + + public static final int VINDEX_PART_FIELD = 0; + public static final int VINDEX_PART_TYPE = 1; + + private final int id; + private final String name; + private final String type; + private final IndexOptions options; + private final List parts; + + public TarantoolIndexMeta(int id, + String name, + String type, + IndexOptions options, + List parts) { + this.id = id; + this.name = name; + this.type = type; + this.options = options; + this.parts = parts; + } + + public static TarantoolIndexMeta fromTuple(List tuple) { + Map optionsMap = (Map) tuple.get(VINDEX_OPTIONS_FIELD_NUMBER); + + List parts = Collections.emptyList(); + List partsTuple = (List) tuple.get(VINDEX_PARTS_FIELD_NUMBER); + if (!partsTuple.isEmpty()) { + // simplified index parts as an array + // (when the parts don't use collation and is_nullable options) + if (partsTuple.get(0) instanceof List) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart( + (Integer) part.get(VINDEX_PART_FIELD), + (String) part.get(VINDEX_PART_TYPE) + ) + ) + .collect(Collectors.toList()); + } else if (partsTuple.get(0) instanceof Map) { + parts = ((List>) partsTuple) + .stream() + .map(part -> new IndexPart((Integer) part.get("field"), (String) part.get("type"))) + .collect(Collectors.toList()); + } + } + + return new TarantoolIndexMeta( + (Integer) tuple.get(VINDEX_IID_FIELD_NUMBER), + (String) tuple.get(VINDEX_NAME_FIELD_NUMBER), + (String) tuple.get(VINDEX_TYPE_FIELD_NUMBER), + new IndexOptions((Boolean) optionsMap.get("unique")), + parts + ); + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public IndexOptions getOptions() { + return options; + } + + public List getParts() { + return parts; + } + + public static class IndexOptions { + + private final boolean unique; + + public IndexOptions(boolean unique) { + this.unique = unique; + } + + public boolean isUnique() { + return unique; + } + + } + + public static class IndexPart { + + private final int fieldNumber; + private final String type; + + public IndexPart(int fieldNumber, String type) { + this.fieldNumber = fieldNumber; + this.type = type; + } + + public int getFieldNumber() { + return fieldNumber; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java new file mode 100644 index 00000000..49406fae --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolIndexNotFoundException.java @@ -0,0 +1,16 @@ +package org.tarantool.schema; + +public class TarantoolIndexNotFoundException extends TarantoolSchemaException { + + private final String indexName; + + public TarantoolIndexNotFoundException(String targetSpace, String indexName) { + super(targetSpace); + this.indexName = indexName; + } + + public String getIndexName() { + return indexName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java b/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java new file mode 100644 index 00000000..8a800e38 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolMetaSpacesCache.java @@ -0,0 +1,122 @@ +package org.tarantool.schema; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClientOps; +import org.tarantool.util.TupleTwo; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * In-memory schema cache. + *

+ * Caches meta spaces {@code _vspace} and {@code _vindex}. + *

+ * This class is not a part of public API. + */ +public class TarantoolMetaSpacesCache implements TarantoolSchemaMeta { + + private static final int VSPACE_ID = 281; + private static final int VSPACE_ID_INDEX_ID = 0; + + private static final int VINDEX_ID = 289; + private static final int VINDEX_ID_INDEX_ID = 0; + + /** + * Describes the theoretical maximum tuple size + * which is (2^31 - 1) (box.schema.SPACE_MAX) + */ + private static final int MAX_TUPLES = 2_147_483_647; + + private TarantoolClientImpl client; + + private volatile Map cachedSpaces = Collections.emptyMap(); + private volatile long schemaVersion; + + public TarantoolMetaSpacesCache(TarantoolClientImpl client) { + this.client = client; + } + + @Override + public TarantoolSpaceMeta getSpace(String spaceName) { + TarantoolSpaceMeta space = cachedSpaces.get(spaceName); + if (space == null) { + throw new TarantoolSpaceNotFoundException(spaceName); + } + return space; + } + + @Override + public TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName) { + TarantoolIndexMeta index = getSpace(spaceName).getIndex(indexName); + if (index == null) { + throw new TarantoolIndexNotFoundException(spaceName, indexName); + } + return index; + } + + @Override + public long getSchemaVersion() { + return schemaVersion; + } + + @Override + public synchronized long refresh() { + TupleTwo, Long> result = fetchSpaces(); + cachedSpaces = result.getFirst() + .stream() + .collect( + Collectors.toConcurrentMap( + TarantoolSpaceMeta::getName, + Function.identity(), + (oldValue, newValue) -> newValue, + ConcurrentHashMap::new + ) + ); + return schemaVersion = result.getSecond(); + } + + @Override + public boolean isInitialized() { + return schemaVersion != 0; + } + + private TupleTwo, Long> fetchSpaces() { + TarantoolClientOps, Object, TupleTwo, Long>> clientOps = client.unsafeSchemaOps(); + + long firstRequestSchema = -1; + long secondRequestSchema = 0; + List spaces = null; + List indexes = null; + while (firstRequestSchema != secondRequestSchema) { + TupleTwo, Long> spacesResult = clientOps + .select(VSPACE_ID, VSPACE_ID_INDEX_ID, Collections.emptyList(), 0, Integer.MAX_VALUE, Iterator.ALL); + TupleTwo, Long> indexesResult = clientOps + .select(VINDEX_ID, VINDEX_ID_INDEX_ID, Collections.emptyList(), 0, Integer.MAX_VALUE, Iterator.ALL); + spaces = spacesResult.getFirst(); + indexes = indexesResult.getFirst(); + firstRequestSchema = spacesResult.getSecond(); + secondRequestSchema = indexesResult.getSecond(); + } + + Map>> indexesBySpace = indexes.stream() + .map(tuple -> (List) tuple) + .collect(Collectors.groupingBy(tuple -> (Integer) tuple.get(0))); + + List cachedMeta = spaces.stream() + .map(tuple -> (List) tuple) + .map(tuple -> TarantoolSpaceMeta.fromTuple( + tuple, + indexesBySpace.getOrDefault((Integer) tuple.get(0), Collections.emptyList())) + ) + .collect(Collectors.toList()); + + return TupleTwo.of(cachedMeta, firstRequestSchema); + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaException.java b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java new file mode 100644 index 00000000..877f6c6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaException.java @@ -0,0 +1,15 @@ +package org.tarantool.schema; + +public class TarantoolSchemaException extends RuntimeException { + + private final String schemaName; + + public TarantoolSchemaException(String schemaName) { + this.schemaName = schemaName; + } + + public String getSchemaName() { + return schemaName; + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java new file mode 100644 index 00000000..bb66b27f --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSchemaMeta.java @@ -0,0 +1,48 @@ +package org.tarantool.schema; + +/** + * Provides Tarantool instance schema info. + */ +public interface TarantoolSchemaMeta { + + /** + * Finds a space by name if any. + * + * @param spaceName name of target space + * + * @return found space + */ + TarantoolSpaceMeta getSpace(String spaceName); + + /** + * Finds a space index by name if any. + * + * @param spaceName name of target space + * @param indexName name of target index + * + * @return found index meta + */ + TarantoolIndexMeta getSpaceIndex(String spaceName, String indexName); + + /** + * Gets current schema version that is cached. + * + * @return current version + */ + long getSchemaVersion(); + + /** + * Fetches schema metadata. + * + * @return fetched schema metadata version + */ + long refresh(); + + /** + * Checks whether a schema fully cached or not. + * + * @return {@literal true} if the schema is cached at least once + */ + boolean isInitialized(); + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java new file mode 100644 index 00000000..0fd4b214 --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceMeta.java @@ -0,0 +1,99 @@ +package org.tarantool.schema; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Keeps a space metadata. + */ +public class TarantoolSpaceMeta { + + public static final int VSPACE_ID_FIELD_NUMBER = 0; + public static final int VSPACE_NAME_FIELD_NUMBER = 2; + public static final int VSPACE_ENGINE_FIELD_NUMBER = 3; + public static final int VSPACE_FORMAT_FIELD_NUMBER = 6; + + private final int id; + private final String name; + private final String engine; + private final List format; + private final Map indexes; + + public static TarantoolSpaceMeta fromTuple(List spaceTuple, List> indexTuples) { + List fields = ((List>) spaceTuple.get(VSPACE_FORMAT_FIELD_NUMBER)).stream() + .map(field -> new SpaceField(field.get("name").toString(), field.get("type").toString())) + .collect(Collectors.toList()); + + Map indexesMap = indexTuples.stream() + .map(TarantoolIndexMeta::fromTuple) + .collect(Collectors.toMap(TarantoolIndexMeta::getName, Function.identity())); + + return new TarantoolSpaceMeta( + (Integer) spaceTuple.get(VSPACE_ID_FIELD_NUMBER), + spaceTuple.get(VSPACE_NAME_FIELD_NUMBER).toString(), + spaceTuple.get(VSPACE_ENGINE_FIELD_NUMBER).toString(), + Collections.unmodifiableList(fields), + Collections.unmodifiableMap(indexesMap) + ); + } + + public TarantoolSpaceMeta(int id, + String name, + String engine, + List format, + Map indexes) { + this.id = id; + this.name = name; + this.engine = engine; + this.format = format; + this.indexes = indexes; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public String getEngine() { + return engine; + } + + public List getFormat() { + return format; + } + + public Map getIndexes() { + return indexes; + } + + public TarantoolIndexMeta getIndex(String indexName) { + return indexes.get(indexName); + } + + public static class SpaceField { + + private final String name; + private final String type; + + public SpaceField(String name, String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + } + +} diff --git a/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java new file mode 100644 index 00000000..28498e6b --- /dev/null +++ b/src/main/java/org/tarantool/schema/TarantoolSpaceNotFoundException.java @@ -0,0 +1,9 @@ +package org.tarantool.schema; + +public class TarantoolSpaceNotFoundException extends TarantoolSchemaException { + + public TarantoolSpaceNotFoundException(String spaceName) { + super(spaceName); + } + +} diff --git a/src/main/java/org/tarantool/util/TupleTwo.java b/src/main/java/org/tarantool/util/TupleTwo.java index b124e287..9bad7059 100644 --- a/src/main/java/org/tarantool/util/TupleTwo.java +++ b/src/main/java/org/tarantool/util/TupleTwo.java @@ -8,12 +8,12 @@ public class TupleTwo { private final T first; private final U second; - private TupleTwo(T first, U second) { + TupleTwo(T first, U second) { this.first = first; this.second = second; } - public static TupleTwo of(T first, U second) { + public static TupleTwo of(T first, U second) { return new TupleTwo<>(first, second); } diff --git a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java index 087a4760..20ead6a3 100644 --- a/src/test/java/org/tarantool/ClientAsyncOperationsIT.java +++ b/src/test/java/org/tarantool/ClientAsyncOperationsIT.java @@ -7,6 +7,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.tarantool.TestAssertions.checkRawTupleResult; +import org.tarantool.schema.TarantoolIndexNotFoundException; +import org.tarantool.schema.TarantoolSpaceNotFoundException; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -27,7 +30,7 @@ /** * Class with test cases for asynchronous operations - * + *

* NOTE: Parametrized tests can be simplified after * https://github.com/junit-team/junit5/issues/878 */ @@ -120,22 +123,43 @@ void testAsyncError(AsyncOpsProvider provider) { @MethodSource("getAsyncOps") void testOperations(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { - TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); + testHelper.executeLua( + "box.space.basic_test:insert{10, '10'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{30, '30'}", + "box.space.basic_test:insert{40, '40'}", + "box.space.basic_test:insert{50, '50'}" + ); + TarantoolClientOps, Object, Future>> ops = provider.getAsyncOps(); List>> futures = new ArrayList<>(); - futures.add(ops.insert(spaceId, Arrays.asList(10, "10"))); futures.add(ops.delete(spaceId, Collections.singletonList(10))); - futures.add(ops.insert(spaceId, Arrays.asList(10, "10"))); - futures.add(ops.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); - - futures.add(ops.replace(spaceId, Arrays.asList(20, "20"))); - futures.add(ops.upsert(spaceId, Collections.singletonList(20), Arrays.asList(20, "twenty"), - Arrays.asList("=", 1, "twenty"))); - - futures.add(ops.insert(spaceId, Arrays.asList(30, "30"))); - futures.add(ops.call("box.space.basic_test:delete", Collections.singletonList(30))); + futures.add(ops.insert(spaceId, Arrays.asList(60, "60"))); + + futures.add(ops.update(spaceId, Collections.singletonList(50), Arrays.asList("=", 1, "fifty"))); + + futures.add(ops.replace(spaceId, Arrays.asList(30, "thirty"))); + futures.add(ops.replace(spaceId, Arrays.asList(70, "70"))); + + futures.add( + ops.upsert( + spaceId, + Collections.singletonList(20), + Arrays.asList(20, "20"), + Arrays.asList("=", 1, "twenty") + ) + ); + futures.add( + ops.upsert( + spaceId, + Collections.singletonList(80), + Arrays.asList(80, "80"), + Arrays.asList("=", 1, "eighty") + ) + ); + futures.add(ops.call("box.space.basic_test:delete", Collections.singletonList(40))); // Wait completion of all operations. for (Future> f : futures) { @@ -143,10 +167,14 @@ void testOperations(AsyncOpsProvider provider) } // Check the effects. - checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + assertEquals(Collections.emptyList(), consoleSelect(10)); checkRawTupleResult(consoleSelect(20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(30), Collections.emptyList()); - + checkRawTupleResult(consoleSelect(30), Arrays.asList(30, "thirty")); + assertEquals(Collections.emptyList(), consoleSelect(40)); + checkRawTupleResult(consoleSelect(50), Arrays.asList(50, "fifty")); + checkRawTupleResult(consoleSelect(60), Arrays.asList(60, "60")); + checkRawTupleResult(consoleSelect(70), Arrays.asList(70, "70")); + checkRawTupleResult(consoleSelect(80), Arrays.asList(80, "80")); provider.close(); } @@ -185,16 +213,225 @@ void testCall(AsyncOpsProvider provider) throws ExecutionException, InterruptedE provider.close(); } + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringSelect(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringInsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .insert("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringReplace(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(1, "one")); + + Future> resultTen = provider.getAsyncOps() + .replace("basic_test", Arrays.asList(10, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringDelete(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + Future> resultOne = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(1)); + + Future> resultTwenty = provider.getAsyncOps() + .delete("basic_test", Collections.singletonList(20)); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwenty.get(TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpdate(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringUpsert(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + Future> resultOne = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(1), Arrays.asList(1, "001"), Arrays.asList("=", 1, "one")); + + Future> resultTwo = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(2), Arrays.asList(2, "002"), Arrays.asList("=", 1, "two")); + + Future> resultTen = provider.getAsyncOps() + .upsert("basic_test", Collections.singletonList(10), Arrays.asList(10, "010"), + Arrays.asList("=", 1, "ten")); + + resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTwo.get(TIMEOUT, TimeUnit.MILLISECONDS); + resultTen.get(TIMEOUT, TimeUnit.MILLISECONDS); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testStringMultipleIndirectChanges(AsyncOpsProvider provider) + throws ExecutionException, InterruptedException, TimeoutException { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + Future> result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(1, "one")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(2, "two")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + + result = provider.getAsyncOps() + .select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + + assertEquals( + Collections.singletonList(Arrays.asList(3, "three")), + result.get(TIMEOUT, TimeUnit.MILLISECONDS) + ); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testUnknownSpace(AsyncOpsProvider provider) throws ExecutionException, InterruptedException, TimeoutException { + Future> resultOne = provider.getAsyncOps() + .update("basic_test_unknown", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + + Exception exception = assertThrows(Exception.class, () -> resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue(exception.getCause() instanceof TarantoolSpaceNotFoundException); + + provider.close(); + } + + @ParameterizedTest + @MethodSource("getAsyncOps") + void testUnknownSpaceIndex(AsyncOpsProvider provider) { + Future> resultOne = provider.getAsyncOps() + .select("basic_test", "pk_unknown", Collections.singletonList(3), 0, 1, Iterator.EQ); + + Exception exception = assertThrows(Exception.class, () -> resultOne.get(TIMEOUT, TimeUnit.MILLISECONDS)); + assertTrue(exception.getCause() instanceof TarantoolIndexNotFoundException); + + provider.close(); + } + private List consoleSelect(Object key) { return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); } private interface AsyncOpsProvider { + TarantoolClientOps, Object, Future>> getAsyncOps(); TarantoolClient getClient(); void close(); + } private static class ClientAsyncOpsProvider implements AsyncOpsProvider { @@ -236,7 +473,7 @@ public TarantoolClient getClient() { @Override public void close() { - composableOps.close(); + client.close(); } } @@ -257,6 +494,11 @@ public Future> select(Integer space, Integer index, List key, int off return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, String index, List key, int offset, int limit, int iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> select(Integer space, Integer index, @@ -267,31 +509,66 @@ public Future> select(Integer space, return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); } + @Override + public Future> select(String space, + String index, + List key, + int offset, + int limit, + Iterator iterator) { + return originOps.select(space, index, key, offset, limit, iterator).toCompletableFuture(); + } + @Override public Future> insert(Integer space, List tuple) { return originOps.insert(space, tuple).toCompletableFuture(); } + @Override + public Future> insert(String space, List tuple) { + return originOps.insert(space, tuple).toCompletableFuture(); + } + @Override public Future> replace(Integer space, List tuple) { return originOps.replace(space, tuple).toCompletableFuture(); } + @Override + public Future> replace(String space, List tuple) { + return originOps.replace(space, tuple).toCompletableFuture(); + } + @Override public Future> update(Integer space, List key, Object... tuple) { return originOps.update(space, key, tuple).toCompletableFuture(); } + @Override + public Future> update(String space, List key, Object... tuple) { + return originOps.update(space, key, tuple).toCompletableFuture(); + } + @Override public Future> upsert(Integer space, List key, List defTuple, Object... ops) { return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); } + @Override + public Future> upsert(String space, List key, List defTuple, Object... ops) { + return originOps.upsert(space, key, defTuple, ops).toCompletableFuture(); + } + @Override public Future> delete(Integer space, List key) { return originOps.delete(space, key).toCompletableFuture(); } + @Override + public Future> delete(String space, List key) { + return originOps.delete(space, key).toCompletableFuture(); + } + @Override public Future> call(String function, Object... args) { return originOps.call(function, args).toCompletableFuture(); @@ -311,6 +588,7 @@ public void ping() { public void close() { originOps.close(); } + } } diff --git a/src/test/java/org/tarantool/ClientOperationsIT.java b/src/test/java/org/tarantool/ClientOperationsIT.java index 1f7e1826..762c3dfb 100644 --- a/src/test/java/org/tarantool/ClientOperationsIT.java +++ b/src/test/java/org/tarantool/ClientOperationsIT.java @@ -2,6 +2,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.tarantool.TestAssertions.checkRawTupleResult; + +import org.tarantool.schema.TarantoolIndexNotFoundException; +import org.tarantool.schema.TarantoolSpaceNotFoundException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -9,6 +14,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * Tests for synchronous operations of {@link TarantoolClientImpl} class. * @@ -34,11 +43,18 @@ public static void tearDownEnv() { @BeforeEach public void setUp() { + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); client = TestUtils.makeTestClient(TestUtils.makeDefaultClientConfig(), 2000); } @AfterEach public void tearDown() { + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); client.close(); } @@ -48,4 +64,173 @@ public void testClose() { assertEquals(e.getMessage(), "You should close TarantoolClient instead."); } + @Test + void testStringSelect() { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + List result = client.syncOps() + .select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + + assertEquals(Collections.singletonList(Arrays.asList(1, "one")), result); + } + + @Test + void testStringInsert() { + client.syncOps().insert("basic_test", Arrays.asList(1, "one")); + client.syncOps().insert("basic_test", Arrays.asList(10, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringReplace() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + client.syncOps().replace("basic_test", Arrays.asList(1, "one")); + client.syncOps().replace("basic_test", Arrays.asList(10, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringDelete() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + testHelper.executeLua("box.space.basic_test:insert{20, '20'}"); + + client.syncOps().delete("basic_test", Collections.singletonList(1)); + client.syncOps().delete("basic_test", Collections.singletonList(20)); + + assertEquals(Collections.emptyList(), consoleSelect(1)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "10")); + assertEquals(Collections.emptyList(), consoleSelect(20)); + } + + @Test + void testStringUpdate() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + clientOps.update("basic_test", Collections.singletonList(1), Arrays.asList("=", 1, "one")); + clientOps.update("basic_test", Collections.singletonList(2), Arrays.asList("=", 1, "two")); + clientOps.update("basic_test", Collections.singletonList(10), Arrays.asList("=", 1, "ten")); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + assertEquals(Collections.emptyList(), consoleSelect(2)); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringUpsert() { + testHelper.executeLua("box.space.basic_test:insert{1, '1'}"); + testHelper.executeLua("box.space.basic_test:insert{10, '10'}"); + + TarantoolClientOps, Object, List> ops = client.syncOps(); + ops.upsert( + "basic_test", Collections.singletonList(1), + Arrays.asList(1, "001"), Arrays.asList("=", 1, "one") + ); + ops.upsert( + "basic_test", Collections.singletonList(2), + Arrays.asList(2, "002"), Arrays.asList("=", 1, "two") + ); + ops.upsert( + "basic_test", Collections.singletonList(10), + Arrays.asList(10, "010"), Arrays.asList("=", 1, "ten") + ); + + checkRawTupleResult(consoleSelect(1), Arrays.asList(1, "one")); + checkRawTupleResult(consoleSelect(2), Arrays.asList(2, "002")); + checkRawTupleResult(consoleSelect(10), Arrays.asList(10, "ten")); + } + + @Test + void testStringMultipleIndirectChanges() { + testHelper.executeLua("box.space.basic_test:insert{1, 'one'}"); + List result = client.syncOps().select("basic_test", "pk", Collections.singletonList(1), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(1, "one")), result); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{2, 'two'}"); + result = client.syncOps().select("basic_test", "pk", Collections.singletonList(2), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(2, "two")), result); + + testHelper.executeLua("box.space.basic_test and box.space.basic_test:drop()"); + testHelper.executeLua( + "box.schema.space.create('basic_test', { format = " + + "{{name = 'id', type = 'integer'}," + + " {name = 'val', type = 'string'} } })", + + "box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )" + ); + testHelper.executeLua("box.space.basic_test:insert{3, 'three'}"); + result = client.syncOps().select("basic_test", "pk", Collections.singletonList(3), 0, 1, Iterator.EQ); + assertEquals(Collections.singletonList(Arrays.asList(3, "three")), result); + } + + @Test + void testUnknownSpace() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps.select("base_test_unknown", "pk", Collections.singletonList(12), 0, 1, Iterator.EQ) + ); + + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + } + + @Test + void testUnknownSpaceIndex() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps.select("basic_test", "pk_unknown", Collections.singletonList(12), 0, 1, Iterator.EQ) + ); + + assertTrue(error.getCause() instanceof TarantoolIndexNotFoundException); + } + + @Test + void testCreateSpaceAfterFailedRequest() { + TarantoolClientOps, Object, List> clientOps = client.syncOps(); + Exception error = assertThrows( + Exception.class, + () -> clientOps + .select("base_test_unknown", "pk", Collections.emptyList(), 0, 10, Iterator.ALL) + ); + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + + testHelper.executeLua( + "box.schema.space.create('base_test_unknown', { format = { { name = 'id', type = 'integer' } } })", + "box.space.base_test_unknown:create_index('pk', { type = 'TREE', parts = {'id'} } )", + "box.space.base_test_unknown:insert{ 5 }" + ); + List result = clientOps + .select("base_test_unknown", "pk", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Collections.singletonList(5), result.get(0)); + + error = assertThrows( + Exception.class, + () -> clientOps + .select("base_test_unknown1", "pk", Collections.emptyList(), 0, 10, Iterator.ALL) + ); + assertTrue(error.getCause() instanceof TarantoolSpaceNotFoundException); + + testHelper.executeLua("box.space.base_test_unknown:drop()"); + } + + private List consoleSelect(Object key) { + return testHelper.evaluate(TestUtils.toLuaSelect("basic_test", key)); + } + } diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index 6e1d005d..32e96963 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.tarantool.TestUtils.findCause; import static org.tarantool.TestUtils.makeDefaultClusterClientConfig; import static org.tarantool.TestUtils.makeDiscoveryFunction; @@ -24,9 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -90,7 +91,7 @@ public void tearDownTest() { @Test @DisplayName("requests were re-issued after reconnection") public void testRetriesOnReconnect() throws ExecutionException, InterruptedException { - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); config.operationExpiryTimeMillis = 3_000; TarantoolClusterClient client = new TarantoolClusterClient( @@ -104,7 +105,7 @@ public void testRetriesOnReconnect() throws ExecutionException, InterruptedExcep @Override protected void reconnect(Throwable lastError) { if (notFirst) { - tryAwait(barrier); + tryAwait(phaser, 0); } notFirst = true; super.reconnect(lastError); @@ -119,7 +120,7 @@ protected void reconnect(Throwable lastError) { futures.add(client.asyncOps().eval("return 1+3")); futures.add(client.asyncOps().eval("return 1+4")); - tryAwait(barrier); + phaser.arrive(); for (Future future : futures) { future.get(); @@ -168,7 +169,7 @@ void testUpdateExtendedNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = @@ -179,7 +180,7 @@ void testUpdateExtendedNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -187,7 +188,7 @@ void testUpdateExtendedNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1, srv2 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1, srv2 } expectConnected(client, spaceId, pkId); @@ -213,7 +214,7 @@ void testUpdateNarrowNodeList() { String service1Address = "localhost:" + PORTS[0]; String service2Address = "127.0.0.1:" + PORTS[1]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddresses"; String infoFunctionScript = makeDiscoveryFunction(infoFunctionName, Collections.singletonList(service1Address)); @@ -223,7 +224,7 @@ void testUpdateNarrowNodeList() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 0, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address, service2Address ); @@ -232,7 +233,7 @@ void testUpdateNarrowNodeList() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1, srv2 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1, srv2 }; wait for { srv1 } expectConnected(client, spaceId, pkId); @@ -397,7 +398,7 @@ void testDelayFunctionResultFetch() { String service2Address = "127.0.0.1:" + PORTS[1]; String service3Address = "localhost:" + PORTS[2]; - CyclicBarrier barrier = new CyclicBarrier(2); + Phaser phaser = new Phaser(1); String infoFunctionName = "getAddressesFunction"; String functionBody = Stream.of(service1Address, service2Address) @@ -415,7 +416,7 @@ void testDelayFunctionResultFetch() { final TarantoolClusterClient client = makeClientWithDiscoveryFeature( infoFunctionName, 3000, - (ignored) -> tryAwait(barrier), + (ignored) -> phaser.arrive(), service1Address ); @@ -423,16 +424,16 @@ void testDelayFunctionResultFetch() { final int spaceId = ids[0]; final int pkId = ids[1]; - tryAwait(barrier); // client = { srv1 }; wait for { srv1 } + tryAwait(phaser, 0); // client = { srv1 }; wait for { srv1 } expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv1 }; wait for { srv2 } + tryAwait(phaser, 1); // client = { srv1 }; wait for { srv2 } stopInstancesAndAwait(SRV1); expectConnected(client, spaceId, pkId); - tryAwait(barrier); // client = { srv2 }; wait for { srv3 } + tryAwait(phaser, 2); // client = { srv2 }; wait for { srv3 } stopInstancesAndAwait(SRV2); expectConnected(client, spaceId, pkId); @@ -508,11 +509,11 @@ void testRoundRobinSocketProviderRefusedAfterConnect() { assertEquals(origin, client.getThumbstone()); } - private void tryAwait(CyclicBarrier barrier) { + private void tryAwait(Phaser phaser, int phase) { try { - barrier.await(6000, TimeUnit.MILLISECONDS); + phaser.awaitAdvanceInterruptibly(phase, 6000, TimeUnit.MILLISECONDS); } catch (Throwable e) { - e.printStackTrace(); + fail(e); } } @@ -574,6 +575,7 @@ private TarantoolClusterClient makeClientWithDiscoveryFeature(String entryFuncti Consumer> consumer, String... addresses) { TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); + config.operationExpiryTimeMillis = 3000; config.clusterDiscoveryEntryFunction = entryFunction; config.clusterDiscoveryDelayMillis = entryDelayMillis; diff --git a/src/test/java/org/tarantool/ClientReconnectIT.java b/src/test/java/org/tarantool/ClientReconnectIT.java index af2afc86..ad8a94f3 100644 --- a/src/test/java/org/tarantool/ClientReconnectIT.java +++ b/src/test/java/org/tarantool/ClientReconnectIT.java @@ -15,7 +15,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; import java.net.ConnectException; import java.nio.channels.SocketChannel; @@ -159,13 +158,7 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) { client.close(); - ExecutionException e = assertThrows(ExecutionException.class, new Executable() { - @Override - public void execute() throws Throwable { - res.get(); - } - }); - assertEquals("Connection is closed.", e.getCause().getMessage()); + ExecutionException e = assertThrows(ExecutionException.class, res::get); } /** @@ -189,16 +182,12 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) thro testHelper.stopInstance(); - assertThrows(ExecutionException.class, new Executable() { - @Override - public void execute() throws Throwable { - mustFail.get(); - } - }); + ExecutionException executionException = assertThrows(ExecutionException.class, mustFail::get); + assertEquals(executionException.getCause().getClass(), CommunicationException.class); + writeEnabled.set(true); testHelper.startInstance(); - writeEnabled.set(true); try { client.waitAlive(RESTART_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java index 97cf697a..09844e11 100644 --- a/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java +++ b/src/test/java/org/tarantool/FireAndForgetClientOperationsIT.java @@ -91,21 +91,45 @@ public void execute() throws Throwable { @Test public void testFireAndForgetOperations() { + testHelper.executeLua( + "box.space.basic_test:insert{1, '1'}", + "box.space.basic_test:insert{5, '5'}", + "box.space.basic_test:insert{10, '10'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{30, '30'}" + ); + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); - Set syncIds = new HashSet(); + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.delete(spaceId, Collections.singletonList(1))); + + syncIds.add(ffOps.insert(spaceId, Arrays.asList(2, "2"))); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); - syncIds.add(ffOps.delete(spaceId, Collections.singletonList(10))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(3, "3"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(5, "five"))); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(10, "10"))); syncIds.add(ffOps.update(spaceId, Collections.singletonList(10), Arrays.asList("=", 1, "ten"))); - syncIds.add(ffOps.replace(spaceId, Arrays.asList(20, "20"))); - syncIds.add(ffOps.upsert(spaceId, Collections.singletonList(20), Arrays.asList(20, "twenty"), - Arrays.asList("=", 1, "twenty"))); + syncIds.add( + ffOps.upsert( + spaceId, + Collections.singletonList(20), + Arrays.asList(20, "twenty"), + Arrays.asList("=", 1, "twenty") + ) + ); + + syncIds.add( + ffOps.upsert( + spaceId, + Collections.singletonList(25), + Arrays.asList(25, "25"), + Arrays.asList("=", 1, "twenty five") + ) + ); - syncIds.add(ffOps.insert(spaceId, Arrays.asList(30, "30"))); syncIds.add(ffOps.call("box.space.basic_test:delete", Collections.singletonList(30))); // Check the syncs. @@ -117,9 +141,66 @@ public void testFireAndForgetOperations() { client.syncOps().ping(); // Check the effects + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 1)); + checkRawTupleResult(consoleSelect(SPACE_NAME, 2), Arrays.asList(2, "2")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3), Arrays.asList(3, "3")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 5), Arrays.asList(5, "five")); checkRawTupleResult(consoleSelect(SPACE_NAME, 10), Arrays.asList(10, "ten")); checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); - assertEquals(consoleSelect(SPACE_NAME, 30), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 25), Arrays.asList(25, "25")); + assertEquals(Collections.emptyList(), consoleSelect(SPACE_NAME, 30)); + } + + @Test + public void testFireAndForgetStringOperations() { + testHelper.executeLua( + "box.space.basic_test:insert{2, '2'}", + "box.space.basic_test:insert{20, '20'}", + "box.space.basic_test:insert{200, '200'}", + "box.space.basic_test:insert{2000, '2000'}" + ); + + TarantoolClientOps, Object, Long> ffOps = client.fireAndForgetOps(); + Set syncIds = new HashSet<>(); + + syncIds.add(ffOps.delete(SPACE_NAME, Collections.singletonList(2))); + syncIds.add(ffOps.insert(SPACE_NAME, Arrays.asList(3, "3"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(2000, "2k"))); + syncIds.add(ffOps.replace(spaceId, Arrays.asList(3000, "3k"))); + syncIds.add(ffOps.update(SPACE_NAME, Collections.singletonList(20), Arrays.asList("=", 1, "twenty"))); + syncIds.add( + ffOps.upsert( + SPACE_NAME, + Collections.singletonList(200), + Arrays.asList(200, "200"), + Arrays.asList("=", 1, "two hundred") + ) + ); + syncIds.add( + ffOps.upsert( + SPACE_NAME, + Collections.singletonList(400), + Arrays.asList(400, "400"), + Arrays.asList("=", 1, "four hundred") + ) + ); + + // Check the syncs. + assertFalse(syncIds.contains(0L)); + assertEquals(7, syncIds.size()); + + // The reply for synchronous ping will + // indicate to us that previous fire & forget operations are completed. + client.syncOps().ping(); + + // Check the effects + assertEquals(consoleSelect(SPACE_NAME, 2), Collections.emptyList()); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3), Arrays.asList(3, "3")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 20), Arrays.asList(20, "twenty")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 200), Arrays.asList(200, "two hundred")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 400), Arrays.asList(400, "400")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 2000), Arrays.asList(2000, "2k")); + checkRawTupleResult(consoleSelect(SPACE_NAME, 3000), Arrays.asList(3000, "3k")); } private List consoleSelect(String spaceName, Object key) { diff --git a/src/test/java/org/tarantool/IteratorTest.java b/src/test/java/org/tarantool/IteratorTest.java deleted file mode 100644 index 7fd68b61..00000000 --- a/src/test/java/org/tarantool/IteratorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.tarantool; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -class IteratorTest { - protected class MockOps extends AbstractTarantoolOps, Object, List> { - - @Override - public List exec(Code code, Object... args) { - return null; - } - - @Override - public void close() { - throw new UnsupportedOperationException(); - } - } - - @Test - void testSelectWithIteratorInsteadOfInteger() { - MockOps ops = new MockOps(); - MockOps spyOps = spy(ops); - - spyOps.select(1, 1, new ArrayList(), 0, 1, Iterator.EQ); - - verify(spyOps, times(1)).select(1, 1, new ArrayList(), 0, 1, 0); - } -} diff --git a/src/test/java/org/tarantool/TarantoolClientOpsIT.java b/src/test/java/org/tarantool/TarantoolClientOpsIT.java index ebaca9f5..a8964b7b 100644 --- a/src/test/java/org/tarantool/TarantoolClientOpsIT.java +++ b/src/test/java/org/tarantool/TarantoolClientOpsIT.java @@ -580,12 +580,10 @@ public void execute() throws Throwable { @MethodSource("getClientOps") public void testInsertDuplicateKey(SyncOpsProvider provider) { final List tup = Arrays.asList(1, "uno"); - TarantoolException ex = assertThrows(TarantoolException.class, new Executable() { - @Override - public void execute() throws Throwable { - provider.getClientOps().insert(spaceId, tup); - } - }); + TarantoolException ex = assertThrows( + TarantoolException.class, + () -> provider.getClientOps().insert(spaceId, tup) + ); assertEquals("Duplicate key exists in unique index 'pk' in space 'basic_test'", ex.getMessage()); // Check the tuple stayed intact. diff --git a/src/test/java/org/tarantool/TestUtils.java b/src/test/java/org/tarantool/TestUtils.java index cb2b73e6..f144526b 100644 --- a/src/test/java/org/tarantool/TestUtils.java +++ b/src/test/java/org/tarantool/TestUtils.java @@ -276,7 +276,7 @@ public static TarantoolClusterClientConfig makeDefaultClusterClientConfig() { config.username = TarantoolTestHelper.USERNAME; config.password = TarantoolTestHelper.PASSWORD; config.initTimeoutMillis = 2000; - config.operationExpiryTimeMillis = 1000; + config.operationExpiryTimeMillis = 2000; config.sharedBufferSize = 128; config.executor = null; return config; diff --git a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java index 4e940033..858e2143 100644 --- a/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java +++ b/src/test/java/org/tarantool/cluster/ClusterServiceStoredFunctionDiscovererIT.java @@ -9,7 +9,6 @@ import static org.tarantool.TestUtils.makeDiscoveryFunction; import org.tarantool.CommunicationException; -import org.tarantool.TarantoolClient; import org.tarantool.TarantoolClientImpl; import org.tarantool.TarantoolClusterClientConfig; import org.tarantool.TarantoolException; @@ -36,7 +35,7 @@ public class ClusterServiceStoredFunctionDiscovererIT { private static TarantoolTestHelper testHelper; private TarantoolClusterClientConfig clusterConfig; - private TarantoolClient client; + private TarantoolClientImpl client; @BeforeAll public static void setupEnv() { diff --git a/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java b/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java index 7f4d2b3f..9f503f79 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java +++ b/src/test/java/org/tarantool/jdbc/JdbcConnectionTimeoutIT.java @@ -7,6 +7,7 @@ import org.tarantool.ServerVersion; import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolOperation; import org.tarantool.TarantoolTestHelper; import org.tarantool.protocol.TarantoolPacket; @@ -51,7 +52,7 @@ void setUp() throws SQLException { protected SQLTarantoolClientImpl makeSqlClient(String address, TarantoolClientConfig config) { return new SQLTarantoolClientImpl(address, config) { @Override - protected void completeSql(TarantoolOp operation, TarantoolPacket pack) { + protected void completeSql(TarantoolOperation operation, TarantoolPacket pack) { try { Thread.sleep(LONG_ENOUGH_TIMEOUT); } catch (InterruptedException ignored) { diff --git a/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java b/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java new file mode 100644 index 00000000..ad4018eb --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientReconnectSchemaIT.java @@ -0,0 +1,94 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.tarantool.TestUtils.makeDefaultClusterClientConfig; + +import org.tarantool.Iterator; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClusterClient; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.TarantoolTestHelper; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class ClientReconnectSchemaIT { + + private static final String[] SRVS = { "srv-schema-it-1", "srv-schema-it-2" }; + private static final int[] PORTS = { 3401, 3402 }; + + private static TarantoolTestHelper firstTestHelper; + private static TarantoolTestHelper secondTestHelper; + + @BeforeAll + public static void setupEnv() { + firstTestHelper = new TarantoolTestHelper(SRVS[0]); + firstTestHelper.createInstance(TarantoolTestHelper.LUA_FILE, PORTS[0], PORTS[0] + 1000); + firstTestHelper.startInstance(); + + secondTestHelper = new TarantoolTestHelper(SRVS[1]); + secondTestHelper.createInstance(TarantoolTestHelper.LUA_FILE, PORTS[1], PORTS[1] + 1000); + secondTestHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + firstTestHelper.stopInstance(); + secondTestHelper.stopInstance(); + } + + @Test + @DisplayName("got a result from another node after the current node had disappeared") + public void testSameNamedSpaceAfterReconnection() { + String[] firstSpace = { + "box.schema.space.create('string_space1', { format = { {name = 'id', type = 'integer'} } })", + "box.space.string_space1:create_index('primary', { type = 'TREE', parts = {'id'} })" + }; + String[] secondSpace = { + "box.schema.space.create('string_space2', { format = { {name = 'id', type = 'integer'} } })", + "box.space.string_space2:create_index('primary', { type = 'TREE', parts = {'id'} })" + }; + + // create spaces on two instances with an inverted order + // as a result, instances have same schema version but spaces have unequal IDs + firstTestHelper.executeLua(firstSpace); + firstTestHelper.executeLua(secondSpace); + firstTestHelper.executeLua("box.space.string_space1:insert{100}"); + secondTestHelper.executeLua(secondSpace); + secondTestHelper.executeLua(firstSpace); + secondTestHelper.executeLua("box.space.string_space1:insert{200}"); + assertEquals(firstTestHelper.getInstanceVersion(), secondTestHelper.getInstanceVersion()); + + int firstSpaceIdFirstInstance = firstTestHelper.evaluate("box.space.string_space1.id"); + int firstSpaceIdSecondInstance = secondTestHelper.evaluate("box.space.string_space1.id"); + assertNotEquals(firstSpaceIdFirstInstance, firstSpaceIdSecondInstance); + + final TarantoolClientImpl client = makeClusterClient( + "localhost:" + PORTS[0], + "127.0.0.1:" + PORTS[1] + ); + + List result = client.syncOps() + .select("string_space1", "primary", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Arrays.asList(100), result.get(0)); + firstTestHelper.stopInstance(); + + result = client.syncOps() + .select("string_space1", "primary", Collections.emptyList(), 0, 10, Iterator.ALL); + assertEquals(Arrays.asList(200), result.get(0)); + secondTestHelper.stopInstance(); + } + + private TarantoolClusterClient makeClusterClient(String... addresses) { + TarantoolClusterClientConfig config = makeDefaultClusterClientConfig(); + return new TarantoolClusterClient(config, addresses); + } + +} diff --git a/src/test/java/org/tarantool/schema/ClientSchemaIT.java b/src/test/java/org/tarantool/schema/ClientSchemaIT.java new file mode 100644 index 00000000..214c599c --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientSchemaIT.java @@ -0,0 +1,247 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; +import static org.tarantool.TestUtils.makeDefaultClientConfig; + +import org.tarantool.ServerVersion; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TestAssumptions; +import org.tarantool.schema.TarantoolIndexMeta.IndexOptions; +import org.tarantool.schema.TarantoolIndexMeta.IndexPart; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +@DisplayName("A schema meta") +public class ClientSchemaIT { + + private static TarantoolTestHelper testHelper; + + private TarantoolClientImpl client; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @BeforeEach + public void setup() { + TarantoolClientConfig config = makeDefaultClientConfig(); + + client = new TarantoolClientImpl( + TarantoolTestHelper.HOST + ":" + TarantoolTestHelper.PORT, + config + ); + } + + @AfterEach + public void tearDown() { + client.close(); + testHelper.executeLua("box.space.count_space and box.space.count_space:drop()"); + } + + @Test + @DisplayName("fetched a space with its index") + void testFetchSpaces() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + + List spaceFormat = space.getFormat(); + assertEquals(2, spaceFormat.size()); + assertEquals("id", spaceFormat.get(0).getName()); + assertEquals("integer", spaceFormat.get(0).getType()); + assertEquals("counts", spaceFormat.get(1).getName()); + assertEquals("integer", spaceFormat.get(1).getType()); + + TarantoolIndexMeta primaryIndex = space.getIndex("pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("fetched newly created spaces and indexes") + void testFetchNewSpaces() { + // add count_space + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + TarantoolSpaceMeta space = meta.getSpace("count_space"); + assertNotNull(space); + assertEquals("count_space", space.getName()); + assertThrows(TarantoolSpaceNotFoundException.class, () -> meta.getSpace("count_space_2")); + + // add count_space_2 + testHelper.executeLua( + "box.schema.space.create('count_space_2', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + meta.refresh(); + space = meta.getSpace("count_space_2"); + assertNotNull(space); + assertEquals("count_space_2", space.getName()); + assertThrows(TarantoolIndexNotFoundException.class, () -> meta.getSpaceIndex("count_space_2", "pk")); + + // add a primary index for count_space_2 + testHelper.executeLua( + "box.space.count_space_2:create_index('pk', { unique = true, type = 'TREE', parts = {'id'} } )" + ); + meta.refresh(); + TarantoolIndexMeta spaceIndex = meta.getSpaceIndex("count_space_2", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "TREE", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, spaceIndex); + } + + @Test + @DisplayName("fetched space indexes of a space") + void testFetchIndexes() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'}," + + " {name = 'counts', type = 'integer'} }" + + "})" + ); + testHelper.executeLua( + "box.space.count_space:create_index('pk', { type = 'HASH', parts = {'id'} } )", + "box.space.count_space:create_index('c_index', { unique = false, type = 'TREE', parts = {'counts'} } )" + ); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("count_space", "pk"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk", "HASH", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + + TarantoolIndexMeta secondaryIndex = meta.getSpaceIndex("count_space", "c_index"); + TarantoolIndexMeta expectedSecondaryIndex = new TarantoolIndexMeta( + 1, "c_index", "TREE", + new IndexOptions(false), + Collections.singletonList(new IndexPart(1, "integer")) + ); + assertIndex(expectedSecondaryIndex, secondaryIndex); + } + + @Test + @DisplayName("fetched sql table primary index") + void testFetchSqlIndexes() { + TestAssumptions.assumeMinimalServerVersion(testHelper.getInstanceVersion(), ServerVersion.V_2_1); + testHelper.executeSql("create table my_table (id int primary key, val varchar(100))"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolIndexMeta primaryIndex = meta.getSpaceIndex("MY_TABLE", "pk_unnamed_MY_TABLE_1"); + TarantoolIndexMeta expectedPrimaryIndex = new TarantoolIndexMeta( + 0, "pk_unnamed_MY_TABLE_1", "tree", + new IndexOptions(true), + Collections.singletonList(new IndexPart(0, "integer")) + ); + assertIndex(expectedPrimaryIndex, primaryIndex); + } + + @Test + @DisplayName("got an error with a wrong space name") + void tesGetUnknownSpace() { + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + TarantoolSpaceNotFoundException exception = assertThrows( + TarantoolSpaceNotFoundException.class, + () -> meta.getSpace("unknown_space") + ); + assertEquals("unknown_space", exception.getSchemaName()); + } + + @Test + @DisplayName("got an error with a wrong space index name") + void testGetUnknownSpaceIndex() { + testHelper.executeLua( + "box.schema.space.create('count_space', { format = " + + "{ {name = 'id', type = 'integer'} } })" + ); + testHelper.executeLua("box.space.count_space:create_index('pk', { type = 'TREE', parts = {'id'} } )"); + + TarantoolSchemaMeta meta = new TarantoolMetaSpacesCache(client); + meta.refresh(); + + assertEquals("count_space", meta.getSpace("count_space").getName()); + TarantoolIndexNotFoundException exception = assertThrows( + TarantoolIndexNotFoundException.class, + () -> meta.getSpaceIndex("count_space", "wrong_pk") + ); + assertEquals("wrong_pk", exception.getIndexName()); + } + + private void assertIndex(TarantoolIndexMeta expectedIndex, TarantoolIndexMeta actualIndex) { + assertEquals(expectedIndex.getId(), actualIndex.getId()); + assertEquals(expectedIndex.getName(), actualIndex.getName()); + assertEquals(expectedIndex.getType(), actualIndex.getType()); + assertEqualsOptions(expectedIndex.getOptions(), actualIndex.getOptions()); + assertEqualsParts(expectedIndex.getParts(), actualIndex.getParts()); + } + + private void assertEqualsOptions(IndexOptions expected, IndexOptions actual) { + assertEquals(expected.isUnique(), actual.isUnique()); + } + + private void assertEqualsParts(List expected, List actual) { + if (expected.size() != actual.size()) { + fail("Part lists have different sizes"); + } + for (int i = 0; i < expected.size(); i++) { + IndexPart expectedPart = expected.get(i); + IndexPart actualPart = actual.get(i); + assertEquals(expectedPart.getFieldNumber(), actualPart.getFieldNumber()); + assertEquals(expectedPart.getType(), actualPart.getType()); + } + } + +} diff --git a/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java b/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java new file mode 100644 index 00000000..e8c8e04b --- /dev/null +++ b/src/test/java/org/tarantool/schema/ClientThreadSafeSchemaIT.java @@ -0,0 +1,105 @@ +package org.tarantool.schema; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.tarantool.TestUtils.makeDefaultClientConfig; +import static org.tarantool.TestUtils.makeTestClient; + +import org.tarantool.TarantoolClient; +import org.tarantool.TarantoolClientConfig; +import org.tarantool.TarantoolClientOps; +import org.tarantool.TarantoolTestHelper; +import org.tarantool.TarantoolThreadDaemonFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@DisplayName("A client") +public class ClientThreadSafeSchemaIT { + + private static TarantoolTestHelper testHelper; + + @BeforeAll + public static void setupEnv() { + testHelper = new TarantoolTestHelper("client-schema-thread-safe-it"); + testHelper.createInstance(); + testHelper.startInstance(); + } + + @AfterAll + public static void teardownEnv() { + testHelper.stopInstance(); + } + + @Test + @DisplayName("executed many DML/DDL string-operations from several threads simultaneously") + void testFetchSpaces() { + testHelper.executeLua( + makeCreateSpaceFunction(), + makeDropSpaceFunction() + ); + + TarantoolClientConfig config = makeDefaultClientConfig(); + config.operationExpiryTimeMillis = 2000; + TarantoolClient client = makeTestClient(config, 500); + + int threadsNumber = 16; + int iterations = 100; + final CountDownLatch latch = new CountDownLatch(threadsNumber); + ExecutorService executor = Executors.newFixedThreadPool( + threadsNumber, + new TarantoolThreadDaemonFactory("testWorkers") + ); + + // multiple threads can cause schema invalidation simultaneously + // but it hasn't to affect other threads + for (int i = 0; i < threadsNumber; i++) { + int threadNumber = i; + executor.submit(() -> { + String spaceName = "my_space" + threadNumber; + for (int k = 0; k < iterations; k++) { + TarantoolClientOps, Object, List> ops = client.syncOps(); + ops.call("makeSpace", spaceName); + ops.insert(spaceName, Arrays.asList(k, threadNumber)); + ops.call("dropSpace", spaceName); + } + latch.countDown(); + }); + } + + try { + assertTrue(latch.await(20, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail(e); + } finally { + executor.shutdownNow(); + client.close(); + } + } + + private String makeCreateSpaceFunction() { + return "function makeSpace(spaceName) " + + "box.schema.space.create(spaceName, { format = " + + "{ {name = 'id', type = 'integer'}, " + + " {name = 'counts', type = 'integer'} } " + + "}); " + + "box.space[spaceName]:create_index('pk', { type = 'TREE', parts = {'id'} } ) " + + "end"; + } + + private String makeDropSpaceFunction() { + return "function dropSpace(spaceName) " + + "box.space[spaceName]:drop() " + + "end"; + } + +}