Skip to content

Reference

GlassFlow Python Client to interact with GlassFlow API

GlassFlowClient

GlassFlowClient(personal_access_token: str = None, organization_id: str = None)

Bases: APIClient

GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources

Attributes:

Name Type Description
client

requests.Session object to make HTTP requests to GlassFlow API

glassflow_config

GlassFlowConfig object to store configuration

organization_id

Organization ID of the user. If not provided, the default organization will be used

Parameters:

Name Type Description Default
personal_access_token str

GlassFlow Personal Access Token

None
organization_id str

Organization ID of the user. If not provided, the default organization will be used

None
Source code in src/glassflow/client.py
def __init__(
    self, personal_access_token: str = None, organization_id: str = None
) -> None:
    """Create a new GlassFlowClient object

    Args:
        personal_access_token: GlassFlow Personal Access Token
        organization_id: Organization ID of the user. If not provided,
            the default organization will be used
    """
    super().__init__()
    self.personal_access_token = personal_access_token
    self.organization_id = organization_id

get_pipeline

get_pipeline(pipeline_id: str) -> Pipeline

Gets a Pipeline object from the GlassFlow API

Parameters:

Name Type Description Default
pipeline_id str

UUID of the pipeline

required

Returns:

Name Type Description
Pipeline Pipeline

Pipeline object from the GlassFlow API

Raises:

Type Description
PipelineNotFoundError

Pipeline does not exist

UnauthorizedError

User does not have permission to perform the requested operation

ClientError

GlassFlow Client Error

Source code in src/glassflow/client.py
def get_pipeline(self, pipeline_id: str) -> Pipeline:
    """Gets a Pipeline object from the GlassFlow API

    Args:
        pipeline_id: UUID of the pipeline

    Returns:
        Pipeline: Pipeline object from the GlassFlow API

    Raises:
        PipelineNotFoundError: Pipeline does not exist
        UnauthorizedError: User does not have permission to perform the
            requested operation
        ClientError: GlassFlow Client Error
    """
    return Pipeline(
        personal_access_token=self.personal_access_token, id=pipeline_id
    ).fetch()

create_pipeline

create_pipeline(name: str, space_id: str, transformation_file: str = None, requirements: str = None, source_kind: str = None, source_config: dict = None, sink_kind: str = None, sink_config: dict = None, env_vars: list[dict[str, str]] = None, state: PipelineState = 'running', metadata: dict = None) -> Pipeline

Creates a new GlassFlow pipeline

Parameters:

Name Type Description Default
name str

Name of the pipeline

required
space_id str

ID of the GlassFlow Space you want to create the pipeline in

required
transformation_file str

Path to file with transformation function of the pipeline.

None
requirements str

Requirements.txt of the pipeline

None
source_kind str

Kind of source for the pipeline. If no source is provided, the default source will be SDK

None
source_config dict

Configuration of the pipeline's source

None
sink_kind str

Kind of sink for the pipeline. If no sink is provided, the default sink will be SDK

None
sink_config dict

Configuration of the pipeline's sink

None
env_vars list[dict[str, str]]

Environment variables to pass to the pipeline

None
state PipelineState

State of the pipeline after creation. It can be either "running" or "paused"

'running'
metadata dict

Metadata of the pipeline

None

Returns:

Name Type Description
Pipeline Pipeline

New pipeline

Raises:

Type Description
UnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def create_pipeline(
    self,
    name: str,
    space_id: str,
    transformation_file: str = None,
    requirements: str = None,
    source_kind: str = None,
    source_config: dict = None,
    sink_kind: str = None,
    sink_config: dict = None,
    env_vars: list[dict[str, str]] = None,
    state: PipelineState = "running",
    metadata: dict = None,
) -> Pipeline:
    """Creates a new GlassFlow pipeline

    Args:
        name: Name of the pipeline
        space_id: ID of the GlassFlow Space you want to create the pipeline in
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        metadata: Metadata of the pipeline

    Returns:
        Pipeline: New pipeline

    Raises:
        UnauthorizedError: User does not have permission to perform
            the requested operation
    """
    return Pipeline(
        name=name,
        space_id=space_id,
        transformation_file=transformation_file,
        requirements=requirements,
        source_kind=source_kind,
        source_config=source_config,
        sink_kind=sink_kind,
        sink_config=sink_config,
        env_vars=env_vars,
        state=state,
        metadata=metadata,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    ).create()

list_pipelines

list_pipelines(space_ids: list[str] | None = None) -> ListPipelinesResponse

Lists all pipelines in the GlassFlow API

Parameters:

Name Type Description Default
space_ids list[str] | None

List of Space IDs of the pipelines to list. If not specified, all the pipelines will be listed.

None

Returns:

Name Type Description
ListPipelinesResponse ListPipelinesResponse

Response object with the pipelines listed

Raises:

