Skip to content

Commit

Permalink
Minor edits
Browse files Browse the repository at this point in the history
  • Loading branch information
MatinF committed Nov 27, 2020
1 parent b8d11cf commit df0bc64
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ There are multiple ways to automate the script execution.
### 3A: Use task scheduler
One approach is via periodic execution, triggered e.g. by Windows Task Scheduler or Linux cron jobs. By default, the script is 'dynamic' meaning that it will only process log files that have not yet been added to the InfluxDB database. The script achieves this by fetching the 'most recent' timestamp (across signals) for each device in InfluxDB. The script will then only fetch log files that contain newer data vs. this timestamp.

If no timestamps are found in InfluxDB for a device, the `default_start` datetime will be used. Same goes if `dynamic = False` is used.
If no timestamps are found in InfluxDB for a device, `default_start` is used. Same goes if `dynamic = False` is used.

For details on setting up task scheduler, see the CANedge Intro guide for browser dashboards.

Expand Down
47 changes: 26 additions & 21 deletions dashboard-writer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,23 @@ def get_start_times(self, devices, default_start, dynamic):

default_start_dt = datetime.strptime(default_start, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tzutc())
device_ids = [device.split("/")[1] for device in devices]

self.test_influx()

start_times = []

for device in device_ids:
influx_time = self.client.query_api().query(
f'from(bucket:"{self.influx_bucket}") |> range(start: 0, stop: now()) |> filter(fn: (r) => r["_measurement"] == "{device}") |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
)
if self.test_influx() == 0:
for device in device_ids:
start_times.append(default_start_dt)
else:
for device in device_ids:
influx_time = self.client.query_api().query(
f'from(bucket:"{self.influx_bucket}") |> range(start: 0, stop: now()) |> filter(fn: (r) => r["_measurement"] == "{device}") |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
)

if len(influx_time) == 0 or dynamic == False:
last_time = default_start_dt
else:
last_time = influx_time[0].records[0]["_time"]
if len(influx_time) == 0 or dynamic == False:
last_time = default_start_dt
else:
last_time = influx_time[0].records[0]["_time"]

start_times.append(last_time)
start_times.append(last_time)

return start_times

Expand All @@ -104,12 +105,9 @@ def write_influx(self, name, df):
"""
from influxdb_client import WriteOptions

if self.influx_url == "influx_endpoint":
print("- WARNING: Please add your InfluxDB credentials\n")
if self.test_influx() == 0:
return

self.test_influx()

_write_client = self.client.write_api(
write_options=WriteOptions(batch_size=5000, flush_interval=1_000, jitter_interval=2_000, retry_interval=5_000,)
)
Expand All @@ -135,11 +133,18 @@ def delete_influx(self, device):
)

def test_influx(self):
try:
test = self.client.query_api().query(f'from(bucket:"{self.influx_bucket}") |> range(start: -10s)')
except Exception as err:
self.print_influx_error(str(err))
return
if self.influx_url == "influx_endpoint":
print("- WARNING: Please add your InfluxDB credentials\n")
result = 0
else:
try:
test = self.client.query_api().query(f'from(bucket:"{self.influx_bucket}") |> range(start: -10s)')
result = 1
except Exception as err:
self.print_influx_error(str(err))
result = 0

return result

def print_influx_error(self, err):
warning = "- WARNING: Unable to write data to InfluxDB |"
Expand Down

0 comments on commit df0bc64

Please sign in to comment.