prefect-community

    R Zo

    02/25/2022, 2:48 AM
post 3) This is related to post 1 above: will it be possible to have a prefect/dask setup that runs jobs in two different Python environments, one running TensorFlow with perhaps older packages and the other running a newer environment?

    Ben Muller

    02/25/2022, 3:52 AM
    Hey, I am running a flow locally and have another flow deployed in the cloud. I loop over something and create multiple sub flows.
create_flow_run.run(
    flow_name="advanced_cleaning",
    project_name="modelling",
    run_name=f"data_update={data_update}",
    parameters=dict(data_update=data_update),
)
When I run this it is called multiple times, but only one, sometimes two, flows are triggered in the cloud. Is this a possible bug? Do run names need to be unique? This is being called from within a task, FYI.

    Tomer Cagan

    02/25/2022, 8:57 AM
Is it possible to run Dask code within a Prefect task? Is there a way to submit work to the Dask cluster that runs my flow (KubeCluster or LocalDaskCluster)? I am looking for a way to make my core functionality rely on Dask and still be able to leverage orchestration through Prefect...
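(For reference, a minimal sketch of one known pattern, assuming the task executes on a Dask worker: `distributed.worker_client()` yields a client connected to the same cluster that is running the flow:)
```python
from distributed import worker_client
from prefect import task

@task
def heavy_lifting(items):
    # worker_client() connects to the cluster this task is already running on,
    # so plain Dask work can be submitted from inside the Prefect task.
    with worker_client() as client:
        futures = client.map(lambda x: x * 2, items)
        return client.gather(futures)
```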

    Aditi Tambi

    02/25/2022, 10:22 AM
Hi, I am new to this. I am using Python; after initialising a Prefect flow, we use flow.run() to start the flow. I would like to know if there is any command that can terminate this flow. I did not find any such thing in the Prefect docs.

    datamongus

    02/25/2022, 12:47 PM
Hi everyone, I have a question regarding Prefect Core vs. Prefect Orion. Is it possible to use Prefect Core's tasks inside Prefect Orion? Meaning, can I use the dbt task library for Core in Orion?

    Donnchadh McAuliffe

    02/25/2022, 1:14 PM
I have a general question about updating tasks in a flow: say you deploy a flow with an interval schedule and realise after a few flow runs that there is a bug in one of the tasks. Once you fix it, is there a way to update the already-deployed flow, or do you need to delete the deployed flow and deploy again with the new code?
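(A hedged note: in Prefect 1.x, registering a flow again under the same project creates a new version and archives the old one, so the existing schedule should pick up the fixed code; a minimal sketch, with a placeholder project name:)
```python
# Re-register after fixing the task; Prefect bumps the flow version in place.
flow.register(project_name="my-project")  # placeholder project name
```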

    Matthias

    02/25/2022, 1:34 PM
I am trying to replicate running this flow on my own laptop with Docker Desktop. I have created a Docker image (but did not push it to e.g. DockerHub) and registered the flow to Prefect Cloud, just like in the script. I started a Docker agent and did a quick run from the UI. So far so good, everything works. But when I modify the Dockerfile so that it can run as non-root, I can no longer do a successful run. I get the following error from the agent:
docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/containers/f30a3e9241fab0d272c3f36eae16867487fb187b964c8b3f22bc8fd05d2aa4d0/json>: Not Found ("No such container: f30a3e9241fab0d272c3f36eae16867487fb187b964c8b3f22bc8fd05d2aa4d0")
and the flow is stuck in the Submitted state. Does anyone know how to fix it? Just to make sure: the only difference between the successful run and the run stuck in Submitted is that I added a non-root user to the image. https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py

    Luis Aguirre

    02/25/2022, 1:42 PM
I'm trying to run a simple flow in a Jupyter notebook (my goal is to run flows on a Jupyter kernel) and I'm getting some errors.

    Henrietta Salonen

    02/25/2022, 2:31 PM
Hi! I’m trying to find examples of how to use the `data` argument correctly for Prefect’s `PostgresExecute` task together with the `query` argument. I have two different dataframes that I would like to insert into Postgres tables. This is what I have now, but I'm unsure how I would pass the data argument here:
```
@task
def make_query(schema_name, table_name, columns):
    return f"""
        CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({columns});
        INSERT INTO {schema_name}.{table_name} ({columns}) VALUES ();
    """

with Flow("postgres_test") as flow:
    execute_task = PostgresExecute(db_name=database, user=user, host=host, port=port, commit=True)
    table_name = Parameter("table_name", default=[df1.name, df2.name])
    columns = Parameter("columns", default=[df1_columns, df2_columns])
    query = make_query.map(unmapped(schema_name), table_name, columns)
    execute_task.map(password=unmapped(postgres_pwd), query=query)

if __name__ == "__main__":
    flow.run()
```
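(For reference, a hedged sketch continuing the snippet above: `PostgresExecute` appears to forward `data` straight to the underlying database cursor in Prefect 1.x, so the usual psycopg2 pattern of `%s` placeholders plus a tuple of values should apply; the table and column names below are placeholders:)
```python
# Hedged sketch - `data` supplies the values for the %s placeholders in the query.
query = "INSERT INTO my_schema.my_table (col_a, col_b) VALUES (%s, %s)"  # placeholder names
execute_task.map(
    password=unmapped(postgres_pwd),
    query=unmapped(query),
    data=[(1, "a"), (2, "b")],  # one tuple of values per mapped run
)
```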

    Leo Kacenjar

    02/25/2022, 3:48 PM
Hi there, we are trying to use GitLab storage with the DockerRun flow run. Our Docker image does not contain our GitLab codebase and we'd like to keep it that way. We've almost got everything working, but I'm running into PYTHONPATH issues. I see some similar threads, but none addressing the code-outside-the-image use case. Does anyone have a GitLab Storage/DockerRun combo example? What is the path where the GitLab repo gets exposed to the Docker image? Thanks!

    Luke Segars

    02/25/2022, 6:32 PM
hi all, i'm working on a flow with a series of mapped tasks -- mapped task 1 feeds results into mapped task 2. i'm observing a strange behavior where:
• all child tasks of mapped task 1 complete successfully
• all child tasks of mapped task 2 are triggered and complete successfully
• BUT for some reason, prefect appears to re-trigger the (already completed) child tasks of mapped task 1 once all have completed. these end up failing (no heartbeat detected, presumably because the dask workers are released b/c the task completed)
technically the pipeline completes everything, but all of the children of mapped task 1 are marked as failures once the third bullet hits. i'm running this in a GKE Autopilot cluster... working on testing in another environment now. any ideas on why prefect would even try to re-run a successful child task?

    Austen Bouza

    02/25/2022, 7:42 PM
Hello, I’m seeing an unusual error when submitting flows to ECS. My flows have been running without issue for a long time, but suddenly I’m getting a `prefect.exceptions.AuthorizationError: Malformed response received from Cloud - please ensure that you are authenticated.` message. The auth token I am using has not changed and has not expired. The response is failing on a `requests.exceptions.JSONDecodeError: [Errno Expecting value]` error with the response body
<html><head><meta charset="utf-8"><script>C9xx.T9xx=T9xx;C9xx.g8=window;;n0II(C9xx.g8);T9TT(C9xx.g8);C9xx.D9S=(function(){var Z9S=2;for(;Z9S !== 1;){switch(Z9S)
…and a ton more obfuscated javascript. Is this a typical response body from Prefect Cloud when not authenticated?

    Max Lei

    02/25/2022, 8:54 PM
If we want to use `DaskExecutor` with `dask_cloudprovider.aws.FargateCluster`, how do we assign a task definition to the cluster?
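(A hedged sketch: `DaskExecutor` takes a cluster class plus `cluster_kwargs` that are forwarded to it when the flow starts, so cluster-level settings belong there; the specific `FargateCluster` keyword names below are assumptions to check against the dask_cloudprovider docs:)
```python
from prefect.executors import DaskExecutor
from dask_cloudprovider.aws import FargateCluster

# cluster_kwargs are passed straight through to FargateCluster at flow start.
flow.executor = DaskExecutor(
    cluster_class=FargateCluster,
    cluster_kwargs={
        "image": "my-image:latest",  # placeholder image
        "n_workers": 2,
    },
)
```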

    iñigo

    02/25/2022, 8:57 PM
Hello community, I've developed some flows, and some of them use the same task (made by me). I'd like to know if there are best practices for using the same task across several flows without repeating it in each flow. Thanks in advance.
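(A minimal sketch of the common pattern: keep the shared task in its own module and import it into each flow; the file names here are placeholders:)
```python
# my_tasks.py (shared module, placeholder name)
from prefect import task

@task
def common_task(x):
    return x * 2

# flow_a.py - import the shared task instead of redefining it
from prefect import Flow
from my_tasks import common_task

with Flow("flow-a") as flow:
    common_task(1)
```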

    Daniel Komisar

    02/25/2022, 9:14 PM
Hello all, when using the Dask executor, is there any way to control what Prefect writes to the console log? We are getting a line in there per Prefect log message sent. We send a lot of log messages and it is causing performance problems. Is there any way to turn that off so that the worker doesn't have to spend the overhead of interleaving lines?
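(A hedged sketch: Prefect 1.x reads logging settings from `PREFECT__`-prefixed environment variables, so raising the level on the Dask workers should cut per-message console output; the assumption is that the workers see this variable before Prefect is imported:)
```python
import os

# Raise Prefect's log level on the worker so INFO-level task logs
# are no longer written to the console.
os.environ["PREFECT__LOGGING__LEVEL"] = "WARNING"
```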

    Josh

    02/25/2022, 9:31 PM
    Is it possible to map over kwargs as asked in this Q&A? https://github.com/PrefectHQ/prefect/discussions/5014 Context: I’m trying to create a cross-product of multiple lists to map over. I want to operate on the downstream values by taking them as kwargs. Inspiration from this question answered by @Kevin Kho https://prefect-community.slack.com/archives/CL09KU1K7/p1645196983284599
    from prefect import Flow, task
    from pprint import pformat
    
    
    @task
    def numbers():
        return [1, 2]
    
    
    @task
    def letters():
        return ["a", "b", "c"]
    
    
    @task
    def cross_product(numbers, letters):
        res = []
        for letter in letters:
            for num in numbers:
                res.append({"num": num, "letter": letter})
        return res
    
    
    @task
    def pretty_print(**kwargs):
        print(pformat(kwargs))
    
    
    with Flow("...") as flow:
        n = numbers()
        l = letters()
        map_items = cross_product(n, l)
        pretty_print.map(map_items)
    
    flow.run()
    I get the following error:
    Traceback (most recent call last):
      File "flow.py", line 33, in <module>
        pretty_print.map(map_items)
      File "prefect/core/task.py", line 760, in map
        *args, mapped=True, upstream_tasks=upstream_tasks, flow=flow, **kwargs
      File "prefect/core/task.py", line 674, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "inspect.py", line 3015, in bind
        return args[0]._bind(args[1:], kwargs)
      File "inspect.py", line 2942, in _bind
        'too many positional arguments') from None
    TypeError: too many positional arguments
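(A hedged workaround sketch, modifying the snippet above: Prefect 1.x `Task.map` passes each mapped element as a single positional argument, so it cannot populate `**kwargs`; accepting the dict itself and unpacking it inside the task avoids the error:)
```python
# Hedged sketch: take the dict produced by cross_product as one argument
# instead of mapping over **kwargs.
@task
def pretty_print(item):
    print(pformat(item))

with Flow("...") as flow:
    n = numbers()
    l = letters()
    map_items = cross_product(n, l)
    pretty_print.map(map_items)  # each element arrives as `item`
```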

    kiran

    02/25/2022, 9:33 PM
Does anyone know how to mention `@channel` or `@here` in the `SlackTask` message? Right now I can tag myself using my user ID, but it doesn't work when I try it with my channel ID or `@channel` or `<@channel>`...
elif new_state.is_failed():
    data_engineer_user_id = Secret("DATA_ENGINEER_USER_ID").get()
    msg = f"<@{data_engineer_user_id}> — {obj} ({flow_run_name}): {new_state}"
    SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
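(A sketch based on Slack's message formatting rules rather than anything Prefect-specific: group mentions use the `<!...>` syntax, so `<!channel>` or `<!here>` should work where `<@USERID>` does:)
```python
# <!channel> / <!here> are Slack's special group mentions; <@...> is only for users.
msg = f"<!channel> — {obj} ({flow_run_name}): {new_state}"
SlackTask(message=msg, webhook_secret="SLACK_WEBHOOK_URL").run()
```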

    Dexter Antonio

    02/25/2022, 9:52 PM
I’m having trouble using Python’s `client.graphql`. This is the GraphQL query that I want to use; it works in Prefect Cloud’s interactive API:
    query {
      flow(where: {name: {_eq: "autoscorer"}, version:{_eq:10}}) {
        id
        name
        version
        tasks{
          name
          id
        }
      }}
This is my attempt to do it with `client.graphql`:
    client.graphql(
    {
        'query': {
            'flow(where: {name: {_eq: "autoscorer"}, version:{_eq:10}})': {
                #'id',
                #'name',
                #'version',
                'tasks': {'task_runs': {'id', 'name'}},
            },
        }
    
    })
This works, but if I uncomment the commented fields (e.g. `id`) I get a syntax error. What is the proper syntax to indicate a root field? Is there any documentation on how to use this API in more detail? Is it based on a common Python GraphQL library?
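(A hedged sketch: Prefect 1.x `Client.graphql` also accepts a raw query string, which sidesteps the dict syntax entirely:)
```python
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "autoscorer"}, version: {_eq: 10}}) {
        id
        name
        version
        tasks { name id }
      }
    }
    """
)
```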

    Aqib Fayyaz

    02/26/2022, 7:47 AM
Do we have a working example of a Docker sidecar on Kubernetes on GitHub?

    iñigo

    02/26/2022, 9:50 AM
Hello, how is it possible that a flow.run() works but a run from the server doesn't? I get the following message:
    Unexpected error: PicklingError('Could not pickle object as excessively deep recursion required.')
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
    RecursionError: maximum recursion depth exceeded while pickling an object
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 909, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/results/local_result.py", line 115, in write
        value = self.serializer.serialize(new.value)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
        raise pickle.PicklingError(msg) from e
    _pickle.PicklingError: Could not pickle object as excessively deep recursion required.
After some testing, I also get a problem while registering my script with flow.register:
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
        return Pickler.dump(self, obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 639, in reducer_override
        if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj):  # noqa  # pragma: no branch
    RecursionError: maximum recursion depth exceeded in comparison
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/inigo/project/scripts/es_am_08_caetano_to_s4e/caetano_to_s4e_flow.py", line 65, in <module>
        flow.register(project_name='S4E')
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1727, in register
        registered_flow = client.register(
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/client/client.py", line 1127, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/core/flow.py", line 1489, in serialize
        self.storage.add_flow(self)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
        f.write(flow_to_bytes_pickle(flow))
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/prefect/utilities/storage.py", line 177, in flow_to_bytes_pickle
        cloudpickle.dumps(flow, protocol=4), newline=False
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/inigo/project/scripts/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 570, in dump
        raise pickle.PicklingError(msg) from e
    _pickle.PicklingError: Could not pickle object as excessively deep recursion required.

    iñigo

    02/26/2022, 9:50 AM
    Thanks in advance

    Hui Huang

    02/26/2022, 6:53 PM
Hi, Prefect experts. 😄 We are bootstrapping our analytics stack, and I have been evaluating different modern solutions. We have production data in a SQL database. We want to do some analytics cleanup & transformation and ship the results to Power BI. Based on the research I have done, it looks like the viable solution for us will be dbt + a warehouse (Snowflake) + visualization (Power BI). If that is the case, it sounds like we do not need Prefect to orchestrate Python-based tasks. Do you have a case study applying Prefect to an analytics pipeline architecture? I hope someone can give me some direction.

    David Michael Carter

    02/26/2022, 10:59 PM
https://docs.prefect.io/core/idioms/task-run-names.html I am having trouble refactoring the example in the documentation above for my needs. I am trying to name tasks based on the mapped children in Prefect Cloud, but I am not mapping a list. I am mapping a dictionary, where the desired `task_run_name` would be a value based on a specific key for that mapped dict index. Anyone have experience naming tasks based on a mapped dictionary?
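(A hedged sketch based on the linked idiom: `task_run_name` can be a callable that receives the task's inputs, so a key of the mapped dict can name each child run; `item` and `"name"` are placeholder names:)
```python
from prefect import task

# Each mapped child receives one dict; pull the naming key out of it.
@task(task_run_name=lambda **kwargs: kwargs["item"]["name"])
def process(item):
    ...
```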

    Ayah Safeen

    02/27/2022, 6:41 AM
Hello all, I was checking the new open-source engine "Orion", but when I run the code I get the following error on the import statement `from prefect import flow`:
cannot import name 'ThreadedChildWatcher' from 'asyncio'
The code:
    from prefect import flow
    @flow
    def common_flow(config: dict):
        print("I am a subgraph that shows up in lots of places!")
        intermediate_result = 42
        return intermediate_result
    
    @flow
    def main_flow():
    
        data = common_flow(config={})
    
    # run the flow
    flow_state = main_flow()

    Hedgar

    02/27/2022, 11:03 AM
I'm glad I didn't give up on myself. I think I have started wrapping my head around this Prefect thing, thanks to https://medium.com/the-prefect-blog/getting-started-with-prefect-flows-98e8881ffaf9
• I have signed up to Prefect Cloud
• Changed backend to cloud
• Registered my project and flow
• Ran `prefect agent local start`
• Was also able to click the quick run and I got a beautiful green bar of success!
However, I have a challenge: my AWS EC2 instance is started and shut down at certain times of the day by a Lambda function. How do I ensure that my flow runs when the EC2 instance starts for the day? Do I need to create a bash script that starts the local agent via a crontab on the remote instance?

    Liran Cohen

    02/27/2022, 1:49 PM
Hi all, I'm trying to learn Prefect, and I get an error after installing it via pip: ModuleNotFoundError: No module named 'prefect'

    Liran Cohen

    02/27/2022, 1:53 PM
Hi all, I'm trying to learn Prefect, and I get an error after installation via pip. When I run `import prefect` I get:
ModuleNotFoundError: No module named 'prefect'
The package seems to be installed, because when I do `pip show prefect` I get:
Name: prefect
Version: 1.0.0
Summary: The Prefect Core automation and scheduling engine.
I checked a few times and the environment where I try to import is the same one where I ran pip show. Any suggestions on why that might happen? Thanks

    Adi Gandra

    02/27/2022, 4:55 PM
Hey, I was trying to figure out: is it possible to set up a task that, if it fails, retries 3 times (I figured this part out), but if it still fails, kicks off another task so the rest of the flow can continue? Basically I'm trying to swap in a backup task when the original fails, so my entire flow can continue on.
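(A hedged sketch of one known Prefect 1.x pattern: give the backup task an `all_failed` trigger on the primary, and let the downstream task run on `any_successful`; task names here are placeholders:)
```python
from datetime import timedelta

from prefect import Flow, task
from prefect.triggers import all_failed, any_successful

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def primary():
    ...

@task(trigger=all_failed)  # runs only if primary exhausted its retries
def backup():
    ...

@task(trigger=any_successful)  # runs if either upstream succeeded
def downstream():
    ...

with Flow("fallback") as flow:
    p = primary()
    b = backup(upstream_tasks=[p])
    d = downstream(upstream_tasks=[p, b])

# Judge flow success by the final task, not the (possibly failed) primary.
flow.set_reference_tasks([d])
```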

    Adi Gandra

    02/27/2022, 9:07 PM
I have my flow set up on Kubernetes (EKS), and everything has been working great. However, sometimes I randomly get “No heartbeat detected from the remote task; marking the run as failed.” and it kills my final task. Some days it works fine; other days the task doesn’t finish and is marked as failed with this error. Any ideas on this? Or ways to debug what is going on?

Anna Geller

02/27/2022, 9:47 PM
Hi Adi, this is a common issue. This thread discusses the problem more and shows some solutions you may try.