diff --git a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java index db9e97433..edf6e2859 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java @@ -67,6 +67,7 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java"; private final ManagedChannel channel; + private final boolean shutdownChannel; private final AtomicBoolean shutdownRequested = new AtomicBoolean(); private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub; private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub; @@ -80,12 +81,17 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub; GrpcServiceStubs(ClientOptions options) { - this.channel = - ManagedChannelBuilder.forAddress(options.getHost(), options.getPort()) - .defaultLoadBalancingPolicy("round_robin") - .usePlaintext() - .build(); - + if (options.getGRPCChannel() != null) { + this.channel = options.getGRPCChannel(); + shutdownChannel = false; + } else { + this.channel = + ManagedChannelBuilder.forAddress(options.getHost(), options.getPort()) + .defaultLoadBalancingPolicy("round_robin") + .usePlaintext() + .build(); + shutdownChannel = true; + } ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options); ClientInterceptor tracingInterceptor = newTracingInterceptor(); Metadata headers = new Metadata(); @@ -201,28 +207,41 @@ public WorkflowAPIFutureStub workflowFutureStub() { @Override public void shutdown() { shutdownRequested.set(true); - channel.shutdown(); + if (shutdownChannel) { + channel.shutdown(); + } } @Override public void shutdownNow() { shutdownRequested.set(true); - channel.shutdownNow(); + if (shutdownChannel) { + channel.shutdownNow(); + } } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return channel.awaitTermination(timeout, unit); + if (shutdownChannel) { + return channel.awaitTermination(timeout, unit); + } + return true; } @Override public boolean isShutdown() { - return channel.isShutdown(); + if (shutdownChannel) { + return channel.isShutdown(); + } + return shutdownRequested.get(); } @Override public boolean isTerminated() { - return channel.isTerminated(); + if (shutdownChannel) { + return channel.isTerminated(); + } + return shutdownRequested.get(); } private static class GrpcDeadlineInterceptor implements ClientInterceptor { diff --git a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java index 5d06277d5..a65179a53 100644 --- a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java +++ b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java @@ -23,79 +23,94 @@ import com.uber.cadence.internal.metrics.NoopScope; import com.uber.cadence.serviceclient.auth.IAuthorizationProvider; import com.uber.m3.tally.Scope; +import io.grpc.ManagedChannel; import java.util.Map; public class ClientOptions { + private static final int DEFAULT_LOCAL_CADENCE_SERVER_PORT = 7933; private static final String LOCALHOST = "127.0.0.1"; - /** Default RPC timeout used for all non long poll calls. */ + /** + * Default RPC timeout used for all non long poll calls. + */ private static final long DEFAULT_RPC_TIMEOUT_MILLIS = 3 * 1000; - /** Default RPC timeout used for all long poll calls. */ + /** + * Default RPC timeout used for all long poll calls. + */ private static final long DEFAULT_POLL_RPC_TIMEOUT_MILLIS = 30 * 1000; - /** Default RPC timeout for QueryWorkflow */ + /** + * Default RPC timeout for QueryWorkflow + */ private static final long DEFAULT_QUERY_RPC_TIMEOUT_MILLIS = 10 * 1000; - /** Default RPC timeout for ListArchivedWorkflow */ + /** + * Default RPC timeout for ListArchivedWorkflow + */ private static final long DEFAULT_LIST_ARCHIVED_WORKFLOW_TIMEOUT_MILLIS = 180 * 1000; private static final String DEFAULT_CLIENT_APP_NAME = "cadence-client"; - /** Name of the Cadence service front end as required by TChannel. */ + /** + * Name of the Cadence service front end as required by TChannel. + */ private static final String DEFAULT_SERVICE_NAME = "cadence-frontend"; + private static final ClientOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = new Builder().build(); + } private final String host; private final int port; - - /** The tChannel timeout in milliseconds */ + private final ManagedChannel gRPCChannel; + /** + * The tChannel timeout in milliseconds + */ private final long rpcTimeoutMillis; - - /** The tChannel timeout for long poll calls in milliseconds */ + /** + * The tChannel timeout for long poll calls in milliseconds + */ private final long rpcLongPollTimeoutMillis; - - /** The tChannel timeout for query workflow call in milliseconds */ + /** + * The tChannel timeout for query workflow call in milliseconds + */ private final long rpcQueryTimeoutMillis; - - /** The tChannel timeout for list archived workflow call in milliseconds */ + /** + * The tChannel timeout for list archived workflow call in milliseconds + */ private final long rpcListArchivedWorkflowTimeoutMillis; - - /** TChannel service name that the Cadence service was started with. */ + /** + * TChannel service name that the Cadence service was started with. + */ private final String serviceName; - - /** Name of the service using the cadence-client. */ + /** + * Name of the service using the cadence-client. + */ private final String clientAppName; - - /** Client for metrics reporting. */ + /** + * Client for metrics reporting. + */ private final Scope metricsScope; - - /** Optional TChannel transport headers */ + /** + * Optional TChannel transport headers + */ private final Map transportHeaders; - - /** Optional TChannel headers */ + /** + * Optional TChannel headers + */ private final Map headers; - - /** Optional authorization provider */ + /** + * Optional authorization provider + */ private final IAuthorizationProvider authProvider; - - private static final ClientOptions DEFAULT_INSTANCE; - - /** Optional Feature flags to turn on/off some Cadence features */ + /** + * Optional Feature flags to turn on/off some Cadence features + */ private final FeatureFlags featureFlags; - static { - DEFAULT_INSTANCE = new Builder().build(); - } - - public static ClientOptions defaultInstance() { - return DEFAULT_INSTANCE; - } - - public static Builder newBuilder() { - return new Builder(); - } - private ClientOptions(Builder builder) { if (Strings.isNullOrEmpty(builder.host)) { host = @@ -105,8 +120,8 @@ private ClientOptions(Builder builder) { } else { host = builder.host; } - this.port = builder.port; + this.gRPCChannel = builder.gRPCChannel; this.rpcTimeoutMillis = builder.rpcTimeoutMillis; if (builder.clientAppName == null) { this.clientAppName = DEFAULT_CLIENT_APP_NAME; @@ -141,6 +156,14 @@ private ClientOptions(Builder builder) { this.authProvider = builder.authProvider; } + public static ClientOptions defaultInstance() { + return DEFAULT_INSTANCE; + } + + public static Builder newBuilder() { + return new Builder(); + } + public String getHost() { return host; } @@ -149,27 +172,41 @@ public int getPort() { return port; } - /** @return Returns the rpc timeout value in millis. */ + public ManagedChannel getGRPCChannel() { + return gRPCChannel; + } + + /** + * @return Returns the rpc timeout value in millis. + */ public long getRpcTimeoutMillis() { return rpcTimeoutMillis; } - /** @return Returns the rpc timout for long poll requests in millis. */ + /** + * @return Returns the rpc timout for long poll requests in millis. + */ public long getRpcLongPollTimeoutMillis() { return rpcLongPollTimeoutMillis; } - /** @return Returns the rpc timout for query workflow requests in millis. */ + /** + * @return Returns the rpc timout for query workflow requests in millis. + */ public long getRpcQueryTimeoutMillis() { return rpcQueryTimeoutMillis; } - /** @return Returns the rpc timout for list archived workflow requests in millis. */ + /** + * @return Returns the rpc timout for list archived workflow requests in millis. + */ public long getRpcListArchivedWorkflowTimeoutMillis() { return rpcListArchivedWorkflowTimeoutMillis; } - /** Returns the client application name. */ + /** + * Returns the client application name. + */ public String getClientAppName() { return this.clientAppName; } @@ -204,8 +241,10 @@ public FeatureFlags getFeatureFlags() { * @author venkat */ public static class Builder { + private String host; private int port = DEFAULT_LOCAL_CADENCE_SERVER_PORT; + private ManagedChannel gRPCChannel; private String clientAppName = DEFAULT_CLIENT_APP_NAME; private long rpcTimeoutMillis = DEFAULT_RPC_TIMEOUT_MILLIS; private long rpcLongPollTimeoutMillis = DEFAULT_POLL_RPC_TIMEOUT_MILLIS; @@ -219,7 +258,8 @@ public static class Builder { private IAuthorizationProvider authProvider; private FeatureFlags featureFlags; - private Builder() {} + private Builder() { + } public Builder setHost(String host) { this.host = host; @@ -236,6 +276,14 @@ public Builder setPort(int port) { return this; } + /** + * Sets gRPC channel to use. Exclusive with host and port. + */ + public Builder setGRPCChannel(ManagedChannel gRPCChannel) { + this.gRPCChannel = gRPCChannel; + return this; + } + /** * Sets the rpc timeout value for non query and non long poll calls. Default is 1000. * @@ -278,6 +326,13 @@ public Builder setListArchivedWorkflowRpcTimeout(long timeoutMillis) { return this; } + /** + * Returns the feature flags defined in ClientOptions + */ + public FeatureFlags getFeatureFlags() { + return this.featureFlags; + } + /** * Sets the feature flags to turn on/off some Cadence features By default, all features under * FeatureFlags are turned off. @@ -289,11 +344,6 @@ public Builder setFeatureFlags(FeatureFlags featureFlags) { return this; } - /** Returns the feature flags defined in ClientOptions */ - public FeatureFlags getFeatureFlags() { - return this.featureFlags; - } - /** * Sets the client application name. *