Skip to content

Reference

GlassFlow Python Client to interact with GlassFlow API

GlassFlowClient

GlassFlowClient(personal_access_token: str = None, organization_id: str = None)

Bases: APIClient

GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources

Attributes:

Name Type Description
client Session

Session object to make HTTP requests to GlassFlow API

glassflow_config GlassFlowConfig

GlassFlow config object to store configuration

organization_id str

Organization ID of the user. If not provided, the default organization will be used

Parameters:

Name Type Description Default
personal_access_token str

GlassFlow Personal Access Token

None
organization_id str

Organization ID of the user. If not provided, the default organization will be used

None
Source code in src/glassflow/client.py
def __init__(
    self, personal_access_token: str = None, organization_id: str = None
) -> None:
    """Create a new GlassFlowClient object

    Args:
        personal_access_token: GlassFlow Personal Access Token
        organization_id: Organization ID of the user. If not provided,
            the default organization will be used
    """
    super().__init__()
    self.personal_access_token = personal_access_token
    self.organization_id = organization_id
    self.request_headers = {"Personal-Access-Token": self.personal_access_token}
    self.request_query_params = {"organization_id": self.organization_id}

get_pipeline

get_pipeline(pipeline_id: str) -> Pipeline

Gets a Pipeline object from the GlassFlow API

Parameters:

Name Type Description Default
pipeline_id str

UUID of the pipeline

required

Returns:

Name Type Description
Pipeline Pipeline

Pipeline object from the GlassFlow API

Raises:

Type Description
PipelineNotFoundError

Pipeline does not exist

PipelineUnauthorizedError

User does not have permission to perform the requested operation

ClientError

GlassFlow Client Error

Source code in src/glassflow/client.py
def get_pipeline(self, pipeline_id: str) -> Pipeline:
    """Gets a Pipeline object from the GlassFlow API

    Args:
        pipeline_id: UUID of the pipeline

    Returns:
        Pipeline: Pipeline object from the GlassFlow API

    Raises:
        errors.PipelineNotFoundError: Pipeline does not exist
        errors.PipelineUnauthorizedError: User does not have permission to
            perform the requested operation
        errors.ClientError: GlassFlow Client Error
    """
    return Pipeline(
        personal_access_token=self.personal_access_token,
        id=pipeline_id,
        organization_id=self.organization_id,
    ).fetch()

create_pipeline

create_pipeline(name: str, space_id: str, transformation_file: str = None, requirements: str = None, source_kind: str = None, source_config: dict = None, sink_kind: str = None, sink_config: dict = None, env_vars: list[dict[str, str]] = None, state: str = 'running', metadata: dict = None) -> Pipeline

Creates a new GlassFlow pipeline

Parameters:

Name Type Description Default
name str

Name of the pipeline

required
space_id str

ID of the GlassFlow Space you want to create the pipeline in

required
transformation_file str

Path to file with transformation function of the pipeline.

None
requirements str

Requirements.txt of the pipeline

None
source_kind str

Kind of source for the pipeline. If no source is provided, the default source will be SDK

None
source_config dict

Configuration of the pipeline's source

None
sink_kind str

Kind of sink for the pipeline. If no sink is provided, the default sink will be SDK

None
sink_config dict

Configuration of the pipeline's sink

None
env_vars list[dict[str, str]]

Environment variables to pass to the pipeline

None
state str

State of the pipeline after creation. It can be either "running" or "paused"

'running'
metadata dict

Metadata of the pipeline

None

Returns:

Name Type Description
Pipeline Pipeline

New pipeline

Raises:

Type Description
PipelineUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def create_pipeline(
    self,
    name: str,
    space_id: str,
    transformation_file: str = None,
    requirements: str = None,
    source_kind: str = None,
    source_config: dict = None,
    sink_kind: str = None,
    sink_config: dict = None,
    env_vars: list[dict[str, str]] = None,
    state: str = "running",
    metadata: dict = None,
) -> Pipeline:
    """Creates a new GlassFlow pipeline

    Args:
        name: Name of the pipeline
        space_id: ID of the GlassFlow Space you want to create the pipeline in
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        metadata: Metadata of the pipeline

    Returns:
        Pipeline: New pipeline

    Raises:
        errors.PipelineUnauthorizedError: User does not have permission to perform
            the requested operation
    """
    return Pipeline(
        name=name,
        space_id=space_id,
        transformation_file=transformation_file,
        requirements=requirements,
        source_kind=source_kind,
        source_config=source_config,
        sink_kind=sink_kind,
        sink_config=sink_config,
        env_vars=env_vars,
        state=state,
        metadata=metadata,
        organization_id=self.organization_id,
        personal_access_token=self.personal_access_token,
    ).create()

list_pipelines

list_pipelines(space_ids: list[str] = None) -> ListPipelinesResponse

Lists all pipelines in the GlassFlow API

Parameters:

Name Type Description Default
space_ids list[str]

List of Space IDs of the pipelines to list. If not specified, all the pipelines will be listed.

None

Returns:

Type Description
ListPipelinesResponse

responses.ListPipelinesResponse: Response object with the pipelines listed

Raises:

