Skip to content

enhancement: add context management support to storages #1569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e5059da
arangodb: add support for context management
efectn Feb 9, 2025
58bbe46
clickhouse: add support for context management
efectn Feb 9, 2025
9d7368c
dynamodb: add support for context management
efectn Feb 9, 2025
a01135d
etcd: add support for context management
efectn Feb 9, 2025
134e774
minio: add support for context management
efectn Feb 9, 2025
c1e526d
mongodb: add support for context management
efectn Feb 9, 2025
17ab512
mssql: add support for context management
efectn Feb 9, 2025
3242f93
mysql: add support for context management
efectn Feb 9, 2025
a422c89
nats: add support for context management
efectn Feb 10, 2025
a5ea1ad
postgres: add support for context management
efectn Feb 10, 2025
245dce0
redis: add support for context management
efectn Feb 10, 2025
d6fb931
rueidis: add support for context management
efectn Feb 10, 2025
486941c
s3: add support for context management
efectn Feb 10, 2025
ce31f76
scylladb: add support for context management
efectn Feb 10, 2025
75f5216
sqlite3: add support for context management
efectn Feb 10, 2025
1b443d0
valkey: add support for context management
efectn Feb 11, 2025
2192ac6
azureblob: add support for context management
efectn Feb 11, 2025
f03ca4e
cloudflarekv: add support for context management
efectn Feb 11, 2025
634695c
cloudflarekv: make context functional for test_module
efectn Feb 11, 2025
58388d3
couchbase: add support for context management
efectn Mar 4, 2025
d7bbc22
chore: use testcontainers properly in nats
mdelapenya Mar 28, 2025
f3bdd25
chore: use testcontainers properly in dynamodb
mdelapenya Mar 28, 2025
d1893c2
chore: use testcontainers properly in mysql
mdelapenya Mar 28, 2025
371611b
chore: use testcontainers properly in postgres
mdelapenya Mar 28, 2025
53b2205
chore: use testcontainers properly in mongodb
mdelapenya Mar 28, 2025
0a9e0ef
neo4j: add support for context management
efectn Apr 22, 2025
8dcdba4
coherence: add support for context management
efectn Apr 22, 2025
ba32290
fix tests
efectn Apr 22, 2025
0f13ff4
Merge branch 'main' into add-withcontext
mdelapenya May 23, 2025
74b3bd3
chore(minio): defer closing the store in test
mdelapenya May 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arangodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ A ArangoDB storage driver using `arangodb/go-driver` and [arangodb/go-driver](ht
### Signatures
```go
func New(config ...Config) Storage
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error)
func (s *Storage) Get(key string) ([]byte, error)
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error
func (s *Storage) Set(key string, val []byte, exp time.Duration) error
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error
func (s *Storage) Delete(key string) error
func (s *Storage) ResetWithContext(ctx context.Context) error
func (s *Storage) Reset() error
func (s *Storage) Close() error
func (s *Storage) Conn() driver.Client
Expand Down
39 changes: 28 additions & 11 deletions arangodb/arangodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ func New(config ...Config) *Storage {
return store
}

// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
// GetWithContext value by key with given context
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error) {
if len(key) <= 0 {
return nil, nil
}

ctx := context.Background()

// Check if the document exists
// to avoid errors later
exists, err := s.collection.DocumentExists(ctx, key)
Expand Down Expand Up @@ -151,8 +149,13 @@ func (s *Storage) Get(key string) ([]byte, error) {
return utils.UnsafeBytes(model.Val), nil
}

// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
return s.GetWithContext(context.Background(), key)
}

// SetWithContext key with value with given context
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error {
// Ain't Nobody Got Time For That
if len(key) <= 0 || len(val) <= 0 {
return nil
Expand All @@ -169,7 +172,6 @@ func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
Val: valStr,
Exp: expireAt,
}
ctx := context.Background()

// Arango does not support documents with the same key
// So we need to check if the document exists
Expand All @@ -188,20 +190,35 @@ func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
return err
}

// Delete value by key
func (s *Storage) Delete(key string) error {
// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
return s.SetWithContext(context.Background(), key, val, exp)
}

// DeleteWithContext value by key with given context
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error {
// Ain't Nobody Got Time For That
if len(key) <= 0 {
return nil
}
_, err := s.collection.RemoveDocument(context.Background(), key)
_, err := s.collection.RemoveDocument(ctx, key)
return err
}

// Delete value by key
func (s *Storage) Delete(key string) error {
return s.DeleteWithContext(context.Background(), key)
}

// ResetWithContext all keys with given context
func (s *Storage) ResetWithContext(ctx context.Context) error {
return s.collection.Truncate(ctx)
}

// Reset all keys
// truncate the collection
func (s *Storage) Reset() error {
return s.collection.Truncate(context.Background())
return s.ResetWithContext(context.Background())
}

// Close the database
Expand Down
86 changes: 86 additions & 0 deletions arangodb/arangodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ func Test_ArangoDB_Set(t *testing.T) {
require.NoError(t, err)
}

func Test_ArangoDB_SetWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

testStore := newTestStore(t)
defer testStore.Close()

ctx, cancel := context.WithCancel(context.Background())
cancel()

err := testStore.SetWithContext(ctx, key, val, 0)
require.ErrorIs(t, err, context.Canceled)
}

func Test_ArangoDB_Upsert(t *testing.T) {
var (
key = "john"
Expand Down Expand Up @@ -100,6 +116,26 @@ func Test_ArangoDB_Get(t *testing.T) {
require.Equal(t, val, result)
}

func Test_ArangoDB_GetWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

testStore := newTestStore(t)
defer testStore.Close()

err := testStore.Set(key, val, 0)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

result, err := testStore.GetWithContext(ctx, key)
require.ErrorIs(t, err, context.Canceled)
require.Zero(t, len(result))
}

func Test_ArangoDB_Set_Expiration(t *testing.T) {
var (
key = "john"
Expand Down Expand Up @@ -156,6 +192,29 @@ func Test_ArangoDB_Delete(t *testing.T) {
require.Zero(t, len(result))
}

func Test_ArangoDB_DeleteWithContext(t *testing.T) {
var (
key = "john"
val = []byte("doe")
)

testStore := newTestStore(t)
defer testStore.Close()

err := testStore.Set(key, val, 0)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = testStore.DeleteWithContext(ctx, key)
require.ErrorIs(t, err, context.Canceled)

result, err := testStore.Get(key)
require.NoError(t, err)
require.Equal(t, val, result)
}

func Test_ArangoDB_Reset(t *testing.T) {
val := []byte("doe")

Expand All @@ -180,6 +239,33 @@ func Test_ArangoDB_Reset(t *testing.T) {
require.Zero(t, len(result))
}

func Test_ArangoDB_ResetWithContext(t *testing.T) {
val := []byte("doe")

testStore := newTestStore(t)
defer testStore.Close()

err := testStore.Set("john1", val, 0)
require.NoError(t, err)

err = testStore.Set("john2", val, 0)
require.Equal(t, err, nil)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = testStore.ResetWithContext(ctx)
require.ErrorIs(t, err, context.Canceled)

result, err := testStore.Get("john1")
require.NoError(t, err)
require.Equal(t, val, result)

result, err = testStore.Get("john2")
require.NoError(t, err)
require.Equal(t, val, result)
}

func Test_ArangoDB_Non_UTF8(t *testing.T) {
val := []byte("0xF5")

Expand Down
65 changes: 45 additions & 20 deletions azureblob/azureblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ func New(config ...Config) *Storage {
return storage
}

// Get value by key
func (s *Storage) Get(key string) ([]byte, error) {
// GetWithContext gets value by key
func (s *Storage) GetWithContext(ctx context.Context, key string) ([]byte, error) {
if len(key) <= 0 {
return nil, nil
}
ctx, cancel := s.requestContext()
defer cancel()

resp, err := s.client.DownloadStream(ctx, s.container, key, nil)
if err != nil {
return []byte{}, err
Expand All @@ -63,55 +62,81 @@ func (s *Storage) Get(key string) ([]byte, error) {
return data, err
}

// Set key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
// Get gets value by key
func (s *Storage) Get(key string) ([]byte, error) {
ctx, cancel := s.requestContext()
defer cancel()

return s.GetWithContext(ctx, key)
}

// SetWithContext sets key with value
func (s *Storage) SetWithContext(ctx context.Context, key string, val []byte, exp time.Duration) error {
if len(key) <= 0 {
return nil
}
ctx, cancel := s.requestContext()
defer cancel()

_, err := s.client.UploadBuffer(ctx, s.container, key, val, nil)
return err
}

// Delete entry by key
func (s *Storage) Delete(key string) error {
// Set sets key with value
func (s *Storage) Set(key string, val []byte, exp time.Duration) error {
ctx, cancel := s.requestContext()
defer cancel()

return s.SetWithContext(ctx, key, val, exp)
}

// DeleteWithContext deletes entry by key
func (s *Storage) DeleteWithContext(ctx context.Context, key string) error {
if len(key) <= 0 {
return nil
}

ctx, cancel := s.requestContext()
defer cancel()
_, err := s.client.DeleteBlob(ctx, s.container, key, nil)
return err
}

// Reset all entries
func (s *Storage) Reset() error {
// Delete deletes entry by key
func (s *Storage) Delete(key string) error {
ctx, cancel := s.requestContext()
defer cancel()

return s.DeleteWithContext(ctx, key)
}

// ResetWithContext resets all entries
func (s *Storage) ResetWithContext(ctx context.Context) error {
//_, err := s.client.DeleteContainer(ctx, s.container, nil)
//return err
pager := s.client.NewListBlobsFlatPager(s.container, nil)
errCounter := 0

for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
errCounter = errCounter + 1
return err
}

for _, v := range resp.Segment.BlobItems {
_, err = s.client.DeleteBlob(ctx, s.container, *v.Name, nil)
if err != nil {
errCounter = errCounter + 1
return err
}
}
}
if errCounter > 0 {
return fmt.Errorf("%d errors occured while resetting", errCounter)
}

return nil
}
Comment on lines +109 to 130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Review error handling change in ResetWithContext.

The error handling in ResetWithContext has been changed to return immediately on any error (lines 118 and 124). Previously, it would continue trying to delete other blobs even if some deletions failed. This is a behavior change that could impact users who expect the method to delete as many blobs as possible even if some operations fail.

Also, there's a commented-out implementation (lines 111-112) that might be more efficient. Consider adding a comment explaining why the current approach was chosen over deleting the container.

// ResetWithContext resets all entries
func (s *Storage) ResetWithContext(ctx context.Context) error {
-	//_, err := s.client.DeleteContainer(ctx, s.container, nil)
-	//return err
+	// Note: We're not using DeleteContainer and recreating it because...
+	// [Add explanation here for why individual blob deletion is preferred]

	pager := s.client.NewListBlobsFlatPager(s.container, nil)

	for pager.More() {
		resp, err := pager.NextPage(ctx)

		if err != nil {
			return err
		}

		for _, v := range resp.Segment.BlobItems {
			_, err = s.client.DeleteBlob(ctx, s.container, *v.Name, nil)
			if err != nil {
				return err
			}
		}
	}

	return nil
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// ResetWithContext resets all entries
func (s *Storage) ResetWithContext(ctx context.Context) error {
//_, err := s.client.DeleteContainer(ctx, s.container, nil)
//return err
pager := s.client.NewListBlobsFlatPager(s.container, nil)
errCounter := 0
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
errCounter = errCounter + 1
return err
}
for _, v := range resp.Segment.BlobItems {
_, err = s.client.DeleteBlob(ctx, s.container, *v.Name, nil)
if err != nil {
errCounter = errCounter + 1
return err
}
}
}
if errCounter > 0 {
return fmt.Errorf("%d errors occured while resetting", errCounter)
}
return nil
}
// ResetWithContext resets all entries
func (s *Storage) ResetWithContext(ctx context.Context) error {
// Note: We're not using DeleteContainer and recreating it because...
// [Add explanation here for why individual blob deletion is preferred]
pager := s.client.NewListBlobsFlatPager(s.container, nil)
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return err
}
for _, v := range resp.Segment.BlobItems {
_, err = s.client.DeleteBlob(ctx, s.container, *v.Name, nil)
if err != nil {
return err
}
}
}
return nil
}


// Reset resets all entries
func (s *Storage) Reset() error {
ctx, cancel := s.requestContext()
defer cancel()

return s.ResetWithContext(ctx)
}

// Conn returns storage client
func (s *Storage) Conn() *azblob.Client {
return s.client
Expand Down
Loading
Loading