Skip to content

Commit

Permalink
feat(media): add support for async uploads
Browse files Browse the repository at this point in the history
Changelog: support for preallocated media content URI has been added in
`Media::create_content_uri()`, and uploading the content for such a
preallocated URI is possible with `Media::upload_preallocated()`.
  • Loading branch information
bnjbvr committed Oct 23, 2024
1 parent d23698f commit 553c2e0
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 6 deletions.
6 changes: 5 additions & 1 deletion crates/matrix-sdk/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use serde_json::Error as JsonError;
use thiserror::Error;
use url::ParseError as UrlParseError;

use crate::{event_cache::EventCacheError, store_locks::LockStoreError};
use crate::{event_cache::EventCacheError, media::MediaError, store_locks::LockStoreError};

/// Result type of the matrix-sdk.
pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -374,6 +374,10 @@ pub enum Error {
/// Backups are not enabled
#[error("backups are not enabled")]
BackupNotEnabled,

/// An error happened during handling of a media subrequest.
#[error(transparent)]
Media(#[from] MediaError),
}

#[rustfmt::skip] // stop rustfmt breaking the `<code>` in docs across multiple lines
Expand Down
122 changes: 118 additions & 4 deletions crates/matrix-sdk/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@ pub use matrix_sdk_base::media::*;
use mime::Mime;
use ruma::{
api::{
client::{authenticated_media, media},
client::{authenticated_media, error::ErrorKind, media},
MatrixVersion,
},
assign,
events::room::{MediaSource, ThumbnailInfo},
MxcUri,
MilliSecondsSinceUnixEpoch, MxcUri, OwnedMxcUri,
};
#[cfg(not(target_arch = "wasm32"))]
use tempfile::{Builder as TempFileBuilder, NamedTempFile, TempDir};
#[cfg(not(target_arch = "wasm32"))]
use tokio::{fs::File as TokioFile, io::AsyncWriteExt};

use crate::{attachment::Thumbnail, futures::SendRequest, Client, Result, TransmissionProgress};
use crate::{
attachment::Thumbnail, futures::SendRequest, Client, Error, Result, TransmissionProgress,
};

/// A conservative upload speed of 1Mbps
const DEFAULT_UPLOAD_SPEED: u64 = 125_000;
Expand Down Expand Up @@ -105,6 +107,28 @@ impl fmt::Display for PersistError {
}
}

/// A preallocated MXC URI created by [`Media::create_media_content_uri`], and
/// to be used with [`Media::fill_media_content`].
#[derive(Debug)]
pub struct PreallocatedMxcUri {
/// The URI for the media URI.
pub uri: OwnedMxcUri,
/// The expiration date for the media URI.
expire_date: Option<MilliSecondsSinceUnixEpoch>,
}

/// An error that happened in the realm of media.
#[derive(Debug, thiserror::Error)]
pub enum MediaError {
/// A preallocated MXC URI has expired.
#[error("a preallocated MXC URI has expired")]
ExpiredPreallocatedMxcUri,

/// Preallocated media already had content, cannot overwrite.
#[error("preallocated media already had content, cannot overwrite")]
CannotOverwriteMedia,
}

/// `IntoFuture` returned by [`Media::upload`].
pub type SendUploadRequest = SendRequest<media::create_content::v3::Request>;

Expand Down Expand Up @@ -154,6 +178,96 @@ impl Media {
self.client.send(request, Some(request_config))
}

/// Preallocates an MXC URI for a media that will be uploaded soon.
///
/// This preallocates an URI *before* any content is uploaded to the server.
/// The resulting preallocated MXC URI can then be consumed with
/// [`Media::upload_preallocated`].
///
/// # Examples
///
/// ```no_run
/// # use std::fs;
/// # use matrix_sdk::{Client, ruma::room_id};
/// # use url::Url;
/// # use mime;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let mut client = Client::new(homeserver).await?;
///
/// let preallocated = client.media().create_content_uri().await?;
/// println!("Cat URI: {}", preallocated.uri);
///
/// let image = fs::read("/home/example/my-cat.jpg")?;
/// client
/// .media()
/// .upload_preallocated(preallocated, &mime::IMAGE_JPEG, image)
/// .await?;
///
/// # anyhow::Ok(()) };
/// ```
pub async fn create_content_uri(&self) -> Result<PreallocatedMxcUri> {
// Note: this request doesn't have any parameters.
let request = media::create_mxc_uri::v1::Request::default();

let response = self.client.send(request, None).await?;

Ok(PreallocatedMxcUri {
uri: response.content_uri,
expire_date: response.unused_expires_at,
})
}

