Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

Expand All @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)

def _get_semaphore(
self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"]
Expand Down
94 changes: 94 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.streams import (
Stream,
StreamList,
StreamWrite,
)
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils._url import interpolate_and_url_encode
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsAPI(APIClient):
"""Streams API (``/streams``)."""

_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)

@overload
async def create(self, items: StreamWrite) -> Stream: ...

@overload
async def create(self, items: Sequence[StreamWrite]) -> StreamList: ...

async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

Args:
items (StreamWrite | Sequence[StreamWrite]): One or more streams to create.

Returns:
Stream | StreamList: The created stream or streams.
"""
return await self._create_multiple(
items=items,
list_cls=StreamList,
resource_cls=Stream,
input_resource_cls=StreamWrite,
)

async def list(self) -> StreamList:
"""`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project.

Note:
There is no paging limit parameter: the endpoint returns all streams in the project
(projects are expected to have few streams).

Returns:
StreamList: The streams in the project.
"""
res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read"))
return StreamList._load(res.json()["items"])

async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
"""`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

Args:
stream_external_id (str): Stream external id.
include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing
statistics can be expensive.

Returns:
Stream: The stream metadata (and optionally statistics).
"""
path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id)
params: dict[str, bool] | None = None
if include_statistics is not None:
params = {"includeStatistics": include_statistics}
res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read"))
return Stream._load(res.json())

async def delete(self, external_id: str | SequenceNotStr[str]) -> None:
"""`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_.

Deletion is a soft delete that retains capacity for an extended period; prefer deleting only
when necessary.

Args:
external_id (str | SequenceNotStr[str]): External ID or list of external IDs.
"""
await self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_id),
wrap_ids=True,
)
14 changes: 12 additions & 2 deletions cognite/client/_sync_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
"""
===============================================================================
c76b2b9351d2a5eee6a710fa9893bfa4
584030bc5e2a4b8168f54c101f7f521d
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from typing import TYPE_CHECKING
import asyncio
from collections.abc import Coroutine, Iterator
from typing import TYPE_CHECKING, Any, Literal, overload

from cognite.client import AsyncCogniteClient
from cognite.client._api_client import APIClient
from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI
from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI
from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI
from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI
from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI
from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI
from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI
from cognite.client._sync_api.data_modeling.views import SyncViewsAPI
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.utils._async_helpers import SyncIterator, run_sync
from cognite.client.utils._concurrency import _get_event_loop_executor

if TYPE_CHECKING:
import pandas as pd

from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class SyncDataModelingAPI(SyncAPIClient):
Expand All @@ -35,3 +44,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None:
self.instances = SyncInstancesAPI(async_client)
self.graphql = SyncDataModelingGraphQLAPI(async_client)
self.statistics = SyncStatisticsAPI(async_client)
self.streams = SyncStreamsAPI(async_client)
88 changes: 88 additions & 0 deletions cognite/client/_sync_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
===============================================================================
08522cd2e3d995076b7bf12a6390e97e
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, overload

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.utils._async_helpers import run_sync
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncStreamsAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""

def __init__(self, async_client: AsyncCogniteClient) -> None:
self.__async_client = async_client

@overload
def create(self, items: StreamWrite) -> Stream: ...

@overload
def create(self, items: Sequence[StreamWrite]) -> StreamList: ...

def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""
`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

Args:
items (StreamWrite | Sequence[StreamWrite]): One or more streams to create.

Returns:
Stream | StreamList: The created stream or streams.
"""
return run_sync(self.__async_client.data_modeling.streams.create(items=items))

def list(self) -> StreamList:
"""
`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project.

Note:
There is no paging limit parameter: the endpoint returns all streams in the project
(projects are expected to have few streams).

Returns:
StreamList: The streams in the project.
"""
return run_sync(self.__async_client.data_modeling.streams.list())

def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
"""
`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

Args:
stream_external_id (str): Stream external id.
include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing
statistics can be expensive.

Returns:
Stream: The stream metadata (and optionally statistics).
"""
return run_sync(
self.__async_client.data_modeling.streams.retrieve(
stream_external_id=stream_external_id, include_statistics=include_statistics
)
)

def delete(self, external_id: str | SequenceNotStr[str]) -> None:
"""
`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_.

Deletion is a soft delete that retains capacity for an extended period; prefer deleting only
when necessary.

Args:
external_id (str | SequenceNotStr[str]): External ID or list of external IDs.
"""
return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id))
20 changes: 20 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@
UnionAll,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Stream,
StreamLifecycleSettings,
StreamLimit,
StreamLimitSettings,
StreamList,
StreamSettings,
StreamTemplate,
StreamTemplateWriteSettings,
StreamWrite,
)
from cognite.client.data_classes.data_modeling.sync import SubscriptionContext
from cognite.client.data_classes.data_modeling.views import (
ConnectionDefinition,
Expand Down Expand Up @@ -233,6 +244,15 @@
"SpaceApply",
"SpaceApplyList",
"SpaceList",
"Stream",
"StreamLifecycleSettings",
"StreamLimit",
"StreamLimitSettings",
"StreamList",
"StreamSettings",
"StreamTemplate",
"StreamTemplateWriteSettings",
"StreamWrite",
"SubscriptionContext",
"Text",
"TimeSeriesReference",
Expand Down
Loading
Loading