Type Description
UnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def list_pipelines(
    self, space_ids: list[str] | None = None
) -> operations.ListPipelinesResponse:
    """
    Lists all pipelines in the GlassFlow API

    Args:
        space_ids: List of Space IDs of the pipelines to list.
            If not specified, all the pipelines will be listed.

    Returns:
        ListPipelinesResponse: Response object with the pipelines listed

    Raises:
        UnauthorizedError: User does not have permission to perform the
            requested operation
    """
    request = operations.ListPipelinesRequest(
        space_id=space_ids,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    )
    try:
        res = self._request(
            method="GET",
            endpoint="/pipelines",
            request=request,
        )
        res_json = res.raw_response.json()
    except errors.ClientError as e:
        if e.status_code == 401:
            raise errors.UnauthorizedError(e.raw_response) from e
        else:
            raise e

    return operations.ListPipelinesResponse(
        content_type=res.content_type,
        status_code=res.status_code,
        raw_response=res.raw_response,
        total_amount=res_json["total_amount"],
        pipelines=res_json["pipelines"],
    )

list_spaces

list_spaces() -> ListSpacesResponse

Lists all GlassFlow spaces in the GlassFlow API

Returns:

Name Type Description
ListSpacesResponse ListSpacesResponse

Response object with the spaces listed

Raises:

Type Description
UnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def list_spaces(self) -> operations.ListSpacesResponse:
    """
    Lists all GlassFlow spaces in the GlassFlow API

    Returns:
        ListSpacesResponse: Response object with the spaces listed

    Raises:
        UnauthorizedError: User does not have permission to perform the
            requested operation
    """
    request = operations.ListSpacesRequest(
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    )
    try:
        res = self._request(
            method="GET",
            endpoint="/spaces",
            request=request,
        )
        res_json = res.raw_response.json()
    except errors.ClientError as e:
        if e.status_code == 401:
            raise errors.UnauthorizedError(e.raw_response) from e
        else:
            raise e

    return operations.ListSpacesResponse(
        content_type=res.content_type,
        status_code=res.status_code,
        raw_response=res.raw_response,
        total_amount=res_json["total_amount"],
        spaces=res_json["spaces"],
    )

create_space

create_space(name: str) -> Space

Creates a new Space

Parameters:

Name Type Description Default
name str

Name of the Space

required

Returns:

Name Type Description
Space Space

New space

Raises:

Type Description
UnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def create_space(
    self,
    name: str,
) -> Space:
    """Creates a new Space

    Args:
        name: Name of the Space

    Returns:
        Space: New space

    Raises:
        UnauthorizedError: User does not have permission to perform
            the requested operation
    """
    return Space(
        name=name,
        personal_access_token=self.personal_access_token,
        organization_id=self.organization_id,
    ).create()

Pipeline

Pipeline(personal_access_token: str, name: str | None = None, space_id: str | None = None, id: str | None = None, source_kind: str | None = None, source_config: dict | None = None, sink_kind: str | None = None, sink_config: dict | None = None, requirements: str | None = None, transformation_file: str | None = None, env_vars: list[dict[str, str]] | None = None, state: PipelineState = 'running', organization_id: str | None = None, metadata: dict | None = None, created_at: str | None = None)

Bases: APIClient

Parameters:

Name Type Description Default
personal_access_token str

The personal access token to authenticate against GlassFlow

required
id str | None

Pipeline ID

None
name str | None

Name of the pipeline

None
space_id str | None

ID of the GlassFlow Space you want to create the pipeline in

None
transformation_file str | None

Path to file with transformation function of the pipeline.

None
requirements str | None

Requirements.txt of the pipeline

None
source_kind str | None

Kind of source for the pipeline. If no source is provided, the default source will be SDK

None
source_config dict | None

Configuration of the pipeline's source

None
sink_kind str | None

Kind of sink for the pipeline. If no sink is provided, the default sink will be SDK

None
sink_config dict | None

Configuration of the pipeline's sink

None
env_vars list[dict[str, str]] | None

Environment variables to pass to the pipeline

None
state PipelineState

State of the pipeline after creation. It can be either "running" or "paused"

'running'
metadata dict | None

Metadata of the pipeline

None
created_at str | None

Timestamp when the pipeline was created

None

Raises:

Type Description
FailNotFoundError

If the transformation file is provided and does not exist

