prefect-community
  • b

    Bishwarup B

    09/12/2021, 12:20 PM
    hi, i have a huge list of latitudes and longitudes (over 1MM) and I am trying to download images corresponding to each of these lat-lons and do a bunch of downstream processing on them. I am using a `map` to parallelise over the lat-lon pairs but i have a couple of questions here: 1. I know if i use a `DaskExecutor` or `LocalDaskExecutor` the flow is distributed, but is there any limit to applying `map` over such a large collection? 2. instead of using threads to run the computation, is it possible to make use of `async` as the tasks (most of them) are heavily IO bound? What are some considerations I should make here? Thanks!
    e
    k
    • 3
    • 5
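A minimal sketch of one way to combine the two ideas in the question above: split the coordinate list into batches (so the number of mapped items stays manageable) and use `asyncio` inside each batch for the IO-bound downloads. This is plain standard-library Python, not a Prefect API; `fetch_image`, `chunked`, `download_batch`, the batch size and the `max_in_flight` limit are all hypothetical names and values chosen for illustration.

```python
import asyncio
from typing import Iterator, List, Sequence, Tuple

LatLon = Tuple[float, float]


def chunked(items: Sequence[LatLon], size: int) -> Iterator[Sequence[LatLon]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def fetch_image(lat: float, lon: float) -> str:
    """Hypothetical stand-in for the real download; swap in an async HTTP call."""
    await asyncio.sleep(0)  # simulates waiting on the network
    return f"image_{lat}_{lon}"


async def fetch_batch(batch: Sequence[LatLon], max_in_flight: int = 100) -> List[str]:
    """Download one batch concurrently, capping the number of in-flight requests."""
    limit = asyncio.Semaphore(max_in_flight)

    async def bounded(lat: float, lon: float) -> str:
        async with limit:
            return await fetch_image(lat, lon)

    # gather preserves the order of the inputs in its result list
    return await asyncio.gather(*(bounded(lat, lon) for lat, lon in batch))


def download_batch(batch: Sequence[LatLon]) -> List[str]:
    """Synchronous entry point, e.g. the body of a single mapped task."""
    return asyncio.run(fetch_batch(batch))
```

With this shape, a `map` would run over the batches produced by `chunked` rather than over a million individual pairs, and each mapped call would invoke `download_batch`. Whether that fits a given executor is something to verify against your own setup.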
  • o

    Oliver Mannion

    09/12/2021, 12:28 PM
    Hiya, IIUC agents poll the server every 10 seconds. Are there any plans to decrease this latency and enable flow runs to start immediately once scheduled?
    k
    • 2
    • 8
  • n

    Nadav Nuni

    09/12/2021, 2:32 PM
    hey guys, I’m facing a kind of obscure error running a flow with GitHub storage…it seems like I’m downloading the files successfully, but then the flow fails for not finding a certain file…is there a way to produce more verbose logs so I can know exactly what file it fails on?
    z
    • 2
    • 6
  • j

    Jacob Blanco

    09/13/2021, 2:57 AM
    When looking at the Cloud dashboard we don’t see all the currently running jobs, which makes it kind of hard to monitor what’s going on. I think this relates to long-running jobs that are interspersed with shorter jobs: the start of the long job gets pushed off the side of the timeline.
    k
    • 2
    • 3
  • p

    Prateek Gupta

    09/13/2021, 5:39 AM
    Can I get some resources for ML deployment?
  • p

    Prateek Gupta

    09/13/2021, 5:39 AM
    It is batch prediction
  • p

    Prateek Gupta

    09/13/2021, 5:41 AM
    One thing my team mate claimed is that task name had issues when making multiple commits
  • p

    Prateek Gupta

    09/13/2021, 5:43 AM
    So ended up renaming the flow name and version
    k
    • 2
    • 11
  • a

    Anh Nguyen

    09/13/2021, 7:38 AM
    Hi everybody! I created a Dockerfile to build an image for an ETL flow (Prefect). It only runs once. I want it to run on a schedule, without the container ending up in the exited state.
    k
    • 2
    • 1
  • h

    haf

    09/13/2021, 12:23 PM
    Can I return a python list from a Task implementation? I'm getting `TypeError: Task is not iterable. If your task returns multiple results, pass nout to the task decorator/constructor, or provide a Tuple return-type annotation to your task.` but I can't use `nout` because I don't know the result size and it is not a good fit for `Tuple` (since it's a list of tuples)
    e
    • 2
    • 25
  • h

    haf

    09/13/2021, 1:01 PM
    It would seem dask.distributed has had a few releases https://github.com/dask/distributed/releases since March 4th when https://github.com/dask/distributed/pull/4460 was merged for Python 3.9 support? Time to support in Prefect? (https://github.com/PrefectHQ/prefect/issues/3453)
    m
    • 2
    • 1
  • s

    Sam Cook

    09/13/2021, 2:08 PM
    Does anyone know if there is a simple function call or graphql mutation to update a flow's README? I was hoping there was a field I could pass a README (as a string) through in the registration process, but it doesn't look like it.
    n
    • 2
    • 1
  • b

    Brian Phillips

    09/13/2021, 7:01 PM
    Does S3Result use the default `AWS_CREDENTIALS` secret? I'm having trouble setting credentials so that tasks are able to write with S3Result. This pattern does not seem to work either:
    aws_credentials = PrefectSecret('AWS_CREDENTIALS')
    with prefect.context(secrets={'AWS_CREDENTIALS': aws_credentials}):
        <task>
    k
    • 2
    • 3
  • i

    Ishavpreet Singh

    09/13/2021, 7:06 PM
    Hi, I am working on using DBTShellTask as part of my PoC with Prefect. My dbt shell task fails with exit code 2, but apart from this I don't see any details on the error. Any ideas on where I can find additional logs?
    k
    • 2
    • 3
  • m

    Max Ivanchenko

    09/13/2021, 7:14 PM
    Hi everybody! I'm researching the possibility of using Prefect instead of Airflow. Everything looks good, but does Prefect have any alternative to Airflow variables? (Parameters that can be configured from the UI, not before each run but once, when needed)
    k
    • 2
    • 6
  • o

    Owen McMahon

    09/13/2021, 7:47 PM
    Hi! I have a Mapped Task that I have checkpointing setup for (with the `map_index` in the filename so it properly writes out each mapped task to an individual result) within a flow running against Prefect Cloud. I just came across a weird scenario where the Flow did run the Mapped Task fully through (100 Mapped Tasks in total), but noticed afterwards that 7 of them had a status of 'Cached'. This caught my eye - as it should not have loaded any of them from the Cache. When I looked closer at the logs of one of the 'Cached' Mapped Tasks - it looks like it finished successfully, and then restarted ~7 mins later and loaded from Cache. It appears that all data is still there as I expected - but behavior seemed a bit odd. Wondering if anyone else has seen this before? Thanks!
    k
    • 2
    • 4
  • b

    Ben Muller

    09/13/2021, 8:10 PM
    Hi Team, do you guys have any examples of an implementation of a "flow builder" Class that someone has built? I have about 10 flows that are identical besides a few minor details. I do not want to use different parameters because I feel like it lacks the proper visibility in the UI and each flow should have its own name. I'm just unsure of how to implement something like this so that registration, running etc of the flow will still work. Can I simply have a class that is invoked with some arguments and then a method which when called runs the flow context manager with the flow inside of it?
    k
    m
    • 3
    • 3
  • k

    Kyle McChesney

    09/13/2021, 8:28 PM
    is there a good way to define a flow with `set_dependencies`, but also get its result and pass along to other flows? I.e. mixing and matching imperative vs functional?
    k
    • 2
    • 4
  • k

    Kyle McChesney

    09/13/2021, 8:28 PM
    I have a startup task which I want to run at the start of every flow, which just logs some info, etc
  • k

    Kyle McChesney

    09/13/2021, 8:28 PM
    I have not found a good way to force it to be at the start of the DAG
  • k

    Kyle McChesney

    09/13/2021, 8:29 PM
    I am trying to use `set_dependencies` on the actual first tasks, with my startup_task as an upstream
    data = flow.set_dependencies(generate_data(data_csv_url), upstream_tasks=[startup()])
  • k

    Kyle McChesney

    09/13/2021, 8:30 PM
    (data_csv_url is a parameter in this case)
  • k

    Kyle McChesney

    09/13/2021, 8:30 PM
    `data =` should be the result of `generate_data`
  • k

    Kyle McChesney

    09/13/2021, 8:31 PM
    seems like a flow state handler might be the better approach 😐
  • b

    Brian Phillips

    09/13/2021, 8:41 PM
    How can I clean up a DaskExecutor after a flow has finished? I'd like to call `Cluster.stop()`
    k
    • 2
    • 5
  • c

    Constantino Schillebeeckx

    09/13/2021, 9:09 PM
    Is there any way to get a locally run flow to return a non-zero exit code if it fails? Notice in the screenshot shown below, the last exit code was 0.
    k
    m
    • 3
    • 13
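One way to approach the exit-code question above, sketched under the assumption that the flow is run from a script with Prefect 1.x's `flow.run()`, which returns a final state object exposing `is_successful()`. The helper names `exit_code_for` and `run_and_exit` are hypothetical.

```python
import sys


def exit_code_for(state) -> int:
    """Map a final flow state to a process exit code: 0 on success, 1 otherwise."""
    return 0 if state.is_successful() else 1


def run_and_exit(flow) -> None:
    """Run the flow locally and terminate the process with a matching exit code."""
    state = flow.run()  # assumption: returns the final state of the flow run
    sys.exit(exit_code_for(state))
```

Calling `run_and_exit(flow)` at the bottom of the script would let a shell or CI job see the failure through `$?` instead of always getting 0.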
  • c

    Constantino Schillebeeckx

    09/13/2021, 10:14 PM
    Where can I find a list of all API URLs which I can access with `prefect.client.client.Client.get`?
    For reference, I'm trying to hit the following without success:
    [2021-01-14 16:00:00.000] ERROR    --- 400 Client Error: Bad Request for url: <https://api.prefect.io/flows?name=dbt_test>
    Traceback (most recent call last):
      File "deploy/register_flows.py", line 491, in <module>
        main()
      File "deploy/register_flows.py", line 485, in main
        create_proj_and_register_flows(flows, args)
      File "deploy/register_flows.py", line 278, in create_proj_and_register_flows
        register_flow(flow, flow_file, args)
      File "deploy/register_flows.py", line 314, in register_flow
        flow_already_registered(flow.name)
      File "deploy/register_flows.py", line 319, in flow_already_registered
        resp = CLIENT.get('flows', params={"name": flow_name})
      File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 406, in get
        response = self._request(
      File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
        response = self._send_request(
      File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 620, in _send_request
        response.raise_for_status()
      File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/requests/models.py", line 943, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <https://api.prefect.io/flows?name=dbt_test>
    k
    c
    • 3
    • 2
  • b

    Brad

    09/14/2021, 2:55 AM
    Hi team - following on from https://github.com/PrefectHQ/prefect/issues/3619, I'm seeing a looping task exit early with `Cached` despite not being complete. I wonder if this is the expected result or a bug. Example in thread
    k
    • 2
    • 7
  • g

    Gaylord Cherencey

    09/14/2021, 5:11 AM
    Hey team, dummy question: if I have 3 flows `A`, `B` and `C`, I want flows `B` and `C` to be triggered each time flow `A` runs successfully, but I want to define this dependency in flows `B` and `C` because the "parent" will be owned by another team. Is there a way to do this in Prefect, or would we have to go for an eventing solution?
    k
    • 2
    • 4
  • j

    Jacob Blanco

    09/14/2021, 7:05 AM
    Edit: Figured it out, it should be `with case(...)` not `if case`. Having a strange issue where when running a flow it warns that a Parameter was declared but not added to the Flow, and when I try to run the flow with the parameters established it errors out saying that the parameters are unexpected. This is the structure of the flow:
    with Flow(name="My Flow") as flow:
        if case(Parameter("do_something", default=False), True):
            result_of_thing = run_thing()
        else:
            result_of_thing = run_another_thing()
    from flow_definition import flow
    flow.run(parameters={"do_something": True})
    ✅ 2
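A small plain-Python sketch of why `if case(...)` and `with case(...)` behave so differently. It uses a hypothetical stand-in class, `FakeCase`, not Prefect's own `case`: an ordinary object is truthy, so `if obj:` always takes the first branch while the flow is being built, whereas `with obj:` calls `__enter__` and `__exit__`, which is presumably where a conditional context manager gets the chance to attach the condition to the tasks created inside the block.

```python
class FakeCase:
    """Hypothetical stand-in for a conditional context manager (not Prefect's class)."""

    def __init__(self, value):
        self.value = value
        self.entered = False

    def __enter__(self):
        self.entered = True  # only a `with` statement triggers this hook
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # do not swallow exceptions


cond = FakeCase(False)

# `if` only checks truthiness: a plain object is truthy regardless of its value,
# so the first branch is always chosen and the `else` branch never runs.
branch = "first" if cond else "second"

# `with` actually enters the context manager.
with FakeCase(False) as managed:
    pass
```

Here `branch` ends up as `"first"` even though the wrapped value is `False`, and only `managed` has had `__enter__` called.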