Skip to content

feat: support env #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 0.13.0 [unreleased]

### Features

1. [#127](https://github.com/InfluxCommunity/influxdb3-python/pull/127): Support creating client from environment variables.

## 0.12.0 [2025-03-26]

Expand Down
192 changes: 188 additions & 4 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
import importlib.util
import os
import urllib.parse
from typing import Any

import pyarrow as pa
import importlib.util

from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings
PointSettings, WriteType, DefaultWriteOptions
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

polars = importlib.util.find_spec("polars") is not None

INFLUX_HOST = "INFLUX_HOST"
INFLUX_TOKEN = "INFLUX_TOKEN"
INFLUX_DATABASE = "INFLUX_DATABASE"
INFLUX_ORG = "INFLUX_ORG"
INFLUX_PRECISION = "INFLUX_PRECISION"
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
INFLUX_TIMEOUT = "INFLUX_TIMEOUT"
INFLUX_VERIFY_SSL = "INFLUX_VERIFY_SSL"
INFLUX_SSL_CA_CERT = "INFLUX_SSL_CA_CERT"
INFLUX_CERT_FILE = "INFLUX_CERT_FILE"
INFLUX_CERT_KEY_FILE = "INFLUX_CERT_KEY_FILE"
INFLUX_CERT_KEY_PASSWORD = "INFLUX_CERT_KEY_PASSWORD"
INFLUX_CONNECTION_POOL_MAXSIZE = "INFLUX_CONNECTION_POOL_MAXSIZE"
INFLUX_PROFILERS = "INFLUX_PROFILERS"
INFLUX_TAG = "INFLUX_TAG"


def write_client_options(**kwargs):
"""
Expand Down Expand Up @@ -83,6 +103,51 @@
return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys})


def _parse_precision(precision):
"""
Parses the precision value and ensures it is valid.

This function checks that the given `precision` is one of the allowed
values defined in `WritePrecision`. If the precision is invalid, it
raises a `ValueError`. The function returns the valid precision value
if it passes validation.

:param precision: The precision value to be validated.
Must be one of WritePrecision.NS, WritePrecision.MS,
WritePrecision.S, or WritePrecision.US.
:return: The valid precision value.
:rtype: WritePrecision
:raises ValueError: If the provided precision is not valid.
"""
if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]:
raise ValueError(f"Invalid precision value: {precision}")
return precision


def _parse_gzip_threshold(threshold):
"""
Parses and validates the provided threshold value.

This function ensures that the given threshold is a valid integer value,
and it raises an appropriate error if the threshold is not valid. It also
enforces that the threshold value is non-negative.

:param threshold: The input threshold value to be parsed and validated.
:type threshold: Any
:return: The validated threshold value as an integer.
:rtype: int
:raises ValueError: If the provided threshold is not an integer or if it is
negative.
"""
try:
threshold = int(threshold)
except (TypeError, ValueError):
raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.")
if threshold < 0:
raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.")

Check warning on line 147 in influxdb_client_3/__init__.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/__init__.py#L147

Added line #L147 was not covered by tests
return threshold


class InfluxDBClient3:
def __init__(
self,
Expand Down Expand Up @@ -136,8 +201,27 @@
self._org = org if org is not None else "default"
self._database = database
self._token = token
self._write_client_options = write_client_options if write_client_options is not None \
else default_client_options(write_options=SYNCHRONOUS)

write_type = DefaultWriteOptions.write_type.value
write_precision = DefaultWriteOptions.write_precision.value
gzip_threshold = None
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
write_opts = write_client_options['write_options']
write_type = getattr(write_opts, 'write_type', write_type)
write_precision = getattr(write_opts, 'write_precision', write_precision)
gzip_threshold = getattr(write_opts, 'gzip_threshold')

write_options = WriteOptions(
write_type=write_type,
write_precision=write_precision,
gzip_threshold=gzip_threshold,
enable_gzip=kwargs.get('enable_gzip', False)
)

self._write_client_options = {
"write_options": write_options,
**(write_client_options or {})
}

# Parse the host input
parsed_url = urllib.parse.urlparse(host)
Expand All @@ -155,6 +239,8 @@
url=f"{scheme}://{hostname}:{port}",
token=self._token,
org=self._org,
enable_gzip=write_options.enable_gzip,
gzip_threshold=write_options.gzip_threshold,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
Expand All @@ -178,6 +264,104 @@
flight_client_options=flight_client_options,
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())

@classmethod
def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

"""
Creates an instance of InfluxDBClient3 configured by specific environment
variables. This method automatically loads configuration settings,
such as connection details, security parameters, and performance
options, from environment variables and initializes the client
accordingly.

:param cls:
The class used to create the client instance.
:param kwargs:
Additional optional parameters that can be passed to customize the
configuration or override specific settings derived from the
environment variables.

:raises ValueError:
If any required environment variables are missing or have empty
values.

:return:
An initialized instance of the `InfluxDBClient3` class with all the
configuration settings applied.
:rtype:
InfluxDBClient3
"""

required_vars = {
INFLUX_HOST: os.getenv(INFLUX_HOST),
INFLUX_TOKEN: os.getenv(INFLUX_TOKEN),
INFLUX_DATABASE: os.getenv(INFLUX_DATABASE)
}
missing_vars = [var for var, value in required_vars.items() if value is None or value == ""]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

write_options = WriteOptions(write_type=WriteType.synchronous)

gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD)
if gzip_threshold is not None:
write_options.gzip_threshold = _parse_gzip_threshold(gzip_threshold)
write_options.enable_gzip = True

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
write_options.write_precision = _parse_precision(precision)

write_client_option = {'write_options': write_options}

if os.getenv(INFLUX_AUTH_SCHEME) is not None:
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

timeout = os.getenv(INFLUX_TIMEOUT)
if timeout is not None:
kwargs['timeout'] = int(timeout)

ssl_ca_cert = os.getenv(INFLUX_SSL_CA_CERT)
if ssl_ca_cert is not None:
kwargs['ssl_ca_cert'] = ssl_ca_cert

cert_file = os.getenv(INFLUX_CERT_FILE)
if cert_file is not None:
kwargs['cert_file'] = cert_file

cert_key_file = os.getenv(INFLUX_CERT_KEY_FILE)
if cert_key_file is not None:
kwargs['cert_key_file'] = cert_key_file

cert_key_password = os.getenv(INFLUX_CERT_KEY_PASSWORD)
if cert_key_password is not None:
kwargs['cert_key_password'] = cert_key_password

connection_pool_maxsize = os.getenv(INFLUX_CONNECTION_POOL_MAXSIZE)
if connection_pool_maxsize is not None:
kwargs['connection_pool_maxsize'] = int(connection_pool_maxsize)

profilers = os.getenv(INFLUX_PROFILERS)
if profilers is not None:
kwargs['profilers'] = [x.strip() for x in profilers.split(',')]

default_tags = dict()
for key, value in os.environ.items():
if key.startswith("{0}_".format(INFLUX_TAG)):
default_tags[key[11:].lower()] = value
kwargs['default_tags'] = default_tags

kwargs['verify_ssl'] = bool(os.getenv(INFLUX_VERIFY_SSL, 'True').lower() in ['True', 'true'])
org = os.getenv(INFLUX_ORG, "default")
return InfluxDBClient3(
host=required_vars[INFLUX_HOST],
token=required_vars[INFLUX_TOKEN],
database=required_vars[INFLUX_DATABASE],
write_client_options=write_client_option,
org=org,
**kwargs
)

def write(self, record=None, database=None, **kwargs):
"""
Write data to InfluxDB.
Expand Down
51 changes: 32 additions & 19 deletions influxdb_client_3/write_client/_sync/api_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
# coding: utf-8
"""
InfluxDB OSS API Service.

The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501

OpenAPI spec version: 2.0.0
Generated by: https://openapi-generator.tech
"""

from __future__ import absolute_import

Expand All @@ -28,17 +20,7 @@


class ApiClient(object):
"""Generic API client for OpenAPI client library Build.

OpenAPI generic API client. This client handles the client-
server communication, and is invariant across implementations. Specifics of
the methods and models for each application are generated from the OpenAPI
templates.

NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.

"""
:param configuration: .Configuration object for this client
:param header_name: a header to pass when making calls to the API.
:param header_value: a header value to pass when making calls to
Expand Down Expand Up @@ -120,6 +102,10 @@ def __call_api(
config = self.configuration
self._signin(resource_path=resource_path)

gzip_threshold = config.gzip_threshold
enable_gzip = config.enable_gzip
self.should_compress = self.check_should_compress(body, gzip_threshold, enable_gzip)

# header parameters
header_params = header_params or {}
config.update_request_header_params(resource_path, header_params)
Expand Down Expand Up @@ -192,6 +178,33 @@ def __call_api(
return (return_data, response_data.status,
response_data.getheaders())

def check_should_compress(self, body: bytearray, gzip_threshold: int, enable_gzip: bool) -> bool:
"""
Determines whether the given body should be compressed based on its size,
a defined threshold for compression, and a flag indicating whether
compression is enabled.

This function evaluates whether the body meets the required criteria for
compression. Compression may be enabled explicitly or conditionally
based on the body size exceeding the provided threshold.

:param body: The content to be evaluated for compression.
:type body: bytearray
:param gzip_threshold: The minimum size threshold for compression to be applied.
:type gzip_threshold: int
:param enable_gzip: A flag indicating whether gzip compression is enabled.
It can explicitly enable or disable compression, or conditionally
allow compression if the body size exceeds the threshold.
:type enable_gzip: bool
:return: Returns True if the body meets the criteria for compression;
otherwise, returns False.
:rtype: bool
"""
body_size = len(body)
if enable_gzip is True or (enable_gzip is not False and (gzip_threshold and body_size >= gzip_threshold)):
return True
return False

def sanitize_for_serialization(self, obj):
"""Build a JSON POST object.

Expand Down
20 changes: 0 additions & 20 deletions influxdb_client_3/write_client/_sync/rest.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
# coding: utf-8

"""
InfluxDB OSS API Service.

The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501

OpenAPI spec version: 2.0.0
Generated by: https://openapi-generator.tech
"""


from __future__ import absolute_import

import io
Expand All @@ -28,11 +18,6 @@


class RESTResponse(io.IOBase):
"""NOTE: This class is auto generated by OpenAPI Generator.

Ref: https://openapi-generator.tech
Do not edit the class manually.
"""

def __init__(self, resp):
"""Initialize with HTTP response."""
Expand All @@ -51,11 +36,6 @@ def getheader(self, name, default=None):


class RESTClientObject(object):
"""NOTE: This class is auto generated by OpenAPI Generator.

Ref: https://openapi-generator.tech
Do not edit the class manually.
"""

def __init__(self, configuration, pools_size=4, maxsize=None, retries=False):
"""Initialize REST client."""
Expand Down
Loading
Loading