From a69ca17f591f7f2f49208615482aa9f8e55c2db9 Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Tue, 4 Jun 2024 23:47:53 +0300 Subject: [PATCH 1/7] initial v3 commit --- .idea/.gitignore | 8 +++ .idea/modules.xml | 8 +++ .idea/vcs.xml | 6 ++ .idea/wsify.iml | 9 +++ CODE_OF_CONDUCT.md | 46 ------------- CONTRIBUTING.md | 6 -- Dockerfile | 9 --- Gopkg.lock | 91 ------------------------- Gopkg.toml | 46 ------------- LICENSE | 21 ------ Makefile | 30 --------- README.md | 123 ---------------------------------- debug.go | 14 ---- flags.go | 33 --------- go.mod | 17 ----- go.sum | 33 --------- main.go | 27 -------- message.go | 23 ------- server.go | 162 --------------------------------------------- webhook.go | 38 ----------- 20 files changed, 31 insertions(+), 719 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .idea/wsify.iml delete mode 100644 CODE_OF_CONDUCT.md delete mode 100644 CONTRIBUTING.md delete mode 100644 Dockerfile delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml delete mode 100644 LICENSE delete mode 100644 Makefile delete mode 100644 README.md delete mode 100644 debug.go delete mode 100644 flags.go delete mode 100644 go.mod delete mode 100644 go.sum delete mode 100644 main.go delete mode 100644 message.go delete mode 100644 server.go delete mode 100644 webhook.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..5abd84a --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/wsify.iml b/.idea/wsify.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/wsify.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md deleted file mode 100644 index 7ef5e90..0000000 --- a/CODE_OF_CONDUCT.md +++ /dev/null @@ -1,46 +0,0 @@ -# Contributor Covenant Code of Conduct - -## Our Pledge - -In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation. - -## Our Standards - -Examples of behavior that contributes to creating a positive environment include: - -* Using welcoming and inclusive language -* Being respectful of differing viewpoints and experiences -* Gracefully accepting constructive criticism -* Focusing on what is best for the community -* Showing empathy towards other community members - -Examples of unacceptable behavior by participants include: - -* The use of sexualized language or imagery and unwelcome sexual attention or advances -* Trolling, insulting/derogatory comments, and personal or political attacks -* Public or private harassment -* Publishing others' private information, such as a physical or electronic address, without explicit permission -* Other conduct which could reasonably be considered inappropriate in a professional setting - -## Our Responsibilities - -Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior. - -Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. - -## Scope - -This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers. - -## Enforcement - -Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at m7medalash3al@gmail.com. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. - -Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership. - -## Attribution - -This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at [http://contributor-covenant.org/version/1/4][version] - -[homepage]: http://contributor-covenant.org -[version]: http://contributor-covenant.org/version/1/4/ diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index 442797a..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,6 +0,0 @@ -How to contribute -================= -- Fork the repo -- Create a feature branch -- Push your changes -- Create a pull request diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index b280506..0000000 --- a/Dockerfile +++ /dev/null @@ -1,9 +0,0 @@ -FROM golang:alpine - -RUN apk update && apk add git - -RUN go install github.com/alash3al/wsify@latest - -ENTRYPOINT ["wsify"] - -WORKDIR /root/ \ No newline at end of file diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 2ddeef3..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,91 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - branch = "master" - name = "github.com/alash3al/go-pubsub" - packages = ["."] - revision = "77583f42d4a36df3b5c7bea41f0f4624f0346f41" - -[[projects]] - branch = "master" - name = "github.com/bclicn/color" - packages = ["."] - revision = "4c02eff8a28c1add99b3b5b238c165056c3bea8e" - -[[projects]] - name = "github.com/dgrijalva/jwt-go" - packages = ["."] - revision = "06ea1031745cb8b3dab3f6a236daf2b0aa468b7e" - version = "v3.2.0" - -[[projects]] - name = "github.com/gorilla/websocket" - packages = ["."] - revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" - version = "v1.2.0" - -[[projects]] - name = "github.com/labstack/echo" - packages = [ - ".", - "middleware" - ] - revision = "b338075a0fc6e1a0683dbf03d09b4957a289e26f" - version = "3.2.6" - -[[projects]] - name = "github.com/labstack/gommon" - packages = [ - "bytes", - "color", - "log", - "random" - ] - revision = "57409ada9da0f2afad6664c49502f8c50fbd8476" - version = "0.2.3" - -[[projects]] - name = "github.com/mattn/go-colorable" - packages = ["."] - revision = "ed8eb9e318d7a84ce5915b495b7d35e0cfe7b5a8" - version = "v0.0.6" - -[[projects]] - name = "github.com/mattn/go-isatty" - packages = ["."] - revision = "66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8" - -[[projects]] - branch = "master" - name = "github.com/valyala/bytebufferpool" - packages = ["."] - revision = "e746df99fe4a3986f4d4f79e13c1e0117ce9c2f7" - -[[projects]] - branch = "master" - name = "github.com/valyala/fasttemplate" - packages = ["."] - revision = "dcecefd839c4193db0d35b88ec65b4c12d360ab0" - -[[projects]] - branch = "master" - name = "golang.org/x/crypto" - packages = [ - "acme", - "acme/autocert" - ] - revision = "c7dcf104e3a7a1417abc0230cb0d5240d764159d" - -[[projects]] - branch = "master" - name = "golang.org/x/sys" - packages = ["unix"] - revision = "7dca6fe1f43775aa6d1334576870ff63f978f539" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - inputs-digest = "94415153918a75860338f4dade7bfbdd4269d92b82ef1f6080b1e03a562a39a1" - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index cc74753..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,46 +0,0 @@ -# Gopkg.toml example -# -# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" -# -# [prune] -# non-go = false -# go-tests = true -# unused-packages = true - - -[[constraint]] - branch = "master" - name = "github.com/alash3al/go-pubsub" - -[[constraint]] - branch = "master" - name = "github.com/bclicn/color" - -[[constraint]] - name = "github.com/gorilla/websocket" - version = "1.2.0" - -[[constraint]] - name = "github.com/labstack/echo" - version = "3.2.6" - -[prune] - go-tests = true - unused-packages = true diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 719bcfe..0000000 --- a/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018 Mohammed Al Ashaal - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/Makefile b/Makefile deleted file mode 100644 index d1ea8c6..0000000 --- a/Makefile +++ /dev/null @@ -1,30 +0,0 @@ -test: - go test -v ./... - -install-gometalinter: - go get -v -u github.com/alecthomas/gometalinter - gometalinter --install - -LINT=$(eval export GOGC=400)\ -gometalinter --enable-all -D dupl -D lll -D gas -D goconst -D interfacer -D safesql -D test -D testify -D vetshadow\ - --tests --vendor --warn-unmatched-nolint --deadline=10m --concurrency=2 --enable-gc ./... -lint: install-gometalinter - $(LINT) - -install-dep: - go get -v -u github.com/golang/dep/cmd/dep - -dep-init: install-dep - dep init - -dep-ensure: install-dep - dep ensure - -dep-update: install-dep - dep ensure -update - -.PHONY:: \ - test \ - lint \ - dep-ensure \ - dep-update \ diff --git a/README.md b/README.md deleted file mode 100644 index 68439fb..0000000 --- a/README.md +++ /dev/null @@ -1,123 +0,0 @@ -Websocketify (wsify) v2.0 [![StackShare](https://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/alash3al/wsify) -========================= -> Just a tiny, simple and realtime pub/sub messaging service - - -![Quick Demo](https://i.imgur.com/jxyejg0.gif) - -Why -==== -> I wanted to create a tiny solution that can replace `pusher` and similar services and learning more about the realtime world, so I dispatched this project. - -Features -================ -- No dependencies, just a single binary ! -- Light and Tiny. -- Event-Driven Design `webhooks`. -- A client can listen on any resource. -- You control whether a client is allowed to `connect`, `subscribe`, `unsubscribe` using any programming language !. -- A client defines itself using `key` via the url query param i.e `?key=123`. -- Send messages to only certain users. - - -Installation -============== -- **Docker ?** > `docker run --network host alash3al/wsify -listen :8080 -webhook "http://localhost/wsify.php"` -- **Binary ?** > goto the [releases](https://github.com/alash3al/wsify/releases) page and download yours. -- **From Source ?** > `go get -u github.com/alash3al/wsify` - -Questions -========== - -### (1)- How can a client/device connect to the websocket service? -> by simply connecting to the following endpoint `ws://your.wsify.service:port/subscribe` - -### (2)- How can a client subscribe to a certain channel(s)/topic(s)? -> after connecting to the main websocket service `/subscribe`, you can send a simple json payload `commands` to ask wsify to `subscribe`/`unsubscribe` you to/from any channel/topic you want! - -### (3)- What is the commands format? -> -```json -{ - "action": "subscribe", - "value": "testchan" -} - -``` - -### (4)- Can I control the client command so I can allow/disallow certain users? -> Yes, each client can define itself using a query param `?key=client1`, this key will be passed to the `webhook` endpoint -as well as the event being executed, and here is the event format: -```javascript -{ - // one of the following: connect|subscribe|unsubscribe|disconnect - "action": "subscribe", - - // the channel if provided - "value": "testchan" -} -``` - -### (5)- How can I publish message to i.e `testchan`? -> Just a post request to `/publish` with the following format: -```javascript -{ - // the channel you want to publish to - "channel": "testchan", - - // the data to be send (any format) - "payload": "testchan", - - // array of clients "keys" (if you want certain clients only to receive the message) - "to": [] -} -``` -i.e -```bash -curl -X POST \ - -H "Content-Type: application/json" \ - -d '{"payload": "hi from the terminal", "channel": "testchan"}' \ - http://localhost:4040/publish -``` - -### (6)- Can I skip the webhook events for testing? -> Yes, `wsify --events=""` empty events means "NO WEBHOOK, WSIFY!" - -### (7)- How can I secure the publish endpoint, so no one except me can publish ?!! -> Easy :), Just change the endpoint to something more secure and hard to guess it is an alternative to access tokens .. etc, `wsify --publish="/broadcasteiru6chefoh1Yee0MohJ2um5eepaephies3zonai0Cae7quaeb"` - -### (8)- What about other options? -> `wsify --help` will help you ! - -### (9)- What is the websocket client used in demos? -> [Simple Websocket Client](https://chrome.google.com/webstore/detail/simple-websocket-client/pfdhoblngboilpfeibdedpjgfnlcodoo) - -### (10)- How I can use it over SSl/TLS with Nginx? -> You can use proxy, add this lines on your Nginx configration -``` - location /websocket/subscribe { - proxy_pass http://localhost:4040/subscribe; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "Upgrade"; - } -``` -Now you can call websocket by `wss://yourdomain.com/websocket/subscribe` - - -![Quick Demo2](https://i.imgur.com/f8xVwJU.gif) - -Author -============= -This project has been created by [Mohamed Al Ashaal](http://github.com/alash3al) a Crazy Gopher ^^! - -Contribution -============= -- Fork the Repo -- Create a feature branch -- Push your changes to the created branch -- Create a pull request. - -License -============= -Wsify is open-sourced software licensed under the [MIT License](LICENSE). diff --git a/debug.go b/debug.go deleted file mode 100644 index 09b3013..0000000 --- a/debug.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/bclicn/color" -) - -func debug(txt string) { - if !*FlagDebug { - return - } - fmt.Println(color.BCyan("[DEBUG]: ") + color.BLightYellow(txt)) -} diff --git a/flags.go b/flags.go deleted file mode 100644 index 7d5f262..0000000 --- a/flags.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "flag" - "strings" -) - -var ( - //FlagHTTPAddr ... - FlagHTTPAddr = flag.String("listen", ":4040", "the http address to listen on") - //FlagAllowedOrigin ... - FlagAllowedOrigin = flag.String("origin", "*", "the allowed websocket origin(s), it accepts a comma separated list of domains, * means anything") - //FlagWebhookURL ... - FlagWebhookURL = flag.String("webhook", "http://localhost:8000", "the webhook") - //FlagWebhookEvents ... - FlagWebhookEvents = flag.String("events", "connect,disconnect,subscribe,unsubscribe", "the events to be sent to the webhook") - //FlagPublishEndpoint ... - FlagPublishEndpoint = flag.String("publish", "/publish", "the publish endpoint, just make it as secure as you can") - //FlagDebug - FlagDebug = flag.Bool("debug", false, "enable debugging mode") - //Version ... - Version = "2.3" - //WebhookEvents .. - WebhookEvents = map[string]bool{} -) - -//InitFlags ... -func InitFlags() { - flag.Parse() - for _, e := range strings.Split(strings.ToLower(*FlagWebhookEvents), ",") { - WebhookEvents[strings.TrimSpace(e)] = true - } -} diff --git a/go.mod b/go.mod deleted file mode 100644 index a5c1508..0000000 --- a/go.mod +++ /dev/null @@ -1,17 +0,0 @@ -module github.com/alash3al/wsify - -require ( - github.com/alash3al/go-pubsub v0.0.0-20180526124946-4b2b7702fea2 - github.com/bclicn/color v0.0.0-20180711051946-108f2023dc84 - github.com/dgrijalva/jwt-go v3.2.0+incompatible - github.com/gorilla/websocket v1.4.0 - github.com/labstack/echo v0.0.0-20171223171103-b338075a0fc6 - github.com/labstack/gommon v0.2.7 - github.com/mattn/go-colorable v0.0.9 - github.com/mattn/go-isatty v0.0.4 - github.com/rs/xid v1.2.1 - github.com/valyala/bytebufferpool v1.0.0 - github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 - golang.org/x/crypto v0.0.0-20180927165925-5295e8364332 - golang.org/x/sys v0.0.0-20180927150500-dad3d9fb7b6e -) diff --git a/go.sum b/go.sum deleted file mode 100644 index c1feba1..0000000 --- a/go.sum +++ /dev/null @@ -1,33 +0,0 @@ -github.com/alash3al/go-pubsub v0.0.0-20180131130321-77583f42d4a3/go.mod h1:OFNZAVEGEivuAfXw6Kc5539cV2jhjycsoSmXKwf9yBs= -github.com/alash3al/go-pubsub v0.0.0-20180526124946-4b2b7702fea2 h1:eUc66XG49vHysmpGPxngktLz8kQgzegGBL64F/MIIoI= -github.com/alash3al/go-pubsub v0.0.0-20180526124946-4b2b7702fea2/go.mod h1:OFNZAVEGEivuAfXw6Kc5539cV2jhjycsoSmXKwf9yBs= -github.com/bclicn/color v0.0.0-20161123064900-4c02eff8a28c/go.mod h1:Va9ap1qxjAWkIVaW1E9rH0aNgE8SDI5A4n8Ds8P0fAA= -github.com/bclicn/color v0.0.0-20180711051946-108f2023dc84 h1:cutFptzj+ospnc1PETUqcSVTH3VQ44Bi0rpt3nE9gvo= -github.com/bclicn/color v0.0.0-20180711051946-108f2023dc84/go.mod h1:Va9ap1qxjAWkIVaW1E9rH0aNgE8SDI5A4n8Ds8P0fAA= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/labstack/echo v0.0.0-20171223171103-b338075a0fc6 h1:c/xiiwAicwSUaVDcLCUqmCUpTGRe/X8nABJEHfy3Xlk= -github.com/labstack/echo v0.0.0-20171223171103-b338075a0fc6/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s= -github.com/labstack/gommon v0.0.0-20170925052817-57409ada9da0/go.mod h1:/tj9csK2iPSBvn+3NLM9e52usepMtrd5ilFYA+wQNJ4= -github.com/labstack/gommon v0.2.7 h1:2qOPq/twXDrQ6ooBGrn3mrmVOC+biLlatwgIu8lbzRM= -github.com/labstack/gommon v0.2.7/go.mod h1:/tj9csK2iPSBvn+3NLM9e52usepMtrd5ilFYA+wQNJ4= -github.com/mattn/go-colorable v0.0.6/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= -github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-isatty v0.0.0-20160806122752-66b8e73f3f5c/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= -github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= -github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4 h1:gKMu1Bf6QINDnvyZuTaACm9ofY+PRh+5vFz4oxBZeF8= -github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= -golang.org/x/crypto v0.0.0-20180308185624-c7dcf104e3a7/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20180927165925-5295e8364332 h1:hvQVdF6P9DX4OiKA5tpehlG6JsgzmyQiThG7q5Bn3UQ= -golang.org/x/crypto v0.0.0-20180927165925-5295e8364332/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/sys v0.0.0-20180308152046-7dca6fe1f437/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180927150500-dad3d9fb7b6e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/main.go b/main.go deleted file mode 100644 index cc017eb..0000000 --- a/main.go +++ /dev/null @@ -1,27 +0,0 @@ -package main - -import ( - "fmt" - - "github.com/bclicn/color" -) - -func main() { - // handling any panic here ... - defer func() { - if err := recover(); err != nil { - fmt.Println(color.BRed("[!] Panic: ") + color.BLightYellow(err.(string))) - } - }() - - // parsing the command line flags - fmt.Println(color.BGreen("[*] Welcome to WSIFY"), color.BCyan(Version)) - InitFlags() - - // start the pub/sub server - fmt.Println(color.BGreen("[*] Listening for connections on address"), color.BCyan(*FlagHTTPAddr), color.BGreen(" ...")) - if err := InitWsServer(*FlagHTTPAddr); err != nil { - fmt.Println(color.BRed("[!] Error: ") + color.BLightYellow(err.Error())) - return - } -} diff --git a/message.go b/message.go deleted file mode 100644 index 52cb73e..0000000 --- a/message.go +++ /dev/null @@ -1,23 +0,0 @@ -package main - -// Message that will be polled from redis -type Message struct { - Payload interface{} `json:"payload,omitempty"` - To []string `json:"to,omitempty"` - Topic string `json:"channel,omitempty"` - Time int64 `json:"time,omitempty"` -} - -// IsUserAllowed checks whether the specified user is allowed -// to receive this message or not -func (m *Message) IsUserAllowed(u string) bool { - if len(m.To) < 1 { - return true - } - for _, v := range m.To { - if v == u { - return true - } - } - return false -} diff --git a/server.go b/server.go deleted file mode 100644 index 983e5d3..0000000 --- a/server.go +++ /dev/null @@ -1,162 +0,0 @@ -package main - -import ( - "log" - "net/http" - "strings" -) - -import ( - "github.com/alash3al/go-pubsub" - "github.com/gorilla/websocket" - "github.com/labstack/echo" - "github.com/labstack/echo/middleware" - "github.com/rs/xid" -) - -var ( - // WSUpgrader is Default websocket upgrader - WSUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - for _, origin := range strings.Split(*FlagAllowedOrigin, ",") { - origin = strings.TrimSpace(origin) - if origin == "*" || origin == r.Host { - return true - } - } - return false - }, - EnableCompression: true, - } - - //Broker default - Broker = pubsub.NewBroker() -) - -// WSHandler is the websocket request handler -func WSHandler(c echo.Context) error { - defer (func() { - if err := recover(); err != nil { - log.Println(err) - } - })() - key := c.QueryParam("key") - if key == "" { - key = "Anonymous#" + xid.New().String() - } - allowed := TriggerWebhook(Event{ - Action: "connect", - Key: key, - }) - if !allowed { - return c.JSON(403, "You aren't allowed to access this resource") - } - conn, err := WSUpgrader.Upgrade(c.Response(), c.Request(), nil) - if err != nil { - return nil - } - defer conn.Close() // nolint: errcheck - subscriber, err := Broker.Attach() - if err != nil { - conn.WriteJSON(map[string]string{ // nolint: errcheck - "error": "Sorry, couldn't allocate resources for you", - }) - return nil - } - closeCh := make(chan bool) - closed := false - debug("New Valid Connection(" + key + ")") - conn.SetCloseHandler(func(_ int, _ string) error { - debug("Connection(" + key + ") has been closed (by itself)") - closeCh <- true - return nil - }) - goRoutineAction(conn, closeCh, subscriber, key) - for !closed { - select { - case <-closeCh: - closed = true - Broker.Detach(subscriber) - TriggerWebhook(Event{Action: "disconnect", Key: key}) - case data := <-subscriber.GetMessages(): - msg := (data.GetPayload()).(Message) - debug("Incomming message to(" + key + ") ...") - if !msg.IsUserAllowed(key) { - debug("The client(" + key + ") isn't allowed to see the message") - continue - } - msg.Topic = data.GetTopic() - msg.Time = data.GetCreatedAt() - msg.To = nil - if err := conn.WriteJSON(msg); err != nil { - debug("A message cannot be published to (" + key + ") because of the following error (" + err.Error() + ")") - closeCh <- true - } - } - } - return nil -} - -func goRoutineAction(conn *websocket.Conn, closeCh chan bool, subscriber *pubsub.Subscriber, key string) { - go (func() { - var action Event - for { - if err := conn.ReadJSON(&action); err != nil { - debug("Cannot read from the connection of(" + key + "), may connection has been closed, closing ...") - break - } - debug("An action (" + action.Action + ") from the client(" + key + ")") - if action.Action == "subscribe" || action.Action == "unsubscribe" { - if !TriggerWebhook(Event{Action: action.Action, Key: key, Value: action.Value}) { - conn.WriteJSON(map[string]string{ // nolint: errcheck - "error": "You aren't allowed to access the requested resource", - }) - continue - } - } - if action.Action == "subscribe" { - Broker.Subscribe(subscriber, action.Value) - } else if action.Action == "unsubscribe" { - Broker.Unsubscribe(subscriber, action.Value) - } - } - close(closeCh) - })() -} - -// PublishHandler ... -func PublishHandler(c echo.Context) error { - var msg Message - if err := c.Bind(&msg); err != nil { - return c.JSON(422, map[string]interface{}{ - "success": false, - "error": err.Error(), - }) - } - Broker.Broadcast(msg, msg.Topic) - debug("publishing a message ...") - return c.JSON(200, map[string]interface{}{ - "success": true, - "data": msg, - }) -} - -// InitWsServer start the websocket server -func InitWsServer(addr string) error { - e := echo.New() - - e.Debug = true - e.HideBanner = true - - e.Pre(middleware.RemoveTrailingSlash()) - e.Use(middleware.Recover()) - e.Use(middleware.CORS()) - e.Use(middleware.GzipWithConfig(middleware.GzipConfig{Level: 9})) - - e.GET("/subscribe", WSHandler) - e.POST(*FlagPublishEndpoint, PublishHandler) - - return e.Start(addr) -} diff --git a/webhook.go b/webhook.go deleted file mode 100644 index f1a01b6..0000000 --- a/webhook.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -import ( - "bytes" - "encoding/json" - "net/http" - "strings" -) - -// Event ... -type Event struct { - Action string `json:"action"` - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` -} - -// TriggerWebhook ... -func TriggerWebhook(ev Event) bool { - if *FlagWebhookURL == "" { - return true - } - ev.Action = strings.ToLower(ev.Action) - jdata, _ := json.Marshal(ev) - reader := bytes.NewReader(jdata) - if _, found := WebhookEvents[ev.Action]; !found { - return true - } - resp, err := http.Post(*FlagWebhookURL, "application/json", reader) - defer func() { - if resp != nil { - resp.Body.Close() // nolint: errcheck - } - }() - if err != nil || resp.StatusCode > 200 { - return false - } - return true -} From d14eb6c569a5b5ee36972d997b29f09fce6249ed Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 05:33:56 +0300 Subject: [PATCH 2/7] finalized the base code --- .env | 4 ++ README.md | 10 +++++ broker/drivers/memory/driver.go | 77 ++++++++++++++++++++++++++++++++ broker/drivers/memory/init.go | 11 +++++ broker/drivers/redis/driver.go | 70 +++++++++++++++++++++++++++++ broker/drivers/redis/init.go | 11 +++++ broker/registry.go | 38 ++++++++++++++++ broker/types.go | 12 +++++ config/config.go | 49 ++++++++++++++++++++ go.mod | 30 +++++++++++++ go.sum | 52 ++++++++++++++++++++++ handler_ws.go | 79 +++++++++++++++++++++++++++++++++ main.go | 50 +++++++++++++++++++++ types.go | 37 +++++++++++++++ 14 files changed, 530 insertions(+) create mode 100644 .env create mode 100644 README.md create mode 100644 broker/drivers/memory/driver.go create mode 100644 broker/drivers/memory/init.go create mode 100644 broker/drivers/redis/driver.go create mode 100644 broker/drivers/redis/init.go create mode 100644 broker/registry.go create mode 100644 broker/types.go create mode 100644 config/config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handler_ws.go create mode 100644 main.go create mode 100644 types.go diff --git a/.env b/.env new file mode 100644 index 0000000..07bdc9e --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +BROKER_DRIVER=redis +BROKER_DSN=redis://localhost +SERVER_LISTEN_ADDR=":3000" +INTERCEPTOR_ENDPOINT_URL="" diff --git a/README.md b/README.md new file mode 100644 index 0000000..364c474 --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +TODOs +===== +- [ ] As a client, I want to identify myself. +- [ ] As a client, I want to join to channel(s). +- [ ] As a client, I want to leave from channel(s). +- [ ] As a client, I want to broadcast message(s) [optionally to channel(s) or specific client(s)]. +- [ ] As a client, I want to be acked about the result of my broadcasting (succeeded or failed). +- [ ] As a developer, I want to intercept the websocket messages. +- [ ] As a server, I understand the following status codes from the interceptor responses (100 "continue", 200 "ok but don't continue", anything else for error). +- [ ] As a developer, I want to broadcast message(s) to channel(s) or specific client(s). \ No newline at end of file diff --git a/broker/drivers/memory/driver.go b/broker/drivers/memory/driver.go new file mode 100644 index 0000000..c80523a --- /dev/null +++ b/broker/drivers/memory/driver.go @@ -0,0 +1,77 @@ +package memorybroker + +import ( + "context" + "github.com/savsgio/gotils/uuid" + "sync" +) + +type Driver struct { + sync.RWMutex + subscriptions map[string]map[string]chan any + cancelFuncs map[string]context.CancelFunc +} + +func (d *Driver) Connect(_ string) error { + d.subscriptions = make(map[string]map[string]chan any) + d.cancelFuncs = make(map[string]context.CancelFunc) + + return nil +} + +func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan any, error) { + d.Lock() + defer d.Unlock() + + id := uuid.V4() + messagesChan := make(chan any) + ctx, cancel := context.WithCancel(ctx) + + d.cancelFuncs[id] = cancel + + for _, channel := range channels { + if _, found := d.subscriptions[channel]; !found { + d.subscriptions[channel] = make(map[string]chan any) + } + d.subscriptions[channel][id] = messagesChan + } + + go (func() { + select { + case <-ctx.Done(): + d.Lock() + close(messagesChan) + for _, channel := range channels { + delete(d.subscriptions[channel], id) + delete(d.cancelFuncs, id) + } + d.Unlock() + } + })() + + return messagesChan, nil +} + +func (d *Driver) Publish(_ context.Context, channel string, msg any) error { + d.Lock() + defer d.Unlock() + + for _, subscriber := range d.subscriptions[channel] { + go (func() { + subscriber <- msg + })() + } + + return nil +} + +func (d *Driver) Close() error { + d.Lock() + defer d.Unlock() + + for _, cancel := range d.cancelFuncs { + cancel() + } + + return nil +} diff --git a/broker/drivers/memory/init.go b/broker/drivers/memory/init.go new file mode 100644 index 0000000..d559659 --- /dev/null +++ b/broker/drivers/memory/init.go @@ -0,0 +1,11 @@ +package memorybroker + +import ( + "github.com/alash3al/wsify/broker" +) + +const name = "memory" + +func init() { + broker.Register(name, &Driver{}) +} diff --git a/broker/drivers/redis/driver.go b/broker/drivers/redis/driver.go new file mode 100644 index 0000000..852b4b2 --- /dev/null +++ b/broker/drivers/redis/driver.go @@ -0,0 +1,70 @@ +package redisbroker + +import ( + "context" + "encoding/json" + "fmt" + "github.com/redis/go-redis/v9" +) + +type Driver struct { + conn *redis.Client +} + +func (d *Driver) Connect(dsn string) error { + opts, err := redis.ParseURL(dsn) + if err != nil { + return err + } + + d.conn = redis.NewClient(opts) + + if err := d.conn.Ping(context.Background()).Err(); err != nil { + return err + } + + return nil +} + +func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan any, error) { + pubSub := d.conn.Subscribe(ctx, channels...) + messagesChan := make(chan any) + + go (func() { + var msg any + for { + res, err := pubSub.ReceiveMessage(ctx) + if err != nil { + break + } + if err := json.Unmarshal([]byte(res.Payload), &msg); err != nil { + // TODO do something with the error + } + messagesChan <- msg + } + })() + + go (func() { + select { + case <-ctx.Done(): + if err := pubSub.Close(); err != nil { + // TODO handle this + fmt.Println("Unable to close from redis driver") + } + } + })() + + return messagesChan, nil +} + +func (d *Driver) Publish(ctx context.Context, channel string, msg any) error { + j, err := json.Marshal(msg) + if err != nil { + return err + } + return d.conn.Publish(ctx, channel, string(j)).Err() +} + +func (d *Driver) Close() error { + return d.conn.Close() +} diff --git a/broker/drivers/redis/init.go b/broker/drivers/redis/init.go new file mode 100644 index 0000000..fd14d55 --- /dev/null +++ b/broker/drivers/redis/init.go @@ -0,0 +1,11 @@ +package redisbroker + +import ( + "github.com/alash3al/wsify/broker" +) + +const name = "redis" + +func init() { + broker.Register(name, &Driver{}) +} diff --git a/broker/registry.go b/broker/registry.go new file mode 100644 index 0000000..934a510 --- /dev/null +++ b/broker/registry.go @@ -0,0 +1,38 @@ +package broker + +import ( + "fmt" + "sync" +) + +var ( + driversMap = map[string]Driver{} + driversMutex = &sync.RWMutex{} +) + +func Register(name string, driver Driver) { + driversMutex.Lock() + defer driversMutex.Unlock() + + if _, found := driversMap[name]; found { + panic(fmt.Errorf("specified driver (%s) already registered", name)) + } + + driversMap[name] = driver +} + +func Connect(name string, dsn string) (Driver, error) { + driversMutex.RLock() + defer driversMutex.RUnlock() + + driver, found := driversMap[name] + if !found { + return nil, fmt.Errorf("specified driver (%s) not registered", name) + } + + if err := driver.Connect(dsn); err != nil { + return nil, err + } + + return driver, nil +} diff --git a/broker/types.go b/broker/types.go new file mode 100644 index 0000000..2a7d43a --- /dev/null +++ b/broker/types.go @@ -0,0 +1,12 @@ +package broker + +import ( + "context" +) + +type Driver interface { + Connect(dsn string) error + Subscribe(ctx context.Context, channels []string) (<-chan any, error) + Publish(ctx context.Context, channel string, msg any) error + Close() error +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..df7fcf8 --- /dev/null +++ b/config/config.go @@ -0,0 +1,49 @@ +package config + +import ( + "github.com/joho/godotenv" + "log/slog" + "os" +) + +type Config struct { + logger *slog.Logger + brokerDriver string + brokerDSN string + webServerListenAddress string + interceptorEndpointURL string +} + +func NewFromEnv(envFilename string) (*Config, error) { + if err := godotenv.Load(envFilename); err != nil { + return nil, err + } + + return &Config{ + logger: slog.Default(), + brokerDriver: os.Getenv("BROKER_DRIVER"), + brokerDSN: os.Getenv("BROKER_DSN"), + webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), + interceptorEndpointURL: os.Getenv("INTERCEPTOR_ENDPOINT_URL"), + }, nil +} + +func (c *Config) GetLogger() *slog.Logger { + return c.logger +} + +func (c *Config) GetBrokerDriver() string { + return c.brokerDriver +} + +func (c *Config) GetBrokerDSN() string { + return c.brokerDSN +} + +func (c *Config) GetWebServerListenAddr() string { + return c.webServerListenAddress +} + +func (c *Config) GetInterceptorEndpointURL() string { + return c.interceptorEndpointURL +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b075899 --- /dev/null +++ b/go.mod @@ -0,0 +1,30 @@ +module github.com/alash3al/wsify + +go 1.22.0 + +require ( + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fasthttp/websocket v1.5.9 // indirect + github.com/gofiber/contrib/websocket v1.3.1 // indirect + github.com/gofiber/fiber/v2 v2.52.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/redis/go-redis/v9 v9.5.2 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 // indirect + github.com/urfave/cli/v2 v2.27.2 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.54.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..eb359c1 --- /dev/null +++ b/go.sum @@ -0,0 +1,52 @@ +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fasthttp/websocket v1.5.9 h1:9deGuzYcCRKjk940kNwSN6Hd14hk4zYwropm4UsUIUQ= +github.com/fasthttp/websocket v1.5.9/go.mod h1:NLzHBFur260OMuZHohOfYQwMTpR7sfSpUnuqKxMpgKA= +github.com/gofiber/contrib/websocket v1.3.1 h1:iINEnUIT7Wi1ttGWW5fY1fnKQlIEa5KTDXmMoedKinE= +github.com/gofiber/contrib/websocket v1.3.1/go.mod h1:oDLA6uM7x4hFq1zjy3US3HuvmrlWJKO5nrsw2ZKNSfY= +github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM= +github.com/gofiber/fiber/v2 v2.52.4/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E= +github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 h1:KanIMPX0QdEdB4R3CiimCAbxFrhB3j7h0/OvpYGVQa8= +github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= +github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= +github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= +github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/handler_ws.go b/handler_ws.go new file mode 100644 index 0000000..8d26e4b --- /dev/null +++ b/handler_ws.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/alash3al/wsify/broker" + "github.com/alash3al/wsify/config" + "github.com/gofiber/contrib/websocket" +) + +func handleWebSocketRoute(cfg *config.Config, brokerConn broker.Driver) func(*websocket.Conn) { + return func(conn *websocket.Conn) { + var parsedMsg WebSocketMessage + cop := make(chan any) + defer close(cop) + + go (func() { + for msg := range cop { + if err := conn.WriteJSON(msg); err != nil { + // TODO handle this + cfg.GetLogger().Error(err.Error(), "step", "writeJson") + } + } + })() + + subscribedChannels := map[string]context.CancelFunc{} + + for { + _, rawMsg, err := conn.ReadMessage() + if err != nil { + break + } + + if err := json.Unmarshal(rawMsg, &parsedMsg); err != nil { + // TODO handle this error + cfg.GetLogger().Error(err.Error(), "step", "jsonUnmarshal") + } + + switch parsedMsg.Command { + case WebSocketMessageCommandTypeBroadcast: + for _, channel := range parsedMsg.GetAvailableChannels() { + if err := brokerConn.Publish(context.Background(), channel, parsedMsg.GetAvailableWebSocketMessage()); err != nil { + // TODO handle this error + cfg.GetLogger().Error(err.Error(), "step", "publish") + } + } + case WebSocketMessageCommandTypeLeave: + fmt.Println("Leaving ...") + fmt.Println(subscribedChannels) + for _, channel := range parsedMsg.GetAvailableChannels() { + if cancel, found := subscribedChannels[channel]; found { + cancel() + delete(subscribedChannels, channel) + } + } + fmt.Println(subscribedChannels) + fmt.Println("=========================") + case WebSocketMessageCommandTypeJoin: + for _, channel := range parsedMsg.GetAvailableChannels() { + ctx, cancel := context.WithCancel(context.Background()) + subscribedChannels[channel] = cancel + msgsChan, err := brokerConn.Subscribe(ctx, []string{channel}) + if err != nil { + // TODO handle this error + cfg.GetLogger().Error(err.Error(), "step", "Subscribe") + } + + go (func() { + for msg := range msgsChan { + cop <- msg + } + fmt.Println("Channel Listener Ended: ", channel) + })() + } + } + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..a0adb8e --- /dev/null +++ b/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "github.com/alash3al/wsify/broker" + "github.com/alash3al/wsify/config" + "github.com/gofiber/contrib/websocket" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/logger" + "os" + + _ "github.com/alash3al/wsify/broker/drivers/memory" + _ "github.com/alash3al/wsify/broker/drivers/redis" +) + +func main() { + envFilename := ".env" + if len(os.Args) > 1 { + envFilename = os.Args[1] + } + + cfg, err := config.NewFromEnv(envFilename) + if err != nil { + panic(err.Error()) + } + + brokerConn, err := broker.Connect(cfg.GetBrokerDriver(), cfg.GetBrokerDSN()) + if err != nil { + panic(err.Error()) + } + + server := fiber.New() + + server.Use(logger.New()) + + // TODO check with interceptor + server.Use("/ws", func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + return c.Next() + } + return fiber.ErrUpgradeRequired + }) + + // TODO handle client id + // TODO set connection close handler + // TODO more code cleanup + // TODO integrate interceptor (with retries if possible) + server.Get("/ws/:clientId", websocket.New(handleWebSocketRoute(cfg, brokerConn))) + + panic(server.Listen(cfg.GetWebServerListenAddr())) +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..ae8f65e --- /dev/null +++ b/types.go @@ -0,0 +1,37 @@ +package main + +import "fmt" + +type WebSocketMessageCommandType string + +const ( + WebSocketMessageCommandTypeJoin = WebSocketMessageCommandType("join") + WebSocketMessageCommandTypeLeave = WebSocketMessageCommandType("leave") + WebSocketMessageCommandTypeBroadcast = WebSocketMessageCommandType("broadcast") +) + +type WebSocketMessage struct { + Command WebSocketMessageCommandType `json:"command"` + Args map[string]any `json:"args"` +} + +func (m WebSocketMessage) GetAvailableChannels() (result []string) { + if m.Args["channels"] == nil { + return []string{} + } + + switch val := m.Args["channels"].(type) { + case []string: + result = val + case []any: + for _, v := range val { + result = append(result, fmt.Sprintf("%v", v)) + } + } + + return +} + +func (m WebSocketMessage) GetAvailableWebSocketMessage() any { + return m.Args["message"] +} From dec63b744807a21bcdafc2deab6ac5f5b806066e Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 05:42:08 +0300 Subject: [PATCH 3/7] updated README --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 364c474..c609ecf 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,11 @@ TODOs ===== - [ ] As a client, I want to identify myself. -- [ ] As a client, I want to join to channel(s). -- [ ] As a client, I want to leave from channel(s). -- [ ] As a client, I want to broadcast message(s) [optionally to channel(s) or specific client(s)]. -- [ ] As a client, I want to be acked about the result of my broadcasting (succeeded or failed). +- [x] As a client, I want to join to channel(s). +- [x] As a client, I want to leave from channel(s). +- [x] As a client, I want to broadcast message(s) to channel(s). +- [ ] As a client, I must reset my state if the server closed the connection. - [ ] As a developer, I want to intercept the websocket messages. - [ ] As a server, I understand the following status codes from the interceptor responses (100 "continue", 200 "ok but don't continue", anything else for error). -- [ ] As a developer, I want to broadcast message(s) to channel(s) or specific client(s). \ No newline at end of file +- [ ] As a developer, I want to broadcast message(s) to channel(s). +- [ ] As a developer, I have to consider my client id as a channel and I have to automatically join it. \ No newline at end of file From c0c9258154a4a354c39c80a9600abe3eb8d9090e Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 15:03:01 +0300 Subject: [PATCH 4/7] did some new enhancements and code refactor --- .env | 10 ++++ broker/drivers/memory/driver.go | 49 +++++++++------- broker/drivers/redis/driver.go | 36 ++++++------ broker/types.go | 4 +- config/config.go | 2 +- go.mod | 33 +++++------ go.sum | 57 ++++++++++--------- handler_ws.go | 79 -------------------------- main.go | 29 ++++------ routes/ws.go | 39 +++++++++++++ session/message.go | 37 ++++++++++++ session/session.go | 99 +++++++++++++++++++++++++++++++++ types.go | 37 ------------ 13 files changed, 287 insertions(+), 224 deletions(-) delete mode 100644 handler_ws.go create mode 100644 routes/ws.go create mode 100644 session/message.go create mode 100644 session/session.go delete mode 100644 types.go diff --git a/.env b/.env index 07bdc9e..1714801 100644 --- a/.env +++ b/.env @@ -1,4 +1,14 @@ +# the driver to be used as a message broker +# currently supported drivers are: "memory", "redis" BROKER_DRIVER=redis + +# the driver data source name "connection string" in URL style +# for "memory", you don't have to provide it +# for "redis", it should be in the form of "redis://[user[@pass]:]host[:port]/[database_number]" BROKER_DSN=redis://localhost + +# the webserver address to listen on SERVER_LISTEN_ADDR=":3000" + +# the interceptor endpoint url which will receive all POST requests for interceptions INTERCEPTOR_ENDPOINT_URL="" diff --git a/broker/drivers/memory/driver.go b/broker/drivers/memory/driver.go index c80523a..84d7ebf 100644 --- a/broker/drivers/memory/driver.go +++ b/broker/drivers/memory/driver.go @@ -8,51 +8,60 @@ import ( type Driver struct { sync.RWMutex - subscriptions map[string]map[string]chan any - cancelFuncs map[string]context.CancelFunc + subscriptions map[string]map[string]chan []byte + doneChannels map[string]chan struct{} } func (d *Driver) Connect(_ string) error { - d.subscriptions = make(map[string]map[string]chan any) - d.cancelFuncs = make(map[string]context.CancelFunc) + d.subscriptions = make(map[string]map[string]chan []byte) + d.doneChannels = make(map[string]chan struct{}) return nil } -func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan any, error) { +func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) { d.Lock() defer d.Unlock() id := uuid.V4() - messagesChan := make(chan any) - ctx, cancel := context.WithCancel(ctx) + messagesChan := make(chan []byte) + doneChan := make(chan struct{}) - d.cancelFuncs[id] = cancel + d.doneChannels[id] = doneChan for _, channel := range channels { if _, found := d.subscriptions[channel]; !found { - d.subscriptions[channel] = make(map[string]chan any) + d.subscriptions[channel] = make(map[string]chan []byte) } + d.subscriptions[channel][id] = messagesChan } + done := func() { + d.Lock() + defer d.Unlock() + + close(messagesChan) + + for _, channel := range channels { + delete(d.subscriptions[channel], id) + delete(d.doneChannels, id) + } + } + go (func() { select { + case <-doneChan: + done() case <-ctx.Done(): - d.Lock() - close(messagesChan) - for _, channel := range channels { - delete(d.subscriptions[channel], id) - delete(d.cancelFuncs, id) - } - d.Unlock() + done() } })() - return messagesChan, nil + return messagesChan, doneChan, nil } -func (d *Driver) Publish(_ context.Context, channel string, msg any) error { +func (d *Driver) Publish(_ context.Context, channel string, msg []byte) error { d.Lock() defer d.Unlock() @@ -69,8 +78,8 @@ func (d *Driver) Close() error { d.Lock() defer d.Unlock() - for _, cancel := range d.cancelFuncs { - cancel() + for _, ch := range d.doneChannels { + ch <- struct{}{} } return nil diff --git a/broker/drivers/redis/driver.go b/broker/drivers/redis/driver.go index 852b4b2..66880dc 100644 --- a/broker/drivers/redis/driver.go +++ b/broker/drivers/redis/driver.go @@ -2,7 +2,6 @@ package redisbroker import ( "context" - "encoding/json" "fmt" "github.com/redis/go-redis/v9" ) @@ -26,43 +25,42 @@ func (d *Driver) Connect(dsn string) error { return nil } -func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan any, error) { +func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) { pubSub := d.conn.Subscribe(ctx, channels...) - messagesChan := make(chan any) + messagesChan := make(chan []byte) + doneChan := make(chan struct{}) go (func() { - var msg any for { res, err := pubSub.ReceiveMessage(ctx) if err != nil { break } - if err := json.Unmarshal([]byte(res.Payload), &msg); err != nil { - // TODO do something with the error - } - messagesChan <- msg + messagesChan <- []byte(res.Payload) } })() + done := func() { + if err := pubSub.Close(); err != nil { + // TODO handle this + fmt.Println("Unable to close from redis pubSub") + } + } + go (func() { select { case <-ctx.Done(): - if err := pubSub.Close(); err != nil { - // TODO handle this - fmt.Println("Unable to close from redis driver") - } + done() + case <-doneChan: + done() } })() - return messagesChan, nil + return messagesChan, doneChan, nil } -func (d *Driver) Publish(ctx context.Context, channel string, msg any) error { - j, err := json.Marshal(msg) - if err != nil { - return err - } - return d.conn.Publish(ctx, channel, string(j)).Err() +func (d *Driver) Publish(ctx context.Context, channel string, msg []byte) error { + return d.conn.Publish(ctx, channel, string(msg)).Err() } func (d *Driver) Close() error { diff --git a/broker/types.go b/broker/types.go index 2a7d43a..10e95fc 100644 --- a/broker/types.go +++ b/broker/types.go @@ -6,7 +6,7 @@ import ( type Driver interface { Connect(dsn string) error - Subscribe(ctx context.Context, channels []string) (<-chan any, error) - Publish(ctx context.Context, channel string, msg any) error + Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) + Publish(ctx context.Context, channel string, msg []byte) error Close() error } diff --git a/config/config.go b/config/config.go index df7fcf8..1cfa800 100644 --- a/config/config.go +++ b/config/config.go @@ -20,7 +20,7 @@ func NewFromEnv(envFilename string) (*Config, error) { } return &Config{ - logger: slog.Default(), + logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), brokerDriver: os.Getenv("BROKER_DRIVER"), brokerDSN: os.Getenv("BROKER_DSN"), webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), diff --git a/go.mod b/go.mod index b075899..1ecd175 100644 --- a/go.mod +++ b/go.mod @@ -3,28 +3,25 @@ module github.com/alash3al/wsify go 1.22.0 require ( - github.com/andybalholm/brotli v1.1.0 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/joho/godotenv v1.5.1 + github.com/labstack/echo/v4 v4.12.0 + github.com/redis/go-redis/v9 v9.5.2 + github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 + golang.org/x/net v0.26.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fasthttp/websocket v1.5.9 // indirect - github.com/gofiber/contrib/websocket v1.3.1 // indirect - github.com/gofiber/fiber/v2 v2.52.4 // indirect + github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/google/uuid v1.6.0 // indirect - github.com/joho/godotenv v1.5.1 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/labstack/gommon v0.4.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-runewidth v0.0.15 // indirect - github.com/redis/go-redis/v9 v9.5.2 // indirect - github.com/rivo/uniseg v0.4.7 // indirect - github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 // indirect - github.com/urfave/cli/v2 v2.27.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.54.0 // indirect - github.com/valyala/tcplisten v1.0.0 // indirect - github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect - golang.org/x/net v0.26.0 // indirect + github.com/valyala/fasttemplate v1.2.2 // indirect + golang.org/x/crypto v0.24.0 // indirect golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + golang.org/x/time v0.5.0 // indirect ) diff --git a/go.sum b/go.sum index eb359c1..6aa82d7 100644 --- a/go.sum +++ b/go.sum @@ -1,52 +1,51 @@ -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= -github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/fasthttp/websocket v1.5.9 h1:9deGuzYcCRKjk940kNwSN6Hd14hk4zYwropm4UsUIUQ= -github.com/fasthttp/websocket v1.5.9/go.mod h1:NLzHBFur260OMuZHohOfYQwMTpR7sfSpUnuqKxMpgKA= -github.com/gofiber/contrib/websocket v1.3.1 h1:iINEnUIT7Wi1ttGWW5fY1fnKQlIEa5KTDXmMoedKinE= -github.com/gofiber/contrib/websocket v1.3.1/go.mod h1:oDLA6uM7x4hFq1zjy3US3HuvmrlWJKO5nrsw2ZKNSfY= -github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM= -github.com/gofiber/fiber/v2 v2.52.4/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= +github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM= +github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= +github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= -github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E= github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= -github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511 h1:KanIMPX0QdEdB4R3CiimCAbxFrhB3j7h0/OvpYGVQa8= github.com/savsgio/gotils v0.0.0-20240303185622-093b76447511/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= -github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= -github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= -github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= -github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= -github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= -github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= +github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= +github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler_ws.go b/handler_ws.go deleted file mode 100644 index 8d26e4b..0000000 --- a/handler_ws.go +++ /dev/null @@ -1,79 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "github.com/alash3al/wsify/broker" - "github.com/alash3al/wsify/config" - "github.com/gofiber/contrib/websocket" -) - -func handleWebSocketRoute(cfg *config.Config, brokerConn broker.Driver) func(*websocket.Conn) { - return func(conn *websocket.Conn) { - var parsedMsg WebSocketMessage - cop := make(chan any) - defer close(cop) - - go (func() { - for msg := range cop { - if err := conn.WriteJSON(msg); err != nil { - // TODO handle this - cfg.GetLogger().Error(err.Error(), "step", "writeJson") - } - } - })() - - subscribedChannels := map[string]context.CancelFunc{} - - for { - _, rawMsg, err := conn.ReadMessage() - if err != nil { - break - } - - if err := json.Unmarshal(rawMsg, &parsedMsg); err != nil { - // TODO handle this error - cfg.GetLogger().Error(err.Error(), "step", "jsonUnmarshal") - } - - switch parsedMsg.Command { - case WebSocketMessageCommandTypeBroadcast: - for _, channel := range parsedMsg.GetAvailableChannels() { - if err := brokerConn.Publish(context.Background(), channel, parsedMsg.GetAvailableWebSocketMessage()); err != nil { - // TODO handle this error - cfg.GetLogger().Error(err.Error(), "step", "publish") - } - } - case WebSocketMessageCommandTypeLeave: - fmt.Println("Leaving ...") - fmt.Println(subscribedChannels) - for _, channel := range parsedMsg.GetAvailableChannels() { - if cancel, found := subscribedChannels[channel]; found { - cancel() - delete(subscribedChannels, channel) - } - } - fmt.Println(subscribedChannels) - fmt.Println("=========================") - case WebSocketMessageCommandTypeJoin: - for _, channel := range parsedMsg.GetAvailableChannels() { - ctx, cancel := context.WithCancel(context.Background()) - subscribedChannels[channel] = cancel - msgsChan, err := brokerConn.Subscribe(ctx, []string{channel}) - if err != nil { - // TODO handle this error - cfg.GetLogger().Error(err.Error(), "step", "Subscribe") - } - - go (func() { - for msg := range msgsChan { - cop <- msg - } - fmt.Println("Channel Listener Ended: ", channel) - })() - } - } - } - } -} diff --git a/main.go b/main.go index a0adb8e..d9e07ac 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,10 @@ package main import ( "github.com/alash3al/wsify/broker" "github.com/alash3al/wsify/config" - "github.com/gofiber/contrib/websocket" - "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/middleware/logger" + "github.com/alash3al/wsify/routes" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "log" "os" _ "github.com/alash3al/wsify/broker/drivers/memory" @@ -28,23 +29,13 @@ func main() { panic(err.Error()) } - server := fiber.New() + srv := echo.New() + srv.HideBanner = true - server.Use(logger.New()) + srv.Use(middleware.CORS()) + srv.Use(middleware.Logger()) - // TODO check with interceptor - server.Use("/ws", func(c *fiber.Ctx) error { - if websocket.IsWebSocketUpgrade(c) { - return c.Next() - } - return fiber.ErrUpgradeRequired - }) + srv.GET("/session/:id", routes.WebsocketRouteHandler(cfg, brokerConn)) - // TODO handle client id - // TODO set connection close handler - // TODO more code cleanup - // TODO integrate interceptor (with retries if possible) - server.Get("/ws/:clientId", websocket.New(handleWebSocketRoute(cfg, brokerConn))) - - panic(server.Listen(cfg.GetWebServerListenAddr())) + log.Fatal(srv.Start(cfg.GetWebServerListenAddr())) } diff --git a/routes/ws.go b/routes/ws.go new file mode 100644 index 0000000..a89b346 --- /dev/null +++ b/routes/ws.go @@ -0,0 +1,39 @@ +package routes + +import ( + "github.com/alash3al/wsify/broker" + "github.com/alash3al/wsify/config" + "github.com/alash3al/wsify/session" + "github.com/labstack/echo/v4" + "golang.org/x/net/websocket" + "net/http" +) + +func WebsocketRouteHandler(cfg *config.Config, broker broker.Driver) echo.HandlerFunc { + return func(c echo.Context) error { + return echo.WrapHandler(websocket.Server{ + Handshake: func(c *websocket.Config, request *http.Request) error { return nil }, + Handler: websocket.Handler(func(conn *websocket.Conn) { + session := session.Session{ + Context: conn.Request().Context(), + Broker: broker, + Conn: conn, + DoneChannels: make(map[string]chan struct{}), + ErrChan: make(chan error), + Writer: make(chan []byte), + } + + go (func() { + for err := range session.ErrChan { + cfg.GetLogger().Error(err.Error(), "func", "sessionErrorListener") + } + })() + + if err := session.Serve(); err != nil { + cfg.GetLogger().Error(err.Error(), "func", "session.Serve") + return + } + }), + })(c) + } +} diff --git a/session/message.go b/session/message.go new file mode 100644 index 0000000..b443fc1 --- /dev/null +++ b/session/message.go @@ -0,0 +1,37 @@ +package session + +import "fmt" + +type MessageCommandType string + +const ( + MessageCommandTypeJoin = MessageCommandType("join") + MessageCommandTypeLeave = MessageCommandType("leave") + MessageCommandTypeBroadcast = MessageCommandType("broadcast") +) + +type Message struct { + Command MessageCommandType `json:"command"` + Args map[string]any `json:"args"` +} + +func (m Message) GetArgsChannels() (result []string) { + if m.Args["channels"] == nil { + return []string{} + } + + switch val := m.Args["channels"].(type) { + case []string: + result = val + case []any: + for _, v := range val { + result = append(result, fmt.Sprintf("%v", v)) + } + } + + return +} + +func (m Message) GetArgsContent() any { + return m.Args["content"] +} diff --git a/session/session.go b/session/session.go new file mode 100644 index 0000000..24d6e84 --- /dev/null +++ b/session/session.go @@ -0,0 +1,99 @@ +package session + +import ( + "context" + "encoding/json" + "errors" + "github.com/alash3al/wsify/broker" + "golang.org/x/net/websocket" + "io" +) + +type Session struct { + Context context.Context + Broker broker.Driver + Conn *websocket.Conn + Message Message + DoneChannels map[string]chan struct{} + ErrChan chan error + Writer chan []byte +} + +func (s *Session) Serve() error { + defer (func() { + _ = s.Conn.Close() + + close(s.ErrChan) + close(s.Writer) + + for _, ch := range s.DoneChannels { + ch <- struct{}{} + } + })() + + go (func() { + for output := range s.Writer { + if err := websocket.Message.Send(s.Conn, string(output)); err != nil { + s.ErrChan <- err + } + } + })() + + for { + if err := websocket.JSON.Receive(s.Conn, &s.Message); err != nil { + if errors.Is(err, io.EOF) { + return err + } + + s.ErrChan <- err + } + + switch s.Message.Command { + case MessageCommandTypeJoin: + s.onJoin() + case MessageCommandTypeLeave: + s.onLeave() + case MessageCommandTypeBroadcast: + s.onBroadcast() + } + } +} + +func (s *Session) onJoin() { + for _, channel := range s.Message.GetArgsChannels() { + feed, done, err := s.Broker.Subscribe(s.Context, []string{channel}) + if err != nil { + s.ErrChan <- err + continue + } + + s.DoneChannels[channel] = done + + go (func() { + for msg := range feed { + s.Writer <- msg + } + })() + } +} + +func (s *Session) onLeave() { + for _, channel := range s.Message.GetArgsChannels() { + s.DoneChannels[channel] <- struct{}{} + delete(s.DoneChannels, channel) + } +} + +func (s *Session) onBroadcast() { + for _, channel := range s.Message.GetArgsChannels() { + j, err := json.Marshal(s.Message.GetArgsContent()) + if err != nil { + s.ErrChan <- err + continue + } + + if err := s.Broker.Publish(s.Context, channel, j); err != nil { + s.ErrChan <- err + } + } +} diff --git a/types.go b/types.go deleted file mode 100644 index ae8f65e..0000000 --- a/types.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import "fmt" - -type WebSocketMessageCommandType string - -const ( - WebSocketMessageCommandTypeJoin = WebSocketMessageCommandType("join") - WebSocketMessageCommandTypeLeave = WebSocketMessageCommandType("leave") - WebSocketMessageCommandTypeBroadcast = WebSocketMessageCommandType("broadcast") -) - -type WebSocketMessage struct { - Command WebSocketMessageCommandType `json:"command"` - Args map[string]any `json:"args"` -} - -func (m WebSocketMessage) GetAvailableChannels() (result []string) { - if m.Args["channels"] == nil { - return []string{} - } - - switch val := m.Args["channels"].(type) { - case []string: - result = val - case []any: - for _, v := range val { - result = append(result, fmt.Sprintf("%v", v)) - } - } - - return -} - -func (m WebSocketMessage) GetAvailableWebSocketMessage() any { - return m.Args["message"] -} From d3b0e592ec8a0ca0e0beb0b61e1570a21e41fc57 Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 15:55:11 +0300 Subject: [PATCH 5/7] added publish endpoint added publishing key --- .env | 3 ++ broker/drivers/memory/driver.go | 19 +++++------ broker/drivers/redis/driver.go | 10 ++---- broker/types.go | 2 +- config/config.go | 13 +++++--- main.go | 3 +- routes/publish.go | 45 ++++++++++++++++++++++++++ routes/ws.go | 10 +++--- session/message.go | 19 +++-------- session/session.go | 57 ++++++++++++++++++--------------- 10 files changed, 111 insertions(+), 70 deletions(-) create mode 100644 routes/publish.go diff --git a/.env b/.env index 1714801..75abb6f 100644 --- a/.env +++ b/.env @@ -12,3 +12,6 @@ SERVER_LISTEN_ADDR=":3000" # the interceptor endpoint url which will receive all POST requests for interceptions INTERCEPTOR_ENDPOINT_URL="" + +# the key that must match '/publish?key=$VALUE' in order to allow publishing +SERVER_PUBLISHING_KEY="secret" diff --git a/broker/drivers/memory/driver.go b/broker/drivers/memory/driver.go index 84d7ebf..88e958c 100644 --- a/broker/drivers/memory/driver.go +++ b/broker/drivers/memory/driver.go @@ -19,7 +19,7 @@ func (d *Driver) Connect(_ string) error { return nil } -func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) { +func (d *Driver) Subscribe(ctx context.Context, channel string) (<-chan []byte, chan struct{}, error) { d.Lock() defer d.Unlock() @@ -29,24 +29,21 @@ func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byt d.doneChannels[id] = doneChan - for _, channel := range channels { - if _, found := d.subscriptions[channel]; !found { - d.subscriptions[channel] = make(map[string]chan []byte) - } - - d.subscriptions[channel][id] = messagesChan + if _, found := d.subscriptions[channel]; !found { + d.subscriptions[channel] = make(map[string]chan []byte) } + d.subscriptions[channel][id] = messagesChan + done := func() { d.Lock() defer d.Unlock() close(messagesChan) - for _, channel := range channels { - delete(d.subscriptions[channel], id) - delete(d.doneChannels, id) - } + delete(d.subscriptions[channel], id) + delete(d.doneChannels, id) + } go (func() { diff --git a/broker/drivers/redis/driver.go b/broker/drivers/redis/driver.go index 66880dc..8cf5fd1 100644 --- a/broker/drivers/redis/driver.go +++ b/broker/drivers/redis/driver.go @@ -2,7 +2,6 @@ package redisbroker import ( "context" - "fmt" "github.com/redis/go-redis/v9" ) @@ -25,8 +24,8 @@ func (d *Driver) Connect(dsn string) error { return nil } -func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) { - pubSub := d.conn.Subscribe(ctx, channels...) +func (d *Driver) Subscribe(ctx context.Context, channel string) (<-chan []byte, chan struct{}, error) { + pubSub := d.conn.Subscribe(ctx, channel) messagesChan := make(chan []byte) doneChan := make(chan struct{}) @@ -41,10 +40,7 @@ func (d *Driver) Subscribe(ctx context.Context, channels []string) (<-chan []byt })() done := func() { - if err := pubSub.Close(); err != nil { - // TODO handle this - fmt.Println("Unable to close from redis pubSub") - } + _ = pubSub.Close() } go (func() { diff --git a/broker/types.go b/broker/types.go index 10e95fc..573c53b 100644 --- a/broker/types.go +++ b/broker/types.go @@ -6,7 +6,7 @@ import ( type Driver interface { Connect(dsn string) error - Subscribe(ctx context.Context, channels []string) (<-chan []byte, chan struct{}, error) + Subscribe(ctx context.Context, channel string) (<-chan []byte, chan struct{}, error) Publish(ctx context.Context, channel string, msg []byte) error Close() error } diff --git a/config/config.go b/config/config.go index 1cfa800..070dd7d 100644 --- a/config/config.go +++ b/config/config.go @@ -10,8 +10,9 @@ type Config struct { logger *slog.Logger brokerDriver string brokerDSN string - webServerListenAddress string interceptorEndpointURL string + webServerListenAddress string + webServerPublishingKey string } func NewFromEnv(envFilename string) (*Config, error) { @@ -23,8 +24,9 @@ func NewFromEnv(envFilename string) (*Config, error) { logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), brokerDriver: os.Getenv("BROKER_DRIVER"), brokerDSN: os.Getenv("BROKER_DSN"), - webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), interceptorEndpointURL: os.Getenv("INTERCEPTOR_ENDPOINT_URL"), + webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), + webServerPublishingKey: os.Getenv("SERVER_PUBLISHING_KEY"), }, nil } @@ -39,11 +41,12 @@ func (c *Config) GetBrokerDriver() string { func (c *Config) GetBrokerDSN() string { return c.brokerDSN } +func (c *Config) GetInterceptorEndpointURL() string { + return c.interceptorEndpointURL +} func (c *Config) GetWebServerListenAddr() string { return c.webServerListenAddress } -func (c *Config) GetInterceptorEndpointURL() string { - return c.interceptorEndpointURL -} +func (c *Config) GetWebServerPublishingKey() string { return c.webServerPublishingKey } diff --git a/main.go b/main.go index d9e07ac..c2ccb00 100644 --- a/main.go +++ b/main.go @@ -35,7 +35,8 @@ func main() { srv.Use(middleware.CORS()) srv.Use(middleware.Logger()) - srv.GET("/session/:id", routes.WebsocketRouteHandler(cfg, brokerConn)) + srv.GET("/ws/:id", routes.WebsocketRouteHandler(cfg, brokerConn)) + srv.POST("/publish", routes.PublishHandler(cfg, brokerConn)) log.Fatal(srv.Start(cfg.GetWebServerListenAddr())) } diff --git a/routes/publish.go b/routes/publish.go new file mode 100644 index 0000000..3c3790b --- /dev/null +++ b/routes/publish.go @@ -0,0 +1,45 @@ +package routes + +import ( + "encoding/json" + "github.com/alash3al/wsify/broker" + "github.com/alash3al/wsify/config" + "github.com/labstack/echo/v4" + "net/http" + "strings" +) + +func PublishHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFunc { + return func(c echo.Context) error { + if c.QueryParam("key") != cfg.GetWebServerPublishingKey() { + return c.NoContent(http.StatusForbidden) + } + + if strings.ToLower(c.Request().Header.Get("Content-Type")) != "application/json" { + return c.NoContent(http.StatusUnsupportedMediaType) + } + + var msg struct { + Channel string `json:"channel"` + Content any `json:"content"` + } + + if err := c.Bind(&msg); err != nil { + cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.Bind") + return c.NoContent(http.StatusBadRequest) + } + + j, err := json.Marshal(msg.Content) + if err != nil { + cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.json.Unmarshal") + return c.NoContent(http.StatusInternalServerError) + } + + if err := brokerConn.Publish(c.Request().Context(), msg.Channel, j); err != nil { + cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.broker.Publish") + return c.NoContent(http.StatusInternalServerError) + } + + return c.NoContent(http.StatusCreated) + } +} diff --git a/routes/ws.go b/routes/ws.go index a89b346..48306a3 100644 --- a/routes/ws.go +++ b/routes/ws.go @@ -9,14 +9,14 @@ import ( "net/http" ) -func WebsocketRouteHandler(cfg *config.Config, broker broker.Driver) echo.HandlerFunc { +func WebsocketRouteHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFunc { return func(c echo.Context) error { return echo.WrapHandler(websocket.Server{ Handshake: func(c *websocket.Config, request *http.Request) error { return nil }, Handler: websocket.Handler(func(conn *websocket.Conn) { - session := session.Session{ + sess := session.Session{ Context: conn.Request().Context(), - Broker: broker, + Broker: brokerConn, Conn: conn, DoneChannels: make(map[string]chan struct{}), ErrChan: make(chan error), @@ -24,12 +24,12 @@ func WebsocketRouteHandler(cfg *config.Config, broker broker.Driver) echo.Handle } go (func() { - for err := range session.ErrChan { + for err := range sess.ErrChan { cfg.GetLogger().Error(err.Error(), "func", "sessionErrorListener") } })() - if err := session.Serve(); err != nil { + if err := sess.Serve(); err != nil { cfg.GetLogger().Error(err.Error(), "func", "session.Serve") return } diff --git a/session/message.go b/session/message.go index b443fc1..badad52 100644 --- a/session/message.go +++ b/session/message.go @@ -1,6 +1,6 @@ package session -import "fmt" +import "strings" type MessageCommandType string @@ -15,21 +15,10 @@ type Message struct { Args map[string]any `json:"args"` } -func (m Message) GetArgsChannels() (result []string) { - if m.Args["channels"] == nil { - return []string{} - } +func (m Message) GetArgsChannel() string { + s, _ := m.Args["channel"].(string) - switch val := m.Args["channels"].(type) { - case []string: - result = val - case []any: - for _, v := range val { - result = append(result, fmt.Sprintf("%v", v)) - } - } - - return + return strings.TrimSpace(s) } func (m Message) GetArgsContent() any { diff --git a/session/session.go b/session/session.go index 24d6e84..6daa61c 100644 --- a/session/session.go +++ b/session/session.go @@ -35,6 +35,7 @@ func (s *Session) Serve() error { for output := range s.Writer { if err := websocket.Message.Send(s.Conn, string(output)); err != nil { s.ErrChan <- err + break } } })() @@ -60,40 +61,46 @@ func (s *Session) Serve() error { } func (s *Session) onJoin() { - for _, channel := range s.Message.GetArgsChannels() { - feed, done, err := s.Broker.Subscribe(s.Context, []string{channel}) - if err != nil { - s.ErrChan <- err - continue - } + channel := s.Message.GetArgsChannel() - s.DoneChannels[channel] = done + if channel == "" { + s.ErrChan <- errors.New("requested join on an empty chan") + return + } - go (func() { - for msg := range feed { - s.Writer <- msg - } - })() + feed, done, err := s.Broker.Subscribe(s.Context, channel) + if err != nil { + s.ErrChan <- err + return } + + s.DoneChannels[channel] = done + + go (func() { + for msg := range feed { + s.Writer <- msg + } + })() + } func (s *Session) onLeave() { - for _, channel := range s.Message.GetArgsChannels() { - s.DoneChannels[channel] <- struct{}{} - delete(s.DoneChannels, channel) - } + channel := s.Message.GetArgsChannel() + s.DoneChannels[channel] <- struct{}{} + delete(s.DoneChannels, channel) } func (s *Session) onBroadcast() { - for _, channel := range s.Message.GetArgsChannels() { - j, err := json.Marshal(s.Message.GetArgsContent()) - if err != nil { - s.ErrChan <- err - continue - } + channel := s.Message.GetArgsChannel() - if err := s.Broker.Publish(s.Context, channel, j); err != nil { - s.ErrChan <- err - } + j, err := json.Marshal(s.Message.GetArgsContent()) + if err != nil { + s.ErrChan <- err + return + } + + if err := s.Broker.Publish(s.Context, channel, j); err != nil { + s.ErrChan <- err } + } From ad6c0433dd2de0ec095cb694549cbb72a803e134 Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 16:02:23 +0300 Subject: [PATCH 6/7] renamed publish endpoint to broadcast for consistency reasons --- .env | 4 ++-- README.md | 15 +++++++-------- config/config.go | 26 +++++++++++++------------- main.go | 2 +- routes/{publish.go => broadcast.go} | 10 +++++----- 5 files changed, 28 insertions(+), 29 deletions(-) rename routes/{publish.go => broadcast.go} (69%) diff --git a/.env b/.env index 75abb6f..70dff1a 100644 --- a/.env +++ b/.env @@ -13,5 +13,5 @@ SERVER_LISTEN_ADDR=":3000" # the interceptor endpoint url which will receive all POST requests for interceptions INTERCEPTOR_ENDPOINT_URL="" -# the key that must match '/publish?key=$VALUE' in order to allow publishing -SERVER_PUBLISHING_KEY="secret" +# the key that must match '/broadcast?key=$VALUE' in order to allow broadcasting +SERVER_BROADCASTING_KEY="secret" diff --git a/README.md b/README.md index c609ecf..b14e366 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ TODOs ===== -- [ ] As a client, I want to identify myself. -- [x] As a client, I want to join to channel(s). -- [x] As a client, I want to leave from channel(s). -- [x] As a client, I want to broadcast message(s) to channel(s). -- [ ] As a client, I must reset my state if the server closed the connection. -- [ ] As a developer, I want to intercept the websocket messages. +- [x] As a client, I want to join to channel. +- [x] As a client, I want to leave from channel. +- [x] As a client, I want to broadcast a message to a channel. +- [ ] As a server, I must close the connection if the interceptor returned none-valid status codes. - [ ] As a server, I understand the following status codes from the interceptor responses (100 "continue", 200 "ok but don't continue", anything else for error). -- [ ] As a developer, I want to broadcast message(s) to channel(s). -- [ ] As a developer, I have to consider my client id as a channel and I have to automatically join it. \ No newline at end of file +- [ ] As a developer, I want to intercept the websocket messages. +- [ ] As a developer, I want to receive all query params used to connect to the websocket endpoint. +- [x] As a developer, I want to broadcast a message to a channel. diff --git a/config/config.go b/config/config.go index 070dd7d..22a5ed4 100644 --- a/config/config.go +++ b/config/config.go @@ -7,12 +7,12 @@ import ( ) type Config struct { - logger *slog.Logger - brokerDriver string - brokerDSN string - interceptorEndpointURL string - webServerListenAddress string - webServerPublishingKey string + logger *slog.Logger + brokerDriver string + brokerDSN string + interceptorEndpointURL string + webServerListenAddress string + webServerBroadcastingKey string } func NewFromEnv(envFilename string) (*Config, error) { @@ -21,12 +21,12 @@ func NewFromEnv(envFilename string) (*Config, error) { } return &Config{ - logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), - brokerDriver: os.Getenv("BROKER_DRIVER"), - brokerDSN: os.Getenv("BROKER_DSN"), - interceptorEndpointURL: os.Getenv("INTERCEPTOR_ENDPOINT_URL"), - webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), - webServerPublishingKey: os.Getenv("SERVER_PUBLISHING_KEY"), + logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), + brokerDriver: os.Getenv("BROKER_DRIVER"), + brokerDSN: os.Getenv("BROKER_DSN"), + interceptorEndpointURL: os.Getenv("INTERCEPTOR_ENDPOINT_URL"), + webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), + webServerBroadcastingKey: os.Getenv("SERVER_BROADCASTING_KEY"), }, nil } @@ -49,4 +49,4 @@ func (c *Config) GetWebServerListenAddr() string { return c.webServerListenAddress } -func (c *Config) GetWebServerPublishingKey() string { return c.webServerPublishingKey } +func (c *Config) GetWebServerBroadcastingKey() string { return c.webServerBroadcastingKey } diff --git a/main.go b/main.go index c2ccb00..db80e2c 100644 --- a/main.go +++ b/main.go @@ -36,7 +36,7 @@ func main() { srv.Use(middleware.Logger()) srv.GET("/ws/:id", routes.WebsocketRouteHandler(cfg, brokerConn)) - srv.POST("/publish", routes.PublishHandler(cfg, brokerConn)) + srv.POST("/broadcast", routes.BroadcastHandler(cfg, brokerConn)) log.Fatal(srv.Start(cfg.GetWebServerListenAddr())) } diff --git a/routes/publish.go b/routes/broadcast.go similarity index 69% rename from routes/publish.go rename to routes/broadcast.go index 3c3790b..6c25788 100644 --- a/routes/publish.go +++ b/routes/broadcast.go @@ -9,9 +9,9 @@ import ( "strings" ) -func PublishHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFunc { +func BroadcastHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFunc { return func(c echo.Context) error { - if c.QueryParam("key") != cfg.GetWebServerPublishingKey() { + if c.QueryParam("key") != cfg.GetWebServerBroadcastingKey() { return c.NoContent(http.StatusForbidden) } @@ -25,18 +25,18 @@ func PublishHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFu } if err := c.Bind(&msg); err != nil { - cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.Bind") + cfg.GetLogger().Error(err.Error(), "func", "BroadcastHandler.Bind") return c.NoContent(http.StatusBadRequest) } j, err := json.Marshal(msg.Content) if err != nil { - cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.json.Unmarshal") + cfg.GetLogger().Error(err.Error(), "func", "BroadcastHandler.json.Unmarshal") return c.NoContent(http.StatusInternalServerError) } if err := brokerConn.Publish(c.Request().Context(), msg.Channel, j); err != nil { - cfg.GetLogger().Error(err.Error(), "func", "PublishHandler.broker.Publish") + cfg.GetLogger().Error(err.Error(), "func", "BroadcastHandler.broker.Publish") return c.NoContent(http.StatusInternalServerError) } From de03e9fc2340e413fadfda6056fd1144654e4dbe Mon Sep 17 00:00:00 2001 From: Mohamed Al Ashaal Date: Wed, 5 Jun 2024 22:33:38 +0300 Subject: [PATCH 7/7] finalized the v3 features --- .env | 17 ----- .github/workflows/releaser.yaml | 63 +++++++++++++++++++ Dockerfile | 21 +++++++ README.md | 106 +++++++++++++++++++++++++++++--- config/config.go | 30 +++++---- main.go | 15 ++--- routes/ws.go | 19 ++++++ session/message.go | 1 + session/session.go | 13 +++- utils/authorizer.go | 38 ++++++++++++ 10 files changed, 272 insertions(+), 51 deletions(-) delete mode 100644 .env create mode 100644 .github/workflows/releaser.yaml create mode 100644 Dockerfile create mode 100644 utils/authorizer.go diff --git a/.env b/.env deleted file mode 100644 index 70dff1a..0000000 --- a/.env +++ /dev/null @@ -1,17 +0,0 @@ -# the driver to be used as a message broker -# currently supported drivers are: "memory", "redis" -BROKER_DRIVER=redis - -# the driver data source name "connection string" in URL style -# for "memory", you don't have to provide it -# for "redis", it should be in the form of "redis://[user[@pass]:]host[:port]/[database_number]" -BROKER_DSN=redis://localhost - -# the webserver address to listen on -SERVER_LISTEN_ADDR=":3000" - -# the interceptor endpoint url which will receive all POST requests for interceptions -INTERCEPTOR_ENDPOINT_URL="" - -# the key that must match '/broadcast?key=$VALUE' in order to allow broadcasting -SERVER_BROADCASTING_KEY="secret" diff --git a/.github/workflows/releaser.yaml b/.github/workflows/releaser.yaml new file mode 100644 index 0000000..cf247ad --- /dev/null +++ b/.github/workflows/releaser.yaml @@ -0,0 +1,63 @@ +env: + CGO_ENABLED: "0" + REGISTRY: ghcr.io + IMAGE_NAME: alash3al/wsify + +on: + release: + types: [created] + +jobs: + binary-releaser: + name: Release Go Binary + runs-on: ubuntu-latest + strategy: + matrix: + goos: [linux, darwin] + goarch: [amd64, arm64] + steps: + - uses: actions/checkout@v4.1.3 + - uses: wangyoucao577/go-release-action@v1.50 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + goos: ${{ matrix.goos }} + goarch: ${{ matrix.goarch }} + goversion: "https://dl.google.com/go/go1.22.0.linux-amd64.tar.gz" + project_path: "." + binary_name: "wsify" + ldflags: "-s -w" + + docker-releaser: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4.1.3 + + - name: Get Latest Tag + id: var_tag + run: echo "name=$(git describe --tags --abbrev=0)" >> $GITHUB_OUTPUT + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3.3.0 + + - name: Log in to Github Container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5.3.0 + with: + file: Dockerfile + context: . + push: true + tags: | + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.var_tag.outputs.name }} + cache-from: type=gha + cache-to: type=gha,mode=max \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d7ea20a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM golang:1.22-alpine As builder + +WORKDIR /wsify/ + +RUN apk update && apk add git upx + +COPY go.mod go.sum ./ + +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o /usr/bin/wsify . + +RUN upx -9 /usr/bin/wsify + +FROM alpine + +WORKDIR /wsify/ + +COPY --from=builder /usr/bin/wsify /usr/bin/wsify diff --git a/README.md b/README.md index b14e366..c9f9433 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,98 @@ -TODOs +WSIFY +====== +> a tiny general purpose websocket server that could be used for building real-time apps +> in addition to giving you the power to simply accept/reject any websocket message when using the +> authorizer feature (a webhook that should respond with 200 OK on success and anything else to reject). + +Philosophy +========== +- A websocket server should only responsible for transmitting messages in real-time between connected parties. +- It isn't the websocket server responsibility to authorize whether a party is allowed to send certain message or not. +- An authorizer must respond with `200 OK` if a party can send a message. + - `200 OK` means "Authorized" + - `5xx` means "the authorizer is down, please close the current connection and reconnect" + - anything else means "NotAuthorized" +- The client identity should be declared in the query params while connecting to the websocket endpoint and as an argument in the `args` of the message if needed. +- + +Definitions +============ +### Message +> is a data structure contains some data to be transmitted. + +```json5 +{ + // command is a string describes what should be done + // available commands are: + // "join": joins a channel (specified in the "args.channel") + // "leave": leaves a channel (specified in the "args.channel") + // "broadcast": broadcasts a content (specified in the "args.content") to a channel (in "args.channel") + "command": "join", + "args": { + "channel": "some_channel" + } +} +``` + +### Authorizer +> a webhook that responds with `200 OK` to accept a message sent by a websocket client, +> the authorizer will receive a `POST` request containing a message data structure as described above, but there +> is one special command that isn't described which is "connect", it is fired before accepting the websocket connection +> and it sounds like +```json5 +{ + "command": "connect", + "args": { + // array of all http headers sent by the client while trying to open a websocket connection. + "headers": [/*...*/], + // an object that contains all available query params sent by the client while trying to open a websocket connection. + "query": {/*...*/} + } +} +``` + +Usage ===== -- [x] As a client, I want to join to channel. -- [x] As a client, I want to leave from channel. -- [x] As a client, I want to broadcast a message to a channel. -- [ ] As a server, I must close the connection if the interceptor returned none-valid status codes. -- [ ] As a server, I understand the following status codes from the interceptor responses (100 "continue", 200 "ok but don't continue", anything else for error). -- [ ] As a developer, I want to intercept the websocket messages. -- [ ] As a developer, I want to receive all query params used to connect to the websocket endpoint. -- [x] As a developer, I want to broadcast a message to a channel. +> There is no need to say a lot on how to use this software, just connect using any websocket client to `http://wsify:3000/ws` and start sending messages + + +Examples +======== + +#### \> How can a client/device connect to the websocket service? +> by simply connecting to the following endpoint `ws://your.wsify.service:port/ws` + + +#### \> What is the command used to join a channel named "hello"? +> +```json +{ + "command": "join", + "args": { + "channel": "hello" + } +} +``` + +#### \> What is the command used to broadcast a message to the channel "hello"? +```json +{ + "command": "broadcast", + "args": { + "channel": "hello" + } +} +``` + +#### \> How can I publish message to `hello` from another server without connecting to the websocket endpoint? +> Do a post request to `/broadcast` with the following format: +```json5 +{ + "channel": "hello", + "args": { + "content": "Hello World! from the server" + } +} +``` + +### for more info look at `$ ./wsify --help` \ No newline at end of file diff --git a/config/config.go b/config/config.go index 22a5ed4..d9323ff 100644 --- a/config/config.go +++ b/config/config.go @@ -1,7 +1,7 @@ package config import ( - "github.com/joho/godotenv" + "flag" "log/slog" "os" ) @@ -10,23 +10,27 @@ type Config struct { logger *slog.Logger brokerDriver string brokerDSN string - interceptorEndpointURL string + authorizerEndpointURL string webServerListenAddress string webServerBroadcastingKey string } -func NewFromEnv(envFilename string) (*Config, error) { - if err := godotenv.Load(envFilename); err != nil { - return nil, err - } +func NewFromFlags() (*Config, error) { + brokerDriver := flag.String("broker-driver", "memory", "the message broker driver (redis, memory)") + brokerDSN := flag.String("broker-dsn", "", "the selected driver DSN (connection url), example: redis://localhost") + authorizerURL := flag.String("authorizer-url", "", "the endpoint url that will be used as the main authorizer webhook") + listenAddr := flag.String("listen-addr", ":3000", "the web server listen address") + broadcastingKey := flag.String("broadcasting-key", "", "key that will authorize all `/broadcast` calls") + + flag.Parse() return &Config{ logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), - brokerDriver: os.Getenv("BROKER_DRIVER"), - brokerDSN: os.Getenv("BROKER_DSN"), - interceptorEndpointURL: os.Getenv("INTERCEPTOR_ENDPOINT_URL"), - webServerListenAddress: os.Getenv("SERVER_LISTEN_ADDR"), - webServerBroadcastingKey: os.Getenv("SERVER_BROADCASTING_KEY"), + brokerDriver: *brokerDriver, + brokerDSN: *brokerDSN, + authorizerEndpointURL: *authorizerURL, + webServerListenAddress: *listenAddr, + webServerBroadcastingKey: *broadcastingKey, }, nil } @@ -41,8 +45,8 @@ func (c *Config) GetBrokerDriver() string { func (c *Config) GetBrokerDSN() string { return c.brokerDSN } -func (c *Config) GetInterceptorEndpointURL() string { - return c.interceptorEndpointURL +func (c *Config) GetAuthorizerEndpointURL() string { + return c.authorizerEndpointURL } func (c *Config) GetWebServerListenAddr() string { diff --git a/main.go b/main.go index db80e2c..d021ebf 100644 --- a/main.go +++ b/main.go @@ -2,24 +2,17 @@ package main import ( "github.com/alash3al/wsify/broker" + _ "github.com/alash3al/wsify/broker/drivers/memory" + _ "github.com/alash3al/wsify/broker/drivers/redis" "github.com/alash3al/wsify/config" "github.com/alash3al/wsify/routes" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "log" - "os" - - _ "github.com/alash3al/wsify/broker/drivers/memory" - _ "github.com/alash3al/wsify/broker/drivers/redis" ) func main() { - envFilename := ".env" - if len(os.Args) > 1 { - envFilename = os.Args[1] - } - - cfg, err := config.NewFromEnv(envFilename) + cfg, err := config.NewFromFlags() if err != nil { panic(err.Error()) } @@ -35,7 +28,7 @@ func main() { srv.Use(middleware.CORS()) srv.Use(middleware.Logger()) - srv.GET("/ws/:id", routes.WebsocketRouteHandler(cfg, brokerConn)) + srv.GET("/ws", routes.WebsocketRouteHandler(cfg, brokerConn)) srv.POST("/broadcast", routes.BroadcastHandler(cfg, brokerConn)) log.Fatal(srv.Start(cfg.GetWebServerListenAddr())) diff --git a/routes/ws.go b/routes/ws.go index 48306a3..9196602 100644 --- a/routes/ws.go +++ b/routes/ws.go @@ -4,6 +4,7 @@ import ( "github.com/alash3al/wsify/broker" "github.com/alash3al/wsify/config" "github.com/alash3al/wsify/session" + "github.com/alash3al/wsify/utils" "github.com/labstack/echo/v4" "golang.org/x/net/websocket" "net/http" @@ -11,12 +12,30 @@ import ( func WebsocketRouteHandler(cfg *config.Config, brokerConn broker.Driver) echo.HandlerFunc { return func(c echo.Context) error { + canConnect, err := utils.ShouldAcceptPayload(cfg.GetAuthorizerEndpointURL(), session.Message{ + Command: session.MessageCommandTypeConnect, + Args: map[string]any{ + "headers": c.Request().Header, + "query": c.QueryParams(), + }, + }) + + if err != nil { + cfg.GetLogger().Error(err.Error(), "utils.ShouldAcceptPayload") + return c.NoContent(http.StatusForbidden) + } + + if !canConnect { + return c.NoContent(http.StatusForbidden) + } + return echo.WrapHandler(websocket.Server{ Handshake: func(c *websocket.Config, request *http.Request) error { return nil }, Handler: websocket.Handler(func(conn *websocket.Conn) { sess := session.Session{ Context: conn.Request().Context(), Broker: brokerConn, + Config: cfg, Conn: conn, DoneChannels: make(map[string]chan struct{}), ErrChan: make(chan error), diff --git a/session/message.go b/session/message.go index badad52..5c85cfa 100644 --- a/session/message.go +++ b/session/message.go @@ -5,6 +5,7 @@ import "strings" type MessageCommandType string const ( + MessageCommandTypeConnect = MessageCommandType("connect") MessageCommandTypeJoin = MessageCommandType("join") MessageCommandTypeLeave = MessageCommandType("leave") MessageCommandTypeBroadcast = MessageCommandType("broadcast") diff --git a/session/session.go b/session/session.go index 6daa61c..2f33650 100644 --- a/session/session.go +++ b/session/session.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "github.com/alash3al/wsify/broker" + "github.com/alash3al/wsify/config" + "github.com/alash3al/wsify/utils" "golang.org/x/net/websocket" "io" ) @@ -12,6 +14,7 @@ import ( type Session struct { Context context.Context Broker broker.Driver + Config *config.Config Conn *websocket.Conn Message Message DoneChannels map[string]chan struct{} @@ -49,6 +52,15 @@ func (s *Session) Serve() error { s.ErrChan <- err } + canProceed, err := utils.ShouldAcceptPayload(s.Config.GetAuthorizerEndpointURL(), s.Message) + if err != nil { + return err + } + + if !canProceed { + continue + } + switch s.Message.Command { case MessageCommandTypeJoin: s.onJoin() @@ -81,7 +93,6 @@ func (s *Session) onJoin() { s.Writer <- msg } })() - } func (s *Session) onLeave() { diff --git a/utils/authorizer.go b/utils/authorizer.go new file mode 100644 index 0000000..d166944 --- /dev/null +++ b/utils/authorizer.go @@ -0,0 +1,38 @@ +package utils + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" +) + +func ShouldAcceptPayload(url string, payload any) (bool, error) { + if url == "" { + return true, nil + } + + j, err := json.Marshal(payload) + if err != nil { + return false, err + } + + resp, err := http.Post(url, "application/json", bytes.NewReader(j)) + if err != nil { + return false, err + } + + defer (func() { + _ = resp.Body.Close() + })() + + if resp.StatusCode == http.StatusOK { + return true, nil + } + + if resp.StatusCode >= 500 { + return false, fmt.Errorf("unexpected statusCode (%d) returned from the authorizer", resp.StatusCode) + } + + return false, nil +}