Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 195 additions & 10 deletions src/hydra-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import {
hydraUTxO,
hydraUTxOs,
ServerOutput,
} from "./types";
ConnectionState,
} from ".";
import { handleHydraErrors } from "./types/events/handler";
import {
PostTxOnChainFailed,
Expand All @@ -52,6 +53,11 @@ export class HydraProvider implements IFetcher, ISubmitter {
| ((data: ServerOutput | ClientMessage) => void)
| null = null;
private _messageQueue: (ServerOutput | ClientMessage)[] = [];
private _disconnectTimeout: NodeJS.Timeout | null = null;
private _isDisconnecting: boolean = false;
private _currentStatus: hydraStatus = "IDLE";
private _connectionState: ConnectionState = "IDLE";
private _connectingPromise: Promise<boolean> | null = null;

constructor({
httpUrl,
Expand Down Expand Up @@ -86,27 +92,134 @@ export class HydraProvider implements IFetcher, ISubmitter {
}
},
);

this._eventEmitter.on("onstatuschange", (status: hydraStatus) => {
this._currentStatus = status;
});
}

/**
* Connects to the Hydra Head.
*/
async connect() {
this._connection.connect();
* Connects to the Hydra Head socket only.
*/
async connect(): Promise<void> {
try {
await this._connection.connect();
} catch (error) {
throw new Error(
`Failed to connect to Hydra Head: ${error instanceof Error ? error.message : String(error)
}`,
);
}
}
/**
* Connects (if needed) and waits until Hydra confirms readiness via "Greetings".
*
* - Idempotent
* - No dangling handlers
* - Accurate state tracking
*/
async isConnected(timeoutMs = 30_000): Promise<boolean> {
// Fast path
if (this._connectionState === "CONNECTED") {
return true;
}

// Reuse in-flight connection
if (this._connectingPromise) {
return this._connectingPromise;
}

this._connectingPromise = new Promise<boolean>(async (resolve, reject) => {
let timeout: NodeJS.Timeout;

const finalize = (state: ConnectionState, result?: boolean, error?: Error) => {
clearTimeout(timeout);
this._connectingPromise = null;
this._connectionState = state;

if (error) reject(error);
else resolve(result ?? false);
};

try {
await this.connect();
} catch (err) {
finalize("FAILED", false, err as Error);
return;
}

timeout = setTimeout(() => {
finalize(
"FAILED",
false,
new Error("Connection timed out: no Greetings from Hydra node"),
);
}, timeoutMs);

this.onMessage((msg) => {
if (this._connectionState !== "CONNECTING") return;

if (msg.tag === "Greetings") {
finalize("CONNECTED", true);
return;
}

if (handleHydraErrors(msg as ClientMessage, (err) => {
finalize("FAILED", false, err);
})) {
return;
}
});
});

return this._connectingPromise;
}


/**
* Disconnects from the Hydra Head.
*
* @param timeout Optional timeout in milliseconds (defaults to 5 minutes) to wait for the disconnect operation to complete.
* If set to 0, disconnects immediately (reactive to clicks/events).
* If not provided, the default disconnect timeout will be used.
* Useful for customizing how long to wait before disconnecting.
* @throws {Error} If timeout is less than 0 or between 1 and 59,999 ms
*/
async disconnect(timeout: number = 300_000) {
if (timeout < 60_000) {
throw new Error("Timeout must be at least 60,000 ms (1 minute)");
if (timeout < 0) {
throw new Error("Timeout must be a non-negative number");
}
if (timeout > 0 && timeout < 60_000) {
throw new Error(
"Timeout must be at least 60,000 ms (1 minute) or 0 for immediate disconnect",
);
}

const clearPendingTimeout = () => {
if (this._disconnectTimeout) {
clearTimeout(this._disconnectTimeout);
this._disconnectTimeout = null;
}
};

if (timeout === 0) {
clearPendingTimeout();
this._isDisconnecting = false;
await this._connection.disconnect(0);
return;
}
await this._connection.disconnect(timeout);

if (this._isDisconnecting) return;

this._isDisconnecting = true;
this._disconnectTimeout = setTimeout(async () => {
try {
await this._connection.disconnect(0);
} finally {
this._disconnectTimeout = null;
this._isDisconnecting = false;
}
}, timeout);
}

/**
Expand Down Expand Up @@ -139,6 +252,8 @@ export class HydraProvider implements IFetcher, ISubmitter {
}
if (handleHydraErrors(msg as ClientMessage, reject)) {
return;
} else {
reject(new Error("Failed to initialize, head is not in Idle state"));
}
});
});
Expand All @@ -161,6 +276,10 @@ export class HydraProvider implements IFetcher, ISubmitter {
}
if (handleHydraErrors(msg as ClientMessage, reject)) {
return;
} else {
reject(
new Error("Failed to abort, head is not in Initializing state"),
);
}
});
});
Expand Down Expand Up @@ -203,6 +322,12 @@ export class HydraProvider implements IFetcher, ISubmitter {
`Transaction invalid: ${JSON.stringify(msg.validationError)}`,
),
);
} else {
reject(
new Error(
"Failed to submit transaction, head is not in Open state",
),
);
}
});
});
Expand Down Expand Up @@ -233,6 +358,12 @@ export class HydraProvider implements IFetcher, ISubmitter {
}
if (handleHydraErrors(msg as ClientMessage, reject)) {
return;
} else {
reject(
new Error(
"Failed to recover transaction, head is not in Open state",
),
);
}
});
});
Expand Down Expand Up @@ -270,6 +401,8 @@ export class HydraProvider implements IFetcher, ISubmitter {
}
if (handleHydraErrors(msg as ClientMessage, reject)) {
return;
} else {
reject(new Error("Failed to decommit, head is not in Open state"));
}
});
});
Expand Down Expand Up @@ -303,6 +436,8 @@ export class HydraProvider implements IFetcher, ISubmitter {
if (handleHydraErrors(message as ClientMessage, reject)) {
reject(new Error("Failed to close head"));
return;
} else {
reject(new Error("Failed to close, head is not in Open state"));
}
});
});
Expand All @@ -325,6 +460,8 @@ export class HydraProvider implements IFetcher, ISubmitter {
if (handleHydraErrors(msg as ClientMessage, reject)) {
reject(new Error("Failed to contest head"));
return;
} else {
reject(new Error("Failed to contest, head is not in Closed state"));
}
});
});
Expand All @@ -349,6 +486,8 @@ export class HydraProvider implements IFetcher, ISubmitter {
if (handleHydraErrors(msg as ClientMessage, reject)) {
reject(new Error("Failed to fanout head"));
return;
} else {
reject(new Error("Failed to fanout, head is not in Closed state"));
}
});
});
Expand Down Expand Up @@ -738,16 +877,62 @@ export class HydraProvider implements IFetcher, ISubmitter {
return protocolParams;
}

/**
* Registers a callback to receive messages from the Hydra Head.
* When called, the callback will immediately be invoked for all messages that have already been received
* (queued in the message queue), and subsequently for each new incoming message.
*
* @param callback - The function to call with each ServerOutput or ClientMessage received.
*
* @example
* ```ts
* hydraProvider.onMessage((message) => {
* console.log("Received Hydra message:", message);
* });
* ```
*/
onMessage(callback: (data: ServerOutput | ClientMessage) => void) {
this._messageCallback = callback;
this._messageQueue.forEach((message) => {
callback(message);
});
}
onStatusChange(callback: (status: hydraStatus) => void) {
this._eventEmitter.on("onstatuschange", (status) => {

/**
* Subscribe to status changes of the Hydra Head.
* The callback will be called whenever the status changes.
*
* @param callback Function to call when status changes, receives the new hydraStatus
* @returns The current status
*
* @example
* ```ts
* hydraProvider.onStatusChange((status) => {
* console.log(`Hydra Head status changed to: ${status}`);
* });
* ```
*/
onStatusChange(callback: (status: hydraStatus) => void): hydraStatus {
this._eventEmitter.on("onstatuschange", (status: hydraStatus) => {
this._currentStatus = status;
callback(status);
});
return this._currentStatus;
}

/**
* Get the current status of the Hydra Head.
*
* @returns The current hydraStatus
*
* @example
* ```ts
* const currentStatus = hydraProvider.getStatus();
* console.log(`Current status: ${currentStatus}`);
* ```
*/
getStatus(): hydraStatus {
return this._currentStatus;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./hydra-instance";
export * from "./hydra-provider";
export * from "./types"
6 changes: 6 additions & 0 deletions src/types/events/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export type ConnectionState =
| "IDLE"
| "CONNECTING"
| "CONNECTED"
| "FAILED"
| "DISCONNECTED";
3 changes: 2 additions & 1 deletion src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export * from "./hydra/hydraTransaction";
export * from "./hydra/hydraUTxOs";
export * from "./client-input";
export * from "./client-message";
export * from "./server-output";
export * from "./server-output";
export * from "./events/connection";