Skip to content

Commit 4cd99c6

Browse files
committed
otel: add opentelemetry traces
1 parent 58999f5 commit 4cd99c6

File tree

8 files changed

+189
-71
lines changed

8 files changed

+189
-71
lines changed

postgrest.cabal

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ library
6969
PostgREST.Query.QueryBuilder
7070
PostgREST.Query.SqlFragment
7171
PostgREST.Query.Statements
72+
PostgREST.OpenTelemetry
7273
PostgREST.Plan
7374
PostgREST.Plan.CallPlan
7475
PostgREST.Plan.MutatePlan
@@ -113,6 +114,9 @@ library
113114
, hasql-transaction >= 1.0.1 && < 1.1
114115
, heredoc >= 0.2 && < 0.3
115116
, http-types >= 0.12.2 && < 0.13
117+
, hs-opentelemetry-sdk >= 0.0.3.6 && < 0.0.4
118+
, hs-opentelemetry-instrumentation-wai
119+
, hs-opentelemetry-utils-exceptions
116120
, insert-ordered-containers >= 0.2.2 && < 0.3
117121
, interpolatedstring-perl6 >= 1 && < 1.1
118122
, jose >= 0.8.5.1 && < 0.12
@@ -257,6 +261,7 @@ test-suite spec
257261
, hasql-pool >= 0.10 && < 0.11
258262
, hasql-transaction >= 1.0.1 && < 1.1
259263
, heredoc >= 0.2 && < 0.3
264+
, hs-opentelemetry-sdk >= 0.0.3.6 && < 0.0.4
260265
, hspec >= 2.3 && < 2.12
261266
, hspec-wai >= 0.10 && < 0.12
262267
, hspec-wai-json >= 0.10 && < 0.12

src/PostgREST/App.hs

Lines changed: 62 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ import qualified PostgREST.Unix as Unix (installSignalHandlers)
4545

4646
import PostgREST.ApiRequest (Action (..), ApiRequest (..),
4747
Mutation (..), Target (..))
48-
import PostgREST.AppState (AppState)
48+
import PostgREST.AppState (AppState, getOTelTracer)
4949
import PostgREST.Auth (AuthResult (..))
5050
import PostgREST.Config (AppConfig (..))
5151
import PostgREST.Config.PgVersion (PgVersion (..))
52-
import PostgREST.Error (Error)
52+
import PostgREST.Error (Error (..))
5353
import PostgREST.Observation (Observation (..))
5454
import PostgREST.Query (DbHandler)
5555
import PostgREST.Response.Performance (ServerTiming (..),
@@ -58,12 +58,15 @@ import PostgREST.SchemaCache (SchemaCache (..))
5858
import PostgREST.SchemaCache.Routine (Routine (..))
5959
import PostgREST.Version (docsVersion, prettyVersion)
6060

61-
import qualified Data.ByteString.Char8 as BS
62-
import qualified Data.List as L
63-
import qualified Network.HTTP.Types as HTTP
64-
import qualified Network.Socket as NS
65-
import Protolude hiding (Handler)
66-
import System.TimeIt (timeItT)
61+
import qualified Data.ByteString.Char8 as BS
62+
import qualified Data.List as L
63+
import qualified Network.HTTP.Types as HTTP
64+
import qualified Network.Socket as NS
65+
import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware)
66+
import OpenTelemetry.Trace (defaultSpanArguments)
67+
import OpenTelemetry.Utils.Exceptions (inSpanM)
68+
import Protolude hiding (Handler)
69+
import System.TimeIt (timeItT)
6770

6871
type Handler = ExceptT Error
6972

@@ -88,7 +91,9 @@ run appState observer = do
8891
port <- NS.socketPort $ AppState.getSocketREST appState
8992
observer $ AppServerPortObs port
9093

91-
Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app
94+
oTelMWare <- newOpenTelemetryWaiMiddleware
95+
96+
Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) (oTelMWare app)
9297

