Skip to content

Commit 12085e5

Browse files
authored
Merge pull request #164 from erizocosmico/feature/pushdown
gitquery: implement filter pushdown for git tables
2 parents 8bfff3d + 348b525 commit 12085e5

16 files changed

+1403
-65
lines changed

blobs.go

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
11
package gitquery
22

33
import (
4+
"io"
5+
46
"gopkg.in/src-d/go-mysql-server.v0/sql"
57

8+
"gopkg.in/src-d/go-git.v4/plumbing"
69
"gopkg.in/src-d/go-git.v4/plumbing/object"
710
)
811

912
type blobsTable struct {
1013
pool *RepositoryPool
1114
}
1215

16+
var blobsSchema = sql.Schema{
17+
{Name: "hash", Type: sql.Text, Nullable: false, Source: blobsTableName},
18+
{Name: "size", Type: sql.Int64, Nullable: false, Source: blobsTableName},
19+
}
20+
21+
var _ sql.PushdownProjectionAndFiltersTable = (*blobsTable)(nil)
22+
1323
// newBlobsTable returns a blobs table backed by the given repository pool.
func newBlobsTable(pool *RepositoryPool) sql.Table {
	table := new(blobsTable)
	table.pool = pool
	return table
}
@@ -23,10 +33,7 @@ func (blobsTable) Name() string {
2333
}
2434

2535
func (blobsTable) Schema() sql.Schema {
26-
return sql.Schema{
27-
{Name: "hash", Type: sql.Text, Nullable: false, Source: blobsTableName},
28-
{Name: "size", Type: sql.Int64, Nullable: false, Source: blobsTableName},
29-
}
36+
return blobsSchema
3037
}
3138

3239
func (r *blobsTable) TransformUp(f func(sql.Node) (sql.Node, error)) (sql.Node, error) {
@@ -38,7 +45,7 @@ func (r *blobsTable) TransformExpressionsUp(f func(sql.Expression) (sql.Expressi
3845
}
3946

4047
func (r blobsTable) RowIter(_ sql.Session) (sql.RowIter, error) {
41-
iter := &blobIter{}
48+
iter := new(blobIter)
4249

4350
repoIter, err := NewRowRepoIter(r.pool, iter)
4451
if err != nil {
@@ -49,7 +56,33 @@ func (r blobsTable) RowIter(_ sql.Session) (sql.RowIter, error) {
4956
}
5057

5158
func (blobsTable) Children() []sql.Node {
52-
return []sql.Node{}
59+
return nil
60+
}
61+
62+
func (blobsTable) HandledFilters(filters []sql.Expression) []sql.Expression {
63+
return handledFilters(blobsTableName, blobsSchema, filters)
64+
}
65+
66+
func (r *blobsTable) WithProjectAndFilters(
67+
session sql.Session,
68+
_, filters []sql.Expression,
69+
) (sql.RowIter, error) {
70+
return rowIterWithSelectors(
71+
session, r.pool, blobsSchema, blobsTableName, filters,
72+
[]string{"hash"},
73+
func(selectors selectors) (RowRepoIter, error) {
74+
if len(selectors["hash"]) == 0 {
75+
return new(blobIter), nil
76+
}
77+
78+
hashes, err := selectors.textValues("hash")
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
return &blobsByHashIter{hashes: hashes}, nil
84+
},
85+
)
5386
}
5487

5588
type blobIter struct {
@@ -82,6 +115,41 @@ func (i *blobIter) Close() error {
82115
return nil
83116
}
84117

118+
type blobsByHashIter struct {
119+
repo *Repository
120+
pos int
121+
hashes []string
122+
}
123+
124+
func (i *blobsByHashIter) NewIterator(repo *Repository) (RowRepoIter, error) {
125+
return &blobsByHashIter{repo, 0, i.hashes}, nil
126+
}
127+
128+
func (i *blobsByHashIter) Next() (sql.Row, error) {
129+
for {
130+
if i.pos >= len(i.hashes) {
131+
return nil, io.EOF
132+
}
133+
134+
hash := plumbing.NewHash(i.hashes[i.pos])
135+
i.pos++
136+
blob, err := i.repo.Repo.BlobObject(hash)
137+
if err == plumbing.ErrObjectNotFound {
138+
continue
139+
}
140+
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
return blobToRow(blob), nil
146+
}
147+
}
148+
149+
func (i *blobsByHashIter) Close() error {
150+
return nil
151+
}
152+
85153
func blobToRow(c *object.Blob) sql.Row {
86154
return sql.NewRow(
87155
c.Hash.String(),

blobs_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/stretchr/testify/require"
88
"gopkg.in/src-d/go-mysql-server.v0/sql"
9+
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
910

1011
"gopkg.in/src-d/go-git-fixtures.v3"
1112
)
@@ -47,3 +48,50 @@ func TestBlobsTable_RowIter(t *testing.T) {
4748
require.Nil(err, "row %d doesn't conform to schema", idx)
4849
}
4950
}
51+
52+
func TestBlobsPushdown(t *testing.T) {
53+
require := require.New(t)
54+
session, _, cleanup := setup(t)
55+
defer cleanup()
56+
57+
table := newBlobsTable(session.Pool).(sql.PushdownProjectionAndFiltersTable)
58+
59+
iter, err := table.WithProjectAndFilters(session, nil, nil)
60+
require.NoError(err)
61+
62+
rows, err := sql.RowIterToRows(iter)
63+
require.NoError(err)
64+
require.Len(rows, 10)
65+
66+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
67+
expression.NewEquals(
68+
expression.NewGetFieldWithTable(0, sql.Text, blobsTableName, "hash", false),
69+
expression.NewLiteral("32858aad3c383ed1ff0a0f9bdf231d54a00c9e88", sql.Text),
70+
),
71+
})
72+
require.NoError(err)
73+
74+
rows, err = sql.RowIterToRows(iter)
75+
require.NoError(err)
76+
require.Len(rows, 1)
77+
78+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
79+
expression.NewLessThan(
80+
expression.NewGetFieldWithTable(1, sql.Int64, blobsTableName, "size", false),
81+
expression.NewLiteral(int64(10), sql.Int64),
82+
),
83+
})
84+
require.NoError(err)
85+
86+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
87+
expression.NewEquals(
88+
expression.NewGetFieldWithTable(0, sql.Text, blobsTableName, "hash", false),
89+
expression.NewLiteral("not exists", sql.Text),
90+
),
91+
})
92+
require.NoError(err)
93+
94+
rows, err = sql.RowIterToRows(iter)
95+
require.NoError(err)
96+
require.Len(rows, 0)
97+
}

