Skip to content

Python SDK Docs

GlassFlow Python Client to interact with GlassFlow API

GlassFlowClient

GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources

Attributes:

Name Type Description
rclient

requests.Session object to make HTTP requests to GlassFlow API

glassflow_config GlassFlowConfig

GlassFlowConfig object to store configuration

organization_id

Organization ID of the user. If not provided, the default organization will be used

Source code in src/glassflow/client.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class GlassFlowClient:
    """GlassFlow Client to interact with GlassFlow API and manage pipelines and other resources

    Attributes:
        rclient: requests.Session object to make HTTP requests to GlassFlow API
        glassflow_config: GlassFlowConfig object to store configuration
        organization_id: Organization ID of the user. If not provided, the default organization will be used

    """

    glassflow_config: GlassFlowConfig

    def __init__(self, organization_id: str = None) -> None:
        """Create a new GlassFlowClient object

        Args:
            organization_id: Organization ID of the user. If not provided, the default organization will be used
        """
        rclient = requests_http.Session()
        self.glassflow_config = GlassFlowConfig(rclient)
        self.organization_id = organization_id

    def pipeline_client(
        self,
        pipeline_id: Optional[str] = None,
        pipeline_access_token: Optional[str] = None,
        space_id: Optional[str] = None,
    ) -> PipelineClient:
        """Create a new PipelineClient object to interact with a specific pipeline

        Args:
            pipeline_id: The pipeline id to interact with
            pipeline_access_token: The access token to access the pipeline

        Returns:
            PipelineClient: Client object to publish and consume events from the given pipeline.
        """
        # if no pipeline_id or pipeline_access_token is provided, try to read from environment variables
        if not pipeline_id:
            pipeline_id = os.getenv("PIPELINE_ID")
        if not pipeline_access_token:
            pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")
        # no pipeline_id provided explicitly or in environment variables
        if not pipeline_id:
            raise ValueError(
                "PIPELINE_ID must be set as an environment variable or provided explicitly"
            )
        if not pipeline_access_token:
            raise ValueError(
                "PIPELINE_ACCESS_TOKEN must be set as an environment variable or provided explicitly"
            )

        return PipelineClient(
            glassflow_client=self,
            pipeline_id=pipeline_id,
            pipeline_access_token=pipeline_access_token,
        )

__init__(organization_id=None)

Create a new GlassFlowClient object

Parameters:

Name Type Description Default
organization_id str

Organization ID of the user. If not provided, the default organization will be used

None
Source code in src/glassflow/client.py
24
25
26
27
28
29
30
31
32
def __init__(self, organization_id: str = None) -> None:
    """Create a new GlassFlowClient object

    Args:
        organization_id: Organization ID of the user. If not provided, the default organization will be used
    """
    rclient = requests_http.Session()
    self.glassflow_config = GlassFlowConfig(rclient)
    self.organization_id = organization_id

pipeline_client(pipeline_id=None, pipeline_access_token=None, space_id=None)

Create a new PipelineClient object to interact with a specific pipeline

Parameters:

Name Type Description Default
pipeline_id Optional[str]

The pipeline id to interact with

None
pipeline_access_token Optional[str]

The access token to access the pipeline

None

Returns:

Name Type Description
PipelineClient PipelineClient

Client object to publish and consume events from the given pipeline.

Source code in src/glassflow/client.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def pipeline_client(
    self,
    pipeline_id: Optional[str] = None,
    pipeline_access_token: Optional[str] = None,
    space_id: Optional[str] = None,
) -> PipelineClient:
    """Create a new PipelineClient object to interact with a specific pipeline

    Args:
        pipeline_id: The pipeline id to interact with
        pipeline_access_token: The access token to access the pipeline

    Returns:
        PipelineClient: Client object to publish and consume events from the given pipeline.
    """
    # if no pipeline_id or pipeline_access_token is provided, try to read from environment variables
    if not pipeline_id:
        pipeline_id = os.getenv("PIPELINE_ID")
    if not pipeline_access_token:
        pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")
    # no pipeline_id provided explicitly or in environment variables
    if not pipeline_id:
        raise ValueError(
            "PIPELINE_ID must be set as an environment variable or provided explicitly"
        )
    if not pipeline_access_token:
        raise ValueError(
            "PIPELINE_ACCESS_TOKEN must be set as an environment variable or provided explicitly"
        )

    return PipelineClient(
        glassflow_client=self,
        pipeline_id=pipeline_id,
        pipeline_access_token=pipeline_access_token,
    )

PipelineClient

Client object to publish and consume events from the given pipeline.

Attributes:

Name Type Description
glassflow_client

GlassFlowClient object to interact with GlassFlow API

pipeline_id

The pipeline id to interact with

organization_id

Organization ID of the user. If not provided, the default organization will be used

pipeline_access_token