9398
serverSettings :: AppConfig -> Warp.Settings
9499
serverSettings AppConfig{..} =
@@ -106,27 +111,28 @@ postgrest conf appState connWorker observer =
106111
Logger.middleware (configLogLevel conf) $
107112
-- fromJust can be used, because the auth middleware will **always** add
108113
-- some AuthResult to the vault.
109-
\req respond -> case fromJust $ Auth.getResult req of
110-
Left err -> respond $ Error.errorResponseFor err
111-
Right authResult -> do
112-
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
113-
maybeSchemaCache <- AppState.getSchemaCache appState
114-
pgVer <- AppState.getPgVersion appState
115-
116-
let
117-
eitherResponse :: IO (Either Error Wai.Response)
118-
eitherResponse =
119-
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer
120-
121-
response <- either Error.errorResponseFor identity <$> eitherResponse
122-
-- Launch the connWorker when the connection is down. The postgrest
123-
-- function can respond successfully (with a stale schema cache) before
124-
-- the connWorker is done.
125-
when (isServiceUnavailable response) connWorker
126-
resp <- do
127-
delay <- AppState.getRetryNextIn appState
128-
return $ addRetryHint delay response
129-
respond resp
114+
\req respond -> inSpanM (getOTelTracer appState) "respond" defaultSpanArguments $
115+
case fromJust $ Auth.getResult req of
116+
Left err -> respond $ Error.errorResponseFor err
117+
Right authResult -> do
118+
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
119+
maybeSchemaCache <- AppState.getSchemaCache appState
120+
pgVer <- AppState.getPgVersion appState
121+
122+
let
123+
eitherResponse :: IO (Either Error Wai.Response)
124+
eitherResponse = inSpanM (getOTelTracer appState) "eitherResponse" defaultSpanArguments $
125+
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req observer
126+
127+
response <- either Error.errorResponseFor identity <$> eitherResponse
128+
-- Launch the connWorker when the connection is down. The postgrest
129+
-- function can respond successfully (with a stale schema cache) before
130+
-- the connWorker is done.
131+
when (isServiceUnavailable response) connWorker
132+
resp <- do
133+
delay <- AppState.getRetryNextIn appState
134+
return $ addRetryHint delay response
135+
respond resp
130136