Source code in src/glassflow/pipeline.py
def __init__(
    self,
    personal_access_token: str,
    name: str | None = None,
    space_id: str | None = None,
    id: str | None = None,
    source_kind: str | None = None,
    source_config: dict | None = None,
    sink_kind: str | None = None,
    sink_config: dict | None = None,
    requirements: str | None = None,
    transformation_file: str | None = None,
    env_vars: list[dict[str, str]] | None = None,
    state: api.PipelineState = "running",
    organization_id: str | None = None,
    metadata: dict | None = None,
    created_at: str | None = None,
):
    """Creates a new GlassFlow pipeline object

    Args:
        personal_access_token: The personal access token to authenticate
            against GlassFlow
        id: Pipeline ID
        name: Name of the pipeline
        space_id: ID of the GlassFlow Space you want to create the pipeline in
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        metadata: Metadata of the pipeline
        created_at: Timestamp when the pipeline was created

    Raises:
        FailNotFoundError: If the transformation file is provided and
            does not exist
    """
    super().__init__()
    self.id = id
    self.name = name
    self.space_id = space_id
    self.personal_access_token = personal_access_token
    self.source_kind = source_kind
    self.source_config = source_config
    self.sink_kind = sink_kind
    self.sink_config = sink_config
    self.requirements = requirements
    self.transformation_code = None
    self.transformation_file = transformation_file
    self.env_vars = env_vars
    self.state = state
    self.organization_id = organization_id
    self.metadata = metadata if metadata is not None else {}
    self.created_at = created_at
    self.access_tokens = []

    if self.transformation_file is not None:
        self._read_transformation_file()

    if source_kind is not None and self.source_config is not None:
        self.source_connector = dict(
            kind=self.source_kind,
            config=self.source_config,
        )
    elif self.source_kind is None and self.source_config is None:
        self.source_connector = None
    else:
        raise ValueError("Both source_kind and source_config must be provided")

    if self.sink_kind is not None and self.sink_config is not None:
        self.sink_connector = dict(
            kind=sink_kind,
            config=sink_config,
        )
    elif self.sink_kind is None and self.sink_config is None:
        self.sink_connector = None
    else:
        raise ValueError("Both sink_kind and sink_config must be provided")

fetch

fetch() -> Pipeline

Fetches pipeline information from the GlassFlow API

Returns:

Name Type Description
self Pipeline

Pipeline object

Raises:

Type Description
ValueError

If ID is not provided in the constructor

PipelineNotFoundError

If ID provided does not match any existing pipeline in GlassFlow

UnauthorizedError

If the Personal Access Token is not provider or is invalid

Source code in src/glassflow/pipeline.py
def fetch(self) -> Pipeline:
    """
    Fetches pipeline information from the GlassFlow API

    Returns:
        self: Pipeline object

    Raises:
        ValueError: If ID is not provided in the constructor
        PipelineNotFoundError: If ID provided does not match any
            existing pipeline in GlassFlow
        UnauthorizedError: If the Personal Access Token is not
            provider or is invalid
    """
    if self.id is None:
        raise ValueError(
            "Pipeline id must be provided in order to fetch it's details"
        )

    request = operations.GetPipelineRequest(
        pipeline_id=self.id,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    )

    base_res = self._request(
        method="GET",
        endpoint=f"/pipelines/{self.id}",
        request=request,
    )
    self._fill_pipeline_details(base_res.raw_response.json())

    # Fetch Pipeline Access Tokens
    self._list_access_tokens()

    # Fetch function source
    self._get_function_artifact()

    return self

create

create() -> Pipeline

Creates a new GlassFlow pipeline

Returns:

Name Type Description
self Pipeline

Pipeline object

Raises:

Type Description
ValueError

If name is not provided in the constructor

ValueError

If space_id is not provided in the constructor

ValueError

If transformation_file is not provided in the constructor

Source code in src/glassflow/pipeline.py
def create(self) -> Pipeline:
    """
    Creates a new GlassFlow pipeline

    Returns:
        self: Pipeline object

    Raises:
        ValueError: If name is not provided in the constructor
        ValueError: If space_id is not provided in the constructor
        ValueError: If transformation_file is not provided
            in the constructor
    """
    create_pipeline = api.CreatePipeline(
        name=self.name,
        space_id=self.space_id,
        transformation_function=self.transformation_code,
        requirements_txt=self.requirements,
        source_connector=self.source_connector,
        sink_connector=self.sink_connector,
        environments=self.env_vars,
        state=self.state,
        metadata=self.metadata,
    )
    if self.name is None:
        raise ValueError("Name must be provided in order to create the pipeline")
    if self.space_id is None:
        raise ValueError("Argument space_id must be provided in the constructor")
    if self.transformation_file is None:
        raise ValueError(
            "Argument transformation_file must be provided in the constructor"
        )
    else:
        self._read_transformation_file()

    request = operations.CreatePipelineRequest(
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
        **create_pipeline.__dict__,
    )

    base_res = self._request(method="POST", endpoint="/pipelines", request=request)
    res = operations.CreatePipelineResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
        **base_res.raw_response.json(),
    )

    self.id = res.id
    self.created_at = res.created_at
    self.access_tokens.append({"name": "default", "token": res.access_token})
    return self

update

update(name: str | None = None, state: PipelineState | None = None, transformation_file: str | None = None, requirements: str | None = None, metadata: dict | None = None, source_kind: str | None = None, source_config: dict | None = None, sink_kind: str | None = None, sink_config: dict | None = None, env_vars: list[dict[str, str]] | None = None) -> Pipeline

Updates a GlassFlow pipeline

Args:

name: Name of the pipeline
state: State of the pipeline after creation.
    It can be either "running" or "paused"