Type Description
PipelineUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def list_pipelines(
    self, space_ids: list[str] = None
) -> responses.ListPipelinesResponse:
    """
    Lists all pipelines in the GlassFlow API

    Args:
        space_ids: List of Space IDs of the pipelines to list.
            If not specified, all the pipelines will be listed.

    Returns:
        responses.ListPipelinesResponse: Response object with the pipelines listed

    Raises:
        errors.PipelineUnauthorizedError: User does not have permission to
            perform the requested operation
    """

    endpoint = "/pipelines"
    query_params = {"space_id": space_ids} if space_ids else {}
    http_res = self._request(
        method="GET", endpoint=endpoint, request_query_params=query_params
    )
    return responses.ListPipelinesResponse(**http_res.json())

create_space

create_space(name: str) -> Space

Creates a new Space

Parameters:

Name Type Description Default
name str

Name of the Space

required

Returns:

Name Type Description
Space Space

New space

Raises:

Type Description
SpaceUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def create_space(
    self,
    name: str,
) -> Space:
    """Creates a new Space

    Args:
        name: Name of the Space

    Returns:
        Space: New space

    Raises:
        errors.SpaceUnauthorizedError: User does not have permission to perform
            the requested operation
    """
    return Space(
        name=name,
        personal_access_token=self.personal_access_token,
        organization_id=self.organization_id,
    ).create()

list_spaces

list_spaces() -> ListSpacesResponse

Lists all GlassFlow spaces in the GlassFlow API

Returns:

Type Description
ListSpacesResponse

response.ListSpacesResponse: Response object with the spaces listed

Raises:

Type Description
SpaceUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def list_spaces(self) -> responses.ListSpacesResponse:
    """
    Lists all GlassFlow spaces in the GlassFlow API

    Returns:
        response.ListSpacesResponse: Response object with the spaces listed

    Raises:
        errors.SpaceUnauthorizedError: User does not have permission to perform the
            requested operation
    """

    endpoint = "/spaces"
    http_res = self._request(method="GET", endpoint=endpoint)
    return responses.ListSpacesResponse(**http_res.json())

create_secret

create_secret(key: str, value: str) -> Secret

Creates a new secret

Parameters:

Name Type Description Default
key str

Secret key (must be unique in your organization)

required
value str

Secret value

required

Returns:

Name Type Description
Secret Secret

New secret

Raises:

Type Description
SecretUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def create_secret(self, key: str, value: str) -> Secret:
    """
    Creates a new secret

    Args:
        key: Secret key (must be unique in your organization)
        value: Secret value

    Returns:
        Secret: New secret

    Raises:
        errors.SecretUnauthorizedError: User does not have permission to perform the
            requested operation
    """
    return Secret(
        key=key,
        value=value,
        personal_access_token=self.personal_access_token,
        organization_id=self.organization_id,
    ).create()

list_secrets

list_secrets() -> ListSecretsResponse

Lists all GlassFlow secrets in the GlassFlow API

Returns:

Type Description
ListSecretsResponse

responses.ListSecretsResponse: Response object with the secrets listed

Raises:

Type Description
SecretUnauthorizedError

User does not have permission to perform the requested operation

Source code in src/glassflow/client.py
def list_secrets(self) -> responses.ListSecretsResponse:
    """
    Lists all GlassFlow secrets in the GlassFlow API

    Returns:
        responses.ListSecretsResponse: Response object with the secrets listed

    Raises:
        errors.SecretUnauthorizedError: User does not have permission to perform the
            requested operation
    """
    endpoint = "/secrets"
    http_res = self._request(method="GET", endpoint=endpoint)
    return responses.ListSecretsResponse(**http_res.json())

Pipeline

Pipeline(personal_access_token: str, name: str | None = None, space_id: str | None = None, id: str | None = None, source_kind: str | None = None, source_config: dict | None = None, sink_kind: str | None = None, sink_config: dict | None = None, requirements: str | None = None, transformation_file: str | None = None, env_vars: list[dict[str, str]] | None = None, state: str = 'running', organization_id: str | None = None, metadata: dict | None = None, created_at: str | None = None)

Bases: APIClient

Parameters:

Name Type Description Default
personal_access_token str

The personal access token to authenticate against GlassFlow

required
id str | None

Pipeline ID

None
name str | None

Name of the pipeline

None
space_id str | None

ID of the GlassFlow Space you want to create the pipeline in

None
transformation_file str | None

Path to file with transformation function of the pipeline.

None
requirements str | None

Requirements.txt of the pipeline

None
source_kind str | None

Kind of source for the pipeline. If no source is provided, the default source will be SDK

None
source_config dict | None

Configuration of the pipeline's source

None
sink_kind str | None

Kind of sink for the pipeline. If no sink is provided, the default sink will be SDK

None
sink_config dict | None

Configuration of the pipeline's sink

None
env_vars list[dict[str, str]] | None

Environment variables to pass to the pipeline

None
state str

State of the pipeline after creation. It can be either "running" or "paused"

'running'
metadata dict | None

Metadata of the pipeline

None
created_at str | None

Timestamp when the pipeline was created

None

Raises:

Type Description
FileNotFoundError

If the transformation file is provided and does not exist

