-
Notifications
You must be signed in to change notification settings - Fork 440
Expand file tree
/
Copy pathmercure.go
More file actions
102 lines (81 loc) · 2.5 KB
/
mercure.go
File metadata and controls
102 lines (81 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//go:build !nomercure
package frankenphp
// #include <stdint.h>
// #include "frankenphp.h"
// #include <php.h>
import "C"
import (
"log/slog"
"unsafe"
"github.com/dunglas/mercure"
)
type mercureContext struct {
mercureHub *mercure.Hub
}
//export go_mercure_publish
func go_mercure_publish(threadIndex C.uintptr_t, topics *C.struct__zval_struct, data *C.zend_string, private bool, id, typ *C.zend_string, retry uint64) (generatedID *C.zend_string, error C.short) {
thread := phpThreads[threadIndex]
ctx := thread.context()
fc := thread.frankenPHPContext()
if fc.mercureHub == nil {
if fc.logger.Enabled(ctx, slog.LevelError) {
fc.logger.LogAttrs(ctx, slog.LevelError, "No Mercure hub configured")
}
return nil, 1
}
u := &mercure.Update{
Event: mercure.Event{
Data: GoString(unsafe.Pointer(data)),
ID: GoString(unsafe.Pointer(id)),
Retry: retry,
Type: GoString(unsafe.Pointer(typ)),
},
Private: private,
Debug: fc.logger.Enabled(ctx, slog.LevelDebug),
}
zvalType := C.zval_get_type(topics)
switch zvalType {
case C.IS_STRING:
u.Topics = []string{GoString(unsafe.Pointer(*(**C.zend_string)(unsafe.Pointer(&topics.value[0]))))}
case C.IS_ARRAY:
ts, err := GoPackedArray[string](unsafe.Pointer(*(**C.zend_array)(unsafe.Pointer(&topics.value[0]))))
if err != nil {
if fc.logger.Enabled(ctx, slog.LevelError) {
fc.logger.LogAttrs(ctx, slog.LevelError, "invalid topics type", slog.Any("error", err))
}
return nil, 1
}
u.Topics = ts
default:
// Never happens as the function is called from C with proper types
panic("invalid topics type")
}
if err := fc.mercureHub.Publish(ctx, u); err != nil {
if fc.logger.Enabled(ctx, slog.LevelError) {
fc.logger.LogAttrs(ctx, slog.LevelError, "Unable to publish Mercure update", slog.Any("error", err))
}
return nil, 2
}
return (*C.zend_string)(PHPString(u.ID, false)), 0
}
func (w *worker) configureMercure(o *workerOpt) {
if o.mercureHub == nil {
return
}
w.mercureHub = o.mercureHub
}
// WithMercureHub sets the mercure.Hub to use to publish updates
func WithMercureHub(hub *mercure.Hub) RequestOption {
return func(o *frankenPHPContext) error {
o.mercureHub = hub
return nil
}
}
// WithWorkerMercureHub sets the mercure.Hub in the worker script and used to dispatch hot reloading-related mercure.Update.
func WithWorkerMercureHub(hub *mercure.Hub) WorkerOption {
return func(w *workerOpt) error {
w.mercureHub = hub
w.requestOptions = append(w.requestOptions, WithMercureHub(hub))
return nil
}
}