Skip to content

Commit ddf0bd9

Browse files
committed
complete homework 6
1 parent bfc048a commit ddf0bd9

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed

hw06_pipeline_execution/.sync

Whitespace-only changes.

hw06_pipeline_execution/pipeline.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,41 @@ type (
88

99
type Stage func(in In) (out Out)
1010

11+
// transferBetweenCh перегнать данные из in в out и остановить процесс, если done.
12+
func transferBetweenCh(in In, out Bi, done In) {
13+
defer close(out)
14+
for {
15+
select {
16+
// приоритизация случая с ранним выходом
17+
case <-done:
18+
return
19+
default:
20+
}
21+
select {
22+
case <-done:
23+
return
24+
case v, ok := <-in:
25+
if !ok {
26+
return
27+
}
28+
out <- v
29+
}
30+
}
31+
}
32+
33+
// ExecutePipeline запускает конкурентный пайплайн.
1134
func ExecutePipeline(in In, done In, stages ...Stage) Out {
12-
// Place your code here.
13-
return nil
35+
out := in
36+
for _, stage := range stages {
37+
select {
38+
// приоритизация случая с ранним выходом
39+
case <-done:
40+
break
41+
default:
42+
}
43+
processed := make(Bi)
44+
go transferBetweenCh(stage(out), processed, done)
45+
out = processed
46+
}
47+
return out
1448
}

hw06_pipeline_execution/pipeline_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,66 @@ func TestPipeline(t *testing.T) {
9090
require.Len(t, result, 0)
9191
require.Less(t, int64(elapsed), int64(abortDur)+int64(fault))
9292
})
93+
94+
t.Run("empty IN channel", func(t *testing.T) {
95+
in := make(Bi)
96+
close(in)
97+
98+
result := make([]string, 0)
99+
start := time.Now()
100+
for s := range ExecutePipeline(in, nil, stages...) {
101+
result = append(result, s.(string))
102+
}
103+
elapsed := time.Since(start)
104+
105+
require.Empty(t, result)
106+
// nothing to process, must be done so quickly
107+
require.Less(t,
108+
int64(elapsed),
109+
int64(1*time.Millisecond))
110+
})
111+
}
112+
113+
func TestTransferBetweenCh(t *testing.T) {
114+
t.Run("transfer all data and close channel as writer", func(t *testing.T) {
115+
expected := []int{1, 2, 3, 4, 5}
116+
in := make(Bi)
117+
go func() {
118+
defer close(in)
119+
for _, v := range expected {
120+
in <- v
121+
}
122+
}()
123+
124+
out := make(Bi)
125+
go func() {
126+
transferBetweenCh(in, out, nil)
127+
}()
128+
129+
actual := make([]int, 0, len(expected))
130+
for v := range out {
131+
actual = append(actual, v.(int))
132+
}
133+
134+
require.Equal(t, expected, actual)
135+
})
136+
137+
t.Run("nothing to transfer", func(t *testing.T) {
138+
in := make(Bi)
139+
go func() {
140+
close(in)
141+
}()
142+
143+
out := make(Bi)
144+
go func() {
145+
transferBetweenCh(in, out, nil)
146+
}()
147+
148+
actual := make([]int, 0)
149+
for v := range out {
150+
actual = append(actual, v.(int))
151+
}
152+
153+
require.Empty(t, actual)
154+
})
93155
}

0 commit comments

Comments
 (0)