Skip to content

Deprecate Session.(read|write)Transaction in favor of execute(Read|Write) methods #911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import Result, { QueryResult, ResultObserver } from './result'
import ConnectionProvider from './connection-provider'
import Connection from './connection'
import Transaction from './transaction'
import ManagedTransaction from './transaction-managed'
import TransactionPromise from './transaction-promise'
import Session, { TransactionConfig } from './session'
import Driver, * as driver from './driver'
Expand Down Expand Up @@ -137,6 +138,7 @@ const forExport = {
Stats,
Result,
Transaction,
ManagedTransaction,
TransactionPromise,
Session,
Driver,
Expand Down Expand Up @@ -196,6 +198,7 @@ export {
ConnectionProvider,
Connection,
Transaction,
ManagedTransaction,
TransactionPromise,
Session,
Driver,
Expand Down
46 changes: 29 additions & 17 deletions packages/core/src/internal/transaction-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2

type TransactionCreator = () => TransactionPromise
type TransactionWork<T> = (tx: Transaction) => T | Promise<T>
type TransactionWork<T, Tx = Transaction> = (tx: Tx) => T | Promise<T>
type Resolve<T> = (value: T | PromiseLike<T>) => void
type Reject = (value: any) => void
type Timeout = ReturnType<typeof setTimeout>
Expand Down Expand Up @@ -68,16 +68,18 @@ export class TransactionExecutor {
this._verifyAfterConstruction()
}

execute<T>(
execute<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>
transactionWork: TransactionWork<T, Tx>,
transactionWrapper?: (tx: Transaction) => Tx
): Promise<T> {
return new Promise<T>((resolve, reject) => {
this._executeTransactionInsidePromise(
transactionCreator,
transactionWork,
resolve,
reject
reject,
transactionWrapper
)
}).catch(error => {
const retryStartTimeMs = Date.now()
Expand All @@ -87,7 +89,8 @@ export class TransactionExecutor {
transactionWork,
error,
retryStartTimeMs,
retryDelayMs
retryDelayMs,
transactionWrapper
)
})
}
Expand All @@ -98,12 +101,13 @@ export class TransactionExecutor {
this._inFlightTimeoutIds = []
}

_retryTransactionPromise<T>(
_retryTransactionPromise<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>,
transactionWork: TransactionWork<T, Tx>,
error: Error,
retryStartTime: number,
retryDelayMs: number
retryDelayMs: number,
transactionWrapper?: (tx: Transaction) => Tx
): Promise<T> {
const elapsedTimeMs = Date.now() - retryStartTime

Expand All @@ -122,7 +126,8 @@ export class TransactionExecutor {
transactionCreator,
transactionWork,
resolve,
reject
reject,
transactionWrapper
)
}, nextRetryTime)
// add newly created timeoutId to the list of all in-flight timeouts
Expand All @@ -134,16 +139,18 @@ export class TransactionExecutor {
transactionWork,
error,
retryStartTime,
nextRetryDelayMs
nextRetryDelayMs,
transactionWrapper
)
})
}