Source code in src/glassflow/pipeline.py
def __init__(
    self,
    personal_access_token: str,
    name: str | None = None,
    space_id: str | None = None,
    id: str | None = None,
    source_kind: str | None = None,
    source_config: dict | None = None,
    sink_kind: str | None = None,
    sink_config: dict | None = None,
    requirements: str | None = None,
    transformation_file: str | None = None,
    env_vars: list[dict[str, str]] | None = None,
    state: str = "running",
    organization_id: str | None = None,
    metadata: dict | None = None,
    created_at: str | None = None,
):
    """Creates a new GlassFlow pipeline object

    Args:
        personal_access_token: The personal access token to authenticate
            against GlassFlow
        id: Pipeline ID
        name: Name of the pipeline
        space_id: ID of the GlassFlow Space you want to create the pipeline in
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        metadata: Metadata of the pipeline
        created_at: Timestamp when the pipeline was created

    Raises:
        FileNotFoundError: If the transformation file is provided and
            does not exist
    """
    super().__init__()
    self.id = id
    self.name = name
    self.space_id = space_id
    self.personal_access_token = personal_access_token
    self.source_kind = source_kind
    self.source_config = source_config
    self.sink_kind = sink_kind
    self.sink_config = sink_config
    self.requirements = requirements
    self.transformation_code = None
    self.transformation_file = transformation_file
    self.env_vars = env_vars
    self.state = state
    self.organization_id = organization_id
    self.metadata = metadata if metadata is not None else {}
    self.created_at = created_at
    self.access_tokens: list[AccessToken] = []

    self.headers = {"Personal-Access-Token": self.personal_access_token}
    self.query_params = {"organization_id": self.organization_id}
    if self.transformation_file is not None:
        self._read_transformation_file()

    self.source_connector = self._fill_connector(
        "source",
        self.source_kind,
        self.source_config,
    )
    self.sink_connector = self._fill_connector(
        "sink", self.sink_kind, self.sink_config
    )

fetch

fetch() -> Pipeline

Fetches pipeline information from the GlassFlow API

Returns:

Type Description
Pipeline

Pipeline object

Raises:

Type Description
ValueError

If ID is not provided in the constructor

PipelineNotFoundError

If ID provided does not match any existing pipeline in GlassFlow

PipelineUnauthorizedError

If the Personal Access Token is not provider or is invalid

Source code in src/glassflow/pipeline.py
def fetch(self) -> Pipeline:
    """
    Fetches pipeline information from the GlassFlow API

    Returns:
        Pipeline object

    Raises:
        ValueError: If ID is not provided in the constructor
        errors.PipelineNotFoundError: If ID provided does not match any
            existing pipeline in GlassFlow
        errors.PipelineUnauthorizedError: If the Personal Access Token is not
            provider or is invalid
    """
    if self.id is None:
        raise ValueError(
            "Pipeline id must be provided in order to fetch it's details"
        )

    endpoint = f"/pipelines/{self.id}"
    http_res = self._request(method="GET", endpoint=endpoint)
    fetched_pipeline = api.GetDetailedSpacePipeline(**http_res.json())
    self._fill_pipeline_details(fetched_pipeline)
    # Fetch Pipeline Access Tokens
    self._list_access_tokens()
    # Fetch function source
    self._get_function_artifact()
    return self

create

create() -> Pipeline

Creates a new GlassFlow pipeline

Returns:

Type Description
Pipeline

Pipeline object

Raises:

Type Description
ValueError

If name is not provided in the constructor

ValueError

If space_id is not provided in the constructor

ValueError

If transformation_file is not provided in the constructor

Source code in src/glassflow/pipeline.py
def create(self) -> Pipeline:
    """
    Creates a new GlassFlow pipeline

    Returns:
        Pipeline object

    Raises:
        ValueError: If name is not provided in the constructor
        ValueError: If space_id is not provided in the constructor
        ValueError: If transformation_file is not provided
            in the constructor
    """

    if self.name is None:
        raise ValueError("Name must be provided in order to create the pipeline")
    if self.space_id is None:
        raise ValueError("Argument space_id must be provided in the constructor")
    if self.transformation_file is None:
        raise ValueError(
            "Argument transformation_file must be provided in the constructor"
        )
    else:
        self._read_transformation_file()

    create_pipeline = api.CreatePipeline(
        name=self.name,
        space_id=self.space_id,
        transformation_function=self.transformation_code,
        requirements_txt=self.requirements,
        source_connector=self.source_connector,
        sink_connector=self.sink_connector,
        environments=self.env_vars,
        state=api.PipelineState(self.state),
        metadata=self.metadata,
    )
    endpoint = "/pipelines"
    http_res = self._request(
        method="POST",
        endpoint=endpoint,
        json=create_pipeline.model_dump(exclude_none=True),
    )
    res_json = http_res.json()
    # using custom operations model because api model does not exist
    res = operations.CreatePipeline(
        **res_json,
    )
    self.id = res.id
    self.created_at = res.created_at
    self.space_id = res.space_id
    self.access_tokens.append(
        AccessToken(
            name="default",
            token=res.access_token,
            id="default",
            created_at=res.created_at,
        )
    )
    return self

update

update(name: str | None = None, state: str | None = None, transformation_file: str | None = None, requirements: str | None = None, metadata: dict | None = None, source_kind: str | None = None, source_config: dict | None = None, sink_kind: str | None = None, sink_config: dict | None = None, env_vars: list[dict[str, str]] | None = None) -> Pipeline

Updates a GlassFlow pipeline

Parameters:

Name Type Description Default
name str | None

Name of the pipeline

None
state str | None

State of the pipeline after creation. It can be either "running" or "paused"

None
transformation_file str | None

Path to file with transformation function of the pipeline.

None
requirements str | None

Requirements.txt of the pipeline

None
source_kind str | None

Kind of source for the pipeline. If no source is provided, the default source will be SDK

None
source_config dict | None

Configuration of the pipeline's source

None
sink_kind str | None

Kind of sink for the pipeline. If no sink is provided, the default sink will be SDK

None
sink_config dict | None

