-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathProgram.fs
364 lines (287 loc) · 10.4 KB
/
Program.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
module rec MiloneLspServer.Program
open System
open System.Collections.Generic
open System.IO
open System.Threading
open System.Threading.Channels
open MiloneShared.UtilParallel
open MiloneLspServer.JsonRpcReader
open MiloneLspServer.LspServer
open MiloneLspServer.LspUtil
open MiloneLspServer.Util
// FIXME: shouldn't depend
module SyntaxApi = MiloneSyntax.SyntaxApi
module LLS = MiloneLspServer.LspLangService
module LspTests = MiloneLspServer.LspTests
// -----------------------------------------------
// .NET functions
// -----------------------------------------------
/// Converts a possibly null or empty string into an option.
///
/// Returns `None` when `s` is null or `""`; otherwise `Some s`.
let private opt (s: string) =
  if String.IsNullOrEmpty(s) then
    None
  else
    Some s
/// Reads the `MILONE_HOME` environment variable.
///
/// Returns `None` if the variable is unset or empty.
let private getMiloneHomeEnv () =
  let value =
    Environment.GetEnvironmentVariable("MILONE_HOME")

  opt value
/// Gets the path of the current user's profile folder.
///
/// Returns `None` if the reported path is null or empty.
let private getHomeEnv () =
  let profileDir =
    Environment.GetFolderPath(Environment.SpecialFolder.UserProfile)

  opt profileDir
/// Normalized path of the milone home directory.
///
/// This is a module-level value, so it is computed once rather than per use.
/// The directory is resolved by `SyntaxApi.getMiloneHomeFromEnv`, which is given
/// the two environment lookups above.
let private miloneHome =
  let home =
    SyntaxApi.getMiloneHomeFromEnv getMiloneHomeEnv getHomeEnv

  Path.normalize home
/// Reads the whole contents of a text file asynchronously.
///
/// Returns a future of `Some contents` on success. The future yields `None`
/// when the file doesn't exist or when reading fails for any reason
/// (best-effort: errors are not reported to the caller).
let private readTextFile (filePath: string) : Future<string option> =
  try
    if File.Exists(filePath) then
      // I/O errors of `ReadAllTextAsync` are reported through the returned task
      // rather than thrown here, so the surrounding `try` can't see them.
      // Map the outcome inside a continuation so that a failed (or cancelled)
      // read results in `None` instead of a faulted future.
      let toOption =
        Func<System.Threading.Tasks.Task<string>, string option>(fun readTask ->
          if readTask.Status = System.Threading.Tasks.TaskStatus.RanToCompletion then
            Some readTask.Result
          else
            // Touch the exception so that it counts as observed.
            readTask.Exception |> ignore
            None)

      File.ReadAllTextAsync(filePath).ContinueWith(toOption)
      |> Future.ofTask
    else
      Future.just None
  with
  // Synchronous failures (e.g. an invalid path argument).
  | _ -> Future.just None
/// Creates the host object that gives the workspace analysis access to the file system.
///
/// Every operation writes a trace line before touching the file system.
let private getHost () : LLS.WorkspaceAnalysisHost =
  // Checks whether a file exists at the path.
  let fileExists (path: string) =
    traceFn "FileExists (%s)" path
    File.Exists path

  // Reads a text file; see `readTextFile` for the result.
  let readText (path: string) =
    traceFn "ReadTextFile (%s)" path
    readTextFile path

  // Normalizes each path and converts the array to a list.
  let toNormalizedList paths =
    paths |> Array.map Path.normalize |> Array.toList

  // Lists files and subdirectories directly inside the directory (files first).
  let dirEntries (dir: string) =
    traceFn "DirEntries (%s)" dir
    let files = toNormalizedList (Directory.GetFiles(dir))
    let subdirs = toNormalizedList (Directory.GetDirectories(dir))
    files, subdirs

  { MiloneHome = miloneHome
    FileExists = fileExists
    ReadTextFile = readText
    DirEntries = dirEntries }