commits.go

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
package gitquery
22

33
import (
4+
"io"
5+
46
"gopkg.in/src-d/go-mysql-server.v0/sql"
57

8+
"gopkg.in/src-d/go-git.v4/plumbing"
69
"gopkg.in/src-d/go-git.v4/plumbing/object"
710
)
811

912
type commitsTable struct {
1013
pool *RepositoryPool
1114
}
1215

16+
var commitsSchema = sql.Schema{
17+
{Name: "hash", Type: sql.Text, Nullable: false, Source: commitsTableName},
18+
{Name: "author_name", Type: sql.Text, Nullable: false, Source: commitsTableName},
19+
{Name: "author_email", Type: sql.Text, Nullable: false, Source: commitsTableName},
20+
{Name: "author_when", Type: sql.Timestamp, Nullable: false, Source: commitsTableName},
21+
{Name: "committer_name", Type: sql.Text, Nullable: false, Source: commitsTableName},
22+
{Name: "committer_email", Type: sql.Text, Nullable: false, Source: commitsTableName},
23+
{Name: "committer_when", Type: sql.Timestamp, Nullable: false, Source: commitsTableName},
24+
{Name: "message", Type: sql.Text, Nullable: false, Source: commitsTableName},
25+
}
26+
27+
var _ sql.PushdownProjectionAndFiltersTable = (*commitsTable)(nil)
28+
1329
// newCommitsTable returns a commits table backed by the given repository pool.
func newCommitsTable(pool *RepositoryPool) sql.Table {
	table := new(commitsTable)
	table.pool = pool
	return table
}
@@ -23,16 +39,7 @@ func (commitsTable) Name() string {
2339
}
2440

2541
func (commitsTable) Schema() sql.Schema {
26-
return sql.Schema{
27-
{Name: "hash", Type: sql.Text, Nullable: false, Source: commitsTableName},
28-
{Name: "author_name", Type: sql.Text, Nullable: false, Source: commitsTableName},
29-
{Name: "author_email", Type: sql.Text, Nullable: false, Source: commitsTableName},
30-
{Name: "author_when", Type: sql.Timestamp, Nullable: false, Source: commitsTableName},
31-
{Name: "committer_name", Type: sql.Text, Nullable: false, Source: commitsTableName},
32-
{Name: "committer_email", Type: sql.Text, Nullable: false, Source: commitsTableName},
33-
{Name: "committer_when", Type: sql.Timestamp, Nullable: false, Source: commitsTableName},
34-
{Name: "message", Type: sql.Text, Nullable: false, Source: commitsTableName},
35-
}
42+
return commitsSchema
3643
}
3744