Configuration of the pipeline's sink

None
env_vars list[dict[str, str]] | None

Environment variables to pass to the pipeline

None
metadata dict | None

Metadata of the pipeline

None

Returns:

Type Description
Pipeline

Updated pipeline

Source code in src/glassflow/pipeline.py
def update(
    self,
    name: str | None = None,
    state: str | None = None,
    transformation_file: str | None = None,
    requirements: str | None = None,
    metadata: dict | None = None,
    source_kind: str | None = None,
    source_config: dict | None = None,
    sink_kind: str | None = None,
    sink_config: dict | None = None,
    env_vars: list[dict[str, str]] | None = None,
) -> Pipeline:
    """
    Updates a GlassFlow pipeline

    Args:
        name: Name of the pipeline
        state: State of the pipeline after creation.
            It can be either "running" or "paused"
        transformation_file: Path to file with transformation function of
            the pipeline.
        requirements: Requirements.txt of the pipeline
        source_kind: Kind of source for the pipeline. If no source is
            provided, the default source will be SDK
        source_config: Configuration of the pipeline's source
        sink_kind: Kind of sink for the pipeline. If no sink is provided,
            the default sink will be SDK
        sink_config: Configuration of the pipeline's sink
        env_vars: Environment variables to pass to the pipeline
        metadata: Metadata of the pipeline

    Returns:
        Updated pipeline

    """
    self.fetch()
    if transformation_file is not None or requirements is not None:
        if transformation_file is not None:
            with open(transformation_file) as f:
                file = f.read()
        else:
            file = self.transformation_code

        if requirements is None:
            requirements = self.requirements

        self._upload_function_artifact(file, requirements)
        self.requirements = requirements
        self.transformation_code = file

    if source_kind is not None:
        source_connector = self._fill_connector(
            "source",
            source_kind,
            source_config,
        )
    else:
        source_connector = self.source_connector

    if sink_kind is not None:
        sink_connector = self._fill_connector("sink", sink_kind, sink_config)
    else:
        sink_connector = self.sink_connector

    if env_vars is not None:
        self._update_function(env_vars)

    # using custom model because api model does not exist
    pipeline_req = operations.UpdatePipelineRequest(
        name=name if name is not None else self.name,
        state=state if state is not None else self.state,
        metadata=metadata if metadata is not None else self.metadata,
        source_connector=source_connector,
        sink_connector=sink_connector,
    )

    endpoint = f"/pipelines/{self.id}"
    body = pipeline_req.model_dump_json(exclude_none=True)
    http_res = self._request(method="PATCH", endpoint=endpoint, data=body)
    # Fetch updated pipeline details and validate
    updated_pipeline = api.GetDetailedSpacePipeline(**http_res.json())
    self._fill_pipeline_details(updated_pipeline)
    return self

delete

delete() -> None

Deletes a GlassFlow pipeline

Raises:

Type Description
ValueError

If ID is not provided in the constructor

PipelineNotFoundError

If ID provided does not match any existing pipeline in GlassFlow

PipelineUnauthorizedError

If the Personal Access Token is not provided or is invalid

Source code in src/glassflow/pipeline.py
def delete(self) -> None:
    """
    Deletes a GlassFlow pipeline

    Raises:
        ValueError: If ID is not provided in the constructor
        error.PipelineNotFoundError: If ID provided does not match any
            existing pipeline in GlassFlow
        errors.PipelineUnauthorizedError: If the Personal Access Token is not
            provided or is invalid
    """
    if self.id is None:
        raise ValueError("Pipeline id must be provided")

    endpoint = f"/pipelines/{self.id}"
    self._request(method="DELETE", endpoint=endpoint)

get_logs

get_logs(page_size: int = 50, page_token: str | None = None, severity_code: int = 100, start_time: str | None = None, end_time: str | None = None) -> FunctionLogsResponse

Get the pipeline's logs

Parameters:

Name Type Description Default
page_size int

Pagination size

50
page_token str | None

Page token filter (use for pagination)

None
severity_code int

Severity code filter (100, 200, 300, 400, 500)

100
start_time str | None

Start time filter

None
end_time str | None

End time filter

None

Returns:

Type Description
FunctionLogsResponse

Response with the logs

Source code in src/glassflow/pipeline.py
def get_logs(
    self,
    page_size: int = 50,
    page_token: str | None = None,
    severity_code: int = 100,
    start_time: str | None = None,
    end_time: str | None = None,
) -> responses.FunctionLogsResponse:
    """
    Get the pipeline's logs

    Args:
        page_size: Pagination size
        page_token: Page token filter (use for pagination)
        severity_code: Severity code filter (100, 200, 300, 400, 500)
        start_time: Start time filter
        end_time: End time filter

    Returns:
        Response with the logs
    """

    query_params = {
        "page_size": page_size,
        "page_token": page_token,
        "severity_code": severity_code,
        "start_time": start_time,
        "end_time": end_time,
    }
    endpoint = f"/pipelines/{self.id}/functions/main/logs"
    http_res = self._request(
        method="GET", endpoint=endpoint, request_query_params=query_params
    )
    base_res_json = http_res.json()
    logs = [responses.FunctionLogEntry(**entry) for entry in base_res_json["logs"]]
    return responses.FunctionLogsResponse(
        logs=logs,
        next=base_res_json["next"],
    )

get_source

get_source(pipeline_access_token_name: str | None = None) -> PipelineDataSource

Get source client to publish data to the pipeline

Parameters:

