Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in rpc client / test #1829

Merged
merged 13 commits into from
Oct 12, 2023
6 changes: 3 additions & 3 deletions .github/workflows/ts-rpc-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ jobs:

- name: Run Create Channels script
# TODO: We could write a test specific script that creates channels and checks the results
run: npx ts-node ./scripts/client-runner.ts create-channels -w 300000 &> output.log
bitwiseguy marked this conversation as resolved.
Show resolved Hide resolved
run: npx ts-node ./scripts/client-runner.ts create-channels -w 300000
working-directory: packages/nitro-rpc-client

- name: Archive logs
if: always()
uses: actions/upload-artifact@v2
with:
name: logs
path: ./**/*.log
name: rpc server logs
path: ./output.log
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/libp2p/go-libp2p-kad-dht v0.24.2
github.com/lmittmann/tint v1.0.2
github.com/tidwall/buntdb v1.2.10
github.com/urfave/cli/v2 v2.25.3

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQsc
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ=
github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4=
github.com/lmittmann/tint v1.0.2 h1:9XZ+JvEzjvd3VNVugYqo3j+dl0NRju8k9FquAusJExM=
github.com/lmittmann/tint v1.0.2/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down
7 changes: 6 additions & 1 deletion internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"log/slog"
"os"
"path/filepath"
"time"

"github.com/lmittmann/tint"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"
)
Expand Down Expand Up @@ -64,6 +66,9 @@ func SetupDefaultFileLogger(filename string, level slog.Level) {

// SetupDefaultLogger sets up a default logger that writes to the specified writer
func SetupDefaultLogger(w io.Writer, level slog.Level) {
h := slog.NewJSONHandler(w, &slog.HandlerOptions{Level: level})
h := tint.NewHandler(w, &tint.Options{
Level: level,
TimeFormat: time.Kitchen,
})
Comment on lines +69 to +72
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gives output like this:
Screenshot 2023-10-12 at 10 52 14

slog.SetDefault(slog.New(h))
}
21 changes: 8 additions & 13 deletions packages/nitro-rpc-client/scripts/client-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ yargs(hideBin(process.argv))
);

await Promise.all([
aliceClient.WaitForObjective(aliceLedger.Id),
bobClient.WaitForObjective(bobLedger.Id),
aliceClient.WaitForLedgerChannelStatus(aliceLedger.ChannelId, "Open"),
bobClient.WaitForLedgerChannelStatus(bobLedger.ChannelId, "Open"),
]);

console.log(`Ledger channel ${bobLedger.ChannelId} created`);
console.log(`Ledger channel ${aliceLedger.ChannelId} created`);
}
Expand All @@ -194,7 +195,7 @@ yargs(hideBin(process.argv))
[ireneAddress],
yargs.virtualdeposit
);
await aliceClient.WaitForObjective(res.Id);
await aliceClient.WaitForPaymentChannelStatus(res.ChannelId, "Open");
console.log(`Virtual channel ${res.ChannelId} created`);
virtualChannels.push(res.ChannelId);
}
Expand All @@ -217,11 +218,9 @@ yargs(hideBin(process.argv))
break;
}

const res = await aliceClient.ClosePaymentChannel(channelId);
await aliceClient.WaitForObjective(res);
console.log(
`Virtual channel ${getChannelIdFromObjectiveId(res)} closed`
);
await aliceClient.ClosePaymentChannel(channelId);
await aliceClient.WaitForPaymentChannelStatus(channelId, "Complete");
console.log(`Virtual channel ${channelId} closed`);
closeCount++;
}

Expand Down Expand Up @@ -269,7 +268,7 @@ yargs(hideBin(process.argv))
rightAddress,
1_000_000
);
await leftClient.WaitForObjective(ledger.Id);
await leftClient.WaitForLedgerChannelStatus(ledger.ChannelId, "Open");
console.log(`Ledger channel ${ledger.ChannelId} created`);

await closeClients(clients);
Expand All @@ -286,10 +285,6 @@ async function wait(ms: number) {
await new Promise((res) => setTimeout(res, ms));
}

function getChannelIdFromObjectiveId(objectiveId: string): string {
return objectiveId.split("-")[1];
}

// Waits for the RPC server to be available by sending a simple get_address POST request until we get a response
async function waitForRPCServer(
port: number,
Expand Down
20 changes: 10 additions & 10 deletions packages/nitro-rpc-client/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ yargs(hideBin(process.argv))
yargs.counterparty,
yargs.amount
);
const { Id } = dfObjective;
const { Id, ChannelId } = dfObjective;

console.log(`Objective started ${Id}`);
await rpcClient.WaitForObjective(Id);
console.log(`Objective complete ${Id}`);
await rpcClient.WaitForLedgerChannelStatus(ChannelId, "Open");
console.log(`Channel Open ${ChannelId}`);
await rpcClient.Close();
process.exit(0);
}
Expand All @@ -154,8 +154,8 @@ yargs(hideBin(process.argv))

const id = await rpcClient.CloseLedgerChannel(yargs.channelId);
console.log(`Objective started ${id}`);
await rpcClient.WaitForObjective(id);
console.log(`Objective complete ${id}`);
await rpcClient.WaitForPaymentChannelStatus(yargs.channelId, "Complete");
console.log(`Channel Complete ${yargs.channelId}`);
await rpcClient.Close();
process.exit(0);
}
Expand Down Expand Up @@ -200,10 +200,10 @@ yargs(hideBin(process.argv))
yargs.amount
);

