Powered by Linen
prefect-community

    Zach

    08/27/2020, 2:33 PM
    I'm getting some odd UI behavior when trying to change the "Items per page" on the Gantt Chart view of a run in Prefect Cloud

    Jason Nochlin

    08/27/2020, 5:42 PM
    Quick question on best practices: I'm trying to use Secrets with a ShellTask and see two possibilities:
    1. Get the Secret when I register the flow, e.g.:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    from prefect.client import Secret
    
    # name, schedule and project_name are defined elsewhere in my registration script
    environment = {}
    secret_key = Secret("SECRET_KEY")
    environment['SECRET_KEY'] = secret_key.get()  # resolved once, at registration time
    
    shell_task = ShellTask()
    
    with Flow(name, schedule=schedule) as flow:
        shell_task(command='./do-the-thing', env=environment)
    
    flow.register(project_name=project_name)
    2. Use prefect.client.Secret to get the Secret from within the task when it starts (similar to how an entrypoint script is often used to set environment variables in Docker environments), e.g.:
    # register-tasks.py
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    shell_task = ShellTask()
    
    with Flow(name, schedule=schedule) as flow:
        shell_task(command='./do-the-thing')
    
    flow.register(project_name=project_name)
    
    #!/usr/bin/env python3
    # do-the-thing
    import os
    from prefect.client import Secret
    
    secret_key = Secret("SECRET_KEY")
    os.environ['SECRET_KEY'] = secret_key.get()  # resolved at run time
    Is one of these recommended over the other as the "best practice" for Prefect?
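One consideration when comparing the two options, sketched in plain Python under an assumption: if the flow is stored with pickle-based storage (Prefect 0.x storage such as Docker or Local serializes the flow with cloudpickle), any value resolved at registration time, like the secret_key.get() result in option 1, is serialized along with the flow, while option 2 stores only the command. `FrozenTaskCall` below is a hypothetical stand-in for a task call whose arguments are bound when the flow is built; it is not Prefect's API.

```python
import pickle
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FrozenTaskCall:
    """Hypothetical stand-in for a task call whose arguments are bound at build time."""
    command: str
    env: Dict[str, str] = field(default_factory=dict)


# Option 1: the secret value is fetched at registration and bound into the call.
registered_with_value = FrozenTaskCall("./do-the-thing", env={"SECRET_KEY": "s3cr3t-value"})

# Option 2: only the command is bound; the script looks up SECRET_KEY when it runs.
registered_without_value = FrozenTaskCall("./do-the-thing")

blob_1 = pickle.dumps(registered_with_value)
blob_2 = pickle.dumps(registered_without_value)

# The raw secret travels inside the serialized object only in option 1.
print(b"s3cr3t-value" in blob_1)  # True
print(b"s3cr3t-value" in blob_2)  # False
```

Under that assumption, option 2 keeps the secret value out of the stored flow and also picks up a rotated secret without re-registering.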

    Mary Clair Thompson

    08/27/2020, 6:35 PM
    Hi folks! What's the proper Prefect method for removing/unregistering a flow?

    Julien Allard

    08/27/2020, 10:12 PM
    Hi all, I have a problem with running flows on a dask-kubernetes cluster with a single worker. I'm sometimes getting an "Unexpected error: TimeoutError()" error. Sometimes the error happens before any tasks have started. I'm really unsure how to debug this, so any help is appreciated!

    Jovan Visnjic

    08/28/2020, 9:13 AM
    Hello! I am trying to use PrefectResult with Prefect Core's server running on my local machine. According to the docs, I understood that it should cache the result in Prefect's database. I am running this simple example, extended from the one in the docs; I just added random raising of exceptions:
    import random
    
    from prefect import Flow, task, unmapped
    from prefect.engine.results import PrefectResult
    
    
    @task(result=PrefectResult())
    def add(x, y=1):
        if random.random() > 0.7:
            raise Exception('I failed on purpose')
    
        return x + y
    
    
    with Flow("my handled flow!") as flow:
        first_result = add.map(list(range(10)), y=unmapped(2))
    When I restart the flow, it doesn't just run the failed mapped tasks; it runs all of them again, including the successful ones. On the other hand, it works fine if I use LocalResult. Am I missing something? Any help would be much appreciated.

    William Smith

    08/28/2020, 10:16 AM
    Why is it that when a task is set to trigger only if its upstream fails, the task is marked as failed when the upstream succeeds? The task was never executed, so shouldn't it be marked as skipped?

    Miecio

    08/28/2020, 10:34 AM
    Hello, I have a problem with Prefect/Dask and secrets. I want to use the PostgreSQL tasks from the task library, so I need to somehow pass a Secret task containing the pg password. I thought that using EnvVarSecret would be enough, but it seems that env variables are not propagated from the Prefect agent to the Dask workers, so my password is always empty. Do you know what solution can be used? Or can I assign those EnvVarSecret tasks to run inside my agent job (where I have all the required envs)?

    itay livni

    08/28/2020, 2:05 PM
    Hi - Is there a way to apply_map on another Flow's tasks? Basically I am trying to replace a particular flow with apply_map.

    Hawkar Mahmod

    08/28/2020, 2:17 PM
    I thought it would be worth sharing this with the wider community in case anyone has recent experience with it.

    Hannah Amundson

    08/28/2020, 6:08 PM
    Hello! Can a Prefect task take in something that isn't a Parameter/task/etc.? Example:
    from prefect import Flow, task
    
    @task
    def double_number(number):
        return number * 2
    
    with Flow("name") as flow:
        number = 4
        double_number(number)

    Eric Dobroveanu

    08/28/2020, 9:09 PM
    Hello! I have been playing with Prefect (locally) for the last week to evaluate whether it would work for our team. We are doing a number of POCs to determine whether Prefect, Airflow, or other workflow tools would be a good replacement for AWS State Machines. So far the team has really liked Prefect! 🎉 On the surface it appears to check all of our boxes, and it has been the best experience of any of the tools we've tried. We do have a few points of contention that I would love to get some feedback on (such as the project's roadmap and what the lack of a "1.0" release means for stability in the future). Part of the reason we are looking for a State Machine replacement is to minimize developer pain points. I'm not sure who to reach out to for this, so I'm posting here in the hope that our team could schedule a call to discuss other aspects of Prefect as well. Thanks!

    Minakshi

    08/28/2020, 10:47 PM
    1. Is there a way to pass parameters to a flow via the Dask executor? The requirement here is to run a flow for different datasets, which could be configured either from Dask or by the caller, and the flow runs for different datasets could execute in parallel.
    2. Is there a way to start a flow run from a Flask API? And can the cancellation of a flow also be controlled by an API? (I tried doing this but was getting multiprocessing errors.)

    Alex Papanicolaou

    08/29/2020, 8:14 PM
    Hi, we’re getting some mapped child tasks rerun despite succeeding on the first run. Normally, our tasks look like this (logs cleaned up a bit):
    12:10:11 prefect.CloudTaskRunner	Task 'run_simulator[54]': Starting task run...
    12:10:11 prefect.run_simulator[54]	Starting simulator run
    12:10:11 prefect.run_simulator[54]	cusip_list [{'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}]
    12:10:11 prefect.run_simulator[54]	Loading model 'cf621134-8c36-446a-96b5-7ecde88a33e2'
    12:10:22 prefect.run_simulator[54]	Simulating pool {'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}
    12:10:31 prefect.run_simulator[54]	Number of replicates 6
    12:11:59 prefect.CloudTaskRunner	Task 'run_simulator[54]': finished task run for task with final state: 'Success'
    Here is an example, though (they don’t appear super common), where the task succeeded and then was later rerun. Note that the model ID is different. It is randomly generated (not a big deal), but along with the timestamps it confirms that this is a repeated run, not a duplicated log.
    11:55:34 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
    11:55:35 prefect.run_simulator[6]	Starting simulator run
    11:55:35 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
    11:55:35 prefect.run_simulator[6]	Loading model 'c410358f-4612-4aef-8f12-e9a3642711de'
    11:56:23 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
    11:56:36 prefect.run_simulator[6]	Number of replicates 3
    11:57:12 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'
    12:06:17 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
    12:06:17 prefect.run_simulator[6]	Starting simulator run
    12:06:17 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
    12:06:17 prefect.run_simulator[6]	Loading model '45322fce-d452-4340-9e06-e7bcc2775b84'
    12:06:27 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
    12:06:40 prefect.run_simulator[6]	Number of replicates 3
    12:07:15 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'

    Johnny Bravo

    08/29/2020, 8:31 PM
    Hello. Noob question here. I'm trying to download a couple of files from a website, and I want to run the jobs in parallel while limiting the workers (so I download at most 2 files at a time). I've figured out how to limit the downloads with this run:
    from dask.distributed import LocalCluster
    from prefect.engine.executors import DaskExecutor
    
    flow.run(executor=DaskExecutor(
        cluster_class=LocalCluster, cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}))
    I'm not sure if this is the right way to do it. Now I have this flow:
    with Flow("Files downloader") as flow:
        files = get_files()
        downloaded_files = download_file.map(files)
        import_file.map(downloaded_files)
    The problem is that after the first two downloads, it moves on to the next download task instead of getting to the import task. Because I'm limited to 2 workers at a time, I need to prioritize the import_file task over the download_file task. Is there a better way to do this?
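Rather than trying to prioritize import_file over download_file, one workaround is to fuse the two steps into a single unit of work, so that each worker downloads a file and immediately imports it before picking up the next one. In Prefect terms that would mean one mapped task that calls both functions. The sketch below shows the idea in plain Python, not Prefect: download_file and import_file are hypothetical stand-ins, and ThreadPoolExecutor(max_workers=2) plays the role of the two Dask workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

events = []  # records the order in which steps happen
lock = threading.Lock()


def download_file(name: str) -> str:
    """Hypothetical download step; returns the path the file would be saved to."""
    time.sleep(0.01)  # simulate network I/O
    with lock:
        events.append(("download", name))
    return f"/tmp/{name}"


def import_file(path: str) -> str:
    """Hypothetical import step."""
    with lock:
        events.append(("import", path))
    return path


def download_then_import(name: str) -> str:
    # Each worker finishes the whole pipeline for one file before taking another.
    return import_file(download_file(name))


files = ["a.csv", "b.csv", "c.csv", "d.csv"]
with ThreadPoolExecutor(max_workers=2) as pool:  # at most 2 files in flight
    imported = list(pool.map(download_then_import, files))

print(imported)  # ['/tmp/a.csv', '/tmp/b.csv', '/tmp/c.csv', '/tmp/d.csv']
```

The trade-off is coarser retries: if the import fails, a fused task would redo the download too.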

    Hawkar Mahmod

    08/30/2020, 11:22 AM
    Hi folks, is it possible to map over a task that uses a LOOP signal? If I have a task that calls an API and has to loop until there is no more data to fetch, can I pass each iteration of the call to a downstream task like so:
    results = transform_data.map(call_api())
    Inside call_api I am using a LOOP signal, but I cannot seem to access the loop results in the next task, transform_data. My understanding was that when using this construct, each iteration of the task was its own task.
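In Prefect 0.x, as the docs describe LOOP, a looping task re-runs as the same task run and only the value it finally returns reaches downstream tasks, so transform_data.map receives one result rather than one per iteration. One workaround, sketched here in plain Python with a hypothetical paginated fake_api, is to accumulate every page inside the task and return the whole list, which can then be mapped over. (Inside a real LOOP-based task, the running list could be carried between iterations via the loop result in context; treat that as an assumption to check against the docs.)

```python
from typing import Dict, List, Optional

# Hypothetical paginated API: three pages of data, then nothing more.
PAGES: Dict[int, List[int]] = {1: [1, 2], 2: [3, 4], 3: [5]}


def fake_api(page: int) -> Optional[List[int]]:
    """Stand-in for one API call; returns None once there is no more data."""
    return PAGES.get(page)


def call_api() -> List[List[int]]:
    """Fetch pages until the API runs dry, keeping every page rather than only the last."""
    pages: List[List[int]] = []
    page = 1
    while True:
        batch = fake_api(page)
        if batch is None:
            break
        pages.append(batch)
        page += 1
    return pages


def transform_data(batch: List[int]) -> int:
    """Hypothetical per-page transformation."""
    return sum(batch)


# Plain-Python equivalent of transform_data.map(call_api()): one call per page.
results = [transform_data(batch) for batch in call_api()]
print(results)  # [3, 7, 5]
```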

    Chris Goddard

    08/30/2020, 3:45 PM
    Hey folks - I'm having issues getting a Docker agent to run flows locally. I'm using Prefect Cloud. I have confirmed that I can run a non-Docker flow locally and trigger it from the Cloud UI (so backend is correctly set and the runner token is available as an environment variable). However, when I give the flow Docker storage and spin up a Docker agent, nothing happens when I try to trigger a flow from the UI - no errors, it's just as if it's not receiving any instructions from Prefect Cloud. I am working in WSL2 (Windows Subsystem for Linux), which creates all kinds of hellish networking issues (classic Windows), but I've confirmed that Docker is working: I've run the Docker image that was created for my flow and run the flow manually within the container by unpickling it and calling flow.run.
    The one thing I thought it might be was a failure to connect to the Docker daemon (in case WSL ran it somewhere else), but I've confirmed that it's running at unix:///var/run/docker.sock (I think earlier versions of WSL had an issue, but I don't think that's what's going on). What else could I try? Any suggestions?
    prefect diagnostics output:
    {
      "config_overrides": {
        "cloud": {
          "agent": {
            "auth_token": true
          }
        },
        "context": {
          "secrets": false
        }
      },
      "env_vars": [
        "PREFECT__CLOUD__AGENT__AUTH_TOKEN"
      ],
      "system_information": {
        "platform": "Linux-4.19.104-microsoft-standard-x86_64-with-glibc2.29",
        "prefect_version": "0.13.4",
        "python_version": "3.8.1"
      }
    }

    Minakshi

    08/31/2020, 4:31 AM
    Is there a way to run multiple flows using Prefect Core? This doesn't seem to work: it only starts the flow for the first dataset and then continues after the specified interval.
    for dataset in dataset_config['datasets']:
        print('starting flow for dataset ' + dataset['dataset_name'])
        flow.run(dataset=dataset['dataset_name'])  # runs this flow on its schedule

    Eric

    08/31/2020, 8:17 AM
    Hello. Noob question here. Is there a way to use Python to "quick run" a flow (i.e., create a flow run) that has been registered with Prefect Server? I've seen the docs on retrieving a flow, but after retrieving it, flow.run() is executed locally by Python. If I can trigger the "quick run" from Python, I can see the logs on the Prefect UI instead of just on the console. Thanks!

    Scott Zelenka

    08/31/2020, 1:54 PM
    I'm working on a Flow that calls a rate-limited API, which will occasionally throw a 429 exception. The 429 response is explicit about how long it wants the client to wait before it retries, but we already have the retry logic specified in the @task decorator. So it gets stuck in a perpetual loop, because the server wants the client to wait longer than what was specified in the @task decorator. The Task Concurrency Limiting feature would reduce how often this happens, but would not catch all 429 exceptions: sometimes this specific server gets overloaded by other traffic and will dynamically rate-limit all traffic until it has scaled up to handle the additional load. I'm guessing that I'd need to write some custom retry logic within the task to handle the 429 exceptions, but I'm curious whether anyone has a way to pipe the Retry-After value from a 429 into the Prefect engine's retry_delay parameter for similar rate-limited API calls?
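A minimal sketch of the "custom retry logic within the task" option, written in plain Python rather than with Prefect's retry machinery (which uses the fixed retry_delay from the decorator). It assumes a hypothetical send() callable that returns a status code and a headers dict; inside a Prefect task one would call the helper and leave the decorator's max_retries for other failures. Per the HTTP spec (RFC 7231 / RFC 9110), Retry-After may be either a number of seconds or an HTTP date, so both forms are handled.

```python
import email.utils
import time
from datetime import datetime, timezone
from typing import Callable, Dict, Optional, Tuple

# (status code, headers) returned by a hypothetical send() callable
Response = Tuple[int, Dict[str, str]]


def retry_after_seconds(value: str, now: Optional[datetime] = None) -> float:
    """Convert a Retry-After value (delay-seconds or HTTP-date) to seconds to wait."""
    value = value.strip()
    if value.isdigit():
        return float(value)
    when = email.utils.parsedate_to_datetime(value)
    if when.tzinfo is None:  # "-0000" dates parse as naive; treat them as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


def call_with_rate_limit(
    send: Callable[[], Response],
    max_attempts: int = 5,
    default_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Response:
    """Call send(); after each 429, sleep for the server-requested delay and retry."""
    for attempt in range(1, max_attempts + 1):
        status, headers = send()
        if status != 429:
            return status, headers
        if attempt < max_attempts:
            header = headers.get("Retry-After")
            sleep(retry_after_seconds(header) if header else default_delay)
    raise RuntimeError(f"still rate-limited after {max_attempts} attempts")


# Usage with canned responses and a recording sleep instead of real waiting.
responses = iter([(429, {"Retry-After": "2"}), (429, {}), (200, {})])
slept = []
status, _ = call_with_rate_limit(lambda: next(responses), sleep=slept.append)
print(status, slept)  # 200 [2.0, 1.0]
```

Injecting sleep keeps the helper testable; max_attempts bounds the loop so a persistently overloaded server eventually surfaces as a task failure.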

    Marwan Sarieddine

    08/31/2020, 2:21 PM
    Hi folks, my flow run is getting stuck in Scheduled and is not getting submitted. Is anyone else facing similar issues? I am using Prefect v0.13.4, Prefect Cloud, a Kubernetes agent, and a static Dask setup; the flow is stored using S3 storage. Here is my flow run ID: 68e72aba-f69b-4238-9478-82fa9bfa5706

    Jonas Hanfland

    08/31/2020, 3:00 PM
    Hello guys, I'm trying to map over pandas dataframe groups, but I can't quite get it to work even though I am able to iterate over it in a python for-loop just fine. This is what I'm trying to do:
    some_task.map(df.groupby("id"))
    But it gives me:
    KeyError: 'Column not found: 0'
    Does anyone know if mapping over groups is supported and, if so, how? Thanks in advance!
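The KeyError is what pandas raises when a GroupBy object is indexed with an integer, because grouped[0] means "select the column named 0"; that suggests the mapped value is being indexed by position somewhere in the mapping machinery. Assuming that is the cause, a workaround is to materialize the groups as a plain list of DataFrames, which supports len() and positional indexing, and map over that instead. The sample frame and summarize function below are hypothetical stand-ins for the real data and some_task.

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2, 3, 3, 3], "value": [10, 20, 30, 40, 50, 60]})
grouped = df.groupby("id")

# Indexing a GroupBy by position is treated as column selection, hence the KeyError.
try:
    grouped[0]
except KeyError as exc:
    print("KeyError:", exc)

# A plain list of DataFrames can be passed to some_task.map(...) instead.
groups = [group for _, group in grouped]


def summarize(group: pd.DataFrame) -> int:
    """Hypothetical per-group work standing in for some_task."""
    return int(group["value"].sum())


print([summarize(g) for g in groups])  # [30, 30, 150]
```

If the task also needs the group key, list(df.groupby("id")) yields (key, DataFrame) tuples instead.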

    Vikram Iyer

    08/31/2020, 3:06 PM
    Hi, I am suddenly getting the error below. The setup had been working fine for me before.
    ad-agent_1              |   File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
    ad-agent_1              |     raise HTTPError(http_error_msg, response=self)
    ad-agent_1              | requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://ad-prefect-apollo:4200/
    The env variable on the agent container is:
    PREFECT__SERVER__ENDPOINT: http://ad-prefect-apollo:4200
    I am running all the required services inside Docker containers using a docker-compose file. Can anyone take a look and help out?

    Chris Goddard

    09/01/2020, 1:40 AM
    Hey folks! Any idea why I'm able to get parallelism in my flow when I pass LocalDaskExecutor() to the flow.run(...) method, but when I pass it to LocalEnvironment(executor=LocalDaskExecutor()) and then pass that to the flow constructor like this:
    with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
    I only get one task running at once? Here's my test code:
    import time
    from datetime import timedelta
    
    import pendulum
    import prefect
    from prefect import Flow, task
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.environments import LocalEnvironment
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import DatesClock
    
    @task
    def generate():
        return [x for x in range(0, 40)]
    
    @task
    def log_sleep(x):
        logger = prefect.context.get('logger')
        time.sleep(5 + x)
        logger.info(x)
        return x * x
    
    @task
    def collect(lst):
        logger = prefect.context.get('logger')
        logger.info(lst)
    
    schedule = Schedule(clocks=[DatesClock([pendulum.now() + timedelta(seconds=5)])])
    
    with Flow("test", schedule=schedule, environment=LocalEnvironment(executor=LocalDaskExecutor())) as flow:
    
        nums = generate()
    
        results = log_sleep.map(nums)
    
        x = collect(results)
    
    flow.run()

    Eric

    09/01/2020, 3:50 AM
    Hi team, I registered a flow with a default Parameter on Prefect and then modified the Parameter value in the UI under Settings -> Parameters -> default parameter. When I click the "QUICK RUN" button at the top of the panel, the flow seems to use the original default instead of the modified value. If I click "RUN" in the upper middle of the panel, modify the JSON, and click run, it works fine. Is this normal? Thanks

    Sachit Shivam

    09/01/2020, 10:46 AM
    Hello, here to learn to use Prefect for running scheduled ETL tasks!

    Sachit Shivam

    09/01/2020, 11:18 AM
    Hi guys, I'm trying to run the hello world flow:
    from prefect import task, Flow, Parameter
    
    
    @task(log_stdout=True)
    def say_hello(name):
        print("Hello, {}!".format(name))
    
    
    with Flow("My First Flow") as flow:
        name = Parameter('name')
        say_hello(name)
    
    flow.register('Test')
    I've created a project named "Test" on the Prefect UI and set the parameter "name" to "test". However, the flow keeps failing, and I'm unable to figure out what I'm doing wrong. Can anyone point me in the right direction?

    Sachit Shivam

    09/01/2020, 11:21 AM
    I've set the parameter like so:

    Caleb Moses

    09/01/2020, 12:59 PM
    Hi there, hopefully a quick question: if I run a task whose output is a file (which will also be loaded in future tasks), what is the standard way to handle this? I've looked at https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target but the example func_task doesn't actually produce a file target, so I'm not sure how to use it.

    Caleb Moses

    09/01/2020, 1:03 PM
    # Here's an example of what I'm thinking
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    
    corpus_file = 'corpus.txt'
    @task(target=corpus_file, result=LocalResult())
    def write_sentence_corpus(sentences):
        with open(corpus_file, 'w') as fp:
            for sent in sentences:
                fp.write(sent + '\n')
    
    with Flow("text_model") as f:
        sentences = extract_sentences()
        corpus = write_sentence_corpus(sentences)
        ...
        # Do more modelling

    Caleb Moses

    09/01/2020, 1:09 PM
    I'm used to working with makefiles, but I also don't have a choice about writing this file to disk, because the model downstream loads it in order to train.

    nicholas

    09/01/2020, 1:12 PM
    Hi @Caleb Moses - my personal preference when working with file outputs is to return a reference to the file location instead of the file itself. This allows you to more easily recover from failure and keeps your Prefect caches smaller and more manageable (if you’re using them).
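A plain-Python sketch of that pattern, assuming hypothetical write_sentence_corpus and train_model steps: the writing step returns the file's path rather than its contents, and the downstream step opens the file itself. In a Prefect flow each function would be a task, so only the short path string would pass through results and caches.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def write_sentence_corpus(sentences: Iterable[str], out_dir: Path) -> str:
    """Write one sentence per line and return a reference (the path) to the file."""
    corpus_path = out_dir / "corpus.txt"
    with open(corpus_path, "w") as fp:
        for sent in sentences:
            fp.write(sent + "\n")
    return str(corpus_path)  # small, serializable value to pass downstream


def train_model(corpus_path: str) -> int:
    """Hypothetical training step that loads the corpus from disk; returns its line count."""
    with open(corpus_path) as fp:
        return sum(1 for _ in fp)


with tempfile.TemporaryDirectory() as tmp:
    path = write_sentence_corpus(["the cat sat", "the dog ran"], Path(tmp))
    n_lines = train_model(path)

print(n_lines)  # 2
```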