prefect-community
  • i

    Ian Singer

    01/06/2022, 3:39 PM
    Hey everyone. We are running a ShellTask that just calls another Python file in the same directory. This works when we run the task directly, but when we register the flow and run it from Prefect Cloud, the ShellTask fails. The Python script imports a module that needs to authenticate to Azure, and that is where the Cloud run fails; however, we use the same dependency in other flows without the ShellTask, and authentication works fine there. Any ideas?
    24 replies · 2 participants
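One thing worth checking, offered as an assumption rather than the thread's diagnosis: a script launched by a shell command runs in a separate process and only sees the environment variables passed to it, so credentials the flow resolves in-process (for example, from a Prefect Secret when running in Cloud) are not automatically visible to the child script. The sketch below uses plain `subprocess` instead of `ShellTask` to show the mechanism; `run_script_with_env` and the variable name `AZURE_TENANT_ID` are hypothetical. Prefect 1.x's `ShellTask` also accepts an `env` dictionary that serves the same purpose.

```python
import os
import subprocess
import sys


def run_script_with_env(script_code: str, extra_env: dict) -> str:
    """Run Python code in a child process and return its stdout.

    The child gets a copy of the parent's environment plus `extra_env`;
    anything not in that mapping is invisible to the child.
    """
    env = os.environ.copy()
    env.update(extra_env)  # e.g. credentials the parent fetched at runtime
    completed = subprocess.run(
        [sys.executable, "-c", script_code],
        env=env,
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the child exits non-zero
    )
    return completed.stdout.strip()


# Hypothetical child script that reads a credential from its environment.
CHILD = "import os; print(os.environ.get('AZURE_TENANT_ID', 'missing'))"

if __name__ == "__main__":
    print(run_script_with_env(CHILD, {"AZURE_TENANT_ID": "example-tenant"}))
```

If the child prints `missing` in Cloud but not locally, the credential is not reaching the subprocess's environment.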
  • e

    Eric Richard

    01/06/2022, 3:45 PM
    💃 Just arrived!
    2 replies · 3 participants
  • c

    Carrie Bohinc

    01/06/2022, 4:33 PM
    Hello everyone, I have a question about what counts as a "user" for the 3 users allowed on the Standard tier. The pricing page indicates that you can provision read-only accounts for reporting on this tier, so are the 3 users the actual developers, with additional read-only accounts provisioned beyond that?
    2 replies · 2 participants
  • v

    Vaibhav Shetye

    01/06/2022, 5:55 PM
    How does Prefect Server decide which agent should run a flow if my flow's labels are a subset of the labels of multiple agents, so that they all match?
    4 replies · 2 participants
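For reference, Prefect 1.x describes label matching as a subset rule: an agent can pick up a flow run only if every label on the flow is also one of the agent's labels. The sketch below (a hypothetical `eligible_agents` helper, not Prefect code) shows why several agents can qualify at once; it deliberately does not model which eligible agent actually claims the run, nor edge cases such as unlabeled flows.

```python
from typing import Dict, List, Set


def eligible_agents(flow_labels: Set[str], agents: Dict[str, Set[str]]) -> List[str]:
    """Return the names of agents whose labels include every flow label."""
    return sorted(
        name
        for name, agent_labels in agents.items()
        if flow_labels <= agent_labels  # set "is subset of" check
    )


# Hypothetical agents and labels for illustration.
AGENTS = {
    "agent-a": {"prod", "gpu"},
    "agent-b": {"prod"},
    "agent-c": {"dev"},
}

if __name__ == "__main__":
    print(eligible_agents({"prod"}, AGENTS))         # both prod agents qualify
    print(eligible_agents({"prod", "gpu"}, AGENTS))  # only agent-a qualifies
```

Giving each agent a distinguishing label, and adding that label to the flow, narrows the match to a single agent.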
  • j

    Jason Noxon

    01/06/2022, 6:57 PM
    Hello, Community! I installed the Prefect app from the Azure Marketplace and connected it to Prefect Cloud with no issues. However, I cannot connect Prefect to the cluster because there is no external IP. At least, I think that's the reason; I'm not sure. Are there any docs about this?
    9 replies · 2 participants
  • g

    Gui Pires

    01/06/2022, 7:24 PM
    Hi all. I'm new to Prefect and I'm trying to use it to process a large dataset of 3D images. It works nicely on a small subset of the dataset, but when I run it on the whole dataset, "Unmanaged memory" (as shown in Dask's dashboard) accumulates in the dask-workers until they run out of memory (OOM). In broad terms, the flow does the following:
    • read in each image and apply a couple of simple transformations (cropping, aligning, ...)
    • compute some global stats across the whole dataset
    • use the global stats as parameters to further process the images
    So it's "map - reduce - map". I'm trying to avoid IO ops (and maybe I just shouldn't), so I do expect Dask's memory to increase as it stores intermediate results, but I would assume that to be "Managed memory".
    • Could it be that this is just Prefect storing intermediate results, and Dask doesn't "know" about them?
    • Is it possible I'm messing something up in how I'm programming the workflow, leading to unnecessary serialization of large objects? (I do get some messages about this.) I tried to use Prefect's constructs everywhere to avoid this.
      ◦ e.g.: a Prefect task reads a file with a list of paths, a read-file task is mapped over this list, and so on, so I don't see where I could be passing large objects to be serialized.
    • I also tried manually garbage collecting inside the Prefect task where memory seems to be accumulating.
    Any ideas on how to further debug this? Thanks in advance!!
    14 replies · 2 participants
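One way to keep the reduce step cheap, sketched under the assumption that the global stats are a mean and standard deviation (the question doesn't say which stats are computed): have each mapped task emit only a small partial summary (count, sum, sum of squares) rather than passing the image itself into the reduce, then combine those summaries. Images are modeled as flat lists of floats, and all function names are hypothetical; this is plain Python, not Prefect or Dask code.

```python
import math
from typing import Iterable, List, Tuple

Summary = Tuple[int, float, float]  # (count, sum, sum of squares)


def summarize(pixels: List[float]) -> Summary:
    """Map step: reduce one image to a tiny partial summary."""
    return len(pixels), sum(pixels), sum(p * p for p in pixels)


def combine(summaries: Iterable[Summary]) -> Tuple[float, float]:
    """Reduce step: merge partial summaries into a global mean and std."""
    n = total = total_sq = 0.0
    for count, s, sq in summaries:
        n += count
        total += s
        total_sq += sq
    mean = total / n
    variance = max(total_sq / n - mean * mean, 0.0)  # population variance
    return mean, math.sqrt(variance)


def normalize(pixels: List[float], mean: float, std: float) -> List[float]:
    """Second map step: use the global stats as parameters."""
    return [(p - mean) / std for p in pixels]


if __name__ == "__main__":
    images = [[1.0, 2.0], [3.0, 4.0]]
    mean, std = combine(summarize(img) for img in images)
    print(mean, std)
    print([normalize(img, mean, std) for img in images])
```

The sum-of-squares formula is simple but can lose precision for large values; a pairwise merge such as Chan et al.'s parallel variance update is the numerically safer choice.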
  • i

    Isaac Brodsky

    01/06/2022, 7:36 PM
    I'm getting an odd error about a Dask future my job awaits being attached to a different loop:
    RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
    As far as I can tell, I did not change anything about how work is submitted to Dask, so I'm wondering if this is an intermittent issue related to where the Prefect task is running. My Prefect task is wrapped in with worker_client().
    16 replies · 2 participants
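For context on the message itself (not a diagnosis of the Dask issue): asyncio raises this `RuntimeError` whenever a coroutine running on one event loop awaits a future that was created on a different loop. The minimal, Dask-free reproduction below shows the mechanism; whether code under `worker_client()` ends up in that situation in this flow is an open assumption. The function names are hypothetical.

```python
import asyncio


async def wait_on(future: asyncio.Future):
    # Awaiting a future that belongs to another loop triggers the error.
    return await future


def reproduce_different_loop_error() -> str:
    other_loop = asyncio.new_event_loop()
    try:
        foreign_future = other_loop.create_future()  # bound to other_loop
        # asyncio.run() creates and runs a new loop, distinct from other_loop.
        asyncio.run(wait_on(foreign_future))
    except RuntimeError as exc:
        return str(exc)
    finally:
        other_loop.close()
    return "no error"


if __name__ == "__main__":
    print(reproduce_different_loop_error())
```

The printed message has the same shape as the one in the question: `Task <Task ...> got Future <Future pending> attached to a different loop`.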
  • d

    Danny Vilela

    01/06/2022, 8:24 PM
    Hi all, I have a dynamic task Foo that maps over a task input (a list) with 3 values. Sometimes, though, some of those values aren't applicable (for example, a requested date that doesn't exist in our database), so I thought of raising prefect.engine.signals.SKIP to note that (1) this mapped input doesn't apply, but also that (2) it wasn't a failure, per se. That said, I'm noticing that a task Bar that is directly downstream of Foo is also skipped whenever Foo skips any of its mapped tasks. This seems to be intended, but is there a trigger I should set to indicate that "it's fine if any number of these mapped tasks fails"? Bar has other upstream tasks, but I wouldn't want those to be considered. Does skip_on_upstream_skip apply here? Should I configure Bar with skip_on_upstream_skip=False? (Based on the docs here.)
    4 replies · 3 participants
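To make the behaviour in the question concrete, here is a toy model (plain Python, not Prefect's engine code) of the rule as described: with `skip_on_upstream_skip=True`, a downstream task is skipped if any upstream state is skipped, and a mapped upstream contributes one state per mapped child. With the flag set to `False`, the trigger decides instead; the model assumes the default `all_successful` trigger and that a skipped state counts as successful (in Prefect 1.x, `Skipped` is a subclass of `Success`). Note that the flag applies to all of Bar's upstream tasks, not only Foo. The state strings are simplifications.

```python
from typing import List, Union

UpstreamState = Union[str, List[str]]  # a mapped upstream is a list of child states


def flatten_states(upstream: List[UpstreamState]) -> List[str]:
    """Expand each mapped upstream into its individual child states."""
    flat: List[str] = []
    for state in upstream:
        flat.extend(state if isinstance(state, list) else [state])
    return flat


def downstream_outcome(upstream: List[UpstreamState], skip_on_upstream_skip: bool = True) -> str:
    """Toy model: does a downstream task run, given its upstream states?

    States are plain strings: "success", "skipped" or "failed".
    """
    states = flatten_states(upstream)
    if skip_on_upstream_skip and "skipped" in states:
        return "skipped"
    # Stand-in for an all_successful trigger that treats "skipped" as success.
    if all(s in ("success", "skipped") for s in states):
        return "run"
    return "trigger failed"


if __name__ == "__main__":
    foo = ["success", "skipped", "success"]  # one mapped child of Foo skipped
    print(downstream_outcome([foo, "success"]))
    print(downstream_outcome([foo, "success"], skip_on_upstream_skip=False))
```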
  • j

    John Jacoby

    01/06/2022, 8:48 PM
    Hi all. I'm struggling to get my task to properly return multiple values. I specify 'nout=2' in the decorator but I still get the 'Task is not iterable' TypeError. This is the fairly simple flow:
    with Flow(constants['name'], result=result) as flow:
    8 replies · 3 participants
  • j

    John Jacoby

    01/06/2022, 8:49 PM
        participant_ids, scan_ids, visit_dates = get_study_data()
        bourget_path, BIDS_path = bourget2bids.map(participant_ids, scan_ids, unmapped(constants))
        copy_DICOMS.map(scan_ids, bourget_path, BIDS_path, unmapped(constants))
  • j

    John Jacoby

    01/06/2022, 8:49 PM
    And the task, which is also fairly simple:
    @task(target='{task_name}/{scan_id}', checkpoint=True, nout=2)
    def bourget2bids(participant_id: str, scan_id: str, study_constants):
        tmp_BIDS_dir, bourget_path = get_from_bourget(participant_id, scan_id, study_constants['name'], study_constants['scripts_dir'])
        add_ASL_metadata(scan_id, study_constants['name'])
        BIDS_path = copy_to_study_data_folder(participant_id, scan_id, tmp_BIDS_dir, study_constants['name'], study_constants['data_dir'])
        return bourget_path, BIDS_path
  • j

    John Jacoby

    01/06/2022, 8:49 PM
    Sorry, I couldn't seem to get that to format properly
  • j

    John Jacoby

    01/06/2022, 8:49 PM
    Thanks for any insight anyone can provide!
  • j

    John Jacoby

    01/06/2022, 8:50 PM
    The error gets thrown at the 'bourget2bids' line
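A likely factor, stated as an assumption rather than the thread's answer: in Prefect 1.x the result of a mapped task is a list with one entry per mapped child, so if each bourget2bids child returns (bourget_path, BIDS_path), the combined result is a list of pairs rather than a pair of lists. A common workaround is a small downstream step that transposes the pairs. The helper below (`split_pairs`, hypothetical) shows that transposition in plain Python; in a flow it would be wrapped in a task.

```python
from typing import List, Sequence, Tuple


def split_pairs(pairs: Sequence[Tuple[str, str]]) -> Tuple[List[str], List[str]]:
    """Turn [(a1, b1), (a2, b2), ...] into ([a1, a2, ...], [b1, b2, ...])."""
    if not pairs:
        return [], []
    firsts, seconds = zip(*pairs)  # transpose the list of pairs
    return list(firsts), list(seconds)


if __name__ == "__main__":
    # Hypothetical per-child results of a mapped task.
    mapped_results = [("bourget/a", "bids/a"), ("bourget/b", "bids/b")]
    bourget_paths, bids_paths = split_pairs(mapped_results)
    print(bourget_paths, bids_paths)
```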
  • d

    Danny Vilela

    01/06/2022, 8:54 PM
    (Somewhat related to my question above, but I'll keep it separate.) If a mapped task raises prefect.engine.signals.SKIP, does it need to return a SKIP as its result? Can I configure the mapped (or mapping) task so that it only returns a result (say, a string) if the task did not skip? Or does it have to return a SKIP?
    7 replies · 2 participants
  • k

    KhTan

    01/06/2022, 9:40 PM
    Hi! I want Prefect to send an alert only when a flow fails, but it's sending an email on every run. What have I done wrong here (code attached)? If I also want an alert when the output is above a certain value (output > 2), should I use a state_handler or something else? Thank you!!!
    9 replies · 2 participants
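A minimal sketch of the failure-only pattern, assuming Prefect 1.x's state-handler signature `(obj, old_state, new_state)`, where the handler returns the new state. To keep it runnable without Prefect, `FakeState` is a hypothetical stand-in exposing `is_failed()`, `is_successful()` and `result` like Prefect's states, and `notify` is a placeholder for whatever sends the email. The key point is to inspect the new state before notifying: a handler that notifies unconditionally fires on every state change.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect 1.x state object."""
    kind: str  # "Running", "Success" or "Failed"
    result: Any = None

    def is_failed(self) -> bool:
        return self.kind == "Failed"

    def is_successful(self) -> bool:
        return self.kind == "Success"


def make_alert_handler(notify: Callable[[str], None], threshold: float = 2) -> Callable:
    """Build a state handler that alerts only on failure or a large output."""

    def handler(obj, old_state, new_state):
        if new_state.is_failed():
            notify(f"{obj} failed")
        elif (
            new_state.is_successful()
            and new_state.result is not None
            and new_state.result > threshold
        ):
            notify(f"{obj} output {new_state.result} exceeded {threshold}")
        return new_state  # state handlers return the (possibly new) state

    return handler


if __name__ == "__main__":
    sent: List[str] = []
    handler = make_alert_handler(sent.append)
    for state in [FakeState("Running"), FakeState("Success", 1),
                  FakeState("Success", 5), FakeState("Failed")]:
        handler("my-task", None, state)
    print(sent)
```

The output check fits a task-level handler, since in Prefect 1.x a task's Success state carries that task's return value, whereas a flow's state does not.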
  • k

    KhTan

    01/06/2022, 11:58 PM
    Hi! I have never run into issues with Prefect's monitoring before, but it's now stuck in an endless loading state, with a few error messages in the console. What might have caused it? Thanks in advance.
    9 replies · 2 participants
  • e

    Enda Peng

    01/07/2022, 2:40 AM
    If I run two local agents with the same token and label (without the local hostname label), I only see one agent on the dashboard. When a flow run comes in, will Prefect randomly assign it to one of the agents?
    2 replies · 2 participants
  • s

    Shivam Bhatia

    01/07/2022, 10:07 AM
    If I update my flow's environment image on Docker Hub and there is no change to the flow file (the one in GitHub storage), do I still need to re-register all my flows?
    2 replies · 2 participants
  • g

    Gabriel Milan

    01/07/2022, 12:54 PM
    Hello there! I'm having some issues with my Istio-injected kubernetes jobs: I've deployed the Prefect Helm chart into an Istio-injected namespace in one cluster (say cluster-one) with agent (agent-one) enabled. I've also deployed another Kubernetes agent (agent-two) in another cluster (say cluster-two; also in an Istio-injected namespace which shares the same Istio Service Mesh as cluster-one). Both agent-one and agent-two can successfully register with the apollo server (deployed on cluster-one) and query for runs. Unfortunately, when jobs are launched (either by agent-one or agent-two), I get the following exception:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7efe788176d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    But the weird thing is that if I open a terminal inside the same job pod and run curl prefect-apollo.prefect:4200, I get a successful response from the apollo server. Has anyone had anything similar before, or tried Istio with Prefect on Kubernetes?
    21 replies · 2 participants
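A frequently reported cause of this pattern with Istio, offered here as an assumption rather than the thread's diagnosis: the job's main container starts sending requests before its Istio sidecar proxy is ready, so early connections are refused while a later manual curl from the same pod succeeds. Istio's `holdApplicationUntilProxyStarts` proxy setting targets exactly that. A generic client-side mitigation is to retry the first connection with backoff; the helper below (`call_with_retries`) is a hypothetical sketch of that idea, not part of Prefect.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying with exponential backoff on the given exceptions."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise ValueError("attempts must be at least 1")
```

With `requests`, pass `retry_on=(requests.exceptions.ConnectionError,)`, because that class does not derive from Python's built-in `ConnectionError`.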
  • d

    Dave

    01/07/2022, 2:21 PM
    Hi guys, we are experiencing problems with the Kubernetes agent when deploying flows, due to timeouts in the k8s agent client. It's related to https://github.com/PrefectHQ/prefect/pull/5066. We are still encountering this error, which means the affected flows end up delayed by approximately 15 minutes; that is rather critical in our use case and not a viable solution. I moved the trace log into the thread with some more info. Hope somebody has some ideas 🙏
    12 replies · 4 participants
  • j

    Jason Motley

    01/07/2022, 4:13 PM
    Can I set up a case statement (https://docs.prefect.io/core/idioms/conditional.html) to check whether it is the weekend or a weekday? I want to do something like with case(condition, 6 or 7):, where the numbers refer to the output of a check_condition task that returns the day of the week.
    6 replies · 2 participants
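Worth noting: in Python, `6 or 7` evaluates to `6`, so `case(condition, 6 or 7)` would only ever match day 6. Assuming Prefect 1.x's `case(task, value)` compares the task's result to a single value, a more robust approach is to compute a boolean in the task and branch on `True`/`False`. The helper `is_weekend` is hypothetical and would be wrapped in a task; it assumes ISO day numbering (Monday = 1, Sunday = 7).

```python
import datetime


def is_weekend(day: datetime.date) -> bool:
    """True for Saturday or Sunday (ISO weekday 6 or 7)."""
    return day.isoweekday() in (6, 7)


if __name__ == "__main__":
    print(6 or 7)  # the expression from the question evaluates to 6
    print(is_weekend(datetime.date(2022, 1, 8)))  # a Saturday
    print(is_weekend(datetime.date(2022, 1, 7)))  # a Friday
```

In the flow this would become something like `with case(weekend_flag, True):` and `with case(weekend_flag, False):`, where `weekend_flag` is the task's output.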
  • y

    Yusuf Khan

    01/07/2022, 5:25 PM
    Hey guys, just a general best-practice question; I'm still very early in my learning journey with Prefect. What I'm seeing in the documentation is largely tasks as functions, and data is usually passed between tasks by assigning the output of one task to a variable and then passing that variable as an argument to another task. What's the guidance on doing this in an object-oriented way? Can tasks be methods, so that they can simply access values through self?
    2 replies · 2 participants
  • d

    Danny Vilela

    01/07/2022, 9:06 PM
    Hi, is it valid to pass a mapped(flatten(nested_list_data)) to a task?
    4 replies · 2 participants
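As a plain-Python illustration (not Prefect's implementation) of what flattening does to the input before mapping, assuming, as Prefect 1.x documents for `flatten`, that exactly one level of nesting is removed: a list of lists becomes a single list, so the task would run once per inner item rather than once per inner list. The helper name `flatten_one_level` is hypothetical.

```python
from itertools import chain
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def flatten_one_level(nested: Iterable[Iterable[T]]) -> List[T]:
    """Remove exactly one level of nesting: [[a, b], [c]] -> [a, b, c]."""
    return list(chain.from_iterable(nested))


if __name__ == "__main__":
    nested_list_data = [[1, 2], [3], [[4, 5]]]
    # The innermost [4, 5] stays a list: only one level is removed.
    print(flatten_one_level(nested_list_data))
```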
  • g

    Greg Adams

    01/07/2022, 9:14 PM
    I’m trying to run some sub-flows through a parent flow using the k8s agent. Does anyone know if there’s a way to get the flows to run in parallel, if appropriate? (looks like they’re still running one at a time, even if they’re parallel)
    1 reply · 2 participants
  • a

    Ashton

    01/08/2022, 1:36 AM
    hey y'all, is there an automatic way to get notified when a Flow fails?
    3 replies · 2 participants
  • s

    Stephen Herron

    01/08/2022, 7:00 AM
    Hi all, my flow looks something like this:
    with Flow(FLOW_NAME) as flow:
    
        event_dates = get_event_dates()
    
        s3_keys = generate_s3_key.map(suffix=event_dates)
        event_file_data = unload_data_to_s3.map(s3_keys, event_dates)
    
        # distinct result names, so the update_log and update_snowflake tasks are not shadowed
        log_results = update_log.map(event_dates, upstream_tasks=[event_file_data])

        snowflake_results = update_snowflake.map(s3_keys, event_dates, upstream_tasks=[log_results])
    The problem is that when I schedule this in Cloud (with a local agent), the run doesn't seem to terminate, even though all the mapped tasks complete. Am I missing something?
    3 replies · 2 participants
  • a

    Aqib Fayyaz

    01/08/2022, 1:54 PM
    Hi, I have a flow on GitHub and a Prefect agent running on GKE, plus a Dockerfile that packages all our custom modules into an image that is pushed to GCR. Things were working fine, but now I need to install PySpark in the Dockerfile. I added it the same way we do in our other Dockerfiles (PySpark is already included in another Dockerfile for the project, and that works), but when I add it to the current Dockerfile and build it with Cloud Build, the build fails with
    Unable to locate package openjdk-8-jdk
    Is the issue the base image? For the other Dockerfiles where Spark runs, we use Ubuntu 20.04 as the base image, but for Prefect we use the Prefect image as the base. Below is the Dockerfile:
    FROM prefecthq/prefect:0.15.6-python3.8
    # for spark
    ENV JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
    ENV SPARK_HOME="/spark/spark-3.1.2-bin-hadoop3.2/"
    ENV PYTHONPATH="/spark/spark-3.1.2-bin-hadoop3.2/python:$PYTHONPATH"
    ENV PYSPARK_PYTHON="python3"
    ENV PATH="$PATH:/spark/spark-3.1.2-bin-hadoop3.2/bin"
    ENV PATH="$PATH:$JAVA_HOME"
    ENV PATH="$PATH:$JAVA_HOME/bin"
    ENV PATH="$PATH:$JAVA_HOME/jre/bin"
    ENV SPARK_LOCAL_IP="127.0.0.1"
    WORKDIR /
    COPY . /
    RUN apt-get update && \
    apt-get install -y  \
    openjdk-8-jdk  \
    python3-pip
ADD https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz spark.tgz
    RUN mkdir -p spark && \
    tar -zxvf spark.tgz -C spark/ && \
    rm spark.tgz
    # for prefect
    RUN pip install feast feast-postgres sqlalchemy google-auth scikit-learn
    RUN pip install feast[gcp]
    RUN pip install --upgrade google-cloud
    RUN pip install --upgrade google-cloud-bigquery
    RUN pip install --upgrade google-cloud-storage
    WORKDIR /opt/prefect
    COPY flow_utilities/ /opt/prefect/flow_utilities/
    COPY flow_utilities_bigQ_Datastore/ /opt/prefect/flow_utilities_bigQ_Datastore/
    COPY setup.py /opt/prefect/setup.py
    COPY .feastignore /opt/prefect/.feastignore
    RUN pip install .
    9 replies · 3 participants
  • v

    vawati

    01/08/2022, 5:47 PM
    I have a task that takes a single argument: a date (think of a task that downloads the data for a given date). I would like this task to run once a day for every day, starting from a date in the past; let's call it start_time. How do I achieve this with Prefect? The closest I've found is this: https://github.com/PrefectHQ/prefect/issues/1179
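    import pendulum
    import prefect
    from prefect import Parameter

    current_time = Parameter("timestamp", default=None)

    @prefect.task
    def do_something_time_specific(current_time):
        # fall back to the run's scheduled start time when no timestamp is passed
        if current_time is None:
            current_time = prefect.context.get("scheduled_start_time")
        # a timestamp passed as a parameter may arrive as a string
        if isinstance(current_time, str):
            current_time = pendulum.parse(current_time)
        # does something dealing with time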
    In words: the task depends on a date, but if a date isn't passed, it uses the scheduled_start_time. This seems to have a problem. Let's suppose it's now 2022-01-01 and Prefect has scheduled runs for 2022-01-02, 2022-01-03, etc. Now my computer is off for a few days and I turn it on again on 2022-01-04. What happens to the runs for the two dates that were skipped? Do they both run on 2022-01-04 but with scheduled_start_time set to 2022-01-02 and 2022-01-03? If that's the case, this solution does the right thing. Or do they both run on 2022-01-04 with scheduled_start_time set to 2022-01-04 for both? (In which case this solution doesn't work.)
    5 replies · 3 participants
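An alternative that does not depend on how missed scheduled runs are handled (a design suggestion, not taken from the thread): compute the full list of dates from start_time up to today in one task and map the download task over that list, so every day is covered explicitly. The helper `dates_to_process` and its `already_done` argument are hypothetical; `already_done` stands in for whatever record tells you which dates were downloaded already.

```python
import datetime
from typing import Iterable, List, Optional


def dates_to_process(
    start: datetime.date,
    end: datetime.date,
    already_done: Optional[Iterable[datetime.date]] = None,
) -> List[datetime.date]:
    """Every date from start to end inclusive, minus dates already handled."""
    done = set(already_done or ())
    all_days = (start + datetime.timedelta(days=i) for i in range((end - start).days + 1))
    return [day for day in all_days if day not in done]


if __name__ == "__main__":
    todo = dates_to_process(
        datetime.date(2022, 1, 1),
        datetime.date(2022, 1, 4),
        already_done=[datetime.date(2022, 1, 1)],
    )
    print(todo)  # 2022-01-02 through 2022-01-04
```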
  • m

    Mike Lev

    01/09/2022, 2:47 PM
    hey all … what integrations can we plug in for data versioning in ML pipelines built with prefect?
    2 replies · 3 participants

Kevin Kho

01/09/2022, 3:00 PM
There is no task for this in the task library, but MLflow comes to mind. Maybe Kedro?

Andrew Black

01/09/2022, 3:13 PM
@Mike Lev I lead partnerships/integrations at Prefect, please tell me what tools you're interested in using and which you decide on. We’ll provide whatever support we can.
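Independent of which tool is chosen (MLflow, Kedro or another), the core idea behind data versioning is recording a stable identifier for the exact data a run used. A minimal, tool-agnostic sketch of that idea is a content hash; `dataset_fingerprint` and `file_fingerprint` are hypothetical helpers, and a real setup would store the fingerprint alongside the run's other metadata.

```python
import hashlib
from typing import Iterable


def dataset_fingerprint(chunks: Iterable[bytes]) -> str:
    """SHA-256 over the data's bytes; identical data gives an identical ID."""
    digest = hashlib.sha256()
    for chunk in chunks:  # streamed, so large files need not fit in memory
        digest.update(chunk)
    return digest.hexdigest()


def file_fingerprint(path: str, chunk_size: int = 1 << 20) -> str:
    """Fingerprint a file by reading it in fixed-size chunks."""
    with open(path, "rb") as handle:
        return dataset_fingerprint(iter(lambda: handle.read(chunk_size), b""))


if __name__ == "__main__":
    print(dataset_fingerprint([b"example ", b"data"]))
```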