The access token to access the pipeline

Source code in src/glassflow/pipelines.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class PipelineClient:
    """Client object to publish and consume events from the given pipeline.

    Attributes:
        glassflow_client: GlassFlowClient object to interact with GlassFlow API
        pipeline_id: The pipeline id to interact with
        organization_id: Organization ID of the user. If not provided, the default organization will be used
        pipeline_access_token: The access token to access the pipeline
    """

    def __init__(
        self, glassflow_client, pipeline_id: str, pipeline_access_token: str
    ) -> None:
        """Create a new PipelineClient object to interact with a specific pipeline

        Args:
            glassflow_client: GlassFlowClient object to interact with GlassFlow API
            pipeline_id: The pipeline id to interact with
            pipeline_access_token: The access token to access the pipeline
        """
        self.glassflow_client = glassflow_client
        self.pipeline_id = pipeline_id
        self.organization_id = self.glassflow_client.organization_id
        self.pipeline_access_token = pipeline_access_token
        # retry delay for consuming messages (in seconds)
        self._consume_retry_delay_minimum = 1
        self._consume_retry_delay_current = 1
        self._consume_retry_delay_max = 60

    def is_access_token_valid(self) -> bool:
        """
        Check if the pipeline access token is valid

        Returns:
            Boolean: True if the pipeline access token is correct, False otherwise
        """
        base_url = self.glassflow_client.glassflow_config.server_url

        request = operations.StatusAccessTokenRequest(
            pipeline_id=self.pipeline_id,
            x_pipeline_access_token=self.pipeline_access_token,
        )

        url = utils.generate_url(
            operations.PublishEventRequest,
            base_url,
            "/pipelines/{pipeline_id}/status/access_token",
            request,
        )

        headers = self._get_headers(request)

        client = self.glassflow_client.glassflow_config.client

        http_res = client.request("GET", url, headers=headers)
        content_type = http_res.headers.get("Content-Type")

        if http_res.status_code == 200:
            res = True
        elif http_res.status_code == 401:
            res = False
        elif http_res.status_code in [400, 500]:
            if utils.match_content_type(content_type, "application/json"):
                out = utils.unmarshal_json(http_res.text, errors.Error)
                out.raw_response = http_res
                raise out
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif 400 < http_res.status_code < 600:
            raise errors.ClientError(
                "API error occurred", http_res.status_code, http_res.text, http_res
            )
        return res

    def is_valid(self) -> bool:
        """
        Check if the pipeline exists and credentials are valid

        Returns:
            Boolean: True if the pipeline exists and credentials are valid, False otherwise
        """
        try:
            return self.is_access_token_valid()
        except errors.ClientError as e:
            if e.status_code == 404:
                return False
            else:
                raise e

    def publish(self, request_body: dict) -> operations.PublishEventResponse:
        """Push a new message into the pipeline

        Args:
            request_body: The message to be published into the pipeline

        Returns:
            PublishEventResponse: Response object containing the status code and the raw response

        Raises:
            ClientError: If an error occurred while publishing the event
        """
        request = operations.PublishEventRequest(
            organization_id=self.organization_id,
            pipeline_id=self.pipeline_id,
            x_pipeline_access_token=self.pipeline_access_token,
            request_body=request_body,
        )

        base_url = self.glassflow_client.glassflow_config.server_url

        url = utils.generate_url(
            operations.PublishEventRequest,
            base_url,
            "/pipelines/{pipeline_id}/topics/input/events",
            request,
        )

        req_content_type, data, form = utils.serialize_request_body(
            request, operations.PublishEventRequest, "request_body", False, True, "json"
        )

        headers = self._get_headers(request, req_content_type)
        query_params = utils.get_query_params(operations.PublishEventRequest, request)

        client = self.glassflow_client.glassflow_config.client

        http_res = client.request(
            "POST", url, params=query_params, data=data, files=form, headers=headers
        )
        content_type = http_res.headers.get("Content-Type")

        res = operations.PublishEventResponse(
            status_code=http_res.status_code,
            content_type=content_type,
            raw_response=http_res,
        )

        if http_res.status_code == 200:
            pass
        elif http_res.status_code in [400, 500]:
            if utils.match_content_type(content_type, "application/json"):
                out = utils.unmarshal_json(http_res.text, errors.Error)
                out.raw_response = http_res
                raise out
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif 400 < http_res.status_code < 600:
            raise errors.ClientError(
                "API error occurred", http_res.status_code, http_res.text, http_res
            )

        return res

    def consume(self) -> operations.ConsumeEventResponse:
        """Consume the last message from the pipeline

        Returns:
            ConsumeEventResponse: Response object containing the status code and the raw response

        Raises:
            ClientError: If an error occurred while consuming the event

        """
        request = operations.ConsumeEventRequest(
            pipeline_id=self.pipeline_id,
            organization_id=self.organization_id,
            x_pipeline_access_token=self.pipeline_access_token,
        )

        base_url = self.glassflow_client.glassflow_config.server_url

        url = utils.generate_url(
            operations.ConsumeEventRequest,
            base_url,
            "/pipelines/{pipeline_id}/topics/output/events/consume",
            request,
        )
        headers = self._get_headers(request)
        query_params = utils.get_query_params(operations.ConsumeEventRequest, request)

        client = self.glassflow_client.glassflow_config.client
        # make the request
        self._respect_retry_delay()

        http_res = client.request("POST", url, params=query_params, headers=headers)
        content_type = http_res.headers.get("Content-Type")

        res = operations.ConsumeEventResponse(
            status_code=http_res.status_code,
            content_type=content_type,
            raw_response=http_res,
        )

        self._update_retry_delay(http_res.status_code)
        if http_res.status_code == 200:
            self._consume_retry_delay_current = self._consume_retry_delay_minimum
            if utils.match_content_type(content_type, "application/json"):
                body = utils.unmarshal_json(
                    http_res.text, Optional[operations.ConsumeEventResponseBody]
                )
                res.body = body
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif http_res.status_code == 204:
            # No messages to be consumed.
            # update the retry delay
            # Return an empty response body
            body = operations.ConsumeEventResponseBody("", "", {})
            res.body = body
        elif http_res.status_code == 429:
            # update the retry delay
            body = operations.ConsumeEventResponseBody("", "", {})
            res.body = body
        elif http_res.status_code in [400, 500]:
            if utils.match_content_type(content_type, "application/json"):
                out = utils.unmarshal_json(http_res.text, errors.Error)
                out.raw_response = http_res
                raise out
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif 400 < http_res.status_code < 600:
            raise errors.ClientError(
                "API error occurred", http_res.status_code, http_res.text, http_res
            )

        return res

    def consume_failed(self) -> operations.ConsumeFailedResponse:
        """Consume the failed message from the pipeline

        Returns:
            ConsumeFailedResponse: Response object containing the status code and the raw response

        Raises:
            ClientError: If an error occurred while consuming the event

        """
        request = operations.ConsumeFailedRequest(
            pipeline_id=self.pipeline_id,
            organization_id=self.organization_id,
            x_pipeline_access_token=self.pipeline_access_token,
        )

        base_url = self.glassflow_client.glassflow_config.server_url

        url = utils.generate_url(
            operations.ConsumeFailedRequest,
            base_url,
            "/pipelines/{pipeline_id}/topics/failed/events/consume",
            request,
        )
        headers = self._get_headers(request)
        query_params = utils.get_query_params(operations.ConsumeFailedRequest, request)

        client = self.glassflow_client.glassflow_config.client
        self._respect_retry_delay()
        http_res = client.request("POST", url, params=query_params, headers=headers)
        content_type = http_res.headers.get("Content-Type")

        res = operations.ConsumeFailedResponse(
            status_code=http_res.status_code,
            content_type=content_type,
            raw_response=http_res,
        )

        self._update_retry_delay(http_res.status_code)
        if http_res.status_code == 200:
            if utils.match_content_type(content_type, "application/json"):
                body = utils.unmarshal_json(
                    http_res.text, Optional[operations.ConsumeFailedResponseBody]
                )
                res.body = body
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif http_res.status_code == 204:
            # No messages to be consumed. Return an empty response body
            body = operations.ConsumeFailedResponseBody("", "", {})
            res.body = body
        elif http_res.status_code in [400, 500]:
            if utils.match_content_type(content_type, "application/json"):
                out = utils.unmarshal_json(http_res.text, errors.Error)
                out.raw_response = http_res
                raise out
            else:
                raise errors.ClientError(
                    f"unknown content-type received: {content_type}",
                    http_res.status_code,
                    http_res.text,
                    http_res,
                )
        elif 400 < http_res.status_code < 600:
            raise errors.ClientError(
                "API error occurred", http_res.status_code, http_res.text, http_res
            )

        return res

    def _get_headers(
        self, request: dataclasses.dataclass, req_content_type: Optional[str] = None
    ) -> dict:
        headers = utils.get_req_specific_headers(request)
        headers["Accept"] = "application/json"
        headers["Gf-Client"] = self.glassflow_client.glassflow_config.glassflow_client
        headers["User-Agent"] = self.glassflow_client.glassflow_config.user_agent
        headers["Gf-Python-Version"] = (
            f"{sys.version_info.major}."
            f"{sys.version_info.minor}."
            f"{sys.version_info.micro}"
        )

        if req_content_type and req_content_type not in (
            "multipart/form-data",
            "multipart/mixed",
        ):
            headers["content-type"] = req_content_type

        return headers

    def _update_retry_delay(self, status_code: int):
        if status_code == 200:
            self._consume_retry_delay_current = self._consume_retry_delay_minimum
        elif status_code == 204 or status_code == 429:
            self._consume_retry_delay_current *= 2
            self._consume_retry_delay_current = min(
                self._consume_retry_delay_current, self._consume_retry_delay_max
            )
            self._consume_retry_delay_current += random.uniform(0, 0.1)

    def _respect_retry_delay(self):
        if self._consume_retry_delay_current > self._consume_retry_delay_minimum:
            # sleep before making the request
            time.sleep(self._consume_retry_delay_current)

__init__(glassflow_client, pipeline_id, pipeline_access_token)

Create a new PipelineClient object to interact with a specific pipeline

Parameters:

Name Type Description Default
glassflow_client

GlassFlowClient object to interact with GlassFlow API

required
pipeline_id str

The pipeline id to interact with

required
pipeline_access_token str

The access token to access the pipeline

required
Source code in src/glassflow/pipelines.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self, glassflow_client, pipeline_id: str, pipeline_access_token: str
) -> None:
    """Create a new PipelineClient object to interact with a specific pipeline

    Args:
        glassflow_client: GlassFlowClient object to interact with GlassFlow API
        pipeline_id: The pipeline id to interact with
        pipeline_access_token: The access token to access the pipeline
    """
    self.glassflow_client = glassflow_client
    self.pipeline_id = pipeline_id
    self.organization_id = self.glassflow_client.organization_id
    self.pipeline_access_token = pipeline_access_token
    # retry delay for consuming messages (in seconds)
    self._consume_retry_delay_minimum = 1
    self._consume_retry_delay_current = 1
    self._consume_retry_delay_max = 60

consume()

Consume the last message from the pipeline

Returns:

Name Type Description
ConsumeEventResponse ConsumeEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipelines.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def consume(self) -> operations.ConsumeEventResponse:
    """Consume the last message from the pipeline

    Returns:
        ConsumeEventResponse: Response object containing the status code and the raw response

    Raises:
        ClientError: If an error occurred while consuming the event

    """
    request = operations.ConsumeEventRequest(
        pipeline_id=self.pipeline_id,
        organization_id=self.organization_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )

    base_url = self.glassflow_client.glassflow_config.server_url

    url = utils.generate_url(
        operations.ConsumeEventRequest,
        base_url,
        "/pipelines/{pipeline_id}/topics/output/events/consume",
        request,
    )
    headers = self._get_headers(request)
    query_params = utils.get_query_params(operations.ConsumeEventRequest, request)

    client = self.glassflow_client.glassflow_config.client
    # make the request
    self._respect_retry_delay()

    http_res = client.request("POST", url, params=query_params, headers=headers)
    content_type = http_res.headers.get("Content-Type")

    res = operations.ConsumeEventResponse(
        status_code=http_res.status_code,
        content_type=content_type,
        raw_response=http_res,
    )

    self._update_retry_delay(http_res.status_code)
    if http_res.status_code == 200:
        self._consume_retry_delay_current = self._consume_retry_delay_minimum
        if utils.match_content_type(content_type, "application/json"):
            body = utils.unmarshal_json(
                http_res.text, Optional[operations.ConsumeEventResponseBody]
            )
            res.body = body
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif http_res.status_code == 204:
        # No messages to be consumed.
        # update the retry delay
        # Return an empty response body
        body = operations.ConsumeEventResponseBody("", "", {})
        res.body = body
    elif http_res.status_code == 429:
        # update the retry delay
        body = operations.ConsumeEventResponseBody("", "", {})
        res.body = body
    elif http_res.status_code in [400, 500]:
        if utils.match_content_type(content_type, "application/json"):
            out = utils.unmarshal_json(http_res.text, errors.Error)
            out.raw_response = http_res
            raise out
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif 400 < http_res.status_code < 600:
        raise errors.ClientError(
            "API error occurred", http_res.status_code, http_res.text, http_res
        )

    return res

consume_failed()

Consume the failed message from the pipeline

Returns:

Name Type Description
ConsumeFailedResponse ConsumeFailedResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while consuming the event

Source code in src/glassflow/pipelines.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def consume_failed(self) -> operations.ConsumeFailedResponse:
    """Consume the failed message from the pipeline

    Returns:
        ConsumeFailedResponse: Response object containing the status code and the raw response

    Raises:
        ClientError: If an error occurred while consuming the event

    """
    request = operations.ConsumeFailedRequest(
        pipeline_id=self.pipeline_id,
        organization_id=self.organization_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )

    base_url = self.glassflow_client.glassflow_config.server_url

    url = utils.generate_url(
        operations.ConsumeFailedRequest,
        base_url,
        "/pipelines/{pipeline_id}/topics/failed/events/consume",
        request,
    )
    headers = self._get_headers(request)
    query_params = utils.get_query_params(operations.ConsumeFailedRequest, request)

    client = self.glassflow_client.glassflow_config.client
    self._respect_retry_delay()
    http_res = client.request("POST", url, params=query_params, headers=headers)
    content_type = http_res.headers.get("Content-Type")

    res = operations.ConsumeFailedResponse(
        status_code=http_res.status_code,
        content_type=content_type,
        raw_response=http_res,
    )

    self._update_retry_delay(http_res.status_code)
    if http_res.status_code == 200:
        if utils.match_content_type(content_type, "application/json"):
            body = utils.unmarshal_json(
                http_res.text, Optional[operations.ConsumeFailedResponseBody]
            )
            res.body = body
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif http_res.status_code == 204:
        # No messages to be consumed. Return an empty response body
        body = operations.ConsumeFailedResponseBody("", "", {})
        res.body = body
    elif http_res.status_code in [400, 500]:
        if utils.match_content_type(content_type, "application/json"):
            out = utils.unmarshal_json(http_res.text, errors.Error)
            out.raw_response = http_res
            raise out
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif 400 < http_res.status_code < 600:
        raise errors.ClientError(
            "API error occurred", http_res.status_code, http_res.text, http_res
        )

    return res

is_access_token_valid()

Check if the pipeline access token is valid

Returns:

Name Type Description
Boolean bool

True if the pipeline access token is correct, False otherwise

Source code in src/glassflow/pipelines.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def is_access_token_valid(self) -> bool:
    """
    Check if the pipeline access token is valid

    Returns:
        Boolean: True if the pipeline access token is correct, False otherwise
    """
    base_url = self.glassflow_client.glassflow_config.server_url

    request = operations.StatusAccessTokenRequest(
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
    )

    url = utils.generate_url(
        operations.PublishEventRequest,
        base_url,
        "/pipelines/{pipeline_id}/status/access_token",
        request,
    )

    headers = self._get_headers(request)

    client = self.glassflow_client.glassflow_config.client

    http_res = client.request("GET", url, headers=headers)
    content_type = http_res.headers.get("Content-Type")

    if http_res.status_code == 200:
        res = True
    elif http_res.status_code == 401:
        res = False
    elif http_res.status_code in [400, 500]:
        if utils.match_content_type(content_type, "application/json"):
            out = utils.unmarshal_json(http_res.text, errors.Error)
            out.raw_response = http_res
            raise out
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif 400 < http_res.status_code < 600:
        raise errors.ClientError(
            "API error occurred", http_res.status_code, http_res.text, http_res
        )
    return res

is_valid()

Check if the pipeline exists and credentials are valid

Returns:

Name Type Description
Boolean bool

True if the pipeline exists and credentials are valid, False otherwise

Source code in src/glassflow/pipelines.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def is_valid(self) -> bool:
    """
    Check if the pipeline exists and credentials are valid

    Returns:
        Boolean: True if the pipeline exists and credentials are valid, False otherwise
    """
    try:
        return self.is_access_token_valid()
    except errors.ClientError as e:
        if e.status_code == 404:
            return False
        else:
            raise e

publish(request_body)

Push a new message into the pipeline

Parameters:

Name Type Description Default
request_body dict

The message to be published into the pipeline

required

Returns:

Name Type Description
PublishEventResponse PublishEventResponse

Response object containing the status code and the raw response

Raises:

Type Description
ClientError

If an error occurred while publishing the event

Source code in src/glassflow/pipelines.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def publish(self, request_body: dict) -> operations.PublishEventResponse:
    """Push a new message into the pipeline

    Args:
        request_body: The message to be published into the pipeline

    Returns:
        PublishEventResponse: Response object containing the status code and the raw response

    Raises:
        ClientError: If an error occurred while publishing the event
    """
    request = operations.PublishEventRequest(
        organization_id=self.organization_id,
        pipeline_id=self.pipeline_id,
        x_pipeline_access_token=self.pipeline_access_token,
        request_body=request_body,
    )

    base_url = self.glassflow_client.glassflow_config.server_url

    url = utils.generate_url(
        operations.PublishEventRequest,
        base_url,
        "/pipelines/{pipeline_id}/topics/input/events",
        request,
    )

    req_content_type, data, form = utils.serialize_request_body(
        request, operations.PublishEventRequest, "request_body", False, True, "json"
    )

    headers = self._get_headers(request, req_content_type)
    query_params = utils.get_query_params(operations.PublishEventRequest, request)

    client = self.glassflow_client.glassflow_config.client

    http_res = client.request(
        "POST", url, params=query_params, data=data, files=form, headers=headers
    )
    content_type = http_res.headers.get("Content-Type")

    res = operations.PublishEventResponse(
        status_code=http_res.status_code,
        content_type=content_type,
        raw_response=http_res,
    )

    if http_res.status_code == 200:
        pass
    elif http_res.status_code in [400, 500]:
        if utils.match_content_type(content_type, "application/json"):
            out = utils.unmarshal_json(http_res.text, errors.Error)
            out.raw_response = http_res
            raise out
        else:
            raise errors.ClientError(
                f"unknown content-type received: {content_type}",
                http_res.status_code,
                http_res.text,
                http_res,
            )
    elif 400 < http_res.status_code < 600:
        raise errors.ClientError(
            "API error occurred", http_res.status_code, http_res.text, http_res
        )

    return res

GlassFlowConfig dataclass

Configuration object for GlassFlowClient

Attributes:

Name Type Description
client Session

requests.Session object to interact with the GlassFlow API

server_url str

The base URL of the GlassFlow API

sdk_version str

The version of the GlassFlow Python SDK

user_agent str

The user agent to be used in the requests

Source code in src/glassflow/config.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@dataclass
class GlassFlowConfig:
    """Configuration object for GlassFlowClient

    Attributes:
        client: requests.Session object to interact with the GlassFlow API
        server_url: The base URL of the GlassFlow API
        sdk_version: The version of the GlassFlow Python SDK
        user_agent: The user agent to be used in the requests

    """

    client: requests.Session
    server_url: str = "https://api.glassflow.dev/v1"
    sdk_version: str = version("glassflow")
    user_agent: str = "glassflow-python-sdk/{}".format(sdk_version)
    glassflow_client: str = "python-sdk/{}".format(sdk_version)

ClientError

Bases: Exception

Represents an error returned by the API.

Attributes:

Name Type Description
detail str

A message describing the error

status_code int

The status code of the response

body str

The response body

raw_response Response

The raw response object

Source code in src/glassflow/models/errors/clienterror.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ClientError(Exception):
    """Represents an error returned by the API.

    Attributes:
        detail: A message describing the error
        status_code: The status code of the response
        body: The response body
        raw_response: The raw response object

    """

    detail: str
    status_code: int
    body: str
    raw_response: requests_http.Response

    def __init__(
        self,
        detail: str,
        status_code: int,
        body: str,
        raw_response: requests_http.Response,
    ):
        """Create a new ClientError object

        Args:
            detail: A message describing the error
            status_code: The status code of the response
            body: The response body
            raw_response: The raw response object
        """
        self.detail = detail
        self.status_code = status_code
        self.body = body
        self.raw_response = raw_response

    def __str__(self):
        """Return a string representation of the error

        Returns:
            str: The string representation of the error

        """
        body = ""
        if len(self.body) > 0:
            body = f"\n{self.body}"

        return f"{self.detail}: Status {self.status_code}{body}"

__init__(detail, status_code, body, raw_response)

Create a new ClientError object

Parameters:

Name Type Description Default
detail str

A message describing the error

required
status_code int

The status code of the response

required
body str

The response body

required
raw_response Response

The raw response object

required
Source code in src/glassflow/models/errors/clienterror.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    detail: str,
    status_code: int,
    body: str,
    raw_response: requests_http.Response,
):
    """Create a new ClientError object

    Args:
        detail: A message describing the error
        status_code: The status code of the response
        body: The response body
        raw_response: The raw response object
    """
    self.detail = detail
    self.status_code = status_code
    self.body = body
    self.raw_response = raw_response

__str__()

Return a string representation of the error

Returns:

Name Type Description
str

The string representation of the error

Source code in src/glassflow/models/errors/clienterror.py
40
41
42
43
44
45
46
47
48
49
50
51
def __str__(self):
    """Return a string representation of the error

    Returns:
        str: The string representation of the error

    """
    body = ""
    if len(self.body) > 0:
        body = f"\n{self.body}"

    return f"{self.detail}: Status {self.status_code}{body}"

Error dataclass

Bases: Exception

Bad request error response

Attributes:

Name Type Description
message

A message describing the error

Source code in src/glassflow/models/errors/error.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclasses.dataclass
class Error(Exception):
    """Bad request error response

    Attributes:
        message: A message describing the error

    """

    detail: str = dataclasses.field(
        metadata={"dataclasses_json": {"letter_case": utils.get_field_name("detail")}}
    )

    def __str__(self) -> str:
        return utils.marshal_json(self, type(self))

ConsumeEventRequest dataclass

Request to consume an event from a pipeline topic

Attributes:

Name Type Description
pipeline_id str

The id of the pipeline

organization_id Optional[str]

The id of the organization

x_pipeline_access_token str

The access token of the pipeline

Source code in src/glassflow/models/operations/consumeevent.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclasses.dataclass
class ConsumeEventRequest:
    """Request to consume an event from a pipeline topic

    Attributes:
        pipeline_id: The id of the pipeline
        organization_id: The id of the organization
        x_pipeline_access_token: The access token of the pipeline

    """

    pipeline_id: str = dataclasses.field(
        metadata={
            "path_param": {
                "field_name": "pipeline_id",
                "style": "simple",
                "explode": False,
            }
        }
    )
    organization_id: Optional[str] = dataclasses.field(
        default=None,
        metadata={
            "query_param": {
                "field_name": "organization_id",
                "style": "form",
                "explode": True,
            }
        },
    )
    x_pipeline_access_token: str = dataclasses.field(
        default=None,
        metadata={
            "header": {
                "field_name": "X-PIPELINE-ACCESS-TOKEN",
                "style": "simple",
                "explode": False,
            }
        },
    )

ConsumeEventResponse dataclass

Response to consume an event from a pipeline topic

Attributes:

Name Type Description
content_type str

HTTP response content type for this operation

status_code int

HTTP response status code for this operation

raw_response Response

Raw HTTP response; suitable for custom response parsing

body Optional[ConsumeEventResponseBody]

the response body from the api call

Source code in src/glassflow/models/operations/consumeevent.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclasses.dataclass
class ConsumeEventResponse:
    """Response to consume an event from a pipeline topic

    Attributes:
        content_type: HTTP response content type for this operation
        status_code: HTTP response status code for this operation
        raw_response: Raw HTTP response; suitable for custom response parsing
        body: the response body from the api call

    """

    content_type: str = dataclasses.field()
    status_code: int = dataclasses.field()
    raw_response: requests_http.Response = dataclasses.field()
    body: Optional[ConsumeEventResponseBody] = dataclasses.field(default=None)

    def json(self):
        """Return the response body as a JSON object.
        This method is to have cmopatibility with the requests.Response.json() method

        Returns:
            dict: The transformed event as a JSON object
        """
        return self.body.event

json()

Return the response body as a JSON object. This method is to have cmopatibility with the requests.Response.json() method

Returns:

Name Type Description
dict

The transformed event as a JSON object

Source code in src/glassflow/models/operations/consumeevent.py
88
89
90
91
92
93
94
95
def json(self):
    """Return the response body as a JSON object.
    This method is to have cmopatibility with the requests.Response.json() method

    Returns:
        dict: The transformed event as a JSON object
    """
    return self.body.event

ConsumeEventResponseBody dataclass

Event response body after transformation

Attributes:

Name Type Description
req_id str

The request id

receive_time str

The time when the event was received

event dict

The event received

Source code in src/glassflow/models/operations/consumeevent.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@dataclass_json
@dataclasses.dataclass
class ConsumeEventResponseBody:
    """Event response body after transformation

    Attributes:
        req_id: The request id
        receive_time: The time when the event was received
        event: The event received

    """

    req_id: str = dataclasses.field()
    receive_time: str = dataclasses.field()
    event: dict = dataclasses.field(metadata=config(field_name="response"))

ConsumeFailedRequest dataclass

Request to consume failed events from a pipeline

Attributes:

Name Type Description
pipeline_id str

The id of the pipeline

organization_id Optional[str]

The id of the organization

x_pipeline_access_token str

The access token of the pipeline

Source code in src/glassflow/models/operations/consumefailed.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@dataclasses.dataclass
class ConsumeFailedRequest:
    """Request to consume failed events from a pipeline

    Attributes:
        pipeline_id: The id of the pipeline
        organization_id: The id of the organization
        x_pipeline_access_token: The access token of the pipeline

    """

    pipeline_id: str = dataclasses.field(
        metadata={
            "path_param": {
                "field_name": "pipeline_id",
                "style": "simple",
                "explode": False,
            }
        }
    )
    organization_id: Optional[str] = dataclasses.field(
        default=None,
        metadata={
            "query_param": {
                "field_name": "organization_id",
                "style": "form",
                "explode": True,
            }
        },
    )
    x_pipeline_access_token: str = dataclasses.field(
        default=None,
        metadata={
            "header": {
                "field_name": "X-PIPELINE-ACCESS-TOKEN",
                "style": "simple",
                "explode": False,
            }
        },
    )

ConsumeFailedResponse dataclass

Response to consume an failed event from a pipeline

Attributes:

Name Type Description
content_type str

HTTP response content type for this operation

status_code int

HTTP response status code for this operation

raw_response Response

Raw HTTP response; suitable for custom response parsing

body Optional[ConsumeFailedResponseBody]

the response body from the api call

Source code in src/glassflow/models/operations/consumefailed.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclasses.dataclass
class ConsumeFailedResponse:
    """Response to consume an failed event from a pipeline

    Attributes:
        content_type: HTTP response content type for this operation
        status_code: HTTP response status code for this operation
        raw_response: Raw HTTP response; suitable for custom response parsing
        body: the response body from the api call

    """

    content_type: str = dataclasses.field()
    status_code: int = dataclasses.field()
    raw_response: requests_http.Response = dataclasses.field()
    body: Optional[ConsumeFailedResponseBody] = dataclasses.field(default=None)

    def json(self):
        """Return the response body as a JSON object.
        This method is to have cmopatibility with the requests.Response.json() method

        Returns:
            dict: The transformed event as a JSON object
        """
        return self.body.event

json()

Return the response body as a JSON object. This method is to have cmopatibility with the requests.Response.json() method

Returns:

Name Type Description
dict

The transformed event as a JSON object

Source code in src/glassflow/models/operations/consumefailed.py
88
89
90
91
92
93
94
95
def json(self):
    """Return the response body as a JSON object.
    This method is to have cmopatibility with the requests.Response.json() method

    Returns:
        dict: The transformed event as a JSON object
    """
    return self.body.event

ConsumeFailedResponseBody dataclass

Event response body after transformation

Attributes:

Name Type Description
req_id str

The request id

receive_time str

The time when the event was received

event dict

The event received

Source code in src/glassflow/models/operations/consumefailed.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@dataclass_json
@dataclasses.dataclass
class ConsumeFailedResponseBody:
    """Event response body after transformation

    Attributes:
        req_id: The request id
        receive_time: The time when the event was received
        event: The event received

    """

    req_id: str = dataclasses.field()
    receive_time: str = dataclasses.field()
    event: dict = dataclasses.field(metadata=config(field_name="payload"))

PublishEventRequest dataclass

Request to publish an event to a pipeline topic

Attributes:

Name Type Description
pipeline_id str

The id of the pipeline

organization_id Optional[str]

The id of the organization

x_pipeline_access_token str

The access token of the pipeline

request_body dict

The request body / event that should be published to the pipeline

Source code in src/glassflow/models/operations/publishevent.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@dataclasses.dataclass
class PublishEventRequest:
    """Request to publish an event to a pipeline topic

    Attributes:
        pipeline_id: The id of the pipeline
        organization_id: The id of the organization
        x_pipeline_access_token: The access token of the pipeline
        request_body: The request body / event that should be published to the pipeline
    """

    pipeline_id: str = dataclasses.field(
        metadata={
            "path_param": {
                "field_name": "pipeline_id",
                "style": "simple",
                "explode": False,
            }
        }
    )
    organization_id: Optional[str] = dataclasses.field(
        default=None,
        metadata={
            "query_param": {
                "field_name": "organization_id",
                "style": "form",
                "explode": True,
            }
        },
    )
    x_pipeline_access_token: str = dataclasses.field(
        default=None,
        metadata={
            "header": {
                "field_name": "X-PIPELINE-ACCESS-TOKEN",
                "style": "simple",
                "explode": False,
            }
        },
    )
    request_body: dict = dataclasses.field(
        default=None, metadata={"request": {"media_type": "application/json"}}
    )

PublishEventResponse dataclass

Response object for publish event operation

Attributes:

Name Type Description
content_type str

HTTP response content type for this operation

status_code int

HTTP response status code for this operation

raw_response Response

Raw HTTP response; suitable for custom response parsing

object Optional[PublishEventResponseBody]

Response to the publish operation

Source code in src/glassflow/models/operations/publishevent.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@dataclasses.dataclass
class PublishEventResponse:
    """Response object for publish event operation

    Attributes:
        content_type: HTTP response content type for this operation
        status_code: HTTP response status code for this operation
        raw_response: Raw HTTP response; suitable for custom response parsing
        object: Response to the publish operation

    """

    content_type: str = dataclasses.field()
    status_code: int = dataclasses.field()
    raw_response: requests_http.Response = dataclasses.field()
    object: Optional[PublishEventResponseBody] = dataclasses.field(default=None)

PublishEventResponseBody dataclass

Message pushed to the pipeline

Source code in src/glassflow/models/operations/publishevent.py
61
62
63
@dataclasses.dataclass
class PublishEventResponseBody:
    """Message pushed to the pipeline"""

StatusAccessTokenRequest dataclass

Request check the status of an access token

Attributes:

Name Type Description
pipeline_id str

The id of the pipeline

x_pipeline_access_token str

The access token of the pipeline

Source code in src/glassflow/models/operations/status_access_token.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@dataclasses.dataclass
class StatusAccessTokenRequest:
    """Request check the status of an access token

    Attributes:
        pipeline_id: The id of the pipeline
        x_pipeline_access_token: The access token of the pipeline

    """

    pipeline_id: str = dataclasses.field(
        metadata={
            "path_param": {
                "field_name": "pipeline_id",
                "style": "simple",
                "explode": False,
            }
        }
    )
    x_pipeline_access_token: str = dataclasses.field(
        default=None,
        metadata={
            "header": {
                "field_name": "X-PIPELINE-ACCESS-TOKEN",
                "style": "simple",
                "explode": False,
            }
        },
    )