prefect-community
  • Kostas Chalikias

    05/04/2021, 10:26 AM
    Hey team, somehow today our agent is not picking up any flow runs. The last run happened around 24 hours ago, and since then it's gone quiet. I restarted it and it just sits there reporting
    Waiting for flow runs...
    We are cloud users, and we don't think we touched anything at all since it was last healthy (it was a bank holiday here yesterday in fact)
  • Talha

    05/04/2021, 10:58 AM
    Hi,
  • Talha

    05/04/2021, 10:59 AM
    Is it possible in Prefect to invoke an R script within a task? I have code in an R script that can only run via RStudio, and I don't want to convert it into Python. How can I invoke that R code with Prefect? Is there an interface between R and Prefect?
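
    One common approach is to shell out to Rscript from a task, e.g. with Prefect's built-in ShellTask. A minimal sketch, assuming Rscript is installed where the flow runs (the script path is hypothetical):

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # ShellTask runs an arbitrary shell command; here it invokes the R script.
    run_r = ShellTask(name="run-r-script")

    with Flow("r-integration") as flow:
        r_output = run_r(command="Rscript /path/to/analysis.R")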
  • Fabrice Toussaint

    05/04/2021, 1:08 PM
    Hey 🙂, I have a question regarding caching. There are some tasks in my flow for which I cache the results using GCSResult. If I set the cache_for parameter to 30 minutes, does this mean that every 30 minutes this task is rerun and cached again? My downstream task fails because this upstream task contains a token (valid for 1 hour), so it seems as if this is not happening in my case.
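
    For reference, cache_for means the opposite of a rerun interval: the cached output is reused, and the task is not rerun, for that window; the task only reruns once the cache expires. A sketch of that configuration (bucket name is hypothetical):

    from datetime import timedelta

    from prefect import task
    from prefect.engine.cache_validators import duration_only
    from prefect.engine.results import GCSResult

    # Within 30 minutes of a successful run, downstream tasks read this
    # task's cached result instead of triggering a rerun; after that the
    # task runs again and the cache is refreshed.
    @task(
        result=GCSResult(bucket="my-bucket"),
        cache_for=timedelta(minutes=30),
        cache_validator=duration_only,
    )
    def fetch_token():
        ...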
  • Kelby

    05/04/2021, 1:51 PM
    Is it possible to pass variables between retry attempts?
  • Nicholas Chammas

    05/04/2021, 4:04 PM
    Is it possible to edit the labels on an agent? If not, I suppose as a workaround I could launch a second agent with the desired labels and tear down the original agent.
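
    Labels are fixed at agent start time, so the workaround described is the usual recipe: start a new agent process with the desired labels, for example (shown for a local agent; other agent types accept the same --label option):

    prefect agent local start --label etl --label gpu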
  • eli

    05/04/2021, 5:55 PM
    Running into this on returning to an open tab of my dashboard home screen 😕 Env: Mac Pro, latest Chrome. Recreation steps: 1. leave a tab open overnight 2. return to said tab in the morning
  • Suraj Mittal

    05/04/2021, 7:34 PM
    Hi Everyone, wanted to know if it's possible to trigger a pipeline based on the states of other pipelines? Say I have pipeline 1, which runs on a daily schedule. I need to run pipeline 2 immediately after pipeline 1, only if pipeline 1 succeeds.
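
    One standard recipe is a small orchestrator flow using StartFlowRun with wait=True: the second run is only created if the first finishes successfully. A sketch with hypothetical flow/project names:

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # wait=True blocks until the child flow run finishes and fails the task
    # if the child run fails, so pipeline-2 only starts after success.
    run_pipeline_1 = StartFlowRun(flow_name="pipeline-1", project_name="my-project", wait=True)
    run_pipeline_2 = StartFlowRun(flow_name="pipeline-2", project_name="my-project", wait=True)

    with Flow("pipeline-1-then-2") as flow:
        first = run_pipeline_1()
        run_pipeline_2(upstream_tasks=[first])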
  • Alex Furrier

    05/04/2021, 8:14 PM
    Is there a recommended file format for flat-file storage created from a DataFrame? The data may have mixed data types, including arrays stored as a value. In the past I've had trouble with complex data types using the feather format, and sometimes ran into errors with HDF as well.
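
    Parquet (via pyarrow) is a common choice here, since it handles nested/list values better than feather or HDF in many cases. A quick sketch, assuming pyarrow is installed:

    import pandas as pd

    # A frame with an array-valued column round-trips through Parquet.
    df = pd.DataFrame({"id": [1, 2], "values": [[1, 2, 3], [4, 5]]})
    df.to_parquet("data.parquet", engine="pyarrow")
    restored = pd.read_parquet("data.parquet", engine="pyarrow")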
  • Josh Lowe

    05/04/2021, 10:19 PM
    Hey everyone! This morning we've noticed that almost ALL of our flows have started failing with this error.
    Message: [{'path': ['set_flow_run_states'], 'message': '[{\'extensions\': {\'path\': \'$\', \'code\': \'data-exception\'}, \'message\': \'invalid input syntax for type uuid: "3fca7934-6af8-45f7-aae4-ea6fec0339c9:FargateAgent:agent:<agent_name>"\'}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    No changes have been made to our platform, and it looks like some flows are able to run, but then fail again on subsequent runs. Anyone seen anything like this before? FargateAgent is deprecated, and we have plans to update and move to the ECS agent, but I haven't seen any recent updates that might be causing this.
  • Rajdeep Rao

    05/04/2021, 11:18 PM
    Hey y'all! I have kind of a niche question. I was wondering if there's any way I could have Prefect run my flows as /bin/sh -c ./bash-script.sh prefect execute flow-run? I would like to run this bash script, which basically exports all my env vars that are in a file. Since we use dotenv to run our services, my base Docker image already has all my env vars; I just need to export them while my tasks spin up. I tried overriding them with run_task_kwargs, in the ECSRun config, and by passing in a custom task_def, but for some reason my tasks don't respect these.
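
    Since the env file is already baked into the image, one workaround (a sketch, not necessarily what the thread recommended) is to load it in an initial task with python-dotenv instead of wrapping the container entrypoint. Note this only helps while tasks run in the same process; it does not propagate to separate Dask workers. The .env path is hypothetical:

    from dotenv import load_dotenv
    from prefect import Flow, task

    @task
    def load_env():
        # Export the variables from the baked-in dotenv file into os.environ
        # for the current flow-run process.
        load_dotenv("/app/.env")  # hypothetical path inside the image

    with Flow("with-env") as flow:
        env_ready = load_env()
        # give other tasks `upstream_tasks=[env_ready]` so they run after it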
  • Brian Keating

    05/05/2021, 12:13 AM
    I'm having trouble figuring out how to run multiple linear "subflows" in parallel with .map. I would like to: 1. start a batch of worker instances, 2. download data on each instance, 3. run a job on each instance. Obviously, the job for a given instance shouldn't start until the data is downloaded onto that instance; however, I don't want to wait until the data is downloaded onto all of the instances before kicking off jobs.
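
    Chaining .map calls expresses this: each stage maps over the previous one, and with a DaskExecutor Prefect runs mapped pipelines depth-first, so a given instance's job can start as soon as that instance's download finishes. A sketch with hypothetical task bodies:

    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def start_instance(i):
        return f"instance-{i}"  # hypothetical: launch and return a handle

    @task
    def download_data(instance):
        return f"data-on-{instance}"  # hypothetical: download onto the instance

    @task
    def run_job(data):
        ...  # hypothetical: run the job where the data lives

    with Flow("parallel-subflows", executor=DaskExecutor()) as flow:
        instances = start_instance.map(list(range(3)))
        data = download_data.map(instances)
        run_job.map(data)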
  • Hui Zheng

    05/05/2021, 1:09 AM
    Hello, is anyone currently experiencing issues with flow runs in Prefect Cloud? All our flow runs are stuck in the Submitted state and not running, across all agents and environments.
  • Adam Brusselback

    05/05/2021, 3:25 AM
    Hey everyone, I am just trying to understand how I could go about loading some context variables from a file as the first task in a number of my flows
  • Adam Brusselback

    05/05/2021, 3:27 AM
    My issue is that after I load the context stuff into my secrets like so:
    @task
    def load_context(config):
        prefect.context.setdefault("secrets", {})
        for name in config.options('Default'):
            string_value = config.get('Default', name)
            prefect.context.secrets[name] = string_value
  • Adam Brusselback

    05/05/2021, 3:28 AM
    that properly sets my secrets within that task, but as soon as that task is done those secrets are no longer accessible from any subsequent tasks in my flow
  • Adam Brusselback

    05/05/2021, 3:29 AM
    What is the better way to go about this? The specific thing I am trying to do: a number of my flows need to be run "per-client", so they should take in a parameter of the client's key and load up that client's secrets so they're available for use within my other tasks that do the actual work.
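
    Because context mutations don't survive past the task that makes them, one pattern is to have the loader task return the secrets and pass them downstream as ordinary data. A sketch (fetch_secrets_for is a hypothetical lookup helper):

    from prefect import Flow, Parameter, task

    def fetch_secrets_for(client_key):
        # Hypothetical lookup: read this client's section from a config store.
        return {"db_url": f"postgres://{client_key}.example.internal/db"}

    @task
    def load_client_secrets(client_key):
        # Return the secrets instead of mutating prefect.context; a returned
        # value flows to downstream tasks like any other task result.
        return fetch_secrets_for(client_key)

    @task
    def do_work(secrets):
        # Use the secrets directly from the task input.
        print(secrets["db_url"])

    with Flow("per-client") as flow:
        client_key = Parameter("client_key")
        secrets = load_client_secrets(client_key)
        do_work(secrets)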
  • Fabrice Toussaint

    05/05/2021, 9:44 AM
    Hey everyone, Does anybody know why this happens when I am running my flows after upgrading from Prefect 0.14.13 to 0.14.17? Thank you in advance 🙂
  • Giovanni Giacco

    05/05/2021, 9:55 AM
    Hello everyone! After some experiments, we are starting to switch to Prefect definitively, and we need your help. Is there a way to use the temporary Dask cluster created by Prefect with a DaskExecutor inside a task, in order to execute any workload on that Dask cluster? In our tasks we have to deal with huge pandas dataframes, and we'd like to use the same Dask cluster to parallelize our computation. Any tip?
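
    One documented Dask pattern for this is distributed.worker_client: a task already executing on a Dask worker can obtain a client connected to that same cluster and submit more work to it. A sketch (the dataframe logic and input path are illustrative):

    import dask.dataframe as dd
    from distributed import worker_client
    from prefect import task

    @task
    def heavy_pandas_work(path):
        # This task already runs on a worker of the DaskExecutor's cluster;
        # worker_client() hands back a client connected to that same cluster.
        with worker_client() as client:
            ddf = dd.read_parquet(path)  # hypothetical input location
            return client.compute(ddf["value"].sum()).result()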
  • Robin

    05/05/2021, 10:59 AM
    I am observing some weird behavior related to some of our flows in the last few days.
    Project 1: flows run properly
    • with 700 task concurrency
    • 4 GB memory requested for each dask-pod on an EKS cluster
    • pods are created and torn down adaptively using adapt_kwargs
    Project 2: another flow fails before really starting
    • with 100 task concurrency
    • 2 GB memory requested
    • pods are not always properly torn down after flow termination, neither with adapt_kwargs nor with n_workers in KubernetesRun
    Errors in project 2 flows:
    • Unexpected error: KeyError('data')
    • arrange error
    • futures
    All flows ran properly a couple of days ago and there were only minor changes.
    Commonalities:
    • we have one Prefect agent running on an EKS cluster with Prefect Cloud
    • both flows share the same Prefect agent
    • both flows use ECR
    Discrepancies:
    • both flows have different base images
    • the flows are in different projects
    Further interesting behavior:
    • Prefect jobs get created but error out even when no flow is running
  • Robin

    05/05/2021, 1:06 PM
    Dear prefects, I have successfully persisted my first result on S3 (the result of battery_id_generator) 🎉 However, the mapped tasks did not persist any results to S3 🤔 Do I need to specify something for the maps to also persist their results?
    flow_with_s3_result.py
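
    A likely fix (a sketch, with a hypothetical bucket name): give the mapped task its own Result and include {map_index} in the location template so each mapped child writes to a distinct key:

    from prefect import task
    from prefect.engine.results import S3Result

    # Each mapped child formats its own location; {map_index} keeps the
    # persisted results from overwriting each other.
    @task(
        result=S3Result(
            bucket="my-bucket",
            location="{flow_name}/{task_name}/{map_index}.prefect",
        )
    )
    def process(item):
        ...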
  • ciaran

    05/05/2021, 1:36 PM
    Hey folks! Seeing a strange issue when using KubernetesRun and DaskExecutor (KubeCluster) on AKS. I submit my flow and it runs; I can see the Dask scheduler pod stand up, but then nothing happens. For example, the flow image provided shows a simple flow that has been running for 17 (now 20)+ hours 😮 You can find the flow definition here https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/flow_test/manual_flow.py and the agent conf here https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/prefect_agent_conf.yaml
  • Robin

    05/05/2021, 1:56 PM
    Dear Prefect people, a result caching question: is it possible to add task map input parameters to the task result location templates? E.g. "{flow_name}/{task_name}/{system_id}", where system_id is an input parameter to the task run (e.g. task.map(system_id=system_ids, further_input=unmapped(further_input)))? I would use it instead of {task_run_id}, as the task_run_id might change with every flow run, but the system_id would always be the same 🤔 Or is there a smarter approach to achieving the same?
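
    Task inputs are available to result location templates by name in Prefect 0.x, so this should work roughly as written. A sketch (bucket name hypothetical):

    from prefect import task, unmapped
    from prefect.engine.results import S3Result

    # The {system_id} placeholder is filled from the task's own input,
    # giving a stable location across flow runs, unlike {task_run_id}.
    @task(
        result=S3Result(
            bucket="my-bucket",
            location="{flow_name}/{task_name}/{system_id}.prefect",
        )
    )
    def process(system_id, further_input):
        ...

    # later, in the flow:
    # process.map(system_id=system_ids, further_input=unmapped(further_input))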
  • Zach Schumacher

    05/05/2021, 3:41 PM
    Hey guys! I have a Kubernetes run question. I have a use case where I need to dynamically set a Kubernetes run config. I want to find out the latest tag of a Docker image and then use that same image for each task in a flow. I don't know what the latest tag is at registration time, only at run time. Using :latest does not work, because it is not impossible that :latest could change between the first task in a flow and the last. Is there a recipe for doing this?
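
    One recipe is a small launcher that resolves the tag at run time and creates the real flow run with a pinned image, since a run config can be supplied per flow run through the client. A sketch (get_latest_tag, the image name, and the flow ID are hypothetical):

    from prefect import Client
    from prefect.run_configs import KubernetesRun

    def get_latest_tag(repo):
        # Hypothetical: query your registry API for the newest tag.
        return "v1.2.3"

    tag = get_latest_tag("my-registry/my-image")
    Client().create_flow_run(
        flow_id="<flow-id>",  # hypothetical target flow
        # Pinning the resolved tag means every task in the run uses one image.
        run_config=KubernetesRun(image=f"my-registry/my-image:{tag}"),
    )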
  • Robin

    05/05/2021, 3:46 PM
    Another question concerning result caching. Let's say we run a flow and there are some failures caused by bugs. Assuming that these bugs only affect the tasks that failed, we would want to fix those bugs locally, re-register the flow, and rerun only those tasks that were not successful before. So I configured the task results to be cached based on all inputs for 48 h as follows:
    import datetime

    from prefect import task
    from prefect.engine.cache_validators import all_parameters
    from prefect.engine.results import S3Result

    version = "0_1_17"

    s3_result_battery_id = S3Result(
        bucket="accure-prefect-results",
        location=f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect",
    )


    @task(
        result=s3_result_battery_id,
        # max_retries=max_retry,
        # retry_delay=retry_delay,
        # timeout=300,
        log_stdout=True,
        tags=["analysis", "task"],
        cache_validator=all_parameters,
        cache_for=datetime.timedelta(hours=48),
    )
    def analyze_battery(battery_id):
        # (task body elided in the original message)
        ...
    However, it seems like the failed task runs aren't rerun either, as the inputs have not changed and therefore the results are read from the cache. How do I explicitly tell Prefect to rerun those tasks which previously failed? Or am I missing something?
    Bonus:
    • Is that actually a good philosophy?
    • I thought of introducing the package release version (simply "version" in the above code snippet) into the mix:
      ◦ so that one can control, via the package release version, when to rerun all tasks
      ◦ e.g. due to some feature enhancements in an entirely new flow run, as opposed to just some bugfixes within the same flow release
    • note: package release version != prefect flow version
    • In general we are not yet quite sure how to best handle versioning and staging with Prefect 🤔
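
    One way to get "rerun only what failed" semantics is a target instead of time-based caching: a task is skipped only if its target file already exists, and failed runs write nothing, so failed tasks do rerun after a fix and re-register. A sketch reusing the message's template:

    from prefect import task
    from prefect.engine.results import S3Result

    version = "0_1_17"

    # With `target`, existence of the file decides whether to skip; the
    # version segment lets a new release invalidate all prior results at once.
    @task(
        result=S3Result(bucket="accure-prefect-results"),
        target=f"{{flow_name}}/{version}/{{task_name}}/{{battery_id}}.prefect",
    )
    def analyze_battery(battery_id):
        ...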
  • Dimosthenis Schizas

    05/05/2021, 3:47 PM
    Hey everyone, I have a probably noob question. I have created a fairly simple flow with 3 steps that does some calculations. The execution usually takes 20-30 mins. When I try to register the flow, it seems that Prefect evaluates it and starts calculating. I haven't let it run to completion, but presumably the flow will be registered after the execution ends, which doesn't make any sense. Shouldn't registration just pack up the flow and send it to the server, or am I missing something here?
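
    For reference, registration itself doesn't run tasks; computation that executes during register is usually module-level code outside of @task functions, which Python runs when the file is imported. A sketch of the shape that avoids it:

    from prefect import Flow, task

    @task
    def calculate():
        # Heavy work goes inside tasks; task bodies only run during a flow
        # run, not while the file is imported for registration.
        ...

    with Flow("my-flow") as flow:
        calculate()  # builds the DAG only; nothing executes here

    if __name__ == "__main__":
        # Guard so importing/registering the module never triggers extra code.
        flow.register(project_name="my-project")  # hypothetical project name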
  • James Brink

    05/05/2021, 3:59 PM
    Hi Everyone,
  • James Brink

    05/05/2021, 4:05 PM
    Hi Everyone, I'm new to Prefect and trying to figure out if it would help us improve our data pipeline. Currently we use a proprietary orchestration architecture, but we have the desire to move to something open source with wide adoption. I have looked through the docs, and there is a particular aspect of how our pipeline works that I am unsure how to integrate with Prefect. We have some platforms outside of our own cloud resources that collect data and have a subsequent ETL process on our own resources. In many cases these collections may take hours, and because they don't happen inside the executor itself, our setup seems atypical compared to most of the examples in the docs. Really what I need is for the Prefect flow to start a data collection via an API call to our outside service and then stay in some sort of 'running' state until it is signaled by our outside service (maybe via API call?) that the collection is complete. Then the subsequent task in the flow would download that collected data from S3 and trigger subsequent tasks for our ETL process. Is there any way to 'push' a notification to a Prefect task to signal it to complete? I read through the docs and came up with the code below, which is more of a polling/listening solution (if I have even done it correctly at all). Would this work? Is there a better way?
    import prefect
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import RETRY, FAIL, PAUSE
    from datetime import datetime, timedelta
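    # Note: invoke_outside_service, poll_outside_service_status,
    # download_data_from_s3, transform, and save_to_database below are
    # placeholders for the user's own service and database helpers.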
    
    
    @task
    def start_collection(timeout_interval, collection_name):
        invoke_outside_service(collection_name)
        timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
        return timeout_time
    
    
    #can I use code to define max retries? ideally 'max_retries=timeout_interval//600 + 2' to retry every 5 minutes until timeout period ends
    @task(max_retries=50, retry_delay=timedelta(seconds=300))
    def check_collection_status(timeout_time, collection_name):
        status = poll_outside_service_status(collection_name)
        if status == 'complete':
        return 's3://nitorum-data/' + collection_name + '/' + datetime.now(
        ).strftime('%Y-%m-%d') + '.csv'
        elif status == 'failed':
            raise FAIL(message='Collection failed.')
        elif status == 'running':
            if datetime.now() > timeout_time:
                raise PAUSE(message='Collection timed out.')
            else:
                raise RETRY(message='Collection still running.')
    
    
    @task
    def load_data(path):
        data = download_data_from_s3(path)
        return data
    
    
    @task
    def transform_task(data):
        data = transform(data)
        return data
    
    
    @task
    def save_data(data):
        save_to_database(data)
    
    
    with Flow("ETL with Outside Service Data Collection") as flow:
        timeout_interval = Parameter("timeout_interval", default=3600)
    collection_name = Parameter("collection_name", default="collection_1")
    
        timeout_time = start_collection(timeout_interval, collection_name)
    
        path = check_collection_status(timeout_time, collection_name)
    
        data = load_data(path)
    
        data = transform_task(data)
    
        save_data(data)
  • Adam Brusselback

    05/05/2021, 4:25 PM
    Hey again. I am obviously doing something wrong, since I seem unable to create a flow that takes in host/port/etc. for a PostgresExecute task at runtime rather than at definition time, as those values are not available at definition time for me...
  • Adam Brusselback

    05/05/2021, 4:27 PM
    getting this warning:
    UserWarning: A Task was passed as an argument to PostgresExecute, you likely want to first initialize PostgresExecute with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
    
      my_task = PostgresExecute(...)  # static (non-Task) args go here
      res = my_task(...)  # dynamic (Task) args go here
    
    see https://docs.prefect.io/core/concepts/flows.html#apis for more info.
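
    The warning means PostgresExecute's constructor only expects static values. If the connection details truly only exist at runtime, one option (a sketch, assuming psycopg2 is installed; the secret name is hypothetical) is a custom task that opens the connection itself:

    import psycopg2
    from prefect import Flow, Parameter, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def pg_execute(query, host, port, dbname, user, password):
        # All connection details arrive as runtime task inputs.
        conn = psycopg2.connect(host=host, port=port, dbname=dbname,
                                user=user, password=password)
        try:
            with conn:  # commits on success, rolls back on error
                with conn.cursor() as cur:
                    cur.execute(query)
        finally:
            conn.close()

    with Flow("runtime-postgres") as flow:
        host = Parameter("host")
        port = Parameter("port", default=5432)
        dbname = Parameter("dbname")
        user = Parameter("user")
        password = PrefectSecret("PG_PASSWORD")
        pg_execute("SELECT 1", host, port, dbname, user, password)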