Skip to content

Introduce the ability of awaiting for eager beginning a Transaction #870

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import Result, { QueryResult, ResultObserver } from './result'
import ConnectionProvider from './connection-provider'
import Connection from './connection'
import Transaction from './transaction'
import TransactionPromise from './transaction-promise'
import Session, { TransactionConfig } from './session'
import Driver, * as driver from './driver'
import auth from './auth'
Expand Down Expand Up @@ -134,6 +135,7 @@ const forExport = {
Stats,
Result,
Transaction,
TransactionPromise,
Session,
Driver,
Connection,
Expand Down Expand Up @@ -191,6 +193,7 @@ export {
ConnectionProvider,
Connection,
Transaction,
TransactionPromise,
Session,
Driver,
types,
Expand Down
9 changes: 5 additions & 4 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import ConnectionProvider from './connection-provider'
import { Query, SessionMode } from './types'
import Connection from './connection'
import { NumberOrInteger } from './graph-types'
import TransactionPromise from './transaction-promise'

type ConnectionConsumer = (connection: Connection | void) => any | undefined
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
Expand Down Expand Up @@ -239,9 +240,9 @@ class Session {
* While a transaction is open the session cannot be used to run queries outside the transaction.
*
* @param {TransactionConfig} [transactionConfig] - Configuration for the new auto-commit transaction.
* @returns {Transaction} New Transaction.
* @returns {TransactionPromise} New Transaction.
*/
beginTransaction(transactionConfig?: TransactionConfig): Transaction {
beginTransaction(transactionConfig?: TransactionConfig): TransactionPromise {
// this function needs to support bookmarks parameter for backwards compatibility
// parameter was of type {string|string[]} and represented either a single or multiple bookmarks
// that's why we need to check parameter type and decide how to interpret the value
Expand All @@ -255,7 +256,7 @@ class Session {
return this._beginTransaction(this._mode, txConfig)
}

_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): Transaction {
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): TransactionPromise {
if (!this._open) {
throw newError('Cannot begin a transaction on a closed session.')
}
Expand All @@ -271,7 +272,7 @@ class Session {
connectionHolder.initializeConnection()
this._hasTx = true

const tx = new Transaction({
const tx = new TransactionPromise({
connectionHolder,
impersonatedUser: this._impersonatedUser,
onClose: this._transactionClosed.bind(this),
Expand Down
193 changes: 193 additions & 0 deletions packages/core/src/transaction-promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Transaction from "./transaction"
import {
ConnectionHolder
} from './internal/connection-holder'

import { Bookmarks } from './internal/bookmarks'
import { TxConfig } from "./internal/tx-config";

/**
* Represents a {@link Promise<Transaction>} object and a {@link Transaction} object.
*
* Resolving this object promise verifies the result of the transaction begin and returns the {@link Transaction} object in case of success.
*
* The object can still also used as {@link Transaction} for convenience. The result of begin will be checked
* during the next API calls in the object as it is in the transaction.
*
* @access public
*/
class TransactionPromise extends Transaction implements Promise<Transaction>{
[Symbol.toStringTag]: string = "TransactionPromise"
private _beginError?: Error;
private _beginMetadata?: any;
private _beginPromise?: Promise<Transaction>;
private _reject?: (error: Error) => void;
private _resolve?: (value?: Transaction | PromiseLike<Transaction> | undefined) => void;

/**
* @constructor
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
* @param {function(bookmarks: Bookmarks)} onBookmarks callback invoked when new bookmark is produced.
* @param {function()} onConnection - Function to be called when a connection is obtained to ensure the connection
* is not yet released.
* @param {boolean} reactive whether this transaction generates reactive streams
* @param {number} fetchSize - the record fetch size in each pulling batch.
* @param {string} impersonatedUser - The name of the user which should be impersonated for the duration of the session.
*/
constructor({
connectionHolder,
onClose,
onBookmarks,
onConnection,
reactive,
fetchSize,
impersonatedUser,
highRecordWatermark,
lowRecordWatermark
}: {
connectionHolder: ConnectionHolder
onClose: () => void
onBookmarks: (bookmarks: Bookmarks) => void
onConnection: () => void
reactive: boolean
fetchSize: number
impersonatedUser?: string,
highRecordWatermark: number,
lowRecordWatermark: number
}) {
super({
connectionHolder,
onClose,
onBookmarks,
onConnection,
reactive,
fetchSize,
impersonatedUser,
highRecordWatermark,
lowRecordWatermark
})
}

/**
* Waits for the begin to complete.
*
* @param {function(transaction: Transaction)} onFulfilled - function to be called when finished.
* @param {function(error: {message:string, code:string})} onRejected - function to be called upon errors.
* @return {Promise} promise.
*/
then<TResult1 = Transaction, TResult2 = never>(
onfulfilled?:
((value: Transaction) => TResult1 | PromiseLike<TResult1>)
| null,
onrejected?:
((reason: any) => TResult2 | PromiseLike<TResult2>)
| null
): Promise<TResult1 | TResult2> {
return this._getOrCreateBeginPromise().then(onfulfilled, onrejected);
}

/**
* Catch errors when using promises.
*
* @param {function(error: Neo4jError)} onRejected - Function to be called upon errors.
* @return {Promise} promise.
*/
catch<TResult = never>(onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null): Promise<any> {
return this._getOrCreateBeginPromise().catch(onrejected);
}

/**
* Called when finally the begin is done
*
* @param {function()|null} onfinally - function when the promise finished
* @return {Promise} promise.
*/
finally(onfinally?: (() => void) | null): Promise<Transaction> {
return this._getOrCreateBeginPromise().finally(onfinally);
}

private _getOrCreateBeginPromise(): Promise<Transaction> {
if (!this._beginPromise) {
this._beginPromise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
if (this._beginError) {
reject(this._beginError);
}
if (this._beginMetadata) {
resolve(this._toTransaction());
}
});
}
return this._beginPromise;
}

/**
* @access private
*/
private _toTransaction(): Transaction {
//@ts-ignore
return {
...this,
run: super.run.bind(this),
commit: super.commit.bind(this),
rollback: super.rollback.bind(this),
close: super.close.bind(this),
isOpen: super.isOpen.bind(this),
_begin: this._begin.bind(this),
}
}

/**
* @access private
*/
_begin(bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void {
return super._begin(bookmarks, txConfig, {
onError: this._onBeginError.bind(this),
onComplete: this._onBeginMetadata.bind(this)
});
}

/**
* @access private
*/
private _onBeginError(error: Error): void {
this._beginError = error;
if (this._reject) {
this._reject(error);
}
}

/**
* @access private
*/
private _onBeginMetadata(metadata: any): void {
this._beginMetadata = metadata || {};
if (this._resolve) {
this._resolve(this._toTransaction());
}
}

}

export default TransactionPromise
30 changes: 25 additions & 5 deletions packages/core/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ class Transaction {
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
* @param {function(bookmarks: Bookmarks)} onBookmarks callback invoked when new bookmark is produced.
* * @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton
* @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton
* is not yet released.
* @param {boolean} reactive whether this transaction generates reactive streams
* @param {number} fetchSize - the record fetch size in each pulling batch.
* @param {string} impersonatedUser - The name of the user which should be impersonated for the duration of the session.
* @param {number} highRecordWatermark - The high watermark for the record buffer.
* @param {number} lowRecordWatermark - The low watermark for the record buffer.
*/
constructor({
connectionHolder,
Expand Down Expand Up @@ -109,7 +111,10 @@ class Transaction {
* @param {TxConfig} txConfig
* @returns {void}
*/
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig): void {
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig, events?: {
onError: (error: Error) => void
onComplete: (metadata: any) => void
}): void {
this._connectionHolder
.getConnection()
.then(connection => {
Expand All @@ -121,14 +126,29 @@ class Transaction {
mode: this._connectionHolder.mode(),
database: this._connectionHolder.database(),
impersonatedUser: this._impersonatedUser,
beforeError: this._onError,
afterComplete: this._onComplete
beforeError: (error: Error) => {
if (events) {
events.onError(error)
}
return this._onError(error).catch(() => {})
},
afterComplete: (metadata: any) => {
if (events) {
events.onComplete(metadata)
}
return this._onComplete(metadata)
}
})
} else {
throw newError('No connection available')
}
})
.catch(error => this._onError(error))
.catch(error => {
if (events) {
events.onError(error)
}
this._onError(error).catch(() => {})
})
}

/**
Expand Down
31 changes: 30 additions & 1 deletion packages/core/test/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConnectionProvider, Session, Connection } from '../src'
import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src'
import { bookmarks } from '../src/internal'
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
import FakeConnection from './utils/connection.fake'
Expand Down Expand Up @@ -227,6 +227,35 @@ describe('session', () => {
expect(session.lastBookmarks()).toEqual(bookmarks.values())
})
})

describe('.beginTransaction()', () => {
it('should return a TransactionPromise', () => {
const session = newSessionWithConnection(newFakeConnection(), false, 1000)

const tx: Transaction = session.beginTransaction()

expect(tx).toBeInstanceOf(TransactionPromise)
})

it('should resolves a Transaction', async () => {
const connection = newFakeConnection()
const protocol = connection.protocol()
// @ts-ignore
connection.protocol = () => {
return {
...protocol,
beginTransaction: (params: { afterComplete: () => {} }) => {
params.afterComplete()
}
}
}
const session = newSessionWithConnection(connection, false, 1000)

const tx: Transaction = await session.beginTransaction()

expect(tx).toBeDefined()
})
})
})

function newSessionWithConnection(
Expand Down
Loading