3845
func (r *commitsTable) TransformUp(f func(sql.Node) (sql.Node, error)) (sql.Node, error) {
@@ -44,7 +51,7 @@ func (r *commitsTable) TransformExpressionsUp(f func(sql.Expression) (sql.Expres
4451
}
4552

4653
func (r commitsTable) RowIter(_ sql.Session) (sql.RowIter, error) {
47-
iter := &commitIter{}
54+
iter := new(commitIter)
4855

4956
repoIter, err := NewRowRepoIter(r.pool, iter)
5057
if err != nil {
@@ -55,7 +62,33 @@ func (r commitsTable) RowIter(_ sql.Session) (sql.RowIter, error) {
5562
}
5663

5764
func (commitsTable) Children() []sql.Node {
58-
return []sql.Node{}
65+
return nil
66+
}
67+
68+
func (commitsTable) HandledFilters(filters []sql.Expression) []sql.Expression {
69+
return handledFilters(commitsTableName, commitsSchema, filters)
70+
}
71+
72+
func (r *commitsTable) WithProjectAndFilters(
73+
session sql.Session,
74+
_, filters []sql.Expression,
75+
) (sql.RowIter, error) {
76+
return rowIterWithSelectors(
77+
session, r.pool, commitsSchema, commitsTableName, filters,
78+
[]string{"hash"},
79+
func(selectors selectors) (RowRepoIter, error) {
80+
if len(selectors["hash"]) == 0 {
81+
return new(commitIter), nil
82+
}
83+
84+
hashes, err := selectors.textValues("hash")
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
return &commitsByHashIter{hashes: hashes}, nil
90+
},
91+
)
5992
}
6093

6194
type commitIter struct {
@@ -88,6 +121,41 @@ func (i *commitIter) Close() error {
88121
return nil
89122
}
90123

124+
type commitsByHashIter struct {
125+
repo *Repository
126+
pos int
127+
hashes []string
128+
}
129+
130+
func (i *commitsByHashIter) NewIterator(repo *Repository) (RowRepoIter, error) {
131+
return &commitsByHashIter{repo, 0, i.hashes}, nil
132+
}
133+
134+
func (i *commitsByHashIter) Next() (sql.Row, error) {
135+
for {
136+
if i.pos >= len(i.hashes) {
137+
return nil, io.EOF
138+
}
139+
140+
hash := plumbing.NewHash(i.hashes[i.pos])
141+
i.pos++
142+
commit, err := i.repo.Repo.CommitObject(hash)
143+
if err == plumbing.ErrObjectNotFound {
144+
continue
145+
}
146+
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
return commitToRow(commit), nil
152+
}
153+
}
154+
155+
func (i *commitsByHashIter) Close() error {
156+
return nil
157+
}
158+
91159
func commitToRow(c *object.Commit) sql.Row {
92160
return sql.NewRow(
93161
c.Hash.String(),

commits_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/stretchr/testify/require"
88
"gopkg.in/src-d/go-mysql-server.v0/sql"
9+
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
910

1011
"gopkg.in/src-d/go-git-fixtures.v3"
1112
)
@@ -47,3 +48,70 @@ func TestCommitsTable_RowIter(t *testing.T) {
4748
require.Nil(err, "row %d doesn't conform to schema", idx)
4849
}
4950
}
51+
52+
func TestCommitsPushdown(t *testing.T) {
53+
require := require.New(t)
54+
session, _, cleanup := setup(t)
55+
defer cleanup()
56+
57+
table := newCommitsTable(session.Pool).(sql.PushdownProjectionAndFiltersTable)
58+
59+
iter, err := table.WithProjectAndFilters(session, nil, nil)
60+
require.NoError(err)
61+
62+
rows, err := sql.RowIterToRows(iter)
63+
require.NoError(err)
64+
require.Len(rows, 9)
65+
66+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
67+
expression.NewEquals(
68+
expression.NewGetFieldWithTable(0, sql.Text, commitsTableName, "hash", false),
69+
expression.NewLiteral("918c48b83bd081e863dbe1b80f8998f058cd8294", sql.Text),
70+
),
71+
})
72+
require.NoError(err)
73+
74+
rows, err = sql.RowIterToRows(iter)
75+
require.NoError(err)
76+
require.Len(rows, 1)
77+
78+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
79+
expression.NewEquals(
80+
expression.NewGetFieldWithTable(0, sql.Text, commitsTableName, "hash", false),
81+
expression.NewLiteral("not exists", sql.Text),
82+
),
83+
})
84+
require.NoError(err)
85+
86+
rows, err = sql.RowIterToRows(iter)
87+
require.NoError(err)
88+
require.Len(rows, 0)
89+
90+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
91+
expression.NewEquals(
92+
expression.NewGetFieldWithTable(2, sql.Text, commitsTableName, "author_email", false),
93+
expression.NewLiteral("[email protected]", sql.Text),
94+
),
95+
})
96+
require.NoError(err)
97+
98+
rows, err = sql.RowIterToRows(iter)
99+
require.NoError(err)
100+
require.Len(rows, 8)
101+
102+
iter, err = table.WithProjectAndFilters(session, nil, []sql.Expression{
103+
expression.NewEquals(
104+
expression.NewGetFieldWithTable(2, sql.Text, commitsTableName, "author_email", false),
105+
expression.NewLiteral("[email protected]", sql.Text),
106+
),
107+
expression.NewEquals(
108+
expression.NewGetFieldWithTable(7, sql.Text, commitsTableName, "message", false),
109+
expression.NewLiteral("vendor stuff\n", sql.Text),
110+
),
111+
})
112+
require.NoError(err)
113+
114+
rows, err = sql.RowIterToRows(iter)
115+
require.NoError(err)
116+
require.Len(rows, 1)
117+
}

0 commit comments

Comments
 (0)