Florian Guily
03/24/2022, 10:48 AM
Vadym Dytyniak
03/24/2022, 12:19 PM
`ephemeralStorage` in ECSRun configuration. Can I use `run_task_kwargs` to modify it? If yes - what is the format of this dict to pass `ephemeralStorage`?
Joshua Greenhalgh
03/24/2022, 12:28 PM
jobs.batch is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"
Didier Marin
03/24/2022, 1:33 PM
16:46:32 Task 'x': Starting task run...
16:46:33 Task 'x': Finished task run for task with final state: 'TriggerFailed'
01:39:32 Task 'x': Starting task run...
01:39:46 Task 'x': Starting task run...
Don't know if it could be linked, but I had to retry the parent task that failed (hence the "TriggerFailed" that you can see above).
I'm running using a k8s executor, with core version 0.14.20.
Any idea what could have happened?
Raimundo Pereira De Souza Neto
03/24/2022, 2:21 PM
from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from datetime import timedelta

@flow(name="TestingFlow")
async def testing_flow(name="raimundo"):
    try:
        print(f"Hello {name}!")
    except Exception as e:
        print(e)

DeploymentSpec(
    flow=testing_flow,
    name="hw-30s",
    schedule=IntervalSchedule(interval=timedelta(seconds=20)),
    tags=["rai", "20s"],
)
when I run `prefect deployment create my_file.py`, that creates it correctly, but the tasks don't run.
Anyone help me please 💙
Nick Hart
03/24/2022, 2:24 PM
Ramzi A
03/24/2022, 3:17 PM
Steve R
03/24/2022, 3:21 PM
John Ramey
03/24/2022, 3:34 PM
`pandas.to_sql`. Apologies if this is answered in docs somewhere, but how do I pass my secrets to Prefect Cloud? I have 2 cases. In the first case I can run the script locally and pass the secrets as env vars, but in the second case, I’m hoping to run the job on Prefect Cloud once a week. Any help would be great! Thanks.
Ken Nguyen
03/24/2022, 4:29 PM
`version_group_id` instead of `flow_id` in my GQL query? I tried to simply swap out `flow_id` for `version_group_id`, but that’s giving me the following error
Anders Segerberg
03/24/2022, 4:39 PM
Mathijs Miermans
03/24/2022, 4:41 PM
`flatten` expected to work on a dict?
from prefect import Flow, task, context, flatten

@task
def A():
    return {i: list(range(i)) for i in range(3)}

@task
def B(y):
    logger = context.get("logger")
    logger.info(y)

with Flow('flat map') as f:
    a = A()  # {0: [], 1: [0], 2: [0,1]}
    b = B.map(flatten(a))

if __name__ == "__main__":
    f.run()
I'm getting an unexpected error:
ERROR - prefect.flat map | Unexpected error occured in FlowRunner: KeyError(3)
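A plain-Python sketch of one possible workaround, assuming the goal is to map `B` over every element of the nested lists: hand the flattening step a list of lists (the dict's values) instead of the dict itself. The helper name `dict_values_as_list` is hypothetical, and the flattening is emulated with `itertools.chain` rather than Prefect's `flatten`, so this only illustrates the data shape and is not a confirmed fix for the `KeyError`.

```python
from itertools import chain


def dict_values_as_list(d):
    """Return the dict's values as a list of lists, in insertion order."""
    return list(d.values())


# Same data as task A in the message above.
a = {i: list(range(i)) for i in range(3)}

nested = dict_values_as_list(a)
# Emulates removing one level of nesting, using the standard library only.
flat = list(chain.from_iterable(nested))

print(nested)
print(flat)
```

Here `nested` is `[[], [0], [0, 1]]` and `flat` is `[0, 0, 1]`. In the flow this would presumably translate to a small extra task that returns `list(a.values())`, with `flatten` applied to that task's result; that wiring is untested here.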
Diego Oliveira
03/24/2022, 6:32 PM
Kevin Mullins
03/24/2022, 7:47 PM
`LocalDaskExecutor` with 32 threads. I’ve verified in logs and the schematic that the `Discover` task should be ready to go as all its upstream tasks are complete; however, it still waits around for the unrelated `Sync` task to complete before starting.
Was hoping someone might have an idea how to figure out what’s going on. I’ve created another example flow that had a bunch of unrelated root/child tasks and verified it seems to behave as I would expect, but haven’t tried to reproduce this with a test flow that uses mapping yet. I’ll attach a screenshot that visually shows (hopefully) what I’m talking about.
Gabriel Milan
03/24/2022, 8:25 PM
`context`, but I can't seem to find it. I'm trying to use `prefect.tasks.prefect.create_flow_run` with the very same labels as the parent flow, maybe there's a shortcut for it.
Matthew Seligson
03/24/2022, 8:36 PM
John Ramey
03/24/2022, 8:54 PM
`prefect.context.get("logger")`. Does a logger have to be defined within each task? This gets a bit repetitive if I have 5-10 tasks within a flow. Is there a more concise way to define loggers across tasks?
import prefect
from prefect import task

@task
def my_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
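One way to cut the repetition, sketched under assumptions: a small proxy object defined once at module level that looks up the real logger every time it is used, so individual tasks only call `logger.info(...)`. The class name `LazyLogger` and the `lookup_logger` function are hypothetical. The lookup below uses the standard `logging` module as a stand-in; in a Prefect 1.x flow it would presumably return `prefect.context.get("logger")` instead. How this behaves when Prefect serializes the flow has not been verified.

```python
import logging


class LazyLogger:
    """Proxy that fetches the real logger each time an attribute is used."""

    def __init__(self, lookup):
        self._lookup = lookup

    def __getattr__(self, name):
        # Only reached for attributes missing on the proxy itself
        # (info, warning, ...). Private names are refused so that copying
        # or pickling the proxy cannot recurse through _lookup.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._lookup(), name)


def lookup_logger():
    # Stand-in (assumption): inside a Prefect 1.x task this would be
    # prefect.context.get("logger").
    return logging.getLogger("my_flow")


logger = LazyLogger(lookup_logger)


def my_task():
    logger.info("An info message.")
    logger.warning("A warning message.")
    return "done"


result = my_task()
```

The lookup happens at call time, not at import time, which matters because the run-time context is only populated while a task is executing.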
Lee Briggs
03/24/2022, 9:16 PM
`flow.register("pulumi_test", set_schedule_active=False)` it sets the flow to only run locally. More in 🧵
Denis Pokotylyuk(External)
03/24/2022, 9:46 PM
query GetSchedules {
  flow_group(where: {id: {_eq: "eb595947-xxxxxx"}}) {
    schedule
  }
}
Jake
03/24/2022, 9:48 PM
Andrew Huang
03/24/2022, 10:33 PM
`from prefect.engine.state import Skipped` in orion? Maybe `Cancelled`? https://orion-docs.prefect.io/concepts/states/#state-details
How do I import it?
`from prefect.states import Cancelled` doesn’t seem to work
Suresh R
03/25/2022, 1:12 AM
Chris L.
03/25/2022, 6:22 AM
`Future`. However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into? In particular, regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
R Zo
03/25/2022, 8:09 AM
PB
03/25/2022, 1:33 PM
Matthias
03/25/2022, 1:49 PM
Anna Geller
03/25/2022, 4:36 PM
Joe Goldbeck
03/25/2022, 5:16 PM
Madison Schott
03/25/2022, 5:45 PM
Myles Steinhauser
03/25/2022, 5:58 PM
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
More details in thread.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return

with Flow(FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=S3(bucket=S3_BUCKET),
    executor=LocalDaskExecutor()
) as flow:
    rval_say_hi = say_hi()
    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id="1c85cc1e-7e8c-46c9-86a2-d4beb75ea9ce")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    flow_b = create_flow_run(flow_name="B", project_name="myles")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
    flow_c = create_flow_run(flow_name="C", project_name="myles")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
    flow_d = create_flow_run(flow_name="D", project_name="myles")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(wait_for_flow_a)
    flow_c.set_upstream(wait_for_flow_a)
    flow_d.set_upstream(wait_for_flow_b)
    flow_d.set_upstream(wait_for_flow_c)
Child Flow (same across all 4 child flows):
@task
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return

with Flow(FLOW_NAME,
    # run_config=RUN_CONFIG,
    storage=S3(bucket=S3_BUCKET),
    executor=LocalDaskExecutor()
) as flow:
    say_hi()
`create_flow_run()` call, regardless if I specify a `flow_id` or a `flow_name, project_name`
The child flows all run correctly when run on their own, but the parent flow always fails to create them as children.
Kevin Kho
03/25/2022, 6:08 PM
`flow_a` start here? What happens if you do?
flow_b = create_flow_run(flow_id=None, flow_name="B", project_name="myles")
And what is your Prefect version?
Myles Steinhauser
03/25/2022, 6:10 PM
`flow_a` never appears to start.
I’ll try that now
Running on prefect-1.1.0 with Prefect Cloud connected to ECSAgent
└── 14:18:05 | ERROR | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/myles/Library/Caches/pypoetry/virtualenvs/saasworks-py-ScOzV3iT-py3.10/lib/python3.10/site-packages/prefect/tasks/prefect/flow_run.py", line 115, in create_flow_run
ValueError: Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return

start_flow_run = StartFlowRun(project_name="myles", wait=True)

with Flow(FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=S3(bucket=S3_BUCKET),
    executor=LocalDaskExecutor()
) as flow:
    rval_say_hi = say_hi()
    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
Still fails consistently in the same way.
start_flow_run = StartFlowRun(project_name="myles", wait=True)
flow_a = start_flow_run(flow_name="A")
flow_a.set_upstream(rval_say_hi)
Everything executes as desired!
Kevin Kho
03/25/2022, 6:28 PM
`StartFlowRun` and `create_flow_run`?
Myles Steinhauser
03/25/2022, 6:29 PM
`create_flow_run` was broken.
Kevin Kho
03/25/2022, 6:30 PM
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return

start_flow_run = StartFlowRun(project_name="myles", wait=True)

with Flow(FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=S3(bucket=S3_BUCKET),
    executor=LocalDaskExecutor()
) as flow:
    rval_say_hi = say_hi()
    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
`start_flow_run` is not used right?
Myles Steinhauser
03/25/2022, 6:31 PM
`create_flow_run` kept failing. It seems like that’s the simpler API, though, so I was trying to stick with it.
Kevin Kho
03/25/2022, 6:32 PM
`start_flow_run` one sec
Myles Steinhauser
03/25/2022, 6:40 PM
`StartFlowRun` executes correctly consistently:
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun, create_flow_run, wait_for_flow_run
from prefect.run_configs import ECSRun
from prefect.storage import S3
from prefect.executors import LocalDaskExecutor

FLOW_NAME = "ecs_demo_ecr"
S3_BUCKET = "sw-app-sandbox"
# launchType = "FARGATE"
launchType = "EC2"

RUN_CONFIG = ECSRun(
    env={"SOME_VAR": "VALUE"},
    labels=["sandbox"],
    task_definition=dict(
        family=FLOW_NAME,
        requiresCompatibilities=[launchType],
        networkMode="bridge",
        cpu=128,
        memory=128,
        taskRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskRole-1DTCELFAE040E",
        executionRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskExecutionRole-61S1X6JMJ5A8",
        containerDefinitions=[
            dict(
                name="flow",
                image="prefecthq/prefect:latest-python3.9",
            )
        ],
    ),
    run_task_kwargs=dict(
        cluster="prefect-ecs-cluster-v6-ECSCluster-IgNcyDRUJqLa",
        launchType=launchType,
        enableECSManagedTags=True,
        enableExecuteCommand=True,
    ),
)

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return

with Flow(FLOW_NAME,
    run_config=RUN_CONFIG,
    storage=S3(bucket=S3_BUCKET),
    executor=LocalDaskExecutor()
) as flow:
    rval_say_hi = say_hi()
    flow_a = StartFlowRun(flow_name="A", project_name="myles", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="myles", wait=True)
    flow_c = StartFlowRun(flow_name="C", project_name="myles", wait=True)
    flow_d = StartFlowRun(flow_name="D", project_name="myles", wait=True)
    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(flow_a)
    flow_c.set_upstream(flow_a)
    flow_d.set_upstream(flow_b)
    flow_d.set_upstream(flow_c)
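The `set_upstream` calls above describe a diamond-shaped dependency graph: `say_hi`, then A, then B and C, then D. As a rough illustration only, the same wiring can be written as a plain mapping and ordered with the standard library's `graphlib` (Python 3.9+). The `upstreams` dict is a hand-written restatement of those calls, not something Prefect produces, and this does not reproduce how Prefect schedules tasks.

```python
from graphlib import TopologicalSorter

# Each key waits on the tasks in its set, mirroring the set_upstream calls.
upstreams = {
    "say_hi": set(),
    "flow_a": {"say_hi"},
    "flow_b": {"flow_a"},
    "flow_c": {"flow_a"},
    "flow_d": {"flow_b", "flow_c"},
}

# static_order() yields every node after all of its upstreams.
order = list(TopologicalSorter(upstreams).static_order())
print(order)
```

`flow_b` and `flow_c` have no dependency on each other, so their relative position in `order` is not fixed; only `say_hi` first, `flow_a` second, and `flow_d` last are guaranteed.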
Kevin Kho
03/25/2022, 6:41 PM
from prefect import task, Flow
import prefect
import datetime
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow.register("databricks")
flow2.register("databricks")
Myles Steinhauser
03/25/2022, 6:42 PM
`Flow("say_hi")` in a different Python file and register it?
Kevin Kho
03/25/2022, 6:42 PM
Myles Steinhauser
03/25/2022, 6:42 PM
Kevin Kho
03/25/2022, 6:42 PM
Myles Steinhauser
03/25/2022, 6:42 PM
Kevin Kho
03/25/2022, 6:44 PM
from prefect import task, Flow
import prefect
import datetime
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow2.register("databricks")
subflow:
from prefect import task, Flow
import prefect
import datetime

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

flow.register("databricks")
Myles Steinhauser
03/25/2022, 6:46 PM
Kevin Kho
03/25/2022, 6:47 PM
Myles Steinhauser
03/25/2022, 6:51 PM
Kevin Kho
03/25/2022, 6:52 PM
`flow.run()` also? Or were you using `flow.run()` to test it? I’m trying to see which specific call failed
Myles Steinhauser
03/25/2022, 6:53 PM
`prefect register --project myles -p parent-flow.py` and `prefect run --project myles --name parent-flow --watch` cycle
Kevin Kho
03/25/2022, 6:53 PM
Myles Steinhauser
03/25/2022, 8:07 PM
`prefect register` on the local system to upload to an S3 bucket for Agent execution.