Skip to content
7 changes: 4 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ commands:
curl -Os https://uploader.codecov.io/latest/linux/codecov
curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM
curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig
curl -s https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
chmod +x ./codecov
Expand All @@ -81,13 +81,14 @@ commands:
- run:
name: Collecting coverage reports
command: |
curl -k https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM
curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM.sig
curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import
gpgv codecov.SHA256SUM.sig codecov.SHA256SUM
shasum -a 256 -c codecov.SHA256SUM
sudo chmod +x codecov
sudo chmod +x codecov
./codecov

jobs:
tests-python:
Expand Down
86 changes: 60 additions & 26 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import importlib.util
import json
import os
import urllib.parse
from typing import Any, List, Literal, Optional, TYPE_CHECKING

import pyarrow as pa

from influxdb_client_3.version import USER_AGENT
from influxdb_client_3.write_client._sync import rest_client as rest

if TYPE_CHECKING:
import pandas as pd
import polars as pl
Expand All @@ -14,7 +18,7 @@
from influxdb_client_3.exceptions import InfluxDBError
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client import WriteOptions, Point
from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \
PointSettings, DefaultWriteOptions, WriteType
from influxdb_client_3.write_client.domain.write_precision import WritePrecision
Expand Down Expand Up @@ -185,10 +189,13 @@ def _parse_timeout(to: str) -> int:
class InfluxDBClient3:
def __init__(
self,
host=None,
host='localhost',
org=None,
database=None,
token=None,
auth_scheme=None,
enable_gzip=False,
gzip_threshold=None,
write_client_options=None,
flight_client_options=None,
write_port_overwrite=None,
Expand All @@ -212,6 +219,10 @@ def __init__(
:type flight_client_options: dict[str, any]
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
:type disable_grpc_compression: bool
:param enable_gzip: Enable GZIP compression for write requests.
:type enable_gzip: bool
:param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True.
:type gzip_threshold: int
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
Expand Down Expand Up @@ -293,14 +304,45 @@ def __init__(
if write_port_overwrite is not None:
port = write_port_overwrite

self._client = _InfluxDBClient(
url=f"{scheme}://{hostname}:{port}",
# TODO fix retries
retries = None

auth_schema = 'Token' if auth_scheme is None else auth_scheme
default_header = {
'User-Agent': USER_AGENT
}
if self._token is not None:
default_header['Authorization'] = f'{auth_schema} {self._token}'
self.base_url = f"{scheme}://{hostname}:{port}"
self.default_header = default_header
self.rest_client = rest.RestClient(
base_url=self.base_url,
default_header=default_header,
verify_ssl=kwargs.get('verify_ssl', True),
ssl_ca_cert=kwargs.get('ssl_ca_cert', None),
cert_file=kwargs.get('cert_file', None),
cert_key_file=kwargs.get('cert_key_file', None),
cert_key_password=kwargs.get('cert_key_password', None),
ssl_context=kwargs.get('ssl_context', None),
proxy=kwargs.get('proxy', None),
proxy_headers=kwargs.get('proxy_headers', None),
retries=retries,
)

# TODO point_settings??

self._write_api = _WriteApi(
token=self._token,
bucket=self._database,
org=self._org,
gzip_threshold=gzip_threshold,
enable_gzip=enable_gzip,
auth_scheme=auth_scheme,
timeout=write_timeout,
**kwargs)

self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
default_header=default_header,
rest_client=self.rest_client,
**self._write_client_options
)

if query_port_overwrite is not None:
port = query_port_overwrite
Expand Down Expand Up @@ -658,32 +700,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
except ArrowException as e:
raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")

def get_server_version(self) -> str:
def get_server_version(self) -> Optional[str]:
"""
Get the version of the connected InfluxDB server.
Get the influxdb_version of the connected InfluxDB server.

This method makes a ping request to the server and extracts the version information
This method makes a ping request to the server and extracts the influxdb_version information
from either the response headers or response body.

:return: The version string of the InfluxDB server.
:return: The influxdb_version string of the InfluxDB server.
:rtype: str
"""
version = None
(resp_body, _, header) = self._client.api_client.call_api(
resource_path="/ping",
method="GET",
response_type=object
)

for key, value in header.items():
resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header)
for key, value in resp.getheaders().items():
if key.lower() == "x-influxdb-version":
version = value
break

if version is None and isinstance(resp_body, dict):
version = resp_body['version']
return value

return version
string_body = resp.get_string_body()
if string_body is not None:
return json.loads(string_body)['version']
return None

def flush(self):
"""
Expand All @@ -702,7 +737,6 @@ def close(self):
"""Close the client and clean up resources."""
self._write_api.close()
self._query_api.close()
self._client.close()

def __enter__(self):
return self
Expand Down
10 changes: 2 additions & 8 deletions influxdb_client_3/write_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

from __future__ import absolute_import

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient
from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler
from influxdb_client_3.version import VERSION
from influxdb_client_3.write_client.client.write.point import Point

from influxdb_client_3.write_client.service.write_service import WriteService

from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions
from influxdb_client_3.write_client.domain.write_precision import WritePrecision

from influxdb_client_3.write_client.configuration import Configuration
from influxdb_client_3.version import VERSION
__version__ = VERSION
Loading
Loading