
Commit 3fa9f9c

Adapt try next
1 parent bbb6b80 commit 3fa9f9c

File tree

1 file changed (+42, -32 lines)

lib/mongo/change_stream.ex

Lines changed: 42 additions & 32 deletions
@@ -35,52 +35,62 @@ defmodule Mongo.ChangeStream do
   Calls the GetCore-Command
   See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst
   """
-  def get_more(topology_pid, session, coll, cursor_id,
-               change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
-                             on_resume_token: fun) = change_stream, opts) do
+  def try_next(%__MODULE__{
+        cmd: aggregate_cmd,
+        doc: doc,
+        session: session,
+        topology_pid: _topology_pid,
+        on_resume_token: on_resume_token
+      } = change_stream, opts \\ []) do
+    %{
+      "cursor" => %{
+        "id" => cursor_id,
+        "ns" => ns,
+        "firstBatch" => docs,
+        "postBatchResumeToken" => old_token
+      },
+      "operationTime" => op_time
+    } = doc
+
+    [_db, coll] = ns |> String.split(".")
+
+    batch_size = opts[:batch_size] || aggregate_cmd[:cursor][:batchSize]

     get_more = [
-      getMore: %BSON.LongNumber{value: cursor_id},
-      collection: coll,
-      batchSize: opts[:batch_size],
-      maxTimeMS: opts[:max_time]
-    ] |> filter_nils()
+      getMore: %BSON.LongNumber{value: cursor_id},
+      collection: coll,
+      batchSize: batch_size,
+      maxTimeMS: opts[:max_time]
+    ] |> Enum.reject(fn {_key, value} -> is_nil(value) end)

     with {:ok, %{"operationTime" => op_time,
                  "cursor" => %{"id" => new_cursor_id, "nextBatch" => docs} = cursor,
                  "ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do

-      old_token = change_stream(change_stream, :resume_token)
-      change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs))
-      new_token = change_stream(change_stream, :resume_token)
+      # TODO: old_token also fallback to startAfter, etc
+      new_token = cursor["postBatchResumeToken"] || List.last(docs)["_id"]

-      case token_changes(old_token, new_token) do
-        true -> fun.(new_token)
-        false -> nil
+      case {old_token, new_token} do
+        {nil, nil} -> nil
+        {%{} = map, map} -> nil
+        _ -> on_resume_token.(new_token)
       end

-      {:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}}
+      # TODO: Do we need to update the operationTime?
+      change_stream = Map.update!(change_stream, :doc, fn old_doc ->
+        old_doc
+        |> Map.put("operationTime", op_time)
+        |> Map.update!("cursor", & Map.put(&1, "id", new_cursor_id))
+      end)
+
+      {docs, change_stream}

     else
       {:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable}
-      {:error, _error} ->
-
-        with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
-
-          [%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
-
-          stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version)
-          aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)
-
-          # kill the cursor
-          kill_cursors(session, coll, [cursor_id], opts)
-
-          # Start aggregation again...
-          with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
-            {:resume, state}
-          end
-        end
+      {:error, error} ->

+        # TODO: handle error
+        raise error
     end
   end
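A short aside on the new case clause, not part of the commit: repeating a variable inside an Elixir pattern forces both positions to be equal, so {%{} = map, map} -> nil means "the resume token did not change, skip the callback", and on_resume_token only fires when old_token and new_token differ. A tiny standalone illustration (token values are made up):

    token_a = %{"_data" => "example-token-a"}
    token_b = %{"_data" => "example-token-b"}

    match?({%{} = map, map}, {token_a, token_a})  #=> true  -> callback skipped
    match?({%{} = map, map}, {token_a, token_b})  #=> false -> callback fires

And a minimal consumption sketch for the reworked try_next/2, assuming a %Mongo.ChangeStream{} struct prepared by the initial aggregate call (not shown in this diff); handle_docs is a hypothetical callback and only the return shapes visible above are handled:

    defmodule ChangeStreamPollSketch do
      # Polls try_next/2 in a loop, threading the updated struct through.
      def poll(change_stream, handle_docs, opts \\ []) do
        case Mongo.ChangeStream.try_next(change_stream, opts) do
          # Non-resumable errors are returned as-is (see the else branch above).
          {:error, %Mongo.Error{} = error} ->
            {:error, error}

          # Success now returns {docs, change_stream}; the struct carries the
          # refreshed cursor id and operationTime for the next getMore round.
          {docs, change_stream} ->
            handle_docs.(docs)
            poll(change_stream, handle_docs, opts)
        end
      end
    end

Note that other errors currently raise (see the "TODO: handle error" branch), so a real caller would still need to rescue or resume around this loop.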