// -----------------------------------------------
// Actors
// -----------------------------------------------
// Program works concurrently in a way based on the actor model.
//
// An actor is a single long-running procedure.
// Actors run in parallel.
// Each actor owns a channel that only the actor can receive from.
//
// Actors can interact with other actors only
// by sending messages to their channels.
//
// Actors don't share mutable state except for channels
// so that mutation of state doesn't cause data races.
//
// There are three actors here:
//
// - Reader:
// Reader actor reads from standard input for incoming LSP messages
// and sends to Server actor and Canceller actor.
// - Server:
// Server actor receives LSP messages from its channel (sent by Reader)
// to process them.
// This also writes LSP messages to standard output for the LSP client.
// - Canceller:
// Canceller actor manages message Ids and cancellation tokens
// to signal tokens on cancel.
// Note that the Reader actor can't do this task
// because it needs to block on standard input, so it can't have its own channel
// and can't tell which messages have been processed (if I understand correctly).
//
// To shut down the program gracefully, these actors stop running by themselves.
//
// - Reader stops when standard input is closed.
// - Server and Canceller accept a "shutdown" message from their channels.
/// Message that the Server actor receives from its channel.
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type private ServerMsg =
/// An incoming LSP message: the raw JSON value, its parsed form,
/// and a cancellation token (present only when the message carries a message id).
| Receive of JsonValue * LspIncome * CancellationToken option
/// Request to stop the Server actor. Sent by Reader when standard input stops.
| Shutdown
/// Message that the Canceller actor receives from its channel.
[<RequireQualifiedAccess; NoEquality; NoComparison>]
type private CancellerMsg =
/// A message with `msgId` was read; registers its cancellation token source.
/// Sent by Reader before the message is forwarded to Server.
| Start of msgId: JsonValue * CancellationTokenSource
/// Server is done with the message `msgId`;
/// its cancellation token source is unregistered and disposed.
| End of msgId: JsonValue
/// The client requested cancellation of the message `msgId`;
/// its cancellation token source is signaled if it is registered.
| Cancel of msgId: JsonValue
/// Request to stop the Canceller actor. Sent after the Server loop has ended.
| Shutdown
/// Builds a short `id:method` label of a JSON-RPC message for trace output.
///
/// The id part is `_` unless the message is an object whose `id` is a string or a number.
/// The method part is `??` unless the message is an object whose `method` is a string.
let private summarizeMsg msg =
  // Looks up a top-level field; None if the message isn't an object.
  let field key =
    match msg with
    | JObject map -> Map.tryFind key map
    | _ -> None

  let idText =
    match field "id" with
    | Some (JString it) -> it
    | Some (JNumber it) -> string it
    | _ -> "_"

  let methodText =
    match field "method" with
    | Some (JString it) -> it
    | _ -> "??"

  $"{idText}:{methodText}"
