Skip to content
This repository was archived by the owner on Sep 22, 2024. It is now read-only.

Commit 3aa72a3

Browse files
committed
add worker abstract
1 parent 6f432a6 commit 3aa72a3

File tree

10 files changed

+141
-42
lines changed

10 files changed

+141
-42
lines changed

integration_test/app_test.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ void main() {
107107
downloadedFiles = [];
108108
});
109109
tearDownAll(() async {
110-
await receiver?.stopListening();
110+
await receiver?.stop();
111111
});
112112
});
113113

@@ -134,7 +134,7 @@ void main() {
134134
devices = await Discover.discover();
135135
}
136136
expect(devices.where((device) => device.code == code), isNotEmpty);
137-
Future.delayed(const Duration(milliseconds: 500), sender.cancel);
137+
Future.delayed(const Duration(milliseconds: 500), sender.stop);
138138
await sender.send(
139139
devices.first,
140140
[

integration_test/mobile_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,6 @@ void main() {
7676
downloadedFiles = [];
7777
});
7878
tearDownAll(() async {
79-
await receiver?.stopListening();
79+
await receiver?.stop();
8080
});
8181
}

lib/classes/receiver.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import '../constants.dart';
1919

2020
///Class for all Recieve jobs
2121
///
22-
///Available methods are [listen] and [stopListening]
22+
///Available methods are [listen] and [stop]
2323
class Receiver {
2424
final _files = <DbFile>[];
2525
late final _ms = MediaStore();
@@ -251,7 +251,7 @@ class Receiver {
251251
///Closes the listening server.
252252
///
253253
///Is is safe to call before [listen] or after [listen] .
254-
Future<void> stopListening() async => await _server?.close();
254+
Future<void> stop() async => await _server?.close();
255255

256256
Map<String, dynamic> get map => {
257257
"useDb": useDb,

lib/classes/sender.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class Sender {
2626
return result?.files;
2727
}
2828

29-
void cancel() {
29+
void stop() {
3030
log("Request cancelled", name: "Sender");
3131
_senderCancelToken.cancel();
3232
}

lib/classes/workers/base_worker.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import 'dart:isolate';
2+
3+
///Base class for implementing workers communication.
4+
///
5+
///Worker classes meant to manage workers.
6+
///It run in main isolate, but workers run separated isolates.
7+
abstract class BaseWorker {
8+
///Whether if worker is active.
9+
Future<bool> isActive();
10+
11+
///Stop worker and clean all resources.
12+
Future<void> stop();
13+
14+
///Creates [ReceivePort] for worker to main isolate communication and registers.
15+
///
16+
///Calling more than one, throws.
17+
ReceivePort getReceivePort();
18+
19+
///Looks up for the [SendPort] registered for main isolate to worker communication.
20+
///
21+
///Call it every time you need the [SendPort], instead storing as variable.
22+
SendPort? getSendPort();
23+
}

lib/classes/workers/isolated_receiver.dart

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ import 'package:weepy/classes/workers/worker_interface.dart';
99
import 'package:weepy/classes/workers/worker_messages.dart' as messages;
1010
import 'package:workmanager/workmanager.dart';
1111

12-
class IsolatedReceiver extends Receiver {
12+
import 'base_worker.dart';
13+
14+
class IsolatedReceiver extends Receiver implements BaseWorker {
1315
///If [true] creates and manages progress notification.
1416
bool progressNotification;
1517

16-
///Runs [Receiver] from a worker.
18+
//TODO: Test completer
19+
final aliveCheckCompleter = Completer<bool>();
20+
1721
IsolatedReceiver(
1822
{super.onDownloadUpdatePercent,
1923
super.useDb,
@@ -26,6 +30,25 @@ class IsolatedReceiver extends Receiver {
2630
super.code,
2731
this.progressNotification = true});
2832

33+
@override
34+
Future<bool> isActive() async {
35+
final sendPort = getSendPort();
36+
if (sendPort == null) {
37+
return false;
38+
}
39+
sendPort.send(const messages.Alive().map);
40+
const waitDuration = Duration(seconds: 5);
41+
final aliveCanceller = Future.delayed(
42+
waitDuration,
43+
() => aliveCheckCompleter.complete(false),
44+
);
45+
final alive = await aliveCheckCompleter.future;
46+
if (alive) {
47+
aliveCanceller.ignore();
48+
}
49+
return alive;
50+
}
51+
2952
///Starts worker and runs [Receiver.listen]
3053
///
3154
///If necessary, requests permission. Throws [NoStoragePermissionException]
@@ -42,21 +65,19 @@ class IsolatedReceiver extends Receiver {
4265
throw NoStoragePermissionException();
4366
}
4467
}
45-
final port = ReceivePort();
46-
IsolateNameServer.registerPortWithName(port.sendPort, MyTasks.receive.name);
47-
port.listen(_portCallback);
48-
await workManager.registerOneOffTask(
49-
MyTasks.receive.name, MyTasks.receive.name,
68+
69+
getReceivePort().listen(_portCallback);
70+
await workManager.registerOneOffTask(Tasks.receive.name, Tasks.receive.name,
5071
inputData: super.map, existingWorkPolicy: ExistingWorkPolicy.keep);
5172
return super.code;
5273
}
5374

5475
@override
55-
Future<void> stopListening() async {
76+
Future<void> stop() async {
5677
if (progressNotification) {
5778
await notifications.cancelDownload();
5879
}
59-
await workManager.cancelByUniqueName(MyTasks.receive.name);
80+
await workManager.cancelByUniqueName(Tasks.receive.name);
6081
}
6182

6283
Future<void> _portCallback(data) async {
@@ -92,12 +113,30 @@ class IsolatedReceiver extends Receiver {
92113
case messages.MessageType.downloadStarted:
93114
final _ = messages.DownloadStarted.fromMap(data);
94115
super.onDownloadStart?.call();
116+
case messages.MessageType.alive:
117+
aliveCheckCompleter.complete(true);
95118
default:
96119
throw Error();
97120
}
98121
} on Exception catch (e) {
99-
log("Interface error", name: "IsolatedReceiver", error: e);
122+
log("Error", name: "IsolatedReceiver", error: e);
100123
rethrow;
101124
}
102125
}
126+
127+
@override
128+
ReceivePort getReceivePort() {
129+
final receivePort = ReceivePort();
130+
final isRegistered = IsolateNameServer.registerPortWithName(
131+
receivePort.sendPort, PortNames.receiver2main.name);
132+
assert(isRegistered);
133+
return receivePort;
134+
}
135+
136+
@override
137+
SendPort? getSendPort() {
138+
final sendPort =
139+
IsolateNameServer.lookupPortByName(PortNames.main2receiver.name);
140+
return sendPort;
141+
}
103142
}

lib/classes/workers/isolated_sender.dart

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,41 @@
11
import 'dart:async';
2+
import 'dart:isolate';
3+
import 'dart:ui';
24
import 'package:file_picker/file_picker.dart';
35
import 'package:weepy/classes/notifications.dart' as notifications;
46
import 'package:weepy/classes/sender.dart';
5-
import 'package:weepy/classes/workers/isolated_receiver.dart';
67
import 'package:weepy/classes/workers/worker_interface.dart';
78
import 'package:weepy/classes/workers/worker_messages.dart' as messages;
89
import 'package:weepy/models.dart';
10+
import 'base_worker.dart';
911

10-
class IsolatedSender extends Sender {
12+
class IsolatedSender extends Sender implements BaseWorker {
1113
IsolatedSender({super.onUploadProgress});
12-
13-
///Returns whether [IsolatedReceiver] active in a worker.
14+
//TODO: Test completer
15+
final aliveCheckCompleter = Completer<bool>();
16+
@override
1417
Future<bool> isActive() async {
15-
//TODO
16-
throw UnimplementedError();
18+
final sendPort = getSendPort();
19+
if (sendPort == null) {
20+
return false;
21+
}
22+
sendPort.send(const messages.Alive().map);
23+
const waitDuration = Duration(seconds: 5);
24+
final aliveCanceller = Future.delayed(
25+
waitDuration,
26+
() => aliveCheckCompleter.complete(false),
27+
);
28+
final alive = await aliveCheckCompleter.future;
29+
if (alive) {
30+
aliveCanceller.ignore();
31+
}
32+
return alive;
1733
}
1834

1935
@override
2036
Future<void> send(Device device, Iterable<PlatformFile> files,
2137
{bool useDb = true, bool progressNotification = true}) async {
22-
final port = await initialize();
38+
await initialize();
2339
if (progressNotification) {
2440
progressNotification = await notifications.initialize();
2541
}
@@ -31,7 +47,8 @@ class IsolatedSender extends Sender {
3147
map["useDb"] = useDb;
3248

3349
final exitBlock = Completer<void>();
34-
port.listen((data) async {
50+
getReceivePort().listen((data) async {
51+
//Listen incoming messages
3552
switch (messages.MessageType.values[data["type"]]) {
3653
case messages.MessageType.updatePercent:
3754
final message = messages.UpdatePercent.fromMap(data);
@@ -43,11 +60,13 @@ class IsolatedSender extends Sender {
4360
case messages.MessageType.completed:
4461
final _ = messages.Completed.fromMap(data);
4562
exitBlock.complete();
63+
case messages.MessageType.alive:
64+
aliveCheckCompleter.complete(true);
4665
default:
4766
throw Error();
4867
}
4968
});
50-
await workManager.registerOneOffTask(MyTasks.send.name, MyTasks.send.name,
69+
await workManager.registerOneOffTask(Tasks.send.name, Tasks.send.name,
5170
inputData: map);
5271
if (progressNotification) {
5372
//Create notification
@@ -60,5 +79,21 @@ class IsolatedSender extends Sender {
6079
}
6180

6281
@override
63-
Future<void> cancel() => workManager.cancelByUniqueName(MyTasks.send.name);
82+
Future<void> stop() => workManager.cancelByUniqueName(Tasks.send.name);
83+
84+
@override
85+
ReceivePort getReceivePort() {
86+
final receivePort = ReceivePort();
87+
final isRegistered = IsolateNameServer.registerPortWithName(
88+
receivePort.sendPort, PortNames.sender2main.name);
89+
assert(isRegistered);
90+
return receivePort;
91+
}
92+
93+
@override
94+
SendPort? getSendPort() {
95+
final sendPort =
96+
IsolateNameServer.lookupPortByName(PortNames.main2sender.name);
97+
return sendPort;
98+
}
6499
}

lib/classes/workers/worker_interface.dart

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
///This library manages essentials for all `worker`'s
2+
///
3+
///We are using `worker`'s from workmanager plugin to continue jobs from background
4+
library;
5+
16
import 'dart:async';
27
import 'dart:developer';
38
import 'dart:io';
@@ -15,25 +20,25 @@ import 'package:workmanager/workmanager.dart';
1520

1621
import '../receiver.dart';
1722

18-
enum MyTasks { receive, send }
23+
enum PortNames { receiver2main, sender2main, main2receiver, main2sender }
1924

20-
enum PortName { main2receiver, main2sender }
25+
enum Tasks { receive, send }
2126

2227
final workManager = Workmanager();
2328
@pragma("vm:entry-point")
2429
void _callBack() {
2530
workManager.executeTask((taskName, inputData) async {
2631
try {
2732
final task =
28-
MyTasks.values.singleWhere((element) => element.name == taskName);
33+
Tasks.values.singleWhere((element) => element.name == taskName);
2934
switch (task) {
30-
case MyTasks.receive:
35+
case Tasks.receive:
3136
SendPort? getSendPort() =>
32-
IsolateNameServer.lookupPortByName(MyTasks.receive.name);
37+
IsolateNameServer.lookupPortByName(PortNames.receiver2main.name);
3338

3439
final receiverPort = ReceivePort();
3540
IsolateNameServer.registerPortWithName(
36-
receiverPort.sendPort, PortName.main2receiver.name);
41+
receiverPort.sendPort, PortNames.main2receiver.name);
3742
receiverPort.listen((message) {
3843
//TODO: Test alive feature
3944
if (message["data"] == messages.MessageType.alive) {
@@ -67,13 +72,13 @@ void _callBack() {
6772
});
6873
await receiver.listen();
6974
return exitBlock.future;
70-
case MyTasks.send:
75+
case Tasks.send:
7176
final senderMap = inputData!;
7277
final receiverPort = ReceivePort();
7378
IsolateNameServer.registerPortWithName(
74-
receiverPort.sendPort, PortName.main2sender.name);
79+
receiverPort.sendPort, PortNames.main2sender.name);
7580
SendPort? getSendPort() =>
76-
IsolateNameServer.lookupPortByName(MyTasks.send.name);
81+
IsolateNameServer.lookupPortByName(PortNames.sender2main.name);
7782
receiverPort.listen((message) {
7883
if (message["data"] == messages.MessageType.alive) {
7984
//TODO: Test alive feature
@@ -104,13 +109,10 @@ void _callBack() {
104109
}
105110
} on Exception {
106111
rethrow;
107-
112+
}
108113
});
109114
}
110115

111-
Future<ReceivePort> initialize() async {
112-
final port = ReceivePort();
113-
IsolateNameServer.registerPortWithName(port.sendPort, MyTasks.send.name);
116+
Future<void> initialize() async {
114117
await workManager.initialize(_callBack, isInDebugMode: kDebugMode);
115-
return port;
116118
}

lib/screens/receive_page.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class _ReceivePageInnerState extends ConsumerState<ReceivePageInner>
9898
@override
9999
void dispose() {
100100
_downloadAnimC.dispose();
101-
_receiver.stopListening();
101+
_receiver.stop();
102102
super.dispose();
103103
}
104104

lib/screens/send_page.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ class _SendPageInnerState extends ConsumerState<SendPageInner>
9696
@override
9797
void dispose() {
9898
_uploadAnimC.dispose();
99-
_sender.cancel();
99+
_sender.stop();
100100
super.dispose();
101101
}
102102

0 commit comments

Comments
 (0)