prefect-community

    Nicholas Chammas

    08/09/2021, 4:33 PM
    I’m trying to construct a schedule with parameter_defaults, where the defaults are derived from the date that the schedule is triggered on. In other words, I’m looking for something like this:
    schedule = Schedule(
        clocks=[
            IntervalClock(
                interval=timedelta(days=1),
                parameter_defaults={"version": date.today().isoformat()},
            ),
        ],
    ...
    Except I don’t want date.today().isoformat() to be calculated just once when the flow is registered and then fixed for all time. I want it to be calculated at the time the clock fires. How do I do this? Pass in a lambda as the value in the parameter_defaults dictionary? Construct a dict-like object that calculates the value on key lookup? I feel like I’m thinking about this incorrectly.
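    One possible workaround, a minimal sketch assuming Prefect 1.x: leave the parameter default as None and resolve the date inside the flow at run time, from scheduled_start_time in prefect.context, rather than baking a value into the schedule.
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def resolve_version(version):
        # If no explicit version was passed, derive it from the run's scheduled time.
        if version is None:
            return prefect.context.get("scheduled_start_time").date().isoformat()
        return version

    with Flow("versioned-flow") as flow:
        version = Parameter("version", default=None)
        resolved = resolve_version(version)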

    Mehdi Nazari

    08/09/2021, 4:41 PM
    Hi all, is there a mechanism we can apply to a task (or every task, if that matters) so that we would know when a task within a flow has been stuck in a specific state for a prolonged amount of time?
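    One possible building block, a hedged sketch and not necessarily the thread’s answer: Prefect 1.x tasks accept a timeout in seconds, which at least bounds how long a task can sit in a Running state before it is failed.
    from time import sleep
    from prefect import task

    @task(timeout=600)  # fail the task if it is still running after 10 minutes
    def long_running_task():
        sleep(5)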

    Kyle McChesney

    08/09/2021, 4:52 PM
    Wondering if anyone can help with the following use case. I have a project that contains a number of flows, which I would like to register to an internal backend using a JSON format. I also need to run the build in a CI/CD system (which spans multiple environments), which means I need to configure the run_config and storage attributes for each flow before registration.
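    A rough sketch of that per-environment configuration (the DEPLOY_ENV variable, registry URL, and helper name are illustrative assumptions, not from the thread):
    import os
    from prefect.run_configs import LocalRun
    from prefect.storage import Docker

    def configure_and_register(flow, project_name):
        env = os.environ["DEPLOY_ENV"]  # e.g. "staging" or "prod", set by the CI/CD system
        flow.storage = Docker(registry_url=f"registry.internal/{env}")
        flow.run_config = LocalRun(labels=[env])
        flow.register(project_name=project_name)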

    Kyle McChesney

    08/09/2021, 4:53 PM
    Is it possible to cleanly replicate the prefect build ... CLI command directly within a custom Python script?

    Nicholas Chammas

    08/09/2021, 7:51 PM
    Quick question about schedules. I want a “once per quarter” schedule, so I came up with this:
    schedule = Schedule(
        clocks=[IntervalClock(interval=timedelta(days=1)),],
        or_filters=[
            between_dates(start_month=1, start_day=1, end_month=1, end_day=1),
            between_dates(start_month=4, start_day=1, end_month=4, end_day=1),
            between_dates(start_month=7, start_day=1, end_month=7, end_day=1),
            between_dates(start_month=10, start_day=1, end_month=10, end_day=1),
        ],
    )
    Is that idiomatic Prefect? Or is there a more natural way of doing this? Another potential solution is to use IntervalClock with an interval of 3 months, except you can’t say timedelta(months=3) and get the interval to fire on the first of the month easily.
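    A hedged alternative, sketched here rather than taken from the thread: a CronClock takes a cron expression, so “midnight on the first day of each quarter” fits in a single clock.
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # fire at 00:00 on the 1st of January, April, July, and October
    schedule = Schedule(clocks=[CronClock("0 0 1 1,4,7,10 *")])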

    Cab Maddux

    08/09/2021, 8:52 PM
    Hi! Quick question about the equality operator. I noticed that prefect.core.task understandably does not override the __eq__/__ne__ methods to support task comparisons, as explained in the is_equal method docstring: https://github.com/PrefectHQ/prefect/blob/1959eecf1bbbb8e3b194b288c197a6108deb8693/src/prefect/core/task.py#L924-L938 Just to confirm, this would be an appropriate way to resolve task result equality within a flow, right?
    with Flow('my-flow') as flow:
        task_a = get_task_a_result()
        task_a_is_value = task_a.is_equal('value')

    YD

    08/09/2021, 11:18 PM
    When creating a state handler, how can I tell when a task fails its last retry? I tried
    def my_state_handler(obj, old_state, new_state):
        if new_state.is_finished() and new_state.is_failed():
            # send notification
    but this did not work. I want to send a notification only if the last retry fails.
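    A hedged sketch, assuming Prefect 1.x state semantics: a task with retries remaining moves to Retrying, which is a Scheduled state rather than a Failed one, so checking for Failed should only match once retries are exhausted.
    from prefect.engine import state

    def my_state_handler(obj, old_state, new_state):
        if isinstance(new_state, state.Failed):
            # only reached after the final retry fails; send the notification here
            pass
        return new_state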

    Michael Booth

    08/10/2021, 1:23 AM
    Hi 🙂 - trying to run the intro tutorial, i.e. https://docs.prefect.io/core/tutorial/01-etl-before-prefect.html, and I’m having trouble with the OpenSky API by the look of it (presumably all APIs have bad days?). Should I just wait till it gets “better” to run this part of the tutorial, or is something else going on? Cheers, Michael
    (prefect) mjboothaus@Michaels-Air tutorial % python 01_etl.py 
    ...TRUNCATED - See thread
    As a follow-up - I ran the 2nd part of the tutorial, 02_etl_flow.py, and I guess this illustrates the more helpful messages/info provided by the prefect library when tasks fail.
    (prefect) mjboothaus@Michaels-Air tutorial % python 02_etl_flow.py
    ...TRUNCATED... See thread
    SOLVED - seems the API was down - now OK

    Brad I

    08/10/2021, 1:53 AM
    Just wondering if the full GraphQL schema is hosted anywhere? I’m trying to use a codegen tool (https://www.graphql-code-generator.com/docs/getting-started/codegen-config) to generate bindings for a TypeScript/Node.js application. I tried to use https://api.prefect.io/graphql but it wasn’t able to download the schema.

    haven

    08/10/2021, 7:12 AM
    hi team, wondering if we could override/enrich the prefect context? i.e. I would like to have a task that does
    from prefect import context as prefect_context, task
    
    @task
    def enrich_context():
        prefect_context["custom_key"] = 1
    
    @task
    def main_task():
        value = prefect_context["custom_key"]
        print(value)
    
    with Flow("test-enrich_context") as flow:
        _enrich_context_task = enrich_context()
        main_task(upstream_tasks=[_enrich_context_task])
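    A hedged note, an assumption about Prefect 1.x behavior rather than something confirmed in this thread: context mutations made inside one task generally do not survive into other tasks, since each task run gets its own context copy. Passing the value along as a data dependency avoids the problem entirely:
    from prefect import Flow, task

    @task
    def enrich_context():
        return {"custom_key": 1}

    @task
    def main_task(ctx):
        print(ctx["custom_key"])

    with Flow("test-enrich_context") as flow:
        ctx = enrich_context()
        main_task(ctx)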

    Salohy

    08/10/2021, 8:13 AM
    Hi everyone, I am using Docker storage for my flow and get the error ModuleNotFoundError: No module named 'utils' when running python file.py. It seems like my local package utils is not found during the docker build. Here is the structure of my folder:
    src
    	utils/
    	file.py
    	__init__.py
    I am using a custom docker image where I have already copied everything in src into the image. Please help me with this. Any help is appreciated 🙏 Many thanks already
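    One hedged possibility (a sketch; the base image name is illustrative): if the image already contains the src directory, making the package importable may just be a matter of putting that directory on PYTHONPATH via the storage’s env_vars.
    from prefect.storage import Docker

    storage = Docker(
        base_image="my-custom-image:latest",  # assumed: your image with src/ copied in
        env_vars={"PYTHONPATH": "/src"},      # so that `import utils` resolves inside the container
    )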

    Italo Barros

    08/10/2021, 11:54 AM
    Hello everyone, is it possible to execute multiple flows where each one uses a specific virtualenv (I’m using conda, by the way)? For example, suppose I have two envs with different libraries in each, one on Python 3.6 (env_1) and another on Python 3.8 (env_2), and I want to run two flows, one using env_1 and the other env_2. I know a possible approach would be a containerized application, but since I do some file/folder manipulation on the host, I think that would be tricky to do with Docker (please correct me if I’m wrong). Obs: I’m using Prefect Cloud.
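    A hedged sketch of one common pattern (flow and label names are illustrative): start one local agent inside each conda env with a distinguishing label, then pin each flow to the matching agent via labels on its run config.
    from prefect import Flow
    from prefect.run_configs import LocalRun

    with Flow("flow-py36") as flow_1:
        ...  # tasks that need the Python 3.6 env

    with Flow("flow-py38") as flow_2:
        ...  # tasks that need the Python 3.8 env

    # agents started with e.g. `prefect agent local start --label env-py36` inside each env
    flow_1.run_config = LocalRun(labels=["env-py36"])
    flow_2.run_config = LocalRun(labels=["env-py38"])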

    Kien Nguyen

    08/10/2021, 12:12 PM
    Hi guys, is there any guide for monitoring a Prefect Agent using tools like New Relic or Datadog?

    Xyp Jn

    08/10/2021, 12:56 PM
    Hi people, I’m getting this weird error while using a custom Docker image with DockerRun on Prefect Server. The error pops up immediately after the agent pulls the image successfully, and no task has been executed yet. I didn’t use rstrip anywhere in my code, and the flow runs smoothly with LocalRun. I wonder if it is possible for me to get a full stack trace for this error... Thanks for your help!

    Tim Enders

    08/10/2021, 1:22 PM
    How do I change logs to debug when running a flow with prefect run?

    Philip MacMenamin

    08/10/2021, 1:57 PM
    Hi - can someone point out what I’m missing in the following setup? I have a Server stood up, and a separate machine running:
    prefect agent local start --api http://<server_IP>:4200

    Pedro Machado

    08/10/2021, 2:47 PM
    Hi there. I am using the new get_task_run_result and create_flow_run tasks described here. I’d like to treat the child flow as a single unit that can be retried or restarted if it fails. Currently, these are two separate tasks and I haven’t been able to set them up this way. How could I change my flow to support retries/restarts when the child flow fails?
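    A hedged sketch of one way to get unit-level retries (flow and project names are assumptions; calling the built-in tasks’ .run() inside a custom task is the pattern being illustrated, not necessarily the thread’s answer): wrapping the create/wait pair in a single task means a retry re-runs the child flow end to end.
    from datetime import timedelta
    from prefect import Flow, task
    from prefect.engine.signals import FAIL
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    @task(max_retries=3, retry_delay=timedelta(minutes=5))
    def run_child_flow():
        flow_run_id = create_flow_run.run(flow_name="child-flow", project_name="my-project")
        flow_run = wait_for_flow_run.run(flow_run_id)
        if not flow_run.state.is_successful():
            raise FAIL("child flow failed")  # triggers this task's retries
        return flow_run

    with Flow("parent-flow") as flow:
        run_child_flow()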

    Joe Hamman

    08/10/2021, 3:35 PM
    Hi folks - we’re setting up a new prefect agent in our Kubernetes cluster on Google Cloud and I could use some pointers getting the correct permissions configured. I’m starting with this:
    prefect agent kubernetes install -k $KEY --namespace=staging --rbac | kubectl apply --namespace=staging -f -
    I then submit a flow that uses the KubernetesRun config and GCS storage configured as:
    run_config = KubernetesRun(cpu_request=2, memory_request="2Gi", image='gcr.io/carbonplan/hub-notebook:c89f7f1', env={'TZ': 'UTC'})
    storage = GCS("carbonplan-scratch", project='carbonplan')
    This results in an error message like this:
    └── 23:15:59 | INFO    | Submitted for execution: Job prefect-job-d8a8a648
    └── 23:16:05 | INFO    | Entered state <Failed>: Failed to load and execute Flow's environment: Forbidden('GET https://storage.googleapis.com/storage/v1/b/carbonplan-scratch?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
    So, I gather my agent doesn’t have the correct IAM privileges to read the flow from GCS. Next I tried adding a service account to my agent:
    prefect agent kubernetes install -k $KEY --namespace=staging --service-account-name pangeo --rbac | kubectl apply --namespace=staging -f -
    Here I’m pointing to my kubernetes service account called pangeo, which has been given storage.objectAdmin permissions. However this results in the same error as above. So now I’m wondering if I’m missing something more fundamental here. If anyone has suggestions on where to look for more details on setting up prefect on GKE, I’d certainly appreciate it.

    Nivi Mukka

    08/10/2021, 6:02 PM
    Hi folks, how can I write a file to the prefect working directory? I have defined my own path using the prefect_directory param in the Docker storage. I am able to read files from that defined path, but I cannot write files to that directory - these are files that will be used in the flow execution. Getting
    [Errno 13] Permission denied: my/path/filename

    Mehdi Nazari

    08/10/2021, 6:24 PM
    Hi All, what is the right way to access the returned result of a task when it is a Python class object? In my case I have a task that returns a Python object, but attempting to access it with a simple variable assignment seems to yield NULL for that value.
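    A hedged sketch, assuming Prefect 1.x: inside a Flow block, calling a task returns a task reference, not the object itself; the actual return value only exists at run time, e.g. in the final state after flow.run(). The Payload class here is purely illustrative.
    from prefect import Flow, task

    class Payload:
        def __init__(self, value):
            self.value = value

    @task
    def build_payload():
        return Payload(42)

    with Flow("result-access") as flow:
        payload_ref = build_payload()  # a task reference, not a Payload

    state = flow.run()
    payload = state.result[payload_ref].result  # the real Payload instance
    print(payload.value)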

    Leon Kozlowski

    08/10/2021, 6:28 PM
    Hi all, is it safe to say that the job template inherits the image from the dockerfile used for Docker storage when passed to KubernetesRun at runtime, since an image is not required for the job spec?

    YD

    08/10/2021, 7:37 PM
    How can we control the size of the logs to make sure they do not get too large on the one hand, but keep enough history on the other?

    Nivi Mukka

    08/10/2021, 8:08 PM
    Hi Team, I have Prefect Cloud set up on a GKE cluster with Dask Gateway. I would like to leverage all the power of Dask (I’m assigning 12-20 workers; each worker is set to have 16GB memory) when reading/writing data to/from BigQuery. I have pandas.read_gbq() or pandas.to_gbq() defined within my tasks, but the execution of the flow seems to be using only one Dask worker instead of distributing the load, and I’m running into memory/timeout issues. I’m getting errors and warnings like this in the GKE dask worker logs:
    INFO - Event loop was unresponsive in Worker for 4.40s. This is often caused by long running GIL holding functions or moving large chunks of data. This can cause timeouts and instability. 
    
    WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.40 GB -- Worker memory limit: 8.59 GB.
    
    WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 6.89 GB -- Worker memory limit: 8.59 GB
    
    WARNING - Worker exceeded 95% memory budget. Restarting.
    /opt/conda/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.
    
    distributed.utils_perf - INFO - full garbage collection released 842.28 MB from 584 reference cycles (threshold: 10.00 MB)
    distributed.nanny - INFO - Worker process 10 exited with status 1
    Do I have to use Prefect’s BigQueryTask class if I want the DaskExecutor to utilize all of its workers and cluster options as set? Or do I have to change something in the Dask Gateway config? How do I tell Prefect to use all the assigned Dask workers when running tasks?
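    A hedged note, an assumption rather than something confirmed in this thread: with a DaskExecutor, each Prefect task runs on a single Dask worker, so one pandas.read_gbq() call will not be split across workers. Mapping over partitions is one way to spread the work; load_partition and the partition count below are illustrative.
    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def load_partition(partition_id):
        ...  # hypothetical helper: read one slice of the table per mapped task

    with Flow("bq-partitions", executor=DaskExecutor()) as flow:
        results = load_partition.map(list(range(16)))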

    Brett Naul

    08/10/2021, 8:18 PM
    @Kevin Kho just curious, do you think there’s a potential way to leverage the PandasSerializer from https://github.com/PrefectHQ/prefect/pull/3020 in conjunction with the question here? The to_dict() approach that you mentioned in this thread is similar to what we’re doing (we return list(df.itertuples()) instead), but it’s not ideal for the same reasons that PandasSerializer is nice in general compared to using cloudpickle. I can’t really think of anything off the top of my head, though, that wouldn’t require messing with the .map internals (cc @Chris White @Michael Adkins too in case y’all have any clever ideas). Can also make a discussion on GitHub if that’s preferable.

    Leon Kozlowski

    08/10/2021, 8:52 PM
    Is it possible to separately apply a job_template with kubectl and register a flow without passing the job_template? I think that would be done by passing the image explicitly in the KubernetesRun?

    Kathryn Klarich

    08/10/2021, 9:13 PM
    Has registering flows been slow for anyone else today? For me it seems to hang intermittently at this line
    Cloning https://github.com/PrefectHQ/prefect.git (to revision master) to /tmp/pip-install-2dvduvf6/prefect_9381dd189c7849aa9cd40f1a3fa2afae
    at various points today.

    Ben Muller

    08/10/2021, 10:31 PM
    Hey guys, do you have any examples of, or would you even suggest, using Prefect to manage ML pipelines? Currently we have a very naive pipeline that creates hundreds of features in order. It takes an hour or so, and I am thinking of a way that each feature could be created in parallel as a flow. Would you suggest this as a good use case for Prefect?

    Nivi Mukka

    08/11/2021, 1:30 AM
    Hi Folks, I would like to know how Prefect reacts to sys.exit(1). I have this line inside of a task; if the execution reaches this line, does it exit with 1 and mark the task as successful or failed? Or does it think the flow failed and try to restart the flow?

    Sumit Kumar Rai

    08/11/2021, 4:28 AM
    Hello everyone, I see that a task as an arbitrary function can be skipped by raising a SKIP signal.
    from prefect import task
    from prefect.tasks.shell import ShellTask
    from prefect.engine import signals
    
    shell_task_invoke = ShellTask()
    
    @task
    def skip_if_non_prod(env):
        if env == "PROD":
            return shell_task_invoke.run(command="ls")
        else:
            raise signals.SKIP()
    Can I skip subclasses of the Task class, i.e. ShellTask, DbtShellTask, by passing a parameter something like below?
    is_not_prod = False if env == "PROD" else True
    ShellTask(skip=is_not_prod)
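    A hedged sketch, an assumption rather than the thread’s answer: ShellTask has no skip parameter, but Prefect 1.x’s case() can gate any task, including Task subclasses, on a condition.
    from prefect import Flow, Parameter, case
    from prefect.tasks.shell import ShellTask

    shell_task = ShellTask(command="ls")

    with Flow("conditional-shell") as flow:
        env = Parameter("env", default="DEV")
        with case(env, "PROD"):  # shell_task only runs when env == "PROD"
            shell_task()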

    YD

    08/11/2021, 5:18 AM
    I have one flow importing modules from a different directory, on Prefect Cloud. I have something like:
    import os
    import sys
    
    PATH = 'Module path'
    sys.path.append(PATH)
    
    from <my module> import <my func>
    When testing the function without a Prefect flow, I can run it from the command line. When running the same function from a flow, it works using flow.run(), but when registering it in Prefect Cloud and trying to run it there, I get
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'<my module>\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    I am running a local agent on my machine (prefect agent local start) and do not use Docker containers. Any suggestions?

Kevin Kho

08/11/2021, 2:31 PM
Hey YD, most of the storage classes only keep the flow code and not the other files, because of the way cloudpickle works. You need to package your dependencies into a Docker container and use Docker storage so that the agent can pull the container with the dependencies. If you are using a local agent and LocalRun, there are two ways to do this. First, you can start the agent in the directory where the imports resolve correctly, so it has access to those files when it runs the flow. Second, LocalRun takes a working_dir where you can specify the directory the agent will run the flow from.
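A minimal sketch of that second option (the path and flow name are illustrative):
from prefect import Flow
from prefect.run_configs import LocalRun

with Flow("my-flow") as flow:
    ...  # tasks that import from the module directory

# the agent will run this flow from the given directory, so the imports resolve
flow.run_config = LocalRun(working_dir="/home/user/my_project")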