Powered by Linen
prefect-community

    Chris L.

    07/29/2021, 2:10 AM
    Hello Prefect community! Just a quick question: is there a simple convenience function to check if a flow has been successfully registered (e.g. `flow.is_registered()`)? Otherwise, is there a gist for using `prefect.client` and working with the GraphQL response? Thanks in advance
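For the `prefect.client` route, here is a minimal sketch. It assumes a Prefect 1.x backend whose GraphQL schema exposes a `flow` query with Hasura-style `where` filters and an `archived` field (check the Interactive API page in the UI for the exact fields). `Client.graphql` is the Prefect 1.x client method for raw queries; the helper names `flow_is_registered` and `check_registered` are hypothetical, not part of Prefect.

```python
from typing import Any, Mapping

# Assumed Prefect 1.x GraphQL shape: a `flow` root field filterable by name,
# plus an `archived` flag so that archived versions are not counted.
FLOW_BY_NAME_QUERY = """
query($name: String!) {
  flow(where: {name: {_eq: $name}, archived: {_eq: false}}) {
    id
    version
  }
}
"""


def flow_is_registered(response: Mapping[str, Any]) -> bool:
    """Return True if the GraphQL response lists at least one matching flow."""
    flows = response.get("data", {}).get("flow") or []
    return len(flows) > 0


def check_registered(name: str) -> bool:
    """Query the backend through the Prefect 1.x client (imported lazily)."""
    from prefect import Client  # needs Prefect 1.x and a configured backend

    result = Client().graphql(FLOW_BY_NAME_QUERY, variables={"name": name})
    return flow_is_registered(result)
```

The parsing is kept separate from the network call so it can be checked against a plain dict, e.g. `flow_is_registered({"data": {"flow": []}})` is `False`.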

    Arran

    07/29/2021, 8:20 AM
    Quick question. Is there a simple way to run a flow via an API endpoint rather than scheduled within prefect?
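One way to trigger a run from outside Prefect's scheduler is to POST a GraphQL mutation to the API. The sketch below only builds the request with the standard library. The mutation name `create_flow_run`, its input type `create_flow_run_input`, and the Bearer-token header are assumptions about the Prefect 1.x API and should be checked against your server or Cloud schema. The Python client's `Client().create_flow_run(flow_id=...)` wraps the same kind of call.

```python
import json
import urllib.request

# Assumption: Prefect 1.x exposes a `create_flow_run` mutation whose input
# type is named `create_flow_run_input`; verify both against your API schema.
CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
"""


def build_flow_run_request(api_url, api_key, flow_id, parameters=None):
    """Build (but do not send) a POST request that triggers one flow run."""
    run_input = {"flow_id": flow_id}
    if parameters:
        run_input["parameters"] = parameters
    body = json.dumps({"query": CREATE_FLOW_RUN, "variables": {"input": run_input}})
    return urllib.request.Request(
        api_url,
        data=body.encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Only needed for Prefect Cloud; a local Prefect Server has no auth.
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
```

Sending it is then `urllib.request.urlopen(build_flow_run_request(...))`; any HTTP client that can POST JSON (a webhook, a cron job, another service) could do the same.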

    Samuel Tober

    07/29/2021, 12:07 PM
    Hi! I am trying to call a task inside a for-loop, but keep getting the following error:
    TypeError: Task is not iterable.

    Samuel Tober

    07/29/2021, 12:08 PM
    What is the recommended way to go about this?

    Didier Marin

    07/29/2021, 1:28 PM
    Hi! Has anyone tried to validate the parameters of a flow using a Pydantic model or dataclasses?
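In Prefect 1.x, Parameter values are only known at run time, so validation has to happen inside a task that receives them. A minimal stdlib sketch, assuming a hypothetical `ExtractParams` schema: a dataclass whose `__post_init__` checks types and ranges. A Pydantic model would play the same role, since constructing one raises `ValidationError` on bad input.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExtractParams:
    """Hypothetical parameter schema for a flow; field names are illustrative."""

    source_path: str
    batch_size: int = 100

    def __post_init__(self):
        # Dataclasses do not enforce type annotations, so check explicitly.
        if not isinstance(self.source_path, str) or not self.source_path:
            raise ValueError("source_path must be a non-empty string")
        if not isinstance(self.batch_size, int) or self.batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")


def validate_params(raw: dict) -> ExtractParams:
    """Turn raw parameter values into a validated object, or raise."""
    return ExtractParams(**raw)
```

Calling `validate_params` at the start of the first task makes the flow run fail early with a readable message; unknown keys raise `TypeError` from the dataclass constructor.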

    Christian

    07/29/2021, 1:40 PM
    Hi all 👋 I was wondering if there is something like a Mattermost notification task? I figured that since there is a Slack notifier, something for Mattermost might not be far off. Cheers, C
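Mattermost incoming webhooks accept a JSON POST with a `text` field, so a notifier can be a few lines of standard-library code. A minimal sketch, assuming a hypothetical webhook URL; `notify_on_failure` follows the Prefect 1.x state-handler signature `(obj, old_state, new_state)` and would be attached with `Flow(..., state_handlers=[notify_on_failure])`.

```python
import json
import urllib.request


def mattermost_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a POST for a Mattermost incoming webhook."""
    return urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical placeholder; use the URL Mattermost shows when you create the webhook.
WEBHOOK_URL = "https://mattermost.example.com/hooks/REPLACE_ME"


def notify_on_failure(flow, old_state, new_state):
    """Flow state handler: post a message when the flow enters a failed state."""
    if new_state.is_failed():
        req = mattermost_request(WEBHOOK_URL, f"Flow {flow.name} failed: {new_state.message}")
        urllib.request.urlopen(req, timeout=10)
    return new_state
```

Building the request separately from sending it keeps the payload easy to test without a Mattermost server.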

    Leon Kozlowski

    07/29/2021, 2:55 PM
    Does anyone have an example of rendering a matplotlib graph with the artifacts api?

    Brad I

    07/29/2021, 3:13 PM
    We’re using the k8s agent with the Dask executor on GKE with workload identities. Does anyone know if it is still required to set the `GCP_CREDENTIALS` secret in Prefect Cloud, or will it just use the service account tied to the namespace? The latter would be nice, to avoid creating any long-lived service account keys.

    Rutvik Patel

    07/29/2021, 3:24 PM
    Hey folks, I'm trying to do the following but can't figure it out; any pointers would be appreciated. I want to build a pipeline that loads a file larger than my local machine's RAM. I can do that with Dask alone, but I'm not sure how to do it in a Prefect task. In other words, is it possible for the first task of a flow to load a chunk of a dataframe and, once that chunk is processed, load the next chunk, and so on? Thanks.
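The chunk-at-a-time idea can be sketched in plain Python: stream the file through a generator so that only one chunk is in memory at once. This is an illustration, not a Prefect feature; inside Prefect the loop could live in a single task, or an upstream task could produce chunk descriptors to `.map` over. `pandas.read_csv(..., chunksize=N)` yields DataFrame chunks in a similar way. The per-chunk work (`process_chunk`, summing a second column) is hypothetical.

```python
import csv
from itertools import islice
from typing import Iterable, Iterator, List


def iter_row_chunks(rows: Iterable[list], chunk_size: int) -> Iterator[List[list]]:
    """Yield lists of at most `chunk_size` rows without reading everything."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


def process_chunk(chunk: List[list]) -> float:
    """Hypothetical per-chunk work: sum the second column."""
    return sum(float(row[1]) for row in chunk)


def sum_second_column(lines: Iterable[str], chunk_size: int = 10_000) -> float:
    """Stream CSV lines (e.g. an open file), processing one chunk at a time."""
    reader = csv.reader(lines)
    next(reader, None)  # skip the header row
    total = 0.0
    for chunk in iter_row_chunks(reader, chunk_size):
        total += process_chunk(chunk)  # only this chunk is held in memory
    return total
```

With a real file this becomes `with open(path, newline="") as f: sum_second_column(f)`; the file object is read lazily, line by line.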

    Hilary Roberts

    07/29/2021, 3:48 PM
    Why are our flows limited to a concurrency of 2 tasks? We are using Prefect Cloud with a Kubernetes agent running 2 workers. For some reason it only runs 2 tasks concurrently. I have tried setting the executor to DaskExecutor() and LocalDaskExecutor(); the behaviour stays the same. The tasks I am running in this example just wait for 10 seconds, so resource constraints can’t really be the issue. I assume it’s limiting it to 1 concurrent task per Kubernetes worker node, but I haven’t been able to confirm that. Does anyone know any possible reasons for this?

    Pedro Machado

    07/29/2021, 4:02 PM
    Hey there. We'd like to preserve all the prefect logs as files in a file system. How can we get all prefect logs to be written to files as well? Thanks!
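Prefect 1.x logs through the standard `logging` module, and its loggers appear to be children of a logger named `prefect` (Prefect log lines carry names such as `prefect.LocalDaskExecutor`). So one sketch is to attach a `FileHandler` to that parent logger in the process that actually runs the flow (for example inside the flow-run container, not only on the agent). The format string mirrors Prefect 1.x's default log format; the function name and log path are hypothetical.

```python
import logging


def add_prefect_file_handler(path: str) -> logging.Handler:
    """Attach a FileHandler to the "prefect" logger so child loggers reach it."""
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(
        logging.Formatter("[%(asctime)s] %(levelname)s - %(name)s | %(message)s")
    )
    # Child loggers such as "prefect.FlowRunner" propagate records up to here.
    logging.getLogger("prefect").addHandler(handler)
    return handler
```

The function does not change the logger's level, so whatever level Prefect's own configuration sets still decides which records are written.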

    Dan Zhao

    07/29/2021, 4:12 PM
    Hi all, a stupid question. I was deploying a Prefect Server on a remote machine - are there any settings needed to be able to view the UI from the local machine?

    Anze Kravanja

    07/29/2021, 4:42 PM
    Hi all, I have a quick question about state handlers. I’ve created one for the use case if the flow ends up in a failed state, I want to grab all the error details from underlying tasks and send an email. What I’m doing when testing locally is:
    from collections import OrderedDict

    # `flow`, `state` and `err_params` (e.g. {'tasks': []}) come from the surrounding state handler
    for t in flow.tasks:
        tr = state.result.get(t, None)  # this task's State, if the flow state holds one
        if tr is None:
            continue
        flow.logger.info(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
        if tr.is_failed():
            err_params['tasks'].append(OrderedDict({
                'taskName': t.name,
                'errorMessage': tr.message,
                'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
            }))
    Basically I'm just going through all the tasks and checking whether `is_failed()` is true; if so, I grab some info. This all works as intended locally, but when I package my flows in a Docker image and run them with the Docker agent, `state.result` turns out to be an empty dictionary, whereas locally I found each task's result there. I’ve played with GCSResult and with leaving the default, but in both cases `state.result == {}` when running in Docker. Any ideas what I might be doing wrong?

    Kyle McChesney

    07/29/2021, 4:58 PM
    I am trying to run a flow via an ECS agent. The agent is up and running, but when I trigger a flow I get the following error:
    Parameter validation failed:
    Unknown parameter in input: "null", must be one of: capacityProviderStrategy, cluster, count, enableECSManagedTags, enableExecuteCommand, group, launchType, networkConfiguration, overrides, placementConstraints, placementStrategy, platformVersion, propagateTags, referenceId, startedBy, tags, taskDefinition
    This is coming from the boto3 call triggered here: https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/agent/ecs/agent.py#L320 My agent is being started like so (with prefecthq/prefect:0.14.13-python3.8, maybe this is old?):
    prefect agent ecs start --run-task-kwargs s3://$bucket/run_task_kwargs.yml -a https://my-api:4200/graphql
    My run task kwargs YAML has `cluster`, `launchType` and `networkConfiguration`. I also attempted to provide values in the run config via the UI, but it did not seem to change much (added CPU, memory, task and exec role). Any help is much appreciated!

    chicago-joe

    07/29/2021, 5:25 PM
    hey all, quick question, is it possible to configure task output names for tasks with nout > 1 ?

    Tim Enders

    07/29/2021, 5:45 PM
    How do I deal with the following error?
    [2021-07-29 12:41:09-0500] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...

    Philip MacMenamin

    07/29/2021, 6:02 PM
    question about memory management - I have a flow along the lines of:
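    from prefect import Flow  # gen_obj1/gen_obj2/gen_obj3 are @task functions defined elsewhere

    with Flow("big-objects") as f:  # Prefect 1.x Flow requires a name; "big-objects" is illustrative
        big_obj = gen_obj1()
        big_obj2 = gen_obj2(big_obj)   # never need big_obj again, want to reclaim memory
        big_obj3 = gen_obj3(big_obj2)  # finished with big_obj2
        # ... etc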
    what's the canonical way of freeing up these objects within the flow, once they've been consumed and are no longer needed?

    YD

    07/29/2021, 6:48 PM
    Scheduling issue... I'm running a basic task, `hello_world.py`. The Prefect server runs on a VM on AWS with Python 3.6 and the latest prefect Python package. The agent is running: agent LocalAgent, LAST QUERY 11:19am | 1 seconds ago, CORE VERSION 0.15.3. When running `hello_world.py` with python3.6, the schedule does not work; when running it with python3.8, the schedule works, but I get errors. Why does the schedule not work with Python 3.6? (It used to work; not sure what changed.) I tried `pip install prefect --upgrade` (on both the laptop and the VM), killed the old agent (on the laptop), then ran `prefect backend server` and `prefect agent local start --label "<label>"` (on the laptop). This did not help.

    Leon Kozlowski

    07/29/2021, 7:20 PM
    What is the typical Prefect version upgrade strategy for a Cloud KubernetesAgent?

    Michael Warnock

    07/29/2021, 7:37 PM
    In the functional flow context, how does one use `.set_upstream` to create a relationship between two tasks, where the downstream task doesn't care about the upstream's (in-band) results? The docs say it's possible, but I end up with two copies of the downstream task when doing what seems like the obvious thing.

    Ben Muller

    07/29/2021, 10:09 PM
    Hey guys, I know there is a Task retry logic that you can add ( on a failure ) but is there a Flow level retry?

    Hugo Shi

    07/29/2021, 10:46 PM
    Hi, I had a question about script- and module-based storage: under what circumstances does a flow need to be re-registered with script/module-based storage when the underlying code changes? Is it only when the structure of the DAG changes?

    YD

    07/29/2021, 11:24 PM
    Question about retries: if I have a sequence of independent tasks, is it possible to run them and then rerun only those that failed, without waiting for a failed task to be rerun right away? That is, only at the end of all the tasks, go back to those that failed?
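The run-everything-then-retry-failures ordering can be sketched in plain Python as below. This illustrates the pattern only; it is not Prefect's built-in task retry, which is configured per task with `max_retries` and `retry_delay`. The job names and the `max_retry_rounds` knob are hypothetical.

```python
from typing import Callable, Dict, List


def run_then_retry_failures(
    jobs: Dict[str, Callable[[], object]], max_retry_rounds: int = 1
) -> Dict[str, object]:
    """Run every independent job once, then re-run only the failures at the end.

    Returns job name -> result, or the last exception for jobs that never succeeded.
    """
    results: Dict[str, object] = {}
    pending: List[str] = list(jobs)
    for _ in range(1 + max_retry_rounds):
        failed: List[str] = []
        for name in pending:
            try:
                results[name] = jobs[name]()
            except Exception as exc:  # record the failure and keep going
                results[name] = exc
                failed.append(name)
        if not failed:
            break
        pending = failed  # the next round touches only the failures
    return results
```

A failing job never blocks the others: it is only retried after every job in the current round has had its turn.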

    Chris L.

    07/30/2021, 7:42 AM
    Hello, I am just testing a Docker recipe which contains the Docker command `RUN prefect auth login --key ${PREFECT__CLOUD__API_KEY}`, where `PREFECT__CLOUD__API_KEY` is an ARG. But I'm getting this error:
    #13 1.047 Your API key is set in the Prefect config instead of with the CLI. To log in with the CLI, remove the config key `prefect.cloud.api_key`
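The error appears to come from a naming clash: Docker makes build ARGs available as environment variables to `RUN` instructions, and Prefect 1.x reads environment variables of the form `PREFECT__SECTION__KEY` as configuration, so an ARG named `PREFECT__CLOUD__API_KEY` sets `cloud.api_key` before `prefect auth login` runs. The helper below sketches that naming convention; it is not Prefect's own code. Giving the ARG a name outside the `PREFECT__` namespace should avoid the clash.

```python
from typing import Optional


def prefect_config_key(env_var: str) -> Optional[str]:
    """Map a PREFECT__SECTION__KEY variable name to a dotted config key.

    Sketch of the Prefect 1.x convention: drop the PREFECT__ prefix, treat
    double underscores as nesting separators, and lowercase each part.
    """
    prefix = "PREFECT__"
    if not env_var.startswith(prefix):
        return None  # not a Prefect config variable
    return ".".join(part.lower() for part in env_var[len(prefix):].split("__"))
```

Under this convention `PREFECT__CLOUD__API_KEY` maps to `cloud.api_key`, which matches the key named in the error message.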

    jake lee

    07/30/2021, 9:02 AM
    Hi! I’m trying to convert an existing ETL pipeline into Prefect. I’m using Prefect Cloud as the server and one of my EC2 servers as the agent. On the EC2 agent, I turned the existing job into a flow and tasks and ran flow.register() to register it with the server. The ETL task uses a custom class that I import from a local path, and it seems that when I run the flow from the Prefect UI, it can’t find that local Python class. This is the error message:
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'common\'",)\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.',)
    Again, thank you so much!
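The error text says the failure happened while unpickling the flow. Pickle stores imported classes and functions as references (module name plus attribute name) rather than copying their code, so whatever environment unpickles the flow must be able to import `common` as well. The stdlib-only demo below reproduces the same kind of error with a hypothetical module name; making `common` importable where the flow runs (for example by installing it as a package there) is the usual remedy.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a local module like `common` that exists where the
# flow is registered but not where the flow run executes.
MODULE_NAME = "flow_helpers_demo"

helpers = types.ModuleType(MODULE_NAME)
exec(
    "class Cleaner:\n"
    "    def __init__(self, rule):\n"
    "        self.rule = rule\n",
    helpers.__dict__,
)
sys.modules[MODULE_NAME] = helpers  # importable at "registration" time

# The pickle records "flow_helpers_demo.Cleaner", not the class source code.
payload = pickle.dumps(helpers.Cleaner("strip"))

del sys.modules[MODULE_NAME]  # simulate an execution environment without it

try:
    pickle.loads(payload)
    load_error = None
except ModuleNotFoundError as exc:
    load_error = str(exc)
```

After running it, `load_error` holds `"No module named 'flow_helpers_demo'"`, the same shape as the `No module named 'common'` message above.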

    Daniel Bast

    07/30/2021, 2:03 PM
    Hi, is there a way to use the github flow storage backend not only to store and retrieve the flow to run, but also some common code shared between flows, like a file commons.py? ... if I deploy the commons.py on agent side, it can be imported, but that restricts me to one version, whereas the flows can be always the newest version from a git branch. thx

    Arran

    07/30/2021, 3:07 PM
    hey, just wondering if there is a way to complete a whole chain of mapped tasks before moving on to the next iteration?
    files = [1, 2, 3]
    data = extract_data.map(files)
    uploads = upload.map(data)  # new name, so the `upload` task isn't rebound to its own result
    I would like to transform the above code so that file 1 will be extracted and uploaded before moving on to file 2. I know I could wrap this functionality into one function, but I would prefer to keep them separate.

    Kyle McChesney

    07/30/2021, 4:07 PM
    Does anyone have a working example of using `BatchSubmit` and `AWSClientWait` in order to submit a job and wait for it?
    with Flow('flow'):
    
        batch_input = Parameter('batch_input')
    
        job_res = BatchSubmit(
            job_name='job_name',
            job_definition='job-def',
            job_queue='job-queue',
        ).run(
            batch_kwargs={
                'parameters': {
                    'batch_input': batch_input,
                },
            },
        )
    
        wait_res = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        ).run(
            waiter_kwargs={
                'jobs': [job_res]
            }
        )
    This is what I have. It feels a bit weird to call `.run` directly on the task (I am new at this and have mostly just run `@task`-annotated functions). Additionally, boto is complaining that the `batch_input` values need to be strings:
    type: <class 'prefect.core.parameter.Parameter'>, valid types: <class 'str'>

    Robert Hales

    07/30/2021, 4:17 PM
    Hi there, I'm trying to get a flow that looks like this to run. Currently `test_reduce` gets skipped, but I would like it to receive `[1, 2, 4, 5]`.

    Hugo Kitano

    07/30/2021, 5:59 PM
    Hi, can I get an explanation for what causes this versioning error? Where are the two environments mentioned here?
    Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n  TypeError('an integer is required (got type bytes)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - python: (flow built with '3.7.5', currently running with '3.8.11')")

Kevin Kho

07/30/2021, 6:02 PM
Hey @Hugo Kitano, one is the Python version your flow was built with, and the other is the Python version of the agent. These need to be the same for the most part, especially the Python versions, as that changes serialization. You will run into errors if you serialize with Python 3.8 and deserialize with 3.7, for example.
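To make the build-versus-run distinction concrete, here is a sketch that records the interpreter version next to the pickled payload (outside it, so the check can run before unpickling is attempted) and reports a mismatch in the same wording as the error above. It is not Prefect's implementation; comparing only major.minor is an assumption, and the function names are hypothetical.

```python
import pickle
import sys
from typing import List


def python_version() -> str:
    """Current interpreter version, e.g. "3.8.11"."""
    return "{}.{}.{}".format(*sys.version_info[:3])


def package_flow(obj) -> dict:
    """Pickle `obj` and store the build-time Python version beside it."""
    return {"python": python_version(), "payload": pickle.dumps(obj)}


def version_mismatches(package: dict, running: str) -> List[str]:
    """Report a mismatch when major.minor differ (an assumed threshold)."""
    built = package["python"]
    if built.split(".")[:2] != running.split(".")[:2]:
        return [f"python: (flow built with '{built}', currently running with '{running}')"]
    return []


def load_flow(package: dict):
    """Refuse to unpickle when the interpreters differ; otherwise load."""
    problems = version_mismatches(package, python_version())
    if problems:
        raise RuntimeError("version mismatch: " + "; ".join(problems))
    return pickle.loads(package["payload"])
```

Here "built with" is whatever interpreter ran `package_flow` (in Prefect terms, the one running `prefect register`), and "running with" is the interpreter that later calls `load_flow`.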

Hugo Kitano

07/30/2021, 6:04 PM
so, by “flow built in”, do you mean the Python that’s used when I call `prefect register -p path_to_flow.py`?

Kevin Kho

07/30/2021, 6:05 PM
Yes exactly

Matan Drory

07/30/2021, 6:30 PM
I have a follow-up. What is actually being registered, the .py files? If so, since the task image can be different from the agent/server image, are there any optimizations for the compiled version of the code in the task?

Kevin Kho

07/30/2021, 6:48 PM
The metadata is being registered, including the location of the flow. If you used script-based storage (S3, GitHub), it will be the location of the script. If you use pickle-based storage, the Flow is serialized and stored somewhere, then loaded and unpickled by Prefect. Not sure what you mean by the optimizations?

Matan Drory

07/30/2021, 6:50 PM
Sorry, I meant the compiled bytecode (`.pyc`)

Kevin Kho

07/30/2021, 6:52 PM
Oh, pretty sure there are no optimizations around that, if that’s what you’re asking. When you say the flow image and agent image are different, is your agent running in a container?

Matan Drory

07/30/2021, 7:45 PM
So from what I understood, when deploying on ECS there is a container for the agent and one for the flow (I was confused and thought it was per task, which explains why a `.pyc` optimization would be useless).

Kevin Kho

07/30/2021, 7:54 PM
Oh, I see what you are saying. An ECS task means the Prefect flow run, while a Prefect flow is composed of Prefect tasks, so there are two definitions going around. In this situation, I would say the Prefect version used for registration should ideally match both the agent's version and the ECS flow's version, because mismatches have caused issues where the ECS version doesn’t have features that Prefect added in later versions. I don’t think the `.pyc` is used anywhere, yep.

Matan Drory

07/30/2021, 8:00 PM
Thanks a lot! Fun to work with a tool that has such an active community!