prefect-community

    Saranya Elumalai

    09/01/2020, 2:00 PM
    Good morning everyone, I wanted to run tasks in parallel using a local Dask cluster (https://docs.prefect.io/core/idioms/parallel.html), but the tasks are executed sequentially. Why is that? Here is my code snippet:
    start = Parameter("start")
    start.set_downstream(s3_connection, flow=flow)
    s3_connection.set_downstream(lin_of_bus, flow=flow)
    s3_connection.set_downstream(company,  flow=flow)
    ....
    end_task = final_task()
    end_task.set_upstream(lin_of_bus,flow=flow)
    end_task.set_upstream(company,  flow=flow)
    flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
    The schematic diagram below shows the tasks executing sequentially rather than in parallel.
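    For reference, a minimal sketch (hypothetical task bodies, functional API rather than set_downstream) of the shape that fans out under a local Dask cluster in 0.13.x: two tasks that share an upstream dependency but have no edge between each other can be submitted to Dask concurrently.

    from prefect import Flow, task, Parameter
    from prefect.engine.executors import DaskExecutor  # 0.13.x import path

    @task
    def s3_connection(start):
        return "connection"

    @task
    def lin_of_bus(conn):
        return "line of business"

    @task
    def company(conn):
        return "company"

    with Flow("parallel-example") as flow:
        start = Parameter("start")
        conn = s3_connection(start)
        lob = lin_of_bus(conn)   # no edge between lob and comp,
        comp = company(conn)     # so Dask may run them at the same time

    flow.run(parameters={"start": "hello"}, executor=DaskExecutor())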

    Philip Bennett

    09/01/2020, 2:04 PM
    Hey all. I'm having a problem where I set environment variables using --env, but I can't seem to access them from within the flow running on a Kubernetes agent. I'm starting my agent with:
    prefect agent install kubernetes --api "<http://10.154.15.199:4200>" --rbac --resource-manager --env DB_HOST=10.104.145.11 --env CRUNCHBASE_API_KEY=XYZ | kubectl apply -f -
    I try to read them with `os.getenv()` but cannot access them. Has anyone had similar issues?

    as

    09/01/2020, 2:37 PM
    Hey, I'm using the caching functionality by specifying a `target` file as a task parameter, so if the target file is present the execution of the task is skipped. I want to be able to programmatically (e.g. through a parameter) force execution of this task (and thus ignore/overwrite the target file). I first thought the `cache_validator` task parameter would be the way to go, but I see in the documentation that it is deprecated. Is there a recommended way to achieve this behavior? Thanks

    Saranya Elumalai

    09/01/2020, 7:54 PM
    I have retries configured for my task as follows:
    @task(max_retries=3, retry_delay=timedelta(seconds=5))
    def write_delta_lookup_table():
    ......
    My image build failed with:
    ValueError: Task <Task: write_delta_lookup_table> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
    Why do task retries require result types? I am not passing values from one task to another. Also, the retry example at https://docs.prefect.io/core/concepts/tasks.html#overview doesn't show the use of a result type. What am I missing here? (Any documentation with a simple retry use case would be very helpful.)
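    One way to satisfy that check, sketched below under the assumption that writing results to local disk is acceptable, is to attach a Result (e.g. LocalResult) to the upstream tasks or to the whole flow so that retried tasks can re-read their inputs; the directory path here is a placeholder.

    from datetime import timedelta
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task
    def upstream_task():
        return "some value"

    @task(max_retries=3, retry_delay=timedelta(seconds=5))
    def write_delta_lookup_table():
        ...

    # a flow-level result applies to every task that doesn't set its own;
    # the directory is a placeholder, and any Result subclass (S3Result, GCSResult, ...) works
    with Flow("retry-example", result=LocalResult(dir="/tmp/prefect-results")) as flow:
        write_delta_lookup_table(upstream_tasks=[upstream_task()])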

    asm

    09/01/2020, 10:58 PM
    Howdy, is there a way to grab the Dask worker that a particular task ran on from the GraphQL API?

    Eric

    09/02/2020, 12:42 AM
    Hi team, I have another question. I have a flow with 3 tasks (A -> B -> C) and task B sleeps for 60 seconds, for example. When the flow run reaches B, I set the flow-run state to Paused via `client.set_flow_run_state(flow_run_id="<flow_run_id>", state=PAUSE)`. The flow-run state in the UI is set to Paused, and task B continues to execute until the end. Once task B is finished, I use `client.set_flow_run_state` to set the flow run back to Running (or Resume). Task C then gets this error (I'll post it in the thread), and the whole flow run stays in Running (task C is pending). It seems like an abnormal state because the flow run keeps running but does nothing. May I ask if this is normal? Thanks 🙂
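    For reference, a small sketch of how the pause/resume calls might look with the state classes Prefect exposes (the flow_run_id is a placeholder, and depending on the Prefect version a version argument may also be required):

    from prefect import Client
    from prefect.engine.state import Paused, Resume

    client = Client()
    # pause the run
    client.set_flow_run_state(flow_run_id="<flow_run_id>", state=Paused("paused by operator"))
    # ...later, resume it
    client.set_flow_run_state(flow_run_id="<flow_run_id>", state=Resume("resumed by operator"))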

    josh

    09/02/2020, 12:23 PM
    Hey team, Prefect version 0.13.5 has been released! Here are a few notable changes:
    🚸 Introduced better monitoring of mapped children
    ♻️ K8s agent manages Prefect job lifecycles
    👩‍⚕️ Improved automatic health checks for common user hiccups
    A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled

    Luis Muniz

    09/02/2020, 1:40 PM
    Hi Prefect Community! I think I saw in one of my deep dives an example of a ResourceManager that manages an EMR cluster for submitting Spark jobs. I have been searching for a good two hours now and can only ever find examples of managing Dask clusters via a resource manager. Does anyone have a pointer? Very much appreciated. 🙂
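    For reference, a rough sketch of what an EMR-backed resource manager could look like, using the resource_manager decorator together with boto3; the cluster settings passed to run_job_flow are illustrative placeholders, not a tested configuration.

    import boto3
    from prefect import Flow, task, resource_manager

    @resource_manager
    class EMRCluster:
        def __init__(self, name="prefect-emr"):
            self.name = name
            self.client = boto3.client("emr")

        def setup(self):
            # placeholder cluster config; returns the cluster id to tasks in the block
            response = self.client.run_job_flow(
                Name=self.name,
                ReleaseLabel="emr-6.0.0",
                Instances={
                    "InstanceCount": 3,
                    "MasterInstanceType": "m5.xlarge",
                    "SlaveInstanceType": "m5.xlarge",
                    "KeepJobFlowAliveWhenNoSteps": True,
                },
                JobFlowRole="EMR_EC2_DefaultRole",
                ServiceRole="EMR_DefaultRole",
            )
            return response["JobFlowId"]

        def cleanup(self, cluster_id):
            self.client.terminate_job_flows(JobFlowIds=[cluster_id])

    @task
    def submit_spark_job(cluster_id):
        ...  # e.g. add steps against the running cluster

    with Flow("emr-example") as flow:
        with EMRCluster() as cluster_id:
            submit_spark_job(cluster_id)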

    Matias Godoy

    09/02/2020, 1:50 PM
    Hi guys! I'm having some trouble with a flow that was working well until yesterday: I added an import that I had forgotten and registered the flow again. I'm using Prefect Cloud and I have a couple of agents on an EC2 instance. Now when I try to run the flow I get a failed run with the message:
    Last State Message
    [2 Sep 2020 3:24pm]: Failed to load and execute Flow's environment: FileNotFoundError(2, 'No such file or directory')

    Max Ernst

    09/02/2020, 2:00 PM
    Hello, I'm new to Prefect and started using it today. It's a totally awesome piece of software, but I have a problem. I have a script that imports custom Python modules from a few folders. When started as a "new" flow it runs fine, but when I rerun it from the web UI it shows import errors for the custom imports. How do I get those folders into the environment? Thanks up front for any help.

    Shaun Cutts

    09/02/2020, 4:07 PM
    Thanks to help from the prefect team :hattip:, I have a Prefect server running in Kubernetes with the experimental Helm chart. I also have Dask Gateway running. To connect up the bits, I need to get the Prefect agent running in the cluster and submitting jobs to Dask Gateway. I can get the Kubernetes agent running locally against the (port-forwarded) server in K8s. But:
    • What config do I need to point it at the cluster FQDN of the Prefect server when running in a container?
    • It would seem that the Kubernetes agent might not be what I want anyway, as I want to submit flows to the Dask Gateway, not as K8s jobs. Or perhaps I can configure the agent container environment as specified here: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#next-steps ?
    • I'm a bit unsure about the relation between the agent and the server. Is there any doc explaining it? Is the server purely passive, and the agent the active component? (Then what is `towel` doing?)
    Some explanation of how the pieces interact (and/or how I should best go about getting them to interact in my scenario) would be great. Thanks!

    DJ Erraballi

    09/02/2020, 4:28 PM
    Hi guys! Excited to join the Prefect community. I have been playing around with the Fargate executor; is there a common design pattern for caching Docker images near the Fargate executor? Perhaps some kind of EFS storage tied to the executor so that the docker pull doesn't add 1-2 minutes of latency to each flow run?

    DJ Erraballi

    09/02/2020, 4:50 PM
    ah nvm

    DJ Erraballi

    09/02/2020, 4:51 PM
    looks like ECS fargate doesn’t support image caching yet

    Hawkar Mahmod

    09/02/2020, 5:09 PM
    I'm reaching the point of frustration with what should be a simple flow. I would like to be able to loop through an API until I get a result that has no data in it. However, Prefect's mix of build time and run time has left me very confused about what I can and can't do.
    with Flow('event-export', schedule) as flow:
    
        api_key = EnvVarSecret('SOME_API_KEY')
    
        offset = get_last_offset()
        event_data = extract_email_event_data(api_key, offset)
    
        transformed_data = transform(event_data)
        send_to_firehose(transformed_data)
        update_last_offset(offset=offset+100)
    I would like to be able to call the extract task for as long as necessary, incrementing the offset on each occasion, and I would like to transform and send each extracted set of events in one swoop. However, I can't seem to figure out how to put this into a while loop with the result of the extract task as my end condition, i.e. if no events are left then stop the flow. I've already tried `LOOP`, but it is no use to me because I don't want to wait until I get all the results back, which could take hours.
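    One pattern that might fit, sketched here with hypothetical helpers fetch_page and process_batch standing in for the real extract/transform/send logic, is to keep the whole fetch-transform-send cycle inside a single looping task: each page is processed as soon as it arrives, and LOOP only carries the next offset between iterations.

    import prefect
    from prefect import task
    from prefect.engine.signals import LOOP

    @task
    def export_events(api_key, start_offset=0):
        loop_payload = prefect.context.get("task_loop_result", {})
        offset = loop_payload.get("offset", start_offset)

        events = fetch_page(api_key, offset)   # hypothetical: fetch one API page
        if not events:
            return offset                      # nothing left, end the task

        process_batch(events)                  # hypothetical: transform + send this page now
        raise LOOP(message=f"processed offset {offset}", result={"offset": offset + 100})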

    David Elliott

    09/02/2020, 7:41 PM
    Hey all! It’s been a month or so since I last worked with my prefect project and you’ve come a long way since I was last using it, v0.13 looks great! I’ve come across a docker build issue with the later versions though - unsure if it’s something I’m doing or if I’ve found a bug? I’m getting the following error on the healthcheck, right at the end of the docker build:
    Step 9/9 : RUN python /opt/prefect/healthcheck.py '["/opt/prefect/flows/error-test.prefect"]' '(3, 7)'
     ---> Running in 13eb44f7f5db
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 121, in <module>
        flows = cloudpickle_deserialization_check(flow_file_path)
      File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
        flows.append(cloudpickle.load(f))
      File "/usr/local/lib/python3.7/site-packages/prefect/core/edge.py", line 149, in __hash__
        self.flattened,
    AttributeError: 'Edge' object has no attribute 'flattened'
    I’ve simplified my original setup right down to the attached script to try and trace the issue. I’m getting the error from version 0.12.6 onwards (might it be related to this change added in v0.12.6?); v0.12.5 builds OK. Hoping someone could let me know if it’s something wrong in my code or if they’re able to reproduce. Locally I’m running Python 3.7 & Prefect 0.13.5 (though this shouldn’t matter as it’s building the Docker container with the specified base image). Thanks!
    docker_build_error_example.py

    Jeff Brainerd

    09/03/2020, 1:59 AM
    Hi :prefect: team, we are seeing the following API client error in our flow state handler code:
    400 Client Error: Bad Request for url: <https://api.prefect.io/graphql>
    This error began last night (~9PM eastern) in code that has been working fine for weeks. We are using Prefect Cloud and Prefect Core 0.13.2 (with a 0.12.x Fargate agent IIRC). Wondering if this is related to the recent upgrade... didn’t see any breaking changes in the release notes that look suspicious. Thanks 🙏

    Chris Goddard

    09/03/2020, 2:41 AM
    Hey folks - is there a way to mark certain flows so that if they are late they don't get resubmitted by Lazarus? Say, for instance, there are two other flows already running when a third is scheduled. If it's a flow that runs frequently, you might just want it to be skipped and run at the next scheduled time rather than being rescheduled - if that makes sense?

    psimakis

    09/03/2020, 7:46 AM
    Hello everyone, after upgrading from 0.12.2 to 0.13.5 a connectivity issue came up with the GraphQL component. The Prefect server is running on a different host, but the GraphQL URL remains <http://localhost:4200/graphql>. `server.ui.graphql_url` was working great with version 0.12.2, but now I can't find any way to configure the GraphQL URL properly. Below you will find the `config.toml`:
    $ cat ~/.prefect/config.toml
    [logging]
    level = "INFO"
    
    [api]
    url = "<http://192.168.40.180:4200>"
    
    [server.database]
    host_port = "6543"
    
    [context.secrets]
    SLACK_WEBHOOK_URL = '<https://hooks.slack.com/services/xx/XX/Xx>'
    
    [server.ui]
    graphql_url = "<http://192.168.40.180:4200/graphql>"
    In the image you can see a POC of the case. I'm a little bit confused about the old and the new way to configure the Prefect server. Do you have any idea about this issue? If this issue has already been reported, feel free to delete this message. Thanks in advance.

    Jacob Blanco

    09/03/2020, 7:55 AM
    Couldn't figure out the name for this, but is there any way to assign a label to mapped tasks to identify each element in the map? Task[1], Task[2] is kinda hard to manage.

    ARun

    09/03/2020, 12:34 PM
    Hey guys, I'm just starting to use Prefect Cloud. I have the Prefect agent up and running, registered with the cloud, and I can see the agent alive in the Prefect Cloud GUI as well. However, when I try to register my first flow, I get the error
    ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
    Is there another token I need to configure?

    Thomas Hoeck

    09/03/2020, 1:14 PM
    Hi all. How do you handle logging if you want to be able to run a task separately, e.g. for a unit test:
    from prefect import task
    import prefect

    @task
    def extract():
        logger = prefect.context.get("logger")
        logger.debug("Executing")
        return [1, 2, 3]

    extract.run()
    as this yields
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-25-9ca06496a242> in <module>
         11 
         12 
    ---> 13 extract.run()
    
    <ipython-input-25-9ca06496a242> in extract()
          7 def extract():
          8     logger = prefect.context.get("logger")
    ----> 9     logger.debug("Executing")
         10     return [1,2,3]
         11 
    
    AttributeError: 'NoneType' object has no attribute 'debug'
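    Outside of a flow run the Prefect context has no logger (as the traceback above shows, context.get("logger") returns None). One workaround for unit tests, a sketch only, is to supply a stand-in logger in the context while calling .run() directly:

    import logging
    import prefect
    from prefect import task

    @task
    def extract():
        logger = prefect.context.get("logger")
        logger.debug("Executing")
        return [1, 2, 3]

    # provide a logger for the duration of the direct .run() call
    with prefect.context(logger=logging.getLogger("prefect.test")):
        extract.run()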

    Lukas N.

    09/03/2020, 2:22 PM
    Hi everyone 👋, I've recently discovered Results, which seem like a great fit for one of my tasks. Basically, I've got a task `copy_data_to_s3` which takes data (after an expensive computation) as input, copies it to S3, and outputs the S3 path (not the data!) where it was stored. Other downstream tasks work with that S3 path. Right now, if some of the downstream tasks fail, I need to re-run the entire flow, whereas with S3Result I could just rerun the part after the S3 data copy. The tricky part is that I want to output the path on S3 and not the actual data, which is what S3Result stores by default. It seems like it should be an easy modification, but I can't come up with a solution. Anyone able to help?

    Jeff Brainerd

    09/03/2020, 6:41 PM
    Hello Prefect team, a question about retrying tasks conditionally. In our task state handler we look at the failure message every time a task fails, and we want to retry on some of those errors. Per this doc entry: https://docs.prefect.io/core/concepts/execution.html#state-signals we perform a `raise signals.RETRY` in those cases. The problem is that this does not appear to work in the task handler. This code gets called: https://github.com/PrefectHQ/prefect/blob/5b7015a75ea60bc5a8ffb676b3be2ff406103583/src/prefect/engine/cloud/task_runner.py#L96-L105 and the task ultimately is failed, not retried. So should our code be something more like this:
    return Retrying(msg=msg, context=new_state.context)
    or should this in fact work? Thanks!
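    For reference, one shape this could take is to have the handler return a Retrying state instead of raising RETRY. This is a sketch only: the error-message check and 30-second delay are illustrative, and the exact Retrying kwargs may vary by Prefect version.

    import pendulum
    from prefect.engine.state import Failed, Retrying

    def retry_on_transient_errors(task, old_state, new_state):
        # illustrative condition: only retry failures whose message mentions "transient"
        if isinstance(new_state, Failed) and "transient" in (new_state.message or ""):
            return Retrying(
                message="Retrying after transient error",
                start_time=pendulum.now("utc").add(seconds=30),
                context=new_state.context,
            )
        return new_state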

    Gabriel Montañola

    09/03/2020, 7:08 PM
    Hi folks, how are you doing? I hope you're doing well :) I'm experimenting with both Prefect and Airflow. My POC is pretty simple:
    1. Query a production database (MySQL) and store the results;
    2. Create a `.csv` with the query results;
    3. GZIP the `.csv` and upload it to S3;
    4. Dump the uploaded file's content into a Redshift table (using `COPY` with the correct settings).
    So I have a few questions:
    • I noticed that there is a :prefect: S3 task for uploading, but I guess I can't pass gzipped content to it, right? So I built a custom task that handles the gzipping + uploading for me. Am I using the S3Upload task wrong, or is a custom task what I should be doing for my purposes?
    • Is there a way to generate dynamic tasks/flows? Example: with Airflow I made both a DAG and a task factory using a `.yml` file as reference. My tasks are pretty repetitive and I only need to change some fields (source table name, destination schema/table, schedule time and incremental key). How could I achieve this with Prefect? Could you point me in some direction or to a documentation link? (A sketch of one approach follows below.)
    Thank you very much and congratulations on the awesome work.
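    On the second question, a sketch of one possible approach: Prefect flows are ordinary Python objects, so they can be built in a loop from a YAML file. The config keys (source_table, destination, cron), the tables.yml file and the task bodies below are made up for illustration.

    import yaml
    from prefect import Flow, task
    from prefect.schedules import CronSchedule

    @task
    def extract(source_table):
        ...  # query MySQL

    @task
    def load(rows, destination):
        ...  # COPY into Redshift

    def build_flow(cfg):
        with Flow(f"sync-{cfg['source_table']}", schedule=CronSchedule(cfg["cron"])) as flow:
            rows = extract(cfg["source_table"])
            load(rows, cfg["destination"])
        return flow

    with open("tables.yml") as f:
        flows = [build_flow(cfg) for cfg in yaml.safe_load(f)]

    for flow in flows:
        flow.register(project_name="etl")  # or flow.run() for local testing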

    Marwan Sarieddine

    09/03/2020, 7:42 PM
    Hi folks - is there an easy way to make a flow pickleable with pickle rather than cloudpickle? The reason I am asking is that I'd like to use a Prefect flow in TensorFlow training, and TensorFlow relies on Python's multiprocessing library, which uses vanilla pickle.

    Matt Wong-Kemp

    09/03/2020, 11:02 PM
    Before I start down a blind alley, how does serialization/Results interact with Resources? Do my resources need to be pickle-able?

    Jackson Maxfield Brown

    09/04/2020, 12:02 AM
    Weird question: is there a way to attach a result object to a task during flow construction? The idea being I have a flow that has a parameter of `local` or `remote`, and I want tasks to store results in `LocalResult` or `S3Result` accordingly:
    python run_my_flow.py # tasks are created with LocalResult (default)
    python run_my_flow.py --remote  # tasks are created with S3Result
    Weird pseudo code:
    with Flow("my_flow") as flow:
        result_target = Parameter("result_target", default="local")
    
        if result_target == "local":
            result_target_for_task_a = LocalResult("a_results/")
            result_target_for_task_b = LocalResult("b_results/")
        elif result_target == "remote":
            result_target_for_task_a = S3Result("<s3://hello-world/a_results/>")
            result_target_for_task_b = S3Result("<s3://hello-world/b_results/>")
    
        items = task_a(result_handler=result_target_for_task_a)
        task_b(result_handler=result_target_for_task_b)
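    One sketch of a workaround, assuming a plain environment variable is acceptable instead of a Parameter: results are build-time configuration while Parameters are only resolved at run time, so the choice can be made with ordinary Python before the flow is constructed. The bucket, directories and RESULT_TARGET variable are placeholders.

    import os
    from prefect import Flow, task
    from prefect.engine.results import LocalResult, S3Result

    REMOTE = os.environ.get("RESULT_TARGET") == "remote"  # placeholder flag

    result_a = (S3Result(bucket="hello-world", location="a_results/{task_run_id}")
                if REMOTE else LocalResult(dir="a_results/"))
    result_b = (S3Result(bucket="hello-world", location="b_results/{task_run_id}")
                if REMOTE else LocalResult(dir="b_results/"))

    @task(result=result_a)
    def task_a():
        return [1, 2, 3]

    @task(result=result_b)
    def task_b(items):
        ...

    with Flow("my_flow") as flow:
        task_b(task_a())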

    Brennan Chapman

    09/04/2020, 12:35 AM
    hey all, been trying out Prefect this week, and it's very impressive! Working on setting up Prefect with Dask currently, and wondering how I would expose a parameter in Prefect that I could use to set the task priority for Dask. Any ideas or guidance? Thanks so much!

    Sven Teresniak

    09/04/2020, 8:17 AM
    v0.13.5: The UI is not working because the UI (or my browser?) cannot connect to http://localhost:4200/graphql, which is correct because Apollo is running in a container in k8s. I have a service with an open port 4200 for Apollo, so I set `PREFECT_SERVER__GRAPHQL_URL` to the FQDN of my service with port 4200 and everything. With that env var the UI worked perfectly with v0.12.5, but the new Prefect completely ignores it.
    When I see the home screen in the UI I can put in my GraphQL URL and the UI (my browser) will connect to Apollo perfectly, so the UI is working. But how can I set this URL permanently (using an env var, for example)?

nicholas

09/04/2020, 1:44 PM
Hi @Sven Teresniak - this was a breaking change with the new version of Server/UI. We have a ticket open to allow this to be set permanently, which also details why it was changed. One option you have to change it permanently is to run the UI in development mode and change the `.env` file to match your setup.

Sven Teresniak

09/04/2020, 1:46 PM
Thanks!