prefect-community

    matta

    03/27/2021, 2:14 AM
    Hrm, I've got a task that just never seems to terminate? Everything finishes fine but I always have to manually set it as Successful. I've tried setting reference tasks after the Flow def, I've tried making a task that raises signals.SUCCESS() and is downstream from the two main tasks (and I've tried setting the trigger to be all_finished), but nothing seems to work. The main element of it is two mapped tasks.

    Ananthapadmanabhan P

    03/27/2021, 4:33 PM
    Hey guys! I'm trying to pass AWS secrets into my flow without getting them printed in plain text anywhere (I'm running the server on a Kubernetes cluster). This is how I call the script that defines and registers the flow (as described in https://docs.prefect.io/core/concepts/secrets.html#default-secrets):
    PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<my_key_here>", "SECRET_ACCESS_KEY": "<my_secret_key_here>"}' python create_flow.py
    And this is how I pass it down into the KubernetesRun method:
    job_env = {
        "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS":
            os.getenv("PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"),
        "PREFECT__BACKEND":
            "server"
    }
    
    flow.run_config = KubernetesRun(env=job_env,
                                    image="ananthutest/prefect-test:latest")
    But when I do a kubectl describe of the created pod/job in k8s, it shows PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS under Environment in plain text. Any way I can avoid this?

    Trevor Kramer

    03/27/2021, 6:59 PM
    Is there some trick to not having to create little tasks just to do string formatting like this?
    @task()
    def get_s3_location(workspace_bucket):
        return f's3://{workspace_bucket}/{prefect.context.get("flow_run_name")}/'
    Something like how Prefect automatically turns simple addition into tasks within the flow?
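    Prefect ships template tasks for this pattern; a minimal sketch using StringFormatter, assuming it formats against prefect.context merged with any kwargs (as its docstring describes):
    from prefect import Flow, Parameter
    from prefect.tasks.templates import StringFormatter

    # {flow_run_name} is filled from prefect.context; {workspace_bucket} from the kwarg
    s3_location = StringFormatter(template="s3://{workspace_bucket}/{flow_run_name}/")

    with Flow("templating-example") as flow:
        workspace_bucket = Parameter("workspace_bucket")
        location = s3_location(workspace_bucket=workspace_bucket)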

    xyzy

    03/27/2021, 8:53 PM
    Is it normal that there is no error message if a flow's Result is set to S3Result and the upload of results is not working? Or are results only uploaded when a following step errors?

    xyzy

    03/27/2021, 9:35 PM
    Also, what is the best practice for sharing code between flows in a project? And how about sharing code between different projects? Are there any options besides publishing a library to a PyPI index?

    Trevor Kramer

    03/27/2021, 10:59 PM
    Is it possible to set the default value of a Parameter to be a lambda instead of a value? I want to re-create something like the flow names generated by the cloud ui.
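    Parameter defaults must be plain values, so a lambda won't work directly; a common workaround (a sketch, not an official idiom) is a nullable Parameter plus a small task that computes the generated default at run time:
    import prefect
    from prefect import task, Flow, Parameter

    @task
    def resolve_name(name):
        # Fill in the "default" lazily, once the flow is actually running
        if name is None:
            return "run-{}".format(prefect.context.get("flow_run_id", "local"))
        return name

    with Flow("generated-default") as flow:
        name = Parameter("name", default=None)
        resolved = resolve_name(name)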

    Sean Harkins

    03/28/2021, 6:52 PM
    I have a question about my (probably incorrect 😄) understanding of the Prefect logger and its behavior when using a DaskExecutor. When running this example Flow
    @task
    def say_hello():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud")
        return "hello result"
    
    
    with Flow(
        "dask-test-flow",
        storage=storage.S3(
            bucket=storage_bucket
        ),
        run_config=ECSRun(
            image=worker_image,
            labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
            task_definition=definition,
        ),
    ) as flow:
        hello_result = say_hello()
    I am able to view log information (specifically the “Hello, Cloud” info line) directly in Prefect Cloud. But with an identical flow using a DaskExecutor, logging information is sent to my CloudWatch dask-ecs log group but is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud and, if so, what additional logging config do I need to include?

    bral

    03/28/2021, 6:52 PM
    Good day! When we use dask-cluster, is the total number of concurrently executed tasks equal to the number of dask-workers or the sum of dask-worker threads?
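    For reference, a sketch based on standard Dask behavior (not a quote from the thread): with the default threaded workers, the concurrency limit is workers × threads per worker, and both can be pinned explicitly:
    from prefect.executors import DaskExecutor

    # 2 workers x 4 threads each = up to 8 tasks running concurrently
    executor = DaskExecutor(cluster_kwargs={"n_workers": 2, "threads_per_worker": 4})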

    Jeremy Tee

    03/29/2021, 8:56 AM
    hi everybody, I saw this in the Prefect docs:
    from prefect import Task, Flow
    from prefect.tasks.prefect import StartFlowRun
    
    
    class ExtractSomeData(Task):
        def run(self):
            return {"param-key": "some-random-piece-of-data"}
    
    extract_some_data = ExtractSomeData(name="extract_some_data")
    
    # assumes you have registered a flow named "example" in a project named "examples"
    flow_run = StartFlowRun(flow_name="example", project_name="examples")
    
    with Flow("parent-flow") as flow:
        flow_run.set_upstream(extract_some_data, key="parameters")
    but I am wondering: what is the parameters key, and is the param-key from ExtractSomeData passed into the example flow?
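    A hedged reading of that snippet, based on the documented StartFlowRun signature: the key="parameters" edge feeds the upstream task's return value into StartFlowRun.run(parameters=...), so the child flow receives {"param-key": "some-random-piece-of-data"} as its Parameter values. The explicit equivalent:
    with Flow("parent-flow") as flow:
        # Same effect as set_upstream(extract_some_data, key="parameters")
        flow_run(parameters=extract_some_data)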

    Christoph Wiese

    03/29/2021, 1:07 PM
    hello one and all! I'm trying to analyse the memory consumption of one of my flows, but I cannot figure out how to add guppy to my flow. My attempt was to create two tasks: one to create the hpy object and another to print it. I added dependencies to make the object creator the first task in the flow and the printer the last task in the flow. The heap is actually printed, but only for the task that created the hpy object. Does anybody know how I can have the hpy() object cover the entire flow? (Or if there is an alternative to using guppy, that'd be fine as well.)

    Sean Talia

    03/29/2021, 2:07 PM
    I'm looking through the ShellTask source because I was trying to see if there's a way to stream the task output when the log level is INFO. Is it possible? I have a task whose output I'd like to regularly stream to my logs, but I don't want to have the entire flow running in debug mode.
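    If your Prefect version includes it (hedged; it was added to ShellTask relatively recently), the stream_output flag logs command output line by line at INFO level, with no need for debug mode:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # stream_output=True logs each line of output as the command produces it
    shell = ShellTask(stream_output=True)

    with Flow("stream-example") as flow:
        shell(command="for i in 1 2 3; do echo line $i; sleep 1; done")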

    Jeff Williams

    03/29/2021, 2:17 PM
    Hello all. I am trying to write a simple task and get the return value from that task. I have been trying to follow https://docs.prefect.io/core/concepts/tasks.html#indexing as what is stated seems pretty straightforward. I have also been looking at the "Multiple Return Values" on that same page. Regardless of what I do, I can't get the expected values of 1 for the Indexing example, and the other values from the "Multiple Return Values" example. Rather, when I am printing the results, I get a <Task: xxxxxxx> type of string displayed. As stated above, I feel like this should be straightforward, but I must be missing something.
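    The usual cause of seeing <Task: xxxxxxx> (an inference, not from the thread): printing inside the with Flow block shows task objects, because values only exist once the flow has run. A sketch of retrieving a real value:
    from prefect import task, Flow

    @task
    def numbers():
        return [1, 2, 3]

    with Flow("indexing-example") as flow:
        first = numbers()[0]  # indexing builds a GetItem task, not a value

    state = flow.run()
    print(state.result[first].result)  # prints 1 after the run completes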

    Samuel Hinton

    03/29/2021, 2:48 PM
    Hey all! Does anyone know how you can pass kwargs to a task function? Right now I have a task that does a very similar thing with some tweaks depending on keywords a few times.
    @task(timeout=60, nout=2)
    def get_date(
        dt,
        ensure_explicit=False,
        max_late=timedelta(hours=12),
    ):
        # some code
        return None, None
    However, when I try to add get_date(dt, max_late=timedelta(minutes=1)) to my flow I get TypeError: got an unexpected keyword argument. I assume the task decorator is wrapping the original function and doesn't pass through any kwargs it doesn't recognise. Is there a subtle keyword I'm missing to pass kwargs through, or should I rewrite these functions of mine to utilise class-based Tasks?
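    If the decorator's own keywords keep colliding, the class-based form sidesteps the wrapper; a minimal sketch with the same signature:
    from datetime import timedelta
    from prefect import Task

    class GetDate(Task):
        def run(self, dt, ensure_explicit=False, max_late=timedelta(hours=12)):
            # same body as the decorated version
            return None, None

    # timeout/nout move to the constructor; run() keywords pass through untouched
    get_date = GetDate(timeout=60, nout=2)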

    Atalya

    03/29/2021, 3:09 PM
    Hi all 🙂 I was wondering: is it possible to trigger a Prefect workflow externally (using Prefect Core deployed on my machine, not Prefect Cloud)? Thanks for your help!
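    Yes; one route is the same Client API used for registration. A sketch against a local server (the flow ID below is a placeholder):
    from prefect import Client

    # Point the client at the local server's GraphQL endpoint
    client = Client(api_server="http://localhost:4200/graphql")
    client.create_flow_run(flow_id="<registered-flow-id>", parameters={"key": "value"})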

    Joseph Ellis

    03/29/2021, 3:16 PM
    Hi all, Prefect newbie here 🙂. I'm looking for some advice on best practice around sequencing tasks. The tasks aren't passing any data between each other, but must run in my specified sequence. I've played around with set_upstream/downstream but can't come up with the schematic I'm after. Use case: Task 1 executes; Tasks 2 and 3 both execute on success of Task 1; Task 4 executes on success of both Tasks 2 and 3. Any ideas or pointers would be appreciated 🙂
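    That diamond falls out of plain upstream edges; a minimal sketch with placeholder tasks:
    from prefect import task, Flow

    @task
    def t1(): pass

    @task
    def t2(): pass

    @task
    def t3(): pass

    @task
    def t4(): pass

    with Flow("diamond") as flow:
        a, b, c, d = t1(), t2(), t3(), t4()
        b.set_upstream(a)  # tasks 2 and 3 wait on task 1
        c.set_upstream(a)
        d.set_upstream(b)  # task 4 waits on both 2 and 3
        d.set_upstream(c)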

    Joseph Ellis

    03/29/2021, 3:39 PM
    And another one from me, on a completely different topic. It looks like Prefect has good support for AWS Lambda, which is awesome. However, I couldn’t see a way to specify the AWS endpoint to use, only the credentials etc. Am I wrong? Our use case for Prefect is to string a series of Lambdas together, and we’re using serverless framework (serverless offline) in our local development environment to run Lambdas on a localhost endpoint. It would be great if I could get Prefect local talking to my local Lambdas.
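    Until the built-in Lambda tasks expose an endpoint override (hedged; worth checking their current signatures), a plain boto3 client inside a task can target serverless-offline directly:
    import boto3
    from prefect import task

    @task
    def invoke_local_lambda(payload: bytes):
        # endpoint_url points boto3 at serverless-offline; port depends on your config
        client = boto3.client("lambda", endpoint_url="http://localhost:3002",
                              region_name="us-east-1")
        return client.invoke(FunctionName="my-function", Payload=payload)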

    Amber Papillon

    03/29/2021, 3:40 PM
    Hey guys! I'm new to Prefect as I'm trying new tools. How does Prefect compare to Dagster? Also, would MLFlow complement the use of Prefect?

    Sean Talia

    03/29/2021, 4:28 PM
    is there a way to explicitly remove a task from another task's set of upstream dependencies?

    xyzy

    03/29/2021, 5:22 PM
    What is the correct format for AWS secret environment variables? I tried the following, but it doesn't seem to work:
    PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS={"ACCESS_KEY": "myaccesskey", "SECRET_ACCESS_KEY": "mysecretaccesskey"}
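    The JSON itself looks right; a likely culprit (an assumption) is the shell splitting the unquoted value on spaces. The whole JSON has to reach the process as a single string:
    import json
    import os

    # In a shell, quote the whole value as one token:
    #   export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."}'
    creds = {"ACCESS_KEY": "myaccesskey", "SECRET_ACCESS_KEY": "mysecretaccesskey"}
    os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"] = json.dumps(creds)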

    Will Milner

    03/29/2021, 10:42 PM
    How do you go about setting up a volume for jobs when using the Kubernetes agent? I see that --job-template is an option when using start, but not when using install. I'm able to set up a volume on the agent, but I realized that won't actually make the volumes available to my flows when I run them.
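    A per-flow alternative (a sketch, assuming KubernetesRun accepts a job_template dict and a default container named "flow"): declare the volume in the flow's own job template so it applies to the flow-run job rather than the agent. The PVC name is hypothetical:
    from prefect.run_configs import KubernetesRun

    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [{
                        "name": "flow",
                        "volumeMounts": [{"name": "data", "mountPath": "/data"}],
                    }],
                    "volumes": [{
                        "name": "data",
                        "persistentVolumeClaim": {"claimName": "my-pvc"},
                    }],
                }
            }
        },
    }

    flow.run_config = KubernetesRun(job_template=job_template)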

    Trevor Kramer

    03/30/2021, 2:39 AM
    Is there a way to use multiline parameters in the Prefect Cloud UI?

    Enrique Plata

    03/30/2021, 7:54 AM
    Does anyone have a docker-compose file that spins up all the services for the UI, server, etc.?

    J. Martins

    03/30/2021, 9:04 AM
    Hi all! I have registered a flow without a schedule and can trigger it fine from the Prefect Cloud UI. When I try to create a schedule for it (in Prefect cloud UI) and enable it for the flow, it does not recognise the schedule I have just created. Also if I refresh the page I can see the schedule “disappeared”, like it has never been saved. Has anyone experienced this or is there something I am missing?

    Gopinath Jaganmohan

    03/30/2021, 10:40 AM
    Hi Team, I'm evaluating Prefect as a data flow pipeline and have started learning and trying it out. My setup is Prefect Server with a LocalAgent running inside Docker, creating tasks from a Jupyter notebook and registering the flow using S3 storage. Everything works until I run the flow, which fails with: Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'setup'"). I know I'm making some simple mistake; any help would be appreciated. The flow is a simple hello world. Since my Prefect server is running remotely and I couldn't find an option to pass the API URL to the flow, I created a client and used it to register; registration works. Here is my code:
    import prefect
    from prefect import task, Flow, client
    from prefect.storage import S3

    clientP = client.Client(api_server="API URL")
    storage = S3(bucket="bucket",
                 client_options={"aws_access_key_id": "awsaccesskeyidXXX",
                                 "aws_secret_access_key": "awssecretaccesskey"})

    @task
    def say_hello():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")

    with Flow("hello-flow", storage=storage) as flow:
        say_hello()

    # Register the flow under the "learning" project
    clientP.register(flow, "learning")

    xyzz

    03/30/2021, 2:44 PM
    how do you verify that a DaskExecutor was used to run a flow? There is no info about the executor in the logs, right?

    Adam Lewis

    03/30/2021, 3:18 PM
    Hi everyone, when I register a flow with prefect-cloud/prefect-server I can see the task graph on prefect-server on the schematic tab (image shown below) of the UI. I would have thought that dask would calculate that graph. Is that true? If I'm using the dask executor is that graph then recalculated for each flow-run by the dask-scheduler? Or is the task graph calculated by dask at flow registration time and then cached somehow, so it doesn't need to be recalculated? If there is any documentation on this, please direct me there. Thank you!

    Marwan Sarieddine

    03/30/2021, 3:24 PM
    Hi folks, when trying to visualize a flow that ran locally using flow.run() and flow.visualize(flow_state=flow_state), is there a way to display the duration of the tasks?

    Trevor Kramer

    03/30/2021, 6:18 PM
    Is there an idiom for using case logic to define parameters to tasks without duplicating the entire flow inside the case? Something like this?
    with case(is_rapids(input), True):
        export_result = ExportsTask()(secret_task, 'somekey', stack)
    
    with case(is_rapids(input), False):
        export_result = ExportsTask()(secret_task, 'anotherkey', stack)
    
    MyTask()(input, export_result['training_job_queue'], export_result['training_job_definition'], PipelineArtifactsBucketTask()(secret_task, stack))
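    The usual idiom (a hedged sketch reusing the names from the question) is Prefect's merge, which returns whichever branch actually ran so the downstream task is written once:
    from prefect import Flow, case
    from prefect.tasks.control_flow import merge

    with Flow("case-merge") as flow:
        cond = is_rapids(input)

        with case(cond, True):
            result_a = ExportsTask()(secret_task, 'somekey', stack)

        with case(cond, False):
            result_b = ExportsTask()(secret_task, 'anotherkey', stack)

        # merge returns the result of whichever branch was not skipped
        export_result = merge(result_a, result_b)

        MyTask()(input, export_result['training_job_queue'],
                 export_result['training_job_definition'],
                 PipelineArtifactsBucketTask()(secret_task, stack))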

    Cab Maddux

    03/30/2021, 6:36 PM
    Hi! Quick question, I've upgraded my k8s agent to v0.14.9 and understand based on work here and here that the separate resource-manager has been removed from the k8s agent. I expected the newly deployed agent to clean up successful k8s jobs but finding them hanging around. Anything I'm missing here? Thanks!

    Sean Harkins

    03/30/2021, 7:43 PM
    I'm trying to find information on how I can expose the address of the Dask scheduler to use the distributed dashboard https://docs.dask.org/en/latest/diagnostics-distributed.html when using the Temporary Cluster approach described here https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster. The DaskExecutor is dynamically creating these temporary clusters https://github.com/PrefectHQ/prefect/blob/05cac2372c57a93ea72b05e7c844b1e115c01047/src/prefect/executors/dask.py#L213 on a per-flow basis, so I'm unsure how I can obtain the scheduler's address (and dashboard link) without some hook in the Prefect code. In an ideal world I would like to have the Dask scheduler's public address and dashboard link reported as part of the Flow logs in the Prefect UI. Any suggestions are greatly appreciated.

Jim Crist-Harif

03/30/2021, 8:18 PM
This is a bit tricky, since we can't be sure what the user-viewable address is based on the scheduler address (i.e. you might be viewing it through several layers of proxies). Usually we recommend dask users configure distributed.dashboard.link (https://docs.dask.org/en/latest/configuration-reference.html#distributed.dashboard.link), which can template that out. If you're fine setting that up properly, then I'd be happy to add a log line with the dashboard link during dask executor startup.

Sean Harkins

03/30/2021, 9:05 PM
@Jim Crist-Harif I should be able to set the config properly in each Flow definition. One question on the host information used here. I believe scheduler.address will report the host from the private subnet range https://github.com/dask/distributed/pull/3429/files. For example, my scheduler logs show the private IP rather than the public ENI:
distributed.scheduler - INFO -   Scheduler at:    tcp://10.0.115.40:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
I guess this might be a deeper question of how to report the public IP for the container where the scheduler is running rather than its private IP.

Jim Crist-Harif

03/30/2021, 9:09 PM
Yeah, dask doesn't really have a way to configure a visible public address for the scheduler (while we do have one for the dashboard). Rather than confusing users, would it be fine to just display the dashboard link for now?

Sean Harkins

03/30/2021, 9:12 PM
👍 For sure. The dashboard is really my only concern. But in this case, if I use a scheme like dask.config.set({"distributed.dashboard.link": "http://{host}:{port}/status"}), won't my resulting dashboard_link be http://10.0.114.40:8786/status?

Jim Crist-Harif

03/30/2021, 9:14 PM
I'm not familiar enough with ECS to know. For most deployments where we've set this up (e.g. the pangeo k8s deployment) there's a statically known template that will work.

Sean Harkins

03/30/2021, 9:23 PM
I guess the issue is that we can’t know the address of the scheduler until the cluster is created by the DaskExecutor. It seems like we’ll need dask-cloudprovider to expose the dashboard’s public ip as a property of https://github.com/dask/dask-cloudprovider/blob/d2072afbeba1c6cd42f5b6c60f9d9690352e9b3c/dask_cloudprovider/aws/ecs.py#L436 when it is available and you can publish this at creation time. Maybe I can try to ping Jacob and see what options we have for this.
@Jim Crist-Harif Looking through dask-cloudprovider in more detail, it looks like much of this logic is already handled: https://github.com/dask/dask-cloudprovider/blob/main/dask_cloudprovider/aws/ecs.py#L190-L213. Given this, you should be able to use dashboard_link and assume the correct address. Where would this be logged? In the Flow log directly?

Jim Crist-Harif

03/30/2021, 9:42 PM
Great! I'm adding the log right now actually, should be out in next release.

Sean Harkins

03/30/2021, 9:43 PM
🎊 Thanks so much for looking into it. You guys move fast 😄
@Jim Crist-Harif Is it possible to use the bot to capture this thread as a Github issue so we can use it for tracking? 🙏

Jim Crist-Harif

04/01/2021, 1:54 AM
Sure, but tracking for what exactly? I merged the log addition earlier, what outstanding work is there still?

Sean Harkins

04/01/2021, 1:56 AM
👍 Ok. No worries then. I was going to ref the issue in pangeo-forge for context.

Jim Crist-Harif

04/01/2021, 2:01 AM
Sure, I can archive it for that purpose.
@Marvin archive "Display dask cluster info in prefect logs"

Marvin

04/01/2021, 2:02 AM
https://github.com/PrefectHQ/prefect/issues/4327

Jim Crist-Harif

04/01/2021, 2:03 AM
And the fix for the dashboard logs: https://github.com/PrefectHQ/prefect/pull/4321