prefect-community
  • p

    Preston Marshall

    02/08/2020, 4:46 PM
    So I made a custom Secret implementation that pulls from google secret manager. The problem is that the dask-workers blow up because they can't seem to find the module. It's just a file in my demo directory. How is this supposed to work?
    c
    10 replies · 2 participants
  • p

    Preston Marshall

    02/08/2020, 4:46 PM
    I tried having the workers use the same image as the scheduler which has the module, but that doesn't seem to cut it.
  • p

    Preston Marshall

    02/08/2020, 7:27 PM
    Is it possible to have a secret name come from a Parameter?
  • p

    Preston Marshall

    02/08/2020, 7:35 PM
    This is my attempt:
    with Flow("mirror_sftp_to_gcs") as flow:
        password_secret_path = Parameter("password_secret_path")
        sftp_connection_info = Parameter("sftp_connection_info")
        password = GoogleSecretManagerSecret(password_secret_path)  # TODO: Cast function to convert to namedtuple
        host_path = Parameter('host_path')
        files = sftp_list_files(sftp_connection_info, password, host_path)
        downloaded_files = sftp_to_gcs.map(
            file_path=files,
            connection_info=unmapped(sftp_connection_info),
            password=unmapped(password),
            bucket=unmapped("snip")
        )
    I get an error:
    ValueError: Flow.run received the following unexpected parameters: password_secret_path
    Very odd, the other parameters work, I am specifying them in a parameters kwarg in flow.run() in another file.
  • p

    Preston Marshall

    02/08/2020, 7:42 PM
    It appears that because the parameter is never used in a task, it doesn't ever expect it
    c
    j
    35 replies · 3 participants
  • p

    Preston Marshall

    02/09/2020, 3:39 PM
    How do I throttle a task being mapped?
  • n

    Nate Atkins

    02/09/2020, 3:43 PM
    I just started looking at the Cloud Scheduler yesterday and saw this on concurrency limits. https://docs.prefect.io/cloud/concepts/concurrency-limiting.html#task-concurrency-limiting Not sure what you do if you are running locally with Dask.
  • p

    Preston Marshall

    02/09/2020, 3:46 PM
    That's unfortunate it can't be rate-limited without cloud, I hope that feature comes back.
  • j

    Jeremiah

    02/09/2020, 3:46 PM
    Within a single Dask cluster, you can use Dask resource limits to achieve a similar result. Global limiting requires some central broker of state.
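Editor's sketch of the point above: a concurrency limit needs one shared piece of state that every worker consults. Inside a single process a semaphore can play that role. This is standard-library Python only, not Prefect or Dask API; `throttled_task`, `run_mapped` and `MAX_CONCURRENT` are hypothetical names chosen for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 2  # hypothetical limit on simultaneous task runs

_slots = threading.Semaphore(MAX_CONCURRENT)  # the shared "broker of state"
_lock = threading.Lock()
_active = 0
peak_active = 0  # highest number of tasks observed running at once


def throttled_task(item):
    """Stand-in for a mapped task; at most MAX_CONCURRENT run at a time."""
    global _active, peak_active
    with _slots:  # blocks while MAX_CONCURRENT tasks are already running
        with _lock:
            _active += 1
            peak_active = max(peak_active, _active)
        try:
            return item * 2  # placeholder for the real work
        finally:
            with _lock:
                _active -= 1


def run_mapped(items, workers=8):
    """Map the task over items with more workers than allowed slots."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(throttled_task, items))


results = run_mapped(range(10))
```

The pool has eight workers, but the semaphore keeps the number of simultaneously running tasks at or below `MAX_CONCURRENT`. Across several machines no such in-process object exists, which is why a global limit needs an external service to hold the count.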
  • p

    Preston Marshall

    02/09/2020, 3:51 PM
    Do you have a link on that by chance? I feel like that's something that should be handled by the executor if the underlying system supports it. I totally get that you need to fund development, but if you are thinking of open source as a strategy to onboard people to the commercial offering, it's difficult to put together a POC without this 🙂
  • j

    Jeremiah

    02/09/2020, 3:55 PM
    It is handled by the executor as the docs explain - https://docs.prefect.io/api/latest/engine/executors.html#daskexecutor
  • p

    Preston Marshall

    02/09/2020, 4:48 PM
    I meant that if the executor itself supports it, the Executor abstraction should facilitate it. Maybe I’m misunderstanding but it doesn’t seem like it does
  • i

    itay livni

    02/09/2020, 7:30 PM
    Hi - Is this the correct way to handle a state in a flow that fails and skips all downstream tasks on failure, with the result of the flow being an empty list? Thanks again
    Untitled
    c
    3 replies · 2 participants
  • p

    Preston Marshall

    02/09/2020, 8:28 PM
    Is there some way to pass a `Secret` to the slack notifier or look it up by name? Looking at the code it seems to read from the context, but it's not clear if I can actually set secrets in the context other than via the config file
    c
    1 reply · 2 participants
  • p

    Preston Marshall

    02/09/2020, 8:30 PM
    Ah, I see there are Secrets and Secret tasks
  • p

    Preston Marshall

    02/09/2020, 8:31 PM
    seems like the notifier uses the former, so this probably won't work
  • c

    Chris O'Brien

    02/09/2020, 9:47 PM
    Has anyone come across issues with Scheduled Flows not releasing the memory? We have an ETL setup to run every day that will just consume additional memory on each run. Are there any easy gotchas that can cause this behaviour before I begin digging into this?
    c
    3 replies · 2 participants
  • t

    Thomas La Piana

    02/10/2020, 9:56 AM
    hi all, I'm looking into adopting prefect for DS workflows instead of trying to shoehorn them into airflow. Does anyone have some experience with this? Has anyone found it slowing down workflows compared to something like kubeflow? Does prefect core have a web UI, or does it only support the dask UI?
    j
    p
    +3
    25 replies · 6 participants
  • m

    Mark Williams

    02/10/2020, 5:38 PM
    Is there an example of how to set up a custom configuration file for Windows? It points to ~/Users/<username>. This is not a valid path for Windows users. I have tried setting the environment variables on load with no luck. I also set them in the Windows environment variables and still no luck.
    c
    11 replies · 2 participants
  • d

    dhume

    02/10/2020, 6:59 PM
    Hello. I’m playing around with the slack notification and trying to locally set the secret but keep getting the error that
    ValueError: Local Secret "SLACK_WEBHOOK_URL" was not found.
    I tried the suggestions mentioned here https://github.com/PrefectHQ/prefect/blob/b4c08b15b386107e2c1afb7fed35dca15b683cdd/src/prefect/client/secrets.py but wasn’t getting anything to work locally. Threading what I currently have
    j
    c
    +1
    6 replies · 4 participants
  • j

    John Ramirez

    02/11/2020, 4:01 PM
    hey - is there a way to have a prefect task populate a global python
  • n

    Nate Atkins

    02/11/2020, 4:28 PM
    While we are queuing up questions this morning, is there a way to direct which Dask resource a task needs? The obvious example is GPU or CPU. In my case, I have one task that needs a lot of memory and I don't want to set the memory limit on all the workers to that level. I'd just like to have one worker with a high memory limit and send that task to that worker. https://distributed.dask.org/en/latest/resources.html#worker-resources
    j
    3 replies · 2 participants
  • t

    trapped

    02/11/2020, 4:46 PM
    seems like formatting on the docs page on Constant tasks got borked https://docs.prefect.io/api/latest/tasks/constants.html
    😬 1
    j
    2 replies · 2 participants
  • a

    Adam Roderick

    02/12/2020, 5:22 PM
    "No module named IPython" -- is that not included in the prefect module?
    j
    i
    4 replies · 3 participants
  • a

    Alexander Verbitsky

    02/12/2020, 7:09 PM
    Hi all, is there any way to store data between runs and skip some tasks if their outputs already exist, analogous to luigi targets?
  • a

    Alexander Verbitsky

    02/12/2020, 7:12 PM
    I found that I can specify `result_handler` for a task, but it doesn't check whether the data already exists on a repeated run
    c
    5 replies · 2 participants
  • j

    Jan Harkema

    02/14/2020, 9:58 AM
    Hey guys, I'm using Prefect with Dask, using the built-in DaskExecutor. I see in the Dask gui that sometimes tasks 'linger'. They don't get cleaned up. Over time, this eats memory. Restarting the Dask Scheduler helps, but ideally I would see the tasks getting cleaned up by the system itself. Is this a known issue and is there a way to keep the memory clean? We're using dask==2.9.0 and prefect==0.7.3
    c
    l
    5 replies · 3 participants
  • c

    Cab Maddux

    02/14/2020, 1:42 PM
    Hi! I'm running flows using the k8s executor/agent in the staging/production environment and the local executor in the development environment. For many workloads I have the primary flow trigger work either: 1. In the staging/production environment: in a separate k8s job with a createNamespacedJob task followed by a custom waitForNamespacedJob task. 2. In the development environment: using docker createContainer, runContainer, waitOnContainer tasks. Sometimes these are created based on mapped inputs so work can be done in parallel, but they obviously need some limits. In staging/production I can use tags for task concurrency limits, which is all good 👍. How would you recommend I limit task concurrency in the development environment when creating/running/waiting on containers?
    j
    b
    6 replies · 3 participants
  • n

    Nate Atkins

    02/14/2020, 8:27 PM
    I've been working to keep exception and log data in our environment instead of going to Prefect Cloud. Exceptions were pretty easily handled with a decorator that catches the exceptions, stores them locally and then raises an exception that has the URI of the saved exception. I have a small app that pulls down the log from the cloud and builds a simple web page that lets you click to get the full exception from the local store. I wanted to do the same thing with logging: log the message to a local file and update the message sent to Prefect Cloud with the URI of the locally stored log message. I can add the handler to log to a local file and I can update the formatter on the prefect logger to accomplish all of this when I have control of the application and call flow.run(). I'm having problems figuring out how to get the little stub of code that makes these changes to the logging configuration to run when the agent picks up the flow from Prefect Cloud and runs the flow. Is there a place to put this code so that the Agent will run it before the flow is kicked off? I guess as a last resort I can add a setup task at the beginning of the flow to make this configuration change.
    j
    j
    +1
    28 replies · 4 participants
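Editor's sketch of the exception decorator described in the message above, using only the standard library. The poster's actual code is not shown in the thread, so `store_exceptions_locally`, `StoredException` and `EXCEPTION_DIR` are hypothetical names, and writing to the temp directory is an assumption made to keep the example runnable.

```python
import functools
import tempfile
import traceback
import uuid
from pathlib import Path

# Hypothetical local store; a real setup would point at durable storage.
EXCEPTION_DIR = Path(tempfile.gettempdir()) / "local_exceptions"


class StoredException(Exception):
    """Carries only the URI of the locally stored traceback."""


def store_exceptions_locally(func):
    """Save the full traceback locally and re-raise with just its URI."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            EXCEPTION_DIR.mkdir(parents=True, exist_ok=True)
            path = EXCEPTION_DIR / f"{uuid.uuid4().hex}.txt"
            path.write_text(traceback.format_exc())
            # "from None" drops the original exception from the chain so
            # its details are not attached to what leaves the environment.
            raise StoredException(path.as_uri()) from None
    return wrapper


@store_exceptions_locally
def flaky_task(x):
    return 1 / x


try:
    flaky_task(0)
except StoredException as exc:
    stored_uri = str(exc)  # a file:// URI pointing at the saved traceback
```

Anything downstream that records the raised exception then sees only the URI, while the full traceback stays in the local file.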
  • n

    Nikita Vostretsov

    02/17/2020, 6:48 AM
    Hi guys, I have the following computational graph. It was easy to implement using the functional API. Usually I want to run all five tasks. But sometimes I want to run only the upper branch (`t1`-`t3`) or the lower branch (`t1`, `t4` and `t5`). Is it possible to do this without rewriting the code from
    with Flow('flow') as flow:
        r1 = t1()
        r2 = t2(r1)
        r3 = t3(r2)
        r4 = t4(r1)
        r5 = t5(r4)
    into
    def upper_branch():
        r1 = t1()
        r2 = t2(r1)
        r3 = t3(r2)
    
    def lower_branch():
        r1 = t1()
        r4 = t4(r1)
        r5 = t5(r4)
    
    with Flow('flow') as flow:
        if do_upper or do_both:
            upper_branch()
        if do_lower or do_both:
            lower_branch()
    t
    j
    5 replies · 3 participants
t

trapped

02/17/2020, 2:57 PM
I believe you can use `prefect.tasks.control_flow.conditional.ifelse`: https://docs.prefect.io/core/task_library/control_flow.html#control-flow
j

Jeremiah

02/17/2020, 3:56 PM
Yup, this is exactly what the ifelse is for. You can use a Parameter to control which branch to execute.
n

Nikita Vostretsov

02/18/2020, 7:28 AM
From a code readability point of view, this variant is no better than using separate functions and Python's `if`. I was thinking about something like
with Flow('flow') as flow:
    r1 = t1()
    r2 = t2(r1)
    r3 = t3(r2)
    r4 = t4(r1)
    r5 = t5(r4)

targets = []
if do_upper or do_both:
    targets.append(r3)
if do_lower or do_both:
    targets.append(r5)
flow.run(targets=targets)
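Editor's sketch of what a target-based run would have to compute: the set of tasks upstream of the requested targets. Nothing in this thread confirms that Prefect offers a `targets` argument, so this is a plain-Python illustration only. The `UPSTREAM` map and `tasks_needed_for` are hypothetical and mirror the graph described in the question (`t1` feeds `t2` then `t3`, and `t1` feeds `t4` then `t5`).

```python
# Hypothetical dependency map: task name -> names of its upstream tasks.
UPSTREAM = {
    "t1": [],
    "t2": ["t1"],
    "t3": ["t2"],
    "t4": ["t1"],
    "t5": ["t4"],
}


def tasks_needed_for(targets, upstream=UPSTREAM):
    """Return the targets plus everything they transitively depend on."""
    needed = set()
    stack = list(targets)
    while stack:
        task = stack.pop()
        if task not in needed:
            needed.add(task)
            stack.extend(upstream[task])  # walk back toward the roots
    return needed


upper = tasks_needed_for(["t3"])
lower = tasks_needed_for(["t5"])
both = tasks_needed_for(["t3", "t5"])
```

Here `upper` contains `t1`, `t2` and `t3`, `lower` contains `t1`, `t4` and `t5`, and `both` contains all five tasks.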
j

Jeremiah

02/18/2020, 4:29 PM
@Nikita Vostretsov Python’s `if` is not available inside a flow since it is executed at “build time”; Prefect’s `ifelse` is executed at “run time” and can therefore depend on information available when the flow runs. In your example, the flow actually runs both branches no matter what, as you have no runtime control flow implemented. The only thing changing is the input to your `targets` keyword argument, which is presumably a Parameter not shown here.
The pattern you’d like to implement is what `ifelse` is intended for; you want to make a decision at runtime about which branch to execute. If, on the other hand, you know which branch you want to run AND you don’t mind rebuilding your flow each time (which might be fine for your purposes!), then you could use a Python `if` to control which tasks you add to the flow. However, if you want a single flow with multiple branches, you’ll need a Prefect `ifelse`.
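Editor's sketch of the build-time versus run-time distinction, written as a toy model in plain Python. `ToyFlow` is invented for illustration and does not reflect how Prefect implements `Flow` or `ifelse`; it only shows why a Python `if` fixes the branch when the flow is constructed, while a condition stored in the flow can be re-evaluated on every run.

```python
class ToyFlow:
    """Toy stand-in for a flow: an ordered list of (task name, condition)."""

    def __init__(self):
        self.steps = []

    def add(self, name, only_if=None):
        # only_if is kept in the flow and evaluated on every run
        self.steps.append((name, only_if))

    def run(self, **params):
        executed = []
        for name, only_if in self.steps:
            if only_if is None or only_if(params):
                executed.append(name)
        return executed


def build_static(branch):
    """Build time: a Python `if` decides which tasks exist at all."""
    flow = ToyFlow()
    flow.add("t1")
    for name in (("t2", "t3") if branch == "upper" else ("t4", "t5")):
        flow.add(name)
    return flow


def build_dynamic():
    """Run time: both branches are in the flow, guarded by a condition."""
    flow = ToyFlow()
    flow.add("t1")
    for name in ("t2", "t3"):
        flow.add(name, only_if=lambda p: p["branch"] == "upper")
    for name in ("t4", "t5"):
        flow.add(name, only_if=lambda p: p["branch"] == "lower")
    return flow


static_flow = build_static("upper")
dynamic_flow = build_dynamic()

static_result = static_flow.run(branch="lower")     # parameter is ignored
dynamic_upper = dynamic_flow.run(branch="upper")
dynamic_lower = dynamic_flow.run(branch="lower")
```

The statically built flow always runs the branch chosen when it was built, so switching branches means rebuilding it. The dynamically guarded flow is built once and picks its branch from the parameter on each run.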