Skip to content

Commit 08bc672

Browse files
Merge pull request #89 from Scalingo/feature/storage
Add simple object storage package for s3 and swift
2 parents f09d2d5 + e16d54f commit 08bc672

File tree

10 files changed

+678
-4
lines changed

10 files changed

+678
-4
lines changed

Gopkg.lock

Lines changed: 51 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mocks.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"base_package": "github.com/Scalingo/go-utils",
3+
"mocks": [
4+
{
5+
"interface": "Backend",
6+
"src_package": "storage"
7+
}, {
8+
"interface": "Producer",
9+
"src_package": "nsqproducer"
10+
}
11+
]
12+
}

mocks_sig.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"github.com/Scalingo/go-utils/nsqproducer.Producer":"33 c7 9b 68 1a 5f 11 fc 4f cb 66 83 92 27 61 b9 c4 ec 87 7b","github.com/Scalingo/go-utils/storage.Backend":"NOFILE"}

nsqproducer/nsqproducermock/gomock_producer.go renamed to nsqproducer/nsqproducermock/producer_mock.go

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

storage/backend.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"io"
6+
)
7+
8+
// BackendMethod represents the name of a Method included in the Backend interface
9+
type BackendMethod string
10+
11+
const (
12+
GetMethod BackendMethod = "Get"
13+
UploadMethod BackendMethod = "Upload"
14+
SizeMethod BackendMethod = "Size"
15+
DeleteMethod BackendMethod = "Delete"
16+
)
17+
18+
// Backend represents something which is able to store files on an object
19+
// storage service
20+
type Backend interface {
21+
Get(ctx context.Context, path string) (io.ReadCloser, error)
22+
Upload(ctx context.Context, file io.Reader, path string) error
23+
Size(ctx context.Context, path string) (int64, error)
24+
Delete(ctx context.Context, path string) error
25+
}
26+
27+
var _ Backend = &S3{}
28+
var _ Backend = &Swift{}