transformation_file: Path to file with transformation function of
    the pipeline.
requirements: Requirements.txt of the pipeline
source_kind: Kind of source for the pipeline. If no source is
    provided, the default source will be SDK
source_config: Configuration of the pipeline's source
sink_kind: Kind of sink for the pipeline. If no sink is provided,
    the default sink will be SDK
sink_config: Configuration of the pipeline's sink
env_vars: Environment variables to pass to the pipeline
metadata: Metadata of the pipeline

Returns:

Name Type Description
self Pipeline

Updated pipeline

Source code in src/glassflow/pipeline.py
def update(
    self,
    name: str | None = None,
    state: api.PipelineState | None = None,
    transformation_file: str | None = None,
    requirements: str | None = None,
    metadata: dict | None = None,
    source_kind: str | None = None,
    source_config: dict | None = None,
    sink_kind: str | None = None,
    sink_config: dict | None = None,
    env_vars: list[dict[str, str]] | None = None,
) -> Pipeline:
    """
    Updates a GlassFlow pipeline

    Args:

        name: Name of the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        metadata: Metadata of the pipeline

    Returns:
        self: Updated pipeline

    """

    # Fetch current pipeline data
    self.fetch()

    if transformation_file is not None or requirements is not None:
        if transformation_file is not None:
            with open(transformation_file) as f:
                file = f.read()
        else:
            file = self.transformation_code

        if requirements is None:
            requirements = self.requirements

        self._upload_function_artifact(file, requirements)
        self.requirements = requirements
        self.transformation_code = file

    if source_kind is not None:
        source_connector = dict(
            kind=source_kind,
            config=source_config,
        )
    else:
        source_connector = self.source_connector

    if sink_kind is not None:
        sink_connector = dict(
            kind=sink_kind,
            config=sink_config,
        )
    else:
        sink_connector = self.sink_connector

    if env_vars is not None:
        self._update_function(env_vars)

    request = operations.UpdatePipelineRequest(
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
        name=name if name is not None else self.name,
        state=state if state is not None else self.state,
        metadata=metadata if metadata is not None else self.metadata,
        source_connector=source_connector,
        sink_connector=sink_connector,
    )

    base_res = self._request(
        method="PATCH", endpoint=f"/pipelines/{self.id}", request=request
    )
    self._fill_pipeline_details(base_res.raw_response.json())
    return self

delete

delete() -> None

Deletes a GlassFlow pipeline

Returns:

Raises:

Type Description
ValueError

If ID is not provided in the constructor

PipelineNotFoundError

If ID provided does not match any existing pipeline in GlassFlow

UnauthorizedError

If the Personal Access Token is not provided or is invalid

Source code in src/glassflow/pipeline.py
def delete(self) -> None:
    """
    Deletes a GlassFlow pipeline

    Returns:

    Raises:
        ValueError: If ID is not provided in the constructor
        PipelineNotFoundError: If ID provided does not match any
            existing pipeline in GlassFlow
        UnauthorizedError: If the Personal Access Token is not
            provided or is invalid
    """
    if self.id is None:
        raise ValueError("Pipeline id must be provided")

    request = operations.DeletePipelineRequest(
        pipeline_id=self.id,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    )
    self._request(
        method="DELETE",
        endpoint=f"/pipelines/{self.id}",
        request=request,
    )

get_logs

get_logs(page_size: int = 50, page_token: str | None = None, severity_code: SeverityCodeInput = api.SeverityCodeInput.integer_100, start_time: str | None = None, end_time: str | None = None) -> GetFunctionLogsResponse

Get the pipeline's logs

Parameters:

Name Type Description Default
page_size int

Pagination size

50
page_token str | None

Page token filter (use for pagination)

None
severity_code SeverityCodeInput

Severity code filter

integer_100
start_time str | None

Start time filter

None
end_time str | None

End time filter

None

Returns:

Name Type Description
PipelineFunctionsGetLogsResponse GetFunctionLogsResponse

Response with the logs

Source code in src/glassflow/pipeline.py
def get_logs(
    self,
    page_size: int = 50,
    page_token: str | None = None,
    severity_code: api.SeverityCodeInput = api.SeverityCodeInput.integer_100,
    start_time: str | None = None,
    end_time: str | None = None,
) -> operations.GetFunctionLogsResponse:
    """
    Get the pipeline's logs

    Args:
        page_size: Pagination size
        page_token: Page token filter (use for pagination)
        severity_code: Severity code filter
        start_time: Start time filter
        end_time: End time filter

    Returns:
        PipelineFunctionsGetLogsResponse: Response with the logs
    """
    request = operations.GetFunctionLogsRequest(
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
        pipeline_id=self.id,
        page_size=page_size,
        page_token=page_token,
        severity_code=severity_code,
        start_time=start_time,
        end_time=end_time,
    )
    base_res = self._request(
        method="GET",
        endpoint=f"/pipelines/{self.id}/functions/main/logs",
        request=request,
    )
    base_res_json = base_res.raw_response.json()
    logs = [
        api.FunctionLogEntry.from_dict(entry) for entry in base_res_json["logs"]
    ]
    return operations.GetFunctionLogsResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
        logs=logs,
        next=base_res_json["next"],
    )

