-
Notifications
You must be signed in to change notification settings - Fork 0
/
KVS6SizePoly.hs
161 lines (139 loc) · 6.73 KB
/
KVS6SizePoly.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-
-}
module KVS6SizePoly where
import Choreography
import CLI
import Control.Monad.IO.Class (liftIO, MonadIO)
import Data.Foldable (toList)
import Data.IORef (IORef)
import qualified Data.IORef as IORef
import Data.List (nub)
import Data.Map (Map)
import Data.Map qualified as Map
import GHC.TypeLits (KnownSymbol)
import Test.QuickCheck (Arbitrary, arbitrary, listOf, frequency)
import Text.Read (readMaybe)
readIORef :: MonadIO m => IORef a -> m a
readIORef = liftIO <$> IORef.readIORef
modifyIORef :: MonadIO m => IORef a -> (a -> a) -> m a
modifyIORef ref f = do a <- readIORef ref
liftIO $ IORef.modifyIORef ref f
return a
newIORef :: MonadIO m => a -> m (IORef a)
newIORef = liftIO <$> IORef.newIORef
-- $(mkLoc "client")
-- $(mkLoc "primary")
-- $(mkLoc "backup1")
-- $(mkLoc "backup2")
-- type Participants = ["client", "primary", "backup1", "backup2"]
kvs :: (KnownSymbol client) => ReplicationStrategy ps (CLI m) -> Member client ps -> Choreo ps (CLI m) ()
kvs ReplicationStrategy{setup, primary, handle} client = do
rigging <- setup
let go = do request <- (client, readRequest) -~> primary @@ nobody
response <- handle rigging singleton request
case response of Stopped -> return ()
_ -> do client `_locally_` putOutput "Recieved:" response
go
go
naryReplicationStrategy :: (KnownSymbol primary, KnownSymbols backups, KnownSymbols ps, MonadIO m)
=> Member primary ps -> Subset backups ps -> ReplicationStrategy ps m
naryReplicationStrategy primary backups = ReplicationStrategy{
primary
, setup = servers `_parallel` newIORef (Map.empty :: State)
, handle = \stateRef pHas request -> do
request' <- (primary, (pHas, request)) ~> servers
localResponse <- servers `parallel` \server un ->
handleRequest (viewFacet un server stateRef) (un server request')
responses <- gather servers (primary @@ nobody) localResponse
response <- (primary @@ nobody) `congruently` \un ->
case nub (toList $ un refl responses) of [r] -> r
rs -> Desynchronization rs
broadcast (primary, response) }
where servers = primary @@ backups
data ReplicationStrategy ps m = forall primary rigging. (KnownSymbol primary) =>
ReplicationStrategy { primary :: Member primary ps
, setup :: Choreo ps m rigging
, handle :: forall starts.
rigging -> Member primary starts -> Located starts Request
-> Choreo ps m Response }
data Request = Put String String | Get String | Stop deriving (Eq, Ord, Read, Show)
data Response = Found String | NotFound | Stopped | Desynchronization [Response]
deriving (Eq, Ord, Read, Show)
-- | PUT returns the old stored value; GET returns whatever was stored.
handleRequest :: (MonadIO m) => IORef State -> Request -> m Response
handleRequest stateRef (Put key value) = mlookup key <$> modifyIORef stateRef (Map.insert key value)
handleRequest stateRef (Get key) = mlookup key <$> readIORef stateRef
handleRequest _ Stop = return Stopped
mlookup :: String -> State -> Response
mlookup key = maybe NotFound Found . Map.lookup key
type State = Map String String
newtype Args = Args [Request] deriving (Eq, Ord, Read, Show)
instance Arbitrary Args where
arbitrary = do reqs <- pgs
return . Args $ reqs ++ [Stop]
where pgs = listOf $ frequency [ (1, Put <$> arbitrary <*> arbitrary)
, (1, Get <$> arbitrary)
]
readRequest :: CLI m Request
readRequest = do line <- getstr "Command?"
case line of
[] -> return Stop
_ -> case readMaybe line of
Just t -> return t
Nothing -> putNote "Invalid command" >> readRequest
-- | `nullReplicationStrategy` is a replication strategy that does not replicate the state.
nullReplicationStrategy :: (KnownSymbol primary, KnownSymbols ps, MonadIO m)
=> Member primary ps
-> ReplicationStrategy ps m
nullReplicationStrategy primary =
ReplicationStrategy{ primary
, setup = primary `_locally` newIORef (Map.empty :: State)
, handle = \stateRef pHas request -> (
(primary, \un -> handleRequest (un singleton stateRef) (un pHas request)) ~~> refl
) >>= naked refl
}
naryHumans :: (KnownSymbol primary, KnownSymbols backups, KnownSymbols ps, MonadIO m)
=> Member primary ps
-> Subset backups ps
-> ReplicationStrategy ps (CLI m)
naryHumans primary backups =
ReplicationStrategy{ primary
, setup = primary `_locally` newIORef (Map.empty :: State)
, handle = \stateRef pHas request -> do
request' <- (primary, (pHas, request)) ~> backups
backupResponse <- backups `parallel` \server un -> readResponse (un server request')
localResponse <- primary `locally` \un -> handleRequest (un singleton stateRef) (un pHas request)
responses <- gather backups (primary @@ nobody) backupResponse
response <- (primary @@ nobody) `congruently` \un ->
case nub $ un refl localResponse : toList (un refl responses) of
[r] -> r
rs -> Desynchronization rs
((primary, response) ~> refl) >>= naked refl
}
where readResponse :: Request -> CLI m Response
readResponse r = do line <- getstr $ show r ++ ": "
case line of
[] -> return NotFound
_ -> return $ Found line
{-main :: IO ()
main = do
[loc] <- getArgs
case loc of
"client" -> runChoreography config primaryBackupChoreo "client"
"primary" -> runChoreography config primaryBackupChoreo "primary"
"backup1" -> runChoreography config primaryBackupChoreo "backup1"
"backup2" -> runChoreography config primaryBackupChoreo "backup2"
_ -> error "unknown party"
return ()
where
config =
mkHttpConfig
[ ("client", ("localhost", 3000)),
("primary", ("localhost", 4000)),
("backup1", ("localhost", 5000)),
("backup2", ("localhost", 6000))
]
-}