/// Fills the content of a preallocated MXC URI with the given content type
/// and data.
///
/// The URI must have been preallocated with [`Self::create_content_uri`].
/// See this method's documentation for a full example.
pub async fn upload_preallocated(
&self,
uri: PreallocatedMxcUri,
content_type: &Mime,
data: Vec<u8>,
) -> Result<()> {
// Do a best-effort at reporting an expired MXC URI here; otherwise the server
// may complain about it later.
if let Some(expire_date) = uri.expire_date {
if MilliSecondsSinceUnixEpoch::now() >= expire_date {
return Err(Error::Media(MediaError::ExpiredPreallocatedMxcUri));
}
}

let timeout = std::cmp::max(
Duration::from_secs(data.len() as u64 / DEFAULT_UPLOAD_SPEED),
MIN_UPLOAD_REQUEST_TIMEOUT,
);

let request = assign!(media::create_content_async::v3::Request::from_url(&uri.uri, data)?, {
content_type: Some(content_type.as_ref().to_owned()),
});

let request_config = self.client.request_config().timeout(timeout);

if let Err(err) = self.client.send(request, Some(request_config)).await {
match err.client_api_error_kind() {
Some(ErrorKind::CannotOverwriteMedia) => {
Err(Error::Media(MediaError::CannotOverwriteMedia))
}

// Unfortunately, the spec says a server will return 404 for either an expired MXC
// ID or a non-existing MXC ID. Do a best-effort guess to recognize an expired MXC
// ID based on the error string, which will work with Synapse (as of 2024-10-23).
Some(ErrorKind::Unknown) if err.to_string().contains("expired") => {
Err(Error::Media(MediaError::ExpiredPreallocatedMxcUri))
}

_ => Err(err.into()),
}
} else {
Ok(())
}
}

/// Gets a media file by copying it to a temporary location on disk.
///
/// The file won't be encrypted even if it is encrypted on the server.
Expand Down Expand Up @@ -506,7 +620,7 @@ impl Media {
self.upload(content_type, data)
.with_send_progress_observable(send_progress)
.await
.map_err(crate::Error::from)
.map_err(Error::from)
};

let ((thumbnail_source, thumbnail_info), response) =
Expand Down
50 changes: 49 additions & 1 deletion crates/matrix-sdk/tests/integration/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ruma::{
api::client::media::get_content_thumbnail::v3::Method,
assign, device_id,
events::room::{message::ImageMessageEventContent, ImageInfo, MediaSource},
mxc_uri, uint, user_id,
mxc_uri, owned_mxc_uri, uint, user_id,
};
use serde_json::json;
use wiremock::{
Expand Down Expand Up @@ -411,3 +411,51 @@ async fn test_get_media_file_with_auth_matrix_stable_feature() {
};
client.media().get_thumbnail(&event_content, settings, true).await.unwrap();
}

#[async_test]
async fn test_async_media_upload() {
let (client, server) = logged_in_client_with_server().await;

client.reset_server_capabilities().await.unwrap();

// Declare Matrix version v1.7.
Mock::given(method("GET"))
.and(path("/_matrix/client/versions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"versions": [
"v1.7"
],
})))
.expect(1)
.mount(&server)
.await;

Mock::given(method("POST"))
.and(path("/_matrix/media/v1/create"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"content_uri": "mxc://example.com/AQwafuaFswefuhsfAFAgsw"
})))
.expect(1)
.mount(&server)
.await;

Mock::given(method("PUT"))
.and(path("/_matrix/media/v3/upload/example.com/AQwafuaFswefuhsfAFAgsw"))
.and(header("authorization", "Bearer 1234"))
.and(header("content-type", "image/jpeg"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&server)
.await;

let mxc_uri = client.media().create_content_uri().await.unwrap();

assert_eq!(mxc_uri.uri, owned_mxc_uri!("mxc://example.com/AQwafuaFswefuhsfAFAgsw"));

client
.media()
.upload_preallocated(mxc_uri, &mime::IMAGE_JPEG, b"hello world".to_vec())
.await
.unwrap();
}

0 comments on commit 553c2e0

Please sign in to comment.