get_source

get_source(pipeline_access_token_name: str | None = None) -> PipelineDataSource

Get source client to publish data to the pipeline

Parameters:

Name Type Description Default
pipeline_access_token_name str | None

Name of the pipeline access token to use. If not specified, the default token will be used

None

Returns:

Name Type Description
PipelineDataSource PipelineDataSource

Source client to publish data to the pipeline

Raises:

Type Description
ValueError

If pipeline id is not provided in the constructor

Source code in src/glassflow/pipeline.py
def get_source(
    self, pipeline_access_token_name: str | None = None
) -> PipelineDataSource:
    """
    Get source client to publish data to the pipeline

    Args:
        pipeline_access_token_name (str | None): Name of the pipeline
            access token to use. If not specified, the default token
            will be used

    Returns:
        PipelineDataSource: Source client to publish data to the pipeline

    Raises:
        ValueError: If pipeline id is not provided in the constructor
    """
    return self._get_data_client("source", pipeline_access_token_name)

get_sink

get_sink(pipeline_access_token_name: str | None = None) -> PipelineDataSink

Get sink client to consume data from the pipeline

Parameters:

Name Type Description Default
pipeline_access_token_name str | None

Name of the pipeline access token to use. If not specified, the default token will be used

None

Returns:

Name Type Description
PipelineDataSink PipelineDataSink

Sink client to consume data from the pipeline

Raises:

Type Description
ValueError

If pipeline id is not provided in the constructor

Source code in src/glassflow/pipeline.py
def get_sink(
    self, pipeline_access_token_name: str | None = None
) -> PipelineDataSink:
    """
    Get sink client to consume data from the pipeline

    Args:
        pipeline_access_token_name (str | None): Name of the pipeline
            access token to use. If not specified, the default token
            will be used

    Returns:
        PipelineDataSink: Sink client to consume data from the pipeline

    Raises:
        ValueError: If pipeline id is not provided in the constructor
    """
    return self._get_data_client("sink", pipeline_access_token_name)

test

test(data: dict) -> TestFunctionResponse

Test a pipeline's function with a sample input JSON

Parameters:

Name Type Description Default
data dict

Input JSON

required

Returns:

Name Type Description
TestFunctionResponse TestFunctionResponse

Test function response

Source code in src/glassflow/pipeline.py
def test(self, data: dict) -> operations.TestFunctionResponse:
    """
    Test a pipeline's function with a sample input JSON

    Args:
        data: Input JSON

    Returns:
        TestFunctionResponse: Test function response
    """
    request = operations.TestFunctionRequest(
        pipeline_id=self.id,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
        request_body=data,
    )

    base_res = self._request(
        method="POST",
        endpoint=f"/pipelines/{self.id}/functions/main/test",
        request=request,
    )
    base_res_json = base_res.raw_response.json()
    base_res_json["event_context"] = api.EventContext(
        **base_res_json["event_context"]
    )
    return operations.TestFunctionResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
        **base_res_json,
    )

PipelineDataClient

PipelineDataClient(pipeline_id: str, pipeline_access_token: str)

Bases: APIClient

Base Client object to publish and consume events from the given pipeline.

Attributes:

Name Type Description
glassflow_config

GlassFlowConfig object to interact with GlassFlow API

pipeline_id

The pipeline id to interact with

pipeline_access_token

The access token to access the pipeline

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__()
    self.pipeline_id = pipeline_id
    self.pipeline_access_token = pipeline_access_token

validate_credentials

validate_credentials() -> None

Check if the pipeline credentials are valid and raise an error if not

Source code in src/glassflow/pipeline_data.py
def validate_credentials(self) -> None:
    """
    Check if the pipeline credentials are valid and raise an error if not
    """
    request = operations.StatusAccessTokenRequest(
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )
    self._request(
        method="GET",
        endpoint="/pipelines/{pipeline_id}/status/access_token",
        request=request,
    )

PipelineDataSource

PipelineDataSource(pipeline_id: str, pipeline_access_token: str)

Bases: PipelineDataClient

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__()
    self.pipeline_id = pipeline_id
    self.pipeline_access_token = pipeline_access_token

publish

publish(request_body: dict) -> PublishEventResponse

Push a new message into the pipeline

Parameters:

Name Type Description Default
request_body dict

The message to be published into the pipeline

required

Returns:

Name Type Description
PublishEventResponse PublishEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while publishing the event

Source code in src/glassflow/pipeline_data.py
def publish(self, request_body: dict) -> operations.PublishEventResponse:
    """Push a new message into the pipeline

    Args:
        request_body: The message to be published into the pipeline

    Returns:
        PublishEventResponse: Response object containing the status
            code and the raw response

    Raises:
        ClientError: If an error occurred while publishing the event
    """
    request = operations.PublishEventRequest(
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
        request_body=request_body,
    )
    base_res = self._request(
        method="POST",
        endpoint="/pipelines/{pipeline_id}/topics/input/events",
        request=request,
    )

    return operations.PublishEventResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
    )