Name Type Description Default
pipeline_access_token_name str | None

Name of the pipeline access token to use. If not specified, the default token will be used

None

Returns:

Type Description
PipelineDataSource

Source client to publish data to the pipeline

Raises:

Type Description
ValueError

If pipeline id is not provided in the constructor

Source code in src/glassflow/pipeline.py
def get_source(
    self, pipeline_access_token_name: str | None = None
) -> PipelineDataSource:
    """
    Get source client to publish data to the pipeline

    Args:
        pipeline_access_token_name (str | None): Name of the pipeline
            access token to use. If not specified, the default token
            will be used

    Returns:
        Source client to publish data to the pipeline

    Raises:
        ValueError: If pipeline id is not provided in the constructor
    """
    return self._get_data_client("source", pipeline_access_token_name)

get_sink

get_sink(pipeline_access_token_name: str | None = None) -> PipelineDataSink

Get sink client to consume data from the pipeline

Parameters:

Name Type Description Default
pipeline_access_token_name str | None

Name of the pipeline access token to use. If not specified, the default token will be used

None

Returns:

Type Description
PipelineDataSink

Sink client to consume data from the pipeline

Raises:

Type Description
ValueError

If pipeline id is not provided in the constructor

Source code in src/glassflow/pipeline.py
def get_sink(
    self, pipeline_access_token_name: str | None = None
) -> PipelineDataSink:
    """
    Get sink client to consume data from the pipeline

    Args:
        pipeline_access_token_name (str | None): Name of the pipeline
            access token to use. If not specified, the default token
            will be used

    Returns:
        Sink client to consume data from the pipeline

    Raises:
        ValueError: If pipeline id is not provided in the constructor
    """
    return self._get_data_client("sink", pipeline_access_token_name)

test

test(data: dict) -> TestFunctionResponse

Test a pipeline's function with a sample input JSON

Parameters:

Name Type Description Default
data dict

Input JSON

required

Returns:

Type Description
TestFunctionResponse

Test function response

Source code in src/glassflow/pipeline.py
def test(self, data: dict) -> responses.TestFunctionResponse:
    """
    Test a pipeline's function with a sample input JSON

    Args:
        data: Input JSON

    Returns:
        Test function response
    """
    endpoint = f"/pipelines/{self.id}/functions/main/test"
    request_body = data
    http_res = self._request(method="POST", endpoint=endpoint, json=request_body)
    base_res_json = http_res.json()
    return responses.TestFunctionResponse(
        **base_res_json,
    )

PipelineDataClient

PipelineDataClient(pipeline_id: str, pipeline_access_token: str)

Bases: APIClient

Base Client object to publish and consume events from the given pipeline.

Attributes:

Name Type Description
glassflow_config GlassFlowConfig

GlassFlowConfig object to interact with GlassFlow API

pipeline_id str

The pipeline id to interact with

pipeline_access_token str

The access token to access the pipeline

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__()
    self.pipeline_id = pipeline_id
    self.pipeline_access_token = pipeline_access_token
    self.headers = {"X-PIPELINE-ACCESS-TOKEN": self.pipeline_access_token}
    self.query_params = {}

validate_credentials

validate_credentials() -> None

Check if the pipeline credentials are valid and raise an error if not

Source code in src/glassflow/pipeline_data.py
def validate_credentials(self) -> None:
    """
    Check if the pipeline credentials are valid and raise an error if not
    """

    endpoint = f"/pipelines/{self.pipeline_id}/status/access_token"
    return self._request(method="GET", endpoint=endpoint)

PipelineDataSource

PipelineDataSource(pipeline_id: str, pipeline_access_token: str)

Bases: PipelineDataClient

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__()
    self.pipeline_id = pipeline_id
    self.pipeline_access_token = pipeline_access_token
    self.headers = {"X-PIPELINE-ACCESS-TOKEN": self.pipeline_access_token}
    self.query_params = {}

publish

publish(request_body: dict) -> PublishEventResponse

Push a new message into the pipeline

Parameters:

Name Type Description Default
request_body dict

The message to be published into the pipeline

required

Returns:

Type Description
PublishEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while publishing the event

Source code in src/glassflow/pipeline_data.py
def publish(self, request_body: dict) -> responses.PublishEventResponse:
    """Push a new message into the pipeline

    Args:
        request_body: The message to be published into the pipeline

    Returns:
        Response object containing the status code and the raw response

    Raises:
        errors.ClientError: If an error occurred while publishing the event
    """
    endpoint = f"/pipelines/{self.pipeline_id}/topics/input/events"
    http_res = self._request(method="POST", endpoint=endpoint, json=request_body)
    return responses.PublishEventResponse(
        status_code=http_res.status_code,
    )

PipelineDataSink

PipelineDataSink(pipeline_id: str, pipeline_access_token: str)

Bases: PipelineDataClient

Source code in src/glassflow/pipeline_data.py
def __init__(self, pipeline_id: str, pipeline_access_token: str):
    super().__init__(pipeline_id, pipeline_access_token)

    # retry delay for consuming messages (in seconds)
    self._consume_retry_delay_minimum = 1
    self._consume_retry_delay_current = 1
    self._consume_retry_delay_max = 60

consume

consume() -> ConsumeEventResponse

Consume the last message from the pipeline

Returns:

