|
19 | 19 | (uncaughtException [_ thread throwable]
|
20 | 20 | (println (.getMessage throwable)))))
|
21 | 21 |
|
22 |
| - |
23 | 22 | (defn read-edn [filepath]
|
24 | 23 | (-> filepath
|
25 | 24 | slurp
|
|
48 | 47 | (slack/import-emojis! (conn))
|
49 | 48 | (let [channel->db-id (q/channel-id-map (d/db (conn)))
|
50 | 49 | channels (mapv import/channel->tx (slack/channels))]
|
51 |
| - (d/transact (conn) (map (fn [{slack-id :channel/slack-id :as ch}] |
52 |
| - (if-let [db-id (channel->db-id slack-id)] |
53 |
| - (assoc ch :db/id db-id) |
54 |
| - ch)) |
55 |
| - channels)))) |
| 50 | + @(d/transact (conn) (map (fn [{slack-id :channel/slack-id :as ch}] |
| 51 | + (if-let [db-id (channel->db-id slack-id)] |
| 52 | + (assoc ch :db/id db-id) |
| 53 | + ch)) |
| 54 | + channels)))) |
56 | 55 |
|
57 | 56 | (defn build-indexes! []
|
58 | 57 | (q/build-indexes! (d/db (conn))))
|
|
100 | 99 | (run! load-log-file! (log-files (java.io.File. directory "logs")))
|
101 | 100 | (build-indexes!))
|
102 | 101 |
|
| 102 | +(defn files-from |
| 103 | + "Get a sequence of log files starting ata given date" |
| 104 | + [date] |
| 105 | + (->> (log-files "/home/arne/github/clojurians-log/logs") |
| 106 | + (drop-while #(not (clojure.string/starts-with? (.getName %) date))))) |
| 107 | + |
103 | 108 | (defn load-from
|
104 | 109 | "Load log files starting from a certain date (a string like \"2019-05-20\")"
|
105 | 110 | [date]
|
106 |
| - (->> (log-files) |
107 |
| - (drop-while #(not (clojure.string/starts-with? (.getName %) date))) |
| 111 | + (->> (files-from date) |
108 | 112 | (run! load-log-file!)))
|
109 | 113 |
|
| 114 | +(defn wrap-catch [f] |
| 115 | + (fn [& args] |
| 116 | + (try |
| 117 | + (apply f args) |
| 118 | + (catch Throwable t |
| 119 | + (let [ex-sym (gensym "ex-")] |
| 120 | + (intern *ns* ex-sym t) |
| 121 | + (println (str f " threw " (class t) " see " ex-sym))))))) |
| 122 | + |
110 | 123 | (def file->tx
|
111 | 124 | "Transducer which consumes files and produces transaction data"
|
112 | 125 | (comp (mapcat #(import/lines-reducible (io/reader %)))
|
|
116 | 129 | (println "Error decoding JSON: " %)
|
117 | 130 | (println e)
|
118 | 131 | nil)))
|
119 |
| - (filter #(= (:type %) "message")) |
120 |
| - (keep import/event->tx))) |
| 132 | + (keep (wrap-catch import/event->tx)))) |
121 | 133 |
|
122 | 134 | (def tx-thread-count
|
123 | 135 | "The number of threads to use for processing transactions"
|
|
135 | 147 | (mod-hash
|
136 | 148 | (if (vector? msg)
|
137 | 149 | (second (second msg))
|
138 |
| - (:message/key msg))))) |
| 150 | + (or (:message/key msg) |
| 151 | + (:message/key (:reaction/message msg))))))) |
139 | 152 |
|
140 | 153 | (defn load-files!
|
141 | 154 | "Bulk import a set of files (e.g. from (log-files)), uses multiple threads to speed things up"
|
142 | 155 | [files]
|
143 |
| - (let [tx-chs (into [] (repeatedly tx-thread-count |
144 |
| - #(async/chan 100 (import/partition-messages tx-size)))) |
145 |
| - file-ch (async/chan 100) |
146 |
| - pubsub-ch (async/chan 100) |
147 |
| - pubsub (async/pub pubsub-ch msg-topic) |
148 |
| - conn (conn) |
149 |
| - counter (volatile! 0) |
150 |
| - done? (promise)] |
| 156 | + (let [make-tx-chan #(async/chan 100 (import/partition-messages tx-size)) |
| 157 | + tx-chs (into [] (repeatedly tx-thread-count make-tx-chan)) |
| 158 | + file-ch (async/chan 100) |
| 159 | + pubsub-ch (async/chan 100) |
| 160 | + pubsub (async/pub pubsub-ch (wrap-catch msg-topic)) |
| 161 | + conn (conn) |
| 162 | + counter (volatile! 0) |
| 163 | + done? (promise)] |
151 | 164 |
|
152 | 165 | (doseq [tx-ch tx-chs]
|
153 | 166 | (async/thread
|
|
162 | 175 | (println e)))
|
163 | 176 | (recur (<!! tx-ch)))))))
|
164 | 177 |
|
165 |
| - (go-loop [[f & files] files] |
166 |
| - (>! file-ch f) |
167 |
| - (if (seq files) |
168 |
| - (recur files) |
169 |
| - (async/close! file-ch))) |
| 178 | + (async/onto-chan file-ch files) |
170 | 179 |
|
171 |
| - (doseq [i (range tx-thread-count)] |
172 |
| - (async/sub pubsub i (get tx-chs i))) |
| 180 | + (doseq [[i tx-ch] (map-indexed vector tx-chs)] |
| 181 | + (async/sub pubsub i tx-ch)) |
173 | 182 |
|
174 | 183 | (async/pipeline-blocking 10 pubsub-ch file->tx file-ch true)
|
175 | 184 |
|
|
189 | 198 | (use 'clojurians-log.repl)
|
190 | 199 | (in-ns 'clojurians-log.repl)
|
191 | 200 | (load-slack-data!)
|
| 201 | + |
192 | 202 | (def result (load-files! (log-files)))
|
193 |
| - result |
| 203 | + ;; or |
| 204 | + (def result (load-files! (files-from "2019-01-01"))) |
194 | 205 |
|
195 |
| - (load-files! [f]) |
| 206 | + ;; see progress |
| 207 | + (future |
| 208 | + (while (not (realized? (second result))) |
| 209 | + (println (java.util.Date.) "\t" @(first result)) |
| 210 | + (Thread/sleep 5000))) |
196 | 211 |
|
197 |
| - (def result (load-files! (drop 1508 (log-files)))) |
198 |
| - (def rrr (load-files! (filter #(and (.contains (str %) "2020-08") (.contains (str %) "backfill")) (log-files)))) |
| 212 | + ;; After importing, or you won't see data show up |
| 213 | + (build-indexes!) |
199 | 214 |
|
200 |
| - (while (not (realized? (second result))) |
201 |
| - (println (java.util.Date.) "\t" @(first result)) |
202 |
| - (Thread/sleep 5000)) |
| 215 | + (def result |
| 216 | + (load-files! |
| 217 | + (filter #(and (.contains (str %) "2020-08") |
| 218 | + (.contains (str %) "backfill")) |
| 219 | + (log-files)))) |
203 | 220 |
|
204 |
| - ;; old way (slower) |
205 |
| - (run! load-log-file! (log-files)) |
206 | 221 |
|
207 |
| - ;; incremental |
208 |
| - (load-from "2019-08-23") |
209 | 222 |
|
| 223 | + ;; old way, this does not use multi-thread core.async magic, and does not |
| 224 | + ;; batch transactions, it literally transacts each slack event separately. |
| 225 | + ;; Very slow but always works. |
| 226 | + (run! load-log-file! (log-files)) |
210 | 227 |
|
| 228 | + ;; Fetch and store slack data |
211 | 229 | (do
|
212 | 230 | (write-edn "users.edn" (map import/user->tx (slack/users)))
|
213 |
| - (write-edn "channels.edn" (map import/channel->tx (slack/channels)))) |
214 |
| - |
215 |
| - (time |
216 |
| - (do |
217 |
| - (time (clojurians-log.db.queries/channel-day-messages db "clojurescript" "2018-02-04")) |
218 |
| - (time (clojurians-log.db.queries/thread-messages db '("1517722327.000023" "1517722363.000043" "1517722613.000012" "1517724278.000043" "1517724340.000044" "1517724770.000024" "1517724836.000023" "1517725105.000054"))) |
219 |
| - (time (ffirst (clojurians-log.db.queries/channel db "clojurescript"))) |
220 |
| - (time (clojurians-log.db.queries/channel-list db "2018-02-04")) |
221 |
| - (time (clojurians-log.db.queries/user-names db #{"U2TUBBPNU"})) |
222 |
| - (time (clojurians-log.db.queries/channel-days db "clojurescript")) |
223 |
| - |
224 |
| - nil)) |
225 |
| - |
| 231 | + (write-edn "channels.edn" (map import/channel->tx (slack/channels))) |
| 232 | + (write-edn "emoji.edn" (map import/emoji->tx (slack/emoji)))) |
| 233 | + |
| 234 | + ;; Micro-benchmark some queries, good to check if anything is unreasonably |
| 235 | + ;; slow |
| 236 | + (let [db (db)] |
| 237 | + (time |
| 238 | + (do |
| 239 | + (time (clojurians-log.db.queries/channel-day-messages db "clojurescript" "2018-02-04")) |
| 240 | + (time (clojurians-log.db.queries/thread-messages db '("1517722327.000023" "1517722363.000043" "1517722613.000012" "1517724278.000043" "1517724340.000044" "1517724770.000024" "1517724836.000023" "1517725105.000054"))) |
| 241 | + (time (ffirst (clojurians-log.db.queries/channel db "clojurescript"))) |
| 242 | + (time (clojurians-log.db.queries/channel-list db "2018-02-04")) |
| 243 | + (time (clojurians-log.db.queries/user-names db #{"U2TUBBPNU"})) |
| 244 | + (time (clojurians-log.db.queries/channel-days db "clojurescript")) |
| 245 | + |
| 246 | + nil))) |
| 247 | + |
| 248 | + ;; Original |
226 | 249 | "Elapsed time: 18.166254 msecs"
|
227 |
| - "Elapsed time: 631.458841 msecs" |
| 250 | + "Elapsed time: 631.458841 msecs" ; -> we optimized this |
228 | 251 | "Elapsed time: 1.568807 msecs"
|
229 | 252 | "Elapsed time: 16.425878 msecs"
|
230 | 253 | "Elapsed time: 1.126005 msecs"
|
231 |
| - "Elapsed time: 1535.355001 msecs" |
232 |
| - "Elapsed time: 2205.20762 msecs" |
| 254 | + "Elapsed time: 1535.355001 msecs" ; -> and this |
| 255 | + "Elapsed time: 2205.20762 msecs" ; Total |
| 256 | + |
| 257 | + ;; Latest |
| 258 | + "Elapsed time: 31.38712 msecs" ; -> seems fetching channel-day-messages is |
| 259 | + ; slower now |
| 260 | + "Elapsed time: 0.844338 msecs" |
| 261 | + "Elapsed time: 1.582986 msecs" |
| 262 | + "Elapsed time: 0.22545 msecs" |
| 263 | + "Elapsed time: 1.120628 msecs" |
| 264 | + "Elapsed time: 0.00676 msecs" |
| 265 | + "Elapsed time: 37.954533 msecs" ; Total |
| 266 | + |
233 | 267 | )
|
0 commit comments