async _executeTransactionInsidePromise<T>(
async _executeTransactionInsidePromise<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>,
transactionWork: TransactionWork<T, Tx>,
resolve: Resolve<T>,
reject: Reject
reject: Reject,
transactionWrapper?: (tx: Transaction) => Tx,
): Promise<void> {
let tx: Transaction
try {
Expand All @@ -154,7 +161,12 @@ export class TransactionExecutor {
return
}

const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork)
// The conversion from `tx` as `unknown` then to `Tx` is necessary
// because it is not possible to be sure that `Tx` is a subtype of `Transaction`
// in using static type checking.
const wrap = transactionWrapper || ((tx: Transaction) => tx as unknown as Tx)
const wrappedTx = wrap(tx)
const resultPromise = this._safeExecuteTransactionWork(wrappedTx, transactionWork)

resultPromise
.then(result =>
Expand All @@ -163,9 +175,9 @@ export class TransactionExecutor {
.catch(error => this._handleTransactionWorkFailure(error, tx, reject))
}

_safeExecuteTransactionWork<T>(
tx: Transaction,
transactionWork: TransactionWork<T>
_safeExecuteTransactionWork<T, Tx = Transaction>(
tx: Tx,
transactionWork: TransactionWork<T, Tx>
): Promise<T> {
try {
const result = transactionWork(tx)
Expand Down
69 changes: 69 additions & 0 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import { Query, SessionMode } from './types'
import Connection from './connection'
import { NumberOrInteger } from './graph-types'
import TransactionPromise from './transaction-promise'
import ManagedTransaction from './transaction-managed'

type ConnectionConsumer = (connection: Connection | void) => any | undefined
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
type ManagedTransactionWork<T> = (tx: ManagedTransaction) => Promise<T> | T

interface TransactionConfig {
timeout?: NumberOrInteger
Expand Down Expand Up @@ -336,6 +338,8 @@ class Session {
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeRead} instead.
*
* @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
Expand All @@ -358,6 +362,8 @@ class Session {
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeWrite} instead.
*
* @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
Expand All @@ -383,6 +389,69 @@ class Session {
)
}

/**
* Execute given unit of work in a {@link READ} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
* @return {Promise} Resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
executeRead<T>(
transactionWork: ManagedTransactionWork<T>,
transactionConfig?: TransactionConfig
): Promise<T> {
const config = new TxConfig(transactionConfig)
return this._executeInTransaction(ACCESS_MODE_READ, config, transactionWork)
}

/**
* Execute given unit of work in a {@link WRITE} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
* @return {Promise} Resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
executeWrite<T>(
transactionWork: ManagedTransactionWork<T>,
transactionConfig?: TransactionConfig
): Promise<T> {
const config = new TxConfig(transactionConfig)
return this._executeInTransaction(ACCESS_MODE_WRITE, config, transactionWork)
}

/**
* @private
* @param {SessionMode} accessMode
* @param {TxConfig} transactionConfig
* @param {ManagedTransactionWork} transactionWork
* @returns {Promise}
*/
private _executeInTransaction<T>(
accessMode: SessionMode,
transactionConfig: TxConfig,
transactionWork: ManagedTransactionWork<T>
): Promise<T> {
return this._transactionExecutor.execute(
() => this._beginTransaction(accessMode, transactionConfig),
transactionWork,
ManagedTransaction.fromTransaction
)
}

/**
* Sets the resolved database name in the session context.
* @private
Expand Down
70 changes: 70 additions & 0 deletions packages/core/src/transaction-managed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Result from './result'
import Transaction from './transaction'
import { Query } from './types'

interface Run {
(query: Query, parameters?: any): Result
}

/**
* Represents a transaction that is managed by the transaction executor.
*
* @public
*/
class ManagedTransaction {
private _run: Run

/**
* @private
*/
private constructor({ run }: { run: Run }) {
/**
* @private
*/
this._run = run
}

/**
* @private
* @param {Transaction} tx - Transaction to wrap
* @returns {ManagedTransaction} the ManagedTransaction
*/
static fromTransaction(tx: Transaction): ManagedTransaction {
return new ManagedTransaction({
run: tx.run.bind(tx)
})
}

/**
* Run Cypher query
* Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}`
* or with the query and parameters as separate arguments.
* @param {mixed} query - Cypher query to execute
* @param {Object} parameters - Map with parameters to use in query
* @return {Result} New Result
*/
run(query: Query, parameters?: any): Result {
return this._run(query, parameters)
}
}

export default ManagedTransaction
2 changes: 2 additions & 0 deletions packages/core/src/transaction-promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction>{

/**
* @access private
* @returns {void}
*/
private _onBeginError(error: Error): void {
this._beginError = error;
Expand All @@ -180,6 +181,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction>{

/**
* @access private
* @returns {void}
*/
private _onBeginMetadata(metadata: any): void {
this._beginMetadata = metadata || {};
Expand Down
39 changes: 39 additions & 0 deletions packages/core/test/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src'
import { bookmarks } from '../src/internal'
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
import ManagedTransaction from '../src/transaction-managed'
import FakeConnection from './utils/connection.fake'

describe('session', () => {
Expand Down Expand Up @@ -279,6 +280,44 @@ describe('session', () => {
expect(tx).toBeDefined()
})
})

describe.each([
['.executeWrite()', (session: Session) => session.executeWrite.bind(session)],
['.executeRead()', (session: Session) => session.executeRead.bind(session)],
])('%s', (_, execute) => {
it('should call executor with ManagedTransaction', async () => {
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, 1000)
const status = { functionCalled: false }

await execute(session)(async (tx: ManagedTransaction) => {
expect(typeof tx).toEqual('object')
expect(tx).toBeInstanceOf(ManagedTransaction)

status.functionCalled = true
})

expect(status.functionCalled).toEqual(true)
})

it('should proxy run to the begun transaction', async () => {
const connection = mockBeginWithSuccess(newFakeConnection())
const session = newSessionWithConnection(connection, false, FETCH_ALL)
// @ts-ignore
const run = jest.spyOn(Transaction.prototype, 'run').mockImplementation(() => Promise.resolve())
const status = { functionCalled: false }
const query = 'RETURN $a'
const params = { a: 1 }

await execute(session)(async (tx: ManagedTransaction) => {
status.functionCalled = true
await tx.run(query, params)
})

expect(status.functionCalled).toEqual(true)
expect(run).toHaveBeenCalledWith(query, params)
})
})
})

function mockBeginWithSuccess(connection: FakeConnection) {
Expand Down
Loading