func.go

  1package sqlite3
  2
  3import (
  4	"context"
  5	"io"
  6	"iter"
  7	"sync"
  8	"sync/atomic"
  9
 10	"github.com/tetratelabs/wazero/api"
 11
 12	"github.com/ncruces/go-sqlite3/internal/util"
 13)
 14
 15// CollationNeeded registers a callback to be invoked
 16// whenever an unknown collation sequence is required.
 17//
 18// https://sqlite.org/c3ref/collation_needed.html
 19func (c *Conn) CollationNeeded(cb func(db *Conn, name string)) error {
 20	var enable int32
 21	if cb != nil {
 22		enable = 1
 23	}
 24	rc := res_t(c.call("sqlite3_collation_needed_go", stk_t(c.handle), stk_t(enable)))
 25	if err := c.error(rc); err != nil {
 26		return err
 27	}
 28	c.collation = cb
 29	return nil
 30}
 31
 32// AnyCollationNeeded uses [Conn.CollationNeeded] to register
 33// a fake collating function for any unknown collating sequence.
 34// The fake collating function works like BINARY.
 35//
 36// This can be used to load schemas that contain
 37// one or more unknown collating sequences.
 38func (c Conn) AnyCollationNeeded() error {
 39	rc := res_t(c.call("sqlite3_anycollseq_init", stk_t(c.handle), 0, 0))
 40	if err := c.error(rc); err != nil {
 41		return err
 42	}
 43	c.collation = nil
 44	return nil
 45}
 46
 47// CreateCollation defines a new collating sequence.
 48//
 49// https://sqlite.org/c3ref/create_collation.html
 50func (c *Conn) CreateCollation(name string, fn CollatingFunction) error {
 51	var funcPtr ptr_t
 52	defer c.arena.mark()()
 53	namePtr := c.arena.string(name)
 54	if fn != nil {
 55		funcPtr = util.AddHandle(c.ctx, fn)
 56	}
 57	rc := res_t(c.call("sqlite3_create_collation_go",
 58		stk_t(c.handle), stk_t(namePtr), stk_t(funcPtr)))
 59	return c.error(rc)
 60}
 61
// CollatingFunction is the type of a collation callback.
// It compares a and b and returns an int, presumably following
// SQLite's convention (negative, zero, or positive) — see
// https://sqlite.org/c3ref/create_collation.html.
// Implementations must not retain a or b.
type CollatingFunction func(a, b []byte) int
 65
 66// CreateFunction defines a new scalar SQL function.
 67//
 68// https://sqlite.org/c3ref/create_function.html
 69func (c *Conn) CreateFunction(name string, nArg int, flag FunctionFlag, fn ScalarFunction) error {
 70	var funcPtr ptr_t
 71	defer c.arena.mark()()
 72	namePtr := c.arena.string(name)
 73	if fn != nil {
 74		funcPtr = util.AddHandle(c.ctx, fn)
 75	}
 76	rc := res_t(c.call("sqlite3_create_function_go",
 77		stk_t(c.handle), stk_t(namePtr), stk_t(nArg),
 78		stk_t(flag), stk_t(funcPtr)))
 79	return c.error(rc)
 80}
 81
