prefect-community
  • r

    Ryan Sattler

    08/27/2021, 1:40 AM
    Is there any plan to create a task-library task for running Spark on kubernetes (as opposed to Databricks)?
  • w

    Wilson Bilkovich

    08/27/2021, 2:23 AM
    Anyone ever seen
    tornado.util.TimeoutError: Operation timed out after None seconds
    before? I’m getting that in my Dask KubeCluster.
  • j

    Jacob Hayes

    08/27/2021, 4:23 AM
    Is there a way to configure Slack alerts from the flow definition (e.g. like flow.schedule)? I've set them up in the UI a few times, but I occasionally rename flows or change projects and lose the failure alerts. 🙈
  • k

    Konstantin

    08/27/2021, 6:55 AM
    Hi guys. On my computer, the .py file's flow.run executes without errors. But if I register my_flow.py on the server, the following error appears in the logs:
    26 August 2021,10:31:49 	agent01	INFO	Submitted for execution: PID: 135
    26 August 2021,10:31:49 	agent03	INFO	Submitted for execution: PID: 140
    26 August 2021,10:31:49 	agent02	INFO	Submitted for execution: PID: 136
    26 August 2021,10:31:49 	execute flow-run	ERROR	Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
    26 August 2021,10:31:49 	execute flow-run	ERROR	Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
    26 August 2021,10:31:49 	execute flow-run	ERROR	Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/user1/'")
    Where should I look for the cause of this error?
  • b

    Bruno Murino

    08/27/2021, 10:07 AM
    Hi everyone, I'm trying to use prefect.Client to run a GraphQL query, but I'm not sure how to pass filters to it. I'm using a workaround where I pass the whole query as a string, but the curly braces “{” everywhere are making passing variables hard, so I'm wondering if there's a better way.
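    One way to sidestep the brace escaping described above is to keep the query text static and send the filter values separately as GraphQL variables. The sketch below only builds the request payload with the standard library. The query shape, the flow_run field, and the build_payload helper are illustrative assumptions, not taken from the thread. Prefect 1.x's Client.graphql is understood to accept a variables argument, but that should be checked against the installed version.

```python
import json

# Static query text: braces never need escaping because no string
# formatting is applied. The field names here are hypothetical.
QUERY = """
query FlowRuns($state: String!, $limit: Int!) {
  flow_run(where: {state: {_eq: $state}}, limit: $limit) {
    id
    name
  }
}
"""


def build_payload(state: str, limit: int) -> str:
    """Serialize the query and its variables as a JSON request body."""
    return json.dumps(
        {"query": QUERY, "variables": {"state": state, "limit": limit}}
    )


payload = json.loads(build_payload("Failed", 5))
print(payload["variables"])
```

    Keeping the values in a separate variables mapping means the query string is never modified, so filter values can change freely without touching the braces.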
  • i

    Italo Barros

    08/27/2021, 12:02 PM
    Hi everyone, sometimes I receive the following errors from the CloudTaskRunner (I'm running an "always-on" Local Agent on Windows Server 2019):
    ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
    
    urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)", ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))
    If the Task or Flow tries to retry the execution, the error pops up again and only goes away if the Task/Flow is cancelled and executed again. Is there any way to bypass that?
  • w

    Wilson Bilkovich

    08/27/2021, 1:19 PM
    I don’t have any errors in my apollo pod logs, but I see this in the Agent logs:
    urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='prefect-server-initial-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
    (prefect is my namespace, prefect-server-initial-apollo is the name of the service)
  • k

    Konstantin

    08/27/2021, 2:37 PM
    How can I work out what it is missing?
  • k

    Konstantin

    08/27/2021, 2:39 PM
  • b

    Bastian Röhrig

    08/27/2021, 2:46 PM
    Hey everyone, is there a way to use a FilterTask and rely on the output of two different tasks in the filter_func? I built a workaround where I combine the data I want to filter with the additional information, but that feels really clunky. I added some pseudo code to illustrate my workaround:
    with Flow("filtering") as flow:
    	a_s = get_a_s() # ["a1", "a2", "a3"]
    	b = get_b() # "b2"
    	a_s_and_b = combine_a_s_and_b.map(a_s = a_s, b = unmapped(b)) # [("a1", "b2"), ("a2", "b2"), ("a3", "b2")]
    	FilterTask(filter_func=a_and_b_end_on_the_same_character)(a_s_and_b) # [("a2", "b2")]
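    One possible alternative to pairing every element with b is a single function that receives both upstream results and does the filtering itself. The sketch below is plain Python so that it runs without Prefect installed. The name filter_by_last_char is hypothetical, and wrapping the function as a Prefect task is an assumption noted in the comment.

```python
from typing import List


# In a Prefect flow this function could be wrapped as a task (assumption);
# it is kept as plain Python here so it runs on its own.
def filter_by_last_char(a_s: List[str], b: str) -> List[str]:
    """Keep the elements of a_s that end on the same character as b."""
    return [a for a in a_s if a and b and a[-1] == b[-1]]


result = filter_by_last_char(["a1", "a2", "a3"], "b2")
print(result)
```

    Because both inputs arrive as ordinary arguments, there is no need to build the intermediate list of (a, b) tuples first.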
  • e

    Evan Curtin

    08/27/2021, 2:58 PM
    Hey folks, just trying out Prefect for the first time after using some other workflow systems before, loving the UX! Quick question: I’m using the ExecuteNotebook task and it’s spilling the cell content to stdout during normal execution. Is there an option to suppress that? I’m using a target file to cache the output and that’s working fine.
  • j

    Jean Da Rolt

    08/27/2021, 5:03 PM
    Folks, is it possible to specify the number of threads for a specific task when running LocalDaskExecutor?
  • d

    Danny Vilela

    08/27/2021, 10:39 PM
    Hi all! Is there a documented reason as to why checkpointing is disabled by default on local Prefect flows? That is, why is the behavior opt-in when other frameworks typically have it as a requirement (or opt-out)? https://docs.prefect.io/core/concepts/results.html#pipeline-persisted-results
  • b

    Blake List

    08/27/2021, 11:53 PM
    Hi there, I have a flow of flows (a, b, and parent flow p). Flows a and b both query different databases, build a dataframe, do some processing, and then store them elsewhere. I want to make a third flow, c, that merges these dataframes. Is it possible to input the dataframes into flow c without having to load them from the new db or read them from a csv? In other words, can I pass them into flow c, e.g. using the parent flow?
  • f

    Fabian Brück

    08/28/2021, 11:15 PM
    Hi! I just tried to run Prefect agentless on AWS ECS with S3 storage. For some reason boto3 cannot find any AWS credentials and it fails downloading flows from S3. When I run it locally with credentials as env variables, it reports that the token is expired (which isn't true). Is that caused by being executed in a subprocess? Thanks 🙂
    prefect run --name hello-world --execute       
    Looking up flow metadata... Done
    Creating run for flow 'hello-world'... Done
    └── Name: malachite-lobster
    └── UUID: 381ad17b-e2d8-4e8b-84aa-ab880d2f0458
    └── Labels: ['agentless-run-149b4f62']
    └── Parameters: {}
    └── Context: {}
    └── URL: https://cloud.prefect.io/data-and-bi/flow-run/381ad17b-e2d8-4e8b-84aa-ab880d2f0458
    Executing flow run...
    └── 01:11:26 | INFO    | Creating subprocess to execute flow run...
    └── 01:11:32 | INFO    | Downloading flow from s3://bucket/hello_world.py
    └── 01:11:32 | ERROR   | Error downloading Flow from S3: An error occurred (ExpiredToken) when calling the GetObject operation: The provided token has expired.
    An error occurred (ExpiredToken) when calling the GetObject operation: The provided token has expired.
  • w

    Wilson Bilkovich

    08/29/2021, 4:43 PM
    Anybody seen this error before? I’m getting it in the prefect job pods that get spun up:
    RuntimeError: Unable to find any timezone configuration
  • z

    Zac Chien

    08/29/2021, 6:01 PM
    Hi, I’ve found that lots of unnamed tasks such as List, Dict, and case(True) appear when I call StartFlowRun with parameters. Are there any suggestions for making the schematic more readable?
  • w

    Wilson Bilkovich

    08/29/2021, 7:17 PM
    My understanding is that the Agent deployment is what needs to have local secrets configured, in order to use for example GitHub storage?
  • f

    Fabian

    08/30/2021, 5:36 AM
    I have a daily flow "daily_exporter" (which exports data from another system) that runs in the morning at e.g. 6:00am. Beginning at 7:00am there are many other scheduled flows relying on the exported data of "daily_exporter". They all check whether the exported data file is available, retry for a while, and are configured to fail if the export is still not available after several retries. Unfortunately, the initial "daily_exporter" sometimes fails (due to low memory or some other reason). This means I have many failed flows at 09:00am. To resolve the situation, it seems that I have to manually go through each flow run in the UI and restart each failed flow individually. This manual process is very time-consuming and annoying. Does anyone have any practical hints to improve this? Is there any way to click a single button to restart ALL failed flows?
  • c

    CA Lee

    08/30/2021, 7:08 AM
    Hi, is there any way to register a single Prefect flow to run different functions on different schedules? (details in thread)
  • b

    Bouke Krom

    08/30/2021, 7:58 AM
    Good morning! We are trying to run a single flow with many (about 30) different parameters on the same schedule: all Monday morning 9am. This morning exactly 10 runs were performed; the others were silently skipped. We run Prefect server with a single LocalAgent. It seems like we're running into some default 'max 10' setting. The agent displays as having 10 'submittable runs'. Maybe this can be increased, but there are no hits for submittable in the docs... Any pointers?
  • j

    Jelle Vegter

    08/30/2021, 11:15 AM
    I'm looking to trigger a script on the 4th day before the end of the month. Can a setup like this work where I check if the result of a task is True to run the rest or do I need a different approach? Thanks!
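    The date check itself can be sketched with the standard library. The helper name is_nth_day_before_month_end is hypothetical, and the reading "exactly 4 days before the last day of the month" is an assumption. If the last day should count as the first, use n - 1 instead.

```python
import calendar
from datetime import date


def is_nth_day_before_month_end(day: date, n: int = 4) -> bool:
    """Return True when `day` falls exactly n days before the month's last day."""
    # monthrange returns (weekday of first day, number of days in month).
    last_day = calendar.monthrange(day.year, day.month)[1]
    return last_day - day.day == n


print(is_nth_day_before_month_end(date(2021, 8, 27)))
```

    A task returning this boolean could then gate the rest of the flow, which matches the "check if the result of a task is True" idea in the question.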
  • y

    YD

    08/30/2021, 2:52 PM
    …
    ✅ 1
  • t

    Tony Yun

    08/30/2021, 3:46 PM
    Hi, I tried searching but couldn't find a related answer. I want to change the auto-generated, container-like flow run names such as “happy-jelly”, but I don’t want to overcomplicate things by calling create_flow_run in code. What’s the simplest way to give the flow run a customized run-id? Assume I only need to trigger the flow run using schedules or the UI.
  • b

    bral

    08/30/2021, 7:09 PM
    Good day! I can see KilledWorker: ('blahblahblah', <Worker 'tcp://10.10.10.qp:38828', name: 367, memory: 0, processing: 1) in the flow logs. My dask-scheduler worker timeouts are at their defaults. The task became 'Failed' 20+ minutes after it started. Which timeout option should I change?
  • c

    chicago-joe

    08/30/2021, 7:49 PM
    Hey all, I'm running into an error using state_handlers in the task decorator now. Any thoughts? I'm on Prefect 0.14.20
    File "D:\venv\poetry\.venv\lib\site-packages\prefect\core\task.py", line 159, in __init__
        old_init(self, *args, **kwargs)
    TypeError: __init__() got an unexpected keyword argument 'state_handers'
    python-BaseException
    @task(log_stdout = True, nout = 2, max_retries = 3, retry_delay = timedelta(minutes = 15), state_handers = [handler])
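    The traceback names the keyword state_handers, which is missing the "l" of state_handlers. The sketch below uses a hypothetical stand-in class, not Prefect's real Task, to show how a misspelled keyword produces this kind of TypeError while the correctly spelled one is accepted.

```python
class StandInTask:
    """Hypothetical stand-in for a task class; not Prefect's implementation."""

    def __init__(self, max_retries=0, state_handlers=None):
        self.max_retries = max_retries
        self.state_handlers = state_handlers or []


def handler(task, old_state, new_state):
    """Pass-through state handler used only for illustration."""
    return new_state


try:
    StandInTask(max_retries=3, state_handers=[handler])  # misspelled keyword
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

ok = StandInTask(max_retries=3, state_handlers=[handler])  # correct spelling
print(error_message)
```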
  • m

    Michael S

    08/30/2021, 8:01 PM
    Hey everyone! Just started with Prefect, and I am having a great time so far. I’m running into an issue grabbing a flow from S3 and running it in a Docker container. I think this should be straightforward; not sure where I am going wrong.
    Set-up: I am running Prefect server, writing flows, and running DockerAgent() on my local computer, and I am using S3 (private bucket) for my flow storage. I am having the flow run with DockerConfig, with a custom image. I am able to register the flow, and I see it in S3.
    Error: When trying to run the Flow, I get
    Error downloading Flow from S3: Unable to locate credentials
    What I tried:
    1. At first, I thought, oh!, the agent doesn’t have the creds (at first I thought the server might need them as well, but I don’t think that’s true). I shut down the agent, ran export AWS_ACCESS_KEY_ID=... and export AWS_SECRET_ACCESS_ID=... in the shell, and restarted. This didn’t work.
    2. I then figured maybe the container needs the creds. I (temporarily) hardcoded them into my Docker image. This works! This could be a fine workaround. I could build the image with --build-arg to remove the hardcoding, but I don’t really want to bake my secrets into the container environment. I'd rather pass them in at runtime (via the agent), i.e., something like docker run -e AWS_ACCESS_KEY_ID...
    What I don’t understand: What is the best workflow for having Docker containers pull flows from S3? I was looking at Prefect Secrets, but this seems like it’s for accessing secrets within tasks, not for getting flows from somewhere. Am I misunderstanding how Secrets can be used?
    What would be nice: An example of the best practice. Sorry if this is such an obvious question! Still trying to get my bearings here.
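    boto3's default credential chain reads the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (plus the optional AWS_SESSION_TOKEN), so a quick check inside the process that downloads the flow can show whether they arrived there. The helper missing_aws_env is hypothetical. How the variables get forwarded into the container depends on the agent and run configuration and is not shown here.

```python
import os
from typing import List, Mapping, Optional

# Names read by boto3's environment-variable credential provider.
REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required AWS variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


# Example using the variable names as typed in the message above.
example_env = {
    "AWS_ACCESS_KEY_ID": "placeholder",
    "AWS_SECRET_ACCESS_ID": "placeholder",
}
print(missing_aws_env(example_env))
```

    Running the same check in the agent's shell and inside the flow-run container would show on which side the credentials go missing.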
  • v

    Vincent

    08/30/2021, 9:16 PM
    Hi All - I started a flow with Prefect Cloud which seems to have paused for ~2 hours. When I look at the Dask scheduler, there are no reported tasks remaining. Is there any reason why my flow would have stopped?
  • e

    Elijah Moreau-Arnott

    08/30/2021, 10:39 PM
    Hi all! I have been playing around with dependent flows, and was looking at using get_task_run_result to retrieve task results from the child flow. However, I was getting errors where it couldn't find a task with the provided slug (which I had set to 'output'). I thought it was an issue with my results location until I realized that despite annotating the child task like so:
    @task(slug="output",result=GCSResult(...
    the slug name (as visible in the context) was output-copy. I now got get_task_run_result working by grabbing the task with slug output-copy, but was unsure of how the -copy came about. Is this documented anywhere?
  • j

    John Marx

    08/30/2021, 11:42 PM
    @Prefect Community