Skip to content

Commit bbb6b80

Browse files
committed
copy paste get_more from enumerable protocol to ChangeStream module
1 parent b7aa63c commit bbb6b80

File tree

1 file changed

+54
-1
lines changed

1 file changed

+54
-1
lines changed

lib/mongo/change_stream.ex

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,59 @@ defmodule Mongo.ChangeStream do
3131
end
3232
end
3333

34+
@doc """
35+
Calls the GetCore-Command
36+
See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst
37+
"""
38+
def get_more(topology_pid, session, coll, cursor_id,
39+
change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
40+
on_resume_token: fun) = change_stream, opts) do
41+
42+
get_more = [
43+
getMore: %BSON.LongNumber{value: cursor_id},
44+
collection: coll,
45+
batchSize: opts[:batch_size],
46+
maxTimeMS: opts[:max_time]
47+
] |> filter_nils()
48+
49+
with {:ok, %{"operationTime" => op_time,
50+
"cursor" => %{"id" => new_cursor_id, "nextBatch" => docs} = cursor,
51+
"ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do
52+
53+
old_token = change_stream(change_stream, :resume_token)
54+
change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs))
55+
new_token = change_stream(change_stream, :resume_token)
56+
57+
case token_changes(old_token, new_token) do
58+
true -> fun.(new_token)
59+
false -> nil
60+
end
61+
62+
{:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}}
63+
64+
else
65+
{:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable}
66+
{:error, _error} ->
67+
68+
with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
69+
70+
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
71+
72+
stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version)
73+
aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)
74+
75+
# kill the cursor
76+
kill_cursors(session, coll, [cursor_id], opts)
77+
78+
# Start aggregation again...
79+
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
80+
{:resume, state}
81+
end
82+
end
83+
84+
end
85+
end
86+
3487
defimpl Enumerable do
3588

3689
defrecordp :change_stream, [:resume_token, :op_time, :cmd, :on_resume_token]
@@ -291,4 +344,4 @@ defmodule Mongo.ChangeStream do
291344
def member?(_stream, _term), do: {:error, __MODULE__}
292345

293346
end
294-
end
347+
end

0 commit comments

Comments
 (0)