Type Description
ConsumeEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipeline_data.py
def consume(self) -> responses.ConsumeEventResponse:
    """Consume the last message from the pipeline

    Returns:
        Response object containing the status code and the raw response

    Raises:
        errors.ClientError: If an error occurred while consuming the event

    """

    endpoint = f"/pipelines/{self.pipeline_id}/topics/output/events/consume"
    self._respect_retry_delay()
    http_res = self._request(method="POST", endpoint=endpoint)
    self._update_retry_delay(http_res.status_code)

    body = None
    if http_res.status_code == 200:
        body = http_res.json()
        self._consume_retry_delay_current = self._consume_retry_delay_minimum

    return responses.ConsumeEventResponse(
        status_code=http_res.status_code, body=body
    )

consume_failed

consume_failed() -> ConsumeFailedResponse

Consume the failed message from the pipeline

Returns:

Type Description
ConsumeFailedResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipeline_data.py
def consume_failed(self) -> responses.ConsumeFailedResponse:
    """Consume the failed message from the pipeline

    Returns:
        Response object containing the status code and the raw response

    Raises:
        errors.ClientError: If an error occurred while consuming the event

    """

    self._respect_retry_delay()
    endpoint = f"/pipelines/{self.pipeline_id}/topics/failed/events/consume"
    http_res = self._request(method="POST", endpoint=endpoint)

    self._update_retry_delay(http_res.status_code)
    body = None
    if http_res.status_code == 200:
        body = http_res.json()
        self._consume_retry_delay_current = self._consume_retry_delay_minimum

    return responses.ConsumeFailedResponse(
        status_code=http_res.status_code, body=body
    )

Secret

Secret(personal_access_token: str, key: str | None = None, value: str | None = None, organization_id: str | None = None)

Bases: APIClient

Parameters:

Name Type Description Default
personal_access_token str

The personal access token to authenticate against GlassFlow

required
key str | None

Name of the secret. It must start with a letter, and it can only contain characters in a-zA-Z0-9_

None
value str | None

Value of the secret to store

None

Raises:

Type Description
SecretInvalidKeyError

If secret key is invalid

Source code in src/glassflow/secret.py
def __init__(
    self,
    personal_access_token: str,
    key: str | None = None,
    value: str | None = None,
    organization_id: str | None = None,
):
    """
    Creates a new Glassflow Secret object

    Args:
        personal_access_token: The personal access token to authenticate
            against GlassFlow
        key: Name of the secret. It must start with a letter,
            and it can only contain characters in a-zA-Z0-9_
        value: Value of the secret to store

    Raises:
        errors.SecretInvalidKeyError: If secret key is invalid
    """
    super().__init__()
    self.personal_access_token = personal_access_token
    self.organization_id = organization_id
    self.key = key
    self.value = value
    self.headers = {"Personal-Access-Token": self.personal_access_token}
    self.query_params = {"organization_id": self.organization_id}

    if self.key and not self._is_key_valid(self.key):
        raise errors.SecretInvalidKeyError(self.key)

create

create() -> Secret

Creates a new Glassflow Secret

Returns:

Type Description
Secret

Secret object

Raises:

Type Description
ValueError

If secret key or value are not set in the constructor

SecretUnauthorizedError

If personal access token is invalid

Source code in src/glassflow/secret.py
def create(self) -> Secret:
    """
    Creates a new Glassflow Secret

    Returns:
        Secret object

    Raises:
        ValueError: If secret key or value are not set in the constructor
        errors.SecretUnauthorizedError: If personal access token is invalid
    """
    if self.key is None:
        raise ValueError("Secret key is required in the constructor")
    if self.value is None:
        raise ValueError("Secret value is required in the constructor")

    secret_api_obj = api.CreateSecret(
        **{
            "key": self.key,
            "value": self.value,
        }
    )

    endpoint = "/secrets"
    self._request(
        method="POST", endpoint=endpoint, json=secret_api_obj.model_dump()
    )
    return self

delete

delete() -> None

Deletes a Glassflow Secret.

Raises:

Type Description
SecretUnauthorizedError

If personal access token is invalid

SecretNotFoundError

If secret key does not exist

ValueError

If secret key is not set in the constructor

Source code in src/glassflow/secret.py
def delete(self) -> None:
    """
    Deletes a Glassflow Secret.

    Raises:
        errors.SecretUnauthorizedError: If personal access token is invalid
        errors.SecretNotFoundError: If secret key does not exist
        ValueError: If secret key is not set in the constructor
    """
    if self.key is None:
        raise ValueError("Secret key is required in the constructor")

    endpoint = f"/secrets/{self.key}"
    self._request(method="DELETE", endpoint=endpoint)

Space

Space(personal_access_token: str, name: str | None = None, id: str | None = None, created_at: datetime | None = None, organization_id: str | None = None)

Bases: APIClient

Parameters:

Name Type Description Default
personal_access_token str

The personal access token to authenticate against GlassFlow

required
name str | None

Name of the space

None
id str | None

ID of the GlassFlow Space you want to create the pipeline in

None
created_at datetime | None

Timestamp when the space was created

None
Source code in src/glassflow/space.py
def __init__(
    self,
    personal_access_token: str,
    name: str | None = None,
    id: str | None = None,
    created_at: datetime.datetime | None = None,
    organization_id: str | None = None,
):
    """Creates a new GlassFlow space object

    Args:
        personal_access_token: The personal access token to authenticate
            against GlassFlow
        name: Name of the space
        id: ID of the GlassFlow Space you want to create the pipeline in
        created_at: Timestamp when the space was created

    """
    super().__init__()
    self.name = name
    self.id = id
    self.created_at = created_at
    self.organization_id = organization_id
    self.personal_access_token = personal_access_token
    self.headers = {"Personal-Access-Token": self.personal_access_token}
    self.query_params = {"organization_id": self.organization_id}

