prefect-community
  • f

    Florian Guily

    03/24/2022, 10:48 AM
    Hey, is there a way to instantly force the execution of a PrefectSecret task and set it with no upstream ?
    k
    a
    • 3
    • 2
  • v

    Vadym Dytyniak

    03/24/2022, 12:19 PM
    Hey. I would like to change
    ephemeralStorage
    in ECSRun configuration. Can I use
    run_task_kwargs
    to modify it? If yes - what is the format of this dict to pass
    ephemeralStorage
    ?
    k
    • 2
    • 3
  • j

    Joshua Greenhalgh

    03/24/2022, 12:28 PM
    Hi all I am currently trying to get k8s agent working on gcp autopilot cluster - have run into the following error;
    jobs.batch is forbidden: User \"system:serviceaccount:default:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"
    j
    a
    m
    • 4
    • 17
  • d

    Didier Marin

    03/24/2022, 1:33 PM
    Hi all! I had a very strange behaviour with a task being executed twice, with a few seconds interval:
    16:46:32 Task 'x': Starting task run...
    16:46:33 Task 'x': Finished task run for task with final state: 'TriggerFailed'
    01:39:32 Task 'x': Starting task run...
    01:39:46 Task 'x': Starting task run...
    Don't know if it could be linked, but I had to retry the parent task that failed (hence the "TriggerFailed" that you can see above). I'm running using a k8s executor, with core version 0.14.20. Any idea what could have happened?
    k
    • 2
    • 11
  • r

    Raimundo Pereira De Souza Neto

    03/24/2022, 2:21 PM
    Hello Prefecters! What's up? I tried to create a schedule flow using:
    from prefect import flow
    from prefect.deployments import DeploymentSpec
    from prefect.orion.schemas.schedules import IntervalSchedule
    from datetime import timedelta
    
    
    @flow(name="TestingFlow")
    async def testing_flow(name="raimundo"):
        try:
            print(f"Hello {name}!")
        except Exception as e:
            print(e)
    
    
    DeploymentSpec(
        flow=testing_flow,
        name="hw-30s",
        schedule=IntervalSchedule(interval=timedelta(seconds=20)),
        tags=["rai", "20s"],
    )
    when I run
    prefect deployment create my_file.py
    , that creates it correctly, but the tasks don't run. Anyone help me please 💙
    n
    a
    • 3
    • 8
  • n

    Nick Hart

    03/24/2022, 2:24 PM
    Hi Prefect team! I have a couple questions about Orion capabilities within our production environment. Currently we are running Prefect core with prefect server on a local production server and agents on separate vm servers. This makes it really easy to run automated capabilities, process flows quickly, etc. We really like how flows can be defined and encapsulated in docker with dependencies and shared within our architecture. We don’t like how Flow of Flows is statically defined within Prefect Core, but I believe this will be solved with prefect Orion.
    • Am I going to be able to set Orion up in a similar architecture? Can I have Orion set up on a local server that employees can access via web ui and run capabilities? Will it be able to store flow data and logs?
    • I was wondering what docker storage is going to look like within a production infrastructure. Is there going to be support for docker storage in gitlab like with prefect core?
    • I really like Orion’s developer simplicity and how easy it is to create flow of flows. Is there going to be a way for flow of flows to be dynamically created in the UI? As in, like some sort of functionality to drag and drop flows in a particular order or check box which flows you want to run.
    Thanks for all your help!
    k
    a
    • 3
    • 8
  • r

    Ramzi A

    03/24/2022, 3:17 PM
    Something I am trying to understand about Prefect Cloud 2.0: is compute part of the service? I'm wondering whether Prefect will handle infra and scaling.
    k
    • 2
    • 1
  • s

    Steve R

    03/24/2022, 3:21 PM
    Hi prefecters, I have a question about prefect core (no Prefect Server, no Prefect Cloud). I'm trying to understand Flow concurrency. Is the best way to handle a stream of input data to run multiple flows (in their own thread) as the data comes in?
    k
    • 2
    • 21
  • j

    John Ramey

    03/24/2022, 3:34 PM
    Hey y’all. I’m new to Prefect and I think it’s rad. The local version of Prefect is easy enough, but I’m a little confused by secrets in Prefect Cloud. I’m trying to write to a Postgres RDS instance on AWS. The script I’m using writes to an existing table with
    pandas.to_sql
    . Apologies if this is answered in docs somewhere, but how do I pass my secrets to Prefect Cloud? I have 2 cases. In the first case I can run the script locally and pass the secrets as env vars, but in the second case, I’m hoping to run the job on Prefect Cloud once a week. Any help would be great! Thanks.
    k
    • 2
    • 4
  • k

    Ken Nguyen

    03/24/2022, 4:29 PM
    Is there a way I can use
    version_group_id
    instead of
    flow_id
    in my GQL query? I tried to simply swap out
    flow_id
    for
    version_group_id
    , but that’s giving me the following error
    :discourse: 1
    k
    • 2
    • 9
  • a

    Anders Segerberg

    03/24/2022, 4:39 PM
    Hi, For a `map`ed task, on the LocalExecutor -- is the map guaranteed to execute serially / in-order across the mapped object?
    k
    • 2
    • 1
  • m

    Mathijs Miermans

    03/24/2022, 4:41 PM
    Is
    flatten
    expected to work on a
    dict
    ?
    from prefect import Flow, task, context, flatten
    
    @task
    def A():
        return {i: list(range(i)) for i in range(3)}
    
    @task
    def B(y):
        logger = context.get("logger")
        logger.info(y)
    
    
    with Flow('flat map') as f:
        a = A()  # {0: [], 1: [0], 2: [0,1]}
        b = B.map(flatten(a))
    
    if __name__ == "__main__":
        f.run()
    I'm getting an unexpected error:
    ERROR - prefect.flat map | Unexpected error occured in FlowRunner: KeyError(3)
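A sketch in plain Python, assuming the goal is to map over every element of the lists stored as the dict's values. It does not use Prefect; it only shows the list-of-lists shape that flattening normally expects, so a task could return `list(d.values())` instead of the dict itself. Whether that resolves the `KeyError` above is an assumption, not something confirmed in this channel.

```python
from itertools import chain


def flatten_values(d):
    """Flatten the list values of a dict into one list, in key insertion order."""
    return list(chain.from_iterable(d.values()))


nested = {i: list(range(i)) for i in range(3)}  # {0: [], 1: [0], 2: [0, 1]}
flat = flatten_values(nested)
print(flat)
```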
    s
    k
    • 3
    • 10
  • d

    Diego Oliveira

    03/24/2022, 6:32 PM
    Hello everyone. I would like to know if it is possible to dynamically change the run name based on a schedule parameter?
    k
    a
    • 3
    • 6
  • k

    Kevin Mullins

    03/24/2022, 7:47 PM
    Alright, not sure what kind of monster I’ve created. I’ve created a flow that has two distinct paths that are unrelated and don’t share any dependencies other than a few secrets. For some reason one of the mapped tasks won’t start until an unrelated task finishes. I’m using
    LocalDaskExecutor
    with 32 threads. I’ve verified in logs and the schematic that the
    Discover
    task should be ready to go as all its upstream tasks are complete; however, it still waits around for the unrelated
    Sync
    task to complete before starting. Was hoping someone might have an idea how to figure out what’s going on. I’ve created another example flow that had a bunch of unrelated root/child tasks and verified it seems to behave as I would expect, but haven’t tried to reproduce this with a test flow that uses mapping yet. I’ll attach a screenshot that visually shows (hopefully) what I’m talking about.
    k
    • 2
    • 17
  • g

    Gabriel Milan

    03/24/2022, 8:25 PM
    Hello there! Is there a way of retrieving the current flow labels during execution? My guess would be that this is available on the
    context
    , but I can't seem to find it. I'm trying to use the
    prefect.tasks.prefect.create_flow_run
    using the very same labels as the parent flow, maybe there's a shortcut for it.
    k
    • 2
    • 6
  • m

    Matthew Seligson

    03/24/2022, 8:36 PM
    Do state handlers capture state changes induced by setting state via the UI?
    k
    a
    • 3
    • 19
  • j

    John Ramey

    03/24/2022, 8:54 PM
    Question about logging best practices. In the docs (see below), a logger is defined within a task with
    prefect.context.get("logger")
    . Does a logger have to be defined within each task? This gets a bit repetitive if I have 5-10 tasks within a flow. Is there a more concise way to define loggers across tasks?
    import prefect
    
    @task
    def my_task():
        logger = prefect.context.get("logger")
    
        logger.info("An info message.")
        logger.warning("A warning message.")
    k
    • 2
    • 6
  • l

    Lee Briggs

    03/24/2022, 9:16 PM
    Hi folks! I think I'm having a mental disconnect, coming from temporal.io. I've created an ECS cluster with a long-running agent that's happily registered. I'm now trying to register a task and a flow. However, when I try to register it using
    flow.register("pulumi_test", set_schedule_active=False)
    it sets the flow to only run locally. More in 🧵
    k
    • 2
    • 8
  • d

    Denis Pokotylyuk(External)

    03/24/2022, 9:46 PM
    Hi there! I have a question about schedules. How can I fetch schedules for a flow group and specific parameters. Example: I have very many schedules. If I fetch with this query it will be a lot of data for the client. I need to fetch with a filter based on “targetProjectId” from “parameter_defaults”, see the image. It is possible? Thanks :)
    query GetSchedules {
      flow_group(where: {id: {_eq: "eb595947-xxxxxx"}}) {
        schedule 
      }
    }
    k
    • 2
    • 39
  • j

    Jake

    03/24/2022, 9:48 PM
    Is there any way to remove a prefect agent and stop prefect cloud from using it to run more flows?
    k
    • 2
    • 2
  • a

    Andrew Huang

    03/24/2022, 10:33 PM
    Is there an equivalent to
    from prefect.engine.state import Skipped
    in orion? Maybe
    Cancelled
    ? https://orion-docs.prefect.io/concepts/states/#state-details How do I import it?
    from prefect.states import Cancelled
    doesn’t seem to work
    k
    a
    • 3
    • 11
  • s

    Suresh R

    03/25/2022, 1:12 AM
    Hi! can you share graphql example to get the task result of a flow run
    k
    • 2
    • 11
  • c

    Chris L.

    03/25/2022, 6:22 AM
    Hello Prefect team, a question about Orion. I'm starting to build out an async ML pipeline. I see the "Getting Started" page's async example doesn't pass results between tasks. I'm looking into the documentation regarding PrefectFuture and the API seems largely similar to `asyncio`'s
    Future
    . However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into? In particular, regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
    a
    • 2
    • 4
  • r

    R Zo

    03/25/2022, 8:09 AM
    Hi Prefect team, I have currently implemented a task that loads a file from disk and performs some action serially, as mapping did not work due to memory constraints. Is there a way of mapping this task and putting some memory constraints on the task or the mapping? For example, telling Prefect to map only if there is sufficient memory?
    a
    • 2
    • 4
  • p

    PB

    03/25/2022, 1:33 PM
    Hi. I have a general question about architecture. I know there's repo with TF scripts for Agent on AWS, but are there any scripts for Prefect Server that could be setup on AWS so that we could use both Agent and Server on AWS without contacting Prefect Cloud?
    k
    • 2
    • 2
  • m

    Matthias

    03/25/2022, 1:49 PM
    Orion question: is there an equivalent of `Parameter`’s from Prefect 1.0?
    k
    • 2
    • 7
  • a

    Anna Geller

    03/25/2022, 4:36 PM
    There is a poll on Twitter with a question: "what orchestrator would you use next if you could switch today?" I would definitely choose Prefect! :prefect: Thanks to everyone who participates in the poll! :thank-you:
    🚀 4
    ✅ 2
    :marvin: 3
  • j

    Joe Goldbeck

    03/25/2022, 5:16 PM
    👋 Hi all! Is there a good place to ask about logging configuration options using Prefect Cloud? 😄
    k
    • 2
    • 7
  • m

    Madison Schott

    03/25/2022, 5:45 PM
    Hi all, how do I upgrade to the latest version of dbt within my Prefect flow? I am running it on a Docker container
    k
    a
    • 3
    • 11
  • m

    Myles Steinhauser

    03/25/2022, 5:58 PM
    I’m trying to follow the Scheduling a Flow-of-Flows example in the docs, but I keep running into the following error:
    Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
    More details in thread.
    k
    • 2
    • 33
m

Myles Steinhauser

03/25/2022, 5:58 PM
I’m trying to follow the Scheduling a Flow-of-Flows example in the docs, but I keep running into the following error:
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
More details in thread.
Parent Flow:
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()


    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id="1c85cc1e-7e8c-46c9-86a2-d4beb75ea9ce")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)

    flow_b = create_flow_run(flow_name="B", project_name="myles")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)

    flow_c = create_flow_run(flow_name="C", project_name="myles")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)

    flow_d = create_flow_run(flow_name="D", project_name="myles")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(wait_for_flow_a)
    flow_c.set_upstream(wait_for_flow_a)
    flow_d.set_upstream(wait_for_flow_b)
    flow_d.set_upstream(wait_for_flow_c)
Child Flow (same across all 4 child flows):
@task
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
        #   run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    say_hi()
Based on the logs available to me, it appears that the parent flow fails (always) at the
create_flow_run()
call, regardless if I specify a
flow_id
or a
flow_name, project_name
The child flows all run correctly when run on their own, but the parent flow always fails to create them as children.
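The dependency structure that the parent flow declares with `set_upstream` (say_hi, then A, then B and C, then D) can be sketched outside Prefect with the standard library. This only illustrates the intended ordering and is not how Prefect schedules tasks; the `wait_for_*` tasks are folded into their flows for brevity, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes that must finish before it starts,
# mirroring the set_upstream calls in the parent flow above.
upstream = {
    "say_hi": set(),
    "A": {"say_hi"},
    "B": {"A"},
    "C": {"A"},
    "D": {"B", "C"},
}

# static_order() yields one valid execution order; B and C may appear in either order.
order = list(TopologicalSorter(upstream).static_order())
print(order)
```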
k

Kevin Kho

03/25/2022, 6:08 PM
This is weird. Does
flow_a
start here? What happens if you do?
flow_b = create_flow_run(flow_id=None, flow_name="B", project_name="myles")
And what is your Prefect version?
m

Myles Steinhauser

03/25/2022, 6:10 PM
Nope,
flow_a
never appears to start. I’ll try that now. Running on prefect-1.1.0 with Prefect Cloud connected to ECSAgent.
Nothing changes, all of the same failures:
└── 14:18:05 | ERROR   | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/myles/Library/Caches/pypoetry/virtualenvs/saasworks-py-ScOzV3iT-py3.10/lib/python3.10/site-packages/prefect/tasks/prefect/flow_run.py", line 115, in create_flow_run
ValueError: Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.
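The error text suggests a mutually exclusive argument check. A minimal sketch of that kind of validation follows; `resolve_flow_identifier` is a hypothetical helper written for illustration, not Prefect's actual implementation. Under a check like this, passing only `flow_name` (or `flow_id=None` together with `flow_name`) should not raise, which is why the failure above is surprising.

```python
def resolve_flow_identifier(flow_id=None, flow_name=None):
    """Hypothetical helper: accept exactly one flow identifier."""
    if flow_id and flow_name:
        raise ValueError(
            "Received both `flow_id` and `flow_name`. "
            "Only one flow identifier can be passed."
        )
    if not flow_id and not flow_name:
        raise ValueError("Either `flow_id` or `flow_name` is required.")
    # Return a (kind, value) pair describing which identifier was supplied.
    return ("id", flow_id) if flow_id else ("name", flow_name)
```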
Stepping back to an even simpler test case
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
Still fails consistently in the same way.
More interesting info. I’m essentially following this guide: https://discourse.prefect.io/t/how-can-i-create-a-subflow-and-block-until-it-s-completed/94 And if I use:
start_flow_run = StartFlowRun(project_name="myles", wait=True)

flow_a = start_flow_run(flow_name="A")
flow_a.set_upstream(rval_say_hi)
Everything executes as desired!
k

Kevin Kho

03/25/2022, 6:28 PM
A bit confused why your snippet has both
StartFlowRun
and
create_flow_run
?
m

Myles Steinhauser

03/25/2022, 6:29 PM
Because
create_flow_run
was broken.
k

Kevin Kho

03/25/2022, 6:30 PM
No but over here:
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
start_flow_run
is not used right?
m

Myles Steinhauser

03/25/2022, 6:31 PM
Ah, originally it was not used. I started going in that direction since
create_flow_run
kept failing. It seems like that’s the simpler API, though, so I was trying to stick with it.
k

Kevin Kho

03/25/2022, 6:32 PM
Ah ok let me try this with that code without the
start_flow_run
one sec
👍 1
m

Myles Steinhauser

03/25/2022, 6:40 PM
Confirmed that
StartFlowRun
executes correctly consistently:
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun, create_flow_run, wait_for_flow_run
from prefect.run_configs import ECSRun
from prefect.storage import S3
from prefect.executors import LocalDaskExecutor


FLOW_NAME = "ecs_demo_ecr"
S3_BUCKET="sw-app-sandbox"

# launchType="FARGATE"
launchType="EC2"

RUN_CONFIG = ECSRun(
    env={"SOME_VAR": "VALUE"},
    labels=["sandbox"],
    task_definition=dict(
        family=FLOW_NAME,
        requiresCompatibilities=[launchType],
        networkMode="bridge",
        cpu=128,
        memory=128,
        taskRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskRole-1DTCELFAE040E",
        executionRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskExecutionRole-61S1X6JMJ5A8",
        containerDefinitions=[
            dict(
                name="flow",
                image="prefecthq/prefect:latest-python3.9",
            )
        ],
    ),
    run_task_kwargs=dict(
        cluster="prefect-ecs-cluster-v6-ECSCluster-IgNcyDRUJqLa",
        launchType=launchType,
        enableECSManagedTags=True,
        enableExecuteCommand=True,
    ),
)

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    flow_a = StartFlowRun(flow_name="A", project_name="myles", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="myles", wait=True)
    flow_c = StartFlowRun(flow_name="C", project_name="myles", wait=True)
    flow_d = StartFlowRun(flow_name="D", project_name="myles", wait=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(flow_a)
    flow_c.set_upstream(flow_a)
    flow_d.set_upstream(flow_b)
    flow_d.set_upstream(flow_c)
k

Kevin Kho

03/25/2022, 6:41 PM
This is working for me:
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow.register("databricks")
flow2.register("databricks")
👀 1
m

Myles Steinhauser

03/25/2022, 6:42 PM
What happens if you define
Flow("say_hi")
in a different Python file and register it?
k

Kevin Kho

03/25/2022, 6:42 PM
I register the subflow and main flow in one go on local storage to be clear
m

Myles Steinhauser

03/25/2022, 6:42 PM
right
k

Kevin Kho

03/25/2022, 6:42 PM
Let me split it one sec
m

Myles Steinhauser

03/25/2022, 6:42 PM
I’m trying to compose multiple disparate Flows together (for ease of management, and overall ETL composition)
k

Kevin Kho

03/25/2022, 6:44 PM
So it still works
🤯 1
parent_flow
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow2.register("databricks")
subflow:
from prefect import task, Flow
import prefect
import datetime 

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

flow.register("databricks")
m

Myles Steinhauser

03/25/2022, 6:46 PM
This is wild to me! But, so helpful to have a confirmed working example. Going to take almost the same code and modify it for my ECS runs.
k

Kevin Kho

03/25/2022, 6:47 PM
I really can’t tell why yours is failing 🤨… Could you give me the full script for the failing one? You can remove sensitive stuff like AWS details and DM it to me.
m

Myles Steinhauser

03/25/2022, 6:51 PM
Yep, testing a very lightly modified version of your script now.
k

Kevin Kho

03/25/2022, 6:52 PM
Do you get more logs doing
flow.run()
also? Or were you using
flow.run()
to test it? I’m trying to see which specific call failed
m

Myles Steinhauser

03/25/2022, 6:53 PM
I’m using the
prefect register --project myles -p parent-flow.py
and
prefect run --project myles --name parent-flow --watch
cycle
this failed!
same error
I’ll DM you the code files
k

Kevin Kho

03/25/2022, 6:53 PM
ok
Talked to Myles. Summary: this seems to fail when he uses the ECS RunConfig. There might be a versioning issue, with registration using Python 3.10 and the ECS container using Python 3.9. I'm unsure at the moment and finding it hard to pin down. I need to test on ECS.
:upvote: 1
m

Myles Steinhauser

03/25/2022, 8:07 PM
Further investigation revealed that the issue occurred when the local environment was running Python 3.10.2 while the ECS agent was running Python 3.9.11. Once the local dev environment was downgraded to Python 3.9.11, all calls functioned as expected! This was most critical because I rely on
prefect register
on the local system to upload to an S3 bucket for Agent execution.
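Given that finding, a pre-registration guard can catch the mismatch early. The sketch below compares the local interpreter's major.minor version with the version expected in the execution image. `EXPECTED_RUNTIME`, `versions_compatible`, and `check_before_register` are hypothetical names, and treating a major.minor match as sufficient is an assumption rather than a documented Prefect rule.

```python
import sys

# Hypothetical: the Python version inside the ECS container image.
EXPECTED_RUNTIME = (3, 9)


def versions_compatible(local, expected):
    """Return True when the major and minor versions match."""
    return tuple(local[:2]) == tuple(expected[:2])


def check_before_register(local=None, expected=EXPECTED_RUNTIME):
    """Raise RuntimeError if the registering interpreter differs from the runtime."""
    local = sys.version_info if local is None else local
    if not versions_compatible(local, expected):
        raise RuntimeError(
            f"Local Python {local[0]}.{local[1]} differs from runtime "
            f"{expected[0]}.{expected[1]}; register from a matching environment."
        )
```

Calling `check_before_register()` at the top of the flow file would make `prefect register` fail fast on a mismatched interpreter instead of failing later inside the ECS task.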