Skip to content

Commit d3c82a0

Browse files
author
ks0m1c_dharma
committed
stream handling for bulk streaming downloads
1 parent 4cc59b9 commit d3c82a0

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed

lib/vyasa/medium/ext/stream.ex

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
defmodule Vyasa.Medium.Ext.Stream do
2+
def stream([{_url, _path} | _] = data) do
3+
Task.async_stream(
4+
data,
5+
fn {url, path} ->
6+
download(url, path)
7+
end,
8+
timeout: :infinity
9+
)
10+
end
11+
12+
def download(url, file_path) do
13+
IO.puts("Starting to process #{inspect(file_path)}...........")
14+
15+
# Open a file to which binary chunks will be appended to.
16+
# this process is reset in case of redirection
17+
file_pid = File.open!(file_path, [:write, :binary])
18+
19+
unless is_pid(file_pid), do: raise("File creation problem on disk")
20+
21+
# the HTTP stream request
22+
Finch.build(:get, url)
23+
|> Finch.stream_while(Vyasa.Finch, nil, fn
24+
# we put the status in the "acc" to handle redirections
25+
{:status, status}, _acc ->
26+
{:cont, status}
27+
28+
# - when we receive 302, we put the "location" header in the "acc"
29+
# - when we receive a 200, we put the "content-length" and the file name in the "acc",
30+
{:headers, headers}, acc ->
31+
handle_headers(headers, acc)
32+
33+
# when we receive the "location" tuple, we recurse
34+
# otherwise, we write the chunk into the file and print out the current progress.
35+
{:data, data}, acc ->
36+
handle_data(data, acc, file_path, file_pid)
37+
end)
38+
39+
case File.close(file_pid) do
40+
:ok ->
41+
{:halt, {file_path, :done}}
42+
43+
{:error, _reason} ->
44+
{:halt, :error}
45+
end
46+
end
47+
48+
def handle_headers(headers, status) when status in [301, 302, 303, 307, 308] do
49+
IO.puts("REDIR: #{status}")
50+
51+
{:cont, Enum.find(headers, &(elem(&1, 0) == "location"))}
52+
end
53+
54+
def handle_headers(headers, 200) do
55+
{"content-length", size} =
56+
Enum.find(headers, &(elem(&1, 0) == "content-length"))
57+
58+
case size do
59+
nil ->
60+
{:cont, {0, 0}}
61+
62+
size ->
63+
{:cont, {0, String.to_integer(size)}}
64+
end
65+
end
66+
67+
def handle_headers(_, status) do
68+
dbg(status)
69+
{:halt, :bad_status}
70+
end
71+
72+
def handle_data(_data, {"location", location}, file_path, file_pid) do
73+
if Process.alive?(file_pid), do: :ok = File.close(file_pid)
74+
75+
# recursion
76+
download(location, file_path)
77+
end
78+
79+
def handle_data(data, {processed, size}, file_path, file_pid) do
80+
case IO.binwrite(file_pid, data) do
81+
:ok ->
82+
processed =
83+
if is_integer(size) and size > 0 do
84+
(processed + byte_size(data))
85+
|> tap(fn processed ->
86+
IO.inspect(Float.round(processed * 100 / size, 1),
87+
label: "Processed #{inspect(file_path)} %: "
88+
)
89+
end)
90+
else
91+
processed + byte_size(data)
92+
end
93+
94+
{:cont, {processed, size}}
95+
96+
{:error, reason} ->
97+
{:error, reason}
98+
end
99+
end
100+
end

0 commit comments

Comments
 (0)