-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcore.clj
More file actions
398 lines (362 loc) · 14.5 KB
/
core.clj
File metadata and controls
398 lines (362 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
(ns sqlite4clj.core
"High level interface for using sqlite4clj including connection pool
and prepared statement caching."
(:require
[sqlite4clj.impl.api :as api]
[sqlite4clj.impl.functions :as funcs]
[sqlite4clj.impl.functions-aggregates :as aggs]
[clojure.core.cache.wrapped :as cache])
(:import
(java.util.concurrent BlockingQueue LinkedBlockingQueue)))
(defn bind [stmt params]
(reduce
(fn [i param]
(cond
(integer? param) (api/bind-int stmt i param)
(double? param) (api/bind-double stmt i param)
(string? param) (api/bind-text stmt i param)
(nil? param) (api/bind-null stmt i)
:else (api/bind-blob stmt i param))
(inc i))
1 ;; starts at 1
params))
(defn prepare
([pdb sql]
(let [stmt (api/prepare-v3 pdb sql)
col-count (int #_{:clj-kondo/ignore [:type-mismatch]}
(api/column-count stmt))]
(cond-> {:stmt stmt}
(> col-count 0)
(assoc :col-metadata
(mapv (fn [c]
{:database (api/column-database-name stmt c)
:table (api/column-table-name stmt c)
:origin (api/column-origin-name stmt c)
:alias (api/column-name stmt c)})
(range 0 col-count)))))))
(defn prepare-cached [{:keys [pdb stmt-cache]} query]
(let [sql (first query)
params (subvec query 1)
{:keys [stmt] :as m}
(cache/lookup-or-miss stmt-cache sql
(fn [_] (prepare pdb sql)))]
(bind stmt params)
m))
(defmacro with-stmt-reset
{:clj-kondo/lint-as 'clojure.core/with-open}
[[stmt-binding stmt] & body]
`(let [~stmt-binding ~stmt]
(try
~@body
(finally
(api/reset ~stmt-binding)
(api/clear-bindings ~stmt-binding)))))
(defn get-column-val [stmt n]
(case (int #_{:clj-kondo/ignore [:type-mismatch]}
(api/column-type stmt n))
;; See type codes here: https://sqlite.org/c3ref/c_blob.html
1 (api/column-int stmt n)
2 (api/column-double stmt n)
3 (api/column-text stmt n)
4 (api/column-blob stmt n)
5 nil))
(defn column [stmt n-cols]
(case n-cols
0 nil
1 [(get-column-val stmt 0)]
2 [(get-column-val stmt 0)
(get-column-val stmt 1)]
3 [(get-column-val stmt 0)
(get-column-val stmt 1)
(get-column-val stmt 2)]
4 [(get-column-val stmt 0)
(get-column-val stmt 1)
(get-column-val stmt 2)
(get-column-val stmt 3)]
5 [(get-column-val stmt 0)
(get-column-val stmt 1)
(get-column-val stmt 2)
(get-column-val stmt 3)
(get-column-val stmt 4)]
;; After 5 params it's worth iterating
(loop [n 0
cols (transient [])]
(if (>= n n-cols)
(persistent! cols)
(recur (inc n)
(conj! cols (get-column-val stmt n)))))))
(defn- q* [conn query result-set-fn]
;; sqlite4clj uses -DSQLITE_THREADSAFE=2 which means sqlite4clj is
;; responsible for serializing access to database connections and prepared
;; prepared statements. SQLite will be safe to use in a multi-threaded
;; environment as long as no two threads attempt to use the same database
;; connection at the same time.
;; The reason for this is letting SQLite manage these locks is messy and
;; can lead to high tail latency (SQLITE_BUSY). So it's better for the
;; driver/application layer to handle it.
;; sqlite4clj manages connections through the pool. So most of the time
;; connections will only be handled by a single thread at a time.
;; The exception is when write/read transactions are being used in
;; creative async contexts. So this lock is here to prevent problems when
;; that happens. Outside of this usage it will not come into play/cause
;; contention.
(locking conn
(let [{:keys [stmt col-metadata]} (prepare-cached conn query)]
(with-stmt-reset [stmt stmt]
(let [n-cols (int
#_{:clj-kondo/ignore [:type-mismatch]}
(api/column-count stmt))
result-set-fn (or result-set-fn (:default-result-set-fn conn))]
(result-set-fn col-metadata
(reify
clojure.lang.IReduceInit
(reduce [_ f init]
(loop [ret init]
(let [code (int
#_{:clj-kondo/ignore [:type-mismatch]}
(api/step stmt))]
(case code
100 (let [result (f ret (column stmt n-cols))]
(if (reduced? result)
@result
(recur result)))
101 ret
(throw (api/sqlite-ex-info (:pdb conn) code
{:sql (first query)
:params (subvec query 1)})))))))))))))
(def default-pragma
{:cache_size 15625
:page_size 4096
:journal_mode "WAL"
:synchronous "NORMAL"
:temp_store "MEMORY"
:foreign_keys false
;; Because of WAL and a single writer at the application level
;; SQLITE_BUSY error should almost never happen, see:
;; https://sqlite.org/wal.html#sometimes_queries_return_sqlite_busy_in_wal_mode
;; However, sometime when using litestream for backups it can happen.
;; So we set it to the recommended value see:
;; https://litestream.io/tips/#busy-timeout
:busy_timeout 5000
;; Litestream recommends disabling autocheckpointing under high write loads
;; https://litestream.io/tips/#disable-autocheckpoints-for-high-write-load-servers
:wal_autocheckpoint 0
;; :optimize cannot be run on connection open when using application
;; function in indexes. As you will get a unknown function error.
;; https://sqlite.org/pragma.html#pragma_optimize
;; :optimize 0x10002
})
(defn pragma->set-pragma-query [pragma]
(conj (->> (merge default-pragma pragma)
(mapv (fn [[k v]] [(str "pragma " (name k) "=" v)])))))
(defn no-unwrap-result-set-fn
[_col-metadata result-set]
(let [result (into [] result-set)]
(when (seq result) result)))
(defn qualified-keyword-result-set-fn [col-metadata result-set]
(let [col-keys
(mapv (fn [{:keys [table alias]}]
(keyword table alias)) col-metadata)
result (into []
(map (fn [col-vals]
(zipmap col-keys col-vals)))
result-set)]
(when (seq result) result)))
(defn new-conn! [db-name pragma read-only vfs default-result-set-fn]
(let [flags (if read-only
;; SQLITE_OPEN_READONLY
0x00000001
;; SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE
(bit-or 0x00000002 0x00000004))
*pdb (api/open-v2 db-name flags vfs)
statement-cache (cache/fifo-cache-factory {} :threshold 512)
conn {:pdb *pdb
:stmt-cache statement-cache
:default-result-set-fn default-result-set-fn}]
(->> (pragma->set-pragma-query pragma)
(run! #(q* conn % default-result-set-fn)))
conn))
(defn init-pool!
[db-name & [{:keys [pool-size pragma read-only vfs default-result-set-fn]
:or {pool-size
(Runtime/.availableProcessors (Runtime/getRuntime))}}]]
(let [conns (repeatedly pool-size
(fn [] (new-conn! db-name pragma read-only vfs
default-result-set-fn)))
pool (LinkedBlockingQueue/new ^int pool-size)]
(run! #(BlockingQueue/.add pool %) conns)
{:conn-pool pool
:connections conns
:close
(fn [] (run! (fn [conn] (api/close (:pdb conn))) conns))}))
(defn unwrap-result-set-fn
[col-metadata result-set]
(let [result (if (= (count col-metadata) 1)
(into [] cat result-set)
(into [] result-set))]
(when (seq result) result)))
(defn init-db!
"A db consists of a read pool of size :pool-size and a write pool of size 1.
The same pragma are set for both pools."
[url & [{:keys [pool-size pragma writer-pragma vfs
default-result-set-fn]
:or {default-result-set-fn unwrap-result-set-fn
pool-size (Runtime/.availableProcessors
(Runtime/getRuntime))}}]]
(assert (< 0 pool-size))
(let [;; Only one write connection
writer
(init-pool! url
{:pool-size 1
:pragma (merge pragma writer-pragma)
:vfs vfs
:default-result-set-fn default-result-set-fn})
;; Pool of read connections
reader (if (= ":memory:" url)
writer
(init-pool! url
{:read-only true
:pool-size pool-size
:pragma pragma
:vfs vfs
:default-result-set-fn default-result-set-fn}))]
{:writer writer
:reader reader
;; Prevents application function callback pointers from getting
;; garbage collected.
:internal {:app-functions (atom {})
:app-aggregates (atom {})}}))
(defmacro with-conn
"Use the same connection for a series of queries (not a transaction) without
returning it to the pool until the end. If passed a connection instead of db
binds body with that connection."
{:clj-kondo/lint-as 'clojure.core/with-open}
[[tx db] & body]
`(if-let [conn-pool# (:conn-pool ~db)]
(let [~tx (BlockingQueue/.take conn-pool#)]
(binding [*print-length* nil]
(try
~@body
(finally
(BlockingQueue/.offer conn-pool# ~tx)))))
(let [~tx ~db]
(do ~@body))))
(defn q
"Run a query against a db. Return nil when no results."
[tx query &
[{:keys [result-set-fn]}] ]
(with-conn [conn tx]
(q* conn query result-set-fn)))
(defn optimize-db
"Use for running optimize on long lived connections. Should only be run
on write connection. Uses 0x10002 flag to analyse all tables. This is
optimal for fresh connections and or read/write connections having
different query usage. It also means you only need to run optimize from
the write connection and not all read connections."
[db]
(let [n-conn (count (:conn-pool db))]
(loop [n 0]
(q db ["pragma optimize=0x10002"])
(if (> n-conn n) (recur (inc n)) n))))
(defmacro with-read-tx
"Wrap series of queries in a read transaction."
{:clj-kondo/lint-as 'clojure.core/with-open}
[[tx db] & body]
`(let [conn-pool# (:conn-pool ~db)
~tx (BlockingQueue/.take conn-pool#)]
(binding [*print-length* nil]
(try
(q ~tx ["BEGIN DEFERRED"])
~@(butlast body)
(let [r# ~(last body)]
(q ~tx ["COMMIT"])
r#)
(catch Throwable t#
;; Handles non SQLITE errors crashing a transaction
(q ~tx ["ROLLBACK"])
(throw t#))
(finally
(BlockingQueue/.offer conn-pool# ~tx))))))
(defmacro with-write-tx
"Wrap series of queries in a write transaction."
{:clj-kondo/lint-as 'clojure.core/with-open}
[[tx db] & body]
`(let [conn-pool# (:conn-pool ~db)
~tx (BlockingQueue/.take conn-pool#)]
(binding [*print-length* nil]
(try
(q ~tx ["BEGIN IMMEDIATE"])
~@(butlast body)
(let [r# ~(last body)]
(q ~tx ["COMMIT"])
r#)
(catch Throwable t#
;; Handles non SQLITE errors crashing a transaction
(q ~tx ["ROLLBACK"])
(throw t#))
(finally
(BlockingQueue/.offer conn-pool# ~tx))))))
;; WAL + single writer enforced at the application layer means you don't need
;; to handle SQLITE_BUSY or SQLITE_LOCKED.
;; TODO: finalise prepared statements when shutting down
(defn create-function
"register a user-defined function with sqlite on all connections.
parameters:
- db: database from init-db!
- name: string function name
- f-or-var: either a function or a var that points to a function.
- opts: a map of options that can include:
the sqlite function flags (see https://www.sqlite.org/c3ref/c_deterministic.html)
- :deterministic? (boolean)
- :direct-only? (boolean)
- :innocuous? (boolean)
- :sub-type? (boolean)
- :result-sub-type? (boolean)
- :self-order1? (boolean)
other options:
- :arity (int): the number of arguments the function takes.
by default the function/var will be analyzed and the arity will be inferred.
if the function has multiple arities then all will be registered with sqlite.
you can specify the arity explicitly with the `:arity` option."
[db name f-or-var & {:as opts}]
(funcs/create-function db name f-or-var opts))
(defn remove-function
"unregister a user-defined function from sqlite on all connections.
if an arity is not provided, it will unregister all arities for the function."
([db name]
(funcs/remove-function db name))
([db name arity]
(funcs/remove-function db name arity)))
(defn create-aggregate
"register a user-defined aggregate function with sqlite on all connections.
parameters:
- db: database from init-db!
- name: string function name
- step-f-or-var: step callback function or var
- final-f-or-var: final callback function or var
- opts: a map of options that can include:
the sqlite function flags (see https://www.sqlite.org/c3ref/c_deterministic.html)
- :deterministic? (boolean)
- :direct-only? (boolean)
- :innocuous? (boolean)
- :sub-type? (boolean)
- :result-sub-type? (boolean)
- :self-order1? (boolean)
aggregate options:
- :arity (int): SQL arity. -1 means variadic.
- :initial-state: initial step state used for empty inputs when provided.
by default SQL arity is inferred from step-f-or-var as:
sql arity = (step arity - 1) where the first argument is aggregate state.
step callbacks must have signature:
(fn [state & sql-args] new-state)
final callbacks must have signature:
(fn [state] result)"
[db name step-f-or-var final-f-or-var & {:as opts}]
(aggs/create-aggregate db name step-f-or-var final-f-or-var opts))
(defn remove-aggregate
"unregister a user-defined aggregate from sqlite on all connections.
if an arity is not provided, it will unregister all arities for the aggregate."
([db name]
(aggs/remove-aggregate db name))
([db name arity]
(aggs/remove-aggregate db name arity)))