131137
postgrestResponse
132138
:: AppState.AppState
@@ -172,54 +178,54 @@ handleRequest :: AuthResult -> AppConfig -> AppState.AppState -> Bool -> Bool ->
172178
handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@ApiRequest{..} sCache jwtTime parseTime observer =
173179
case (iAction, iTarget) of
174180
(ActionRead headersOnly, TargetIdent identifier) -> do
175-
(planTime', wrPlan) <- withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
176-
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
177-
(respTime', pgrst) <- withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
181+
(planTime', wrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.wrappedReadPlan identifier conf sCache apiReq
182+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.wrTxMode wrPlan) $ Query.readQuery wrPlan conf apiReq
183+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.readResponse wrPlan headersOnly identifier apiReq resultSet
178184
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
179185

180186
(ActionMutate MutationCreate, TargetIdent identifier) -> do
181-
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
182-
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
183-
(respTime', pgrst) <- withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
187+
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationCreate apiReq identifier conf sCache
188+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.createQuery mrPlan apiReq conf
189+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.createResponse identifier mrPlan apiReq resultSet
184190
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
185191

186192
(ActionMutate MutationUpdate, TargetIdent identifier) -> do
187-
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
188-
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
189-
(respTime', pgrst) <- withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
193+
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationUpdate apiReq identifier conf sCache
194+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.updateQuery mrPlan apiReq conf
195+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.updateResponse mrPlan apiReq resultSet
190196
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
191197

192198
(ActionMutate MutationSingleUpsert, TargetIdent identifier) -> do
193-
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
194-
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
195-
(respTime', pgrst) <- withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
199+
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationSingleUpsert apiReq identifier conf sCache
200+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.singleUpsertQuery mrPlan apiReq conf
201+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.singleUpsertResponse mrPlan apiReq resultSet
196202
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
197203

198204
(ActionMutate MutationDelete, TargetIdent identifier) -> do
199-
(planTime', mrPlan) <- withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
200-
(txTime', resultSet) <- withTiming $ runQuery roleIsoLvl mempty (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
201-
(respTime', pgrst) <- withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
205+
(planTime', mrPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.mutateReadPlan MutationDelete apiReq identifier conf sCache
206+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.mrTxMode mrPlan) $ Query.deleteQuery mrPlan apiReq conf
207+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.deleteResponse mrPlan apiReq resultSet
202208
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
203209

204210
(ActionInvoke invMethod, TargetProc identifier _) -> do
205-
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
206-
(txTime', resultSet) <- withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdFuncSettings $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
207-
(respTime', pgrst) <- withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
211+
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq invMethod
212+
(txTime', resultSet) <- withOTel "query" $ withTiming $ runQuery (fromMaybe roleIsoLvl $ pdIsoLvl (Plan.crProc cPlan)) (pdFuncSettings $ Plan.crProc cPlan) (Plan.crTxMode cPlan) $ Query.invokeQuery (Plan.crProc cPlan) cPlan apiReq conf pgVer
213+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.invokeResponse cPlan invMethod (Plan.crProc cPlan) apiReq resultSet
208214
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
209215

210216
(ActionInspect headersOnly, TargetDefaultSpec tSchema) -> do
211-
(planTime', iPlan) <- withTiming $ liftEither $ Plan.inspectPlan apiReq
212-
(txTime', oaiResult) <- withTiming $ runQuery roleIsoLvl mempty (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
213-
(respTime', pgrst) <- withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
217+
(planTime', iPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.inspectPlan apiReq
218+
(txTime', oaiResult) <- withOTel "query" $ withTiming $ runQuery roleIsoLvl [] (Plan.ipTxmode iPlan) $ Query.openApiQuery sCache pgVer conf tSchema
219+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.openApiResponse (T.decodeUtf8 prettyVersion, docsVersion) headersOnly oaiResult conf sCache iSchema iNegotiatedByProfile
214220
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' txTime' respTime') pgrst
215221

216222
(ActionInfo, TargetIdent identifier) -> do
217-
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
223+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoIdentResponse identifier sCache
218224
return $ pgrstResponse (ServerTiming jwtTime parseTime Nothing Nothing respTime') pgrst
219225

220226
(ActionInfo, TargetProc identifier _) -> do
221-
(planTime', cPlan) <- withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
222-
(respTime', pgrst) <- withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
227+
(planTime', cPlan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.callReadPlan identifier conf sCache apiReq ApiRequest.InvHead
228+
(respTime', pgrst) <- withOTel "response" $ withTiming $ liftEither $ Response.infoProcResponse (Plan.crProc cPlan)
223229
return $ pgrstResponse (ServerTiming jwtTime parseTime planTime' Nothing respTime') pgrst
224230

225231
(ActionInfo, TargetDefaultSpec _) -> do
@@ -244,6 +250,8 @@ handleRequest AuthResult{..} conf appState authenticated prepared pgVer apiReq@A
244250

245251
withTiming = calcTiming $ configServerTimingEnabled conf
246252

253+
withOTel label = inSpanM (getOTelTracer appState) label defaultSpanArguments
254+
247255
calcTiming :: Bool -> Handler IO a -> Handler IO (Maybe Double, a)
248256
calcTiming timingEnabled f = if timingEnabled
249257
then do

src/PostgREST/AppState.hs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ module PostgREST.AppState
1616
, getJwtCache
1717
, getSocketREST
1818
, getSocketAdmin
19+
, getOTelTracer
1920
, init
2021
, initSockets
2122
, initWithPool
@@ -71,6 +72,7 @@ import PostgREST.Unix (createAndBindDomainSocket)
7172

7273
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
7374
import Data.String (IsString (..))
75+
import OpenTelemetry.Trace (Tracer)
7476
import Protolude
7577

7678
data AuthResult = AuthResult
@@ -107,19 +109,21 @@ data AppState = AppState
107109
, stateSocketREST :: NS.Socket
108110
-- | Network socket for the admin UI
109111
, stateSocketAdmin :: Maybe NS.Socket
112+
-- | OpenTelemetry tracer
113+
, oTelTracer :: Tracer
110114
}
111115

112116
type AppSockets = (NS.Socket, Maybe NS.Socket)
113117

114-
init :: AppConfig -> (Observation -> IO ()) -> IO AppState
115-
init conf observer = do
118+
init :: AppConfig -> Tracer -> (Observation -> IO ()) -> IO AppState
119+
init conf tracer observer = do
116120
pool <- initPool conf
117121
(sock, adminSock) <- initSockets conf
118-
state' <- initWithPool (sock, adminSock) pool conf observer
119-
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock }
122+
state' <- initWithPool (sock, adminSock) pool tracer conf observer
123+
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}
120124

121-
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> (Observation -> IO() ) -> IO AppState
122-
initWithPool (sock, adminSock) pool conf observer = do
125+
initWithPool :: AppSockets -> SQL.Pool -> Tracer -> AppConfig -> (Observation -> IO() ) -> IO AppState
126+
initWithPool (sock, adminSock) pool tracer conf observer = do
123127
appState <- AppState pool
124128
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
125129
<*> newIORef Nothing
@@ -134,6 +138,7 @@ initWithPool (sock, adminSock) pool conf observer = do
134138
<*> C.newCache Nothing
135139
<*> pure sock
136140
<*> pure adminSock
141+
<*> pure tracer
137142

138143

139144
debPoolTimeout <-
@@ -263,6 +268,9 @@ getSocketREST = stateSocketREST
263268
getSocketAdmin :: AppState -> Maybe NS.Socket
264269
getSocketAdmin = stateSocketAdmin
265270

271+
getOTelTracer :: AppState -> Tracer
272+
getOTelTracer = oTelTracer
273+
266274
getMainThreadId :: AppState -> ThreadId
267275
getMainThreadId = stateMainThreadId
268276

src/PostgREST/CLI.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import qualified Options.Applicative as O
1717
import Data.Text.IO (hPutStrLn)
1818
import Text.Heredoc (str)
1919

20-
import PostgREST.AppState (AppState)
21-
import PostgREST.Config (AppConfig (..))
22-
import PostgREST.SchemaCache (querySchemaCache)
23-
import PostgREST.Version (prettyVersion)
20+
import PostgREST.AppState (AppState)
21+
import PostgREST.Config (AppConfig (..))
22+
import PostgREST.OpenTelemetry (withTracer)
23+
import PostgREST.SchemaCache (querySchemaCache)
24+
import PostgREST.Version (prettyVersion)
2425

2526
import qualified PostgREST.App as App
2627
import qualified PostgREST.AppState as AppState
@@ -29,9 +30,8 @@ import qualified PostgREST.Logger as Logger
2930

3031
import Protolude hiding (hPutStrLn)
3132

32-
3333
main :: CLI -> IO ()
34-
main CLI{cliCommand, cliPath} = do
34+
main CLI{cliCommand, cliPath} = withTracer "PostgREST" $ \tracer -> do
3535
conf@AppConfig{..} <-
3636
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty mempty
3737

@@ -40,7 +40,7 @@ main CLI{cliCommand, cliPath} = do
4040
-- explicitly close the connections to PostgreSQL on shutdown.
4141
-- 'AppState.destroy' takes care of that.
4242
bracket
43-
(AppState.init conf $ Logger.logObservation loggerState)
43+
(AppState.init conf tracer $ Logger.logObservation loggerState)
4444
AppState.destroy
4545
(\appState -> case cliCommand of
4646
CmdDumpConfig -> do

src/PostgREST/OpenTelemetry.hs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
module PostgREST.OpenTelemetry (withTracer) where
2+
3+
import OpenTelemetry.Trace (InstrumentationLibrary (..), Tracer,
4+
initializeGlobalTracerProvider,
5+
makeTracer, shutdownTracerProvider,
6+
tracerOptions)
7+
import PostgREST.Version (prettyVersion)
8+
import Protolude
9+
10+
withTracer :: Text -> (Tracer -> IO c) -> IO c
11+
withTracer label f = bracket
12+
initializeGlobalTracerProvider
13+
shutdownTracerProvider
14+
(\tracerProvider -> f $ makeTracer tracerProvider instrumentationLibrary tracerOptions)
15+
where
16+
instrumentationLibrary = InstrumentationLibrary {libraryName = label, libraryVersion = decodeUtf8 prettyVersion}

0 commit comments

Comments
 (0)