-
-
Notifications
You must be signed in to change notification settings - Fork 601
Expand file tree
/
Copy pathmssql-schema.lisp
More file actions
184 lines (172 loc) · 8.49 KB
/
mssql-schema.lisp
File metadata and controls
184 lines (172 loc) · 8.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
;;;
;;; Tools to query the MS SQL Schema to reproduce in PostgreSQL
;;;
(in-package :pgloader.source.mssql)
(defclass copy-mssql (db-copy)
  (;; Optional encoding override: NIL (the default) means no encoding
   ;; is forced.
   (encoding :initform nil
             :initarg :encoding
             :accessor encoding))
  (:documentation "pgloader MS SQL Data Source"))
;;;
;;; These functions are to be called from within an already established
;;; MS SQL connection.
;;;
;;; Tools to get MS SQL table and columns definitions and transform them to
;;; PostgreSQL CREATE TABLE statements, and run those.
;;;
(defvar *table-type*
  (list (cons :table "BASE TABLE")
        (cons :view  "VIEW"))
  "Alist mapping the internal table type keyword (:table or :view) to the
   string found in the MS SQL information_schema.tables.table_type column.")
(defmethod filter-list-to-where-clause ((mssql copy-mssql)
                                        filter-list
                                        &key
                                          not
                                          (schema-col "table_schema")
                                          (table-col "table_name"))
  "Turn an INCLUDING or EXCLUDING FILTER-LIST into MS SQL WHERE conditions.

   FILTER-LIST is a list of (SCHEMA . TABLE-NAME-LIST) entries. The result
   is a flat list of strings, one parenthesized condition per table name,
   in the order the entries and names appear. Each condition tests
   SCHEMA-COL for equality with the schema and TABLE-COL with LIKE against
   the table name; when NOT is true the test is NOT LIKE instead.

   Returns NIL when FILTER-LIST is empty."
  (flet ((condition-for (schema pattern)
           ;; ~:[~;NOT ~] emits the NOT keyword only when NOT is true.
           (format nil "(~a = '~a' and ~a ~:[~;NOT ~]LIKE '~a')"
                   schema-col schema table-col not pattern)))
    (loop :for (schema . patterns) :in filter-list
          :nconc (loop :for pattern :in patterns
                       :collect (condition-for schema pattern)))))
(defmethod fetch-columns ((catalog catalog)
                          (mssql copy-mssql)
                          &key
                            (table-type :table)
                            including
                            excluding
                          &aux
                            (table-type-name
                             (cdr (assoc table-type *table-type*))))
  "Fetch the MS SQL column definitions and register them in CATALOG.

   TABLE-TYPE is looked up in *TABLE-TYPE* to get the table_type string
   passed to the query (defaults to :table). INCLUDING and EXCLUDING are
   filter lists turned into WHERE conditions on c.table_schema and
   c.table_name by FILTER-LIST-TO-WHERE-CLAUSE.

   For every row returned by the list-all-columns query, the schema and
   table are added to CATALOG when needed, and a column built with
   MAKE-MSSQL-COLUMN is added to the table. Returns CATALOG."
  (loop
     :with incl-where := (filter-list-to-where-clause
                          mssql including :not nil
                          :schema-col "c.table_schema"
                          :table-col "c.table_name")
     :with excl-where := (filter-list-to-where-clause
                          mssql excluding :not t
                          :schema-col "c.table_schema"
                          :table-col "c.table_name")
     :for (schema-name table-name name type default nullable identity
                       character-maximum-length
                       numeric-precision numeric-precision-radix numeric-scale
                       datetime-precision
                       character-set-name collation-name)
     :in (mssql-query (sql "/mssql/list-all-columns.sql"
                           (db-name *mssql-db*)
                           table-type-name
                           incl-where   ; do we print the clause?
                           incl-where
                           excl-where   ; do we print the clause?
                           excl-where))
     :do (let* ((schema (maybe-add-schema catalog schema-name))
                (table  (maybe-add-table schema table-name))
                (field
                 (make-mssql-column
                  schema-name table-name name type default nullable
                  ;; EQL rather than EQ: EQ on numbers is
                  ;; implementation-dependent. EQL is also safe when
                  ;; IDENTITY is not a number at all.
                  (eql 1 identity)
                  character-maximum-length
                  numeric-precision numeric-precision-radix numeric-scale
                  datetime-precision
                  character-set-name collation-name)))
           (add-field table field))
     :finally (return catalog)))
(defmethod fetch-indexes ((catalog catalog)
                          (mssql copy-mssql)
                          &key including excluding)
  "Fetch the MS SQL index definitions and attach them to CATALOG tables.

   INCLUDING and EXCLUDING are filter lists turned into WHERE conditions
   on schema_name(schema_id) and o.name. The list-all-indexes query yields
   one row per index column: the index is added to its table the first
   time it is seen (matched on its name) and each row's column is then
   added to it. Rows whose table is not found in CATALOG are skipped with
   a warning. Returns CATALOG."
  (flet ((where-clause (filter-list negate)
           (filter-list-to-where-clause mssql filter-list
                                        :not negate
                                        :schema-col "schema_name(schema_id)"
                                        :table-col "o.name")))
    (let ((incl-where (where-clause including nil))
          (excl-where (where-clause excluding t)))
      (loop
         :for (schema-name table-name index-name colname unique pkey filter)
         ;; Each clause is passed twice: the first occurrence answers
         ;; "do we print the clause?", the second is the clause itself.
         :in (mssql-query (sql "/mssql/list-all-indexes.sql"
                               incl-where incl-where
                               excl-where excl-where))
         :do (let* ((schema    (find-schema catalog schema-name))
                    (table     (find-table schema table-name))
                    (candidate (make-index :name index-name
                                           :schema schema
                                           :table table
                                           :primary (= pkey 1)
                                           :unique (= unique 1)
                                           :columns nil
                                           :filter filter)))
               (if table
                   (let ((index (maybe-add-index table index-name candidate
                                                 :key #'index-name)))
                     (when index
                       (add-column index colname)))
                   (log-message :warning
                                "Failed to find table ~s in schema ~s for index ~s, skipping the index"
                                table-name schema-name index-name)))
         :finally (return catalog)))))
(defmethod fetch-foreign-keys ((catalog catalog) (mssql copy-mssql)
                               &key including excluding)
  "Get the list of MSSQL foreign key definitions per table.

   INCLUDING and EXCLUDING are filter lists turned into WHERE conditions
   on KCU1.table_schema and KCU1.table_name. The list-all-fkeys query
   yields one row per foreign key column: the foreign key is added to its
   table the first time it is seen and each row's column / referenced
   column pair is appended to it. Rows whose table is not found in
   CATALOG are skipped with a warning. Returns CATALOG."
  (loop
     :with incl-where := (filter-list-to-where-clause
                          mssql including :not nil
                          :schema-col "KCU1.table_schema"
                          :table-col "KCU1.table_name")
     :with excl-where := (filter-list-to-where-clause
                          mssql excluding :not t
                          :schema-col "KCU1.table_schema"
                          :table-col "KCU1.table_name")
     :for (fkey-name schema-name table-name col
                     fschema-name ftable-name fcol
                     fk-update-rule fk-delete-rule)
     :in (mssql-query (sql "/mssql/list-all-fkeys.sql"
                           (db-name *mssql-db*) (db-name *mssql-db*)
                           incl-where   ; do we print the clause?
                           incl-where
                           excl-where   ; do we print the clause?
                           excl-where))
     :do (let* ((schema  (find-schema catalog schema-name))
                (table   (find-table schema table-name))
                (fschema (find-schema catalog fschema-name))
                ;; NOTE(review): FTABLE may be NIL when the referenced
                ;; table is not in CATALOG; that case is passed through
                ;; unchanged here -- confirm how downstream handles it.
                (ftable  (find-table fschema ftable-name)))
           (if table
               (let* ((pg-fkey-name (apply-identifier-case fkey-name))
                      (pg-fkey
                       (make-fkey :name pg-fkey-name
                                  :table table
                                  :columns nil
                                  :foreign-table ftable
                                  :foreign-columns nil
                                  :update-rule fk-update-rule
                                  :delete-rule fk-delete-rule))
                      ;; Look the key up under the same case-adjusted name
                      ;; it is stored with, so that every row of a
                      ;; multi-column foreign key lands on the same fkey
                      ;; object (assumes MAYBE-ADD-FKEY matches the given
                      ;; name against the :key of existing entries).
                      (fkey
                       (maybe-add-fkey table pg-fkey-name pg-fkey
                                       :key #'fkey-name)))
                 (push-to-end (apply-identifier-case col)
                              (fkey-columns fkey))
                 (push-to-end (apply-identifier-case fcol)
                              (fkey-foreign-columns fkey)))
               ;; Same policy as FETCH-INDEXES: warn and skip rather than
               ;; hand a NIL table to MAYBE-ADD-FKEY.
               (log-message :warning
                            "Failed to find table ~s in schema ~s for foreign key ~s, skipping the foreign key"
                            table-name schema-name fkey-name)))
     :finally (return catalog)))
;;;
;;; Tools to handle row queries.
;;;
(defmethod get-column-sql-expression ((mssql copy-mssql) name type)
  "Return the SQL expression used to select column NAME, chosen per TYPE.

   TYPE is matched case-insensitively. Date and time types are converted
   to varchar on the server side so that we avoid parsing dates, bigint
   is cast to numeric(20), and every other type selects the bracketed
   column name as is."
  (let ((template
         (case (intern (string-upcase type) "KEYWORD")
           (:time
            "convert(varchar(30), [~a], 114)")
           ((:datetime :datetime2 :smalldatetime :date)
            "convert(varchar(30), [~a], 126)")
           (:datetimeoffset
            "convert(varchar(35), [~a], 127)")
           (:bigint
            "cast([~a] as numeric(20))")
           (otherwise
            "[~a]"))))
    (format nil template name)))
(defmethod get-column-list ((mssql copy-mssql))
  "Return one SQL select expression per field of MSSQL, in field order,
   as computed by GET-COLUMN-SQL-EXPRESSION from each field's name and
   type, so that values are fetched in a form that avoids parsing when
   possible."
  (mapcar (lambda (field)
            (get-column-sql-expression mssql
                                       (slot-value field 'name)
                                       (slot-value field 'type)))
          (fields mssql)))