Skip to content

Decouple session from connection #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 24, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,39 @@
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

import static java.lang.String.format;

public class DirectDriver extends BaseDriver
/**
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
* the given address.
*/
public class DirectConnectionProvider implements ConnectionProvider
{
private final BoltServerAddress address;
protected final ConnectionPool connections;
private final ConnectionPool pool;

public DirectDriver(
BoltServerAddress address,
ConnectionPool connections,
SecurityPlan securityPlan,
SessionFactory sessionFactory,
Logging logging )
DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool )
{
super( securityPlan, sessionFactory, logging );
this.address = address;
this.connections = connections;
this.pool = pool;
}

@Override
protected Session newSessionWithMode( AccessMode mode )
public PooledConnection acquireConnection( AccessMode mode )
{
return sessionFactory.newInstance( connections.acquire( address ) );
return pool.acquire( address );
}

@Override
protected void closeResources()
public void close() throws Exception
{
try
{
connections.close();
}
catch ( Exception ex )
{
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
}
pool.close();
}

BoltServerAddress server()
public BoltServerAddress getAddress()
{
return address;
}
Expand Down
74 changes: 50 additions & 24 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.cluster.LoadBalancer;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.Connector;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.AuthToken;
Expand All @@ -50,13 +52,10 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
BoltServerAddress address = BoltServerAddress.from( uri );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
SessionFactory sessionFactory = createSessionFactory( config );

try
{
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
sessionFactory
);
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan );
}
catch ( Throwable driverError )
{
Expand All @@ -74,42 +73,68 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}

private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
SessionFactory sessionFactory )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
{
switch ( scheme.toLowerCase() )
{
case "bolt":
return createDirectDriver( address, connectionPool, config, securityPlan, sessionFactory );
return createDirectDriver( address, connectionPool, config, securityPlan );
case "bolt+routing":
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan,
sessionFactory );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
}

/**
* Creates new {@link DirectDriver}.
* Creates a new driver for "bolt" scheme.
* <p>
* <b>This method is protected only for testing</b>
*/
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan )
{
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
return createDriver( config, securityPlan, sessionFactory );
}

/**
* Creates new {@link RoutingDriver}.
* Creates new a new driver for "bolt+routing" scheme.
* <p>
* <b>This method is protected only for testing</b>
*/
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
{
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
createClock(), config.logging() );
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
return createDriver( config, securityPlan, sessionFactory );
}

/**
* Creates new {@link Driver}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
{
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
}

/**
* Creates new {@link LoadBalancer} for the routing driver.
* <p>
* <b>This method is protected only for testing</b>
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
{
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
}

/**
Expand Down Expand Up @@ -150,13 +175,14 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu
return new SocketConnector( connectionSettings, securityPlan, logging );
}

private static SessionFactory createSessionFactory( Config config )
/**
* Creates new {@link SessionFactory}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config )
{
if ( config.logLeakedSessions() )
{
return new LeakLoggingNetworkSessionFactory( config.logging() );
}
return new NetworkSessionFactory();
return new SessionFactoryImpl( connectionProvider, config, config.logging() );
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ private enum State
ROLLED_BACK
}

private final Runnable cleanup;
private final SessionResourcesHandler resourcesHandler;
private final Connection conn;

private String bookmark = null;
private State state = State.ACTIVE;

public ExplicitTransaction( Connection conn, Runnable cleanup )
public ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler )
{
this( conn, cleanup, null );
this( conn, resourcesHandler, null );
}

ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark )
ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler, String bookmark )
{
this.conn = conn;
this.cleanup = cleanup;
this.resourcesHandler = resourcesHandler;
runBeginStatement( conn, bookmark );
}

Expand Down Expand Up @@ -139,7 +139,7 @@ else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
}
finally
{
cleanup.run();
resourcesHandler.onTransactionClosed( this );
}
}

Expand Down Expand Up @@ -185,13 +185,14 @@ public synchronized StatementResult run( Statement statement )

try
{
InternalStatementResult cursor = new InternalStatementResult( conn, this, statement );
InternalStatementResult result =
new InternalStatementResult( conn, SessionResourcesHandler.NO_OP, this, statement );
conn.run( statement.text(),
statement.parameters().asMap( ofValue() ),
cursor.runResponseCollector() );
conn.pullAll( cursor.pullAllResponseCollector() );
result.runResponseCollector() );
conn.pullAll( result.pullAllResponseCollector() );
conn.flush();
return cursor;
return result;
}
catch ( Neo4jException e )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

abstract class BaseDriver implements Driver
import static java.lang.String.format;

public class InternalDriver implements Driver
{
private final static String DRIVER_LOG_NAME = "Driver";

private final SecurityPlan securityPlan;
protected final SessionFactory sessionFactory;
protected final Logger log;
private final SessionFactory sessionFactory;
private final Logger log;

private AtomicBoolean closed = new AtomicBoolean( false );

BaseDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
{
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
Expand All @@ -61,8 +63,8 @@ public final Session session()
public final Session session( AccessMode mode )
{
assertOpen();
Session session = newSessionWithMode( mode );
if( closed.get() )
Session session = sessionFactory.newInstance( mode );
if ( closed.get() )
{
// the driver is already closed and we either 1. obtain this session from the old session pool
// or 2. we obtain this session from a new session pool
Expand All @@ -77,15 +79,35 @@ public final Session session( AccessMode mode )
@Override
public final void close()
{
if ( closed.compareAndSet(false, true) )
if ( closed.compareAndSet( false, true ) )
{
closeResources();
}
}

protected abstract Session newSessionWithMode( AccessMode mode );
/**
* Get the underlying session factory.
* <p>
* <b>This method is only for testing</b>
*
* @return the session factory used by this driver.
*/
public final SessionFactory getSessionFactory()
{
return sessionFactory;
}

protected abstract void closeResources();
private void closeResources()
{
try
{
sessionFactory.close();
}
catch ( Exception ex )
{
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
}
}

private void assertOpen()
{
Expand All @@ -95,7 +117,7 @@ private void assertOpen()
}
}

private IllegalStateException driverCloseException()
private static RuntimeException driverCloseException()
{
return new IllegalStateException( "This driver instance has already been closed" );
}
Expand Down
Loading