PipelineDataSink

PipelineDataSink(pipeline_id: str, pipeline_access_token: str)

Bases: PipelineDataClient

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__(pipeline_id, pipeline_access_token)

    # retry delay for consuming messages (in seconds)
    self._consume_retry_delay_minimum = 1
    self._consume_retry_delay_current = 1
    self._consume_retry_delay_max = 60

consume

consume() -> ConsumeEventResponse

Consume the last message from the pipeline

Returns:

Name Type Description
ConsumeEventResponse ConsumeEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipeline_data.py
def consume(self) -> operations.ConsumeEventResponse:
    """Consume the last message from the pipeline

    Returns:
        ConsumeEventResponse: Response object containing the status
            code and the raw response

    Raises:
        ClientError: If an error occurred while consuming the event

    """
    request = operations.ConsumeEventRequest(
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )

    self._respect_retry_delay()
    base_res = self._request(
        method="POST",
        endpoint="/pipelines/{pipeline_id}/topics/output/events/consume",
        request=request,
    )

    res = operations.ConsumeEventResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
    )

    self._update_retry_delay(base_res.status_code)
    if res.status_code == 200:
        if not utils.match_content_type(res.content_type, "application/json"):
            raise errors.UnknownContentTypeError(res.raw_response)

        self._consume_retry_delay_current = self._consume_retry_delay_minimum
        body = utils.unmarshal_json(
            res.raw_response.text, Optional[operations.ConsumeEventResponseBody]
        )
        res.body = body
    elif res.status_code == 204:
        # No messages to be consumed.
        # update the retry delay
        # Return an empty response body
        body = operations.ConsumeEventResponseBody("", "", {})
        res.body = body
    elif res.status_code == 429:
        # update the retry delay
        body = operations.ConsumeEventResponseBody("", "", {})
        res.body = body
    elif not utils.match_content_type(res.content_type, "application/json"):
        raise errors.UnknownContentTypeError(res.raw_response)

    return res

consume_failed

consume_failed() -> ConsumeFailedResponse

Consume the failed message from the pipeline

Returns:

Name Type Description
ConsumeFailedResponse ConsumeFailedResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipeline_data.py
def consume_failed(self) -> operations.ConsumeFailedResponse:
    """Consume the failed message from the pipeline

    Returns:
        ConsumeFailedResponse: Response object containing the status
            code and the raw response

    Raises:
        ClientError: If an error occurred while consuming the event

    """
    request = operations.ConsumeFailedRequest(
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )

    self._respect_retry_delay()
    base_res = self._request(
        method="POST",
        endpoint="/pipelines/{pipeline_id}/topics/failed/events/consume",
        request=request,
    )

    res = operations.ConsumeFailedResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
    )

    self._update_retry_delay(res.status_code)
    if res.status_code == 200:
        if not utils.match_content_type(res.content_type, "application/json"):
            raise errors.UnknownContentTypeError(res.raw_response)

        self._consume_retry_delay_current = self._consume_retry_delay_minimum
        body = utils.unmarshal_json(
            res.raw_response.text, Optional[operations.ConsumeFailedResponseBody]
        )
        res.body = body
    elif res.status_code == 204:
        # No messages to be consumed. Return an empty response body
        body = operations.ConsumeFailedResponseBody("", "", {})
        res.body = body
    elif res.status_code == 429:
        # update the retry delay
        body = operations.ConsumeEventResponseBody("", "", {})
        res.body = body
    elif not utils.match_content_type(res.content_type, "application/json"):
        raise errors.UnknownContentTypeError(res.raw_response)
    return res

Space

Space(personal_access_token: str, name: str | None = None, id: str | None = None, created_at: str | None = None, organization_id: str | None = None)

Bases: APIClient

Parameters:

Name Type Description Default
personal_access_token str

The personal access token to authenticate against GlassFlow

required
name str | None

Name of the space

None
id str | None

ID of the GlassFlow Space you want to create the pipeline in

None
created_at str | None

Timestamp when the space was created

None
Source code in src/glassflow/space.py
def __init__(
    self,
    personal_access_token: str,
    name: str | None = None,
    id: str | None = None,
    created_at: str | None = None,
    organization_id: str | None = None,
):
    """Creates a new GlassFlow pipeline object

    Args:
        personal_access_token: The personal access token to authenticate
            against GlassFlow
        name: Name of the space
        id: ID of the GlassFlow Space you want to create the pipeline in
        created_at: Timestamp when the space was created

    """
    super().__init__()
    self.name = name
    self.id = id
    self.created_at = created_at
    self.organization_id = organization_id
    self.personal_access_token = personal_access_token

create

create() -> Space

Creates a new GlassFlow space