storage/s3.go

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
"github.com/Scalingo/go-utils/logger"
9+
"github.com/aws/aws-sdk-go-v2/aws"
10+
"github.com/aws/aws-sdk-go-v2/aws/awserr"
11+
"github.com/aws/aws-sdk-go-v2/aws/defaults"
12+
"github.com/aws/aws-sdk-go-v2/aws/endpoints"
13+
"github.com/aws/aws-sdk-go-v2/service/s3"
14+
"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
15+
"github.com/pkg/errors"
16+
)
17+
18+
const (
19+
NotFoundErrCode = "NotFound"
20+
)
21+
22+
type S3Client interface {
23+
GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest
24+
HeadObjectRequest(input *s3.HeadObjectInput) s3.HeadObjectRequest
25+
DeleteObjectRequest(input *s3.DeleteObjectInput) s3.DeleteObjectRequest
26+
}
27+
28+
type S3Config struct {
29+
AK string
30+
SK string
31+
Region string
32+
Endpoint string
33+
Bucket string
34+
}
35+
36+
type RetryPolicy struct {
37+
WaitDuration time.Duration
38+
Attempts int
39+
MethodHandlers map[BackendMethod][]string
40+
}
41+
42+
type S3 struct {
43+
cfg S3Config
44+
s3client S3Client
45+
s3uploader *s3manager.Uploader
46+
retryPolicy RetryPolicy
47+
}
48+
49+
type s3Opt func(s3 *S3)
50+
51+
// WithRetryPolicy is an option to constructor NewS3 to add a Retry Policy
52+
// impacting GET operations
53+
func WithRetryPolicy(policy RetryPolicy) s3Opt {
54+
return s3Opt(func(s3 *S3) {
55+
s3.retryPolicy = policy
56+
})
57+
}
58+
59+
func NewS3(cfg S3Config, opts ...s3Opt) *S3 {
60+
s3config := s3Config(cfg)
61+
s3 := &S3{
62+
cfg: cfg, s3client: s3.New(s3config), s3uploader: s3manager.NewUploader(s3config),
63+
retryPolicy: RetryPolicy{
64+
WaitDuration: time.Second,
65+
Attempts: 3,
66+
MethodHandlers: map[BackendMethod][]string{
67+
SizeMethod: []string{NotFoundErrCode},
68+
},
69+
},
70+
}
71+
for _, opt := range opts {
72+
opt(s3)
73+
}
74+
return s3
75+
}
76+
77+
func (s *S3) Get(ctx context.Context, path string) (io.ReadCloser, error) {
78+
log := logger.Get(ctx)
79+
log.WithField("path", path).Info("Get object")
80+
81+
input := &s3.GetObjectInput{
82+
Bucket: &s.cfg.Bucket,
83+
Key: &path,
84+
}
85+
out, err := s.s3client.GetObjectRequest(input).Send(ctx)
86+
if err != nil {
87+
return nil, errors.Wrapf(err, "fail to get object %v", path)
88+
}
89+
return out.Body, nil
90+
}
91+
92+
func (s *S3) Upload(ctx context.Context, file io.Reader, path string) error {
93+
input := &s3manager.UploadInput{
94+
Body: file,
95+
Bucket: &s.cfg.Bucket,
96+
Key: &path,
97+
}
98+
_, err := s.s3uploader.UploadWithContext(ctx, input)
99+
if err != nil {
100+
return errors.Wrapf(err, "fail to save file to %v", path)
101+
}
102+
103+
return nil
104+
}
105+
106+
// Size returns the size of the content of the object. A retry mecanism is
107+
// implemented because of the eventual consistency of S3 backends NotFound
108+
// error are sometimes returned when the object was just uploaded.
109+
func (s *S3) Size(ctx context.Context, path string) (int64, error) {
110+
var res int64
111+
err := s.retryWrapper(ctx, SizeMethod, func(ctx context.Context) error {
112+
log := logger.Get(ctx).WithField("key", path)
113+
log.Infof("[s3] Size()")
114+
115+
input := &s3.HeadObjectInput{Bucket: &s.cfg.Bucket, Key: &path}
116+
stat, err := s.s3client.HeadObjectRequest(input).Send(ctx)
117+
if err != nil {
118+
return err
119+
}
120+
res = *stat.ContentLength
121+
return nil
122+
})
123+
124+
if err != nil {
125+
return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path)
126+
}
127+
return res, nil
128+
}
129+
130+
func (s *S3) Delete(ctx context.Context, path string) error {
131+
input := &s3.DeleteObjectInput{Bucket: &s.cfg.Bucket, Key: &path}
132+
req := s.s3client.DeleteObjectRequest(input)
133+
_, err := req.Send(ctx)
134+
if err != nil {
135+
return errors.Wrapf(err, "fail to delete object %v", path)
136+
}
137+
138+
return nil
139+
}
140+
141+
func (s *S3) retryWrapper(ctx context.Context, method BackendMethod, fun func(ctx context.Context) error) error {
142+
var err error
143+
144+
errorCodes := s.retryPolicy.MethodHandlers[method]
145+
// no-op is no retry policy on the method
146+
if errorCodes == nil {
147+
return fun(ctx)
148+
}
149+
for i := 0; i < s.retryPolicy.Attempts; i++ {
150+
log := logger.Get(ctx).WithField("attempt", i+1)
151+
ctx := logger.ToCtx(ctx, log)
152+
err = fun(ctx)
153+
if err == nil {
154+
return nil
155+
}
156+
if aerr, ok := err.(awserr.Error); ok {
157+
for _, code := range errorCodes {
158+
if aerr.Code() == code {
159+
time.Sleep(s.retryPolicy.WaitDuration)
160+
}
161+
}
162+
}
163+
}
164+
return err
165+
}
166+
167+
func s3Config(cfg S3Config) aws.Config {
168+
credentials := aws.NewStaticCredentialsProvider(cfg.AK, cfg.SK, "")
169+
config := aws.Config{
170+
Region: cfg.Region,
171+
Handlers: defaults.Handlers(),
172+
HTTPClient: defaults.HTTPClient(),
173+
Credentials: credentials,
174+
EndpointResolver: aws.ResolveWithEndpoint(aws.Endpoint{
175+
URL: "https://" + cfg.Endpoint,
176+
SigningRegion: cfg.Endpoint,
177+
}),
178+
}
179+
if cfg.Endpoint == "" {
180+
config.EndpointResolver = endpoints.NewDefaultResolver()
181+
}
182+
183+
return config
184+
}

0 commit comments

Comments
 (0)