main.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Tries to use the guidelines at https://cloud.google.com/apis/design for the gRPC API where possible.
//
// TODO: permissive deadlines for all RPC calls
//
//nolint:staticcheck
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"maps"
"math"
"net/url"
"os"
"os/signal"
"regexp"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/oauth2"
"google.golang.org/api/idtoken"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
grpcMetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/googleforgames/open-match2/v2/pkg/pb"
"github.com/googleforgames/open-match2/v2/internal/filter"
"github.com/googleforgames/open-match2/v2/internal/logging"
"github.com/googleforgames/open-match2/v2/internal/statestore/cache"
store "github.com/googleforgames/open-match2/v2/internal/statestore/datatypes"
memoryReplicator "github.com/googleforgames/open-match2/v2/internal/statestore/memory"
redisReplicator "github.com/googleforgames/open-match2/v2/internal/statestore/redis"
"github.com/googleforgames/open-match2/v2/internal/config"
"github.com/davecgh/go-spew/spew"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// Required by protobuf compiler's golang gRPC auto-generated code.
type grpcServer struct {
pb.UnimplementedOpenMatchServiceServer
}
var (
InvalidIdErr = errors.New("Invalid ticket id")
NoTicketIdsErr = errors.New("No Ticket IDs in update request")
NoValidTicketIdsErr = errors.New("No Valid Ticket IDs in update request")
TooManyUpdatesErr = errors.New("Too many ticket state updates requested in a single call")
MMFTimeoutError = errors.New("MMF deadline in om-core (specified in OM_MMF_TIMEOUT_SECS config var) exceeded")
MMFsComplete = errors.New("All MMF streams monitored by om-core have completed (io.EOF)")
logger = logrus.WithFields(logrus.Fields{
"app": "open_match",
"component": "core",
})
cfg *viper.Viper = nil
// One global instance for the local ticket cache. Everything reads and
// writes to this one instance which contains concurrent-safe data
// structures where necessary.
tc cache.ReplicatedTicketCache
tlsConfig *tls.Config
meter *metric.Meter
otelShutdownFunc func(context.Context) error
)
func main() {
// Read configuration env vars, and configure logging
cfg = config.Read()
logging.ConfigureLogging(cfg)
// Make a parent context that gets cancelled on SIGINT
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
// Read in the system root certificates in case an MMF is using TLS (which will
// always be the case if your MMF is deployed to Google Cloud Run)
if tlsConfig == nil {
systemRootCa, err := x509.SystemCertPool()
if err != nil {
logger.Fatalf("gRPC TLS: failed to get root certs: %v", err)
}
// Turn off linting to avoid "G402: TLS MinVersion too low". Setting a
// min version can cause compatibility problems for anyone who tries to
// run OM on something other than Cloud Run.
tlsConfig = &tls.Config{RootCAs: systemRootCa} //nolint:gosec
}
// Configure metrics
if cfg.GetBool("OM_OTEL_SIDECAR") {
meter, otelShutdownFunc = initializeOtel()
} else {
meter, otelShutdownFunc = initializeOtelWithLocalProm()
}
defer otelShutdownFunc(ctx) //nolint:errcheck
registerMetrics(meter)
cache.RegisterMetrics(meter)
// Set up the replicated ticket cache.
// fields using sync.Map come ready to use and don't need initialization
tc.Cfg = cfg
tc.UpRequests = make(chan *cache.UpdateRequest)
switch cfg.GetString("OM_STATE_STORAGE_TYPE") {
case "redis":
// Default: use redis
rr, err := redisReplicator.New(cfg)
if err != nil {
logger.Fatal(fmt.Errorf("Redis Failure; exiting: %w", err).Error())
}
tc.Replicator = rr
case "memory":
// NOT RECOMMENDED FOR PRODUCTION
//
// Store state in local memory only. Every copy of om-core spun up
// using this configuration is an island which does not send or receive
// any updates to/from any other instances.
// Useful for debugging, local development, etc.
logger.Warnf("OM_STATE_STORAGE_TYPE configuration variable set to 'memory'. NOT RECOMMENDED FOR PRODUCTION")
tc.Replicator = memoryReplicator.New(cfg)
}
// These goroutines send and receive cache updates from state storage
go tc.OutgoingReplicationQueue(ctx)
go tc.IncomingReplicationQueue(ctx)
// Start the gRPC server
start(cfg)
// Dump the final state of the cache to the log for debugging.
if cfg.GetBool("OM_VERBOSE") {
spew.Dump(syncMapDump(&tc.Tickets))
spew.Dump(syncMapDump(&tc.InactiveSet))
spew.Dump(syncMapDump(&tc.Assignments))
}
logger.Info("Application stopped successfully.")
logger.Infof("Final state of local cache: %v tickets, %v active, %v inactive, %v assignments", len(syncMapDump(&tc.Tickets)), len(setDifference(&tc.Tickets, &tc.InactiveSet)), len(syncMapDump(&tc.InactiveSet)), len(syncMapDump(&tc.Assignments)))
}
//----------------------------------------------------------------------------------
// CreateTicket()
//----------------------------------------------------------------------------------
// CreateTicket generates an event to update the ticket state storage, adding a
// new ticket. The ticket's id will be generated by the state storage and
// returned asynchronously. This request hangs until it can return the ticket
// id. When a ticket is created, it starts off as inactive and must be
// activated with the ActivateTickets call. This ensures that no ticket that
// was not successfully replicated and returned to the om-core client is ever
// put in the pool.
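//
// Illustrative client-side flow (a sketch, not part of om-core), assuming the
// generated pb.OpenMatchServiceClient bindings; omClient and ticket are
// hypothetical variables:
//
//	resp, err := omClient.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: ticket})
//	if err != nil {
//		// handle error
//	}
//	_, err = omClient.ActivateTickets(ctx, &pb.ActivateTicketsRequest{
//		TicketIds: []string{resp.GetTicketId()},
//	})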
func (s *grpcServer) CreateTicket(parentCtx context.Context, req *pb.CreateTicketRequest) (*pb.CreateTicketResponse, error) {
return createTicket(parentCtx, &tc, req)
}
// createTicket is the package-internal implementation of the grpc server
// CreateTicket function. Basically, this exists as a separate function only so
// that tests can run against it without having to stand up a grpc server.
func createTicket(parentCtx context.Context, tc *cache.ReplicatedTicketCache, req *pb.CreateTicketRequest) (*pb.CreateTicketResponse, error) {
rpcName := "CreateTicket"
// Add structured logging fields for logs output from this function
logger := logger.WithFields(logrus.Fields{
"stage": "handling",
"rpc": rpcName,
})
// Validate request and update fields to ensure a valid ticket, then
// Marshal ticket into storage format
ticketPb, err := validateAndMarshalIncomingTicket(req)
if ticketPb == nil || err != nil {
err = status.Error(codes.Internal,
fmt.Errorf("CreateTicket failed to marshal the provided ticket to the state storage format: %w", err).Error())
logger.Error(err)
return &pb.CreateTicketResponse{}, fmt.Errorf("%w", err)
}
// Record the size of the ticket in kb
otelTicketSize.Record(parentCtx, float64(len(ticketPb))/1024.0)
// Make a results return channel
rChan := make(chan *store.StateResponse)
// Queue our ticket creation cache update request to be replicated to all
// om-core instances.
tc.UpRequests <- &cache.UpdateRequest{
// This command (writing a ticket to the cache) is replicated to all
// other om-core instances using the batch writing async goroutine
// tc.OutgoingReplicationQueue() and the update itself is applied to
// the local ticket cache in the update-processing async goroutine
// tc.IncomingReplicationQueue().
ResultsChan: rChan,
Ctx: parentCtx,
Update: store.StateUpdate{
Cmd: store.Ticket,
Value: string(ticketPb[:]),
},
}
// Get the results
results := <-rChan
return &pb.CreateTicketResponse{TicketId: results.Result}, results.Err
}
// validateAndMarshalIncomingTicket is the package-internal implementation to
// validate expiration time and creation time input to the CreateTicket
// function and generate replacement timestamps as necessary.
//
// Basically, this exists as a separate function only so that tests can run
// against it without having to stand up a grpc server.
//
// A note about testing and input validation: this implementation and its test
// suite are designed around how this function is executed in the live
// production code path: i.e., this function is only ever called by the
// OpenMatchServiceServer grpc server's CreateTicket function, which is assumed
// to sit behind the grpc-gateway reverse-proxy
// (https://github.com/grpc-ecosystem/grpc-gateway). That upstream function
// call chain will fail if the JSON pb.CreateTicketRequest is invalid, so this
// function does not need to do any input validation that would be caught by
// those upstream functions. (Basically, if your request is so malformed that
// the JSON can't be marshalled to the protobuf CreateTicketRequest message,
// your input will never reach this function.)
func validateAndMarshalIncomingTicket(req *pb.CreateTicketRequest) ([]byte, error) {
rpcName := "CreateTicket"
// Add structured logging fields for logs output from this function
logger := logger.WithFields(logrus.Fields{
"stage": "validation",
"rpc": rpcName,
})
// Input validation
if req.GetTicket().GetId() != "" {
logger.Warnf("CreateTicket request included a ticketid '%v'. "+
"Open Match assigns Ticket IDs on creation"+
", see documentation for more details.",
req.GetTicket().GetId())
}
// Set creation timestamp if request didn't provide one or specified one
// that's in the future.
//
// Note: om-core will not complain about a creation time so old that the
// expiration time has already passed and the ticket is immediately
// expired!
crTime := req.GetTicket().GetAttributes().GetCreationTime()
switch {
case crTime == nil:
// This is our expected code path: user provides an empty creation
// time and om-core fills it in. (Thus why this logs at trace level
// while all other creation time validation failures are warnings.)
logger.Trace("No CreationTime provided; using current time")
crTime = timestamppb.Now()
case !crTime.IsValid():
logger.Warn("CreationTime provided is invalid; replacing with current time")
crTime = timestamppb.Now()
case crTime.AsTime().After(time.Now()):
logger.Warn("CreationTime provided is in the future; replacing with current time")
crTime = timestamppb.Now()
}
// Set expiration timestamp if request didn't provide a valid one.
//
// Matchmakers can request a shorter expiration time than the default, but
// not a longer one. The default expiration time is also the maximum allowed:
// now + the configured ticket TTL
exTime := req.GetTicket().GetExpirationTime()
maxExTime := time.Now().Add(time.Millisecond *
time.Duration(cfg.GetInt("OM_CACHE_TICKET_TTL_MS")))
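// For example (illustrative values only): with OM_CACHE_TICKET_TTL_MS=600000
// the accepted window is (now, now+10m]; a requested expiration 2 minutes from
// now is kept as-is, while one 30 minutes out (or already in the past) is
// rejected below with InvalidArgument.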
switch {
case exTime == nil:
// This is our expected code path: user provides an empty expiration
// time and om-core fills it in. (Thus why this logs at trace level
// while all other recoverable expiration time validation failures are warnings.)
logger.Tracef("ExpirationTime provided is missing; "+
"replacing with current time + OM_CACHE_TICKET_TTL_MS (%v)",
cfg.GetInt("OM_CACHE_TICKET_TTL_MS"))
exTime = timestamppb.New(maxExTime)
case !exTime.IsValid():
fallthrough
case exTime.AsTime().After(maxExTime):
fallthrough
case exTime.AsTime().Before(time.Now()):
// This combination of creation time + expiration time isn't valid.
return nil, fmt.Errorf("%w", status.Error(codes.InvalidArgument,
fmt.Sprintf("specified expiration time not between (now, now + %v)",
cfg.GetInt("OM_CACHE_TICKET_TTL_MS"))))
}
// Update ticket with new creation/expiration time
return proto.Marshal(&pb.Ticket{ //nolint:wrapcheck
ExpirationTime: exTime,
Extensions: req.GetTicket().GetExtensions(),
Attributes: &pb.Ticket_FilterableData{
Tags: req.GetTicket().GetAttributes().GetTags(),
StringArgs: req.GetTicket().GetAttributes().GetStringArgs(),
DoubleArgs: req.GetTicket().GetAttributes().GetDoubleArgs(),
CreationTime: crTime,
},
})
}
//----------------------------------------------------------------------------------
// ActivateTicket()
// DeactivateTicket()
//----------------------------------------------------------------------------------
// DeactivateTickets is a lazy deletion process: it adds the provided ticket IDs to the inactive list,
// which prevents them from appearing in player pools, and the tickets are deleted when they expire.
//
// Receiving one or more invalid ticket ids in the pb.DeactivateTicketsRequest
// does not stop processing of valid ticket ids in the same request. In the
// event of a partially successful update, the grpc Status Details can be
// checked for more information about which updates failed, and why.
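//
// Illustrative client-side sketch (not part of om-core) for inspecting those
// details on a partial failure; the concrete detail message types are whatever
// addErrorDetails attaches:
//
//	st := status.Convert(err)
//	for _, d := range st.Details() {
//		log.Printf("update failure detail (%T): %v", d, d)
//	}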
func (s *grpcServer) DeactivateTickets(parentCtx context.Context, req *pb.DeactivateTicketsRequest) (*pb.DeactivateTicketsResponse, error) {
logger := logger.WithFields(logrus.Fields{
"rpc": "DeactivateTickets",
})
return deactivateTickets(parentCtx, logger, &tc, req)
}
// deactivateTickets is the package-internal implementation of the grpc server
// DeactivateTickets function. Basically, this exists as a separate function only so
// that tests can run against it without having to stand up a grpc server.
func deactivateTickets(parentCtx context.Context, logger *logrus.Entry, tc *cache.ReplicatedTicketCache, req *pb.DeactivateTicketsRequest) (*pb.DeactivateTicketsResponse, error) {
// This function does not allow for cancelling once the request is received.
ctx := context.WithoutCancel(parentCtx)
// Validate input list of ticket ids against state storage id format before
// trying to process them. If only some of the ids are valid, the
// invalidIdErrorDetails will be used to populate error details to return
// to the gRPC client.
validTicketIds, invalidIdErrorDetails, err := validateTicketStateUpdates(logger,
tc.Replicator.GetReplIdValidator(), req.GetTicketIds())
if err != nil && len(validTicketIds) == 0 {
// Nothing valid to process.
return nil, fmt.Errorf("%w", status.Error(codes.InvalidArgument, err.Error()))
}
// Send the ticket deactivation updates.
updateStateErrDetails := updateTicketsActiveState(ctx, logger, tc, validTicketIds, store.Deactivate)
// Attach error details we found while validating ticket IDs and
// replicating the updates.
maps.Copy(updateStateErrDetails, invalidIdErrorDetails)
if len(updateStateErrDetails) > 0 {
// Guard against a nil err here: ID validation can pass while replicating
// some of the updates still fails.
if err == nil {
err = errors.New("some ticket state updates failed")
}
err = addErrorDetails(updateStateErrDetails, status.New(codes.InvalidArgument, err.Error()))
}
// Record metrics
if meter != nil {
otelDeactivationsPerCall.Record(ctx, int64(len(validTicketIds)))
otelInvalidIdsPerDeactivateCall.Record(ctx, int64(len(invalidIdErrorDetails)))
otelFailedIdsPerDeactivateCall.Record(ctx, int64(len(updateStateErrDetails)))
}
return &pb.DeactivateTicketsResponse{}, err
}
// ActivateTickets accepts a list of ticketids to activate, validates the
// input, and generates replication updates for each activation event.
//
// Receiving one or more invalid ticket ids in the pb.ActivateTicketsRequest
// does not stop processing of valid ticket ids in the same request. In the
// event of a partially successful update, the grpc Status Details can be
// checked for more information about which updates failed, and why.
func (s *grpcServer) ActivateTickets(parentCtx context.Context, req *pb.ActivateTicketsRequest) (*pb.ActivateTicketsResponse, error) {
// Structured logging fields for this function.
logger := logger.WithFields(logrus.Fields{
"rpc": "ActivateTickets",
})
return activateTickets(parentCtx, logger, &tc, req)
}
// activateTickets is the package-internal implementation of the grpc server
// ActivateTickets function. Basically, this exists as a separate function only so
// that tests can run against it without having to stand up a grpc server.
func activateTickets(parentCtx context.Context, logger *logrus.Entry, tc *cache.ReplicatedTicketCache, req *pb.ActivateTicketsRequest) (*pb.ActivateTicketsResponse, error) {
// This function does not allow for cancelling once the request is received.
ctx := context.WithoutCancel(parentCtx)
// Validate input list of ticket ids against state storage id format before
// trying to process them. If only some of the ids are valid, the
// invalidIdErrorDetails will be used to populate error details to return
// to the gRPC client.
validTicketIds, invalidIdErrorDetails, err := validateTicketStateUpdates(logger,
tc.Replicator.GetReplIdValidator(), req.GetTicketIds())
if err != nil && len(validTicketIds) == 0 {
// Nothing valid to process.
return nil, fmt.Errorf("%w", status.Error(codes.InvalidArgument, err.Error()))
}
// Send the ticket activation updates.
updateStateErrDetails := updateTicketsActiveState(ctx, logger, tc, validTicketIds, store.Activate)
// Attach error details we found while validating ticket IDs and
// replicating the updates.
maps.Copy(updateStateErrDetails, invalidIdErrorDetails)
if len(updateStateErrDetails) > 0 {
// Guard against a nil err here: ID validation can pass while replicating
// some of the updates still fails.
if err == nil {
err = errors.New("some ticket state updates failed")
}
err = addErrorDetails(updateStateErrDetails, status.New(codes.InvalidArgument, err.Error()))
}
// Record metrics
if meter != nil {
otelActivationsPerCall.Record(ctx, int64(len(validTicketIds)))
otelInvalidIdsPerActivateCall.Record(ctx, int64(len(invalidIdErrorDetails)))
otelFailedIdsPerActivateCall.Record(ctx, int64(len(updateStateErrDetails)))
}
return &pb.ActivateTicketsResponse{}, err
}
// validateTicketStateUpdates takes a list of ticket IDs to validate, and a
// regex used to validate those IDs. Since we want to process any valid ticket
// IDs in requests that contain multiple IDs, this function doesn't just exit
// when it finds an invalid ID. It returns a list of valid IDs, a map of IDs
// to failures (errors) for those IDs that are invalid, and an error.
func validateTicketStateUpdates(logger *logrus.Entry, idValidator *regexp.Regexp, ticketIds []string) (validIds []string, invalidIdErrorDetails map[string]error, err error) {
// Initialize map to prevent 'panic: assignment to entry in nil map' on first assignment
invalidIdErrorDetails = map[string]error{}
// Structured logging
logger = logger.WithFields(logrus.Fields{
"stage": "validation",
})
// Validate number of requested updates
numReqUpdates := len(ticketIds)
if numReqUpdates > cfg.GetInt("OM_MAX_STATE_UPDATES_PER_CALL") {
return nil, nil, fmt.Errorf("%w (configured maximum %v, requested %v)",
TooManyUpdatesErr, cfg.GetInt("OM_MAX_STATE_UPDATES_PER_CALL"), numReqUpdates)
}
if numReqUpdates == 0 {
return nil, nil, NoTicketIdsErr
}
// Validate ids of requested updates
if numReqUpdates > 0 {
// check each id in the input list, making separate lists of those that are
// valid and those that are invalid.
for _, id := range ticketIds {
if idValidator.MatchString(id) {
validIds = append(validIds, id)
logger.WithFields(logrus.Fields{"ticket_id": id}).Trace("valid ticket id")
} else {
// Generate error details to attach to returned grpc Status
invalidIdErrorDetails[id] = InvalidIdErr
logger.WithFields(logrus.Fields{"ticket_id": id}).Error(InvalidIdErr)
}
}
}
if len(validIds) == 0 {
// Don't bother returning details about every invalid ticket id if they
// were all invalid.
return nil, nil, NoValidTicketIdsErr
}
if len(invalidIdErrorDetails) > 0 {
err = InvalidIdErr
}
return
}
// updateTicketsActiveState accepts a list of ticketids to (de-)activate, and
// generates cache updates for each.
//
// NOTE: This function does no input validation; the calling function must handle that.
func updateTicketsActiveState(parentCtx context.Context, logger *logrus.Entry, tc *cache.ReplicatedTicketCache, ticketIds []string, command int) map[string]error {
// Make a human-readable version of the requested state transition, for logging
var requestedStateAsString string
switch command {
case store.Deactivate:
requestedStateAsString = "inactive"
case store.Activate:
requestedStateAsString = "active"
}
logger = logger.WithFields(logrus.Fields{
"stage": "handling",
"rpc": "updateTicketsActiveState",
"num_updates": len(ticketIds),
"desired_state": requestedStateAsString,
})
errs := map[string]error{}
// Generate result channel
rChan := make(chan *store.StateResponse, len(ticketIds))
defer close(rChan)
// Queue the update requests
logger.Trace("queuing updates for ticket state change")
for _, id := range ticketIds {
tc.UpRequests <- &cache.UpdateRequest{
// This command (adding/removing the id to the inactive list)
// is replicated to all other om-core instances using the batch
// writing async goroutine tc.OutgoingReplicationQueue() and its
// effect is applied to the local ticket cache in the update
// processing async goroutine tc.IncomingReplicationQueue().
ResultsChan: rChan,
Ctx: parentCtx,
Update: store.StateUpdate{
Cmd: command,
Key: id,
},
}
logger.WithFields(logrus.Fields{
"ticket_id": id,
}).Trace("generated request to update ticket status")
}
// Look through all results for errors. Since results come back on a
// channel, we need to process all the results, even if the context has
// been cancelled and we'll never return those results to the calling
// client (returning from this function before processing all results
// closes the results channel, which will cause a panic, as the state
// storage layer isn't context-aware). This is slightly inefficient in the
// worst-case scenario (client has quit) but the code is very simple and
// therefore robust.
for i := 0; i < len(ticketIds); i++ {
r := <-rChan
if r.Err != nil {
// Wrap redis error and give it a gRPC internal server error status code
// The results.result field contains the ticket id that generated the error.
errs[r.Result] = status.Error(codes.Internal, fmt.Errorf("Unable to update ticket %v state to %v: %w", r.Result, requestedStateAsString, r.Err).Error())
logger.Error(errs[r.Result])
}
}
return errs
}
//----------------------------------------------------------------------------------
// InvokeMatchmakingFunctions()
//----------------------------------------------------------------------------------
// InvokeMatchmakingFunctions loops through each Pool in the provided Profile,
// applying the filters inside and adding participating tickets to those pools.
// It then attempts to connect to every matchmaking function in the provided
// list, and send the Profile with filled Pools to each. It processes resulting
// matches from each matchmaking function asynchronously as they arrive. For
// each match it receives, it deactivates all tickets contained in the match
// and streams the match back to the InvokeMatchmakingFunctions caller.
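//
// Illustrative caller sketch (not part of om-core), assuming the generated
// pb.OpenMatchServiceClient bindings; omClient, profile, mmfSpec and
// handleMatch are hypothetical:
//
//	stream, err := omClient.InvokeMatchmakingFunctions(ctx, &pb.MmfRequest{
//		Profile: profile,
//		Mmfs:    []*pb.MatchmakingFunctionSpec{mmfSpec},
//	})
//	for err == nil {
//		var resp *pb.StreamedMmfResponse
//		if resp, err = stream.Recv(); err == nil {
//			handleMatch(resp.GetMatch())
//		}
//	}
//	// err == io.EOF once all MMFs have finished streaming matches.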
//
// NOTE: (complexity linters disabled) The complexity here is unavoidable as we're
// using multiple nested goroutines to process lots of MMFs simultaneously
// while minimizing the time to first response for clients.
//
//nolint:gocognit,cyclop,gocyclo,maintidx
func (s *grpcServer) InvokeMatchmakingFunctions(req *pb.MmfRequest, stream pb.OpenMatchService_InvokeMatchmakingFunctionsServer) error {
startTime := time.Now()
// Set a timeout for this API call, but ignore context cancellations from
// the background context. It is fully intended that MMFs can run longer
// than the matchmaker is willing to wait, so we want them not to get
// cancelled when the calling matchmaker cancels its context. However, best
// practices dictate that we define /some/ timeout (default: 10 mins)
mmfTimeout := time.Duration(cfg.GetInt("OM_MMF_TIMEOUT_SECS")) * time.Second
ctx, cancel := context.WithCancelCause(context.WithoutCancel(context.Background())) // ignore cancellation from parent context
ctx, _ = context.WithTimeoutCause(ctx, mmfTimeout, MMFTimeoutError)
defer func() {
logger.Debugf("MMFs complete, sending context cancellation after %04d ms", time.Since(startTime).Milliseconds())
cancel(MMFsComplete)
}()
// input validation
if req.GetProfile() == nil {
return fmt.Errorf("%w", status.Error(codes.InvalidArgument, "profile is required"))
}
if req.GetMmfs() == nil {
return fmt.Errorf("%w", status.Error(codes.InvalidArgument, "list of mmfs to invoke is required"))
}
// By convention, profile names should use reverse-DNS notation
// https://en.wikipedia.org/wiki/Reverse_domain_name_notation This
// helps with metric attribute cardinality, as we can record
// profile names alongside metric readings after stripping off the
// most-unique portion.
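// For example (illustrative), a profile named "com.example.mygame.1v1ranked"
// is recorded under the metric attribute value "com.example.mygame".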
profileName := req.GetProfile().GetName()
i := strings.LastIndex(profileName, ".")
if i > 0 {
profileName = profileName[0:i]
}
logger := logger.WithFields(logrus.Fields{
"stage": "handling",
"rpc": "InvokeMatchmakingFunctions",
"profile_name": profileName,
})
// Apply filters from all pools specified in this profile
// to find the participating tickets for each pool. Start by snapshotting
// the state of the ticket cache to a new data structure, so we can work
// with that snapshot without incurring a bunch of additional access
// contention on the ticket cache itself, which will continue to be updated
// as we process. The participants of these pools won't reflect updates to
// the ticket cache that happen after this point.
// Copy the ticket cache, leaving out inactive tickets.
activeTickets := setDifference(&tc.Tickets, &tc.InactiveSet)
// This is largely for local debugging when developing a matchmaker against
// OM. Assignments are considered deprecated, so OM shouldn't be
// responsible for them in production and doesn't output OTEL metrics for
// tracking assignment counts.
unassignedTickets := setDifference(&tc.InactiveSet, &tc.Assignments)
// Track the number of tickets in the cache at the moment filters are applied
logger.Infof(" %5d tickets active, %5d tickets inactive without assignment",
len(activeTickets), len(unassignedTickets))
otelCachedTicketsAvailableForFilteringPerInvokeMMFCall.Record(context.Background(), int64(len(activeTickets)),
metric.WithAttributes(attribute.String("profile.name", profileName)),
)
// validate pool filters before filling them
validPools := map[string][]*pb.Ticket{}
for name, pool := range req.GetProfile().GetPools() {
if valid, err := filter.ValidatePoolFilters(pool); valid {
// Initialize a clean roster for this pool
validPools[name] = make([]*pb.Ticket, 0)
} else {
logger.Errorf("Unable to fill pool with tickets, invalid: %v", err)
}
}
// Track the number of pools in requested profiles
otelPoolsPerProfile.Record(context.Background(), int64(len(validPools)),
metric.WithAttributes(attribute.String("profile.name", profileName)),
)
// Perform filtering, and 'chunk' the pools into 4mb pieces for streaming
// (4mb is default max pb size.)
// 1000 instead of 1024 for a little extra headroom; we're not trying to
// hyper-optimize here. Every chunk contains the entire profile, and a
// portion of the tickets in that profile's pools.
maxPbSize := 4 * 1000 * 1000
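// (4*1000*1000 = 4,000,000 bytes vs. the 4 MiB = 4,194,304-byte gRPC default,
// leaving roughly 5% of headroom.)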
// Figure out how big the message is with the profile populated, but all pools empty
emptyChunkSize := proto.Size(&pb.ChunkedMmfRunRequest{Profile: req.GetProfile(), NumChunks: math.MaxInt32})
curChunkSize := emptyChunkSize
// Array of 'chunks', each consisting of a portion of the pools in this profile.
// MMFs need to re-assemble the pools with a simple loop+concat over all chunks
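// An MMF might reassemble them roughly like this (illustrative sketch, not
// part of om-core; receivedChunks is a hypothetical slice of all received
// *pb.ChunkedMmfRunRequest messages):
//
//	pools := map[string][]*pb.Ticket{}
//	for _, chunk := range receivedChunks {
//		for name, pool := range chunk.GetProfile().GetPools() {
//			pools[name] = append(pools[name], pool.GetParticipants().GetTickets()...)
//		}
//	}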
var chunkCount int32
chunkedPools := make([]map[string][]*pb.Ticket, 0)
chunkedPools = append(chunkedPools, map[string][]*pb.Ticket{})
for _, ticket := range activeTickets {
for name := range validPools {
// All the implementation details of filtering are in github.com/googleforgames/open-match2/v2/internal/filter/filter.go
if filter.In(req.GetProfile().GetPools()[name], ticket.(*pb.Ticket)) {
ticketSize := proto.Size(ticket.(*pb.Ticket))
// Check if this ticket will put us over the max pb size for this chunk
if (curChunkSize + ticketSize) >= maxPbSize {
// Start a new chunk
curChunkSize = emptyChunkSize
chunkCount++
chunkedPools = append(chunkedPools, map[string][]*pb.Ticket{})
}
chunkedPools[chunkCount][name] = append(chunkedPools[chunkCount][name], ticket.(*pb.Ticket))
curChunkSize += ticketSize
}
}
}
// Track how large (that is, how many chunks) the profile became once
// we populated all the pools with tickets.
logger.Debugf("%v pools packed into %v chunks", len(validPools), len(chunkedPools))
otelProfileChunksPerInvokeMMFCall.Record(context.Background(), int64(len(chunkedPools)),
metric.WithAttributes(attribute.String("profile.name", profileName)),
)
// Put final participant rosters into the pools.
// Send the full profile in each streamed 'chunk'; only the pools are broken
// up to keep the pbs under the max size. This could probably be optimized
// so we don't repeatedly send profile details in larger payloads, but this
// implementation is 1) simpler and 2) could still be useful to the receiving
// MMF if it somehow only got part of the chunked request.
chunkedRequest := make([]*pb.ChunkedMmfRunRequest, len(chunkedPools))
for chunkIndex, chunk := range chunkedPools {
logger.Debugf("processing chunk %v ", chunkIndex)
// Fill this request 'chunk' with the chunked pools we built above
pools := make(map[string]*pb.Pool)
profile := &pb.Profile{
Name: req.GetProfile().GetName(),
Pools: pools,
Extensions: req.GetProfile().GetExtensions(),
}
for name, participantRoster := range chunk {
logger.Debugf("making chunk containing %v tickets", len(participantRoster))
profile.GetPools()[name] = &pb.Pool{
Participants: &pb.Roster{
Name: name + "_roster",
Tickets: participantRoster,
},
}
}
chunkedRequest[chunkIndex] = &pb.ChunkedMmfRunRequest{
Profile: profile,
NumChunks: int32(len(chunkedPools)),
}
}
// MMF Result fan-in goroutine
// Simple fan-in channel pattern implemented as an async inline goroutine.
// Asynchronously sends matches to InvokeMatchMakingFunction() caller as
// they come in from the concurrently-running MMFs. Exits when all
// MMFs are complete and matchChan is closed.
//
// Channel on which MMFs return their matches.
matchChan := make(chan *pb.Match)
var fanwg sync.WaitGroup
fanwg.Add(1)
go func() {
// Ensure the wait group is released even if streaming back to the
// caller fails and this goroutine returns early.
defer fanwg.Done()
// Local logger with a field to indicate logs are from this goroutine.
logger := logger.WithFields(logrus.Fields{"stage": "fan-in"})
logger.Trace("MMF results fan-in goroutine active")
for match := range matchChan {
logger.WithFields(logrus.Fields{
"match_id": match.GetId(),
}).Trace("streaming back match to matchmaker")
// Send the match back to the caller.
err := stream.Send(&pb.StreamedMmfResponse{Match: match})
// om-core doesn't retry sending MMF results; if your matchmaker cancels the
// context or stops responding, om-core drops the rest of the match results.
if err != nil {
logger.Errorf("Unable to stream match result back to matchmaker, dropping %v matches: %v, %v", len(matchChan)+1, err, context.Cause(ctx))
logger.Errorf("dropped: %v", match.GetId())
return
}
}
logger.Trace("ALL MMFS COMPLETE: exiting MMF results fan-in goroutine")
}()
// Set up grpc dial options for all MMFs.
//
// Our design anticipates MMFs that aren't constrained by OM - maybe you
// want to write an MMF that runs a Monte Carlo simulation or writes
// analytics to a database, and never returns a match to the calling
// client. Since we want to allow long-running MMFs to continue until they
// finish processing we send them a context below that isn't cancelled when
// the client closes the stream; but best practice dictates /some/ timeout
// here (default 10 mins).
var opts []grpc.DialOption
opts = append(opts,
grpc.WithConnectParams(grpc.ConnectParams{MinConnectTimeout: mmfTimeout}),
)
// Auth using IAM when on Google Cloud
// Maintain a dictionary of tokens and tokenSources by MMF so we aren't
// recreating them every time.
tokenSources := map[string]oauth2.TokenSource{}
tokens := map[string]*oauth2.Token{}
// Invoke each requested MMF, and put the matches they stream back into the match channel.
// TODO: Technically this would be better as an ErrorGroup but this already works.
// https://stackoverflow.com/questions/71246253/handle-goroutine-termination-and-error-handling-via-error-group
var mmfwg sync.WaitGroup
for _, mmf := range req.GetMmfs() {
// Add this invocation to the MMF wait group.
mmfwg.Add(1)
// MMF fan-out goroutine
// Call the MMF asynchronously, so all processing happens concurrently
go func(mmf *pb.MatchmakingFunctionSpec) error {
defer mmfwg.Done()
// Shadow ctx with a goroutine-local copy so the per-MMF auth metadata
// appended below doesn't race with, or leak into, other MMF invocations.
ctx := ctx
// var init
var err error
// Parse the connection URI
mmfUrl, err := url.Parse(fmt.Sprintf("%v:%v", mmf.GetHost(), mmf.GetPort()))
if err != nil {
err = status.Error(codes.Internal,
fmt.Errorf("Failed to parse mmf host uri %v: %w", mmf.GetHost(), err).Error())
logger.Error(err)
otelMmfFailures.Add(context.Background(), 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
return fmt.Errorf("%w", err)
}
// Legacy REST is not implemented in OM2; error.
if mmf.GetType() == pb.MatchmakingFunctionSpec_REST {
otelMmfFailures.Add(context.Background(), 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
// OM1 allowed MMFs to use HTTP RESTful grpc implementations,
// but its usage was incredibly low. This is where that would
// be re-implemented if we see enough user demand.
return status.Error(codes.Internal, fmt.Sprintf("REST MMF invocation not yet implemented for %v", mmfUrl.Host))
} // pb.MatchmakingFunctionSpec_gRPC is default.
// Add mmf details to all logs from this point on.
logger := logger.WithFields(logrus.Fields{
"mmf_name": mmf.GetName(),
"mmf_host": mmfUrl.Host,
})
// TODO: Potential future optimization: cache connections/clients using a pool and re-use
var conn *grpc.ClientConn
// Default to unauthenticated grpc
creds := insecure.NewCredentials()
// If MMF server is running with TLS (https), we must authenticate
// Note: TLS is transparently added to the MMF server when running on Cloud Run,
// see https://cloud.google.com/run/docs/container-contract#tls
if mmfUrl.Scheme == "https" {
creds = credentials.NewTLS(tlsConfig)
audience := mmf.GetHost()
httpsLogger := logger.WithFields(logrus.Fields{
"audience": audience,
})
httpsLogger.Infof("HTTPS mmf url detected. Attempting to set up secure grpc credentials.")
// Fetch Google Cloud IAM auth token; have to make a new one if
// it doesn't exist or is invalid
var token *oauth2.Token
var exists bool
if token, exists = tokens[audience]; !exists || !token.Valid() {
// Check for existing tokenSource
if _, exists = tokenSources[audience]; !exists {
// Create a TokenSource if none exists.
tokenSources[audience], err = idtoken.NewTokenSource(ctx, audience)
if err != nil {
err = status.Error(codes.Internal, fmt.Errorf(
"Failed to get a source for ID tokens to contact gRPC MMF at %v: %w",
mmfUrl.Host, err).Error())
httpsLogger.Error(err)
otelMmfFailures.Add(ctx, 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
return fmt.Errorf("%w", err)
}
httpsLogger.Trace("successfully initialized new token source")
}
// Get new token from the tokenSource, store it in the map
// for later use
tokens[audience], err = tokenSources[audience].Token()
if err != nil {
err = status.Error(codes.Internal, fmt.Errorf(
"Failed to get ID token to contact gRPC MMF at %v: %w",
mmfUrl.Host, err).Error())
httpsLogger.Error(err)
otelMmfFailures.Add(ctx, 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
return fmt.Errorf("%w", err)
}
// New token successfully minted; use it for this call
token = tokens[audience]
// Make a truncated version of the token for trace logging.
if logrus.IsLevelEnabled(logrus.TraceLevel) {
truncToken := token.AccessToken
if len(truncToken) >= 8 {
truncToken = truncToken[0:7]
}
ttField := logrus.Fields{"trunc_access_token": fmt.Sprintf("%v...", truncToken)}
httpsLogger.WithFields(ttField).Trace("successfully retrieved new access token")
}
} else {
// Make a truncated version of the token for trace logging.
if logrus.IsLevelEnabled(logrus.TraceLevel) {
truncToken := token.AccessToken
if len(truncToken) >= 8 {
truncToken = truncToken[0:7]
}
ttField := logrus.Fields{"trunc_access_token": fmt.Sprintf("%v...", truncToken)}
httpsLogger.WithFields(ttField).Trace("reusing existing valid access token")
}
}
// Add Google Cloud IAM auth token to the context.
ctx = grpcMetadata.AppendToOutgoingContext(ctx,
"authorization", "Bearer "+token.AccessToken)
}
// Build a per-MMF copy of the dial options so concurrent goroutines
// don't append to (and race on) the shared opts slice.
dialOpts := append([]grpc.DialOption{}, opts...)
dialOpts = append(dialOpts,
grpc.WithTransportCredentials(creds),
grpc.WithAuthority(mmfUrl.Host),
)
// Connect to gRPC server for this mmf.
conn, err = grpc.NewClient(mmfUrl.Host, dialOpts...)
if err != nil {
err = status.Error(codes.Internal, fmt.Errorf("Failed to dial gRPC for %v: %w", mmfUrl.Host, err).Error())
logger.Error(err)
otelMmfFailures.Add(ctx, 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
return fmt.Errorf("%w", err)
}
defer conn.Close()
// Get MMF client. May be possible to re-use, but would
// require some validation of assumptions; for now we're
// re-creating on every call under the assumption this will allow
// individual MMF calls to be load-balanced when an MMF is under
// enough load to spin up additional instances. For more
// information about the challenges of load-balancing gRPC
// endpoints, see:
// https://grpc.io/blog/grpc-load-balancing/
client := pb.NewMatchMakingFunctionServiceClient(conn)
logger.Trace("Connected to MMF")
// Run the MMF
var mmfStream pb.MatchMakingFunctionService_RunClient
mmfStream, err = client.Run(ctx)
if err != nil {
// Example failure that will trigger this codepath:
// "Failed to connect to MMF at localhost:50443: rpc error:
// code = Unavailable desc = connection error: desc =
// \"transport: authentication handshake failed: tls: first
// record does not look like a TLS handshake\""
logger.Error(fmt.Errorf("Failed to connect to MMF at %v: %w", mmfUrl.Host, err).Error())
otelMmfFailures.Add(ctx, 1, metric.WithAttributes(
attribute.String("mmf.name", mmf.GetName()),
attribute.String("profile.name", profileName),
))
return status.Error(codes.Internal, fmt.Errorf("Failed to connect to MMF at %v: %w", mmfUrl.Host, err).Error())
}
logger.Trace("MMF .Run() invoked, sending profile chunks")
// Request itself is chunked if all the tickets returned in
// ticket pools result in a total request size larger than the
// default gRPC message size of 4mb.
for index, chunk := range chunkedRequest {
err = mmfStream.Send(chunk)
if err != nil {
logger.Errorf("Failed to send MmfRequest chunk to MMF: %v", err)
}
logger.Tracef("MMF request chunk %02d/%02d: %0.2fmb", index+1, len(chunkedRequest), float64(proto.Size(chunk))/float64(1024*1024))
}
// Make a waitgroup that lets us know when all ticket deactivations
// are complete. (All tickets in matches returned by the MMF are
// set to inactive by OM.)
var tdwg sync.WaitGroup
// i counts the number of results (which equates to matches
// received) for metrics reporting; this loop reads streaming
// match responses from the mmf and runs until a break
// statement is encountered.
var i int64
for {
// Get results from MMF
var result *pb.StreamedMmfResponse
result, err = mmfStream.Recv()