-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrubychat.rb
337 lines (293 loc) · 10.9 KB
/
rubychat.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# coding: utf-8
# Copyright (C) 2004-2018 Quod Erat Demonstrandum e.V. <webmaster@qed-verein.de>
#
# This file is part of QED-Chat.
#
# QED-Chat is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# QED-Chat is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with QED-Chat. If not, see
# <http://www.gnu.org/licenses/>.
require 'thread'
require 'socket'
require 'cgi'
require 'json'
require 'sequel'
require 'digest'
require 'eventmachine'
require '/etc/chat/rubychat-config.rb'
require './rubychat-backend.rb'
require './rubychat-websockets.rb'
require './rubychat-common.rb'
class CGIAdapter < ::CGI
attr_reader :args, :env_table, :stdinput, :stdoutput
def initialize(enviroment, input, output, *args)
@env_table = enviroment
@stdinput = input
@stdoutput = output
@args = *args
super(*args)
end
end
def readSCGIHeaders(stream)
len = stream.gets(":").chomp(":").to_i
headers = Hash[*(stream.read(len).split("\0"))]
stream.read(1)
headers
end
class ChatError < StandardError; end
#Entry point for every request
#Points requests to the appropriate handler using the request-url
def handleRequest(cgi)
#Login requests must be passed first. All other requests are auth only
if cgi.script_name == "/rubychat/account"
accountHandler cgi
return
end
#These headers are sent with every response (except for login-responses)
headers = {
'Content-Type' => 'application/json; charset=utf-8', #All posts are sent as JSON
'Cache-Control' => 'no-cache, must-revalidate', #Posts shouldn't be cached
'Expires' => 'Sat, 26 Jul 1997 05:00:00 GMT',
'X-Frame-Options' => 'SAMEORIGIN'
}
begin
cookieAuthenticate cgi #Authenticate via cookie and set the userid
if Thread.current[:userid].nil?
cookie1 = CGI::Cookie::new('name' => 'userid', 'value' => '',
'path' => '/', 'expires' => Time.now - 3600 * 24)
cookie2 = CGI::Cookie::new('name' => 'pwhash', 'value' => '',
'path' => '/', 'expires' => Time.now - 3600 * 24)
cgi.out('type' => 'application/json', 'cookie' => [cookie1, cookie2]) {
{'result' => 'success', 'message' => 'Ausgeloggt'}.to_json}
return
end
raise ChatError, "Ungueltige Versionsnummer!" if cgi.has_key? 'version' && cgi['version'] != '20171030131648'
cgi.print cgi.http_header(headers)
#Direct to appropriate handler
if cgi.request_method == "POST" && cgi.script_name == "/rubychat/post" #New post
postHandler cgi
elsif cgi.request_method == "GET" && cgi.script_name == "/rubychat/view" #Get long-polling request
viewHandler cgi
elsif cgi.request_method == "GET" && cgi.script_name == "/rubychat/history" #Get history (range of posts)
historyHandler cgi
else
raise ChatError, "Unbekannter Befehl!"
end
rescue Errno::EPIPE => e
raise e
rescue StandardError => e
writeException e
cgi.print({'type' => 'error', 'description' => e.message + "\n" + e.backtrace.join("\n")}.to_json)
end
end
#Provieds a message queue for incoming posts
#Each post in this queue gets pushed to all websocket-connections
@messageQueue = EM::Queue.new
#This handler gets called when creating a new post
def postHandler(cgi)
#Extract parameters form request
name = cgi['name']
message = cgi['message']
channel = cgi['channel']
date = Time.new.strftime "%Y-%m-%d %H-%M-%S"
delay = cgi.has_key?('delay') ? cgi['delay'].to_i : nil
bottag = cgi.has_key?('bottag') ? cgi['bottag'].to_i : 0
publicid = cgi.has_key?('publicid') ? (cgi['publicid'].to_i == 0 ? 0 : 1) : 0
$chat.createPost(name, message, channel, date, Thread.current[:userid], delay, bottag, publicid)
#Notify all listeners about new post
$mutex.synchronize{$increment += 1; $condition.broadcast}
@messageQueue.push(channel)
cgi.print({'type' => 'ok', 'finished' => 1}.to_json)
end
#Endpoint for long polling
#Handles requests to create new long polling connection
def viewHandler(cgi)
#Parse request and set default values if missing
position = cgi.has_key?('position') ? cgi['position'].to_i : -24
limit = cgi.has_key?('limit') ? cgi['limit'].to_i : 1000
#Get the -position last posts if position isn't positive
if position <= 0 then
position = $chat.getCurrentId(cgi['channel'], -position)
end
cgi.print ({'type' => 'ok', 'started' => 1}.to_json) + "\n"
cgi.stdoutput.flush
#Enter message loop
messageLoop(cgi) {
$chat.getPostsByStartId(cgi['channel'], position, limit) {|row|
outputPosting(cgi, row.to_h)
position = row[:id].to_i + 1
limit -= 1 }
cgi.stdoutput.flush
break if limit <= 0
}
cgi.print ({'type' => 'ok', 'finished' => 1}.to_json) + "\n"
cgi.stdoutput.flush
end
def messageLoop(cgi)
keepalive = cgi.has_key?('keepalive') ? cgi['keepalive'].to_i : 30
keepalive = 1.0 / 0.0 if keepalive <= 0
timeout = cgi.has_key?('timeout') ? cgi['timeout'].to_i : 3600
timeout = Time.now + timeout
threadIncrement = nil; sendKeepalive = false
begin
if sendKeepalive then
cgi.print ({'type' => 'ok'}.to_json) + "\n"
cgi.stdoutput.flush
else
yield
end
$mutex.synchronize {
$condition.wait($mutex, [[timeout - Time.now, keepalive].min, 0].max) if threadIncrement == $increment
sendKeepalive = (threadIncrement == $increment)
threadIncrement = $increment}
raise ChatError, "Server wurde beendet" if !$running
end while Time.now < timeout
end
#Gets called when the histroy is request
def historyHandler(cgi)
cgi.print ({'type' => 'ok', 'started' => 1}.to_json) + "\n"
channel = cgi['channel']
case cgi['mode']
when 'dateinterval'
$chat.getPostsByDateInterval(channel, cgi['from'], cgi['to']) {|row| outputPosting(cgi, row)}
when 'daterecent'
from = Time.now - cgi['last'].to_i
$chat.getPostsByStartDate(channel, from) {|row| outputPosting(cgi, row)}
when 'postinterval'
$chat.getPostsByIdInterval(channel, cgi['from'].to_i, cgi['to'].to_i) {|row| outputPosting(cgi, row)}
when 'postrecent'
$chat.getPostsByStartId(channel, $chat.getCurrentId(channel, cgi['last'].to_i)) {|row| outputPosting(cgi, row)}
when 'fromownpost'
$chat.getPostsByStartId(channel, $chat.getLastPostId(channel, Thread.current[:userid], cgi['skip'].to_i)) {|row| outputPosting(cgi, row)}
else
raise ChatError, "Unbekannter Modus!"
end
cgi.print ({'type' => 'ok', 'finished' => 1}.to_json) + "\n"
end
def accountHandler(cgi)
if cgi['logout'] == '1' then
cookie1 = CGI::Cookie::new('name' => 'userid', 'value' => '',
'path' => '/', 'expires' => Time.now - 3600 * 24)
cookie2 = CGI::Cookie::new('name' => 'pwhash', 'value' => '',
'path' => '/', 'expires' => Time.now - 3600 * 24)
cgi.out('type' => 'application/json', 'cookie' => [cookie1, cookie2]) {
{'result' => 'success', 'message' => 'Ausgeloggt'}.to_json}
return
end
user = $chat.userAuthenticate(cgi['username'], cgi['password'])
if user.nil? then
cgi.out('type' => 'application/json') {
{'result' => 'fail', 'message' => 'Logindaten sind ungültig'}.to_json}
else
#Userid cannot be httponly due to the way chat.js detects login-state
cookie1 = CGI::Cookie::new('name' => 'userid', 'value' => user[:id].to_s,
'path' => '/', 'expires' => Time.now + $tokenExpirationSeconds, 'secure' => $secureCookies)
cookie2 = CGI::Cookie::new('name' => 'pwhash', 'value' => $chat.getCookie(user[:id]),
'path' => '/', 'expires' => Time.now + $tokenExpirationSeconds, 'secure' => $secureCookies, 'httponly' => true)
cgi.out('type' => 'application/json', 'cookie' => [cookie1, cookie2]) {
{'result' => 'success', 'message' => 'Eingeloggt'}.to_json}
end
end
def outputPosting(cgi, posting)
cgi.print $chat.formatAsJson(posting) + "\n"
end
def cookieAuthenticate(cgi)
return if !cgi.cookies.keys.include?('userid') || !cgi.cookies.keys.include?('pwhash')
return if cgi.cookies['pwhash'][0].nil?
Thread.current[:userid] = $chat.checkCookie(cgi.cookies['userid'][0], cgi.cookies['pwhash'][0])
end
$mutex = Mutex.new
$condition = ConditionVariable.new
$increment = 0
$running = true
$chat = ChatBackend.new
threads = []
Signal.trap("USR2") do
active = threads.select {|thread| !thread[:cgi].nil? && thread.alive?}
STDERR.printf "Anzahl der Threads: %d\n", threads.size
STDERR.printf "Anzahl der offenen Verbindungen: %d\n", active.size
active.each {|thread|
STDERR.printf "Verbunden: IP=%s RequestURI=%s UserAgent=%s\n", thread[:cgi].remote_addr,
thread[:cgi].script_name.to_s + '?' + thread[:cgi].query_string.to_s, thread[:cgi].user_agent}
end
scgiServer = TCPServer.new "127.0.0.1", $scgiPort #Start tcp server for Scgi
writeToLog "Chatserver wurde gestartet."
#Start seperate thread to listen to WS-requests
wsServerThread = Thread.new do
begin
#Start EventMachine. This blocks this thread
EventMachine.run {
#Redirect all uncaught errors raised in the eventloop to stderr
EM.error_handler{ |e|
writeException e
}
EM.start_server "127.0.0.1", $wsPort, WsConnection, @messageQueue
#Ping clients every n seconds. This allows us to kick clients who went away silently
EM.add_periodic_timer($wsPingInterval) {
$connectedClients.each { |client|
client.ping
}
}
#Handle new items in messageQueue
processPost = Proc.new { |channel|
$connectedClients.each { |client|
#Only process posts if the connection has the correct channel to reduce load on db
if client.channel == channel
$chat.getPostsByStartId(channel, client.position) { |post|
client.send_post post
client.position = post[:id] + 1
}
end
}
#Check for new items on next tick
EM.next_tick { @messageQueue.pop &processPost }
}
#Begin first check of messageQueue. From now on the checks will executed within processPost
@messageQueue.pop &processPost
}
rescue Exception => e
writeException e
end
end
#Main loop: Listen for SCGI-requests and process them
begin
loop do
#Process new SCGI-request
scgiConnection = scgiServer.accept
scgiThread = Thread.new do #Each request gets its own thread
begin
#Parse arguments and store them in the thread
headers = readSCGIHeaders scgiConnection
cgi = CGIAdapter.new headers, scgiConnection, scgiConnection
Thread.current[:cgi] = cgi;
#Begin request-handling
handleRequest cgi
rescue Errno::EPIPE, Errno::ECONNRESET => e
writeToLog sprintf("Verbindung abgebrochen: %s", e.message);
rescue Exception => e
writeException e
ensure
scgiConnection.close
end
end
#Store thread
threads.push scgiThread
threads.keep_if {|scgiThread| scgiThread.alive?}
end
rescue Interrupt => e
writeToLog "Beende den Chatserver."
$running = false
$mutex.synchronize {$condition.broadcast}
threads.each {|t| t.join(1)}
ensure
writeToLog "Chatserver wurde beendet."
end