create

create() -> Space

Creates a new GlassFlow space

Returns:

Type Description
Space

Space object

Raises:

Type Description
ValueError

If name is not provided in the constructor

Source code in src/glassflow/space.py
def create(self) -> Space:
    """
    Creates a new GlassFlow space

    Returns:
        Space object

    Raises:
        ValueError: If name is not provided in the constructor

    """
    space_api_obj = api.CreateSpace(name=self.name)

    endpoint = "/spaces"
    http_res = self._request(
        method="POST", endpoint=endpoint, json=space_api_obj.model_dump()
    )

    space_created = api.Space(**http_res.json())
    self.id = space_created.id
    self.created_at = space_created.created_at
    self.name = space_created.name
    return self

delete

delete() -> None

Deletes a GlassFlow space

Raises:

Type Description
ValueError

If ID is not provided in the constructor

SpaceNotFoundError

If ID provided does not match any existing space in GlassFlow

SpaceUnauthorizedError

If the Personal Access Token is not provided or is invalid

SpaceIsNotEmptyError

If the Space is not empty

Source code in src/glassflow/space.py
def delete(self) -> None:
    """
    Deletes a GlassFlow space

    Raises:
        ValueError: If ID is not provided in the constructor
        errors.SpaceNotFoundError: If ID provided does not match any
            existing space in GlassFlow
        errors.SpaceUnauthorizedError: If the Personal Access Token is not
            provided or is invalid
        errors.SpaceIsNotEmptyError: If the Space is not empty
    """
    if self.id is None:
        raise ValueError("Space id must be provided in the constructor")

    endpoint = f"/spaces/{self.id}"
    self._request(method="DELETE", endpoint=endpoint)

GlassFlowConfig

Bases: BaseModel

Configuration object for GlassFlowClient

Attributes:

Name Type Description
server_url str

The base URL of the GlassFlow API

sdk_version str

The version of the GlassFlow Python SDK

user_agent str

The user agent to be used in the requests

ClientError

ClientError(detail: str, status_code: int, body: str, raw_response: Response)

Bases: Exception

Represents an error returned by the API.

Attributes:

Name Type Description
detail str

A message describing the error

status_code int

The status code of the response

body str

The response body

raw_response Response

The raw response object

Parameters:

Name Type Description Default
detail str

A message describing the error

required
status_code int

The status code of the response

required
body str

The response body

required
raw_response Response

The raw response object

required
Source code in src/glassflow/models/errors/clienterror.py
def __init__(
    self,
    detail: str,
    status_code: int,
    body: str,
    raw_response: requests_http.Response,
):
    """Create a new ClientError object

    Args:
        detail: A message describing the error
        status_code: The status code of the response
        body: The response body
        raw_response: The raw response object
    """
    self.detail = detail
    self.status_code = status_code
    self.body = body
    self.raw_response = raw_response

UnauthorizedError

UnauthorizedError(raw_response: Response)

Bases: ClientError

Error caused by a user not authorized.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="Unauthorized request, Personal Access Token used is invalid",
        status_code=401,
        body=raw_response.text,
        raw_response=raw_response,
    )

UnknownContentTypeError

UnknownContentTypeError(raw_response: Response)

Bases: ClientError

Error caused by an unknown content type response.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    content_type = raw_response.headers.get("Content-Type")
    super().__init__(
        detail=f"unknown content-type received: {content_type}",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

UnknownError

UnknownError(raw_response: Response)

Bases: ClientError

Error caused by an unknown error.

Source code in src/glassflow/models/errors/clienterror.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="Error in getting response from GlassFlow",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

Error

Bases: BaseModel

Bad request error response

Attributes:

Name Type Description
detail str

A message describing the error

ConnectorConfigValueError

ConnectorConfigValueError(connector_type: str)

Bases: Exception

Value error for missing connector settings.

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, connector_type: str):
    super().__init__(
        f"{connector_type}_kind and {connector_type}_config "
        f"or {connector_type}_config_secret_refs must be provided"
    )

PipelineAccessTokenInvalidError

PipelineAccessTokenInvalidError(raw_response: Response)

Bases: ClientError

Error caused by invalid access token.

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="The Pipeline Access Token used is invalid",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

PipelineArtifactStillInProgressError

PipelineArtifactStillInProgressError(pipeline_id: str, raw_response: Response)

Bases: ClientError

Error returned when the pipeline artifact is still being processed.

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, pipeline_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Artifact from pipeline {pipeline_id} "
        f"is still in process, try again later.",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

PipelineNotFoundError

PipelineNotFoundError(pipeline_id: str, raw_response: Response)

Bases: ClientError

Error caused by a pipeline ID not found.

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, pipeline_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Pipeline ID {pipeline_id} does not exist",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

PipelineTooManyRequestsError

PipelineTooManyRequestsError(raw_response: Response)

Bases: ClientError

Error caused by too many requests to a pipeline.

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail="Too many requests",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

PipelineUnauthorizedError

PipelineUnauthorizedError(pipeline_id: str, raw_response: Response)

Bases: ClientError

Pipeline operation not authorized, invalid Personal Access Token

