@@ -2,25 +2,32 @@ package daemon
2
2
3
3
import (
4
4
"context"
5
+ "io"
5
6
"path/filepath"
6
7
"sync"
7
8
8
9
"github.com/containerd/containerd"
9
- "github.com/containerd/containerd/api/services/containers/v1"
10
- "github.com/containerd/containerd/api/services/diff/v1"
11
- "github.com/containerd/containerd/api/services/images/v1"
12
- "github.com/containerd/containerd/api/services/namespaces/v1"
13
- "github.com/containerd/containerd/api/services/tasks/v1"
10
+ eventspb "github.com/containerd/containerd/api/events"
11
+ containerspb "github.com/containerd/containerd/api/services/containers/v1"
12
+ diffpb "github.com/containerd/containerd/api/services/diff/v1"
13
+ imagespb "github.com/containerd/containerd/api/services/images/v1"
14
+ namespacespb "github.com/containerd/containerd/api/services/namespaces/v1"
15
+ taskspb "github.com/containerd/containerd/api/services/tasks/v1"
14
16
"github.com/containerd/containerd/content"
15
17
"github.com/containerd/containerd/errdefs"
18
+ "github.com/containerd/containerd/events/exchange"
19
+ "github.com/containerd/containerd/images"
16
20
"github.com/containerd/containerd/leases"
17
21
"github.com/containerd/containerd/log"
18
- cns "github.com/containerd/containerd/namespaces"
22
+ "github.com/containerd/containerd/namespaces"
19
23
"github.com/containerd/containerd/plugin"
20
24
"github.com/containerd/containerd/services"
21
25
"github.com/containerd/containerd/snapshots"
22
26
criutil "github.com/containerd/cri/pkg/containerd/util"
23
27
"github.com/containerd/cri/pkg/server"
28
+ "github.com/containerd/typeurl"
29
+ prototypes "github.com/gogo/protobuf/types"
30
+ ocispec "github.com/opencontainers/image-spec/specs-go/v1"
24
31
"github.com/pkg/errors"
25
32
"github.com/rancher/k3c/pkg/client"
26
33
"github.com/rancher/k3c/pkg/daemon/config"
@@ -58,7 +65,7 @@ func (c *Daemon) RemoveVolume(ctx context.Context, name string, force bool) erro
58
65
59
66
func (c * Daemon ) start (ic * plugin.InitContext ) error {
60
67
var (
61
- ctx = criutil .WithUnlisted (cns .WithNamespace (ic .Context , defaults .PublicNamespace ), defaults .PrivateNamespace )
68
+ ctx = criutil .WithUnlisted (namespaces .WithNamespace (ic .Context , defaults .PublicNamespace ), defaults .PrivateNamespace )
62
69
)
63
70
plugins , err := ic .GetByType (plugin .GRPCPlugin )
64
71
if err != nil {
@@ -86,13 +93,14 @@ func (c *Daemon) start(ic *plugin.InitContext) error {
86
93
}
87
94
88
95
go c .gc (ctx )
96
+ go c .syncImages (namespaces .WithNamespace (ic .Context , defaults .PrivateNamespace ), ic .Events )
89
97
90
98
return nil
91
99
}
92
100
93
101
func (c * Daemon ) bootstrap (ic * plugin.InitContext ) error {
94
102
var (
95
- ctx = cns .WithNamespace (ic .Context , k3c .PrivateNamespace )
103
+ ctx = namespaces .WithNamespace (ic .Context , k3c .PrivateNamespace )
96
104
cfg = ic .Config .(* config.K3Config )
97
105
)
98
106
@@ -147,6 +155,96 @@ func (c *Daemon) Close() error {
147
155
return nil
148
156
}
149
157
158
+ // syncImages listens for ImageCreate events in the private namespace and copies them to the public namespace
159
+ // based on the assumption that ImageCreate events are from images built by buildkit
160
+ func (c * Daemon ) syncImages (ctx context.Context , ex * exchange.Exchange ) {
161
+ evtch , errch := ex .Subscribe (ctx , `topic~="/images/"` )
162
+ for {
163
+ select {
164
+ case err , ok := <- errch :
165
+ if ! ok {
166
+ return
167
+ }
168
+ log .G (ctx ).WithError (err ).Error ("image sync listener" )
169
+ case evt , ok := <- evtch :
170
+ if ! ok {
171
+ return
172
+ }
173
+ if evt .Namespace != defaults .PrivateNamespace {
174
+ continue
175
+ }
176
+ if err := c .handleEvent (ctx , evt .Event ); err != nil {
177
+ log .G (ctx ).WithError (err ).Error ("image sync handler" )
178
+ }
179
+ case <- ctx .Done ():
180
+ log .G (ctx ).WithError (ctx .Err ()).Error ("image sync handler" )
181
+ return
182
+ }
183
+ }
184
+ }
185
+
186
+ func (c * Daemon ) handleEvent (ctx context.Context , any * prototypes.Any ) error {
187
+ evt , err := typeurl .UnmarshalAny (any )
188
+ if err != nil {
189
+ return errors .Wrap (err , "failed to unmarshal any" )
190
+ }
191
+
192
+ switch e := evt .(type ) {
193
+ case * eventspb.ImageCreate :
194
+ log .G (ctx ).WithField ("event" , "image.create" ).Debug (e .Name )
195
+ return c .handleImageCreate (ctx , e .Name )
196
+ }
197
+
198
+ return nil
199
+ }
200
+
201
+ func (c * Daemon ) handleImageCreate (ctx context.Context , name string ) error {
202
+ imageStore := c .ctd .ImageService ()
203
+ img , err := imageStore .Get (ctx , name )
204
+ if err != nil {
205
+ return err
206
+ }
207
+ contentStore := c .ctd .ContentStore ()
208
+ otherContext := namespaces .WithNamespace (ctx , defaults .PublicNamespace )
209
+ var copy images.HandlerFunc = func (ctx context.Context , desc ocispec.Descriptor ) (subdescs []ocispec.Descriptor , err error ) {
210
+ log .G (ctx ).WithField ("media-type" , desc .MediaType ).Debug (desc .Digest )
211
+ info , err := contentStore .Info (ctx , desc .Digest )
212
+ if err != nil {
213
+ return subdescs , err
214
+ }
215
+ if _ , err = contentStore .Info (otherContext , desc .Digest ); err != nil && ! errdefs .IsNotFound (err ) {
216
+ return subdescs , err
217
+ }
218
+ ra , err := contentStore .ReaderAt (ctx , desc )
219
+ if err != nil {
220
+ return subdescs , err
221
+ }
222
+ defer ra .Close ()
223
+ r := content .NewReader (ra )
224
+ w , err := contentStore .Writer (otherContext , content .WithRef (img .Name ))
225
+ if err != nil {
226
+ return subdescs , err
227
+ }
228
+ defer w .Close ()
229
+ if _ , err = io .Copy (w , r ); err != nil {
230
+ return subdescs , err
231
+ }
232
+ if err = w .Commit (otherContext , 0 , w .Digest (), content .WithLabels (info .Labels )); err != nil && errdefs .IsAlreadyExists (err ) {
233
+ return subdescs , nil
234
+ }
235
+ return subdescs , err
236
+ }
237
+ err = images .Walk (ctx , images .Handlers (images .ChildrenHandler (contentStore ), copy ), img .Target )
238
+ if err != nil {
239
+ return err
240
+ }
241
+ _ , err = imageStore .Create (otherContext , img )
242
+ if errdefs .IsAlreadyExists (err ) {
243
+ return nil
244
+ }
245
+ return err
246
+ }
247
+
150
248
func getServicesOpts (ic * plugin.InitContext ) ([]containerd.ServicesOpt , error ) {
151
249
plugins , err := ic .GetByType (plugin .ServicePlugin )
152
250
if err != nil {
@@ -162,22 +260,22 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
162
260
return containerd .WithContentStore (s .(content.Store ))
163
261
},
164
262
services .ImagesService : func (s interface {}) containerd.ServicesOpt {
165
- return containerd .WithImageService (s .(images .ImagesClient ))
263
+ return containerd .WithImageService (s .(imagespb .ImagesClient ))
166
264
},
167
265
services .SnapshotsService : func (s interface {}) containerd.ServicesOpt {
168
266
return containerd .WithSnapshotters (s .(map [string ]snapshots.Snapshotter ))
169
267
},
170
268
services .ContainersService : func (s interface {}) containerd.ServicesOpt {
171
- return containerd .WithContainerService (s .(containers .ContainersClient ))
269
+ return containerd .WithContainerService (s .(containerspb .ContainersClient ))
172
270
},
173
271
services .TasksService : func (s interface {}) containerd.ServicesOpt {
174
- return containerd .WithTaskService (s .(tasks .TasksClient ))
272
+ return containerd .WithTaskService (s .(taskspb .TasksClient ))
175
273
},
176
274
services .DiffService : func (s interface {}) containerd.ServicesOpt {
177
- return containerd .WithDiffService (s .(diff .DiffClient ))
275
+ return containerd .WithDiffService (s .(diffpb .DiffClient ))
178
276
},
179
277
services .NamespacesService : func (s interface {}) containerd.ServicesOpt {
180
- return containerd .WithNamespaceService (s .(namespaces .NamespacesClient ))
278
+ return containerd .WithNamespaceService (s .(namespacespb .NamespacesClient ))
181
279
},
182
280
services .LeasesService : func (s interface {}) containerd.ServicesOpt {
183
281
return containerd .WithLeasesService (s .(leases.Manager ))
0 commit comments