diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 50086e95e2..12da45a8a0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -35,6 +35,7 @@ import java.util.Comparator; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -164,7 +165,7 @@ public class ConnectionWorker implements AutoCloseable { * Contains the updated TableSchema. */ @GuardedBy("lock") - private TableSchemaAndTimestamp updatedSchema; + private TableSchema updatedSchema; /* * A client used to interact with BigQuery. @@ -196,6 +197,12 @@ public class ConnectionWorker implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /** + * When using connection pooling, this is the set of StreamWriters that are currently using this + * connection. This object is only accessed by ConnectionWorkerPool when the pool's lock is held. + */ + private final Set streamWriters; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -241,6 +248,7 @@ public void run() { } }); this.appendThread.start(); + this.streamWriters = new HashSet<>(); } private void resetConnection() { @@ -392,6 +400,10 @@ public void close() { } } + Set getCurrentStreamWriters() { + return streamWriters; + } + /* * This loop is executed in a separate thread. * @@ -616,8 +628,7 @@ private void requestCallback(AppendRowsResponse response) { AppendRequestAndResponse requestWrapper; this.lock.lock(); if (response.hasUpdatedSchema()) { - this.updatedSchema = - TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema()); + this.updatedSchema = response.getUpdatedSchema(); } try { // Had a successful connection with at least one result, reset retries. @@ -742,7 +753,7 @@ private AppendRequestAndResponse pollInflightRequestQueue() { } /** Thread-safe getter of updated TableSchema */ - synchronized TableSchemaAndTimestamp getUpdatedSchema() { + synchronized TableSchema getUpdatedSchema() { return this.updatedSchema; } @@ -840,17 +851,4 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { overwhelmedInflightCount = newThreshold; } } - - @AutoValue - abstract static class TableSchemaAndTimestamp { - // Shows the timestamp updated schema is reported from response - abstract long updateTimeStamp(); - - // The updated schema returned from server. - abstract TableSchema updatedSchema(); - - static TableSchemaAndTimestamp create(long updateTimeStamp, TableSchema updatedSchema) { - return new AutoValue_ConnectionWorker_TableSchemaAndTimestamp(updateTimeStamp, updatedSchema); - } - } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 121b1d0e28..e86f05f690 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -20,7 +20,6 @@ import com.google.api.gax.batching.FlowController; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load; -import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -29,6 +28,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,14 +67,6 @@ public class ConnectionWorkerPool { */ private final FlowController.LimitExceededBehavior limitExceededBehavior; - /** Map from write stream to corresponding connection. */ - private final Map streamWriterToConnection = - new ConcurrentHashMap<>(); - - /** Map from a connection to a set of write stream that have sent requests onto it. */ - private final Map> connectionToWriteStream = - new ConcurrentHashMap<>(); - /** Collection of all the created connections. */ private final Set connectionWorkerPool = Collections.synchronizedSet(new HashSet<>()); @@ -82,7 +74,8 @@ public class ConnectionWorkerPool { /* * Contains the mapping from stream name to updated schema. */ - private Map tableNameToUpdatedSchema = new ConcurrentHashMap<>(); + private Map> streamNameToStreamWriter = new ConcurrentHashMap(); + private Map streamWriterToUpdatedSchema = new ConcurrentHashMap<>(); /** Enable test related logic. */ private static boolean enableTesting = false; @@ -225,6 +218,12 @@ public static void setOptions(Settings settings) { ConnectionWorkerPool.settings = settings; } + public void registerStreamWriter(StreamWriter streamWriter) { + streamNameToStreamWriter + .computeIfAbsent(streamWriter.getStreamName(), k -> new HashSet<>()) + .add(streamWriter); + } + /** Distributes the writing of a message to an underlying connection. */ public ApiFuture append(StreamWriter streamWriter, ProtoRows rows) { return append(streamWriter, rows, -1); @@ -234,47 +233,52 @@ public ApiFuture append(StreamWriter streamWriter, ProtoRows public ApiFuture append( StreamWriter streamWriter, ProtoRows rows, long offset) { // We are in multiplexing mode after entering the following logic. - ConnectionWorker connectionWorker = - streamWriterToConnection.compute( - streamWriter, - (key, existingStream) -> { - // Though compute on concurrent map is atomic, we still do explicit locking as we - // may have concurrent close(...) triggered. - lock.lock(); - try { - // Stick to the existing stream if it's not overwhelmed. - if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) { - return existingStream; - } - // Try to create or find another existing stream to reuse. - ConnectionWorker createdOrExistingConnection = null; - try { - createdOrExistingConnection = - createOrReuseConnectionWorker(streamWriter, existingStream); - } catch (IOException e) { - throw new IllegalStateException(e); - } - // Update connection to write stream relationship. - connectionToWriteStream.computeIfAbsent( - createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>()); - connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter); - return createdOrExistingConnection; - } finally { - lock.unlock(); - } - }); + ConnectionWorker currentConnection; + // TODO: Do we need a global lock here? Or is it enough to just lock the StreamWriter? + lock.lock(); + try { + currentConnection = streamWriter.getCurrentConnectionPoolConnection(); + // TODO: Experiment with only checking isOverwhelmed less often per StreamWriter (once per + // second?) instead of on every append call. + if (currentConnection == null || currentConnection.getLoad().isOverwhelmed()) { + // Try to create or find another existing stream to reuse. + ConnectionWorker createdOrExistingConnection = null; + try { + createdOrExistingConnection = + createOrReuseConnectionWorker(streamWriter, currentConnection); + } catch (IOException e) { + throw new IllegalStateException(e); + } + currentConnection = createdOrExistingConnection; + streamWriter.setCurrentConnectionPoolConnection(currentConnection); + // Update connection to write stream relationship. + // TODO: What if we simply kept an atomic refcount in ConnectionWorker? We could also + // manage the refcount in the callback below to precisely track which connections are being + // used. + currentConnection.getCurrentStreamWriters().add(streamWriter); + } + } finally { + lock.unlock(); + } + Stopwatch stopwatch = Stopwatch.createStarted(); ApiFuture responseFuture = - connectionWorker.append( + currentConnection.append( streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset); return ApiFutures.transform( responseFuture, // Add callback for update schema (response) -> { if (response.getWriteStream() != "" && response.hasUpdatedSchema()) { - tableNameToUpdatedSchema.put( - response.getWriteStream(), - TableSchemaAndTimestamp.create(System.nanoTime(), response.getUpdatedSchema())); + Set streamWritersToUpdate = + streamNameToStreamWriter.get(response.getWriteStream()); + if (streamWritersToUpdate != null) { + for (StreamWriter updateStream : streamWritersToUpdate) { + // Alternatively, just call a setter on each of these StreamWriters to tell it about + // the new schema. That would eliminate another static map. + streamWriterToUpdatedSchema.put(updateStream, response.getUpdatedSchema()); + } + } } return response; }, @@ -385,26 +389,30 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w public void close(StreamWriter streamWriter) { lock.lock(); try { - streamWriterToConnection.remove(streamWriter); + Set streamWriters = streamNameToStreamWriter.get(streamWriter.getStreamName()); + if (streamWriters != null) { + streamWriters.remove(streamWriter); + } + + streamWriter.setCurrentConnectionPoolConnection(null); // Since it's possible some other connections may have served this writeStream, we // iterate and see whether it's also fine to close other connections. - Set connectionToRemove = new HashSet<>(); - for (ConnectionWorker connectionWorker : connectionToWriteStream.keySet()) { - if (connectionToWriteStream.containsKey(connectionWorker)) { - connectionToWriteStream.get(connectionWorker).remove(streamWriter); - if (connectionToWriteStream.get(connectionWorker).isEmpty()) { - connectionWorker.close(); - connectionWorkerPool.remove(connectionWorker); - connectionToRemove.add(connectionWorker); - } + int numClosed = 0; + for (Iterator it = connectionWorkerPool.iterator(); it.hasNext(); ) { + ConnectionWorker connectionWorker = it.next(); + connectionWorker.getCurrentStreamWriters().remove(streamWriter); + if (connectionWorker.getCurrentStreamWriters().isEmpty()) { + connectionWorker.close(); + it.remove(); + ++numClosed; } } + log.info( String.format( "During closing of writeStream for %s with writer id %s, we decided to close %s " + "connections", - streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size())); - connectionToWriteStream.keySet().removeAll(connectionToRemove); + streamWriter.getStreamName(), streamWriter.getWriterId(), numClosed)); } finally { lock.unlock(); } @@ -414,7 +422,7 @@ public void close(StreamWriter streamWriter) { public long getInflightWaitSeconds(StreamWriter streamWriter) { lock.lock(); try { - ConnectionWorker connectionWorker = streamWriterToConnection.get(streamWriter); + ConnectionWorker connectionWorker = streamWriter.getCurrentConnectionPoolConnection(); if (connectionWorker == null) { return 0; } else { @@ -425,8 +433,8 @@ public long getInflightWaitSeconds(StreamWriter streamWriter) { } } - TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { - return tableNameToUpdatedSchema.getOrDefault(streamWriter.getStreamName(), null); + TableSchema getUpdatedSchema(StreamWriter streamWriter) { + return streamWriterToUpdatedSchema.getOrDefault(streamWriter, null); } /** Enable Test related logic. */ diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ff7dad474d..5eb868228a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -22,7 +22,6 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; -import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Status; @@ -112,6 +111,21 @@ public static ConnectionPoolKey create(String location) { } } + private ConnectionWorker currentConnectionPoolConnection; + + /** + * If using a single connection, this returns null. Always accessed under the ConnectionWorkerPool + * lock. + */ + ConnectionWorker getCurrentConnectionPoolConnection() { + return currentConnectionPoolConnection; + } + + /** Always accessed under the ConnectionWorkerPool lock. */ + void setCurrentConnectionPoolConnection(ConnectionWorker currentConnectionPoolConnection) { + this.currentConnectionPoolConnection = currentConnectionPoolConnection; + } + /** * When in single table mode, append directly to connectionWorker. Otherwise append to connection * pool in multiplexing mode. @@ -155,12 +169,12 @@ long getInflightWaitSeconds(StreamWriter streamWriter) { return connectionWorker().getInflightWaitSeconds(); } - TableSchemaAndTimestamp getUpdatedSchema(StreamWriter streamWriter) { - if (getKind() == Kind.CONNECTION_WORKER_POOL) { + TableSchema getUpdatedSchema(StreamWriter streamWriter) { + if (getKind() == Kind.CONNECTION_WORKER) { + return connectionWorker().getUpdatedSchema(); + } else { return connectionWorkerPool().getUpdatedSchema(streamWriter); } - // Always populate MIN timestamp to w - return connectionWorker().getUpdatedSchema(); } String getWriterId(String streamWriterId) { @@ -170,6 +184,11 @@ String getWriterId(String streamWriterId) { return connectionWorker().getWriterId(); } + public void register(StreamWriter streamWriter) { + Preconditions.checkState(getKind() == Kind.CONNECTION_WORKER_POOL); + connectionWorkerPool().registerStreamWriter(streamWriter); + } + public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) { return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection); } @@ -259,6 +278,7 @@ private StreamWriter(Builder builder) throws IOException { client, ownsBigQueryWriteClient); })); + this.singleConnectionOrConnectionPool.register(this); validateFetchedConnectonPool(builder); // Shut down the passed in client. Internally we will create another client inside connection // pool for every new connection worker. @@ -429,14 +449,7 @@ public static StreamWriter.Builder newBuilder(String streamName) { * than the updated schema. */ public synchronized TableSchema getUpdatedSchema() { - TableSchemaAndTimestamp tableSchemaAndTimestamp = - singleConnectionOrConnectionPool.getUpdatedSchema(this); - if (tableSchemaAndTimestamp == null) { - return null; - } - return creationTimestamp < tableSchemaAndTimestamp.updateTimeStamp() - ? tableSchemaAndTimestamp.updatedSchema() - : null; + return singleConnectionOrConnectionPool.getUpdatedSchema(this); } long getCreationTimestamp() { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index f8822e231f..2767f2e592 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -310,10 +310,10 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing) .setWriteStream(TEST_STREAM_1) .build()); - assertEquals(writer.getUpdatedSchema(), null); + assertEquals(null, writer.getUpdatedSchema()); AppendRowsResponse response = writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0).get(); - assertEquals(writer.getUpdatedSchema(), UPDATED_TABLE_SCHEMA); + assertEquals(UPDATED_TABLE_SCHEMA, writer.getUpdatedSchema()); // Create another writer, although it's the same stream name but the time stamp is newer, thus // the old updated schema won't get returned. @@ -325,7 +325,7 @@ private void testUpdatedSchemaFetch(boolean enableMultiplexing) .setEnableConnectionPool(enableMultiplexing) .setLocation("us") .build(); - assertEquals(writer2.getUpdatedSchema(), null); + assertEquals(null, writer2.getUpdatedSchema()); } @Test diff --git a/pom.xml b/pom.xml index 8d08251cf3..aca813eb89 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ UTF-8 UTF-8 - 3.14.0 + 3.18.3 github google-cloud-bigquerystorage-parent