Source code in src/glassflow/models/errors/pipeline.py
def __init__(self, pipeline_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Unauthorized request on pipeline {pipeline_id}, "
        f"Personal Access Token used is invalid",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

SecretInvalidKeyError

SecretInvalidKeyError(secret_key: str)

Bases: Exception

Error caused by a Secret Key has invalid format.

Source code in src/glassflow/models/errors/secret.py
def __init__(self, secret_key: str):
    super().__init__(
        f"Secret key {secret_key} has invalid format, it must start with a letter, "
        f"and it can only contain characters in a-zA-Z0-9_"
    )

SecretNotFoundError

SecretNotFoundError(secret_key: str, raw_response: Response)

Bases: ClientError

Error caused by a Secret Key not found.

Source code in src/glassflow/models/errors/secret.py
def __init__(self, secret_key: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Secret Key {secret_key} does not exist",
        status_code=404,
        body=raw_response.text,
        raw_response=raw_response,
    )

SecretUnauthorizedError

SecretUnauthorizedError(secret_key: str, raw_response: Response)

Bases: ClientError

Secret operation not authorized, invalid Personal Access Token

Source code in src/glassflow/models/errors/secret.py
def __init__(self, secret_key: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Unauthorized request on Secret {secret_key}, "
        f"Personal Access Token used is invalid",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

SpaceIsNotEmptyError

SpaceIsNotEmptyError(raw_response: Response)

Bases: ClientError

Error caused by trying to delete a space that is not empty.

Source code in src/glassflow/models/errors/space.py
def __init__(self, raw_response: requests_http.Response):
    super().__init__(
        detail=raw_response.json()["msg"],
        status_code=409,
        body=raw_response.text,
        raw_response=raw_response,
    )

SpaceNotFoundError

SpaceNotFoundError(space_id: str, raw_response: Response)

Bases: ClientError

Error caused by a space ID not found.

Source code in src/glassflow/models/errors/space.py
def __init__(self, space_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Space ID {space_id} does not exist",
        status_code=404,
        body=raw_response.text,
        raw_response=raw_response,
    )

SpaceUnauthorizedError

SpaceUnauthorizedError(space_id: str, raw_response: Response)

Bases: ClientError

Space operation not authorized, invalid Personal Access Token

Source code in src/glassflow/models/errors/space.py
def __init__(self, space_id: str, raw_response: requests_http.Response):
    super().__init__(
        detail=f"Unauthorized request on Space {space_id}, "
        f"Personal Access Token used is invalid",
        status_code=raw_response.status_code,
        body=raw_response.text,
        raw_response=raw_response,
    )

AccessToken

Bases: BaseModel

Access Token response object.

Attributes:

Name Type Description
id str

The access token id.

name str

The access token name.

token str

The access token string.

created_at AwareDatetime

The access token creation date.

ConsumeEventResponse

Bases: BaseModel

Response from consume event

Attributes:

Name Type Description
status_code int

HTTP status code

body ConsumeOutputEvent

Body of the response

event

event() -> Any

Return event response.

Source code in src/glassflow/models/responses/pipeline.py
def event(self) -> Any:
    """Return event response."""
    if self.body:
        return self.body.response
    return None

ConsumeFailedResponse

Bases: BaseModel

Response from consuming failed event

Attributes:

Name Type Description
status_code int

HTTP status code

body ConsumeOutputEvent | None

ConsumeOutputEvent

event

event() -> Any

Return failed event response.

Source code in src/glassflow/models/responses/pipeline.py
def event(self) -> Any:
    """Return failed event response."""
    if self.body:
        return self.body.response
    return None

ConsumeOutputEvent

Bases: BaseModel

Consume output event

Attributes:

Name Type Description
payload Any

Payload

event_context EventContext

Event context

status str

Status

response Any

request response

error_details str

Error details

stack_trace str

Error Stack trace

FunctionLogEntry

Bases: BaseModel

Logs entry response object.

Attributes:

Name Type Description
level int

Log level.

severity_code int

Log severity code.

timestamp AwareDatetime

Log timestamp.

payload Payload

Log payload.

FunctionLogsResponse

Bases: BaseModel

Response for a function's logs endpoint.

Attributes:

Name Type Description
logs list[FunctionLogEntry]

list of logs

next str

ID used to retrieve next page of logs

ListAccessTokensResponse

Bases: BaseModel

Response for listing access tokens endpoint.

Attributes:

Name Type Description
total_amount int

Total amount of access tokens.

access_tokens list[AccessToken]

List of access tokens.

ListPipelinesResponse

Bases: BaseModel

Response for list pipelines endpoint

Attributes:

Name Type Description
total_amount int

Total amount of pipelines.

pipelines list[SpacePipeline]

List of pipelines.

PublishEventResponse

Bases: BaseModel

Response from publishing event

Attributes:

Name Type Description
status_code int

HTTP status code

TestFunctionResponse

Bases: ConsumeOutputEvent

Response for Test function endpoint.

ListSecretsResponse

Bases: BaseModel

Response from the list secrets endpoint.

Attributes:

Name Type Description
total_amount int

Total amount of the secrets.

secrets list[Secret]

List of secrets.

Secret

Bases: BaseModel

Secret response object

Attributes:

Name Type Description
key str

Secret key

ListSpacesResponse

Bases: BaseModel

Response from list spaces endpoint.

Attributes:

Name Type Description
total_amount int

Total amount of spaces.

spaces list[Space]

List of spaces.

Space

Bases: BaseModel

Space response object.

Attributes:

Name Type Description
name str

Space name.

id int

Space id.

created_at datetime

Space creation date.

permission str

Space permission.