/// Entry point of the LSP server.
///
/// Starts three actors (Reader, Server, Canceller) as child computations,
/// waits for all of them to finish and returns the exit code decided by Server.
///
/// When the only argument is `test`, `LspTests.lspTests` is invoked first.
[<EntryPoint>]
let main (args: string array) =
  // NOTE(review): after the tests, control falls through to the server startup below
  //               unless `lspTests` terminates the process by itself. Confirm.
  match args with
  | [| "test" |] -> LspTests.lspTests (getHost ())
  | _ -> ()

  async {
    // Cancellation token of the program.
    let! serverCt = Async.CancellationToken

    use _ctRegistration =
      serverCt.Register(Action(fun () -> traceFn "serverCt gets cancelled."))

    let serverChannel = Channel.CreateBounded<ServerMsg>(3000)

    let cancellerChannel =
      Channel.CreateBounded<CancellerMsg>(3000)

    // Receives a message from the channel. Aborted when serverCt is signaled.
    let readChannel (channel: Channel<'T>) =
      channel.Reader.ReadAsync(serverCt).AsTask()
      |> Async.AwaitTask

    // Sends a message to the channel. Aborted when serverCt is signaled.
    let writeChannel msg (channel: Channel<'T>) =
      channel.Writer.WriteAsync(msg, serverCt).AsTask()
      |> Async.AwaitTask

    use input = openStdin ()

    traceFn "program (thread:%d)" Environment.CurrentManagedThreadId

    // Reader actor: reads messages from the input and dispatches them
    // to Canceller (cancel requests, start notifications) and Server (anything else).
    let! readerCompleter =
      let reader = JsonRpcReader.create input

      let rec loop () =
        async {
          traceFn "reader reading (thread:%d)" Environment.CurrentManagedThreadId

          match JsonRpcReader.read reader with
          | JsonRpcReadResult.Received msg ->
            traceFn "reader received %s" (summarizeMsg msg)
            let income = parseIncome msg

            match LspIncome.asCancelRequest income with
            | Some msgId ->
              // Cancel requests go to Canceller only; Server never sees them.
              traceFn "reader requests cancel of %A" msgId
              do! writeChannel (CancellerMsg.Cancel msgId) cancellerChannel
              return! loop ()

            | None ->
              let mutable ctOpt = None

              match LspIncome.asMsgId income with
              | Some msgId ->
                // Register the token source to Canceller *before* forwarding
                // the message to Server so that Start always precedes End.
                let cts = new CancellationTokenSource()
                traceFn "reader notified start of %A" msgId
                do! writeChannel (CancellerMsg.Start(msgId, cts)) cancellerChannel
                ctOpt <- Some cts.Token

              | None -> ()

              do! writeChannel (ServerMsg.Receive(msg, income, ctOpt)) serverChannel
              return! loop ()

          | JsonRpcReadResult.Stopped ->
            traceFn "reader stopped"
            return! writeChannel ServerMsg.Shutdown serverChannel
        }

      Async.StartChild(loop ())

    // Server actor: processes messages one by one and decides the exit code.
    let! serverCompleter =
      let server = LspServer.create (getHost ())

      // true if the published diagnostics are still fresh.
      let mutable diagnosticsFresh = false

      let rec loop () =
        async {
          traceFn "server reading (thread:%d)" Environment.CurrentManagedThreadId

          match! readChannel serverChannel with
          | ServerMsg.Receive (msg, income, ctOpt) ->
            traceFn "server received (%s) (thread:%d)" (summarizeMsg msg) Environment.CurrentManagedThreadId

            let mutable cancelled = false

            match ctOpt with
            | Some ct when ct.IsCancellationRequested -> cancelled <- true

            | Some ct ->
              do! Async.Sleep(30) // Wait for cancellation.
              cancelled <- ct.IsCancellationRequested

            | None -> ()

            if not cancelled then
              let ct =
                ctOpt
                |> Option.defaultValue CancellationToken.None

              let result = LspServer.processNext income ct server

              match LspIncome.asMsgId income with
              | Some msgId ->
                traceFn "server end process %A" msgId
                do! writeChannel (CancellerMsg.End msgId) cancellerChannel

              | None -> ()

              match result with
              | Continue ->
                diagnosticsFresh <-
                  diagnosticsFresh
                  && not (LspIncome.affectsDiagnostics income)

                // Automatically update diagnostics
                // if the last result isn't fresh (or not published yet)
                // after some query request.
                if not diagnosticsFresh && LspIncome.isQuery income then
                  let ct = CancellationToken.None

                  match LspServer.processNext LspIncome.diagnostics ct server with
                  | Continue ->
                    diagnosticsFresh <- true
                    return! loop ()

                  | Exit _ -> return! failwith "unreachable"
                else
                  return! loop ()

              | Exit exitCode -> return exitCode
            else
              traceFn "server cancelled %A" (summarizeMsg msg)

              // A skipped request is finished too. Report it so that Canceller
              // releases the cancellation token source of the request.
              match LspIncome.asMsgId income with
              | Some msgId -> do! writeChannel (CancellerMsg.End msgId) cancellerChannel
              | None -> ()

              return! loop ()

          | ServerMsg.Shutdown -> return 0
        }

      Async.StartChild(
        async {
          let! exitCode = loop ()
          traceFn "server shutting down (code:%d)" exitCode
          do! writeChannel CancellerMsg.Shutdown cancellerChannel
          return exitCode
        }
      )

    // Canceller actor: owns cancellation token sources of requests in progress.
    let! cancellerCompleter =
      // Token sources of requests that have started and not ended yet.
      let map =
        Dictionary<JsonValue, CancellationTokenSource>()

      let clear () =
        for cts in map.Values do
          cts.Dispose()

        map.Clear()

      let rec loop () =
        async {
          traceFn "canceller reading (thread:%d)" Environment.CurrentManagedThreadId

          match! readChannel cancellerChannel with
          | CancellerMsg.Start (msgId, cts) ->
            traceFn "canceller received start of %A" msgId

            if map.Count >= 3100100 then
              warnFn "Canceller map clears because of too large size."
              clear ()

            let ok = map.TryAdd(msgId, cts)
            if not ok then cts.Dispose() // Duplicated msgId.
            return! loop ()

          | CancellerMsg.End msgId ->
            traceFn "canceller received end of %A" msgId

            match map.TryGetValue(msgId) with
            | true, cts ->
              map.Remove(msgId) |> ignore
              cts.Dispose()

            | false, _ ->
              traceFn "canceller, End of %A not found" msgId
              ()

            return! loop ()

          | CancellerMsg.Cancel msgId ->
            traceFn "canceller received cancel of %A" msgId

            match map.TryGetValue(msgId) with
            | true, cts ->
              // Keep the entry: Server may still be using the token.
              // The source is removed and disposed when End arrives,
              // which Server sends for every request, processed or skipped.
              cts.Cancel()

            | false, _ ->
              traceFn "canceller, cancel of %A not found" msgId
              ()

            return! loop ()

          | CancellerMsg.Shutdown ->
            traceFn "canceller shutting down"
            clear ()
        }

      Async.StartChild(
        async {
          try
            return! loop ()
          with
          | ex -> errorFn "canceller failed: %A" ex
        }
      )

    // Wait for all actors to finish. Exit code is the result of Server.
    do! readerCompleter
    do! cancellerCompleter
    let! exitCode = serverCompleter
    traceFn "gracefully exit"
    return exitCode
  }
  |> Async.RunSynchronously