Skip to content

Commit 81ce721

Browse files
authored
SingleUploadThreshold config parameter is introduced. (#66)
The S3 client's **multipart_upload** boolean flag did not give users control over when to use single vs. multipart upload. Additionally, the BOSH HTTP client was using **CreateDefaultClient**, which doesn't reuse TCP connections, resulting in higher latency under concurrent upload/download workloads. Solution — Single upload threshold: replaced the multipart_upload flag with a new **single_upload_threshold** config parameter (bytes). Files at or below this size use a single PutObject call; larger files use multipart upload. The value is capped at **5GB** to respect the AWS S3 hard limit for a single PutObject. For GCS, which requires a single put for all uploads but has no size limit, the threshold is automatically set to math.MaxInt64. Keep-alive HTTP client: replaced **CreateDefaultClient** with **CreateKeepAliveDefaultClient**. Benchmark tests showed this reduces response time for concurrent uploads/downloads by reusing existing TCP connections.
1 parent 94a6bc6 commit 81ce721

10 files changed

Lines changed: 222 additions & 71 deletions

File tree

s3/README.md

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,27 @@ The S3 client requires a JSON configuration file with the following structure:
1212

1313
``` json
1414
{
15-
"bucket_name": "<string> (required)",
16-
"credentials_source": "<string> [static|env_or_profile|none]",
17-
"access_key_id": "<string> (required if credentials_source = 'static')",
18-
"secret_access_key": "<string> (required if credentials_source = 'static')",
19-
"region": "<string> (optional - default: 'us-east-1')",
20-
"host": "<string> (optional)",
21-
"port": <int> (optional),
22-
"ssl_verify_peer": <bool> (optional - default: true),
23-
"use_ssl": <bool> (optional - default: true),
24-
"signature_version": "<string> (optional)",
25-
"server_side_encryption": "<string> (optional)",
26-
"sse_kms_key_id": "<string> (optional)",
27-
"multipart_upload": <bool> (optional - default: true),
28-
"download_concurrency": <int> (optional - default: 5),
29-
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
30-
"upload_concurrency": <int> (optional - default: 5),
31-
"upload_part_size": <int64> (optional - default: 5242880) # 5 MB
32-
"multipart_copy_threshold": <int64> (optional - default: 5368709120) # default 5 GB
33-
"multipart_copy_part_size": <int64> (optional - default: 104857600) # default 100 MB - must be at least 5 MB
15+
"bucket_name": "<string> (required)",
16+
"credentials_source": "<string> [static|env_or_profile|none]",
17+
"access_key_id": "<string> (required if credentials_source = 'static')",
18+
"secret_access_key": "<string> (required if credentials_source = 'static')",
19+
"region": "<string> (optional - default: 'us-east-1')",
20+
"host": "<string> (optional)",
21+
"port": <int> (optional),
22+
"ssl_verify_peer": <bool> (optional - default: true),
23+
"use_ssl": <bool> (optional - default: true),
24+
"signature_version": "<string> (optional)",
25+
"server_side_encryption": "<string> (optional)",
26+
"sse_kms_key_id": "<string> (optional)",
27+
"download_concurrency": <int> (optional - default: 5),
28+
"download_part_size": <int64> (optional - default: 5242880), # 5 MB
29+
"upload_concurrency": <int> (optional - default: 5),
30+
"upload_part_size": <int64> (optional - default: 5242880), # 5 MB
31+
"multipart_copy_threshold": <int64> (optional - default: 5368709120), # 5 GB - files larger than this use multipart copy
32+
"multipart_copy_part_size": <int64> (optional - default: 104857600), # 100 MB - must be at least 5 MB
33+
"single_upload_threshold": <int64> (optional - default: 0) # bytes; files <= this use a single PutObject call, larger files use multipart upload. The default of 0 effectively means multipart is always used (only zero-byte files take the single-upload path). Max 5 GB for AWS S3. GCS ignores this and always uses single upload.
3434
}
3535
```
36-
> Note: **multipart_upload** is not supported by Google - it's automatically set to false by parsing the provided 'host'
3736

3837
**Usage examples:**
3938
```shell

s3/client/aws_s3_blobstore.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
)
2222

2323
var errorInvalidCredentialsSourceValue = errors.New("the client operates in read only mode. Change 'credentials_source' parameter value ")
24-
var oneTB = int64(1000 * 1024 * 1024 * 1024)
2524

2625
// Default settings for transfer concurrency and part size.
2726
// These values are chosen to align with typical AWS CLI and SDK defaults for efficient S3 uploads and downloads.
@@ -33,6 +32,7 @@ const (
3332
// AWS CopyObject limit is 5GB, use 100MB parts for multipart copy
3433
defaultMultipartCopyThreshold = int64(5 * 1024 * 1024 * 1024) // 5 GB
3534
defaultMultipartCopyPartSize = int64(100 * 1024 * 1024) // 100 MB
35+
maxRetries = 3
3636
)
3737

3838
// awsS3Client encapsulates AWS S3 blobstore interactions
@@ -84,17 +84,9 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
8484
u.Concurrency = cfg.UploadConcurrency
8585
}
8686

87-
// PartSize: if multipart uploads disabled, force a very large part to avoid multipart.
88-
// Otherwise, use configured upload part size if present, otherwise default.
89-
if !cfg.MultipartUpload {
90-
// disable multipart uploads by way of large PartSize configuration
91-
u.PartSize = oneTB
92-
} else {
93-
if cfg.UploadPartSize > 0 {
94-
u.PartSize = cfg.UploadPartSize
95-
} else {
96-
u.PartSize = defaultTransferPartSize
97-
}
87+
u.PartSize = defaultTransferPartSize
88+
if cfg.UploadPartSize > 0 {
89+
u.PartSize = cfg.UploadPartSize
9890
}
9991

10092
if cfg.ShouldDisableUploaderRequestChecksumCalculation() {
@@ -116,7 +108,6 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
116108
}
117109

118110
retry := 0
119-
maxRetries := 3
120111
for {
121112
putResult, err := uploader.Upload(context.TODO(), uploadInput) //nolint:staticcheck
122113
if err != nil {
@@ -136,6 +127,50 @@ func (b *awsS3Client) Put(src io.ReadSeeker, dest string) error {
136127
}
137128
}
138129

130+
// PutSinglePart uploads a blob using a single PutObject call (no multipart).
131+
// Use this for small files where multipart overhead is unnecessary.
132+
func (b *awsS3Client) PutSinglePart(src io.ReadSeeker, dest string) error {
133+
cfg := b.s3cliConfig
134+
if cfg.CredentialsSource == config.NoneCredentialsSource {
135+
return errorInvalidCredentialsSourceValue
136+
}
137+
138+
input := &s3.PutObjectInput{
139+
Body: src,
140+
Bucket: aws.String(cfg.BucketName),
141+
Key: b.key(dest),
142+
}
143+
if cfg.ServerSideEncryption != "" {
144+
input.ServerSideEncryption = types.ServerSideEncryption(cfg.ServerSideEncryption)
145+
}
146+
if cfg.SSEKMSKeyID != "" {
147+
input.SSEKMSKeyId = aws.String(cfg.SSEKMSKeyID)
148+
}
149+
150+
retry := 0
151+
for {
152+
// Seek back to the start on retries so the full body is re-sent
153+
if retry > 0 {
154+
if _, seekErr := src.Seek(0, io.SeekStart); seekErr != nil {
155+
return fmt.Errorf("failed to seek source for retry: %s", seekErr.Error())
156+
}
157+
}
158+
159+
_, err := b.s3Client.PutObject(context.TODO(), input)
160+
if err != nil {
161+
if retry == maxRetries {
162+
return fmt.Errorf("single part upload retry limit exceeded: %s", err.Error())
163+
}
164+
retry++
165+
time.Sleep(time.Second * time.Duration(retry))
166+
continue
167+
}
168+
169+
slog.Info("Successfully uploaded file (single part)", "key", dest)
170+
return nil
171+
}
172+
}
173+
139174
// Delete removes a blob - no error is returned if the object does not exist
140175
func (b *awsS3Client) Delete(dest string) error {
141176
if b.s3cliConfig.CredentialsSource == config.NoneCredentialsSource {

s3/client/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,16 @@ func (c *S3CompatibleClient) Put(src string, dest string) error {
4444
return err
4545
}
4646
defer sourceFile.Close() //nolint:errcheck
47+
48+
info, err := sourceFile.Stat()
49+
if err != nil {
50+
return err
51+
}
52+
size := info.Size()
53+
54+
if size <= c.s3cliConfig.SingleUploadThreshold {
55+
return c.awsS3BlobstoreClient.PutSinglePart(sourceFile, dest)
56+
}
4757
return c.awsS3BlobstoreClient.Put(sourceFile, dest)
4858
}
4959

s3/client/sdk.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func NewAwsS3ClientWithApiOptions(
3737
var httpClient *http.Client
3838

3939
if c.SSLVerifyPeer {
40-
httpClient = boshhttp.CreateDefaultClient(nil)
40+
httpClient = boshhttp.CreateKeepAliveDefaultClient(nil)
4141
} else {
4242
httpClient = boshhttp.CreateDefaultClientInsecureSkipVerify()
4343
}

s3/config/config.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"math"
89
"strings"
910
)
1011

@@ -23,7 +24,6 @@ type S3Cli struct {
2324
ServerSideEncryption string `json:"server_side_encryption"`
2425
SSEKMSKeyID string `json:"sse_kms_key_id"`
2526
AssumeRoleArn string `json:"assume_role_arn"`
26-
MultipartUpload bool `json:"multipart_upload"`
2727
HostStyle bool `json:"host_style"`
2828
SwiftAuthAccount string `json:"swift_auth_account"`
2929
SwiftTempURLKey string `json:"swift_temp_url_key"`
@@ -39,12 +39,21 @@ type S3Cli struct {
3939
UploadPartSize int64 `json:"upload_part_size"`
4040
MultipartCopyThreshold int64 `json:"multipart_copy_threshold"` // Default: 5GB - files larger than this use multipart copy
4141
MultipartCopyPartSize int64 `json:"multipart_copy_part_size"` // Default: 100MB - size of each part in multipart copy
42+
43+
// Files smaller than or equal to this size (in bytes) are uploaded using a single PutObject call.
44+
// Files exceeding this size use multipart upload. Omit or set to 0 to always use multipart upload.
45+
// Must not exceed 5GB (AWS S3 hard limit for PutObject, https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html).
46+
// For GCS, leave this unset (0); it will be automatically set to math.MaxInt64 since GCS requires single put for all uploads but has no size limit.
47+
SingleUploadThreshold int64 `json:"single_upload_threshold"`
4248
}
4349

4450
const (
4551
// multipartCopyMinPartSize is the AWS minimum part size for multipart operations.
4652
// Other providers may have different limits - users should consult their provider's documentation.
4753
multipartCopyMinPartSize = 5 * 1024 * 1024 // 5MB - AWS minimum part size
54+
55+
// singlePutMaxSize is the AWS S3 hard limit for a single PutObject call.
56+
singlePutMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB
4857
)
4958

5059
const defaultAWSRegion = "us-east-1" //nolint:unused
@@ -85,7 +94,6 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
8594
c := S3Cli{
8695
SSLVerifyPeer: true,
8796
UseSSL: true,
88-
MultipartUpload: true,
8997
RequestChecksumCalculationEnabled: true,
9098
ResponseChecksumCalculationEnabled: true,
9199
UploaderRequestChecksumCalculationEnabled: true,
@@ -101,6 +109,11 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
101109
return S3Cli{}, errors.New("bucket_name must be set")
102110
}
103111

112+
// Validate single put threshold
113+
if c.SingleUploadThreshold < 0 {
114+
return S3Cli{}, errors.New("single_upload_threshold must not be negative")
115+
}
116+
104117
// Validate numeric fields: disallow negative values (zero means "use defaults")
105118
if c.DownloadConcurrency < 0 || c.UploadConcurrency < 0 || c.DownloadPartSize < 0 || c.UploadPartSize < 0 {
106119
return S3Cli{}, errors.New("download/upload concurrency and part sizes must be non-negative")
@@ -158,6 +171,12 @@ func NewFromReader(reader io.Reader) (S3Cli, error) {
158171
c.configureDefault()
159172
}
160173

174+
// Validate SingleUploadThreshold against the 5GB AWS limit, but only for non-GCS providers.
175+
// GCS has no such limit, and configureGoogle() sets math.MaxInt64 internally.
176+
if !c.IsGoogle() && c.SingleUploadThreshold > singlePutMaxSize {
177+
return S3Cli{}, fmt.Errorf("single_upload_threshold must not exceed %d bytes (5GB - AWS S3 PutObject limit)", singlePutMaxSize)
178+
}
179+
161180
return c, nil
162181
}
163182

@@ -174,8 +193,6 @@ func Provider(host string) string {
174193
}
175194

176195
func (c *S3Cli) configureAWS() {
177-
c.MultipartUpload = true
178-
179196
if c.Region == "" {
180197
if region := AWSHostToRegion(c.Host); region != "" {
181198
c.Region = region
@@ -186,7 +203,6 @@ func (c *S3Cli) configureAWS() {
186203
}
187204

188205
func (c *S3Cli) configureAlicloud() {
189-
c.MultipartUpload = true
190206
c.HostStyle = true
191207

192208
c.Host = strings.Split(c.Host, ":")[0]
@@ -198,7 +214,9 @@ func (c *S3Cli) configureAlicloud() {
198214
}
199215

200216
func (c *S3Cli) configureGoogle() {
201-
c.MultipartUpload = false
217+
// GCS does not support multipart upload, so all files must be uploaded via a single PutObject call.
218+
// Unlike AWS S3, GCS has no 5GB hard limit on single uploads, so math.MaxInt64 is safe here.
219+
c.SingleUploadThreshold = math.MaxInt64
202220
c.RequestChecksumCalculationEnabled = false
203221
}
204222

s3/config/config_test.go

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,6 @@ var _ = Describe("BlobstoreClient configuration", func() {
112112
})
113113
})
114114

115-
Context("when MultipartUpload have been set", func() {
116-
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "multipart_upload": false}`)
117-
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
118-
It("sets MultipartUpload to user-specified values", func() {
119-
c, err := config.NewFromReader(dummyJSONReader)
120-
Expect(err).ToNot(HaveOccurred())
121-
Expect(c.MultipartUpload).To(BeFalse())
122-
})
123-
})
124-
125-
Context("when MultipartUpload have not been set", func() {
126-
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region"}`)
127-
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
128-
It("default MultipartUpload to true", func() {
129-
c, err := config.NewFromReader(dummyJSONReader)
130-
Expect(err).ToNot(HaveOccurred())
131-
Expect(c.MultipartUpload).To(BeTrue())
132-
})
133-
})
134-
135115
Context("when HostStyle has been set", func() {
136116
dummyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "some-host", "region": "some-region", "host_style": true}`)
137117
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
@@ -633,16 +613,70 @@ var _ = Describe("BlobstoreClient configuration", func() {
633613
})
634614
})
635615

636-
Describe("checking the alibaba cloud MultipartUpload", func() {
637-
emptyJSONBytes := []byte(`{"access_key_id": "id", "secret_access_key": "key", "bucket_name": "some-bucket", "host": "oss-some-region.aliyuncs.com"}`)
638-
emptyJSONReader := bytes.NewReader(emptyJSONBytes)
616+
Describe("single_upload_threshold", func() {
617+
It("defaults to 0 when not set", func() {
618+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket"}`)
619+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
639620