Returns:

Name Type Description
self Space

Space object

Raises:

Type Description
ValueError

If name is not provided in the constructor

Source code in src/glassflow/space.py
def create(self) -> Space:
    """
    Creates a new GlassFlow space

    Returns:
        self: Space object

    Raises:
        ValueError: If name is not provided in the constructor

    """
    if self.name is None:
        raise ValueError("Name must be provided in order to create the space")
    create_space = api.CreateSpace(name=self.name)
    request = operations.CreateSpaceRequest(
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
        **create_space.__dict__,
    )
    base_res = self._request(method="POST", endpoint="/spaces", request=request)

    res = operations.CreateSpaceResponse(
        status_code=base_res.status_code,
        content_type=base_res.content_type,
        raw_response=base_res.raw_response,
        **base_res.raw_response.json(),
    )

    self.id = res.id
    self.created_at = res.created_at
    self.name = res.name
    return self

delete

delete() -> None

Deletes a GlassFlow space

Returns:

Raises:

Type Description
ValueError

If ID is not provided in the constructor

SpaceNotFoundError

If ID provided does not match any existing space in GlassFlow

UnauthorizedError

If the Personal Access Token is not provided or is invalid

Source code in src/glassflow/space.py
def delete(self) -> None:
    """
    Deletes a GlassFlow space

    Returns:

    Raises:
        ValueError: If ID is not provided in the constructor
        SpaceNotFoundError: If ID provided does not match any
            existing space in GlassFlow
        UnauthorizedError: If the Personal Access Token is not
            provided or is invalid
    """
    if self.id is None:
        raise ValueError("Space id must be provided in the constructor")

    request = operations.DeleteSpaceRequest(
        space_id=self.id,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    )
    self._request(
        method="DELETE",
        endpoint=f"/spaces/{self.id}",
        request=request,
    )

GlassFlowConfig dataclass

GlassFlowConfig(server_url: str = 'https://api.glassflow.dev/v1', sdk_version: str = version('glassflow'), user_agent: str = f'glassflow-python-sdk/{sdk_version}', glassflow_client: str = f'python-sdk/{sdk_version}')

Configuration object for GlassFlowClient

Attributes:

Name Type Description
server_url str

The base URL of the GlassFlow API

sdk_version str

The version of the GlassFlow Python SDK

user_agent str

The user agent to be used in the requests

ClientError

ClientError(detail: str, status_code: int, body: str, raw_response: Response)

Bases: Exception

Represents an error returned by the API.

Attributes:

Name Type Description
detail str

A message describing the error

status_code int

The status code of the response

body str

The response body

raw_response Response

The raw response object

Parameters:

Name Type Description Default
detail str

A message describing the error

required
status_code int

The status code of the response

required
body str

The response body

required
raw_response Response

The raw response object

required
Source code in src/glassflow/models/errors/clienterror.py
def __init__(
    self,
    detail: str,
    status_code: int,
    body: str,
    raw_response: requests_http.Response,
):
    """Create a new ClientError object

    Args:
        detail: A message describing the error
        status_code: The status code of the response
        body: The response body
        raw_response: The raw response object
    """
    self.detail = detail
    self.status_code = status_code
    self.body = body
    self.raw_response = raw_response

PipelineAccessTokenInvalidError

PipelineAccessTokenInvalidError(raw_response: Response)

Bases: ClientError

Error caused by invalid access token.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="The Pipeline Access Token used is invalid",
        status_code=401,
        body=raw_response.text,
        raw_response=raw_response,
    )

PipelineNotFoundError

PipelineNotFoundError(pipeline_id: str, raw_response: Response)

Bases: ClientError

Error caused by a pipeline ID not found.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, pipeline_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Pipeline ID {pipeline_id} does not exist",
        status_code=404,
        body=raw_response.text,
        raw_response=raw_response,
    )

SpaceIsNotEmptyError

SpaceIsNotEmptyError(raw_response: Response)

Bases: ClientError

Error caused by trying to delete a space that is not empty.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail=raw_response.json()["msg"],
        status_code=409,
        body=raw_response.text,
        raw_response=raw_response,
    )

SpaceNotFoundError

SpaceNotFoundError(space_id: str, raw_response: Response)

Bases: ClientError

Error caused by a pipeline ID not found.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, space_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Space ID {space_id} does not exist",
        status_code=404,
        body=raw_response.text,
        raw_response=raw_response,
    )

UnauthorizedError

UnauthorizedError(raw_response: Response)

Bases: ClientError

Error caused by a user not authorized.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="Unauthorized request, Personal Access Token used is invalid",
        status_code=401,
        body=raw_response.text,
        raw_response=raw_response,
    )

UnknownContentTypeError

UnknownContentTypeError(raw_response: Response)

Bases: ClientError

