Skip to content

Commit

Permalink
fix: offramp watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
justinenerio committed Jan 11, 2024
1 parent 3e56d3e commit 29e9aff
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 36 deletions.
6 changes: 5 additions & 1 deletion packages/espressocash_app/lib/data/db/db.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class OutgoingTransferRows extends Table {
Set<Column<Object>> get primaryKey => {id};
}

const int latestVersion = 46;
const int latestVersion = 47;

const _tables = [
OutgoingTransferRows,
Expand Down Expand Up @@ -203,6 +203,9 @@ class MyDatabase extends _$MyDatabase {
if (from >= 39 && from < 46) {
await m.addColumn(iLPRows, iLPRows.feeAmount);
}
if (from >= 40 && from < 47) {
await m.addColumn(offRampOrderRows, offRampOrderRows.txId);
}
},
);

Expand Down Expand Up @@ -259,6 +262,7 @@ class OffRampOrderRows extends Table with AmountMixin, EntityMixin {
TextColumn get humanStatus => text()();
TextColumn get machineStatus => text()();
TextColumn get partnerOrderId => text()();
TextColumn get txId => text().nullable()();
TextColumn get transaction => text()();
TextColumn get depositAddress => text()();
Int64Column get slot => int64()();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TransactionRepository {
);

final offRamp = _db.offRampOrderRows.findActivityOrNull(
where: (row) => row.transaction.contains(txId),
where: (row) => row.txId.equals(txId),
builder: (pr) => Activity.offRamp(id: pr.id, created: pr.created),
ignoreWhen: (row) => const [
OffRampOrderStatus.completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ class CoinflowOffRampOrderWatcher implements RampWatcher {
)
.listen((event) async {
final statement = _db.update(_db.offRampOrderRows)
..where(
(tbl) =>
tbl.id.equals(orderId) &
tbl.status.equals(OffRampOrderStatus.waitingForPartner.name),
);
..where((tbl) => tbl.id.equals(orderId));

final status = switch (event?.status) {
CoinflowOrderStatus.completed => OffRampOrderStatus.completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,21 @@ class OffRampOrderScreenContent extends StatelessWidget {
OffRampOrderStatus.cancelled => CpTimelineStatus.neutral,
};

final animated = timelineStatus == CpTimelineStatus.inProgress &&
order.status != OffRampOrderStatus.waitingForPartner;
final animated = timelineStatus == CpTimelineStatus.inProgress;

final int activeItem = switch (order.status) {
OffRampOrderStatus.depositTxRequired ||
OffRampOrderStatus.creatingDepositTx ||
OffRampOrderStatus.depositTxReady ||
OffRampOrderStatus.sendingDepositTx ||
OffRampOrderStatus.waitingForPartner ||
OffRampOrderStatus.depositError ||
OffRampOrderStatus.depositTxConfirmError ||
OffRampOrderStatus.cancelled =>
1,
OffRampOrderStatus.failure || OffRampOrderStatus.completed => 2,
OffRampOrderStatus.waitingForPartner ||
OffRampOrderStatus.failure ||
OffRampOrderStatus.completed =>
2,
};

final withdrawInitiated = CpTimelineItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import '../../../config.dart';
import '../../../core/amount.dart';
import '../../../core/currency.dart';
import '../../../data/db/db.dart';
import '../../../di.dart';
import '../../accounts/models/ec_wallet.dart';
import '../../authenticated/auth_scope.dart';
import '../../tokens/token_list.dart';
import '../../transactions/models/tx_results.dart';
import '../../transactions/services/resign_tx.dart';
import '../../transactions/services/tx_sender.dart';
import '../coinflow/services/coinflow_off_ramp_order_watcher.dart';
import '../kado/services/kado_off_ramp_order_watcher.dart';
import '../models/ramp_partner.dart';
import '../scalex/services/scalex_off_ramp_order_watcher.dart';
import '../src/models/ramp_watcher.dart';

typedef OffRampOrder = ({
String id,
Expand All @@ -46,6 +51,7 @@ class OffRampOrderService implements Disposable {
);

final Map<String, StreamSubscription<void>> _subscriptions = {};
final Map<String, RampWatcher> _watchers = {};

final ECWallet _account;
final CryptopleaseClient _client;
Expand All @@ -64,6 +70,7 @@ class OffRampOrderService implements Disposable {

for (final order in orders) {
_subscribe(order.id);
await _watch(order.id);
}
}

Expand Down Expand Up @@ -200,6 +207,7 @@ class OffRampOrderService implements Disposable {
humanStatus: '',
machineStatus: '',
partnerOrderId: partnerOrderId,
txId: transaction?.$1.id,
transaction: transaction?.$1.encode() ?? '',
slot: transaction?.$2 ?? BigInt.zero,
status: transaction == null
Expand All @@ -213,6 +221,7 @@ class OffRampOrderService implements Disposable {

await _db.into(_db.offRampOrderRows).insert(order);
_subscribe(order.id);
await _watch(order.id);

return order.id;
}
Expand Down Expand Up @@ -241,6 +250,23 @@ class OffRampOrderService implements Disposable {
}
});

Future<void> _watch(String orderId) async {
final query = _db.select(_db.offRampOrderRows)
..where((tbl) => tbl.id.equals(orderId));

final order = await query.getSingle();

_watchers[orderId] = switch (order.partner) {
RampPartner.kado => sl<KadoOffRampOrderWatcher>(),
RampPartner.scalex => sl<ScalexOffRampOrderWatcher>(),
RampPartner.coinflow => sl<CoinflowOffRampOrderWatcher>(),
RampPartner.rampNetwork ||
RampPartner.guardarian =>
throw ArgumentError('Not implemented'),
}
..watch(orderId);
}

void _subscribe(String orderId) {
_subscriptions[orderId] = (_db.select(_db.offRampOrderRows)
..where((tbl) => tbl.id.equals(orderId)))
Expand Down Expand Up @@ -280,6 +306,9 @@ class OffRampOrderService implements Disposable {
_subscriptions[orderId]?.cancel();
_subscriptions.remove(orderId);

_watchers[orderId]?.close();
_watchers.remove(orderId);

return const Stream.empty();
}
}).listen(
Expand All @@ -292,6 +321,7 @@ class OffRampOrderService implements Disposable {
@override
Future<void> onDispose() async {
await Future.wait(_subscriptions.values.map((it) => it.cancel()));
_watchers.values.map((it) => it.close());
await _db.delete(_db.offRampOrderRows).go();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import 'package:flutter/widgets.dart';

import '../../../di.dart';
import '../coinflow/services/coinflow_off_ramp_order_watcher.dart';
import '../kado/services/kado_off_ramp_order_watcher.dart';
import '../models/ramp_partner.dart';
import '../scalex/services/scalex_off_ramp_order_watcher.dart';
import '../services/off_ramp_order_service.dart';
import '../src/models/ramp_watcher.dart';

typedef OffRampOrderDetailsBuilder = Widget Function(
BuildContext context,
Expand All @@ -29,35 +24,15 @@ class OffRampOrderDetails extends StatefulWidget {

class _OffRampOrderDetailsState extends State<OffRampOrderDetails> {
late final Stream<OffRampOrder> _stream;
RampWatcher? _watcher;

@override
void initState() {
super.initState();
_stream = sl<OffRampOrderService>().watch(widget.orderId);

_initWatcher();
}

Future<void> _initWatcher() async {
if (_watcher != null) return;

final ramp = await _stream.first;

_watcher = switch (ramp.partner) {
RampPartner.kado => sl<KadoOffRampOrderWatcher>(),
RampPartner.scalex => sl<ScalexOffRampOrderWatcher>(),
RampPartner.coinflow => sl<CoinflowOffRampOrderWatcher>(),
RampPartner.rampNetwork ||
RampPartner.guardarian =>
throw ArgumentError('Not implemented'),
}
..watch(widget.orderId);
}

@override
void dispose() {
_watcher?.close();
super.dispose();
}

Expand Down

0 comments on commit 29e9aff

Please sign in to comment.