Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] demo app with sending / subscribing features #13

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ publish.properties
/.idea/compiler.xml
/.idea/jarRepositories.xml
/.idea/misc.xml
/.idea/shelf

# general
**/.DS_Store
12 changes: 12 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Cancellation.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ably.chat

/**
* A cancellation handle, returned by various functions (mostly subscriptions)
* where cancellation is required.
*/
fun interface Cancellation {
/**
* Handle cancellation (unsubscribe listeners, clean up)
*/
fun cancel()
}
9 changes: 5 additions & 4 deletions chat-android/src/main/java/com/ably/chat/ChatApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ private const val API_PROTOCOL_VERSION = 3
private const val PROTOCOL_VERSION_PARAM_NAME = "v"
private val apiProtocolParam = Param(PROTOCOL_VERSION_PARAM_NAME, API_PROTOCOL_VERSION.toString())

// TODO make this class internal
class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) {
internal class ChatApi(private val realtimeClient: RealtimeClient, private val clientId: String) {

/**
* Get messages from the Chat Backend
*
* @return paginated result with messages
*/
suspend fun getMessages(roomId: String, params: QueryOptions): PaginatedResult<Message> {
suspend fun getMessages(roomId: String, options: QueryOptions, fromSerial: String? = null): PaginatedResult<Message> {
val baseParams = options.toParams()
val params = fromSerial?.let { baseParams + Param("fromSerial", it) } ?: baseParams
return makeAuthorizedPaginatedRequest(
url = "/chat/v1/rooms/$roomId/messages",
method = "GET",
params = params.toParams(),
params = params,
) {
Message(
timeserial = it.requireString("timeserial"),
Expand Down
3 changes: 2 additions & 1 deletion chat-android/src/main/java/com/ably/chat/ChatClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ interface ChatClient {
val clientOptions: ClientOptions
}

fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions): ChatClient = DefaultChatClient(realtimeClient, clientOptions)
fun ChatClient(realtimeClient: RealtimeClient, clientOptions: ClientOptions = ClientOptions()): ChatClient =
DefaultChatClient(realtimeClient, clientOptions)

internal class DefaultChatClient(
override val realtime: RealtimeClient,
Expand Down
8 changes: 1 addition & 7 deletions chat-android/src/main/java/com/ably/chat/ConnectionStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@ interface ConnectionStatus {
* Registers a listener that will be called whenever the connection status changes.
* @param listener The function to call when the status changes.
*/
fun on(listener: Listener)

/**
* Unregisters a listener
* @param listener The function to call when the status changes.
*/
fun off(listener: Listener)
fun on(listener: Listener): Cancellation

/**
* An interface for listening to changes for the connection status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ interface EmitsDiscontinuities {
* Register a listener to be called when a discontinuity is detected.
* @param listener The listener to be called when a discontinuity is detected.
*/
fun onDiscontinuity(listener: Listener)

/**
* Unregister a listener to be called when a discontinuity is detected.
* @param listener The listener
*/
fun offDiscontinuity(listener: Listener)
fun onDiscontinuity(listener: Listener): Cancellation

/**
* An interface for listening when discontinuity happens
Expand Down
100 changes: 76 additions & 24 deletions chat-android/src/main/java/com/ably/chat/Messages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

package com.ably.chat

import com.ably.chat.QueryOptions.MessageOrder.NewestFirst
import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.types.PaginatedResult
import io.ably.lib.realtime.Channel.MessageListener
import io.ably.lib.realtime.ChannelState
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

/**
* This interface is used to interact with messages in a chat room: subscribing
Expand All @@ -24,13 +29,7 @@ interface Messages : EmitsDiscontinuities {
* @param listener callback that will be called
* @returns A response object that allows you to control the subscription.
*/
fun subscribe(listener: Listener)

/**
* Unsubscribe listener
* @param listener callback that will be unsubscribed
*/
fun unsubscribe(listener: Listener)
fun subscribe(listener: Listener): MessagesSubscription

/**
* Get messages that have been previously sent to the chat room, based on the provided options.
Expand Down Expand Up @@ -98,7 +97,7 @@ data class QueryOptions(
/**
* The order of messages in the query result.
*/
val orderBy: MessageOrder = MessageOrder.NewestFirst,
val orderBy: MessageOrder = NewestFirst,
) {
/**
* Represents direction to query messages in.
Expand Down Expand Up @@ -176,39 +175,92 @@ data class SendMessageParams(
val headers: MessageHeaders? = null,
)

class DefaultMessages(
interface MessagesSubscription : Cancellation {
suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult<Message>
}

internal class DefaultMessagesSubscription(
private val chatApi: ChatApi,
private val roomId: String,
private val cancellation: Cancellation,
private val timeserialProvider: suspend () -> String,
) : MessagesSubscription {
override fun cancel() {
cancellation.cancel()
}

override suspend fun getPreviousMessages(queryOptions: QueryOptions): PaginatedResult<Message> {
val fromSerial = timeserialProvider()
return chatApi.getMessages(
roomId = roomId,
options = queryOptions.copy(orderBy = NewestFirst),
fromSerial = fromSerial,
)
}
}

internal class DefaultMessages(
private val roomId: String,
private val realtimeClient: RealtimeClient,
realtimeClient: RealtimeClient,
private val chatApi: ChatApi,
) : Messages {

private var observers: Set<Messages.Listener> = emptySet()

private var channelSerial: String? = null

/**
* the channel name for the chat messages channel.
*/
private val messagesChannelName = "$roomId::\$chat::\$chatMessages"

override val channel: Channel
get() = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions())
override val channel: Channel = realtimeClient.channels.get(messagesChannelName, ChatChannelOptions())

override fun subscribe(listener: Messages.Listener) {
TODO("Not yet implemented")
}
override fun subscribe(listener: Messages.Listener): MessagesSubscription {
observers += listener
val messageListener = MessageListener {
val pubSubMessage = it!!
val chatMessage = Message(
roomId = roomId,
createdAt = pubSubMessage.timestamp,
clientId = pubSubMessage.clientId,
timeserial = pubSubMessage.extras.asJsonObject().get("timeserial").asString,
text = (pubSubMessage.data as JsonObject).get("text").asString,
metadata = mapOf(), // rawPubSubMessage.data.metadata
headers = mapOf(), // rawPubSubMessage.extras.headers
)
observers.forEach { listener -> listener.onEvent(MessageEvent(type = MessageEventType.Created, message = chatMessage)) }
}
channel.subscribe(messageListener)

override fun unsubscribe(listener: Messages.Listener) {
TODO("Not yet implemented")
return DefaultMessagesSubscription(
chatApi = chatApi,
roomId = roomId,
cancellation = {
observers -= listener
channel.unsubscribe(messageListener)
},
timeserialProvider = { getChannelSerial() },
)
}

override suspend fun get(options: QueryOptions): PaginatedResult<Message> {
TODO("Not yet implemented")
}
override suspend fun get(options: QueryOptions): PaginatedResult<Message> = chatApi.getMessages(roomId, options)

override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params)

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation {
TODO("Not yet implemented")
}

override fun offDiscontinuity(listener: EmitsDiscontinuities.Listener) {
TODO("Not yet implemented")
private suspend fun readAttachmentProperties() = suspendCoroutine { continuation ->
channel.once(ChannelState.attached) {
continuation.resume(channel.properties)
}
}

private suspend fun getChannelSerial(): String {
if (channelSerial != null) return channelSerial!!
channelSerial = readAttachmentProperties().channelSerial
return channelSerial!!
}
}
21 changes: 3 additions & 18 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,7 @@ interface Occupancy : EmitsDiscontinuities {
*
* @param listener A listener to be called when the occupancy of the room changes.
*/
fun subscribe(listener: Listener)

/**
* Unsubscribe a given listener to occupancy updates of the chat room.
*
* @param listener A listener to be unsubscribed.
*/
fun unsubscribe(listener: Listener)
fun subscribe(listener: Listener): Cancellation

/**
* Get the current occupancy of the chat room.
Expand Down Expand Up @@ -72,23 +65,15 @@ internal class DefaultOccupancy(
override val channel: Channel
get() = messages.channel

override fun subscribe(listener: Occupancy.Listener) {
TODO("Not yet implemented")
}

override fun unsubscribe(listener: Occupancy.Listener) {
override fun subscribe(listener: Occupancy.Listener): Cancellation {
TODO("Not yet implemented")
}

override suspend fun get(): OccupancyEvent {
TODO("Not yet implemented")
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener) {
TODO("Not yet implemented")
}

override fun offDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation {
TODO("Not yet implemented")
}
}
20 changes: 3 additions & 17 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ interface Presence : EmitsDiscontinuities {
* Subscribe the given listener to all presence events.
* @param listener listener to subscribe
*/
fun subscribe(listener: Listener)

/**
* Unsubscribe the given listener to all presence events.
* @param listener listener to unsubscribe
*/
fun unsubscribe(listener: Listener)
fun subscribe(listener: Listener): Cancellation

/**
* An interface for listening to new presence event
Expand Down Expand Up @@ -162,19 +156,11 @@ internal class DefaultPresence(
TODO("Not yet implemented")
}

override fun subscribe(listener: Presence.Listener) {
TODO("Not yet implemented")
}

override fun unsubscribe(listener: Presence.Listener) {
TODO("Not yet implemented")
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun subscribe(listener: Presence.Listener): Cancellation {
TODO("Not yet implemented")
}

override fun offDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation {
TODO("Not yet implemented")
}
}
19 changes: 3 additions & 16 deletions chat-android/src/main/java/com/ably/chat/RoomReactions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ interface RoomReactions : EmitsDiscontinuities {
* @param listener The listener function to be called when a reaction is received.
* @returns A response object that allows you to control the subscription.
*/
fun subscribe(listener: Listener)

/**
* Unsubscribe all listeners from receiving room-level reaction events.
*/
fun unsubscribe(listener: Listener)
fun subscribe(listener: Listener): Cancellation

/**
* An interface for listening to new reaction events
Expand Down Expand Up @@ -116,19 +111,11 @@ internal class DefaultRoomReactions(
TODO("Not yet implemented")
}

override fun subscribe(listener: RoomReactions.Listener) {
TODO("Not yet implemented")
}

override fun unsubscribe(listener: RoomReactions.Listener) {
TODO("Not yet implemented")
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun subscribe(listener: RoomReactions.Listener): Cancellation {
TODO("Not yet implemented")
}

override fun offDiscontinuity(listener: EmitsDiscontinuities.Listener) {
override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Cancellation {
TODO("Not yet implemented")
}
}
7 changes: 1 addition & 6 deletions chat-android/src/main/java/com/ably/chat/RoomStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ interface RoomStatus {
* @param listener The function to call when the status changes.
* @returns An object that can be used to unregister the listener.
*/
fun on(listener: Listener)

/**
* Removes all listeners that were added by the `onChange` method.
*/
fun off(listener: Listener)
fun on(listener: Listener): Cancellation

/**
* An interface for listening to changes for the room status
Expand Down
15 changes: 6 additions & 9 deletions chat-android/src/main/java/com/ably/chat/Rooms.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo

/**
* Manages the lifecycle of chat rooms.
*/
Expand All @@ -24,7 +21,7 @@ interface Rooms {
* @throws {@link ErrorInfo} if a room with the same ID but different options already exists.
* @returns Room A new or existing Room object.
*/
fun get(roomId: String, options: RoomOptions): Room
fun get(roomId: String, options: RoomOptions = RoomOptions()): Room

/**
* Release the Room object if it exists. This method only releases the reference
Expand Down Expand Up @@ -60,11 +57,11 @@ internal class DefaultRooms(
)
}

if (room.options != options) {
throw AblyException.fromErrorInfo(
ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest),
)
}
// if (room.options != options) {
// throw AblyException.fromErrorInfo(
// ErrorInfo("Room already exists with different options", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest),
// )
// }

room
}
Expand Down
Loading