Error caused by an unknown content type response.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    content_type = raw_response.headers.get("Content-Type")
    super().__init__(
        detail=f"unknown content-type received: {content_type}",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

Error dataclass

Error(detail: str)

Bases: Exception

Bad request error response

Attributes:

Name Type Description
message

A message describing the error

StatusAccessTokenRequest dataclass

StatusAccessTokenRequest(pipeline_id: str, organization_id: Optional[str] = None, x_pipeline_access_token: str = None)

Bases: BasePipelineDataRequest

Request check the status of an access token

Attributes:

Name Type Description
pipeline_id

The id of the pipeline

organization_id

The id of the organization

x_pipeline_access_token

The access token of the pipeline

ConsumeEventRequest dataclass

ConsumeEventRequest(pipeline_id: str, organization_id: Optional[str] = None, x_pipeline_access_token: str = None)

Bases: BasePipelineDataRequest

Request to consume an event from a pipeline topic

Attributes:

Name Type Description
pipeline_id

The id of the pipeline

organization_id

The id of the organization

x_pipeline_access_token

The access token of the pipeline

ConsumeEventResponse dataclass

ConsumeEventResponse(content_type: str = dataclasses.field(), status_code: int = dataclasses.field(), raw_response: Response = dataclasses.field(), body: ConsumeEventResponseBody | None = None)

Bases: BaseResponse

Response to consume an event from a pipeline topic

Attributes:

Name Type Description
content_type

HTTP response content type for this operation

status_code

HTTP response status code for this operation

raw_response

Raw HTTP response; suitable for custom response parsing

body ConsumeEventResponseBody | None

the response body from the api call

json

json() -> dict

Return the response body as a JSON object. This method is to have compatibility with the requests.Response.json() method

Returns:

Name Type Description
dict dict

The transformed event as a JSON object

Source code in src/glassflow/models/operations/consumeevent.py
def json(self) -> dict:
    """Return the response body as a JSON object.
    This method is to have compatibility with the requests.Response.json() method

    Returns:
        dict: The transformed event as a JSON object
    """
    return self.body.event

ConsumeEventResponseBody dataclass

ConsumeEventResponseBody(req_id: str = dataclasses.field(), receive_time: str = dataclasses.field(), event: dict)

Event response body after transformation

Attributes:

Name Type Description
req_id str

The request id

receive_time str

The time when the event was received

event dict

The event received

ConsumeFailedRequest dataclass

ConsumeFailedRequest(pipeline_id: str, organization_id: Optional[str] = None, x_pipeline_access_token: str = None)

Bases: BasePipelineDataRequest

Request to consume failed events from a pipeline

Attributes:

Name Type Description
pipeline_id

The id of the pipeline

organization_id

The id of the organization

x_pipeline_access_token

The access token of the pipeline

ConsumeFailedResponse dataclass

ConsumeFailedResponse(content_type: str = dataclasses.field(), status_code: int = dataclasses.field(), raw_response: Response = dataclasses.field(), body: ConsumeFailedResponseBody | None = None)

Bases: BaseResponse

Response to consume a failed event from a pipeline

Attributes:

Name Type Description
content_type

HTTP response content type for this operation

status_code

HTTP response status code for this operation

raw_response

Raw HTTP response; suitable for custom response parsing

body ConsumeFailedResponseBody | None

the response body from the api call

json

json() -> dict

Return the response body as a JSON object. This method is to have compatibility with the requests.Response.json() method

Returns:

Name Type Description
dict dict

The transformed event as a JSON object

Source code in src/glassflow/models/operations/consumefailed.py
def json(self) -> dict:
    """Return the response body as a JSON object.
    This method is to have compatibility with the requests.Response.json() method

    Returns:
        dict: The transformed event as a JSON object
    """
    return self.body.event

ConsumeFailedResponseBody dataclass

ConsumeFailedResponseBody(req_id: str = dataclasses.field(), receive_time: str = dataclasses.field(), event: dict)

Event response body after transformation

Attributes:

Name Type Description
req_id str

The request id

receive_time str

The time when the event was received

event dict

The event received

PublishEventRequest dataclass

PublishEventRequest(pipeline_id: str, organization_id: Optional[str] = None, x_pipeline_access_token: str = None, request_body: dict = None)

Bases: BasePipelineDataRequest

Request to publish an event to a pipeline topic

Attributes:

Name Type Description
pipeline_id

The id of the pipeline

organization_id

The id of the organization

x_pipeline_access_token

The access token of the pipeline

request_body dict

The request body / event that should be published to the pipeline

PublishEventResponse dataclass

PublishEventResponse(content_type: str = dataclasses.field(), status_code: int = dataclasses.field(), raw_response: Response = dataclasses.field(), object: PublishEventResponseBody | None = None)

Bases: BaseResponse

Response object for publish event operation

Attributes:

Name Type Description
content_type

HTTP response content type for this operation

status_code

HTTP response status code for this operation

raw_response

Raw HTTP response; suitable for custom response parsing

object PublishEventResponseBody | None

Response to the publish operation

PublishEventResponseBody dataclass

PublishEventResponseBody()

Message pushed to the pipeline