Skip to content

Commit b3779bb

Browse files
committed
Close subscription after expired token
1 parent 8795dd8 commit b3779bb

File tree

1 file changed

+43
-17
lines changed

1 file changed

+43
-17
lines changed

databroker/src/broker.rs

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ pub struct QuerySubscription {
155155

156156
pub struct ChangeSubscription {
157157
entries: HashMap<i32, HashSet<Field>>,
158-
sender: mpsc::Sender<EntryUpdates>,
158+
sender: Option<mpsc::Sender<EntryUpdates>>,
159159
permissions: Permissions,
160160
}
161161

@@ -607,7 +607,7 @@ impl Subscriptions {
607607
}
608608

609609
pub async fn notify(
610-
&self,
610+
&mut self,
611611
changed: Option<&HashMap<i32, HashSet<Field>>>,
612612
db: &Database,
613613
) -> Result<Option<HashMap<String, ()>>, NotificationError> {
@@ -627,10 +627,10 @@ impl Subscriptions {
627627
}
628628
}
629629

630-
for sub in &self.change_subscriptions {
630+
for sub in &mut self.change_subscriptions {
631631
match sub.notify(changed, db).await {
632632
Ok(_) => {}
633-
Err(err) => error = Some(err),
633+
Err(err) => error = Some(err)
634634
}
635635
}
636636

@@ -660,12 +660,24 @@ impl Subscriptions {
660660
true
661661
}
662662
});
663-
self.change_subscriptions.retain(|sub| {
664-
if sub.sender.is_closed() {
663+
self.change_subscriptions.retain_mut(|sub| {
664+
if let Some(sender) = &sub.sender {
665+
if sender.is_closed() {
666+
info!("Subscriber gone: removing subscription");
667+
false
668+
} else {
669+
match &sub.permissions.expired() {
670+
Ok(()) => true,
671+
Err(PermissionError::Expired) => {
672+
sub.sender = None;
673+
false
674+
}
675+
Err(err) => panic!("Error: {:?}", err),
676+
}
677+
}
678+
} else {
665679
info!("Subscriber gone: removing subscription");
666680
false
667-
} else {
668-
true
669681
}
670682
});
671683
}
@@ -693,7 +705,7 @@ impl ChangeSubscription {
693705
// notify
694706
let notifications = {
695707
let mut notifications = EntryUpdates::default();
696-
708+
let mut error = None;
697709
for (id, changed_fields) in changed {
698710
if let Some(fields) = self.entries.get(id) {
699711
if !fields.is_disjoint(changed_fields) {
@@ -723,22 +735,32 @@ impl ChangeSubscription {
723735
fields: notify_fields,
724736
});
725737
}
738+
Err(ReadError::PermissionExpired) => {
739+
debug!("notify: token expired, closing subscription channel");
740+
error = Some(NotificationError {});
741+
break;
742+
}
726743
Err(_) => {
727-
debug!("notify: could not find entry with id {}", id)
744+
debug!("notify: could not find entry with id {}", id);
728745
}
729746
}
730747
}
731748
}
732749
}
750+
if let Some(err) = error {
751+
return Err(err);
752+
}
733753
notifications
734754
};
735755
if notifications.updates.is_empty() {
736756
Ok(())
737-
} else {
738-
match self.sender.send(notifications).await {
757+
} else if let Some(sender) = &self.sender {
758+
match sender.send(notifications).await {
739759
Ok(()) => Ok(()),
740760
Err(_) => Err(NotificationError {}),
741761
}
762+
} else {
763+
Err(NotificationError {})
742764
}
743765
} else {
744766
Ok(())
@@ -775,9 +797,13 @@ impl ChangeSubscription {
775797
}
776798
notifications
777799
};
778-
match self.sender.send(notifications).await {
779-
Ok(()) => Ok(()),
780-
Err(_) => Err(NotificationError {}),
800+
if let Some(sender) = &self.sender {
801+
match sender.send(notifications).await {
802+
Ok(()) => Ok(()),
803+
Err(_) => Err(NotificationError {}),
804+
}
805+
} else {
806+
Err(NotificationError {})
781807
}
782808
}
783809
}
@@ -1411,7 +1437,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
14111437
match self
14121438
.broker
14131439
.subscriptions
1414-
.read()
1440+
.write()
14151441
.await
14161442
.notify(Some(&changed), &db)
14171443
.await
@@ -1457,7 +1483,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
14571483
let (sender, receiver) = mpsc::channel(10);
14581484
let subscription = ChangeSubscription {
14591485
entries: valid_entries,
1460-
sender,
1486+
sender: Some(sender),
14611487
permissions: self.permissions.clone(),
14621488
};
14631489

0 commit comments

Comments
 (0)