prefect-community

    Aakash Indurkhya

    01/04/2021, 10:45 PM
    Hi all - new to Prefect, trying to explore and determine if it makes sense for our project. I will post a screenshot with a Gantt chart in a moment, but basically I am wondering why there are several seconds of latency between clicking to run a flow and the flow actually running. From what I've seen there is a 5-10 second delay.

    Aakash Indurkhya

    01/04/2021, 10:58 PM
    (screenshot: Gantt chart of the flow run)

    Aakash Indurkhya

    01/04/2021, 10:58 PM
    Basically I want to understand what is causing that 20-second latency and also what can be done to reduce it substantially.

    Jeremy Phelps

    01/04/2021, 11:03 PM
    Hi all. Am I correct in my understanding that Prefect Cloud delegates all the real work to our own infrastructure, which we are responsible for setting up and maintaining?
    👍 1

    Rob Fowler

    01/05/2021, 12:25 AM
    I think this has been asked multiple times but maybe there is no answer. Is there any way, hack or otherwise, to get task parameters in a state handler? When a mapped task fails I need to be able to catch the failure so I can put the 'id' into a list and come back later to actually do something. For example, in the following working example, I would like the 'item' to be available. Otherwise, maybe the way is to re-iterate through the results in a new unmapped task and fix up the failures?
    from time import sleep
    from datetime import timedelta
    
    from prefect.engine import state
    from prefect import task, Task, Flow
    import prefect
    
    from prefect.engine.executors import LocalDaskExecutor, LocalExecutor
    
    
    def ignore_timeout_handler(task, old_state, new_state):
        print(f"{prefect.context.parameters} old {old_state} new {new_state}")
        if new_state.is_failed() and isinstance(new_state, state.TimedOut):
            return_state = state.Success(result={"state": "forced ok"})
        else:
            return_state = new_state
        return return_state
    
    
    @task
    def produce_range():
        return range(5, 10)
    
    
    class SlowTask(Task):
        def run(self, item, sleep_time=9, **kwopts):
            sleep(sleep_time)
            # doing stuff with a host called 'item'
            return item
    
    
    with Flow("Slow flow") as flow:
        slow_task = SlowTask(timeout=6, max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[ignore_timeout_handler])
        nrange = produce_range()
        result = slow_task.map(item=nrange,
                               sleep_time=nrange)
    
    # executor = LocalDaskExecutor(scheduler="threads", num_workers=10)
    executor = LocalExecutor()
    for ii in flow.run(executor=executor).result[result].result:
        print(ii)
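    (One possible angle, sketched under assumptions: during mapped runs prefect.context exposes a map_index, and state handlers run in the task-run context, so the failing index could be recorded and matched back to the input list afterwards. The module-level list only works reliably with a LocalExecutor.)
    import prefect
    from prefect.engine import state
    
    failed_indices = []
    
    def collecting_timeout_handler(task, old_state, new_state):
        if new_state.is_failed() and isinstance(new_state, state.TimedOut):
            # record which element of the mapped input timed out
            failed_indices.append(prefect.context.get("map_index"))
            return state.Success(result={"state": "forced ok"})
        return new_state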

    Alex Koay

    01/05/2021, 3:03 AM
    How do I run a task from another task?
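    (One commonly suggested pattern, as a sketch: call the other task's .run() method directly, which executes its Python logic without creating a separate task run.)
    from prefect import task
    
    @task
    def inner(x):
        return x + 1
    
    @task
    def outer(x):
        # runs inner's function directly; the engine only sees one task
        return inner.run(x) * 2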

    emre

    01/05/2021, 1:44 PM
    Hi everyone, I am trying to wrap my head around result caching 😅. On a core-only run on my workstation, I keep failing to reuse the result of a long-running task. My latest attempt is as follows:
    meta_df = SnowflakePandasResultTask(
                db=SNOW_DB,
                checkpoint=True,
                result=LocalResult(dir=".prefect_cache"),
                cache_for=timedelta(days=14),
                cache_key="snow_pandas_out",
            )(query=info_query)
    This persists files with arbitrary names under
    .prefect_cache
    . On every run I get a warning that my cache is no longer valid. Can anyone point me to where I am going wrong?
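    (A sketch of an often-recommended alternative for local runs: target-based file caching, where the persisted file itself acts as the cache. Names mirror the snippet above; assumes checkpointing is enabled, e.g. PREFECT__FLOWS__CHECKPOINTING=true.)
    from prefect import task
    from prefect.engine.results import LocalResult
    
    # if ".prefect_cache/snow_pandas_out" already exists, the task is skipped
    # and the file's contents are used as the task's result
    @task(
        checkpoint=True,
        result=LocalResult(dir=".prefect_cache"),
        target="snow_pandas_out",
    )
    def fetch_meta_df(query):
        ...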

    Chris Jordan

    01/05/2021, 3:15 PM
    Is there a way to map
    StartFlowRun
    ? I've got a task
    spawn_time_series_import = StartFlowRun(
            project_name="python_imports", 
            flow_name="blast_metric_series_flow")
    which takes an argument
    spawn = spawn_time_series_import(
                    parameters=dict(action_id=blast_id))
    and a task that returns a list of `blast_id`s with which I want to spawn many flows
    blast_id_list = push_records_to_summary_table(imported)
    I want to put all this together in something like this, but I'm having trouble finding the right syntax
    spawn_time_series_import = StartFlowRun(
            project_name="python_imports", 
            flow_name="blast_metric_series_flow")
    
    
    
    with Flow("blast_import_flow",
            schedule=daily_schedule,
            state_handlers=[cloud_only_slack_handler]
            ) as flow:
    
        [[[stuff]]]
        blast_id_list = push_records_to_summary_table(imported)
        spawn = spawn_time_series_import.map(
                    parameters=dict(action_id=blast_id_list))
    is there an accepted syntax for this?
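    (A sketch of one way this is often written, reusing the names from the snippet above and adding a hypothetical helper task, since map over parameters needs one dict per child run:)
    from prefect import task, Flow
    
    @task
    def build_params(blast_ids):
        # one parameters dict per child flow run
        return [dict(action_id=blast_id) for blast_id in blast_ids]
    
    with Flow("blast_import_flow") as flow:
        blast_id_list = push_records_to_summary_table(imported)
        spawn = spawn_time_series_import.map(
            parameters=build_params(blast_id_list))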

    Dilip Thiagarajan

    01/05/2021, 4:03 PM
    hi all, what’s the analog for
    flow.serialized_hash()
    in 0.14.0?

    Hui Zheng

    01/05/2021, 7:17 PM
    Hello, I have a question about objects that we need to pass from task to task. In our case, we have a flow-config object: an array of JSON dictionaries fetched from Firestore at task_2 of our flow, which then needs to be passed to every subsequent task in the flow. Could we define a global variable in the Python file to hold this config object, or should we define it as a function parameter in every task and pass it through?
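    (A minimal sketch of the data-dependency approach, with hypothetical task names: return the config object from the fetching task and pass it as a parameter to each downstream task, rather than relying on a module-level global.)
    from prefect import task, Flow
    
    @task
    def fetch_config():
        # stand-in for the Firestore fetch
        return [{"env": "prod"}, {"retries": 3}]
    
    @task
    def task_3(config):
        ...
    
    @task
    def task_4(config):
        ...
    
    with Flow("config-flow") as flow:
        config = fetch_config()
        task_3(config)
        task_4(config)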

    jeff n

    01/05/2021, 7:21 PM
    Question about memory usage, since the system passes results between tasks. If task A pulls 100MB of data from a data store, task B takes that 100MB, does some data transforms, and returns 100MB, and task C does the same and returns another 100MB, does that flow need 300MB to run? If so, how do you avoid exploding memory when using small tasks?

    Hui Zheng

    01/05/2021, 9:08 PM
    Hello, I would like to know the best way to organize the code for setting up a development workflow for a Prefect flow. I would like to do the following in my workflow:
    1. define/update a flow
    2. test-run my flow (built in a Docker storage) with a local executor
    3. when step 2 passes, register the same flow (built in a Docker storage) to Prefect Cloud
    Is there any best practice or code example? We currently also use another Docker container to set up the local-test/build environment. Is there a better approach?

    Marwan Sarieddine

    01/05/2021, 10:38 PM
    Hi folks, wanted to say that I really like the new cloud UI design - but I am facing a weird error in the interactive API and I am wondering if there might be a bug

    Braun Reyes

    01/05/2021, 10:57 PM
    Hello, we are looking to use the S3 storage / ECS run config combination. We have a process where we register a flow to a UAT project on merge to master, and register the flow to a production project on release. We want to leverage the stored-as-script functionality. What I want to do here: on the merge to master, push the flow .py file to S3 manually, then register the flow without having the storage object push it, while still having the flow run use the artifact I pushed. This is helpful because on release, the artifact for the commit ID I am releasing is already in S3, and I can register the production flow using that same flow .py file. I was trying to make sense of this line from the docs:
    If neither are set then the script will not be uploaded and users should manually place the script file in the desired key location in an S3 bucket.
    but I could not work out how to make sure the flow run will use the S3 artifact.
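    (A sketch of one reading of that doc line, with hypothetical bucket/key names: point the S3 storage at the key you uploaded manually, set stored_as_script=True, and register with build=False so nothing is re-uploaded.)
    from prefect.storage import S3
    
    flow.storage = S3(
        bucket="my-flows-bucket",
        key="flows/commit-abc123/flow.py",  # key you pushed manually
        stored_as_script=True,
    )
    flow.register(project_name="production", build=False)  # skip the upload step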

    Verun Rahimtoola

    01/06/2021, 5:00 AM
    hi, we're trying to compose a flow of flows as shown here: https://docs.prefect.io/core/idioms/flow-to-flow.html. we'd like to be able to say "task A1 in flow A needs the outputs from task B1 of flow B". how can we accomplish this? the examples on that page seem to indicate that we basically have to wait for upstream flows to finish before downstream flows are triggered, but they don't show how to pick out individual outputs from an upstream flow and feed them into specific tasks in downstream flows. any pointers/suggestions? looking for an idiomatic prefect way to accomplish this, if possible... cheers

    Greg Roche

    01/06/2021, 9:52 AM
    Good morning all! Has anybody had any experience with solving this very transient error with the S3Upload task?
    Unexpected error: KeyError('endpoint_resolver')
    We have a daily flow which includes around 25 mapped S3Upload tasks, and approximately once every five flow runs, one single mapped S3Upload task fails with this error. We're not doing anything particularly complicated or novel with the task, and the flow always succeeds when restarted, so I'm curious to know if anyone else has experienced this error and knows of a fix. More details in the thread, and thanks in advance for any help 🙂

    Lukas N.

    01/06/2021, 4:08 PM
    Hello Prefect community. I have an issue with restarting failed flow runs with checkpointing where some tasks have secret outputs. Since their output value is not persisted, I would expect them to be re-computed when restarting the flow run, but they are just always
    None
    . Reproducible example in thread. Thanks in advance for any help 🙂

    Pedro Martins

    01/06/2021, 6:17 PM
    Hey everyone! I'd like some suggestions on how to make tasks run in parallel using a Dask cluster. I'm following this tutorial: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html Here is the flow I want to run. Note that I'm connecting to the Dask cluster on my k8s cluster.
    custom_confs = {
        "run_config": KubernetesRun(
            image="drtools/prefect:dask-test", 
            image_pull_secrets=["regcred"], 
        ),   
        "storage": S3(bucket="dr-prefect"),
        "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786")
    } 
    
    with Flow("dask-example", **custom_confs) as flow:
        incs = inc.map(x=range(100))
        decs = dec.map(x=range(100))
        adds = add.map(x=incs, y=decs)
        total = list_sum(adds)
    Ideally I want to have
    incs
    and
    decs
    running in parallel on different workers, but they run sequentially, as you can see in the image below. I tried with 3, 5 and 10 dask workers but the tasks don't run in parallel. What do you suggest?

    Verun Rahimtoola

    01/06/2021, 9:30 PM
    hi, is it possible to trigger a run of an old version of a flow? this would be useful for us in an MLOps scenario where we need full reproducibility

    Charles Lariviere

    01/06/2021, 9:37 PM
    Hey folks 👋 I have a Kubernetes agent deployed on EKS with the
    prefect agent kubernetes install
    command. I’m running into
    ImportError
    when trying to run a flow that uses the
    SnowflakeQuery
    Prefect task.
    Failed to load and execute Flow's environment: ImportError('Using `prefect.tasks.snowflake` requires Prefect to be installed with the "snowflake" extra.')
    I’m wondering where I should specify the dependencies for this flow? I understand that if I was using the
    Docker
    storage I would define those in
    python_dependencies
    , but I’m not sure where I should define those without using the
    Docker
    storage.
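    (A sketch of one common fix, with a hypothetical image name: without Docker storage the dependencies must come from the image the run config uses, so bake the extra into that image and point KubernetesRun at it.)
    from prefect.run_configs import KubernetesRun
    
    # assumes the image's Dockerfile ran: pip install "prefect[snowflake]"
    flow.run_config = KubernetesRun(image="my-registry/prefect-snowflake:0.14.0")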

    Javier Velez

    01/07/2021, 5:56 AM
    I'd like to add Prefect to my current setup: a shell script that does a spark-submit on a Python file. I can use ShellTask to call the shell script and have it registered as a flow. But how would I go about monitoring or assigning tasks to the code inside the Python file, i.e. also registering the Python code as a flow? In the end I'd have two registered flows: Flow A for the shell script and Flow B for the Python script.
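    (A minimal sketch for Flow A, with a hypothetical script path, using the built-in ShellTask:)
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    spark_submit = ShellTask(command="bash /opt/jobs/submit_spark_job.sh")
    
    with Flow("spark-submit-flow") as flow:
        spark_submit()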

    Joël Luijmes

    01/07/2021, 10:19 AM
    Hey! I have a feature request for the Prefect UI. Maybe someone else already raised it, but I failed to find it: I'd like to add some sort of documentation to a flow, which would be shown in the info block (or somewhere else). Preferably in markdown, but even a plain string to describe the flow would be nice 🙂
    ➕ 5

    ale

    01/07/2021, 2:37 PM
    Hi folks, I'm playing with the Slack notifier and I'm wondering:
    • is it possible to "disable" notifications (i.e. setting something at Prefect Server startup to completely disable notifications; could be super useful for local testing/development)
    • is it possible to send notifications to multiple channels
    • is it possible to set the Slack webhook URL from an AWS Secret
    • is it possible to customize the notification message for each task
    👀 1

    Eric

    01/07/2021, 3:46 PM
    Hi! I have a legal question about the "Prefect Community License" (https://www.prefect.io/legal/prefect-community-license/). We have an app that does computations whose results we show to our customers, and we are thinking about using Prefect to orchestrate those computations, but it's unclear whether the license allows us to do this.

    Charles Lariviere

    01/07/2021, 4:20 PM
    Hey folks 👋 I’m a bit confused with how Secrets are behaving with this flow/config. I have production Secrets set up in Prefect Cloud, local dev Secrets in
    ~/.prefect/config.toml
    , a Kubernetes agent, and a Flow configured with
    Docker
    storage. When I register the flow, it looks like my local dev credentials are packaged into the Docker image for that flow, and flow runs deployed through Prefect Cloud, running on our Kubernetes agent, do not use the Secrets stored in Prefect Cloud; they use my local secrets instead. The only way I have found to prevent this is to comment out or delete my local config before registering the flow. Is that expected? If so, how does one ensure that devs do not accidentally package their local credentials when registering flows?

    Brett Naul

    01/07/2021, 6:52 PM
    quick q on Pause behavior: is there any way to know inside a task (e.g. from the prefect context) whether the current run was restarted manually (i.e. by clicking Approve in the UI)? I'm aiming for something like the
    manual_only
    trigger, but achieved at runtime instead. So something like:
    if condition and not prefect.context["manual_approval"]:
        raise signals.PAUSE()

    Jeff Williams

    01/07/2021, 7:39 PM
    Hi all. I hope I am in the right channel. I am trying to set up Prefect in a GCP account using a micro-VM as discussed in this article: https://medium.com/the-prefect-blog/prefect-server-101-deploying-to-google-cloud-platform-47354b16afe2. What is happening is that I am getting timeout errors on the spin-up of tmp_apollo_1 and tmp_towel_1 that say "(UnixHTTPConnectionPool (host='localhost', port=None): Read timed out. (read timeout=60)". Then there is some reference to a possible slow network. Everything else up to that point has worked fine, per the article. When I ping my localhost from the ssh window, it responds immediately, so I don't think it is a slow-network issue. It could possibly be a configuration issue, as the environment I am working in has to be tightly controlled due to compliance requirements, but I think I have the appropriate firewall rules in place to allow the right things. Again, I'm using the article as my road map. Any help would be greatly appreciated.

    Anish Chhaparwal

    01/07/2021, 8:00 PM
    hey, I'm trying to use classes with Prefect but running into the following... can someone point out the correct way of doing this?
    from prefect import Flow, Task, Parameter
    
    class PrintTheStatements(Task):
        def task_1():
            print(f"first name is {self.firstname} and last name is {self.lastname}")
    
        def run(self, firstname, lastname):
            self.firstname = firstname
            self.lastname = lastname
            task_1()
    
    if __name__ == "__main__":
        with Flow("ClassTask") as flow:
            firstname = Parameter("firstname", default="anish")
            lastname = Parameter("lastname", default="chhaparwal")
            apt = PrintTheStatements()
            result = apt(firstname=firstname, lastname=lastname)
        flow.run()
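    (A sketch of a likely fix for the snippet above: helper methods on a Task subclass need self and must be called through self.)
    from prefect import Flow, Task, Parameter
    
    class PrintTheStatements(Task):
        def task_1(self):  # helper methods need self
            print(f"first name is {self.firstname} and last name is {self.lastname}")
    
        def run(self, firstname, lastname):
            self.firstname = firstname
            self.lastname = lastname
            self.task_1()  # call the helper through self
    
    if __name__ == "__main__":
        with Flow("ClassTask") as flow:
            firstname = Parameter("firstname", default="anish")
            lastname = Parameter("lastname", default="chhaparwal")
            result = PrintTheStatements()(firstname=firstname, lastname=lastname)
        flow.run()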

    Verun Rahimtoola

    01/07/2021, 10:14 PM
    hi, is it safe to assume that `flow_run_id`s are unique across all flow runs of a flow, regardless of the flow version?

    Verun Rahimtoola

    01/07/2021, 10:57 PM
    hi, is it ok to introduce a custom
    Result
    subclass to support our own result storage backend?
    just wondering if this would break anything, or if it's unsupported

Dylan

01/07/2021, 10:59 PM
Hi @Verun Rahimtoola, I am 99% sure we support custom result classes but let me double check

Verun Rahimtoola

01/07/2021, 11:00 PM
thank you Dylan

Dylan

01/07/2021, 11:05 PM
Yessir! Custom results are 👍
🙌 1
🎉 1
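(A minimal sketch of what a custom Result subclass can look like, using an in-memory dict as a stand-in backend; method signatures mirror the built-in results in Prefect 0.14.)
import uuid

from prefect.engine.result import Result

_STORE = {}

class DictResult(Result):
    def write(self, value_, **kwargs):
        # persist the serialized value and return a copy pointing at it
        new = self.copy()
        new.location = str(uuid.uuid4())
        _STORE[new.location] = self.serializer.serialize(value_)
        return new

    def read(self, location):
        # load and deserialize the value stored at `location`
        new = self.copy()
        new.location = location
        new.value = self.serializer.deserialize(_STORE[location])
        return new

    def exists(self, location, **kwargs):
        return location in _STORE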