diff --git a/.circleci/config.yml b/.circleci/config.yml index a1a7b583..56ab97aa 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -71,7 +71,7 @@ commands: curl -Os https://uploader.codecov.io/latest/linux/codecov curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig - curl -s https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import + curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import gpgv codecov.SHA256SUM.sig codecov.SHA256SUM shasum -a 256 -c codecov.SHA256SUM chmod +x ./codecov @@ -81,13 +81,14 @@ commands: - run: name: Collecting coverage reports command: | - curl -k https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM curl -Os https://uploader.codecov.io/v0.8.0/aarch64/codecov.SHA256SUM.sig + curl -fsSL https://uploader.codecov.io/verification.gpg | gpg --no-default-keyring --keyring trustedkeys.gpg --import gpgv codecov.SHA256SUM.sig codecov.SHA256SUM shasum -a 256 -c codecov.SHA256SUM - sudo chmod +x codecov + sudo chmod +x codecov + ./codecov jobs: tests-python: diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 824d5b70..753a6ea2 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,10 +1,14 @@ import importlib.util +import json import os import urllib.parse from typing import Any, List, Literal, Optional, TYPE_CHECKING import pyarrow as pa +from influxdb_client_3.version import USER_AGENT +from influxdb_client_3.write_client._sync import rest_client as rest + if TYPE_CHECKING: import pandas as pd import polars as pl @@ -14,7 +18,7 @@ from influxdb_client_3.exceptions import InfluxDBError from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder from influxdb_client_3.read_file import UploadFile -from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point +from influxdb_client_3.write_client import WriteOptions, Point from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ PointSettings, DefaultWriteOptions, WriteType from influxdb_client_3.write_client.domain.write_precision import WritePrecision @@ -185,10 +189,13 @@ def _parse_timeout(to: str) -> int: class InfluxDBClient3: def __init__( self, - host=None, + host='localhost', org=None, database=None, token=None, + auth_scheme=None, + enable_gzip=False, + gzip_threshold=None, write_client_options=None, flight_client_options=None, write_port_overwrite=None, @@ -212,6 +219,10 @@ def __init__( :type flight_client_options: dict[str, any] :param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False. :type disable_grpc_compression: bool + :param enable_gzip: Enable GZIP compression for write requests. + :type enable_gzip: bool + :param gzip_threshold: Minimum payload size (bytes) to trigger GZIP when enable_gzip is True. + :type gzip_threshold: int :key auth_scheme: token authentication scheme. Set to "Bearer" for Edge. :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server. :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer. @@ -293,14 +304,45 @@ def __init__( if write_port_overwrite is not None: port = write_port_overwrite - self._client = _InfluxDBClient( - url=f"{scheme}://{hostname}:{port}", + # TODO fix retries + retries = None + + auth_schema = 'Token' if auth_scheme is None else auth_scheme + default_header = { + 'User-Agent': USER_AGENT + } + if self._token is not None: + default_header['Authorization'] = f'{auth_schema} {self._token}' + self.base_url = f"{scheme}://{hostname}:{port}" + self.default_header = default_header + self.rest_client = rest.RestClient( + base_url=self.base_url, + default_header=default_header, + verify_ssl=kwargs.get('verify_ssl', True), + ssl_ca_cert=kwargs.get('ssl_ca_cert', None), + cert_file=kwargs.get('cert_file', None), + cert_key_file=kwargs.get('cert_key_file', None), + cert_key_password=kwargs.get('cert_key_password', None), + ssl_context=kwargs.get('ssl_context', None), + proxy=kwargs.get('proxy', None), + proxy_headers=kwargs.get('proxy_headers', None), + retries=retries, + ) + + # TODO point_settings?? + + self._write_api = _WriteApi( token=self._token, + bucket=self._database, org=self._org, + gzip_threshold=gzip_threshold, + enable_gzip=enable_gzip, + auth_scheme=auth_scheme, timeout=write_timeout, - **kwargs) - - self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options) + default_header=default_header, + rest_client=self.rest_client, + **self._write_client_options + ) if query_port_overwrite is not None: port = query_port_overwrite @@ -658,32 +700,25 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all" except ArrowException as e: raise InfluxDB3ClientQueryError(f"Error while executing query: {e}") - def get_server_version(self) -> str: + def get_server_version(self) -> Optional[str]: """ - Get the version of the connected InfluxDB server. + Get the influxdb_version of the connected InfluxDB server. - This method makes a ping request to the server and extracts the version information + This method makes a ping request to the server and extracts the influxdb_version information from either the response headers or response body. - :return: The version string of the InfluxDB server. + :return: The influxdb_version string of the InfluxDB server. :rtype: str """ - version = None - (resp_body, _, header) = self._client.api_client.call_api( - resource_path="/ping", - method="GET", - response_type=object - ) - - for key, value in header.items(): + resp = self.rest_client.request(url='/ping', method="GET", headers=self.default_header) + for key, value in resp.getheaders().items(): if key.lower() == "x-influxdb-version": - version = value - break - - if version is None and isinstance(resp_body, dict): - version = resp_body['version'] + return value - return version + string_body = resp.get_string_body() + if string_body is not None: + return json.loads(string_body)['version'] + return None def flush(self): """ @@ -702,7 +737,6 @@ def close(self): """Close the client and clean up resources.""" self._write_api.close() self._query_api.close() - self._client.close() def __enter__(self): return self diff --git a/influxdb_client_3/write_client/__init__.py b/influxdb_client_3/write_client/__init__.py index feae8448..cd5433a3 100644 --- a/influxdb_client_3/write_client/__init__.py +++ b/influxdb_client_3/write_client/__init__.py @@ -4,15 +4,9 @@ from __future__ import absolute_import -from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions -from influxdb_client_3.write_client.client.influxdb_client import InfluxDBClient -from influxdb_client_3.write_client.client.logging_handler import InfluxLoggingHandler +from influxdb_client_3.version import VERSION from influxdb_client_3.write_client.client.write.point import Point - -from influxdb_client_3.write_client.service.write_service import WriteService - +from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions from influxdb_client_3.write_client.domain.write_precision import WritePrecision -from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.version import VERSION __version__ = VERSION diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py deleted file mode 100644 index 6064c2dc..00000000 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ /dev/null @@ -1,662 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -import datetime -import json -import mimetypes -import os -import re -import tempfile -from multiprocessing.pool import ThreadPool -from urllib.parse import quote - -import influxdb_client_3.write_client.domain -from influxdb_client_3.write_client._sync import rest -from influxdb_client_3.write_client.configuration import Configuration - - -class ApiClient(object): - """ - :param configuration: .Configuration object for this client - :param header_name: a header to pass when making calls to the API. - :param header_value: a header value to pass when making calls to - the API. - :param pool_threads: The number of threads to use for async requests - to the API. More threads means more concurrent API requests. - """ - - PRIMITIVE_TYPES = (float, bool, bytes, str, int) - NATIVE_TYPES_MAPPING = { - 'int': int, - 'long': int, - 'float': float, - 'str': str, - 'bool': bool, - 'date': datetime.date, - 'datetime': datetime.datetime, - 'object': object, - } - _pool = None - - def __init__(self, configuration=None, header_name=None, header_value=None, - pool_threads=None, retries=False): - """Initialize generic API client.""" - if configuration is None: - configuration = Configuration() - self.configuration = configuration - self.pool_threads = pool_threads - - self.rest_client = rest.RESTClientObject(configuration, retries=retries) - self.default_headers = {} - if header_name is not None: - self.default_headers[header_name] = header_value - # Set default User-Agent. - from influxdb_client_3.version import USER_AGENT - self.user_agent = USER_AGENT - - def __del__(self): - """Dispose pools.""" - if self._pool: - self._pool.close() - self._pool.join() - self._pool = None - if self.rest_client and self.rest_client.pool_manager and hasattr(self.rest_client.pool_manager, 'clear'): - self.rest_client.pool_manager.clear() - - @property - def pool(self): - """Create thread pool on first request avoids instantiating unused threadpool for blocking clients.""" - if self._pool is None: - self._pool = ThreadPool(self.pool_threads) - return self._pool - - @property - def user_agent(self): - """User agent for this API client.""" - return self.default_headers['User-Agent'] - - @user_agent.setter - def user_agent(self, value): - """Set User agent for this API client.""" - self.default_headers['User-Agent'] = value - - def set_default_header(self, header_name, header_value): - """Set HTTP header for this API client.""" - self.default_headers[header_name] = header_value - - @staticmethod - def should_gzip(payload: str, enable_gzip: bool = False, gzip_threshold: int = None) -> bool: - """ - Determines whether gzip compression should be applied to the given payload based - on the specified conditions. This method evaluates the `enable_gzip` flag and - considers the size of the payload in relation to the optional `gzip_threshold`. - If `enable_gzip` is set to True and no threshold is provided, gzip compression - is advised without any size condition. If a threshold is specified, compression - is applied only when the size of the payload meets or exceeds the threshold. - By default, no compression is performed if `enable_gzip` is False. - - :param payload: The payload data as a string for which gzip determination is to - be made. - :type payload: str - :param enable_gzip: A flag indicating whether gzip compression is enabled. By - default, this flag is False. - :type enable_gzip: bool, optional - :param gzip_threshold: Optional threshold specifying the minimum size (in bytes) - of the payload to trigger gzip compression. Only considered if - `enable_gzip` is True. - :type gzip_threshold: int, optional - :return: A boolean value indicating True if gzip compression should be applied - based on the payload size, the enable_gzip flag, and the gzip_threshold. - :rtype: bool - """ - if enable_gzip is not False: - if gzip_threshold is not None: - payload_size = len(payload.encode('utf-8')) - return payload_size >= gzip_threshold - if enable_gzip is True: - return True - - return False - - def __call_api( - self, resource_path, method, path_params=None, - query_params=None, header_params=None, body=None, post_params=None, - files=None, response_type=None, auth_settings=None, - _return_http_data_only=None, collection_formats=None, - _preload_content=True, _request_timeout=None, urlopen_kw=None): - - config = self.configuration - - # body - should_gzip = False - if body: - should_gzip = self.should_gzip(body, config.enable_gzip, config.gzip_threshold) - body = self.sanitize_for_serialization(body) - body = config.update_request_body(resource_path, body, should_gzip) - - # header parameters - header_params = header_params or {} - config.update_request_header_params(resource_path, header_params, should_gzip) - header_params.update(self.default_headers) - if header_params: - header_params = self.sanitize_for_serialization(header_params) - header_params = dict(self.parameters_to_tuples(header_params, - collection_formats)) - - # path parameters - if path_params: - path_params = self.sanitize_for_serialization(path_params) - path_params = self.parameters_to_tuples(path_params, - collection_formats) - for k, v in path_params: - # specified safe chars, encode everything - resource_path = resource_path.replace( - '{%s}' % k, - quote(str(v), safe=config.safe_chars_for_path_param) - ) - - # query parameters - if query_params: - query_params = self.sanitize_for_serialization(query_params) - query_params = self.parameters_to_tuples(query_params, - collection_formats) - - # post parameters - if post_params or files: - post_params = self.prepare_post_parameters(post_params, files) - post_params = self.sanitize_for_serialization(post_params) - post_params = self.parameters_to_tuples(post_params, - collection_formats) - - # auth setting - self.update_params_for_auth(header_params, query_params, auth_settings) - - # request url - url = self.configuration.host + resource_path - - urlopen_kw = urlopen_kw or {} - - # perform request and return response - response_data = self.request( - method, url, query_params=query_params, headers=header_params, - post_params=post_params, body=body, - _preload_content=_preload_content, - _request_timeout=_request_timeout, **urlopen_kw) - - self.last_response = response_data - - return_data = response_data - if _preload_content: - # deserialize response data - if response_type: - return_data = self.deserialize(response_data, response_type) - else: - return_data = None - - if _return_http_data_only: - return (return_data) - else: - return (return_data, response_data.status, - response_data.getheaders()) - - def sanitize_for_serialization(self, obj): - """Build a JSON POST object. - - If obj is None, return None. - If obj is str, int, long, float, bool, return directly. - If obj is datetime.datetime, datetime.date - convert to string in iso8601 format. - If obj is list, sanitize each element in the list. - If obj is dict, return the dict. - If obj is OpenAPI model, return the properties dict. - - :param obj: The data to serialize. - :return: The serialized form of data. - """ - if obj is None: - return None - elif isinstance(obj, self.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [self.sanitize_for_serialization(sub_obj) - for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(self.sanitize_for_serialization(sub_obj) - for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - # Convert model obj to dict except - # attributes `openapi_types`, `attribute_map` - # and attributes which value is not None. - # Convert attribute name to json key in - # model definition for request. - obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) - for attr, _ in obj.openapi_types.items() - if getattr(obj, attr) is not None} - - return {key: self.sanitize_for_serialization(val) - for key, val in obj_dict.items()} - - def deserialize(self, response, response_type): - """Deserializes response into an object. - - :param response: RESTResponse object to be deserialized. - :param response_type: class literal for - deserialized object, or string of class name. - - :return: deserialized object. - """ - # handle file downloading - # save response body into a tmp file and return the instance - if response_type == "file": - return self.__deserialize_file(response) - - # fetch data from response object - try: - data = json.loads(response.data) - except ValueError: - data = response.data - - return self.__deserialize(data, response_type) - - def __deserialize(self, data, klass): - """Deserializes dict, list, str into an object. - - :param data: dict, list or str. - :param klass: class literal, or string of class name. - - :return: object. - """ - if data is None: - return None - - if klass is str: - if klass.startswith('list['): - sub_kls = re.match(r'list\[(.*)\]', klass).group(1) - return [self.__deserialize(sub_data, sub_kls) - for sub_data in data] - - if klass.startswith('dict('): - sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) - return {k: self.__deserialize(v, sub_kls) - for k, v in data.items()} - - # convert str to class - if klass in self.NATIVE_TYPES_MAPPING: - klass = self.NATIVE_TYPES_MAPPING[klass] - else: - klass = getattr(influxdb_client_3.write_client.domain, klass) - - if klass in self.PRIMITIVE_TYPES: - return self.__deserialize_primitive(data, klass) - elif klass == object: - return self.__deserialize_object(data) - elif klass == datetime.date: - return self.__deserialize_date(data) - elif klass == datetime.datetime: - return self.__deserialize_datatime(data) - else: - return self.__deserialize_model(data, klass) - - def call_api(self, resource_path, method, - path_params=None, query_params=None, header_params=None, - body=None, post_params=None, files=None, - response_type=None, auth_settings=None, async_req=None, - _return_http_data_only=None, collection_formats=None, - _preload_content=True, _request_timeout=None, urlopen_kw=None): - """Make the HTTP request (synchronous) and Return deserialized data. - - To make an async_req request, set the async_req parameter. - - :param resource_path: Path to method endpoint. - :param method: Method to call. - :param path_params: Path parameters in the url. - :param query_params: Query parameters in the url. - :param header_params: Header parameters to be - placed in the request header. - :param body: Request body. - :param post_params dict: Request post form parameters, - for `application/x-www-form-urlencoded`, `multipart/form-data`. - :param auth_settings list: Auth Settings names for the request. - :param response: Response data type. - :param files dict: key -> filename, value -> filepath, - for `multipart/form-data`. - :param async_req bool: execute request asynchronously - :param _return_http_data_only: response data without head status code - and headers - :param collection_formats: dict of collection formats for path, query, - header, and post parameters. - :param _preload_content: if False, the urllib3.HTTPResponse object will - be returned without reading/decoding response - data. Default is True. - :param _request_timeout: timeout setting for this request. If one - number provided, it will be total request - timeout. It can also be a pair (tuple) of - (connection, read) timeouts. - :param urlopen_kw: Additional parameters are passed to - :meth:`urllib3.request.RequestMethods.request` - :return: - If async_req parameter is True, - the request will be called asynchronously. - The method will return the request thread. - If parameter async_req is False or missing, - then the method will return the response directly. - """ - if not async_req: - return self.__call_api(resource_path, method, - path_params, query_params, header_params, - body, post_params, files, - response_type, auth_settings, - _return_http_data_only, collection_formats, - _preload_content, _request_timeout, urlopen_kw) - else: - # TODO possible refactor - async handler inside package `_sync`? - thread = self.pool.apply_async(self.__call_api, (resource_path, - method, path_params, query_params, - header_params, body, - post_params, files, - response_type, auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, _request_timeout, urlopen_kw)) - return thread - - def request(self, method, url, query_params=None, headers=None, - post_params=None, body=None, _preload_content=True, - _request_timeout=None, **urlopen_kw): - """Make the HTTP request using RESTClient.""" - if method == "GET": - return self.rest_client.GET(url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers, - **urlopen_kw) - elif method == "HEAD": - return self.rest_client.HEAD(url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers, - **urlopen_kw) - elif method == "OPTIONS": - return self.rest_client.OPTIONS(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - elif method == "POST": - return self.rest_client.POST(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - elif method == "PUT": - return self.rest_client.PUT(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - elif method == "PATCH": - return self.rest_client.PATCH(url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - elif method == "DELETE": - return self.rest_client.DELETE(url, - query_params=query_params, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - else: - raise ValueError( - "http method must be `GET`, `HEAD`, `OPTIONS`," - " `POST`, `PATCH`, `PUT` or `DELETE`." - ) - - def parameters_to_tuples(self, params, collection_formats): - """Get parameters as list of tuples, formatting collections. - - :param params: Parameters as dict or list of two-tuples - :param dict collection_formats: Parameter collection formats - :return: Parameters as list of tuples, collections formatted - """ - new_params = [] - if collection_formats is None: - collection_formats = {} - for k, v in params.items() if isinstance(params, dict) else params: # noqa: E501 - if k in collection_formats: - collection_format = collection_formats[k] - if collection_format == 'multi': - new_params.extend((k, value) for value in v) - else: - if collection_format == 'ssv': - delimiter = ' ' - elif collection_format == 'tsv': - delimiter = '\t' - elif collection_format == 'pipes': - delimiter = '|' - else: # csv is the default - delimiter = ',' - new_params.append( - (k, delimiter.join(str(value) for value in v))) - else: - new_params.append((k, v)) - return new_params - - def prepare_post_parameters(self, post_params=None, files=None): - """Build form parameters. - - :param post_params: Normal form parameters. - :param files: File parameters. - :return: Form parameters with files. - """ - params = [] - - if post_params: - params = post_params - - if files: - for k, v in files.items(): - if not v: - continue - file_names = v if type(v) is list else [v] - for n in file_names: - with open(n, 'rb') as f: - filename = os.path.basename(f.name) - filedata = f.read() - mimetype = (mimetypes.guess_type(filename)[0] or - 'application/octet-stream') - params.append( - tuple([k, tuple([filename, filedata, mimetype])])) - - return params - - def select_header_accept(self, accepts): - """Return `Accept` based on an array of accepts provided. - - :param accepts: List of headers. - :return: Accept (e.g. application/json). - """ - if not accepts: - return - - accepts = [x.lower() for x in accepts] - - if 'application/json' in accepts: - return 'application/json' - else: - return ', '.join(accepts) - - def select_header_content_type(self, content_types): - """Return `Content-Type` based on an array of content_types provided. - - :param content_types: List of content-types. - :return: Content-Type (e.g. application/json). - """ - if not content_types: - return 'application/json' - - content_types = [x.lower() for x in content_types] - - if 'application/json' in content_types or '*/*' in content_types: - return 'application/json' - else: - return content_types[0] - - def update_params_for_auth(self, headers, querys, auth_settings): - """Update header and query params based on authentication setting. - - :param headers: Header parameters dict to be updated. - :param querys: Query parameters tuple list to be updated. - :param auth_settings: Authentication setting identifiers list. - """ - if not auth_settings: - return - - for auth in auth_settings: - auth_setting = self.configuration.auth_settings().get(auth) - if auth_setting: - if not auth_setting['value']: - continue - elif auth_setting['in'] == 'header': - headers[auth_setting['key']] = auth_setting['value'] - elif auth_setting['in'] == 'query': - querys.append((auth_setting['key'], auth_setting['value'])) - else: - raise ValueError( - 'Authentication token must be in `query` or `header`' - ) - - def __deserialize_file(self, response): - """Deserializes body to file. - - Saves response body into a file in a temporary folder, - using the filename from the `Content-Disposition` header if provided. - - :param response: RESTResponse. - :return: file path. - """ - fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path) - os.close(fd) - os.remove(path) - - content_disposition = response.getheader("Content-Disposition") - if content_disposition: - filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', - content_disposition).group(1) - path = os.path.join(os.path.dirname(path), filename) - - with open(path, "wb") as f: - f.write(response.data) - - return path - - def __deserialize_primitive(self, data, klass): - """Deserializes string to primitive type. - - :param data: str. - :param klass: class literal. - - :return: int, long, float, str, bool. - """ - try: - return klass(data) - except UnicodeEncodeError: - return str(data) - except TypeError: - return data - - def __deserialize_object(self, value): - """Return an original value. - - :return: object. - """ - return value - - def __deserialize_date(self, string): - """Deserializes string to date. - - :param string: str. - :return: date. - """ - try: - from dateutil.parser import parse - return parse(string).date() - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, - reason="Failed to parse `{0}` as date object".format(string) - ) - - def __deserialize_datatime(self, string): - """Deserializes string to datetime. - - The string should be in iso8601 datetime format. - - :param string: str. - :return: datetime. - """ - try: - from dateutil.parser import parse - return parse(string) - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, - reason=( - "Failed to parse `{0}` as datetime object" - .format(string) - ) - ) - - def __deserialize_model(self, data, klass): - """Deserializes list or dict to model. - - :param data: dict, list. - :param klass: class literal. - :return: model object. - """ - if not klass.openapi_types and not hasattr(klass, - 'get_real_child_model'): - return data - - kwargs = {} - if klass.openapi_types is not None: - for attr, attr_type in klass.openapi_types.items(): - if (data is not None and - klass.attribute_map[attr] in data and - isinstance(data, (list, dict))): - value = data[klass.attribute_map[attr]] - kwargs[attr] = self.__deserialize(value, attr_type) - - instance = klass(**kwargs) - - if hasattr(instance, 'get_real_child_model'): - klass_name = instance.get_real_child_model(data) - if klass_name: - instance = self.__deserialize(data, klass_name) - return instance diff --git a/influxdb_client_3/write_client/_sync/rest.py b/influxdb_client_3/write_client/_sync/rest.py deleted file mode 100644 index f4d52991..00000000 --- a/influxdb_client_3/write_client/_sync/rest.py +++ /dev/null @@ -1,335 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -import io -import json -import re -import ssl -from urllib.parse import urlencode - -from influxdb_client_3.write_client.rest import ApiException -from influxdb_client_3.write_client.rest import _BaseRESTClient - -try: - import urllib3 -except ImportError: - raise ImportError('OpenAPI Python client requires urllib3.') - - -class RESTResponse(io.IOBase): - - def __init__(self, resp): - """Initialize with HTTP response.""" - self.urllib3_response = resp - self.status = resp.status - self.reason = resp.reason - self.data = resp.data - - def getheaders(self): - """Return a dictionary of the response headers.""" - return self.urllib3_response.headers - - def getheader(self, name, default=None): - """Return a given response header.""" - return self.urllib3_response.headers.get(name, default) - - -class RESTClientObject(object): - - def __init__(self, configuration, pools_size=4, maxsize=None, retries=False): - """Initialize REST client.""" - # urllib3.PoolManager will pass all kw parameters to connectionpool - # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501 - # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501 - # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 - # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 - - self.configuration = configuration - self.pools_size = pools_size - self.maxsize = maxsize - self.retries = retries - - # cert_reqs - if configuration.verify_ssl: - cert_reqs = ssl.CERT_REQUIRED - else: - cert_reqs = ssl.CERT_NONE - - # ca_certs - if configuration.ssl_ca_cert: - ca_certs = configuration.ssl_ca_cert - else: - ca_certs = None - - addition_pool_args = {} - if configuration.assert_hostname is not None: - addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501 - addition_pool_args['retries'] = self.retries - - if maxsize is None: - if configuration.connection_pool_maxsize is not None: - maxsize = configuration.connection_pool_maxsize - else: - maxsize = 4 - - # https pool manager - if configuration.proxy: - self.pool_manager = urllib3.ProxyManager( - num_pools=pools_size, - maxsize=maxsize, - cert_reqs=cert_reqs, - ca_certs=ca_certs, - cert_file=configuration.cert_file, - key_file=configuration.cert_key_file, - key_password=configuration.cert_key_password, - proxy_url=configuration.proxy, - proxy_headers=configuration.proxy_headers, - ssl_context=configuration.ssl_context, - **addition_pool_args - ) - else: - self.pool_manager = urllib3.PoolManager( - num_pools=pools_size, - maxsize=maxsize, - cert_reqs=cert_reqs, - ca_certs=ca_certs, - cert_file=configuration.cert_file, - key_file=configuration.cert_key_file, - key_password=configuration.cert_key_password, - ssl_context=configuration.ssl_context, - **addition_pool_args - ) - - def request(self, method, url, query_params=None, headers=None, - body=None, post_params=None, _preload_content=True, - _request_timeout=None, **urlopen_kw): - """Perform requests. - - :param method: http request method - :param url: http request url - :param query_params: query parameters in the url - :param headers: http request headers - :param body: request json body, for `application/json` - :param post_params: request post parameters, - `application/x-www-form-urlencoded` - and `multipart/form-data` - :param _preload_content: if False, the urllib3.HTTPResponse object will - be returned without reading/decoding response - data. Default is True. - :param _request_timeout: timeout setting for this request. If one - number provided, it will be total request - timeout. It can also be a pair (tuple) of - (connection, read) timeouts. - :param urlopen_kw: Additional parameters are passed to - :meth:`urllib3.request.RequestMethods.request` - """ - method = method.upper() - assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', - 'PATCH', 'OPTIONS'] - - if post_params and body: - raise ValueError( - "body parameter cannot be used with post_params parameter." - ) - - post_params = post_params or {} - headers = headers or {} - - timeout = None - _configured_timeout = _request_timeout or self.configuration.timeout - if _configured_timeout: - if isinstance(_configured_timeout, (int, float, )): # noqa: E501,F821 - timeout = urllib3.Timeout(total=_configured_timeout / 1_000) - elif (isinstance(_configured_timeout, tuple) and - len(_configured_timeout) == 2): - timeout = urllib3.Timeout( - connect=_configured_timeout[0] / 1_000, read=_configured_timeout[1] / 1_000) - - if 'Content-Type' not in headers: - headers['Content-Type'] = 'application/json' - - if self.configuration.debug: - _BaseRESTClient.log_request(method, f"{url}?{urlencode(query_params)}") - _BaseRESTClient.log_headers(headers, '>>>') - _BaseRESTClient.log_body(body, '>>>') - - try: - # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` - if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: - if query_params: - url += '?' + urlencode(query_params) - if re.search('json', headers['Content-Type'], re.IGNORECASE): - request_body = None - if body is not None: - request_body = json.dumps(body) - r = self.pool_manager.request( - method, url, - body=request_body, - preload_content=_preload_content, - timeout=timeout, - headers=headers, - **urlopen_kw) - elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501 - r = self.pool_manager.request( - method, url, - fields=post_params, - encode_multipart=False, - preload_content=_preload_content, - timeout=timeout, - headers=headers, - **urlopen_kw) - elif headers['Content-Type'] == 'multipart/form-data': - # must del headers['Content-Type'], or the correct - # Content-Type which generated by urllib3 will be - # overwritten. - del headers['Content-Type'] - r = self.pool_manager.request( - method, url, - fields=post_params, - encode_multipart=True, - preload_content=_preload_content, - timeout=timeout, - headers=headers, - **urlopen_kw) - # Pass a `string` parameter directly in the body to support - # other content types than Json when `body` argument is - # provided in serialized form - elif isinstance(body, str) or isinstance(body, bytes): - request_body = body - r = self.pool_manager.request( - method, url, - body=request_body, - preload_content=_preload_content, - timeout=timeout, - headers=headers, - **urlopen_kw) - else: - # Cannot generate the request from given parameters - msg = """Cannot prepare a request message for provided - arguments. Please check that your arguments match - declared content type.""" - raise ApiException(status=0, reason=msg) - # For `GET`, `HEAD` - else: - r = self.pool_manager.request(method, url, - fields=query_params, - preload_content=_preload_content, - timeout=timeout, - headers=headers, - **urlopen_kw) - except urllib3.exceptions.SSLError as e: - msg = "{0}\n{1}".format(type(e).__name__, str(e)) - raise ApiException(status=0, reason=msg) - - if _preload_content: - r = RESTResponse(r) - - # In the python 3, the response.data is bytes. - # we need to decode it to string. - r.data = r.data.decode('utf8') - - if self.configuration.debug: - _BaseRESTClient.log_response(r.status) - if hasattr(r, 'headers'): - _BaseRESTClient.log_headers(r.headers, '<<<') - if hasattr(r, 'urllib3_response'): - _BaseRESTClient.log_headers(r.urllib3_response.headers, '<<<') - _BaseRESTClient.log_body(r.data, '<<<') - - if not 200 <= r.status <= 299: - raise ApiException(http_resp=r) - - return r - - def GET(self, url, headers=None, query_params=None, _preload_content=True, - _request_timeout=None, **urlopen_kw): - """Perform GET HTTP request.""" - return self.request("GET", url, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - query_params=query_params, - **urlopen_kw) - - def HEAD(self, url, headers=None, query_params=None, _preload_content=True, - _request_timeout=None, **urlopen_kw): - """Perform HEAD HTTP request.""" - return self.request("HEAD", url, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - query_params=query_params, - **urlopen_kw) - - def OPTIONS(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None, **urlopen_kw): - """Perform OPTIONS HTTP request.""" - return self.request("OPTIONS", url, - headers=headers, - query_params=query_params, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - - def DELETE(self, url, headers=None, query_params=None, body=None, - _preload_content=True, _request_timeout=None, **urlopen_kw): - """Perform DELETE HTTP request.""" - return self.request("DELETE", url, - headers=headers, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - - def POST(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None, **urlopen_kw): - """Perform POST HTTP request.""" - return self.request("POST", url, - headers=headers, - query_params=query_params, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - - def PUT(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None, **urlopen_kw): - """Perform PUT HTTP request.""" - return self.request("PUT", url, - headers=headers, - query_params=query_params, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - - def PATCH(self, url, headers=None, query_params=None, post_params=None, - body=None, _preload_content=True, _request_timeout=None, **urlopen_kw): - """Perform PATCH HTTP request.""" - return self.request("PATCH", url, - headers=headers, - query_params=query_params, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - **urlopen_kw) - - def __getstate__(self): - """Return a dict of attributes that you want to pickle.""" - state = self.__dict__.copy() - # Remove Pool managaer - del state['pool_manager'] - return state - - def __setstate__(self, state): - """Set your object with the provided dict.""" - self.__dict__.update(state) - # Init Pool manager - self.__init__(self.configuration, self.pools_size, self.maxsize, self.retries) diff --git a/influxdb_client_3/write_client/_sync/rest_client.py b/influxdb_client_3/write_client/_sync/rest_client.py new file mode 100644 index 00000000..5021bb5f --- /dev/null +++ b/influxdb_client_3/write_client/_sync/rest_client.py @@ -0,0 +1,191 @@ +# coding: utf-8 + +from __future__ import absolute_import + +import io +import multiprocessing +import ssl +from urllib.parse import urlencode + +from influxdb_client_3.write_client.rest import ApiException + +try: + import urllib3 +except ImportError: + raise ImportError('OpenAPI Python client requires urllib3.') + + +class RESTResponse(io.IOBase): + + def __init__(self, resp): + """Initialize with HTTP response.""" + self.urllib3_response = resp + self.status = resp.status + self.reason = resp.reason + self.data = resp.data + + def getheaders(self): + """Return a dictionary of the response headers.""" + return self.urllib3_response.headers + + def getheader(self, name, default=None): + """Return a given response header.""" + return self.urllib3_response.headers.get(name, default) + + def get_string_body(self): + string = self.urllib3_response.data.decode('utf-8') + if string is None or string == '': + return None + return string + + +class RestClient(object): + + def __init__(self, + base_url, + default_header=None, + verify_ssl=True, + ssl_ca_cert=None, + cert_file=None, + cert_key_file=None, + cert_key_password=None, + ssl_context=None, + proxy=None, + proxy_headers=None, + pools_size=4, + maxsize=None, + timeout=None, + retries=False, + connection_pool_maxsize=multiprocessing.cpu_count() * 5, + ): + """Initialize REST client.""" + # urllib3.PoolManager will pass all kw parameters to connectionpool + # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501 + # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501 + # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 + # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 + + self.base_url = base_url + self.pools_size = pools_size + self.maxsize = maxsize + self.timeout = timeout + self.retries = retries + self.default_header = default_header + self.verify_ssl = verify_ssl + self.ssl_context = ssl_context + self.proxy = proxy + self.proxy_headers = proxy_headers + self.ssl_ca_cert = ssl_ca_cert + self.cert_file = cert_file + self.cert_key_file = cert_key_file + self.cert_key_password = cert_key_password + + # cert_reqs + if verify_ssl: + cert_reqs = ssl.CERT_REQUIRED + else: + cert_reqs = ssl.CERT_NONE + + # ca_certs + if ssl_ca_cert: + ca_certs = ssl_ca_cert + else: + ca_certs = None + + addition_pool_args = {'retries': self.retries} + + if maxsize is None: + if connection_pool_maxsize is not None: + maxsize = connection_pool_maxsize + else: + maxsize = 4 + + # https pool manager + if proxy: + self.pool_manager = urllib3.ProxyManager( + num_pools=pools_size, + maxsize=maxsize, + cert_reqs=cert_reqs, + ca_certs=ca_certs, + cert_file=cert_file, + key_file=cert_key_file, + key_password=cert_key_password, + proxy_url=proxy, + proxy_headers=proxy_headers, + ssl_context=ssl_context, + **addition_pool_args + ).connection_from_url(url=base_url) + else: + self.pool_manager = urllib3.PoolManager( + num_pools=pools_size, + maxsize=maxsize, + cert_reqs=cert_reqs, + ca_certs=ca_certs, + cert_file=cert_file, + key_file=cert_key_file, + key_password=cert_key_password, + ssl_context=ssl_context, + **addition_pool_args + ).connection_from_url(url=base_url) + + def request(self, method, url, query_params=None, headers=None, + body=None, timeout=None, **urlopen_kw): + """Perform requests. + + :param method: http request method + :param url: http request url + :param query_params: query parameters in the url + :param headers: http request headers + :param body: request json body, for `application/json` + :param timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :param urlopen_kw: Additional parameters are passed to + :meth:`urllib3.request.RequestMethods.request` + """ + + if query_params: + url += '?' + urlencode(query_params) + + r = self.pool_manager.request(method, url, + body=body, + headers=headers, + timeout=timeout, + **urlopen_kw) + + r = RESTResponse(r) + r.data = r.data.decode('utf8') + + if not 200 <= r.status <= 299: + raise ApiException(http_resp=r) + + return r + + def __getstate__(self): + """Return a dict of attributes that you want to pickle.""" + state = self.__dict__.copy() + # Remove Pool managaer + del state['pool_manager'] + return state + + def __setstate__(self, state): + """Set your object with the provided dict.""" + self.__dict__.update(state) + # Init Pool manager + self.__init__( + base_url=self.base_url, + pools_size=self.pools_size, + maxsize=self.maxsize, + timeout=self.timeout, + retries=self.retries, + default_header=self.default_header, + verify_ssl=self.verify_ssl, + ssl_context=self.ssl_context, + proxy=self.proxy, + proxy_headers=self.proxy_headers, + ssl_ca_cert=self.ssl_ca_cert, + cert_file=self.cert_file, + cert_key_file=self.cert_key_file, + cert_key_password=self.cert_key_password, + ) diff --git a/influxdb_client_3/write_client/client/__init__.py b/influxdb_client_3/write_client/client/__init__.py index 5b960961..9b33e9bb 100644 --- a/influxdb_client_3/write_client/client/__init__.py +++ b/influxdb_client_3/write_client/client/__init__.py @@ -1,6 +1,3 @@ # flake8: noqa from __future__ import absolute_import - -# import apis into api package -from influxdb_client_3.write_client.service.write_service import WriteService diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py deleted file mode 100644 index 8acae180..00000000 --- a/influxdb_client_3/write_client/client/_base.py +++ /dev/null @@ -1,296 +0,0 @@ -"""Commons function for Sync and Async client.""" -from __future__ import absolute_import - -import configparser -import logging -import os -from typing import Iterable - -from typing_extensions import deprecated - -from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer -from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.write_client.rest import _UTF_8_encoding -from influxdb_client_3.write_client.service.write_service import WriteService - -try: - import dataclasses - - _HAS_DATACLASS = True -except ModuleNotFoundError: - _HAS_DATACLASS = False - -LOGGERS_NAMES = [ - 'influxdb_client.client.influxdb_client', - 'influxdb_client.client.influxdb_client_async', - 'influxdb_client.client.write_api', - 'influxdb_client.client.write_api_async', - 'influxdb_client.client.write.retry', - 'influxdb_client.client.write.dataframe_serializer', - 'influxdb_client.client.util.multiprocessing_helper', - 'influxdb_client.client.http', - 'influxdb_client.client.exceptions', -] - - -# noinspection PyMethodMayBeStatic -class _BaseClient(object): - def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None, - default_tags: dict = None, http_client_logger: str = None, **kwargs) -> None: - self.url = url - self.org = org - - self.default_tags = default_tags - - self.conf = _Configuration() - if self.url.endswith("/"): - self.conf.host = self.url[:-1] - else: - self.conf.host = self.url - self.conf.enable_gzip = enable_gzip - self.conf.gzip_threshold = kwargs.get('gzip_threshold', None) - self.conf.verify_ssl = kwargs.get('verify_ssl', True) - self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None) - self.conf.cert_file = kwargs.get('cert_file', None) - self.conf.cert_key_file = kwargs.get('cert_key_file', None) - self.conf.cert_key_password = kwargs.get('cert_key_password', None) - self.conf.ssl_context = kwargs.get('ssl_context', None) - self.conf.proxy = kwargs.get('proxy', None) - self.conf.proxy_headers = kwargs.get('proxy_headers', None) - self.conf.connection_pool_maxsize = kwargs.get('connection_pool_maxsize', self.conf.connection_pool_maxsize) - self.conf.timeout = timeout - # logging - self.conf.loggers["http_client_logger"] = logging.getLogger(http_client_logger) - for client_logger in LOGGERS_NAMES: - self.conf.loggers[client_logger] = logging.getLogger(client_logger) - self.conf.debug = debug - - # defaults - self.auth_header_name = None - self.auth_header_value = None - # by token - if token: - auth_scheme = kwargs.get('auth_scheme', "Token") - self.auth_header_name = "Authorization" - self.auth_header_value = f"{auth_scheme} {token}" - - self.retries = kwargs.get('retries', False) - - self.profilers = kwargs.get('profilers', None) - pass - - @classmethod - def _from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False, **kwargs): - config = configparser.ConfigParser() - config_name = kwargs.get('config_name', 'influx2') - is_json = False - try: - config.read(config_file) - except configparser.ParsingError: - with open(config_file) as json_file: - import json - config = json.load(json_file) - is_json = True - - def _config_value(key: str): - value = str(config[key]) if is_json else config[config_name][key] - return value.strip('"') - - def _has_option(key: str): - return key in config if is_json else config.has_option(config_name, key) - - def _has_section(key: str): - return key in config if is_json else config.has_section(key) - - url = _config_value('url') - token = _config_value('token') - - timeout = None - if _has_option('timeout'): - timeout = _config_value('timeout') - - org = None - if _has_option('org'): - org = _config_value('org') - - verify_ssl = True - if _has_option('verify_ssl'): - verify_ssl = _config_value('verify_ssl') - - ssl_ca_cert = None - if _has_option('ssl_ca_cert'): - ssl_ca_cert = _config_value('ssl_ca_cert') - - cert_file = None - if _has_option('cert_file'): - cert_file = _config_value('cert_file') - - cert_key_file = None - if _has_option('cert_key_file'): - cert_key_file = _config_value('cert_key_file') - - cert_key_password = None - if _has_option('cert_key_password'): - cert_key_password = _config_value('cert_key_password') - - connection_pool_maxsize = None - if _has_option('connection_pool_maxsize'): - connection_pool_maxsize = _config_value('connection_pool_maxsize') - - default_tags = None - if _has_section('tags'): - if is_json: - default_tags = config['tags'] - else: - tags = {k: v.strip('"') for k, v in config.items('tags')} - default_tags = dict(tags) - - profilers = None - if _has_option('profilers'): - profilers = [x.strip() for x in _config_value('profilers').split(',')] - - proxy = None - if _has_option('proxy'): - proxy = _config_value('proxy') - - return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags, - enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert, - cert_file=cert_file, cert_key_file=cert_key_file, cert_key_password=cert_key_password, - connection_pool_maxsize=_to_int(connection_pool_maxsize), profilers=profilers, proxy=proxy, **kwargs) - - @classmethod - @deprecated('Use InfluxDBClient3.from_env() instead.') - def _from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): - url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086") - token = os.getenv('INFLUXDB_V2_TOKEN', "my-token") - timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000") - org = os.getenv('INFLUXDB_V2_ORG', "my-org") - verify_ssl = os.getenv('INFLUXDB_V2_VERIFY_SSL', "True") - ssl_ca_cert = os.getenv('INFLUXDB_V2_SSL_CA_CERT', None) - cert_file = os.getenv('INFLUXDB_V2_CERT_FILE', None) - cert_key_file = os.getenv('INFLUXDB_V2_CERT_KEY_FILE', None) - cert_key_password = os.getenv('INFLUXDB_V2_CERT_KEY_PASSWORD', None) - connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None) - - prof = os.getenv("INFLUXDB_V2_PROFILERS", None) - profilers = None - if prof is not None: - profilers = [x.strip() for x in prof.split(',')] - - default_tags = dict() - - for key, value in os.environ.items(): - if key.startswith("INFLUXDB_V2_TAG_"): - default_tags[key[16:].lower()] = value - - return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags, - enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert, - cert_file=cert_file, cert_key_file=cert_key_file, cert_key_password=cert_key_password, - connection_pool_maxsize=_to_int(connection_pool_maxsize), profilers=profilers, **kwargs) - - -class _BaseWriteApi(object): - def __init__(self, influxdb_client, point_settings=None): - self._influxdb_client = influxdb_client - self._point_settings = point_settings - self._write_service = WriteService(influxdb_client.api_client) - if influxdb_client.default_tags: - for key, value in influxdb_client.default_tags.items(): - self._point_settings.add_default_tag(key, value) - - def _append_default_tag(self, key, val, record): - from influxdb_client_3.write_client import Point - if isinstance(record, bytes) or isinstance(record, str): - pass - elif isinstance(record, Point): - record.tag(key, val) - elif isinstance(record, dict): - record.setdefault("tags", {}) - record.get("tags")[key] = val - elif isinstance(record, Iterable): - for item in record: - self._append_default_tag(key, val, item) - - def _append_default_tags(self, record): - if self._point_settings.defaultTags and record is not None: - for key, val in self._point_settings.defaultTags.items(): - self._append_default_tag(key, val, record) - - def _serialize(self, record, write_precision, payload, **kwargs): - from influxdb_client_3.write_client.client.write.point import Point - if isinstance(record, bytes): - payload[write_precision].append(record) - - elif isinstance(record, str): - self._serialize(record.encode(_UTF_8_encoding), write_precision, payload, **kwargs) - - elif isinstance(record, Point): - precision_from_point = kwargs.get('precision_from_point', True) - precision = record.write_precision if precision_from_point else write_precision - self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')), - precision, payload, **kwargs) - - elif isinstance(record, dict): - self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), - write_precision, payload, **kwargs) - elif 'polars' in str(type(record)): - from influxdb_client_3.write_client.client.write.polars_dataframe_serializer import \ - PolarsDataframeSerializer - serializer = PolarsDataframeSerializer(record, self._point_settings, write_precision, **kwargs) - self._serialize(serializer.serialize(), write_precision, payload, **kwargs) - - elif 'pandas' in str(type(record)): - serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) - self._serialize(serializer.serialize(), write_precision, payload, **kwargs) - - elif hasattr(record, "_asdict"): - # noinspection PyProtectedMember - self._serialize(record._asdict(), write_precision, payload, **kwargs) - elif _HAS_DATACLASS and dataclasses.is_dataclass(record): - self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs) - elif isinstance(record, Iterable): - for item in record: - self._serialize(item, write_precision, payload, **kwargs) - - -class _Configuration(Configuration): - def __init__(self): - Configuration.__init__(self) - self.enable_gzip = False - - def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False): - super().update_request_header_params(path, params, should_gzip) - if should_gzip: - # GZIP Request - if path == '/api/v2/write' or path == '/api/v3/write_lp': - params["Content-Encoding"] = "gzip" - params["Accept-Encoding"] = "identity" - pass - # GZIP Response - if path == '/api/v2/query': - # params["Content-Encoding"] = "gzip" - params["Accept-Encoding"] = "gzip" - pass - pass - pass - - def update_request_body(self, path: str, body, should_gzip: bool = False): - _body = super().update_request_body(path, body, should_gzip) - if should_gzip: - # GZIP Request - if path == '/api/v2/write' or path == '/api/v3/write_lp': - import gzip - if isinstance(_body, bytes): - return gzip.compress(data=_body) - else: - return gzip.compress(bytes(_body, _UTF_8_encoding)) - - return _body - - -def _to_bool(bool_value): - return str(bool_value).lower() in ("yes", "true") - - -def _to_int(int_value): - return int(int_value) if int_value is not None else None diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py deleted file mode 100644 index 3ea97f60..00000000 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ /dev/null @@ -1,288 +0,0 @@ -"""InfluxDBClient is client for API defined in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml.""" - -from __future__ import absolute_import - -import logging - -from typing_extensions import deprecated - -from influxdb_client_3.write_client.client._base import _BaseClient -from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions, PointSettings - -logger = logging.getLogger('influxdb_client_3.write_client.client.influxdb_client') - - -class InfluxDBClient(_BaseClient): - """InfluxDBClient is client for InfluxDB v2.""" - - def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None, - default_tags: dict = None, gzip_threshold=None, **kwargs) -> None: - """ - Initialize defaults. - - :param url: InfluxDB server API url (ex. http://localhost:8086). - :param token: ``token`` to authenticate to the InfluxDB API - :param debug: enable verbose logging of http requests - :param timeout: HTTP client timeout setting for a request specified in milliseconds. - If one number provided, it will be total request timeout. - It can also be a pair (tuple) of (connection, read) timeouts. - :param enable_gzip: Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints - supports the Gzip compression. - :param org: organization name (used as a default in Query, Write and Delete API) - :key auth_scheme: token authentication scheme. Set to "Bearer" for Edge. - :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server. - :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer. - :key str cert_file: Path to the certificate that will be used for mTLS authentication. - :key str cert_key_file: Path to the file contains private key for mTLS certificate. - :key str cert_key_password: String or function which returns password for decrypting the mTLS private key. - :key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. - Be aware that only delivered certificate/ key files or an SSL Context are - possible. - :key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128) - :key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy - authentication. - :key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3. - Defaults to "multiprocessing.cpu_count() * 5". - :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests - except batching writes. As a default there is no one retry strategy. - :key list[str] profilers: list of enabled Flux profilers - """ - super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, - gzip_threshold=gzip_threshold, org=org, - default_tags=default_tags, http_client_logger="urllib3", **kwargs) - - from influxdb_client_3.write_client._sync.api_client import ApiClient - self.api_client = ApiClient(configuration=self.conf, header_name=self.auth_header_name, - header_value=self.auth_header_value, retries=self.retries) - - def __enter__(self): - """ - Enter the runtime context related to this object. - - It will bind this method’s return value to the target(s) - specified in the `as` clause of the statement. - - return: self instance - """ - return self - - def __exit__(self, exc_type, exc_value, traceback): - """Exit the runtime context related to this object and close the client.""" - self.close() - - @classmethod - def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False, **kwargs): - """ - Configure client via configuration file. The configuration has to be under 'influx' section. - - :param config_file: Path to configuration file - :param debug: Enable verbose logging of http requests - :param enable_gzip: Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints - supports the Gzip compression. - :key config_name: Name of the configuration section of the configuration file - :key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy - authentication. - :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests - except batching writes. As a default there is no one retry strategy. - :key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. - Be aware that only delivered certificate/ key files or an SSL Context are - possible. - - The supported formats: - - https://docs.python.org/3/library/configparser.html - - https://toml.io/en/ - - https://www.json.org/json-en.html - - Configuration options: - - url - - org - - token - - timeout, - - verify_ssl - - ssl_ca_cert - - cert_file - - cert_key_file - - cert_key_password - - connection_pool_maxsize - - profilers - - proxy - - - config.ini example:: - - [influx2] - url=http://localhost:8086 - org=my-org - token=my-token - timeout=6000 - connection_pool_maxsize=25 - profilers=query,operator - proxy=http:proxy.domain.org:8080 - - [tags] - id = 132-987-655 - customer = California Miner - data_center = ${env.data_center} - - config.toml example:: - - [influx2] - url = "http://localhost:8086" - token = "my-token" - org = "my-org" - timeout = 6000 - connection_pool_maxsize = 25 - profilers="query, operator" - proxy = "http://proxy.domain.org:8080" - - [tags] - id = "132-987-655" - customer = "California Miner" - data_center = "${env.data_center}" - - config.json example:: - - { - "url": "http://localhost:8086", - "token": "my-token", - "org": "my-org", - "active": true, - "timeout": 6000, - "connection_pool_maxsize": 55, - "profilers": "query, operator", - "tags": { - "id": "132-987-655", - "customer": "California Miner", - "data_center": "${env.data_center}" - } - } - - """ - return InfluxDBClient._from_config_file(config_file=config_file, debug=debug, enable_gzip=enable_gzip, **kwargs) - - @classmethod - @deprecated('Use InfluxDBClient3.from_env() instead.') - def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): - """ - Configure client via environment properties. - - :param debug: Enable verbose logging of http requests - :param enable_gzip: Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints - supports the Gzip compression. - :key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128) - :key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy - authentication. - :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests - except batching writes. As a default there is no one retry strategy. - :key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. - Be aware that only delivered certificate/ key files or an SSL Context are - possible. - - Supported environment properties: - - INFLUXDB_V2_URL - - INFLUXDB_V2_ORG - - INFLUXDB_V2_TOKEN - - INFLUXDB_V2_TIMEOUT - - INFLUXDB_V2_VERIFY_SSL - - INFLUXDB_V2_SSL_CA_CERT - - INFLUXDB_V2_CERT_FILE - - INFLUXDB_V2_CERT_KEY_FILE - - INFLUXDB_V2_CERT_KEY_PASSWORD - - INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - - INFLUXDB_V2_PROFILERS - - INFLUXDB_V2_TAG - """ - return InfluxDBClient._from_env_properties(debug=debug, enable_gzip=enable_gzip, **kwargs) - - def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi: - """ - Create Write API instance. - - Example: - .. code-block:: python - - from influxdb_client import InfluxDBClient - from influxdb_client.client.write_api import SYNCHRONOUS - - - # Initialize SYNCHRONOUS instance of WriteApi - with InfluxDBClient(url="http://localhost:8086", token="my-token") as client: - write_api = client.write_api(write_options=SYNCHRONOUS) - - If you would like to use a **background batching**, you have to configure client like this: - - .. code-block:: python - - from influxdb_client import InfluxDBClient - - # Initialize background batching instance of WriteApi - with InfluxDBClient(url="http://localhost:8086", token="my-token") as client: - with client.write_api() as write_api: - pass - - There is also possibility to use callbacks to notify about state of background batches: - - .. code-block:: python - - from influxdb_client import InfluxDBClient - from influxdb_client.client.exceptions import InfluxDBError - - - class BatchingCallback(object): - - def success(self, conf: (str, str, str), data: str): - print(f"Written batch: {conf}, data: {data}") - - def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): - print(f"Cannot write batch: {conf}, data: {data} due: {exception}") - - def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): - print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") - - - with InfluxDBClient(url="http://localhost:8086", token="my-token") as client: - callback = BatchingCallback() - with client.write_api(success_callback=callback.success, - error_callback=callback.error, - retry_callback=callback.retry) as write_api: - pass - - :param write_options: Write API configuration - :param point_settings: settings to store default tags - :key success_callback: The callable ``callback`` to run after successfully writen a batch. - - The callable must accept two arguments: - - `Tuple`: ``(bucket, organization, precision)`` - - `str`: written data - - **[batching mode]** - - :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. - - The callable must accept three arguments: - - `Tuple`: ``(bucket, organization, precision)`` - - `str`: written data - - `Exception`: an occurred error - - **[batching mode]** - :key retry_callback: The callable ``callback`` to run after retryable error occurred. - - The callable must accept three arguments: - - `Tuple`: ``(bucket, organization, precision)`` - - `str`: written data - - `Exception`: an retryable error - - **[batching mode]** - :return: write api instance - """ - return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs) - - def close(self): - """Shutdown the client.""" - self.__del__() - - def __del__(self): - """Shutdown the client.""" - if self.api_client: - self.api_client.__del__() - self.api_client = None diff --git a/influxdb_client_3/write_client/client/logging_handler.py b/influxdb_client_3/write_client/client/logging_handler.py deleted file mode 100644 index 4cfcfd53..00000000 --- a/influxdb_client_3/write_client/client/logging_handler.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Use the influxdb_client with python native logging.""" -import logging - -from influxdb_client_3.write_client import InfluxDBClient - - -class InfluxLoggingHandler(logging.Handler): - """ - InfluxLoggingHandler instances dispatch logging events to influx. - - There is no need to set a Formatter. - The raw input will be passed on to the influx write api. - """ - - DEFAULT_LOG_RECORD_KEYS = list(logging.makeLogRecord({}).__dict__.keys()) + ['message'] - - def __init__(self, *, url, token, org, bucket, client_args=None, write_api_args=None): - """ - Initialize defaults. - - The arguments `client_args` and `write_api_args` can be dicts of kwargs. - They are passed on to the InfluxDBClient and write_api calls respectively. - """ - super().__init__() - - self.bucket = bucket - - client_args = {} if client_args is None else client_args - self.client = InfluxDBClient(url=url, token=token, org=org, **client_args) - - write_api_args = {} if write_api_args is None else write_api_args - self.write_api = self.client.write_api(**write_api_args) - - def __del__(self): - """Make sure all resources are closed.""" - self.close() - - def close(self) -> None: - """Close the write_api, client and logger.""" - self.write_api.close() - self.client.close() - super().close() - - def emit(self, record: logging.LogRecord) -> None: - """Emit a record via the influxDB WriteApi.""" - try: - message = self.format(record) - extra = self._get_extra_values(record) - return self.write_api.write(record=message, **extra) - except (KeyboardInterrupt, SystemExit): - raise - except (Exception,): - self.handleError(record) - - def _get_extra_values(self, record: logging.LogRecord) -> dict: - """ - Extract all items from the record that were injected via extra. - - Example: `logging.debug(msg, extra={key: value, ...})`. - """ - extra = {'bucket': self.bucket} - extra.update({key: value for key, value in record.__dict__.items() - if key not in self.DEFAULT_LOG_RECORD_KEYS}) - return extra diff --git a/influxdb_client_3/write_client/client/util/multiprocessing_helper.py b/influxdb_client_3/write_client/client/util/multiprocessing_helper.py index 311bf4c2..002cf964 100644 --- a/influxdb_client_3/write_client/client/util/multiprocessing_helper.py +++ b/influxdb_client_3/write_client/client/util/multiprocessing_helper.py @@ -7,7 +7,7 @@ import logging import multiprocessing -from influxdb_client_3.write_client import InfluxDBClient, WriteOptions +from influxdb_client_3.write_client import WriteOptions from influxdb_client_3.exceptions import InfluxDBError logger = logging.getLogger('influxdb_client.client.util.multiprocessing_helper') diff --git a/influxdb_client_3/write_client/client/write/__init__.py b/influxdb_client_3/write_client/client/write/__init__.py index 5b960961..9b33e9bb 100644 --- a/influxdb_client_3/write_client/client/write/__init__.py +++ b/influxdb_client_3/write_client/client/write/__init__.py @@ -1,6 +1,3 @@ # flake8: noqa from __future__ import absolute_import - -# import apis into api package -from influxdb_client_3.write_client.service.write_service import WriteService diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 9cd41bf0..ccf60fe4 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -1,31 +1,36 @@ """Collect and write time series data to InfluxDB Cloud or InfluxDB OSS.""" - +from __future__ import absolute_import # coding: utf-8 # TODO Remove after this program no longer supports Python 3.8.* from __future__ import annotations +import datetime import logging import os import warnings from collections import defaultdict -from datetime import timedelta from enum import Enum +from http import HTTPStatus +from multiprocessing.pool import ThreadPool from random import random from time import sleep from typing import Union, Any, Iterable, NamedTuple import reactivex as rx +import urllib3 from reactivex import operators as ops, Observable from reactivex.scheduler import ThreadPoolScheduler from reactivex.subject import Subject -from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS -from influxdb_client_3.write_client.client.util.helpers import get_org_query_param +from influxdb_client_3.exceptions import InfluxDBPartialWriteError +from influxdb_client_3.write_client._sync.rest_client import RestClient +# from influxdb_client_3.write_client.client._base import _HAS_DATACLASS from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, sanitize_tag_order from influxdb_client_3.write_client.client.write.retry import WritesRetry from influxdb_client_3.write_client.domain import WritePrecision -from influxdb_client_3.write_client.rest import _UTF_8_encoding +from influxdb_client_3.write_client.domain.write_precision_converter import WritePrecisionConverter +from influxdb_client_3.write_client.rest import _UTF_8_encoding, ApiException from influxdb_client_3.write_client.write_defaults import ( DEFAULT_WRITE_ACCEPT_PARTIAL as _DEFAULT_WRITE_ACCEPT_PARTIAL, DEFAULT_WRITE_NO_SYNC as _DEFAULT_WRITE_NO_SYNC, @@ -59,6 +64,11 @@ logger = logging.getLogger('influxdb_client_3.write_client.client.write_api') +try: + _HAS_DATACLASS = True +except ModuleNotFoundError: + _HAS_DATACLASS = False + if _HAS_DATACLASS: import dataclasses from dataclasses import dataclass @@ -266,7 +276,10 @@ def _body_reduce(batch_items): return b'\n'.join(map(lambda batch_item: batch_item.data, batch_items)) -class WriteApi(_BaseWriteApi): +class WriteApi: + PRIMITIVE_TYPES = (float, bool, bytes, str, int) + _pool = None + """ Implementation for '/api/v2/write' and '/api/v3/write_lp' endpoint. @@ -283,7 +296,16 @@ class WriteApi(_BaseWriteApi): """ def __init__(self, - influxdb_client, + token: str, + bucket: str, + org: str, + gzip_threshold=None, + enable_gzip=False, + auth_scheme=None, + timeout=None, + pool_threads=None, + default_header=None, + rest_client: RestClient = None, write_options: WriteOptions = WriteOptions(), point_settings: PointSettings = PointSettings(), **kwargs) -> None: @@ -317,7 +339,18 @@ def __init__(self, **[batching mode]** """ - super().__init__(influxdb_client=influxdb_client, point_settings=point_settings) + self.rest_client = rest_client + self.token = token + self.bucket = bucket + self.org = org + self.enable_gzip = enable_gzip + self.gzip_threshold = gzip_threshold + self.auth_scheme = auth_scheme + self.timeout = timeout + self.pool_threads = pool_threads + self._point_settings = point_settings + self.default_header = default_header + self._write_options = write_options # TODO - callbacks seem to be used with batching type only - could they be used with sync or async? self._success_callback = kwargs.get('success_callback', None) @@ -336,20 +369,22 @@ def __init__(self, # TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API warnings.warn(message, DeprecationWarning) - def _resolve_write_request_options(self, kwargs): - no_sync = kwargs.pop('no_sync', self._write_options.no_sync) - accept_partial = kwargs.pop('accept_partial', self._write_options.accept_partial) - use_v2_api = kwargs.pop('use_v2_api', self._write_options.use_v2_api) - if use_v2_api and no_sync: - raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") - return no_sync, accept_partial, use_v2_api + @property + def pool(self): + """Create thread pool on first request avoids instantiating unused threadpool for blocking clients.""" + if self._pool is None: + self._pool = ThreadPool(self.pool_threads) + return self._pool - def write(self, bucket: str, org: str = None, + def write(self, + bucket=None, + org=None, record: Union[ str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass'] ] = None, - write_precision: WritePrecision = None, **kwargs) -> Any: + write_precision: WritePrecision = None, + **kwargs) -> Any: """ Write time-series data into InfluxDB. @@ -416,7 +451,9 @@ def write(self, bucket: str, org: str = None, data_frame.index = pd.to_datetime(data_frame.index, unit='s') """ # noqa: E501 - org = get_org_query_param(org=org, client=self._influxdb_client) + + org = org if org is not None else self.org + bucket = bucket if bucket is not None else self.bucket self._append_default_tags(record) @@ -456,32 +493,92 @@ def write_payload(payload): return results[0] return results - def _create_batching_pipeline(self) -> tuple[Subject[Any], rx.abc.DisposableBase]: - """Create the batching pipeline for collecting and writing data.""" - # Define Subject that listen incoming data and produces writes into InfluxDB - subject = Subject() - - disposable = subject.pipe( - # Split incoming data to windows by batch_size or flush_interval - ops.window_with_time_or_count(count=self._write_options.batch_size, - timespan=timedelta(milliseconds=self._write_options.flush_interval)), - # Map window into groups defined by 'organization', 'bucket' and 'precision' - ops.flat_map(lambda window: window.pipe( # type: ignore - # Group window by 'organization', 'bucket' and 'precision' - ops.group_by(lambda batch_item: batch_item.key), # type: ignore - # Create batch (concatenation line protocols by \n) - ops.map(lambda group: group.pipe( # type: ignore - ops.to_iterable(), - ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), - # type: ignore - ops.merge_all())), - # Write data into InfluxDB (possibility to retry if its fail) - ops.filter(lambda batch: batch.size > 0), - ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())), - ops.merge_all()) \ - .subscribe(self._on_next, self._on_error, self._on_complete) + async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 + """Write data. + + Writes data to a bucket. Use this endpoint to send data in [line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format to InfluxDB. #### InfluxDB Cloud - Does the following when you send a write request: 1. Validates the request and queues the write. 2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise. 3. Handles the delete asynchronously and reaches eventual consistency. To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are asynchronous, your change might not yet be readable when you receive the response. #### InfluxDB OSS - Validates the request and handles the write synchronously. - If all points were written successfully, responds with HTTP `2xx` status code; otherwise, returns the first line that failed. #### Required permissions - `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket. #### Rate limits (with InfluxDB Cloud) `write` rate limits apply. For more information, see [limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/). #### Related guides - [Write data with the InfluxDB API](https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api) - [Optimize writes to InfluxDB](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) + This method makes an asynchronous HTTP request. + + :param async_req bool + :param str org: An organization name or ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. (required) + :param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the specified bucket. (required) + :param str body: In the request body, provide data in [line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/). To send compressed data, do the following: 1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data. 2. In your request, send the compressed data and the `Content-Encoding: gzip` header. #### Related guides - [Best practices for optimizing writes](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required) + :param str zap_trace_span: OpenTracing span context + :param str content_encoding: The compression applied to the line protocol in the request payload. To send a GZIP payload, pass `Content-Encoding: gzip` header. + :param str content_type: The format of the data in the request body. To send a line protocol payload, pass `Content-Type: text/plain; charset=utf-8`. + :param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater than the `max body` configuration option, the server responds with status code `413`. + :param str accept: The content type that the client can understand. Writes only return a response body if they fail--for example, due to a formatting problem or quota limit. #### InfluxDB Cloud - Returns only `application/json` for format and limit errors. - Returns only `text/html` for some quota limit errors. #### InfluxDB OSS - Returns only `application/json` for format and limit errors. #### Related guides - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) + :param str org_id: An organization ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. + :param WritePrecision precision: The precision for unix timestamps in the line protocol batch. + :param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes. True value means faster write but without the confirmation that the data was persisted. Note: This option is supported by InfluxDB 3 Core and Enterprise servers only. For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write operation will fail with an error. + :return: None + If the method is called asynchronously, + returns the request thread. + """ # noqa: E501 + local_var_params, path, path_params, query_params, header_params, body_params = \ + self._post_write_prepare(org, bucket, body, self.default_header, **kwargs) # noqa: E501 + use_v2_api = local_var_params['use_v2_api'] + + try: + return await self.call_api( + resource_path=path, + method='POST', + query_params=query_params, + header_params=header_params, + body=body, + async_req=local_var_params.get('async_req'), + _request_timeout=local_var_params.get('_request_timeout'), + urlopen_kw=kwargs.get('urlopen_kw', None)) + except ApiException as e: + raise self._translate_write_exception(e, use_v2_api) + + def call_api(self, resource_path, method, + query_params=None, header_params=None, + body=None, async_req=None, _request_timeout=None, urlopen_kw=None): + """Make the HTTP request (synchronous) and Return deserialized data. + + To make an async_req request, set the async_req parameter. + + :param resource_path: Path to method endpoint. + :param method: Method to call. + :param path_params: Path parameters in the url. + :param query_params: Query parameters in the url. + :param header_params: Header parameters to be + placed in the request header. + :param body: Request body. + :param post_params dict: Request post form parameters, + for `application/x-www-form-urlencoded`, `multipart/form-data`. + :param auth_settings list: Auth Settings names for the request. + :param response: Response data type. + :param files dict: key -> filename, value -> filepath, + for `multipart/form-data`. + :param async_req bool: execute request asynchronously + :param collection_formats: dict of collection formats for path, query, + header, and post parameters. + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :param urlopen_kw: Additional parameters are passed to + :meth:`urllib3.request.RequestMethods.request` + :return: + If async_req parameter is True, + the request will be called asynchronously. + The method will return the request thread. + If parameter async_req is False or missing, + then the method will return the response directly. + """ + if not async_req: + return self._call_api(resource_path, method, + query_params, header_params, + body, _request_timeout, urlopen_kw) - return subject, disposable + else: + # TODO possible refactor - async handler inside package `_sync`? + thread = self.pool.apply_async(self._call_api, (resource_path, + method, query_params, + header_params, body, _request_timeout, urlopen_kw)) + return thread def flush(self): """ @@ -540,24 +637,32 @@ def close(self): if self._disposable: self._disposable = None - def __enter__(self): - """ - Enter the runtime context related to this object. - - It will bind this method’s return value to the target(s) - specified in the `as` clause of the statement. - - return: self instance - """ - return self + def _create_batching_pipeline(self) -> tuple[Subject[Any], rx.abc.DisposableBase]: + """Create the batching pipeline for collecting and writing data.""" + # Define Subject that listen incoming data and produces writes into InfluxDB + subject = Subject() - def __exit__(self, exc_type, exc_val, exc_tb): - """Exit the runtime context related to this object and close the WriteApi.""" - self.close() + disposable = subject.pipe( + # Split incoming data to windows by batch_size or flush_interval + ops.window_with_time_or_count(count=self._write_options.batch_size, + timespan=datetime.timedelta(milliseconds=self._write_options.flush_interval)), + # Map window into groups defined by 'organization', 'bucket' and 'precision' + ops.flat_map(lambda window: window.pipe( # type: ignore + # Group window by 'organization', 'bucket' and 'precision' + ops.group_by(lambda batch_item: batch_item.key), # type: ignore + # Create batch (concatenation line protocols by \n) + ops.map(lambda group: group.pipe( # type: ignore + ops.to_iterable(), + ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), + # type: ignore + ops.merge_all())), + # Write data into InfluxDB (possibility to retry if its fail) + ops.filter(lambda batch: batch.size > 0), + ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())), + ops.merge_all()) \ + .subscribe(self._on_next, self._on_error, self._on_complete) - def __del__(self): - """Close WriteApi.""" - self.close() + return subject, disposable def _write_batching(self, bucket, org, data, precision=None, @@ -643,16 +748,291 @@ def _retry_callback_delegate(exception): def _post_write(self, _async_req, bucket, org, body, precision, no_sync, accept_partial, use_v2_api, **kwargs): # Filter out serializer-specific kwargs before passing to _post_write http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} - return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, - no_sync=no_sync, - accept_partial=accept_partial, - use_v2_api=use_v2_api, - async_req=_async_req, - content_type="text/plain; charset=utf-8", - **http_kwargs) + http_kwargs['precision'] = precision + http_kwargs['no_sync'] = no_sync + http_kwargs['accept_partial'] = accept_partial + http_kwargs['use_v2_api'] = use_v2_api + + local_var_params, path, path_params, query_params, header_params, body_params = \ + self._post_write_prepare(org, bucket, body, self.default_header, **http_kwargs) # noqa: E501 + + use_v2_api = local_var_params['use_v2_api'] + try: + result = self.call_api( + path, 'POST', + query_params, + header_params, + body=body_params, + async_req=_async_req, + _request_timeout=local_var_params.get('_request_timeout'), + urlopen_kw=http_kwargs.get('urlopen_kw', None)) + if _async_req: + original_get = result.get + + def translated_get(timeout=None): + try: + return original_get(timeout=timeout) + except ApiException as e: + raise self._translate_write_exception(e, use_v2_api) + + result.get = translated_get + return result + except ApiException as e: + raise self._translate_write_exception(e, use_v2_api) + + def _call_api( + self, resource_path, method, + query_params=None, header_params=None, body=None, + _request_timeout=None, urlopen_kw=None): + + # body + should_gzip = False + if body: + should_gzip = self._should_gzip(body, self.enable_gzip, self.gzip_threshold) + body = self._sanitize_for_serialization(body) + body = self._update_request_body(resource_path, body, should_gzip) + + # header parameters + header_params = header_params or {} + self._update_request_header_params(resource_path, header_params, should_gzip) + if header_params: + header_params = self._sanitize_for_serialization(header_params) + + # query parameters + if query_params: + query_params = self._sanitize_for_serialization(query_params) + + urlopen_kw = urlopen_kw or {} + + timeout = None + _configured_timeout = _request_timeout or self.timeout + if _configured_timeout: + if isinstance(_configured_timeout, (int, float,)): # noqa: E501,F821 + timeout = urllib3.Timeout(total=_configured_timeout / 1_000) + elif (isinstance(_configured_timeout, tuple) and + len(_configured_timeout) == 2): + timeout = urllib3.Timeout( + connect=_configured_timeout[0] / 1_000, read=_configured_timeout[1] / 1_000) + + # perform request and return response + response_data = self.rest_client.request( + method=method, + url=resource_path, + query_params=query_params, + headers=header_params, + body=body, + timeout=timeout, + **urlopen_kw + ) + + self.last_response = response_data + + return response_data + + def _post_write_prepare(self, org, bucket, body, default_header, **kwargs): # noqa: E501,D401,D403 + local_var_params = dict(locals()) + + all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', + 'accept', 'org_id', 'precision', 'no_sync', 'accept_partial', 'use_v2_api'] # noqa: E501 + self._check_operation_params('_post_write', all_params, local_var_params) + local_var_params.setdefault('use_v2_api', DEFAULT_WRITE_USE_V2_API) + local_var_params.setdefault('no_sync', DEFAULT_WRITE_NO_SYNC) + local_var_params.setdefault('accept_partial', DEFAULT_WRITE_ACCEPT_PARTIAL) + # verify the required parameter 'org' is set + if ('org' not in local_var_params or + local_var_params['org'] is None): + raise ValueError("Missing the required parameter `org` when calling `post_write`") # noqa: E501 + # verify the required parameter 'bucket' is set + if ('bucket' not in local_var_params or + local_var_params['bucket'] is None): + raise ValueError("Missing the required parameter `bucket` when calling `post_write`") # noqa: E501 + # verify the required parameter 'body' is set + if ('body' not in local_var_params or + local_var_params['body'] is None): + raise ValueError("Missing the required parameter `body` when calling `post_write`") # noqa: E501 + + path_params = {} + query_params = [] + + use_v2_api = local_var_params['use_v2_api'] + no_sync = local_var_params['no_sync'] + accept_partial = local_var_params['accept_partial'] + if 'org' in local_var_params: + query_params.append(('org', local_var_params['org'])) # noqa: E501 + if 'org_id' in local_var_params: + query_params.append(('orgID', local_var_params['org_id'])) # noqa: E501 + if 'bucket' in local_var_params: + query_params.append(('bucket' if use_v2_api else 'db', local_var_params['bucket'])) # noqa: E501 + + if use_v2_api: + path = '/api/v2/write' + if 'precision' in local_var_params: + precision = local_var_params['precision'] + query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 + else: + path = '/api/v3/write_lp' + if 'precision' in local_var_params: + precision = local_var_params['precision'] + query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 + if no_sync: + query_params.append(('no_sync', 'true')) + if accept_partial is False: + query_params.append(('accept_partial', 'false')) + + header_params = dict(default_header) if default_header is not None else {} + header_params.setdefault('Accept', 'application/json') + header_params.setdefault('Content-Type', 'text/plain; charset=utf-8') + + if 'content_encoding' in local_var_params: + header_params['Content-Encoding'] = local_var_params['content_encoding'] # noqa: E501 + + body_params = None + if 'body' in local_var_params: + body_params = local_var_params['body'] + + return local_var_params, path, path_params, query_params, header_params, body_params + + def _check_operation_params(self, operation_id, supported_params, local_params): + supported_params.append('async_req') + supported_params.append('_request_timeout') + supported_params.append('urlopen_kw') + for key, val in local_params['kwargs'].items(): + if key not in supported_params: + raise TypeError( + f"Got an unexpected keyword argument '{key}'" + f" to method {operation_id}" + ) + local_params[key] = val + del local_params['kwargs'] + + def _update_request_header_params(self, path: str, params: dict, should_gzip: bool = False): + if should_gzip: + # GZIP Request + if path == '/api/v2/write' or path == '/api/v3/write_lp': + params["Content-Encoding"] = "gzip" + params["Accept-Encoding"] = "identity" + pass + # GZIP Response + if path == '/api/v2/query': + # params["Content-Encoding"] = "gzip" + params["Accept-Encoding"] = "gzip" + pass + pass + pass - def _to_response(self, data: _BatchItem, delay: timedelta): + def _update_request_body(self, path: str, body, should_gzip: bool = False): + _body = body + if should_gzip: + # GZIP Request + if path == '/api/v2/write' or path == '/api/v3/write_lp': + import gzip + if isinstance(_body, bytes): + return gzip.compress(data=_body) + else: + return gzip.compress(bytes(_body, _UTF_8_encoding)) + + return _body + + def _sanitize_for_serialization(self, obj): + """Build a JSON POST object. + + If obj is None, return None. + If obj is str, int, long, float, bool, return directly. + If obj is datetime.datetime, datetime.date + convert to string in iso8601 format. + If obj is list, sanitize each element in the list. + If obj is dict, return the dict. + If obj is OpenAPI model, return the properties dict. + + :param obj: The data to serialize. + :return: The serialized form of data. + """ + if obj is None: + return None + elif isinstance(obj, self.PRIMITIVE_TYPES): + return obj + elif isinstance(obj, list): + return [self._sanitize_for_serialization(sub_obj) + for sub_obj in obj] + elif isinstance(obj, tuple): + return tuple(self._sanitize_for_serialization(sub_obj) + for sub_obj in obj) + elif isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + + if isinstance(obj, dict): + obj_dict = obj + else: + # Convert model obj to dict except + # attributes `openapi_types`, `attribute_map` + # and attributes which value is not None. + # Convert attribute name to json key in + # model definition for request. + obj_dict = {obj.attribute_map[attr]: getattr(obj, attr) + for attr, _ in obj.openapi_types.items() + if getattr(obj, attr) is not None} + + return {key: self._sanitize_for_serialization(val) + for key, val in obj_dict.items()} + + def _translate_write_exception(self, exc, use_v2_api): + if use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " + "Set use_v2_api=False to use the V3 API endpoint.") + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + return ex + if not use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: + message = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " + "Set use_v2_api=True to use the V2 API endpoint.") + ex = ApiException(status=0, reason=message) + ex.message = message + ex.args = (message,) + return ex + partial = InfluxDBPartialWriteError.from_response(exc.response) + if partial is not None: + return partial + return exc + + def _should_gzip(self, payload: str, enable_gzip: bool = False, gzip_threshold: int = None) -> bool: + """ + Determines whether gzip compression should be applied to the given payload based + on the specified conditions. This method evaluates the `enable_gzip` flag and + considers the size of the payload in relation to the optional `gzip_threshold`. + If `enable_gzip` is set to True and no threshold is provided, gzip compression + is advised without any size condition. If a threshold is specified, compression + is applied only when the size of the payload meets or exceeds the threshold. + By default, no compression is performed if `enable_gzip` is False. + + :param payload: The payload data as a string for which gzip determination is to + be made. + :type payload: str + :param enable_gzip: A flag indicating whether gzip compression is enabled. By + default, this flag is False. + :type enable_gzip: bool, optional + :param gzip_threshold: Optional threshold specifying the minimum size (in bytes) + of the payload to trigger gzip compression. Only considered if + `enable_gzip` is True. + :type gzip_threshold: int, optional + :return: A boolean value indicating True if gzip compression should be applied + based on the payload size, the enable_gzip flag, and the gzip_threshold. + :rtype: bool + """ + if enable_gzip is not False: + if gzip_threshold is not None: + payload_size = len(payload) if isinstance(payload, (bytes, bytearray)) else len(payload.encode('utf-8')) + return payload_size >= gzip_threshold + if enable_gzip is True: + return True + return False + + @staticmethod + def _on_error(ex): + logger.error("unexpected error during batching: %s", ex) + + def _to_response(self, data: _BatchItem, delay: datetime.timedelta): return rx.of(data).pipe( ops.subscribe_on(self._write_options.write_scheduler), # use delay if its specified @@ -663,9 +1043,6 @@ def _to_response(self, data: _BatchItem, delay: timedelta): ops.catch(handler=lambda exception, source: rx.just(_BatchResponse(exception=exception, data=data))), ) - def _jitter_delay(self): - return timedelta(milliseconds=random() * self._write_options.jitter_interval) - def _on_next(self, response: _BatchResponse): if response.exception: logger.error("The batch item wasn't processed successfully because: %s", response.exception) @@ -690,14 +1067,94 @@ def _on_next(self, response: _BatchResponse): except Exception as e: logger.error("The configured success callback threw an exception: %s", e) - @staticmethod - def _on_error(ex): - logger.error("unexpected error during batching: %s", ex) - def _on_complete(self): self._disposable.dispose() logger.debug("the batching processor was disposed") + def _append_default_tag(self, key, val, record): + from influxdb_client_3.write_client import Point + if isinstance(record, bytes) or isinstance(record, str): + pass + elif isinstance(record, Point): + record.tag(key, val) + elif isinstance(record, dict): + record.setdefault("tags", {}) + record.get("tags")[key] = val + elif isinstance(record, Iterable): + for item in record: + self._append_default_tag(key, val, item) + + def _append_default_tags(self, record): + if self._point_settings.defaultTags and record is not None: + for key, val in self._point_settings.defaultTags.items(): + self._append_default_tag(key, val, record) + + def _resolve_write_request_options(self, kwargs): + no_sync = kwargs.pop('no_sync', self._write_options.no_sync) + accept_partial = kwargs.pop('accept_partial', self._write_options.accept_partial) + use_v2_api = kwargs.pop('use_v2_api', self._write_options.use_v2_api) + if use_v2_api and no_sync: + raise ValueError("invalid write options: no_sync cannot be used with use_v2_api") + return no_sync, accept_partial, use_v2_api + + def _jitter_delay(self): + return datetime.timedelta(milliseconds=random() * self._write_options.jitter_interval) + + def _serialize(self, record, write_precision, payload, **kwargs): + from influxdb_client_3.write_client.client.write.point import Point + if isinstance(record, bytes): + payload[write_precision].append(record) + + elif isinstance(record, str): + self._serialize(record.encode(_UTF_8_encoding), write_precision, payload, **kwargs) + + elif isinstance(record, Point): + precision_from_point = kwargs.get('precision_from_point', True) + precision = record.write_precision if precision_from_point else write_precision + self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')), + precision, payload, **kwargs) + + elif isinstance(record, dict): + self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), + write_precision, payload, **kwargs) + elif 'polars' in str(type(record)): + from influxdb_client_3.write_client.client.write.polars_dataframe_serializer import \ + PolarsDataframeSerializer + serializer = PolarsDataframeSerializer(record, self._point_settings, write_precision, **kwargs) + self._serialize(serializer.serialize(), write_precision, payload, **kwargs) + + elif 'pandas' in str(type(record)): + serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs) + self._serialize(serializer.serialize(), write_precision, payload, **kwargs) + + elif hasattr(record, "_asdict"): + # noinspection PyProtectedMember + self._serialize(record._asdict(), write_precision, payload, **kwargs) + elif _HAS_DATACLASS and dataclasses.is_dataclass(record): + self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs) + elif isinstance(record, Iterable): + for item in record: + self._serialize(item, write_precision, payload, **kwargs) + + def __enter__(self): + """ + Enter the runtime context related to this object. + + It will bind this method’s return value to the target(s) + specified in the `as` clause of the statement. + + return: self instance + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the runtime context related to this object and close the WriteApi.""" + self.close() + + def __del__(self): + """Close WriteApi.""" + self.close() + def __getstate__(self): """Return a dict of attributes that you want to pickle.""" state = self.__dict__.copy() @@ -711,8 +1168,7 @@ def __setstate__(self, state): """Set your object with the provided dict.""" self.__dict__.update(state) # Init Rx - self.__init__(self._influxdb_client, - self._write_options, + self.__init__(self._write_options, self._point_settings, success_callback=self._success_callback, error_callback=self._error_callback, diff --git a/influxdb_client_3/write_client/configuration.py b/influxdb_client_3/write_client/configuration.py deleted file mode 100644 index d52e9a34..00000000 --- a/influxdb_client_3/write_client/configuration.py +++ /dev/null @@ -1,244 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -import copy -import logging -import multiprocessing -import sys - - -class TypeWithDefault(type): - - def __init__(cls, name, bases, dct): - """Initialize with defaults.""" - super(TypeWithDefault, cls).__init__(name, bases, dct) - cls._default = None - - def __call__(cls): - """Call self as a function.""" - if cls._default is None: - cls._default = type.__call__(cls) - return copy.copy(cls._default) - - def set_default(cls, default): - """Set dafaults.""" - cls._default = copy.copy(default) - - -class Configuration(object, metaclass=TypeWithDefault): - - def __init__(self): - """Initialize configuration.""" - # Default Base url - self.host = "http://localhost/api/v2" - # Temp file folder for downloading files - self.temp_folder_path = None - - # Authentication Settings - # dict to store API key(s) - self.api_key = {} - # dict to store API prefix (e.g. Bearer) - self.api_key_prefix = {} - - # Logging Settings - self.loggers = {} - # Log format - self.logger_format = '%(asctime)s %(levelname)s %(message)s' - # Log stream handler - self.logger_stream_handler = None - # Log file handler - self.logger_file_handler = None - # Debug file location - self.logger_file = None - # Debug switch - self.debug = False - - # SSL/TLS verification - # Set this to false to skip verifying SSL certificate when calling API - # from https server. - self.verify_ssl = True - # Set this to customize the certificate file to verify the peer. - self.ssl_ca_cert = None - # client certificate file - self.cert_file = None - # client key file - self.cert_key_file = None - # client key file password - self.cert_key_password = None - # Set this to True/False to enable/disable SSL hostname verification. - self.assert_hostname = None - - # Set this to specify a custom ssl context to inject this context inside the urllib3 connection pool. - self.ssl_context = None - - # urllib3 connection pool's maximum number of connections saved - # per pool. urllib3 uses 1 connection as default value, but this is - # not the best value when you are making a lot of possibly parallel - # requests to the same host, which is often the case here. - # cpu_count * 5 is used as default value to increase performance. - self.connection_pool_maxsize = multiprocessing.cpu_count() * 5 - # Timeout setting for a request. If one number provided, it will be total request timeout. - # It can also be a pair (tuple) of (connection, read) timeouts. - self.timeout = None - - # Proxy URL - self.proxy = None - # A dictionary containing headers that will be sent to the proxy - self.proxy_headers = None - # Safe chars for path_param - self.safe_chars_for_path_param = '' - - # Compression settings - self.enable_gzip = False - self.gzip_threshold = None - - @property - def logger_file(self): - """Logger file. - - If the logger_file is None, then add stream handler and remove file - handler. Otherwise, add file handler and remove stream handler. - - :param value: The logger_file path. - :type: str - """ - return self.__logger_file - - @logger_file.setter - def logger_file(self, value): - """Logger file. - - If the logger_file is None, then add stream handler and remove file - handler. Otherwise, add file handler and remove stream handler. - - :param value: The logger_file path. - :type: str - """ - self.__logger_file = value - if self.__logger_file: - # If set logging file, - # then add file handler and remove stream handler. - self.logger_file_handler = logging.FileHandler(self.__logger_file) - self.logger_file_handler.setFormatter(self.logger_formatter) - for _, logger in self.loggers.items(): - logger.addHandler(self.logger_file_handler) - - @property - def debug(self): - """Debug status. - - :param value: The debug status, True or False. - :type: bool - """ - return self.__debug - - @debug.setter - def debug(self, value): - """Debug status. - - :param value: The debug status, True or False. - :type: bool - """ - self.__debug = value - if self.__debug: - # if debug status is True, turn on debug logging - for name, logger in self.loggers.items(): - logger.setLevel(logging.DEBUG) - if name == 'influxdb_client.client.http': - # makes sure to do not duplicate stdout handler - if not any(map(lambda h: isinstance(h, logging.StreamHandler) and h.stream == sys.stdout, - logger.handlers)): - logger.addHandler(logging.StreamHandler(sys.stdout)) - # we use 'influxdb_client.client.http' logger instead of this - # httplib.HTTPConnection.debuglevel = 1 - else: - # if debug status is False, turn off debug logging, - # setting log level to default `logging.WARNING` - for _, logger in self.loggers.items(): - logger.setLevel(logging.WARNING) - # we use 'influxdb_client.client.http' logger instead of this - # httplib.HTTPConnection.debuglevel = 0 - - @property - def logger_format(self): - """Logger format. - - The logger_formatter will be updated when sets logger_format. - - :param value: The format string. - :type: str - """ - return self.__logger_format - - @logger_format.setter - def logger_format(self, value): - """Logger format. - - The logger_formatter will be updated when sets logger_format. - - :param value: The format string. - :type: str - """ - self.__logger_format = value - self.logger_formatter = logging.Formatter(self.__logger_format) - - def get_api_key_with_prefix(self, identifier): - """Get API key (with prefix if set). - - :param identifier: The identifier of apiKey. - :return: The token for api key authentication. - """ - if (self.api_key.get(identifier) and - self.api_key_prefix.get(identifier)): - return self.api_key_prefix[identifier] + ' ' + self.api_key[identifier] # noqa: E501 - elif self.api_key.get(identifier): - return self.api_key[identifier] - - def auth_settings(self): - """Get Auth Settings dict for api client. - - :return: The Auth Settings information dict. - """ - return { - 'TokenAuthentication': - { - 'type': 'api_key', - 'in': 'header', - 'key': 'Authorization', - 'value': self.get_api_key_with_prefix('Authorization') - }, - - } - - def to_debug_report(self): - """Get the essential information for debugging. - - :return: The report for debugging. - """ - from write_client import VERSION - return "Python SDK Debug Report:\n"\ - "OS: {env}\n"\ - "Python Version: {pyversion}\n"\ - "Version of the API: 2.0.0\n"\ - "SDK Package Version: {client_version}".\ - format(env=sys.platform, pyversion=sys.version, client_version=VERSION) - - def update_request_header_params(self, path: str, params: dict, should_gzip: bool = False): - """Update header params based on custom settings. - - :param path: Resource path. - :param params: Header parameters dict to be updated. - :param should_gzip: Describes if request body should be gzip compressed. - """ - pass - - def update_request_body(self, path: str, body, should_gzip: bool = False): - """Update http body based on custom settings. - - :param path: Resource path. - :param body: Request body to be updated. - :param should_gzip: Describes if request body should be gzip compressed. - :return: Updated body - """ - return body diff --git a/influxdb_client_3/write_client/service/__init__.py b/influxdb_client_3/write_client/service/__init__.py deleted file mode 100644 index 805c6d98..00000000 --- a/influxdb_client_3/write_client/service/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# flake8: noqa - -from __future__ import absolute_import - -# import apis into api package -from influxdb_client_3.write_client.service.write_service import WriteService \ No newline at end of file diff --git a/influxdb_client_3/write_client/service/_base_service.py b/influxdb_client_3/write_client/service/_base_service.py deleted file mode 100644 index cc0adcfe..00000000 --- a/influxdb_client_3/write_client/service/_base_service.py +++ /dev/null @@ -1,67 +0,0 @@ - - -# noinspection PyMethodMayBeStatic -class _BaseService(object): - - def __init__(self, api_client=None): - """Init common services operation.""" - if api_client is None: - raise ValueError("Invalid value for `api_client`, must be defined.") - self.api_client = api_client - self._build_type = None - - def _check_operation_params(self, operation_id, supported_params, local_params): - supported_params.append('async_req') - supported_params.append('_return_http_data_only') - supported_params.append('_preload_content') - supported_params.append('_request_timeout') - supported_params.append('urlopen_kw') - for key, val in local_params['kwargs'].items(): - if key not in supported_params: - raise TypeError( - f"Got an unexpected keyword argument '{key}'" - f" to method {operation_id}" - ) - local_params[key] = val - del local_params['kwargs'] - - def _is_cloud_instance(self) -> bool: - if not self._build_type: - self._build_type = self.build_type() - return 'cloud' in self._build_type.lower() - - async def _is_cloud_instance_async(self) -> bool: - if not self._build_type: - self._build_type = await self.build_type_async() - return 'cloud' in self._build_type.lower() - - def build_type(self) -> str: - """ - Return the build type of the connected InfluxDB Server. - - :return: The type of InfluxDB build. - """ - from write_client import PingService - ping_service = PingService(self.api_client) - - response = ping_service.get_ping_with_http_info(_return_http_data_only=False) - return self.response_header(response, header_name='X-Influxdb-Build') - - async def build_type_async(self) -> str: - """ - Return the build type of the connected InfluxDB Server. - - :return: The type of InfluxDB build. - """ - from write_client import PingService - ping_service = PingService(self.api_client) - - response = await ping_service.get_ping_async(_return_http_data_only=False) - return self.response_header(response, header_name='X-Influxdb-Build') - - def response_header(self, response, header_name='X-Influxdb-Version') -> str: - if response is not None and len(response) >= 3: - if header_name in response[2]: - return response[2][header_name] - - return "unknown" diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py deleted file mode 100644 index 694430e7..00000000 --- a/influxdb_client_3/write_client/service/write_service.py +++ /dev/null @@ -1,321 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -import re # noqa: F401 -from http import HTTPStatus - -from influxdb_client_3.write_client.domain.write_precision_converter import WritePrecisionConverter -from influxdb_client_3.write_client.rest import ApiException -from influxdb_client_3.write_client.service._base_service import _BaseService -from influxdb_client_3.exceptions import InfluxDBPartialWriteError -from influxdb_client_3.write_client.write_defaults import ( - DEFAULT_WRITE_ACCEPT_PARTIAL, - DEFAULT_WRITE_NO_SYNC, - DEFAULT_WRITE_USE_V2_API, -) - - -class WriteService(_BaseService): - - def __init__(self, api_client=None): # noqa: E501,D401,D403 - """WriteService - a operation defined in OpenAPI.""" - super().__init__(api_client) - - def post_write(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 - """Write data. - - Writes data to a bucket. Use this endpoint to send data in [line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format to InfluxDB. #### InfluxDB Cloud - Does the following when you send a write request: 1. Validates the request and queues the write. 2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise. 3. Handles the delete asynchronously and reaches eventual consistency. To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are asynchronous, your change might not yet be readable when you receive the response. #### InfluxDB OSS - Validates the request and handles the write synchronously. - If all points were written successfully, responds with HTTP `2xx` status code; otherwise, returns the first line that failed. #### Required permissions - `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket. #### Rate limits (with InfluxDB Cloud) `write` rate limits apply. For more information, see [limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/). #### Related guides - [Write data with the InfluxDB API](https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api) - [Optimize writes to InfluxDB](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - This method makes a synchronous HTTP request by default. To make an - asynchronous HTTP request, please pass async_req=True - >>> thread = api.post_write(org, bucket, body, async_req=True) - >>> result = thread.get() - - :param async_req bool - :param str org: An organization name or ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. (required) - :param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the specified bucket. (required) - :param str body: In the request body, provide data in [line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/). To send compressed data, do the following: 1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data. 2. In your request, send the compressed data and the `Content-Encoding: gzip` header. #### Related guides - [Best practices for optimizing writes](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required) - :param str zap_trace_span: OpenTracing span context - :param str content_encoding: The compression applied to the line protocol in the request payload. To send a GZIP payload, pass `Content-Encoding: gzip` header. - :param str content_type: The format of the data in the request body. To send a line protocol payload, pass `Content-Type: text/plain; charset=utf-8`. - :param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater than the `max body` configuration option, the server responds with status code `413`. - :param str accept: The content type that the client can understand. Writes only return a response body if they fail--for example, due to a formatting problem or quota limit. #### InfluxDB Cloud - Returns only `application/json` for format and limit errors. - Returns only `text/html` for some quota limit errors. #### InfluxDB OSS - Returns only `application/json` for format and limit errors. #### Related guides - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - :param str org_id: An organization ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. - :param WritePrecision precision: The precision for unix timestamps in the line protocol batch. - :param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes. True value means faster write but without the confirmation that the data was persisted. Note: This option is supported by InfluxDB 3 Core and Enterprise servers only. For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write operation will fail with an error. - :return: None - If the method is called asynchronously, - returns the request thread. - """ # noqa: E501 - - kwargs['_return_http_data_only'] = True - if kwargs.get('async_req'): - thread = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501 - return thread - else: - (data) = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501 - return data - - # TODO review this documentation - is it still up-to-date? - def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 - """Write data. - - Writes data to a bucket. - Use this endpoint to send data in - [line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format - to InfluxDB. - #### InfluxDB Cloud - - Does the following when you send a write request: - 1. Validates the request and queues the write. - 2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise. - 3. Handles the delete asynchronously and reaches eventual consistency. - To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success - response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are - asynchronous, your change might not yet be readable when you receive the response. - #### InfluxDB OSS - - Validates the request and handles the write synchronously. - - If all points were written successfully, responds with HTTP `2xx` status code; - otherwise, returns the first line that failed. #### Required permissions - - `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket. - #### Rate limits (with InfluxDB Cloud) `write` rate limits apply. - For more information, see - [limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/). - #### Related guides - - [Write data with the InfluxDB API] - (https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api) - - [Optimize writes to InfluxDB] - (https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) - - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - This method makes a synchronous HTTP request by default. To make an - asynchronous HTTP request, please pass async_req=True - >>> thread = api.post_write_with_http_info(org, bucket, body, async_req=True) - >>> result = thread.get() - - :param async_req bool - :param str org: An organization name or ID. - #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - - Writes data to the bucket in the organization associated with the authorization (API token). - #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - - If you pass both `orgID` and `org`, they must both be valid. - - Writes data to the bucket in the specified organization. (required) - :param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the - specified bucket. (required) - :param str body: In the request body, provide data in - [line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/). - To send compressed data, do the following: - 1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data. - 2. In your request, send the compressed data and the `Content-Encoding: gzip` header. - #### Related guides - - [Best practices for optimizing writes] - (https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required) - :param str zap_trace_span: OpenTracing span context - :param str content_encoding: The compression applied to the line protocol in the request payload. - To send a GZIP payload, pass `Content-Encoding: gzip` header. - :param str content_type: The format of the data in the request body. To send a line protocol payload, - pass `Content-Type: text/plain; charset=utf-8`. - :param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater - than the `max body` configuration option, the server responds with status code `413`. - :param str accept: The content type that the client can understand. Writes only return a response body - if they fail--for example, due to a formatting problem or quota limit. - #### InfluxDB Cloud - - Returns only `application/json` for format and limit errors. - - Returns only `text/html` for some quota limit errors. - #### InfluxDB OSS - - Returns only `application/json` for format and limit errors. - #### Related guides - - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - :param str org_id: An organization ID. - #### InfluxDB Cloud - - Doesn't use the `org` parameter or `orgID` parameter. - - Writes data to the bucket in the organization associated with the authorization (API token). - #### InfluxDB OSS - - Requires either the `org` parameter or the `orgID` parameter. - - If you pass both `orgID` and `org`, they must both be valid. - - Writes data to the bucket in the specified organization. - :param WritePrecision precision: The precision for unix timestamps in the line protocol batch. - :param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes. - True value means faster write but without the confirmation that the data was persisted. - Note: This option is supported by InfluxDB 3 Core and Enterprise servers only. - For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write - operation will fail with an error. - :return: None - If the method is called asynchronously, - returns the request thread. - """ # noqa: E501 - # noqa: E501 - local_var_params, path, path_params, query_params, header_params, body_params = \ - self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501 - - use_v2_api = local_var_params['use_v2_api'] - try: - result = self.api_client.call_api( - path, 'POST', - path_params, - query_params, - header_params, - body=body_params, - post_params=[], - files={}, - response_type=None, # noqa: E501 - auth_settings=[], - async_req=local_var_params.get('async_req'), - _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 - _preload_content=local_var_params.get('_preload_content', True), - _request_timeout=local_var_params.get('_request_timeout'), - collection_formats={}, - urlopen_kw=kwargs.get('urlopen_kw', None)) - if local_var_params.get('async_req'): - original_get = result.get - - def translated_get(timeout=None): - try: - return original_get(timeout=timeout) - except ApiException as e: - raise self._translate_write_exception(e, use_v2_api) - - result.get = translated_get - return result - except ApiException as e: - raise self._translate_write_exception(e, use_v2_api) - - @staticmethod - def _translate_write_exception(exc, use_v2_api): - if use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = ("Server doesn't support the V2 API endpoint (/api/v2/write). " - "Set use_v2_api=False to use the V3 API endpoint.") - ex = ApiException(status=0, reason=message) - ex.message = message - ex.args = (message,) - return ex - if not use_v2_api and exc.status == HTTPStatus.METHOD_NOT_ALLOWED: - message = ("Server doesn't support the V3 API endpoint (/api/v3/write_lp). " - "Set use_v2_api=True to use the V2 API endpoint.") - ex = ApiException(status=0, reason=message) - ex.message = message - ex.args = (message,) - return ex - partial = InfluxDBPartialWriteError.from_response(exc.response) - if partial is not None: - return partial - return exc - - async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 - """Write data. - - Writes data to a bucket. Use this endpoint to send data in [line protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/) format to InfluxDB. #### InfluxDB Cloud - Does the following when you send a write request: 1. Validates the request and queues the write. 2. If queued, responds with _success_ (HTTP `2xx` status code); _error_ otherwise. 3. Handles the delete asynchronously and reaches eventual consistency. To ensure that InfluxDB Cloud handles writes and deletes in the order you request them, wait for a success response (HTTP `2xx` status code) before you send the next request. Because writes and deletes are asynchronous, your change might not yet be readable when you receive the response. #### InfluxDB OSS - Validates the request and handles the write synchronously. - If all points were written successfully, responds with HTTP `2xx` status code; otherwise, returns the first line that failed. #### Required permissions - `write-buckets` or `write-bucket BUCKET_ID`. *`BUCKET_ID`* is the ID of the destination bucket. #### Rate limits (with InfluxDB Cloud) `write` rate limits apply. For more information, see [limits and adjustable quotas](https://docs.influxdata.com/influxdb/cloud/account-management/limits/). #### Related guides - [Write data with the InfluxDB API](https://docs.influxdata.com/influxdb/latest/write-data/developer-tools/api) - [Optimize writes to InfluxDB](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - This method makes an asynchronous HTTP request. - - :param async_req bool - :param str org: An organization name or ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. (required) - :param str bucket: A bucket name or ID. InfluxDB writes all points in the batch to the specified bucket. (required) - :param str body: In the request body, provide data in [line protocol format](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/). To send compressed data, do the following: 1. Use [GZIP](https://www.gzip.org/) to compress the line protocol data. 2. In your request, send the compressed data and the `Content-Encoding: gzip` header. #### Related guides - [Best practices for optimizing writes](https://docs.influxdata.com/influxdb/latest/write-data/best-practices/optimize-writes/) (required) - :param str zap_trace_span: OpenTracing span context - :param str content_encoding: The compression applied to the line protocol in the request payload. To send a GZIP payload, pass `Content-Encoding: gzip` header. - :param str content_type: The format of the data in the request body. To send a line protocol payload, pass `Content-Type: text/plain; charset=utf-8`. - :param int content_length: The size of the entity-body, in bytes, sent to InfluxDB. If the length is greater than the `max body` configuration option, the server responds with status code `413`. - :param str accept: The content type that the client can understand. Writes only return a response body if they fail--for example, due to a formatting problem or quota limit. #### InfluxDB Cloud - Returns only `application/json` for format and limit errors. - Returns only `text/html` for some quota limit errors. #### InfluxDB OSS - Returns only `application/json` for format and limit errors. #### Related guides - [Troubleshoot issues writing data](https://docs.influxdata.com/influxdb/latest/write-data/troubleshoot/) - :param str org_id: An organization ID. #### InfluxDB Cloud - Doesn't use the `org` parameter or `orgID` parameter. - Writes data to the bucket in the organization associated with the authorization (API token). #### InfluxDB OSS - Requires either the `org` parameter or the `orgID` parameter. - If you pass both `orgID` and `org`, they must both be valid. - Writes data to the bucket in the specified organization. - :param WritePrecision precision: The precision for unix timestamps in the line protocol batch. - :param bool no_sync: Instructs the server whether to wait with the response until WAL persistence completes. True value means faster write but without the confirmation that the data was persisted. Note: This option is supported by InfluxDB 3 Core and Enterprise servers only. For other InfluxDB 3 server types (InfluxDB Clustered, InfluxDB Clould Serverless/Dedicated) the write operation will fail with an error. - :return: None - If the method is called asynchronously, - returns the request thread. - """ # noqa: E501 - local_var_params, path, path_params, query_params, header_params, body_params = \ - self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501 - use_v2_api = local_var_params['use_v2_api'] - - try: - return await self.api_client.call_api( - path, 'POST', - path_params, - query_params, - header_params, - body=body_params, - post_params=[], - files={}, - response_type=None, # noqa: E501 - auth_settings=[], - async_req=local_var_params.get('async_req'), - _return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501 - _preload_content=local_var_params.get('_preload_content', True), - _request_timeout=local_var_params.get('_request_timeout'), - collection_formats={}, - urlopen_kw=kwargs.get('urlopen_kw', None)) - except ApiException as e: - raise self._translate_write_exception(e, use_v2_api) - - def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403 - local_var_params = dict(locals()) - - all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', - 'accept', 'org_id', 'precision', 'no_sync', 'accept_partial', 'use_v2_api'] # noqa: E501 - self._check_operation_params('post_write', all_params, local_var_params) - local_var_params.setdefault('use_v2_api', DEFAULT_WRITE_USE_V2_API) - local_var_params.setdefault('no_sync', DEFAULT_WRITE_NO_SYNC) - local_var_params.setdefault('accept_partial', DEFAULT_WRITE_ACCEPT_PARTIAL) - # verify the required parameter 'org' is set - if ('org' not in local_var_params or - local_var_params['org'] is None): - raise ValueError("Missing the required parameter `org` when calling `post_write`") # noqa: E501 - # verify the required parameter 'bucket' is set - if ('bucket' not in local_var_params or - local_var_params['bucket'] is None): - raise ValueError("Missing the required parameter `bucket` when calling `post_write`") # noqa: E501 - # verify the required parameter 'body' is set - if ('body' not in local_var_params or - local_var_params['body'] is None): - raise ValueError("Missing the required parameter `body` when calling `post_write`") # noqa: E501 - - path_params = {} - query_params = [] - - use_v2_api = local_var_params['use_v2_api'] - no_sync = local_var_params['no_sync'] - accept_partial = local_var_params['accept_partial'] - if 'org' in local_var_params: - query_params.append(('org', local_var_params['org'])) # noqa: E501 - if 'org_id' in local_var_params: - query_params.append(('orgID', local_var_params['org_id'])) # noqa: E501 - if 'bucket' in local_var_params: - query_params.append(('bucket' if use_v2_api else 'db', local_var_params['bucket'])) # noqa: E501 - - if use_v2_api: - path = '/api/v2/write' - if 'precision' in local_var_params: - precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v2_api_string(precision))) # noqa: E501 - else: - path = '/api/v3/write_lp' - if 'precision' in local_var_params: - precision = local_var_params['precision'] - query_params.append(('precision', WritePrecisionConverter.to_v3_api_string(precision))) # noqa: E501 - if no_sync: - query_params.append(('no_sync', 'true')) - if accept_partial is False: - query_params.append(('accept_partial', 'false')) - - header_params = {} - if 'zap_trace_span' in local_var_params: - header_params['Zap-Trace-Span'] = local_var_params['zap_trace_span'] # noqa: E501 - if 'content_encoding' in local_var_params: - header_params['Content-Encoding'] = local_var_params['content_encoding'] # noqa: E501 - if 'content_type' in local_var_params: - header_params['Content-Type'] = local_var_params['content_type'] # noqa: E501 - if 'content_length' in local_var_params: - header_params['Content-Length'] = local_var_params['content_length'] # noqa: E501 - if 'accept' in local_var_params: - header_params['Accept'] = local_var_params['accept'] # noqa: E501 - - body_params = None - if 'body' in local_var_params: - body_params = local_var_params['body'] - # HTTP header `Accept` - header_params['Accept'] = self.api_client.select_header_accept( - ['application/json', 'text/html', ]) # noqa: E501 - - # HTTP header `Content-Type` - header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 - ['text/plain']) # noqa: E501 - - return local_var_params, path, path_params, query_params, header_params, body_params diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 1183aadc..193bc80f 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -2,6 +2,8 @@ import unittest from collections import defaultdict from unittest.mock import patch + +import pandas as pd from pytest_httpserver import HTTPServer from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType, \ @@ -12,8 +14,6 @@ from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData, ErrorFlightServer -import pandas as pd - try: import polars as pl HAS_POLARS = True @@ -29,11 +29,9 @@ def http_server(): class TestInfluxDBClient3(unittest.TestCase): - @patch('influxdb_client_3._InfluxDBClient') @patch('influxdb_client_3._WriteApi') @patch('influxdb_client_3._QueryApi') - def setUp(self, mock_query_api, mock_write_api, mock_influx_db_client): - self.mock_influx_db_client = mock_influx_db_client + def setUp(self, mock_query_api, mock_write_api): self.mock_write_api = mock_write_api self.mock_query_api = mock_query_api self.client = InfluxDBClient3( @@ -51,7 +49,6 @@ def tearDown(self): def test_init(self): self.assertEqual(self.client._org, "my_org") self.assertEqual(self.client._database, "my_db") - self.assertEqual(self.client._client, self.mock_influx_db_client.return_value) self.assertEqual(self.client._write_api, self.mock_write_api.return_value) self.assertEqual(self.client._query_api, self.mock_query_api.return_value) @@ -63,7 +60,7 @@ def test_token_auth_scheme_default(self): database="my_db", token="my_token", ) - self.assertEqual(client._client.auth_header_value, "Token my_token") + self.assertEqual(client.default_header['Authorization'], "Token my_token") # test explicit token auth_scheme def test_token_auth_scheme_explicit(self): @@ -72,9 +69,9 @@ def test_token_auth_scheme_explicit(self): org="my_org", database="my_db", token="my_token", - auth_scheme="my_scheme" + auth_scheme="Bearer" ) - self.assertEqual(client._client.auth_header_value, "my_scheme my_token") + self.assertEqual(client.default_header['Authorization'], "Bearer my_token") def test_write_options(self): client = InfluxDBClient3( @@ -309,11 +306,11 @@ def test_from_env_all_env_vars_set(self): client = InfluxDBClient3.from_env() self.assertIsInstance(client, InfluxDBClient3) self.assertEqual(client._token, "test_token") - self.assertEqual(client._client.url, "https://localhost:443") - self.assertEqual(client._client.auth_header_value, f"custom_scheme {client._token}") + self.assertEqual(client.base_url, "https://localhost:443") + self.assertEqual(client.default_header['Authorization'], f"custom_scheme {client._token}") self.assertEqual(client._database, "test_db") self.assertEqual(client._org, "test_org") - self.assertEqual(client._client.api_client.rest_client.configuration.gzip_threshold, 2000) + self.assertEqual(client._write_api.gzip_threshold, 2000) write_options = client._write_client_options.get("write_options") self.assertEqual(write_options.write_precision, WritePrecision.MS) @@ -545,10 +542,9 @@ def test_get_version_fail(self): class TestWriteDataFrame(unittest.TestCase): """Tests for the write_dataframe() method.""" - @patch('influxdb_client_3._InfluxDBClient') @patch('influxdb_client_3._WriteApi') @patch('influxdb_client_3._QueryApi') - def setUp(self, mock_query_api, mock_write_api, mock_influx_db_client): + def setUp(self, mock_query_api, mock_write_api): self.mock_write_api = mock_write_api self.client = InfluxDBClient3( host="localhost", diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 8d3dffa5..845fd8bf 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -1,3 +1,4 @@ +import json import logging import os import random @@ -13,6 +14,7 @@ from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ WriteType, InfluxDB3ClientQueryError from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError +from influxdb_client_3.write_client.rest import ApiException from tests.util import asyncio_run, lp_to_py_object @@ -133,28 +135,37 @@ def test_v3_error(self): for accept_partial in [True, False]: with self.subTest(accept_partial=accept_partial): with InfluxDBClient3( - host=self.host, - database=self.database, - token=self.token, - write_client_options=write_client_options(write_options=WriteOptions( - write_type=WriteType.synchronous, - use_v2_api=False, - accept_partial=accept_partial - )) + host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options(write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=False, + accept_partial=accept_partial + )) ) as client: - with self.assertRaises(InfluxDBPartialWriteError) as err: - client.write(lp) - - msg = err.exception.message - self.assertTrue( - "partial write of line protocol occurred" in msg or "parsing failed for write_lp endpoint" in msg - ) - self.assertIn(( - "invalid column type for column 'temp', expected iox::column_type::field::float, " - "got iox::column_type::field::string" - ), msg) - self.assertIn("line 2", msg) - self.assertIn("home,room=Sunroom", msg) + if accept_partial: + with self.assertRaises(InfluxDBPartialWriteError) as err: + client.write(lp) + + self.assertEqual(1, len(err.exception.line_errors)) + line_error = err.exception.line_errors[0] + self.assertEqual(2, line_error.line_number) + self.assertIn("invalid column type for column 'temp'", line_error.error_message) + self.assertIn("home,room=Sunroom", line_error.original_line) + else: + with self.assertRaises(ApiException) as err: + client.write(lp) + + self.assertEqual(400, err.exception.status) + self.assertEqual("line protocol parsing error", err.exception.message) + body = json.loads(err.exception.body) + self.assertEqual("line protocol parsing error", body["error"]) + self.assertEqual(2, body["data"]["line_number"]) + self.assertIn( + "invalid column type for column 'temp'", + body["data"]["error_message"], + ) def test_v2_error(self): lp = "\n".join([ @@ -162,23 +173,26 @@ def test_v2_error(self): "home,room=Sunroom temp=\"hi\" 1735549200", ]) - with InfluxDBClient3( - host=self.host, - database=self.database, - token=self.token, - write_client_options=write_client_options(write_options=WriteOptions( - write_type=WriteType.synchronous, - use_v2_api=True, - accept_partial=False - )) - ) as client: - with self.assertRaises(InfluxDBError) as err: - client.write(lp) - - self.assertNotIsInstance(err.exception, InfluxDBPartialWriteError) - self.assertIsNotNone(err.exception.response) - self.assertEqual(400, err.exception.response.status) - self.assertTrue(err.exception.message) + for accept_partial in [True, False]: + with self.subTest(accept_partial=accept_partial): + with InfluxDBClient3( + host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options(write_options=WriteOptions( + write_type=WriteType.synchronous, + use_v2_api=True, + accept_partial=accept_partial + )) + ) as client: + with self.assertRaises(ApiException) as err: + client.write(lp) + + self.assertEqual(400, err.exception.status) + self.assertNotIsInstance(err.exception, InfluxDBPartialWriteError) + body = json.loads(err.exception.body) + self.assertEqual("invalid", body["code"]) + self.assertIn("write buffer error", body["message"]) def test_auth_error_token(self): self.client = InfluxDBClient3(host=self.host, database=self.database, token='fake token') diff --git a/tests/test_polars.py b/tests/test_polars.py index 42725465..cbabae2e 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -1,10 +1,9 @@ import importlib.util import time import unittest -from unittest.mock import Mock, ANY +from unittest import mock -from influxdb_client_3 import PointSettings, InfluxDBClient3, write_client_options, WriteOptions -from influxdb_client_3.write_client import WriteService +from influxdb_client_3 import PointSettings, InfluxDBClient3, WriteOptions, write_client_options from influxdb_client_3.write_client.client.write.polars_dataframe_serializer import polars_data_frame_to_list_of_points @@ -164,7 +163,8 @@ def test_write_polars(self): "time": pl.Series(["2024-08-01 00:00:00", "2024-08-01 01:00:00"]).str.to_datetime(time_unit='ns'), "temperature": [22.4, 21.8], }) - self.client._write_api._write_service = Mock(spec=WriteService) + + self.client._write_api._call_api = mock.Mock() self.client.write( database="database", @@ -173,7 +173,7 @@ def test_write_polars(self): data_frame_timestamp_column="time", ) - actual = self.client._write_api._write_service.post_write.call_args[1]['body'] + actual = self.client._write_api._call_api.call_args.args[4] self.assertEqual(b'measurement temperature=22.4 1722470400000000000\n' b'measurement temperature=21.8 1722474000000000000', actual) @@ -192,7 +192,8 @@ def test_write_polars_batching(self): ) ) self.client._write_api._write_options = WriteOptions(batch_size=2) - self.client._write_api._write_service = Mock(spec=WriteService) + + self.client._write_api._call_api = mock.Mock() self.client.write( database="database", @@ -202,14 +203,8 @@ def test_write_polars_batching(self): ) time.sleep(0.5) - self.client._write_api._write_service.post_write.assert_called_once_with( - org=ANY, - bucket=ANY, - precision=ANY, - no_sync=ANY, - accept_partial=ANY, - use_v2_api=ANY, - async_req=ANY, - content_type=ANY, - urlopen_kw=ANY, - body=b'measurement temperature=22.4 1722470400000000000\nmeasurement temperature=21.8 1722474000000000000') + args = self.client._write_api._call_api.call_args.args + body = args[4] + self.assertEqual(self.client._write_api._call_api.call_count, 1) + self.assertEqual(b'measurement temperature=22.4 1722470400000000000\nmeasurement ' + b'temperature=21.8 1722474000000000000', body) diff --git a/tests/test_api_client.py b/tests/test_write_api.py similarity index 72% rename from tests/test_api_client.py rename to tests/test_write_api.py index 6d0c5207..d0c9720e 100644 --- a/tests/test_api_client.py +++ b/tests/test_write_api.py @@ -8,96 +8,50 @@ from urllib3 import response from urllib3.exceptions import ConnectTimeoutError -from influxdb_client_3.write_client._sync.api_client import ApiClient -from influxdb_client_3.write_client.configuration import Configuration -from influxdb_client_3.exceptions import InfluxDBError, InfluxDBPartialWriteError -from influxdb_client_3.write_client.rest import ApiException -from influxdb_client_3.write_client.service import WriteService +from influxdb_client_3 import InfluxDBClient3, InfluxDBError +from influxdb_client_3.exceptions import InfluxDBPartialWriteError from influxdb_client_3.version import VERSION +from influxdb_client_3.write_client.rest import ApiException _package = "influxdb3-python" _sentHeaders = {} -def mock_rest_request(method, - url, - query_params=None, - headers=None, - body=None, - post_params=None, - _preload_content=True, - _request_timeout=None, - **urlopen_kw): - class MockResponse: - def __init__(self, data, status_code): - self.data = data - self.status_code = status_code - - def data(self): - return self.data - - global _sentHeaders - _sentHeaders = headers - - return MockResponse(None, 200) - - -class ApiClientTests(unittest.TestCase): - +class WriteApiTests(unittest.TestCase): received_timeout_total = None def mock_urllib3_timeout_request(method, url, - body, - headers, **urlopen_kw): if urlopen_kw.get('timeout', None) is not None: - ApiClientTests.received_timeout_total = urlopen_kw['timeout'].total + WriteApiTests.received_timeout_total = urlopen_kw['timeout'].total raise ConnectTimeoutError() return response.HTTPResponse(status=200, version=4, reason="OK", decode_content=False, request_url=url) - def test_default_headers(self): - conf = Configuration() - client = ApiClient(conf, - header_name="Authorization", - header_value="Bearer TEST_TOKEN") - self.assertIsNotNone(client.default_headers["User-Agent"]) - self.assertIsNotNone(client.default_headers["Authorization"]) - self.assertEqual(f"{_package}/{VERSION}", client.default_headers["User-Agent"]) - self.assertEqual("Bearer TEST_TOKEN", client.default_headers["Authorization"]) - - @mock.patch("influxdb_client_3.write_client._sync.rest.RESTClientObject.request", - side_effect=mock_rest_request) - def test_call_api(self, mock_post): - global _sentHeaders - _sentHeaders = {} - - conf = Configuration() - client = ApiClient(conf, - header_name="Authorization", - header_value="Bearer TEST_TOKEN") - service = WriteService(client) - service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14") - self.assertEqual(4, len(_sentHeaders.keys())) - self.assertIsNotNone(_sentHeaders["Accept"]) - self.assertEqual("application/json", _sentHeaders["Accept"]) - self.assertIsNotNone(_sentHeaders["Content-Type"]) - self.assertEqual("text/plain", _sentHeaders["Content-Type"]) - self.assertIsNotNone(_sentHeaders["Authorization"]) - self.assertEqual("Bearer TEST_TOKEN", _sentHeaders["Authorization"]) - self.assertIsNotNone(_sentHeaders["User-Agent"]) - self.assertEqual(f"{_package}/{VERSION}", _sentHeaders["User-Agent"]) - def _test_api_error(self, body): - conf = Configuration() - client = ApiClient(conf) - client.rest_client.pool_manager.request \ + client = InfluxDBClient3( + token='my-token', + database='my-bucket', + org='my-org' + ) + client._write_api.rest_client.pool_manager.request \ = mock.Mock(return_value=response.HTTPResponse(status=400, reason='Bad Request', body=body.encode())) - service = WriteService(client) - service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14") + client._write_api.write(record="data,foo=bar val=3.14") + + def test_default_headers(self): + client = InfluxDBClient3( + token='my-token', + database='my-bucket', + org='my-org' + ) + write_api = client._write_api + self.assertIsNotNone(write_api.default_header["User-Agent"]) + self.assertIsNotNone(write_api.default_header["Authorization"]) + self.assertEqual(f"{_package}/{VERSION}", write_api.default_header["User-Agent"]) + self.assertEqual("Token my-token", write_api.default_header["Authorization"]) def test_api_error_cloud(self): response_body = '{"message": "parsing failed for write_lp endpoint"}' @@ -272,12 +226,16 @@ def test_partial_write_from_response_guards(self): def test_api_error_headers(self): body = '{"error": "test error"}' body_dic = json.loads(body) - conf = Configuration() - local_client = ApiClient(conf) traceid = "123456789ABCDEF0" requestid = uuid.uuid4().__str__() - local_client.rest_client.pool_manager.request = mock.Mock( + client = InfluxDBClient3( + token='my-token', + database='my-bucket', + org='my-org' + ) + + client._write_api.rest_client.pool_manager.request = mock.Mock( return_value=response.HTTPResponse( status=400, reason='Bad Request', @@ -291,8 +249,7 @@ def test_api_error_headers(self): ) ) with self.assertRaises(InfluxDBError) as err: - service = WriteService(local_client) - service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14") + client._write_api.write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14") self.assertEqual(body_dic['error'], err.exception.message) headers = err.exception.getheaders() self.assertEqual(4, len(headers)) @@ -303,55 +260,72 @@ def test_api_error_headers(self): @mock.patch("urllib3._request_methods.RequestMethods.request", side_effect=mock_urllib3_timeout_request) - def test_request_config_timeout(self, mock_request): - conf = Configuration() - conf.host = "http://localhost:8181" - conf.timeout = 300 - local_client = ApiClient(conf) - service = WriteService(local_client) + def test_write_timeout(self, mock_request): + host = "http://localhost:8181" + timeout = 300 + client = InfluxDBClient3( + host=host, + token='my-token', + database='my-bucket', + org='my-org', + write_timeout=timeout + ) + with pytest.raises(ConnectTimeoutError): - service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14", - _preload_content=False) + client._write_api.write("TEST_BUCKET", "TEST_ORG", "data,foo=bar val=3.14") self.assertEqual(0.3, self.received_timeout_total) self.received_timeout_total = None @mock.patch("urllib3._request_methods.RequestMethods.request", side_effect=mock_urllib3_timeout_request) def test_request_arg_timeout(self, mock_request): - conf = Configuration() - conf.host = "http://localhost:8181" - conf.timeout = 300 - local_client = ApiClient(conf) - service = WriteService(local_client) + host = "http://localhost:8181" + timeout = 300 + client = InfluxDBClient3( + host=host, + token='my-token', + database='my-bucket', + org='my-org', + write_timeout=timeout + ) + with pytest.raises(ConnectTimeoutError): - service.post_write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14", - _request_timeout=100, _preload_content=False) + client._write_api.write("TEST_ORG", "TEST_BUCKET", "data,foo=bar val=3.14", + _request_timeout=100) self.assertEqual(0.1, self.received_timeout_total) self.received_timeout_total = None def test_should_gzip(self): + client = InfluxDBClient3( + host='http://localhost:8181', + token='my-token', + database='my-bucket', + org='my-org' + ) + write_api = client._write_api + # Test when gzip is disabled - self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1)) - self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=10000)) - self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=None)) + self.assertFalse(write_api._should_gzip("test", enable_gzip=False, gzip_threshold=1)) + self.assertFalse(write_api._should_gzip("test", enable_gzip=False, gzip_threshold=10000)) + self.assertFalse(write_api._should_gzip("test", enable_gzip=False, gzip_threshold=None)) # Test when enable_gzip is True - self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=None)) - self.assertTrue(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=1)) - self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=100000)) + self.assertTrue(write_api._should_gzip("test", enable_gzip=True, gzip_threshold=None)) + self.assertTrue(write_api._should_gzip("test", enable_gzip=True, gzip_threshold=1)) + self.assertFalse(write_api._should_gzip("test", enable_gzip=True, gzip_threshold=100000)) # Test payload smaller than threshold - self.assertFalse(ApiClient.should_gzip("test", enable_gzip=True, gzip_threshold=10000)) + self.assertFalse(write_api._should_gzip("test", enable_gzip=True, gzip_threshold=10000)) # Test payload larger than threshold large_payload = "x" * 10000 - self.assertTrue(ApiClient.should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000)) + self.assertTrue(write_api._should_gzip(large_payload, enable_gzip=True, gzip_threshold=1000)) # Test exact threshold match and less than threshold payload = "x" * 1000 - self.assertTrue(ApiClient.should_gzip(payload, enable_gzip=True, gzip_threshold=1000)) + self.assertTrue(write_api._should_gzip(payload, enable_gzip=True, gzip_threshold=1000)) - def test_post_write_with_http_info_async_translates_exceptions(self): + def test_post_write_async_translates_exceptions(self): cases = [ ( "v2 on v3-only backend", @@ -386,18 +360,25 @@ def test_post_write_with_http_info_async_translates_exceptions(self): ] for name, use_v2_api, http_resp, expected_type, expected_message in cases: with self.subTest(name): - conf = Configuration() - local_client = ApiClient(conf) - local_client.call_api = mock.Mock() + client = InfluxDBClient3( + token='my-token', + database='my-bucket', + org='my-org' + ) + write_api = client._write_api + write_api.call_api = mock.Mock() thread = mock.Mock() thread.get.side_effect = ApiException(http_resp=http_resp) - local_client.call_api.return_value = thread - service = WriteService(local_client) - result = service.post_write_with_http_info( - "TEST_ORG", - "TEST_BUCKET", - "home,room=Sunroom temp=96 1735545600", + write_api.call_api.return_value = thread + result = write_api._post_write( + org="TEST_ORG", + bucket="TEST_BUCKET", + body="home,room=Sunroom temp=96 1735545600", + precision='s', + accept_partial=False, + no_sync=False, async_req=True, + _async_req=True, use_v2_api=use_v2_api, ) with self.assertRaises(expected_type) as err: @@ -409,17 +390,22 @@ def test_post_write_with_http_info_async_translates_exceptions(self): self.assertEqual(1, len(err.exception.line_errors)) def test_post_write_async_translates_v3_unsupported(self): - conf = Configuration() - local_client = ApiClient(conf) - local_client.call_api = mock.AsyncMock( + client = InfluxDBClient3( + token='my-token', + database='my-bucket', + org='my-org', + ) + + write_api = client._write_api + + write_api.call_api = mock.AsyncMock( side_effect=ApiException( http_resp=response.HTTPResponse(status=405, reason="Method Not Allowed", body=b"") ) ) - service = WriteService(local_client) async def run(): - await service.post_write_async( + await write_api.post_write_async( "TEST_ORG", "TEST_BUCKET", "home,room=Sunroom temp=96 1735545600",