Skip to content

Commit

Permalink
Merge pull request #60 from InfluxCommunity/59-add-custom-port-and-ad…
Browse files Browse the repository at this point in the history
…dress

added custom url and port
  • Loading branch information
Jayclifford345 authored Sep 15, 2023
2 parents 7e6bc49 + 9363038 commit f5eca2e
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 5 deletions.
File renamed without changes.
101 changes: 101 additions & 0 deletions Examples/community/custom_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from influxdb_client_3 import InfluxDBClient3,InfluxDBError,WriteOptions,write_client_options
import pandas as pd
import random


class BatchingCallback(object):

def success(self, conf, data: str):
print(f"Written batch: {conf}, data: {data}")

def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()


write_options = WriteOptions(batch_size=100,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)

wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
WriteOptions=write_options
)

client = InfluxDBClient3(
token="",
host="https://eu-central-1-1.aws.cloud2.influxdata.com:442",
org="6a841c0c08328fb1",
database="pokemon-codex", enable_gzip=True, write_client_options=wco, write_port_overwrite=443, query_port_overwrite=443)

now = pd.Timestamp.now(tz='UTC').floor('ms')

# Lists of possible trainers
trainers = ["ash", "brock", "misty", "gary", "jessie", "james"]

# Read the CSV into a DataFrame
pokemon_df = pd.read_csv("https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv")

# Creating an empty list to store the data
data = []

# Dictionary to keep track of the number of times each trainer has caught each Pokémon
trainer_pokemon_counts = {}

# Number of entries we want to create
num_entries = 1000

# Generating random data
for i in range(num_entries):
trainer = random.choice(trainers)

# Randomly select a row from pokemon_df
random_pokemon = pokemon_df.sample().iloc[0]
caught = random_pokemon['Name']

# Count the number of times this trainer has caught this Pokémon
if (trainer, caught) in trainer_pokemon_counts:
trainer_pokemon_counts[(trainer, caught)] += 1
else:
trainer_pokemon_counts[(trainer, caught)] = 1

# Get the number for this combination of trainer and Pokémon
num = trainer_pokemon_counts[(trainer, caught)]

entry = {
"trainer": trainer,
"id": f"{0000 + random_pokemon['#']:04d}",
"num": str(num),
"caught": caught,
"level": random.randint(5, 20),
"attack": random_pokemon['Attack'],
"defense": random_pokemon['Defense'],
"hp": random_pokemon['HP'],
"speed": random_pokemon['Speed'],
"type1": random_pokemon['Type 1'],
"type2": random_pokemon['Type 2'],
"timestamp": now
}
data.append(entry)

# Convert the list of dictionaries to a DataFrame
caught_pokemon_df = pd.DataFrame(data).set_index('timestamp')

# Print the DataFrame
print(caught_pokemon_df)


try:
client.write(caught_pokemon_df, data_frame_measurement_name='caught',
data_frame_tag_columns=['trainer', 'id', 'num'])
except Exception as e:
print(f"Error writing point: {e}")
File renamed without changes.
28 changes: 28 additions & 0 deletions Examples/debugging/get_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from influxdb_client_3 import InfluxDBClient3
import pandas as pd
from influxdb_client_3.debug import TracingClientMiddleWareFactory



client = InfluxDBClient3(
token="",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="pokemon-codex",
flight_client_options={"middleware": (TracingClientMiddleWareFactory(),)})


sql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time >= now() - interval '1 hour' LIMIT 5'''
table = client.query(query=sql, language='sql', mode='all')
print(table)


influxql = '''SELECT * FROM caught WHERE trainer = 'ash' AND time > now() - 1h LIMIT 5'''
reader = client.query(query=influxql, language='influxql', mode='chunk')
try:
while True:
batch, buff = reader.read_chunk()
print("batch:")
print(buff)
except StopIteration:
print("No more chunks to read")
24 changes: 19 additions & 5 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from influxdb_client.client.exceptions import InfluxDBError
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
from influxdb_client_3.read_file import UploadFile
import urllib.parse



Expand Down Expand Up @@ -49,6 +50,8 @@ def __init__(
token=None,
write_client_options=None,
flight_client_options=None,
write_port_overwrite=None,
query_port_overwrite=None,
**kwargs):
"""
Initialize an InfluxDB client.
Expand All @@ -71,21 +74,32 @@ def __init__(
self._database = database
self._token = token
self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS)

# Extracting the hostname from URL if provided
# Parse the host input
parsed_url = urllib.parse.urlparse(host)
host = parsed_url.hostname or host

# Determine the protocol (scheme), hostname, and port
scheme = parsed_url.scheme if parsed_url.scheme else "https"
hostname = parsed_url.hostname if parsed_url.hostname else host
port = parsed_url.port if parsed_url.port else 443

# Construct the clients using the parsed values
if write_port_overwrite is not None:
port = write_port_overwrite

self._client = _InfluxDBClient(
url=f"https://{host}",
url=f"{scheme}://{hostname}:{port}",
token=self._token,
org=self._org,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
self._flight_client_options = flight_client_options or {}
self._flight_client = FlightClient(f"grpc+tls://{host}:443", **self._flight_client_options)

if query_port_overwrite is not None:
port = query_port_overwrite
self._flight_client = FlightClient(f"grpc+tls://{hostname}:{port}", **self._flight_client_options)



def write(self, record=None, database=None ,**kwargs):
Expand Down

0 comments on commit f5eca2e

Please sign in to comment.