640-
It("defaults to support multipart uploading", func() {
641-
c, err := config.NewFromReader(emptyJSONReader)
621+
c, err := config.NewFromReader(dummyJSONReader)
642622
Expect(err).ToNot(HaveOccurred())
643-
Expect(c.MultipartUpload).To(BeTrue())
623+
Expect(c.SingleUploadThreshold).To(Equal(int64(0)))
624+
})
625+
626+
It("accepts a valid positive value", func() {
627+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":104857600}`)
628+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
629+
630+
c, err := config.NewFromReader(dummyJSONReader)
631+
Expect(err).ToNot(HaveOccurred())
632+
Expect(c.SingleUploadThreshold).To(Equal(int64(104857600))) // 100MB
633+
})
634+
635+
It("accepts exactly 5GB (AWS maximum)", func() {
636+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709120}`)
637+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
638+
639+
c, err := config.NewFromReader(dummyJSONReader)
640+
Expect(err).ToNot(HaveOccurred())
641+
Expect(c.SingleUploadThreshold).To(Equal(int64(5368709120)))
642+
})
643+
644+
It("rejects negative values", func() {
645+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":-1}`)
646+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
647+
648+
_, err := config.NewFromReader(dummyJSONReader)
649+
Expect(err).To(MatchError("single_upload_threshold must not be negative"))
650+
})
651+
652+
It("rejects values above 5GB for non-GCS providers", func() {
653+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","single_upload_threshold":5368709121}`)
654+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
655+
656+
_, err := config.NewFromReader(dummyJSONReader)
657+
Expect(err).To(MatchError(ContainSubstring("single_upload_threshold must not exceed")))
658+
})
659+
660+
It("allows values above 5GB for GCS (no hard limit)", func() {
661+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com","single_upload_threshold":10737418240}`)
662+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
663+
664+
c, err := config.NewFromReader(dummyJSONReader)
665+
Expect(err).ToNot(HaveOccurred())
666+
// configureGoogle() overrides to math.MaxInt64 regardless of user input
667+
Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807)))
668+
})
669+
670+
It("automatically sets threshold to MaxInt64 for GCS regardless of config", func() {
671+
dummyJSONBytes := []byte(`{"access_key_id":"id","secret_access_key":"key","bucket_name":"some-bucket","host":"storage.googleapis.com"}`)
672+
dummyJSONReader := bytes.NewReader(dummyJSONBytes)
673+
674+
c, err := config.NewFromReader(dummyJSONReader)
675+
Expect(err).ToNot(HaveOccurred())
676+
Expect(c.SingleUploadThreshold).To(Equal(int64(9223372036854775807)))
644677
})
645678
})
679+
646680
})
647681

648682
type explodingReader struct{}

0 commit comments

Comments
 (0)