const { Id } = vfObjective;
const { ChannelId, Id } = vfObjective;
console.log(`Objective started ${Id}`);
await rpcClient.WaitForObjective(Id);
console.log(`Objective complete ${Id}`);
await rpcClient.WaitForPaymentChannelStatus(ChannelId, "Open");
console.log(`Channel Open ${ChannelId}`);
await rpcClient.Close();
process.exit(0);
}
Expand All @@ -230,8 +230,8 @@ yargs(hideBin(process.argv))
const id = await rpcClient.ClosePaymentChannel(yargs.channelId);

console.log(`Objective started ${id}`);
await rpcClient.WaitForObjective(id);
console.log(`Objective complete ${id}`);
await rpcClient.WaitForPaymentChannelStatus(yargs.channelId, "Complete");
console.log(`Channel complete ${yargs.channelId}`);
await rpcClient.Close();
process.exit(0);
}
Expand Down
21 changes: 18 additions & 3 deletions packages/nitro-rpc-client/src/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ChannelStatus,
LedgerChannelInfo,
ObjectiveResponse,
PaymentChannelInfo,
Expand Down Expand Up @@ -100,11 +101,25 @@ interface paymentApi {

interface syncAPI {
/**
* WaitForObjective blocks until the objective with the given ID to complete.
* WaitForLedgerChannelStatus blocks until the ledger channel with the given ID to have the given status.
*
* @param objectiveId - The id objective to wait for
* @param objectiveId - The channel id to wait for
* @param status - The channel id to wait for (e.g. Ready or Closing)
*/
WaitForLedgerChannelStatus(
objectiveId: string,
status: ChannelStatus
): Promise<void>;
/**
* WaitForPaymentChannelStatus blocks until the payment channel with the given ID to have the given status.
*
* @param objectiveId - The channel id to wait for
* @param status - The channel id to wait for (e.g. Ready or Closing)
*/
WaitForObjective(objectiveId: string): Promise<void>;
WaitForPaymentChannelStatus(
objectiveId: string,
status: ChannelStatus
): Promise<void>;
/**
* PaymentChannelUpdated attaches a callback which is triggered when the channel with supplied ID is updated.
* Returns a cleanup function which can be used to remove the subscription.
Expand Down
46 changes: 39 additions & 7 deletions packages/nitro-rpc-client/src/rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import {
RequestMethod,
RPCRequestAndResponses,
ObjectiveResponse,
ObjectiveCompleteNotification,
Voucher,
ReceiveVoucherResult,
ChannelStatus,
LedgerChannelUpdatedNotification,
PaymentChannelUpdatedNotification,
} from "./types";
import { Transport } from "./transport";
import { createOutcome, generateRequest } from "./utils";
Expand Down Expand Up @@ -57,17 +59,47 @@ export class NitroRpcClient implements RpcClientApi {
return getAndValidateResult(res, "receive_voucher");
}

public async WaitForObjective(objectiveId: string): Promise<void> {
return new Promise((resolve) => {
public async WaitForLedgerChannelStatus(
channelId: string,
status: ChannelStatus
): Promise<void> {
const promise = new Promise<void>((resolve) => {
this.transport.Notifications.on(
"ledger_channel_updated",
(payload: LedgerChannelUpdatedNotification["params"]["payload"]) => {
if (payload.ID === channelId) {
this.GetLedgerChannel(channelId).then((l) => {
if (l.Status == status) resolve();
});
}
}
);
});
const ledger = await this.GetLedgerChannel(channelId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there need for both this GetLedgerChannel call and the same one above nested inside the Notifications handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am trying to do here is make it impossible for there to be a race condition. So the approach is,
Step 1: register a handler in case the event has not yet happened (if it already happened, this will never fire)
Step 2: make a static query in case the event has already happened.

Which should result in the promise resolving whenever the event fires / was fired.

Does that make sense?

if (ledger.Status == status) return;
return promise;
}

public async WaitForPaymentChannelStatus(
channelId: string,
status: ChannelStatus
): Promise<void> {
const promise = new Promise<void>((resolve) => {
this.transport.Notifications.on(
"objective_completed",
(params: ObjectiveCompleteNotification["params"]) => {
if (params["payload"] === objectiveId) {
resolve();
"payment_channel_updated",
(payload: PaymentChannelUpdatedNotification["params"]["payload"]) => {
if (payload.ID === channelId) {
this.GetPaymentChannel(channelId).then((l) => {
if (l.Status == status) resolve();
});
}
}
);
});

const channel = await this.GetPaymentChannel(channelId);
if (channel.Status == status) return;
return promise;
}

public onPaymentChannelUpdated(
Expand Down
2 changes: 1 addition & 1 deletion packages/nitro-rpc-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,4 @@ export type AssetMetadata = {
Metadata: null;
};

export type ChannelStatus = "Proposed" | "Ready" | "Closing" | "Complete";
export type ChannelStatus = "Proposed" | "Open" | "Closing" | "Complete";
3 changes: 0 additions & 3 deletions packages/payment-proxy-client/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ export default function App() {
initialChannelBalance
);

// TODO: If the objective completes fast enough, we might start waiting after it's already done
// await nitroClient.WaitForObjective(result.Id);

setPaymentChannelId(result.ChannelId);

nitroClient.onPaymentChannelUpdated(
Expand Down
Loading