diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 90408360b4..706ccbed1b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,7 +19,7 @@ jobs: steps: - name: Checkout repository uses: actions/checkout@v2 - + - name: Set up .env env: QUESTION_FIREBASE_CREDENTIAL_PATH: ${{ vars.QUESTION_SERVICE_FIREBASE_CREDENTIAL_PATH }} @@ -30,7 +30,7 @@ jobs: echo "FIREBASE_CREDENTIAL_PATH=$QUESTION_FIREBASE_CREDENTIAL_PATH" >> .env echo "JWT_SECRET=$JWT_SECRET" >> .env echo "EXECUTION_SERVICE_URL=$EXECUTION_SERVICE_URL" >> .env - + - name: Set up credentials env: QUESTION_FIREBASE_JSON: ${{ secrets.QUESTION_SERVICE_FIREBASE_CREDENTIAL }} @@ -41,8 +41,8 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 - with: - go-version: '1.23.x' + with: + go-version: "1.23.x" - name: Install Go dependencies run: | @@ -51,7 +51,7 @@ jobs: - name: Install firebase tools run: curl -sL firebase.tools | bash - + - name: Run Go tests with Firebase emulator run: firebase emulators:exec --only firestore 'cd ./apps/question-service; go test -v ./tests' @@ -66,11 +66,11 @@ jobs: run: | cd ./apps/frontend cp .env.example .env - + - name: Set up Node.js uses: actions/setup-node@v2 with: - node-version: '22' + node-version: "22" - name: Install pnpm run: npm i -g pnpm @@ -117,6 +117,7 @@ jobs: EXECUTION_SERVICE_PORT: ${{ vars.EXECUTION_SERVICE_PORT }} MATCHING_SERVICE_TIMEOUT: ${{ vars.MATCHING_SERVICE_TIMEOUT }} REDIS_URL: ${{ vars.REDIS_URL }} + RABBITMQ_URL: ${{ vars.RABBITMQ_URL }} QUESTION_SERVICE_GRPC_URL: ${{ vars.QUESTION_SERVICE_GPRC_URL }} run: | cd ./apps/frontend @@ -147,11 +148,13 @@ jobs: cd ../history-service echo "FIREBASE_CREDENTIAL_PATH=$HISTORY_FIREBASE_CREDENTIAL_PATH" >> .env echo "PORT=$HISTORY_SERVICE_PORT" >> .env - + echo "RABBITMQ_URL=$RABBITMQ_URL" >> .env + cd ../execution-service echo "FIREBASE_CREDENTIAL_PATH=$EXECUTION_FIREBASE_CREDENTIAL_PATH" >> .env echo "PORT=$EXECUTION_SERVICE_PORT" >> .env echo "HISTORY_SERVICE_URL=$HISTORY_SERVICE_URL" >> .env + echo "RABBITMQ_URL=$RABBITMQ_URL" >> .env cd ../signalling-service echo "PORT=$SIGNALLING_SERVICE_PORT" >> .env @@ -170,7 +173,7 @@ jobs: cd ../history-service echo "$HISTORY_FIREBASE_JSON" > "./$HISTORY_FIREBASE_CREDENTIAL_PATH" - + cd ../execution-service echo "$EXECUTION_FIREBASE_JSON" > "./$EXECUTION_FIREBASE_CREDENTIAL_PATH" diff --git a/apps/docker-compose.yml b/apps/docker-compose.yml index a0565fe1dd..12d02b5661 100644 --- a/apps/docker-compose.yml +++ b/apps/docker-compose.yml @@ -68,7 +68,9 @@ services: - apps_network volumes: - ./history-service:/history-service - + depends_on: + - rabbitmq + signalling-service: build: context: ./signalling-service @@ -95,6 +97,8 @@ services: volumes: - ./execution-service:/execution-service - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - rabbitmq redis: image: redis:latest @@ -104,6 +108,19 @@ services: - 6379:6379 container_name: redis-container + rabbitmq: + image: rabbitmq:3-management + networks: + - apps_network + ports: + - 5672:5672 # Port for RabbitMQ message broker + - 15672:15672 # Port for RabbitMQ Management UI + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + volumes: + - rabbitmq_data:/var/lib/rabbitmq + python-sandbox: build: context: ./execution-service/execution/python @@ -114,3 +131,8 @@ services: networks: apps_network: + +volumes: + # Mounts a volume for RabbitMQ data persistence. + # This ensures that data is not lost when the container is restarted or removed. + rabbitmq_data: diff --git a/apps/execution-service/.env.example b/apps/execution-service/.env.example index 6a1fb0bd86..4122078fbd 100644 --- a/apps/execution-service/.env.example +++ b/apps/execution-service/.env.example @@ -3,6 +3,8 @@ PORT=8083 # If you are NOT USING docker, use the below variables # HISTORY_SERVICE_URL=http://localhost:8082/ +# RABBITMQ_URL=amqp://guest:guest@localhost:5672/ # If you are USING docker, use the below variables HISTORY_SERVICE_URL=http://history-service:8082/ +RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/ diff --git a/apps/execution-service/README.md b/apps/execution-service/README.md index 2961ed152e..c53dc3ab9f 100644 --- a/apps/execution-service/README.md +++ b/apps/execution-service/README.md @@ -20,7 +20,32 @@ go run main.go The server will be available at http://localhost:8083. -## Running the Application via Docker +### Setting up message queue with RabbitMQ + +A message queue is used to pass submission results asynchronously from the execution-service to the history-service. + +1. In order to do so, we can run the following command to set up a docker container for RabbitMQ: + +```bash +docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management +``` + +2. Then we can run the execution-service: + +```bash +go run main.go +``` + +3. We can run the history-service by changing our directory and running the same command: + +```bash +cd ../history-service +go run main.go +``` + +To view more details on the RabbitMQ queue, we can go to `localhost:15672`, and login using `guest` as the username and password. + +### Running the Application via Docker To run the application via Docker, run the following command: @@ -74,10 +99,10 @@ The following json format will be returned: ```json [ - { - "input":"hello", - "expected":"olleh" - } + { + "input": "hello", + "expected": "olleh" + } ] ``` @@ -98,16 +123,16 @@ The following json format will be returned: ```json { - "visibleTestResults":[ + "visibleTestResults": [ { - "input":"hello", - "expected":"olleh", - "actual":"olleh", - "passed":true, - "error":"" + "input": "hello", + "expected": "olleh", + "actual": "olleh", + "passed": true, + "error": "" } ], - "customTestResults":null + "customTestResults": null } ``` @@ -127,29 +152,29 @@ The following json format will be returned: ```json { - "visibleTestResults":[ + "visibleTestResults": [ { - "input":"hello", - "expected":"olleh", - "actual":"olleh", - "passed":true, - "error":"" + "input": "hello", + "expected": "olleh", + "actual": "olleh", + "passed": true, + "error": "" } ], - "customTestResults":[ + "customTestResults": [ { - "input":"Hannah", - "expected":"hannaH", - "actual":"hannaH", - "passed":true, - "error":"" + "input": "Hannah", + "expected": "hannaH", + "actual": "hannaH", + "passed": true, + "error": "" }, { - "input":"abcdefg", - "expected":"gfedcba", - "actual":"gfedcba", - "passed":true, - "error":"" + "input": "abcdefg", + "expected": "gfedcba", + "actual": "gfedcba", + "passed": true, + "error": "" } ] } @@ -178,20 +203,20 @@ The following json format will be returned: ```json { - "visibleTestResults":[ + "visibleTestResults": [ { - "input":"hello", - "expected":"olleh", - "actual":"olleh", - "passed":true, - "error":"" + "input": "hello", + "expected": "olleh", + "actual": "olleh", + "passed": true, + "error": "" } ], - "hiddenTestResults":{ - "passed":2, - "total":2 + "hiddenTestResults": { + "passed": 2, + "total": 2 }, - "status":"Accepted" + "status": "Accepted" } ``` @@ -199,19 +224,19 @@ If compilation error exists or any of the tests (visible and hidden) fails, stat ```json { - "visibleTestResults":[ + "visibleTestResults": [ { - "input":"hello", - "expected":"olleh", - "actual":"", - "passed":false, - "error":"Command execution failed: Traceback (most recent call last):\n File \"/tmp/4149249165.py\", line 2, in \u003cmodule\u003e\n prit(name[::-1])\n ^^^^\nNameError: name 'prit' is not defined. Did you mean: 'print'?\n: %!w(*exec.ExitError=\u0026{0x4000364678 []})" + "input": "hello", + "expected": "olleh", + "actual": "", + "passed": false, + "error": "Command execution failed: Traceback (most recent call last):\n File \"/tmp/4149249165.py\", line 2, in \u003cmodule\u003e\n prit(name[::-1])\n ^^^^\nNameError: name 'prit' is not defined. Did you mean: 'print'?\n: %!w(*exec.ExitError=\u0026{0x4000364678 []})" } ], - "hiddenTestResults":{ - "passed":0, - "total":2 + "hiddenTestResults": { + "passed": 0, + "total": 2 }, - "status":"Attempted" + "status": "Attempted" } ``` diff --git a/apps/execution-service/go.mod b/apps/execution-service/go.mod index 32a516cf23..aff484d2a7 100644 --- a/apps/execution-service/go.mod +++ b/apps/execution-service/go.mod @@ -31,6 +31,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect diff --git a/apps/execution-service/go.sum b/apps/execution-service/go.sum index 7d3c75e147..8a595690fa 100644 --- a/apps/execution-service/go.sum +++ b/apps/execution-service/go.sum @@ -85,6 +85,8 @@ github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwA github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/apps/execution-service/handlers/submit.go b/apps/execution-service/handlers/submit.go index 0bd06656a3..1411ffcea7 100644 --- a/apps/execution-service/handlers/submit.go +++ b/apps/execution-service/handlers/submit.go @@ -1,16 +1,16 @@ package handlers import ( - "bytes" "encoding/json" "execution-service/constants" + "execution-service/messagequeue" "execution-service/models" "execution-service/utils" "fmt" + "net/http" + "github.com/go-chi/chi/v5" "google.golang.org/api/iterator" - "net/http" - "os" ) func (s *Service) ExecuteVisibleAndHiddenTestsAndSubmit(w http.ResponseWriter, r *http.Request) { @@ -61,7 +61,6 @@ func (s *Service) ExecuteVisibleAndHiddenTestsAndSubmit(w http.ResponseWriter, r } // Save the collaboration history via the history-service - // TODO: convert to message queue submissionReq := models.Submission{ Code: submission.Code, Language: submission.Language, @@ -84,34 +83,12 @@ func (s *Service) ExecuteVisibleAndHiddenTestsAndSubmit(w http.ResponseWriter, r return } - // get history-service url from os env - historyServiceUrl := os.Getenv("HISTORY_SERVICE_URL") - if historyServiceUrl == "" { - http.Error(w, "HISTORY_SERVICE_URL is not set", http.StatusInternalServerError) - return - } - - req, err := http.NewRequest(http.MethodPost, historyServiceUrl+"histories", - bytes.NewBuffer(jsonData)) + err = messagequeue.PublishSubmissionMessage(jsonData) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("Failed to save submission history: %v", err), http.StatusInternalServerError) return } - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - http.Error(w, "Failed to save submission history", http.StatusInternalServerError) - } - w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(testResults) diff --git a/apps/execution-service/main.go b/apps/execution-service/main.go index 706d0afcd6..e2da2948be 100644 --- a/apps/execution-service/main.go +++ b/apps/execution-service/main.go @@ -3,6 +3,8 @@ package main import ( "context" "execution-service/handlers" + "execution-service/messagequeue" + "execution-service/utils" "fmt" "log" "net/http" @@ -21,20 +23,20 @@ import ( func main() { // Load .env file err := godotenv.Load() - if err != nil { - log.Fatal("Error loading .env file") - } + utils.FailOnError(err, "Error loading .env file") // Initialize Firestore client ctx := context.Background() client, err := initFirestore(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } + utils.FailOnError(err, "Failed to initialize Firestore client") defer client.Close() service := &handlers.Service{Client: client} + amqpConnection, amqpChannel := messagequeue.InitRabbitMQServer() + defer amqpConnection.Close() + defer amqpChannel.Close() + r := initChiRouter(service) initRestServer(r) } @@ -107,7 +109,5 @@ func initRestServer(r *chi.Mux) { // Start the server log.Printf("Starting REST server on http://localhost:%s", port) err := http.ListenAndServe(fmt.Sprintf(":%s", port), r) - if err != nil { - log.Fatalf("Failed to start server: %v", err) - } + utils.FailOnError(err, "Failed to start REST server") } diff --git a/apps/execution-service/messagequeue/rabbitmq.go b/apps/execution-service/messagequeue/rabbitmq.go new file mode 100644 index 0000000000..1a6aafcde7 --- /dev/null +++ b/apps/execution-service/messagequeue/rabbitmq.go @@ -0,0 +1,78 @@ +package messagequeue + +import ( + "execution-service/utils" + "fmt" + "log" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + CODE_SUBMISSION_QUEUE_KEY = "code-submission" + NUM_RETRIES = 10 +) + +var ( + codeSubmissionQueue amqp.Queue + rabbitMQChannel *amqp.Channel +) + +func InitRabbitMQServer() (*amqp.Connection, *amqp.Channel) { + conn := connectToRabbitMQ() + + // Create a channel + ch, err := conn.Channel() + utils.FailOnError(err, "Failed to open a channel") + rabbitMQChannel = ch + + // Declare a queue + q, err := ch.QueueDeclare( + CODE_SUBMISSION_QUEUE_KEY, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + utils.FailOnError(err, "Failed to declare a queue") + codeSubmissionQueue = q + + return conn, ch +} + +func connectToRabbitMQ() *amqp.Connection { + var conn *amqp.Connection + var err error + rabbitMQURL := os.Getenv("RABBITMQ_URL") + for i := 0; i < NUM_RETRIES; i++ { // Retry up to 10 times + conn, err = amqp.Dial(rabbitMQURL) + if err == nil { + log.Println("Connected to RabbitMQ") + return conn + } + log.Printf("Failed to connect to RabbitMQ, retrying in 5 seconds... (Attempt %d/%d)", i+1, NUM_RETRIES) + time.Sleep(5 * time.Second) + } + utils.FailOnError(err, fmt.Sprintf("Failed to connect to RabbitMQ after %d attempts", NUM_RETRIES)) + return nil +} + +func PublishSubmissionMessage(submission []byte) error { + err := rabbitMQChannel.Publish( + "", // exchange + codeSubmissionQueue.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: submission, + }) + if err != nil { + return fmt.Errorf("Failed to publish a message: %v", err) + } + log.Printf("RabbitMQ: [x] Sent %s", submission) + return nil +} diff --git a/apps/execution-service/utils/log.go b/apps/execution-service/utils/log.go new file mode 100644 index 0000000000..a77b2c29ca --- /dev/null +++ b/apps/execution-service/utils/log.go @@ -0,0 +1,9 @@ +package utils + +import "log" + +func FailOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} diff --git a/apps/history-service/.env.example b/apps/history-service/.env.example index b3bd4db2b4..0f3f18c101 100644 --- a/apps/history-service/.env.example +++ b/apps/history-service/.env.example @@ -1,2 +1,8 @@ FIREBASE_CREDENTIAL_PATH=cs3219-staging-codehisto-bb61c-firebase-adminsdk-egopb-95cfaf9b87.json -PORT=8082 \ No newline at end of file +PORT=8082 + +# If you are NOT USING docker, use the below variables +# RABBITMQ_URL=amqp://guest:guest@localhost:5672/ + +# If you are USING docker, use the below variables +RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/ \ No newline at end of file diff --git a/apps/history-service/README.md b/apps/history-service/README.md index 681c629bcd..6fec9e5bbd 100644 --- a/apps/history-service/README.md +++ b/apps/history-service/README.md @@ -47,9 +47,34 @@ To start the server, run the following command: go run main.go ``` +### Setting up message queue with RabbitMQ + +A message queue is used to pass submission results asynchronously from the execution-service to the history-service. + +1. In order to do so, we can run the following command to set up a docker container for RabbitMQ: + +```bash +docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management +``` + +2. Then we can run the history-service: + +```bash +go run main.go +``` + +3. We can run the execution-service by changing our directory and running the same command: + +```bash +cd ../execution-service +go run main.go +``` + +To view more details on the RabbitMQ queue, we can go to `localhost:15672`, and login using `guest` as the username and password. + The server will be available at http://localhost:8082. -## Running the Application via Docker +### Running the Application via Docker To run the application via Docker, run the following command: diff --git a/apps/history-service/databases/history.go b/apps/history-service/databases/history.go new file mode 100644 index 0000000000..e9b41e4b14 --- /dev/null +++ b/apps/history-service/databases/history.go @@ -0,0 +1,32 @@ +package databases + +import ( + "context" + "history-service/models" + + "cloud.google.com/go/firestore" +) + +func CreateHistory(client *firestore.Client, ctx context.Context, submissionHistory models.SubmissionHistory) (*firestore.DocumentRef, error) { + // Document reference ID in firestore mapped to the match ID in model + collection := client.Collection("collaboration-history") + + docRef, _, err := collection.Add(ctx, map[string]interface{}{ + "title": submissionHistory.Title, + "code": submissionHistory.Code, + "language": submissionHistory.Language, + "user": submissionHistory.User, + "matchedUser": submissionHistory.MatchedUser, + "matchedTopics": submissionHistory.MatchedTopics, + "questionDocRefId": submissionHistory.QuestionDocRefID, + "questionDifficulty": submissionHistory.QuestionDifficulty, + "questionTopics": submissionHistory.QuestionTopics, + "status": submissionHistory.Status, + "createdAt": firestore.ServerTimestamp, + "updatedAt": firestore.ServerTimestamp, + }) + if err != nil { + return nil, err + } + return docRef, nil +} diff --git a/apps/history-service/go.mod b/apps/history-service/go.mod index 37d6005bf1..2bdb4f0c8a 100644 --- a/apps/history-service/go.mod +++ b/apps/history-service/go.mod @@ -31,6 +31,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect diff --git a/apps/history-service/go.sum b/apps/history-service/go.sum index ce7234e8f6..fbc00bcfaa 100644 --- a/apps/history-service/go.sum +++ b/apps/history-service/go.sum @@ -85,6 +85,8 @@ github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwA github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/apps/history-service/handlers/create.go b/apps/history-service/handlers/create.go index 3dc635d0dd..e4868730b7 100644 --- a/apps/history-service/handlers/create.go +++ b/apps/history-service/handlers/create.go @@ -2,11 +2,11 @@ package handlers import ( "encoding/json" + "history-service/databases" "history-service/models" "history-service/utils" "net/http" - "cloud.google.com/go/firestore" "google.golang.org/api/iterator" ) @@ -22,22 +22,7 @@ func (s *Service) CreateHistory(w http.ResponseWriter, r *http.Request) { } // Document reference ID in firestore mapped to the match ID in model - collection := s.Client.Collection("collaboration-history") - - docRef, _, err := collection.Add(ctx, map[string]interface{}{ - "title": submissionHistory.Title, - "code": submissionHistory.Code, - "language": submissionHistory.Language, - "user": submissionHistory.User, - "matchedUser": submissionHistory.MatchedUser, - "matchedTopics": submissionHistory.MatchedTopics, - "questionDocRefId": submissionHistory.QuestionDocRefID, - "questionDifficulty": submissionHistory.QuestionDifficulty, - "questionTopics": submissionHistory.QuestionTopics, - "status": submissionHistory.Status, - "createdAt": firestore.ServerTimestamp, - "updatedAt": firestore.ServerTimestamp, - }) + docRef, err := databases.CreateHistory(s.Client, ctx, submissionHistory) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/apps/history-service/main.go b/apps/history-service/main.go index 751f2ed525..1b79073caf 100644 --- a/apps/history-service/main.go +++ b/apps/history-service/main.go @@ -3,7 +3,10 @@ package main import ( "context" "fmt" + "history-service/databases" "history-service/handlers" + "history-service/messagequeue" + "history-service/utils" "log" "net/http" "os" @@ -20,20 +23,21 @@ import ( func main() { err := godotenv.Load() - if err != nil { - log.Fatal("Error loading .env file") - } + utils.FailOnError(err, "Error loading .env file") // Initialize Firestore client ctx := context.Background() client, err := initFirestore(ctx) - if err != nil { - log.Fatalf("Failed to initialize Firestore client: %v", err) - } + utils.FailOnError(err, "Failed to initialize Firestore client") defer client.Close() service := &handlers.Service{Client: client} + amqpConnection, amqpChannel := messagequeue.InitRabbitMQServer() + defer amqpConnection.Close() + defer amqpChannel.Close() + go messagequeue.ConsumeSubmissionMessages(client, databases.CreateHistory) + r := initChiRouter(service) initRestServer(r) } diff --git a/apps/history-service/messagequeue/rabbitmq.go b/apps/history-service/messagequeue/rabbitmq.go new file mode 100644 index 0000000000..f945a866d0 --- /dev/null +++ b/apps/history-service/messagequeue/rabbitmq.go @@ -0,0 +1,108 @@ +package messagequeue + +import ( + "context" + "encoding/json" + "fmt" + "history-service/models" + "history-service/utils" + "log" + "os" + "time" + + "cloud.google.com/go/firestore" + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + CODE_SUBMISSION_QUEUE_KEY = "code-submission" + NUM_RETRIES = 10 +) + +var ( + codeSubmissionQueue amqp.Queue + rabbitMQChannel *amqp.Channel +) + +func InitRabbitMQServer() (*amqp.Connection, *amqp.Channel) { + conn := connectToRabbitMQ() + + // Create a channel + ch, err := conn.Channel() + utils.FailOnError(err, "Failed to open a channel") + rabbitMQChannel = ch + + // Declare a queue + q, err := ch.QueueDeclare( + "code-submission", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + utils.FailOnError(err, "Failed to declare a queue") + codeSubmissionQueue = q + + return conn, ch +} + +func connectToRabbitMQ() *amqp.Connection { + var conn *amqp.Connection + var err error + rabbitMQURL := os.Getenv("RABBITMQ_URL") + for i := 0; i < NUM_RETRIES; i++ { // Retry up to 10 times + conn, err = amqp.Dial(rabbitMQURL) + if err == nil { + log.Println("Connected to RabbitMQ") + return conn + } + log.Printf("Failed to connect to RabbitMQ, retrying in 5 seconds... (Attempt %d/%d)", i+1, NUM_RETRIES) + time.Sleep(5 * time.Second) + } + utils.FailOnError(err, fmt.Sprintf("Failed to connect to RabbitMQ after %d attempts", NUM_RETRIES)) + return nil +} + +func ConsumeSubmissionMessages(client *firestore.Client, createSubmission func( + *firestore.Client, context.Context, models.SubmissionHistory) ( + *firestore.DocumentRef, error)) { + ctx := context.Background() + + // Consume messages from the queue + msgs, err := rabbitMQChannel.Consume( + codeSubmissionQueue.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + utils.FailOnError(err, "RabbitMQ: Failed to register a consumer") + + // Create a channel to block indefinitely + forever := make(chan bool) + + // Start a goroutine to handle incoming messages + go func() { + for d := range msgs { + log.Printf("RabbitMQ: Received a message") + + // Parse request + var submissionHistory models.SubmissionHistory + if err := json.Unmarshal(d.Body, &submissionHistory); err != nil { + log.Printf("RabbitMQ: Error decoding JSON: %v", err) + continue + } + + _, err := createSubmission(client, ctx, submissionHistory) + if err != nil { + log.Printf("RabbitMQ: %v", err) + } + } + }() + + log.Printf("RabbitMQ: [*] Waiting for messages.") + <-forever +} diff --git a/apps/history-service/utils/log.go b/apps/history-service/utils/log.go new file mode 100644 index 0000000000..a77b2c29ca --- /dev/null +++ b/apps/history-service/utils/log.go @@ -0,0 +1,9 @@ +package utils + +import "log" + +func FailOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +}