John O'Farrell
04/03/2023, 4:28 PM
arn:logs:{region}:{id}:log-group:prefect:log-stream:alpha6-matar/prefect/{log id}
during ECS flow runs. I would understand adding this permission during setup, but I'm confused as to why it was working fine last week and is suddenly experiencing hiccups (including a brief moment of uptime) today.
Tomas Knoetze
04/03/2023, 5:59 PM
State message: Submission failed. RuntimeError: Timed out after 120.73846864700317s while watching task for status {until_status or 'STOPPED'}
Is this a known issue? Any ideas how to prevent this, or at least retry the flow run in the case of an intermittent error like this?
John Horn
04/03/2023, 6:05 PM
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.prefect.cloud/api/accounts/foo_account/workspaces/foo_workspace/flow_runs/foo_flow_run_id/set_state'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500
Sam Garvis
04/03/2023, 6:29 PM
justabill
Andrew Pruchinski
04/03/2023, 6:50 PM
run_deployment to kick off one of our deployed flows after an event is triggered within a Lambda. I am currently getting a 401 error. I've already added the PREFECT__CLOUD__API_KEY. Anything else I need to do? Any documentation would help too. Thank you
Ross Leung
04/03/2023, 8:54 PM
@task(cache_key_fn=task_input_hash) so the Flow() will not execute the code inside tasks that have been completed before. I did not change the inputs to the task signature, but I did add a simple time.sleep(10) into my task code. I went away for 1-2 minutes and re-ran the task expecting a Cached(Completed()) status, but my task code ran again.
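One possible explanation for the rerun, assuming the cache key covers the task's compiled code as well as its inputs (this is my understanding of Prefect 2's task_input_hash, but it is worth checking against the installed version): editing the task body changes the key even when the arguments are identical. The sketch below uses only the standard library; sketch_cache_key, task_v1 and task_v2 are hypothetical names for illustration, not Prefect API.

```python
import hashlib
import json
import time


def sketch_cache_key(fn, arguments):
    """Hash a function's compiled bytecode together with its arguments.

    Illustrative stand-in for a cache-key function; not Prefect's implementation.
    """
    payload = json.dumps(
        {"code": fn.__code__.co_code.hex(), "args": arguments},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def task_v1(x):
    return x + 1


def task_v2(x):
    # Same inputs and same result, but the body now contains an extra call.
    time.sleep(0)
    return x + 1


key_v1 = sketch_cache_key(task_v1, {"x": 1})
key_v2 = sketch_cache_key(task_v2, {"x": 1})

# Unchanged code and unchanged inputs reproduce the same key.
same_code_same_key = sketch_cache_key(task_v1, {"x": 1}) == key_v1
# Changing only the function body produces a different key.
code_change_changes_key = key_v1 != key_v2
```

Under that assumption, a custom cache_key_fn that hashes only the arguments would keep the cache valid across code edits, at the cost of serving stale results after a real logic change.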
Documentation says that unless specified, the cache never expires. So I wonder if changing the task code itself (without changing the task inputs) changes the hash?
TS
04/03/2023, 9:17 PM
Bennett Rand
04/03/2023, 10:10 PM
boto3 in Prefect flows?
I have a task that iterates over an input, and if I map it or call .submit() -> .result() in a for loop for each item, certain boto calls take upwards of 3 minutes to resolve on ECS (not locally).
Preeti Mandole
04/04/2023, 5:45 AM
Andreas Nigg
04/04/2023, 7:11 AM
Deceivious
04/04/2023, 8:05 AM
requests.exceptions.ReadTimeout
Prefect attempts to cache this exception, but exceptions aren't JSON serializable, so Object of type \'ReadTimeout\' is not JSON serializable\n') is raised.
But upon trying to retrigger the flow, this exception is raised: prefect.exceptions.MissingResult: The result was not persisted and is no longer available.
I guess there is a cache record in the database, but the cache key is not available in the storage location because it failed while dumping to the storage.
eli yosef
04/04/2023, 12:42 PM
Michael Gringauz
04/04/2023, 12:59 PM
Zachary Loertscher
04/04/2023, 2:12 PM
Flow could not be retrieved from deployment with a stack trace that leads to a NotImplementedError. I have tried several variations on the --path argument, tried a public repo instead of a private one - nothing works! Please help!
flow-code:
import sys
import prefect
from prefect import flow, task, get_run_logger
from utilities import AN_IMPORTED_MESSAGE

@task
def log_task(name):
    logger = get_run_logger()
    logger.info("Hello %s!", name)
    logger.info("Prefect Version = %s 🚀", prefect.__version__)
    logger.debug(AN_IMPORTED_MESSAGE)

@flow()
def log_flow(name: str):
    log_task(name)

if __name__ == "__main__":
    name = sys.argv[0]
    log_flow(name)
deployment CLI command:
prefect deployment build ./src/log_flow.py:log_flow -n log-simple-test -q test -sb github-repository/test-public-repo --apply
log_flow-deployment.yaml:
###
### A complete description of a Prefect Deployment for flow 'log-flow'
###
name: log-simple-test
description: null
version: 0309032a075c5be33c8c074988fd6e41
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides: {}
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: log-flow
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: ce44b1b5-b0a4-4034-a724-b4660da17555
  _block_document_name: anonymous-fa985916-1aed-4f2b-89f2-5f5e2c278ae0
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage:
  repository_url: https://github.com/zachloertscher/test-public-repo
  reference: main
  credentials: null
  _block_document_id: a2fbc663-3d11-4551-9350-49eec1a3de54
  _block_document_name: test-public-github-repo
  _is_anonymous: false
  block_type_slug: github-repository
  _block_type_slug: github-repository
path: /Users/zachloertscher/test-public-github-repo
entrypoint: src\log_flow.py:log_flow
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    name:
      title: name
      position: 0
      type: string
  required:
  - name
  definitions: null
timestamp: '2023-04-04T13:57:58.625897+00:00'
Stack trace:
09:58:36.212 | INFO | prefect.agent - Submitting flow run '0b7f9e6d-bbcd-4b21-8c41-12d735d31816'
09:58:36.874 | INFO | prefect.infrastructure.process - Opening process 'dark-curassow'...
09:58:36.978 | INFO | prefect.agent - Completed submission of flow run '0b7f9e6d-bbcd-4b21-8c41-12d735d31816'
C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
09:58:41.105 | INFO | Flow run 'dark-curassow' - Downloading flow code from storage at '/Users/zachloertscher/test-public-github-repo'
09:58:41.113 | ERROR | Flow run 'dark-curassow' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\engine.py", line 277, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\client\utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\deployments.py", line 194, in load_flow_from_flow_run
await storage_block.get_directory(from_path=deployment.path, local_path=".")
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect_github\repository.py", line 146, in get_directory
process = await run_process(cmd, stream_output=(out_stream, err_stream))
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\utilities\processutils.py", line 258, in run_process
async with open_process(
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\contextlib.py", line 199, in __aenter__
return await anext(self.gen)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\utilities\processutils.py", line 200, in open_process
process = await _open_anyio_process(command, **kwargs)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\utilities\processutils.py", line 162, in _open_anyio_process
process = await asyncio.create_subprocess_shell(
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\asyncio\subprocess.py", line 205, in create_subprocess_shell
transport, protocol = await loop.subprocess_shell(
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 1634, in subprocess_shell
transport = await self._make_subprocess_transport(
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
09:58:41.292 | ERROR | prefect.engine - Engine execution of flow run '0b7f9e6d-bbcd-4b21-8c41-12d735d31816' exited with unexpected exception
Traceback (most recent call last):
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\engine.py", line 2131, in <module>
enter_flow_run_engine_from_subprocess(flow_run_id)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\engine.py", line 202, in enter_flow_run_engine_from_subprocess
).result()
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\client\schemas.py", line 107, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\states.py", line 76, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\utilities\asyncutils.py", line 260, in coroutine_wrapper
return call()
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 245, in __call__
return self.result()
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\_internal\concurrency\calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 439, in result
return self.__get_result()
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 391, in __get_result
raise self._exception
File "C:\Users\Zachary.Loertscher\repos\data_warehouse_pipeline\.venv\lib\site-packages\prefect\_internal\concurrency\call transport = await self._make_subprocess_transport(
File "C:\Users\Zachary.Loertscher\AppData\Local\Programs\Python\Python310\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
09:58:41.948 | ERROR | prefect.infrastructure.process - Process 'dark-curassow' exited with status code: 1
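The NotImplementedError at the bottom of this trace comes from asyncio's base event loop, which raises it when the running loop has no subprocess support. On Windows that is the case for the selector event loop, while the Proactor event loop does implement subprocesses. A minimal sketch of the difference, assuming that is what is happening here (the trace alone does not show which loop type was running); run_shell and make_loop are hypothetical helper names:

```python
import asyncio
import sys


async def run_shell(cmd: str) -> int:
    """Run a shell command on the current event loop and return its exit code."""
    proc = await asyncio.create_subprocess_shell(cmd)
    return await proc.wait()


def make_loop() -> asyncio.AbstractEventLoop:
    """Create an event loop that supports subprocesses on this platform."""
    if sys.platform == "win32":
        # The Windows selector loop raises NotImplementedError for subprocesses;
        # the Proactor loop supports them.
        return asyncio.ProactorEventLoop()
    return asyncio.new_event_loop()


loop = make_loop()
try:
    # "exit 0" is understood by both cmd.exe and POSIX shells.
    exit_code = loop.run_until_complete(run_shell("exit 0"))
finally:
    loop.close()
```

If the flow run process ends up on a selector loop on Windows, any storage block that shells out (as the git clone in get_directory does in the trace) would fail the same way regardless of the --path value.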
My repo: https://github.com/zachloertscher/test-public-repo
I've been at this for hours - any help would be so very much appreciated!
Jason Motley
04/04/2023, 2:25 PM
set PREFECT__CLOUD__USE_LOCAL_SECRETS=false in order to access databases while running things in Anaconda terminal. How can I replicate this when running/testing code in a VS Code interactive window?
Andreas Nord
04/04/2023, 3:08 PM
Zachary Loertscher
04/04/2023, 5:05 PM
Paco Ibañez
04/04/2023, 5:53 PM
scott
04/04/2023, 6:52 PM
{{ deployment.name }} is blank when the flow is a subflow - ideally in the automation whisker template I could check if a flow was really a subflow, and then get the subflow name and the parent deployment name. Any tips? I've been fiddling with get_run_context and prefect.runtime but I don't see anything that can be used to 1) check if a flow is really a subflow, and 2) if a flow is a subflow, get the parent deployment name.
Clearly there is a way to do it since the Prefect UI has this logic as I toggle through pages, but I don't see a clear way to get that same information with the Prefect Python library.
Mansour Zayer
04/04/2023, 8:44 PM
Cancelled state in certain conditions (expected behavior). When any of them enters Cancelled state and the rest complete, the main flow also enters Cancelled state. How can I make the main flow succeed? This is my code:
@flow()
def main_flow():
    subflows = (subflow1, subflow2, subflow3)
    for subflow in subflows:
        try:
            subflow()
        except CancelledRun:
            continue
When the first flow cancels, I get something like this: Finished in state Cancelled('1/3 states cancelled.') and the main flow state becomes Cancelled too.
Thanks
Miller Jiang
04/05/2023, 12:11 AM
Deployment.build_from_flow from prefect.deployments to deploy a flow to Prefect Cloud, the process is stuck without erroring out and yet doesn't give any verbose status/progress. I checked that the code was uploaded to the S3 bucket, but the deployment never appeared in Prefect Cloud. Do you know what might be the root cause? And is it possible to add more verbose logging for the Deployment.build_from_flow method?
Matt Fysh
04/05/2023, 12:45 AM
max(timestamp) + 1 in each returned response.
Is Prefect a good tool to use? I have been using Databricks exclusively, so this will be the first tool added to my "data stack", and it looks like I will need to move my scheduled job definitions out of Databricks and into Prefect, with a step that invokes the old Databricks task.
Matt Fysh
04/05/2023, 4:50 AM
Vadym Dytyniak
04/05/2023, 8:15 AM
Samuel Hinton
04/05/2023, 8:34 AM
Satsuki Nagae
04/05/2023, 8:35 AM
status_code of the ECSTaskResult?
from prefect_aws.ecs import ECSTask
ecs_task_block = ECSTask.load("my-ecs-task-block")
ecs_task_result = ecs_task_block.run()
print(ecs_task_result.exit_code) # 1 (application error)
Mark NS
04/05/2023, 8:42 AM
Andrew Pruchinski
04/05/2023, 1:13 PM
David Wang
04/05/2023, 1:16 PM