prefect-community
  • Andor Tóth
    02/28/2020, 2:20 PM
    I was wondering, after looking thoroughly through the documentation: how can I execute a single task on its own from a flow?
    4 replies · 3 participants
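    A minimal sketch of the usual approach, assuming Prefect 0.x: the function wrapped by @task is exposed as the task's .run method, so it can be called directly without building or running a flow.

    from prefect import task

    @task
    def add(x, y):
        return x + y

    add.run(1, 2)  # returns 3; no flow context required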
  • Andor Tóth
    02/28/2020, 2:21 PM
    Do you have any suggestions?
    🚀 1
  • Braun Reyes
    02/28/2020, 6:46 PM
    The context option on flow runs in Cloud is 🔥. You could totally use this to pass things to the remote environment and agent when running tasks from Cloud!
    :marvin: 3
    :upvote: 6
    2 replies · 2 participants
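    A minimal sketch of what that pattern looks like, assuming Prefect 0.x: values supplied in a flow run's context are readable inside tasks via prefect.context ("env_name" is a hypothetical key here).

    import prefect
    from prefect import task

    @task
    def report():
        # read a value passed through the flow-run context
        env_name = prefect.context.get("env_name", "unknown")
        print(f"running against: {env_name}")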
  • Adam Roderick
    02/28/2020, 10:32 PM
    Shout out to @Jenny for some excellent work on a state handler. Prefect continues to attract great people!
    💯 2
    :upvote: 4
    ❤️ 4
    1 reply · 2 participants
  • Mark Williams
    02/28/2020, 11:23 PM
    I am having an issue with the DaskExecutor. I am executing locally on my machine for testing. Without the DaskExecutor, my process is much faster, whether I am processing 1 file or 400. I am fairly new to Dask, so it could be an implementation issue.
  • Nate Atkins
    02/28/2020, 11:31 PM
    How many Dask worker processes did you start?
    1 reply · 2 participants
  • Adam Roderick
    02/28/2020, 11:32 PM
    Can I access a flow's run ID from a state handler? I want to generate a URL to the logs in Prefect Cloud, and I need the run ID to do that
    7 replies · 2 participants
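    A minimal sketch of the idea, assuming Prefect 0.x: during a Cloud-backed run the context carries a flow_run_id, which a state handler can read (the URL shape below is only an illustration).

    import prefect

    def notify(flow, old_state, new_state):
        # flow_run_id is populated in context during a Cloud-backed run
        run_id = prefect.context.get("flow_run_id")
        print(f"logs: https://cloud.prefect.io/flow-run/{run_id}")
        return new_state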
  • trapped
    03/02/2020, 8:09 AM
    hi guys! is there any way to limit flow run (not task run) concurrency for agents?
    3 replies · 2 participants
  • Peter Ebert
    03/02/2020, 2:49 PM
    Hi... is there anybody here who could elaborate on the pros and cons of migrating large pipelines from workflow engines popular in bioinformatics (e.g., Snakemake or Nextflow) to Prefect?
    8 replies · 3 participants
  • Maikel Penz
    03/03/2020, 1:20 AM
    Hi.. For those using Prefect Cloud with the Fargate execution environment: if I register an existing flow again, passing the FargateTaskEnvironment with updated CPU/memory settings, the task definition doesn't get updated on ECS. No new revision is created, and the next run picks up the old configuration. Did anyone get around this issue?
    4 replies · 2 participants
  • Arlo Bryer
    03/03/2020, 2:07 PM
    Hi - when I try to run a flow from the Cloud UI, I seem to always end up with the first version of the flow (as opposed to the latest). I haven't tested this extensively, but is anyone else seeing this?
    26 replies · 3 participants
  • Cab Maddux
    03/03/2020, 2:20 PM
    Hi! All of a sudden I'm getting some failed flows/tasks because the Zombie Killer process marks tasks as failed after they have completed successfully. Any suggestions?
    7 replies · 2 participants
  • trapped
    03/03/2020, 6:27 PM
    Hi! I've been trying the createFlowRun mutation in Cloud for dynamically scheduling flow runs (with parameters); is there any way to batch-create flow runs to avoid the overhead of calling the mutation for each run?
    1 reply · 2 participants
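    One possible workaround, sketched under the assumption that the 0.x Client exposes a raw client.graphql method: GraphQL aliases let a single request carry several createFlowRun calls (the exact input fields are an assumption here).

    from prefect import Client

    client = Client()
    # one GraphQL document, several aliased mutations
    batch = """
    mutation {
      a: createFlowRun(input: { flowId: "<flow-id>" }) { id }
      b: createFlowRun(input: { flowId: "<flow-id>" }) { id }
    }
    """
    result = client.graphql(batch)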
  • Andrew Vaccaro
    03/03/2020, 7:05 PM
    Do I need to create flow runs with the same labels that are auto-generated by an agent? I see new auto-generated labels (probably from installing more storage options?), and my agent isn't picking up a scheduled flow run. Plus, the agent has label X from the hostname of the EC2 instance it runs on, while I scheduled the run locally with hostname (and therefore label) Y. Edit: ah, answered my own question; that's the issue. Is there any way to turn off the hostname label by default? Or, better, keep the labels but ignore the affinity?
    2 replies · 2 participants
  • Wolfgang Kerzendorf
    03/03/2020, 10:06 PM
    hey guys! Just looking into this for a research project. Does Prefect have a dashboard?
    5 replies · 2 participants
  • Tuan Nguyen
    03/04/2020, 1:18 PM
    Hi all from Amsterdam!
    👋 5
  • Tuan Nguyen
    03/04/2020, 1:19 PM
    I have a similar question as above. Is it true that I can't run a UI without Prefect Cloud? I can't host my own?
    5 replies · 2 participants
  • John Ramirez
    03/04/2020, 4:59 PM
    hey, is there an easy way to reset a Dask cluster and release memory after a heavy run?
    1 reply · 2 participants
  • Tsang Yong
    03/04/2020, 5:12 PM
    hey all, if I have a task with max_retries set, is there a variable I can access that represents the current retry count?
    1 reply · 2 participants
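    A minimal sketch, assuming Prefect 0.x: the run context exposes task_run_count, which starts at 1 and increments with each retry attempt.

    import datetime

    import prefect
    from prefect import task

    @task(max_retries=3, retry_delay=datetime.timedelta(seconds=10))
    def flaky():
        attempt = prefect.context.get("task_run_count", 1)  # 1 on the first attempt
        prefect.context.get("logger").info("attempt %s", attempt)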
  • George Coyne
    03/04/2020, 7:57 PM
    Hey guys, I am getting an error that I am not totally clear on:
    AttributeError: 'Parameter' object has no attribute 'log_stdout'
  • George Coyne
    03/04/2020, 7:57 PM
    Nothing has changed in my flow, but it is erroring out on the first task. The flow runs fine locally but fails in Cloud.
    6 replies · 4 participants
  • Wolfgang Kerzendorf
    03/05/2020, 4:03 PM
    okay - I want to give this a shot. I'm downloading data from an S3 bucket; it is extracted into ~1e6 folders, and each folder is then processed, resulting in one file per folder. I would like a workflow where I can see what went wrong in each of these 1e6 tasks (for those that fail). So do I start with the first task being a glob, and then string tasks onto that? Sorry for the stupid questions.
    26 replies · 3 participants
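    A minimal sketch of the fan-out pattern discussed in this thread, assuming Prefect 0.x task mapping; list_folders and process_folder are illustrative names.

    from prefect import Flow, task

    @task
    def list_folders():
        # e.g. a glob over the extracted data; one item per folder
        return ["folder_a", "folder_b"]

    @task
    def process_folder(folder):
        # each mapped child gets its own state, so per-folder failures stay visible
        ...

    with Flow("process-archive") as flow:
        process_folder.map(list_folders())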
  • Arlo Bryer
    03/05/2020, 5:06 PM
    Hi - I'd like some advice on having logs show up in Prefect Cloud as well as streamed locally to stderr/stdout (the latter being currently the case). I have some code wrapped in tasks/flows which produces these logs (and has an associated logger). I was hoping I could add this as an extra logger to Prefect (or add Prefect's logger as one of its handlers), resulting in logs both in Cloud and on local stderr. Has anyone set up something similar before? (I naively tried adding the package logger to Prefect's PREFECT_LOGGING_EXTRA_LOGGERS env var, but it doesn't seem to have worked as expected.)
    25 replies · 3 participants
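    For reference, a sketch of the spelling the 0.x config system expects: double underscores and a JSON list of logger names ("mypackage" stands in for the package whose logger should be forwarded).

    export PREFECT__LOGGING__EXTRA_LOGGERS='["mypackage"]'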
  • John Ramirez
    03/05/2020, 7:02 PM
    hey everyone - on Prefect 0.9.7 and getting this error during deployment:
    ModuleNotFoundError: No module named 'prefect.utilities.notifications.notifications'; 'prefect.utilities.notifications' is not a package
    1 reply · 2 participants
  • Adam Roderick
    03/05/2020, 11:17 PM
    I have a flow failing in Cloud that runs fine locally. I suspect something is up with the secrets; I have local secrets enabled, and I simply pasted the value into Cloud secrets. The error is Exception raised while calling state handlers: TypeError('string indices must be integers'). I don't really know how to begin troubleshooting this in Cloud. Any ideas?
    25 replies · 3 participants
  • alvin goh
    03/06/2020, 12:09 AM
    I'm looking at using the Prefect scheduler.. can it run entirely offline in an enterprise environment?
    6 replies · 2 participants
  • Nguyen Hoang Nguyen
    03/06/2020, 7:53 AM
    Hi, I want to schedule a task with a CronClock, but Prefect's default CronClock is in UTC. Is there a way to change it to a specific time zone? The code I'm trying is below, but it does not change the start date of the CronClock (the terminal still shows UTC):
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    import pendulum

    schedule = Schedule(clocks=[CronClock("40 14 * * *", start_date=pendulum.now("Asia/Ho_Chi_Minh"))])
    5 replies · 2 participants
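    For completeness, a minimal sketch of attaching such a schedule, assuming the 0.x Flow API ("daily-report" and run_report are illustrative names):

    from prefect import Flow, task

    @task
    def run_report():
        ...

    with Flow("daily-report", schedule=schedule) as flow:
        run_report()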
  • trapped
    03/06/2020, 9:26 AM
    hi guys! What would be the correct way to have an environment-global Flow (i.e. one that can run on all agents) with environment-specific flow runs (i.e. runs that only go to certain agents depending on their labels)? From the docs it seems labels can only be set on Flows, and the Client.create_flow_run method doesn't accept a labels parameter.
    1 reply · 2 participants
  • Manuel Aristarán
    03/06/2020, 3:42 PM
    Hey everyone. I’m wondering if anybody here has integrated Singer.io’s taps and targets with a Prefect Flow?
    7 replies · 2 participants
  • John Ramirez
    03/06/2020, 5:42 PM
    hey everyone - I'm dealing with an unusual issue in Dask. I need to clear memory after a workflow run (success or failure), but I can't use client.restart in the workflow because the workflow will fail. Any thoughts or ideas on how to clear memory?
    17 replies · 2 participants
Chris White
03/06/2020, 6:13 PM
Hi @John Ramirez - you can call client.restart in the on_exit callback of your Flow's execution environment: https://docs.prefect.io/cloud/execution/overview.html#environments
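A minimal sketch of this suggestion, assuming the 0.9.x RemoteEnvironment API and the scheduler address quoted later in this thread; note that on_exit receives the callable itself.

from prefect.environments import RemoteEnvironment

def restart_dask():
    # connect to the scheduler and drop worker state after the flow run
    from distributed import Client
    client = Client("dask-scheduler.default.svc.cluster.local:8786")
    client.restart()

env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler.default.svc.cluster.local:8786"},
    on_exit=restart_dask,  # pass the function, do not call it here
)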
John Ramirez
03/06/2020, 6:19 PM
So I found that, but my Dask cluster is inside EKS, which is not open to the public. When I try to deploy my workflow via Docker, the function is validated and breaks. The function also runs at deployment, which I also do not want.
Chris White
03/06/2020, 6:21 PM
I don’t think I understand -> if your workflow is successfully running against your Dask cluster, then it must have access to that cluster so this command should work
John Ramirez
03/06/2020, 6:24 PM
Yes, I do have access to the cluster, but I use the Kubernetes-internal IP address in the workflow, not the external address.
Chris White
03/06/2020, 6:26 PM
Sure, but regardless if you can access the cluster then you can call restart whenever the workflow has completed its run via the on exit hook. Did I maybe misunderstand the goal?
John Ramirez
03/06/2020, 6:27 PM
No, you are right, but I don't understand why the function is executed during deployment. Is there a way to turn that off?
Chris White
03/06/2020, 7:01 PM
I'm sorry, I don't think I understand your question. At a high level, Cloud deployments on K8s work like this: the agent creates a K8s job -> the Environment class spins up and calls setup and execute -> the flow runs and submits work to its executor -> on_exit is called -> the job is cleaned up.
John Ramirez
03/06/2020, 7:03 PM
Sorry, I meant that when I run flow.register(), the on_start or on_exit function also runs.
Chris White
03/06/2020, 7:31 PM
Hmm that shouldn’t happen
John Ramirez
03/06/2020, 7:35 PM
Do you want an error log?
Chris White
03/06/2020, 7:35 PM
Yea, that would be great
John Ramirez
03/06/2020, 7:38 PM
Traceback (most recent call last):
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    _raise(error)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 203, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "main.py", line 92, in <module>
    on_start=clear_memory()
  File "main.py", line 79, in clear_memory
    client = Client('dask-scheduler.default.svc.cluster.local:8786')
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 723, in __init__
    self.start(timeout=timeout)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 896, in start
    sync(self.loop, self._start, **kwargs)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 991, in _start
    await self._ensure_connected(timeout=timeout)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/client.py", line 1048, in _ensure_connected
    connection_args=self.connection_args,
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 227, in connect
    _raise(error)
  File "/Users/johnramirez/.local/share/virtualenvs/trader-ranking-Oomc_S11/lib/python3.7/site-packages/distributed/comm/core.py", line 203, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: Timed out trying to connect to 'tcp://dask-scheduler.default.svc.cluster.local:8786' after 10 s: connect() didn't finish in time
Here is the Python code as well:
def clear_memory():
    client = Client('dask-scheduler.default.svc.cluster.local:8786')
    client.restart()

ENV = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "<tcp://dask-scheduler.default.svc.cluster.local:8786>"
    },
    on_start=clear_memory()
)
Chris White
03/06/2020, 7:40 PM
You are running your clear_memory function in this code; you should instead be providing the function itself to `on_start`:
ENV = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "<tcp://dask-scheduler.default.svc.cluster.local:8786>"
    },
    on_start=clear_memory # <--- the change I made
)
John Ramirez
03/06/2020, 7:57 PM
Thank you, that worked.
Chris White
03/06/2020, 7:57 PM
👍 np