// ScalarFunction is the type of a scalar SQL function.
// It receives the call's [Context] and its arguments.
// Implementations must not retain arg:
// the argument slice is handed back to a pool once the call returns.
type ScalarFunction func(ctx Context, arg ...Value)
 85
 86// CreateAggregateFunction defines a new aggregate SQL function.
 87//
 88// https://sqlite.org/c3ref/create_function.html
 89func (c *Conn) CreateAggregateFunction(name string, nArg int, flag FunctionFlag, fn AggregateSeqFunction) error {
 90	var funcPtr ptr_t
 91	defer c.arena.mark()()
 92	namePtr := c.arena.string(name)
 93	if fn != nil {
 94		funcPtr = util.AddHandle(c.ctx, AggregateConstructor(func() AggregateFunction {
 95			var a aggregateFunc
 96			coro := func(yieldCoro func(struct{}) bool) {
 97				seq := func(yieldSeq func([]Value) bool) {
 98					for yieldSeq(a.arg) {
 99						if !yieldCoro(struct{}{}) {
100							break
101						}
102					}
103				}
104				fn(&a.ctx, seq)
105			}
106			a.next, a.stop = iter.Pull(coro)
107			return &a
108		}))
109	}
110	rc := res_t(c.call("sqlite3_create_aggregate_function_go",
111		stk_t(c.handle), stk_t(namePtr), stk_t(nArg),
112		stk_t(flag), stk_t(funcPtr)))
113	return c.error(rc)
114}
115
// AggregateSeqFunction is the type of an aggregate SQL function.
// seq yields the arguments of each row being aggregated.
// ctx is a pointer because the [Context] it points to is replaced
// before every step and again before the value is requested.
// Implementations must not retain the slices yielded by seq:
// the same backing storage is reused for every row.
type AggregateSeqFunction func(ctx *Context, seq iter.Seq[[]Value])
119
120// CreateWindowFunction defines a new aggregate or aggregate window SQL function.
121// If fn returns a [WindowFunction], an aggregate window function is created.
122// If fn returns an [io.Closer], it will be called to free resources.
123//
124// https://sqlite.org/c3ref/create_function.html
125func (c *Conn) CreateWindowFunction(name string, nArg int, flag FunctionFlag, fn AggregateConstructor) error {
126	var funcPtr ptr_t
127	defer c.arena.mark()()
128	namePtr := c.arena.string(name)
129	if fn != nil {
130		funcPtr = util.AddHandle(c.ctx, AggregateConstructor(func() AggregateFunction {
131			agg := fn()
132			if win, ok := agg.(WindowFunction); ok {
133				return win
134			}
135			return windowFunc{agg, name}
136		}))
137	}
138	rc := res_t(c.call("sqlite3_create_window_function_go",
139		stk_t(c.handle), stk_t(namePtr), stk_t(nArg),
140		stk_t(flag), stk_t(funcPtr)))
141	return c.error(rc)
142}
143
// AggregateConstructor is an [AggregateFunction] constructor.
// It is called to create a fresh [AggregateFunction] instance
// when one is needed for an aggregation.
type AggregateConstructor func() AggregateFunction
146
// AggregateFunction is the interface an aggregate function should implement.
//
// https://sqlite.org/appfunc.html
type AggregateFunction interface {
	// Step is invoked to add a row to the current window.
	// The function arguments, if any, corresponding to the row being added, are passed to Step.
	// Implementations must not retain arg:
	// the argument slice is handed back to a pool once Step returns.
	Step(ctx Context, arg ...Value)

	// Value is invoked to return the current (or final) value of the aggregate.
	// The result is reported through ctx.
	Value(ctx Context)
}
159
// WindowFunction is the interface an aggregate window function should implement.
// It extends [AggregateFunction] with the Inverse step.
//
// https://sqlite.org/windowfunctions.html
type WindowFunction interface {
	AggregateFunction

	// Inverse is invoked to remove the oldest presently aggregated result of Step from the current window.
	// The function arguments, if any, are those passed to Step for the row being removed.
	// Implementations must not retain arg:
	// the argument slice is handed back to a pool once Inverse returns.
	Inverse(ctx Context, arg ...Value)
}
171
172// OverloadFunction overloads a function for a virtual table.
173//
174// https://sqlite.org/c3ref/overload_function.html
175func (c *Conn) OverloadFunction(name string, nArg int) error {
176	defer c.arena.mark()()
177	namePtr := c.arena.string(name)
178	rc := res_t(c.call("sqlite3_overload_function",
179		stk_t(c.handle), stk_t(namePtr), stk_t(nArg)))
180	return c.error(rc)
181}
182
183func destroyCallback(ctx context.Context, mod api.Module, pApp ptr_t) {
184	util.DelHandle(ctx, pApp)
185}
186
187func collationCallback(ctx context.Context, mod api.Module, pArg, pDB ptr_t, eTextRep uint32, zName ptr_t) {
188	if c, ok := ctx.Value(connKey{}).(*Conn); ok && c.handle == pDB && c.collation != nil {
189		name := util.ReadString(mod, zName, _MAX_NAME)
190		c.collation(c, name)
191	}
192}
193
194func compareCallback(ctx context.Context, mod api.Module, pApp ptr_t, nKey1 int32, pKey1 ptr_t, nKey2 int32, pKey2 ptr_t) uint32 {
195	fn := util.GetHandle(ctx, pApp).(CollatingFunction)
196	return uint32(fn(util.View(mod, pKey1, int64(nKey1)), util.View(mod, pKey2, int64(nKey2))))
197}
198
199func funcCallback(ctx context.Context, mod api.Module, pCtx, pApp ptr_t, nArg int32, pArg ptr_t) {
200	db := ctx.Value(connKey{}).(*Conn)
201	args := callbackArgs(db, nArg, pArg)
202	defer returnArgs(args)
203	fn := util.GetHandle(db.ctx, pApp).(ScalarFunction)
204	fn(Context{db, pCtx}, *args...)
205}
206
207func stepCallback(ctx context.Context, mod api.Module, pCtx, pAgg, pApp ptr_t, nArg int32, pArg ptr_t) {
208	db := ctx.Value(connKey{}).(*Conn)
209	args := callbackArgs(db, nArg, pArg)
210	defer returnArgs(args)
211	fn, _ := callbackAggregate(db, pAgg, pApp)
212	fn.Step(Context{db, pCtx}, *args...)
213}
214
215func valueCallback(ctx context.Context, mod api.Module, pCtx, pAgg, pApp ptr_t, final int32) {
216	db := ctx.Value(connKey{}).(*Conn)
217	fn, handle := callbackAggregate(db, pAgg, pApp)
218	fn.Value(Context{db, pCtx})
219
220	// Cleanup.
221	if final != 0 {
222		var err error
223		if handle != 0 {
224			err = util.DelHandle(ctx, handle)
225		} else if c, ok := fn.(io.Closer); ok {
226			err = c.Close()
227		}
228		if err != nil {
229			Context{db, pCtx}.ResultError(err)
230			return // notest
231		}
232	}
233}
234
235func inverseCallback(ctx context.Context, mod api.Module, pCtx, pAgg ptr_t, nArg int32, pArg ptr_t) {
236	db := ctx.Value(connKey{}).(*Conn)
237	args := callbackArgs(db, nArg, pArg)
238	defer returnArgs(args)
239	fn := util.GetHandle(db.ctx, pAgg).(WindowFunction)
240	fn.Inverse(Context{db, pCtx}, *args...)
241}
242
243func callbackAggregate(db *Conn, pAgg, pApp ptr_t) (AggregateFunction, ptr_t) {
244	if pApp == 0 {
245		handle := util.Read32[ptr_t](db.mod, pAgg)
246		return util.GetHandle(db.ctx, handle).(AggregateFunction), handle
247	}
248
249	// We need to create the aggregate.
250	fn := util.GetHandle(db.ctx, pApp).(AggregateConstructor)()
251	if pAgg != 0 {
252		handle := util.AddHandle(db.ctx, fn)
253		util.Write32(db.mod, pAgg, handle)
254		return fn, handle
255	}
256	return fn, 0
257}
258
var (
	// valueArgsPool recycles the *[]Value argument slices
	// built by callbackArgs and given back by returnArgs.
	valueArgsPool sync.Pool
	// valueArgsLen accumulates (with bitwise OR) the argument counts
	// that forced callbackArgs to allocate; it sizes new slices.
	valueArgsLen atomic.Int32
)
263
264func callbackArgs(db *Conn, nArg int32, pArg ptr_t) *[]Value {
265	arg, ok := valueArgsPool.Get().(*[]Value)
266	if !ok || cap(*arg) < int(nArg) {
267		max := valueArgsLen.Or(nArg) | nArg
268		lst := make([]Value, max)
269		arg = &lst
270	}
271	lst := (*arg)[:nArg]
272	for i := range lst {
273		lst[i] = Value{
274			c:      db,
275			handle: util.Read32[ptr_t](db.mod, pArg+ptr_t(i)*ptrlen),
276		}
277	}
278	*arg = lst
279	return arg
280}
281
// returnArgs gives an argument slice obtained from callbackArgs
// back to valueArgsPool so it can be reused by a later call.
func returnArgs(p *[]Value) {
	valueArgsPool.Put(p)
}
285
// aggregateFunc adapts an [AggregateSeqFunction] to the
// [AggregateFunction] interface by running it as a coroutine
// (see CreateAggregateFunction, which fills in next and stop from iter.Pull).
type aggregateFunc struct {
	// next resumes the coroutine; it reports false once the coroutine is done.
	next func() (struct{}, bool)
	// stop ends the coroutine.
	stop func()
	// ctx is the most recent Context; the user function holds a pointer to it.
	ctx Context
	// arg holds the arguments of the current row; its storage is reused across rows.
	arg []Value
}
292
293func (a *aggregateFunc) Step(ctx Context, arg ...Value) {
294	a.ctx = ctx
295	a.arg = append(a.arg[:0], arg...)
296	if _, more := a.next(); !more {
297		a.stop()
298	}
299}
300
// Value publishes ctx to the coroutine and then stops the iterator.
// NOTE(review): per iter.Pull, stop should let a suspended user function
// run to completion, which is presumably when it reports its result
// through ctx — confirm against the iter package documentation.
func (a *aggregateFunc) Value(ctx Context) {
	a.ctx = ctx
	a.stop()
}
305
// Close stops the iterator and always returns nil.
// It makes aggregateFunc an io.Closer.
func (a *aggregateFunc) Close() error {
	a.stop()
	return nil
}
310
// windowFunc wraps an [AggregateFunction] that is not a [WindowFunction],
// adding an Inverse method that reports an error
// (see CreateWindowFunction).
type windowFunc struct {
	AggregateFunction
	// name is the SQL function name, used in the Inverse error message.
	name string
}
315
316func (w windowFunc) Inverse(ctx Context, arg ...Value) {
317	// Implementing inverse allows certain queries that don't really need it to succeed.
318	ctx.ResultError(util.ErrorString(w.name + ": may not be used as a window function"))
319}