Skip to content

Commit

Permalink
cluster mode on different threads
Browse files Browse the repository at this point in the history
  • Loading branch information
alekcz committed Jul 6, 2021
1 parent 4410310 commit aed771c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
12 changes: 2 additions & 10 deletions src/pcp/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,8 @@
([path]
(let [scgi-port (Integer/parseInt (or (System/getenv "SCGI_PORT") "9000"))]
(case path
""
(scgi/serve scgi-handler scgi-port)

"-c"
(do
(scgi/serve scgi-handler scgi-port)
(scgi/serve scgi-handler 9007)
(scgi/serve scgi-handler 9014)
(scgi/serve scgi-handler 9021))

"" (scgi/serve scgi-handler scgi-port)
"-c" (scgi/serve scgi-handler scgi-port :cluster true)
(run-script path)))))


53 changes: 35 additions & 18 deletions src/pcp/scgi.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns pcp.scgi
(:require [clojure.string :as str])
(:require [clojure.string :as str]
[clojure.core.async :as async])
(:import [java.nio.channels ServerSocketChannel SocketChannel Selector SelectionKey]
[java.nio ByteBuffer]
[java.net InetSocketAddress InetAddress]
Expand All @@ -8,8 +9,6 @@

(set! *warn-on-reflection* 1)

(def ^Selector selector (Selector/open))

(defn to-byte-array [^String text]
(-> text (.getBytes "UTF-8") ByteBuffer/wrap))

Expand Down Expand Up @@ -41,7 +40,7 @@
(assoc! :body (:body req))
(persistent!))))

(defn on-accept [^SelectionKey key]
(defn on-accept [selector ^SelectionKey key]
(let [^ServerSocketChannel channel (.channel key)
^SocketChannel socketChannel (.accept channel)]
(.configureBlocking socketChannel false)
Expand Down Expand Up @@ -95,29 +94,47 @@
(.cancel key))))
(catch Exception _ (.close socket-channel) (.cancel key)))))

(defn build-server [port]
(defn build-server [port selector]
(let [^ServerSocketChannel serverChannel (ServerSocketChannel/open)
portAddr (InetSocketAddress. ^InetAddress (InetAddress/getByName "127.0.0.1") ^Integer port)]
(.configureBlocking serverChannel false)
(.bind (.socket serverChannel) portAddr)
(.register serverChannel selector SelectionKey/OP_ACCEPT)
serverChannel))

(defn serve [handler port]
(defn run-selection [active handler ^Selector selector]
(async/thread
(while (some? @active)
(if (not= 0 (.select selector 50))
(let [keys (.selectedKeys selector)]
(doseq [^SelectionKey key keys]
(let [ops (.readyOps key)]
(cond
(= ops SelectionKey/OP_ACCEPT) (on-accept selector key)
(= ops SelectionKey/OP_READ) (on-read key handler))))
(.clear keys))
nil))))

(defn serve [handler port &{:keys [cluster]}]
(let [active (atom true)
^ServerSocketChannel server (build-server port)]
(future
(while (some? @active)
(if (not= 0 (.select selector 50))
(let [keys (.selectedKeys selector)]
(doseq [^SelectionKey key keys]
(let [ops (.readyOps key)]
(cond
(= ops SelectionKey/OP_ACCEPT) (on-accept key)
(= ops SelectionKey/OP_READ) (on-read key handler))))
(.clear keys))
nil)))
^Selector selector (Selector/open)
^Selector selector2 (when cluster (Selector/open))
^Selector selector3 (when cluster (Selector/open))
^Selector selector4 (when cluster (Selector/open))
^ServerSocketChannel server (build-server port selector)
^ServerSocketChannel server2 (when cluster (build-server 9007 selector2))
^ServerSocketChannel server3 (when cluster (build-server 9014 selector3))
^ServerSocketChannel server4 (when cluster (build-server 9021 selector4))]
(run-selection active handler selector)
(when cluster
(run-selection active handler selector2)
(run-selection active handler selector3)
(run-selection active handler selector4))
(future (while (some? @active) nil))
(fn []
(.close server)
(when cluster (.close server2))
(when cluster (.close server3))
(when cluster (.close server4))
(reset! active false))))

0 comments on commit aed771c

Please sign in to comment.