Skip to content

Commit 7fa4d37

Browse files
committed
add support for optional progress updates channel
Signed-off-by: Avi Deitcher <[email protected]>
1 parent 5073458 commit 7fa4d37

File tree

2 files changed

+72
-2
lines changed

2 files changed

+72
-2
lines changed

copy.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ type CopyGraphOptions struct {
112112
// source storage to fetch large blobs.
113113
// If FindSuccessors is nil, content.Successors will be used.
114114
FindSuccessors func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error)
115+
116+
// UpdateChannel is an optional channel to receive progress updates.
117+
// Each update will include the number of bytes copied for a particular blob
118+
// or manifest, the expected total size, and the descriptor of the blob or
119+
// manifest. It is up to the consumer of the channel to differentiate
120+
// between updates among different blobs and manifests; no mechanism is
121+
// provided for distinguishing between them, other than the descriptor
122+
// passed with each update. The total size of downloads of all blobs and
123+
// manifests is not provided, as it is not known. You can calculate the
124+
// percentage downloaded for a particular blob in an individual update
125+
// based on the total size of that blob, which is provided in the
126+
// descriptor, and the number of bytes copied, which is provided in the
127+
// update.
128+
// Updates are sent each time a block is copied. The number of bytes copied
129+
// depends upon whatever calls the io.ReadCloser of the source Target.
130+
// This may be io.Copy, which, by default, is 32KB, or it may be some other
131+
// implementation.
132+
// The caller is responsible for closing the channel.
133+
UpdateChannel chan<- CopyUpdate
115134
}
116135

117136
// Copy copies a rooted directed acyclic graph (DAG) with the tagged root node
@@ -266,11 +285,17 @@ func copyGraph(ctx context.Context, src content.ReadOnlyStorage, dst content.Sto
266285
}
267286

268287
// doCopyNode copies a single content from the source CAS to the destination CAS.
269-
func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor) error {
288+
func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor, ch chan<- CopyUpdate) error {
270289
rc, err := src.Fetch(ctx, desc)
271290
if err != nil {
272291
return err
273292
}
293+
if ch != nil {
294+
rc = &progressReader{
295+
c: ch,
296+
r: rc,
297+
}
298+
}
274299
defer rc.Close()
275300
err = dst.Push(ctx, desc, rc)
276301
if err != nil && !errors.Is(err, errdef.ErrAlreadyExists) {
@@ -291,7 +316,7 @@ func copyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Stor
291316
}
292317
}
293318

294-
if err := doCopyNode(ctx, src, dst, desc); err != nil {
319+
if err := doCopyNode(ctx, src, dst, desc, opts.UpdateChannel); err != nil {
295320
return err
296321
}
297322

progress_reader.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
Copyright The ORAS Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package oras
17+
18+
import (
19+
"io"
20+
21+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
22+
)
23+
24+
type CopyUpdate struct {
25+
Copied int64
26+
Descriptor ocispec.Descriptor
27+
}
28+
29+
type progressReader struct {
30+
desc ocispec.Descriptor
31+
r io.ReadCloser
32+
c chan<- CopyUpdate
33+
}
34+
35+
func (p *progressReader) Close() error {
36+
return p.r.Close()
37+
}
38+
39+
func (p *progressReader) Read(buf []byte) (int, error) {
40+
n, err := p.r.Read(buf)
41+
if n > 0 {
42+
p.c <- CopyUpdate{Copied: int64(n), Descriptor: p.desc}
43+
}
44+
return n, err
45+
}

0 commit comments

Comments
 (0)