prefect-community

    Xinyi Guo

    10/27/2021, 9:43 AM
    Hey all! I am new to Prefect. I recently wanted to run a scheduled Prefect job, but it seems to be killed from time to time by the Prefect ZombieKiller, with the error No heartbeat detected from the remote task; marking the run as failed. Any idea why this happens, and how I can make sure my task always has a heartbeat the ZombieKiller can detect? Thank you.
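    A possible workaround, sketched here as an assumption from the Prefect 1.x troubleshooting docs (verify against your version): heartbeats can run in a thread instead of a subprocess, which helps long-running tasks keep emitting them.

    # Hedged sketch, assuming Prefect 1.x: run heartbeats in thread mode so
    # the ZombieKiller keeps detecting them during long-running tasks.
    from prefect import Flow
    from prefect.run_configs import UniversalRun

    with Flow("my-scheduled-flow") as flow:
        ...  # your tasks here

    flow.run_config = UniversalRun(
        env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
    )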

    Arun Giridharan

    10/27/2021, 1:13 PM
    Question: Is it possible for 2 parent flows to access the same child run id?

    serhan

    10/27/2021, 1:22 PM
    Hey all! I am working on integrating Splunk with the Prefect agent. I need to know where all the logs are written; what is the path to the log files? Can anyone help?

    Eric Feldman

    10/27/2021, 4:02 PM
    Hey again 🙂 I’m trying to get the params that a flow was run with in a state handler method, but `new_state.result` keeps being an empty dict when running on a Prefect agent. If I run the flow using `flow.run`, I do see the results of the state.

    Jason Boorn

    10/27/2021, 4:08 PM
    Quick question - if I have a set of tasks that don't depend on each other but that I want to run serially, what's the correct way to do that?
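    One way to express this, as a minimal sketch assuming the Prefect 1.x API used elsewhere in this channel, is to declare explicit upstream dependencies even though no data flows between the tasks:

    # Minimal sketch, assuming Prefect 1.x: chain data-independent tasks
    # serially via explicit upstream dependencies.
    from prefect import Flow, task

    @task
    def step_a():
        print("a")

    @task
    def step_b():
        print("b")

    @task
    def step_c():
        print("c")

    with Flow("serial-demo") as flow:
        a = step_a()
        b = step_b(upstream_tasks=[a])  # b runs only after a
        c = step_c(upstream_tasks=[b])  # c runs only after b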

    Donny Flynn

    10/27/2021, 4:53 PM
    Hi! I want to automate a simple python process with Prefect, but I have a laptop I close to take in/out of the office. What would be the easiest online agent to run a scheduled flow? Would a GKE job to poll Prefect cloud as an agent be the best/easiest way? I was exploring Coiled, but I don't think you can set that as an agent, unless I'm missing something...

    Imran Qureshi

    10/27/2021, 5:22 PM
    We're trying out Prefect Cloud. How do folks handle pipelines in different environments (dev, staging, production)? Do folks use teams, tenants, projects, etc. for this?

    Vamsi Reddy

    10/27/2021, 6:14 PM
    Hi everyone, I am working on a flow where I need to spin up an EMR cluster -> submit a step (using a shell task) -> check the status of the step and read log files (using boto3) -> submit further steps based on the status, or fail the flow/step if an error occurs in the logs. Is there a way I can change the state of a previous task to failed based on the next task?

    Kevin Mullins

    10/27/2021, 6:43 PM
    I’m somewhat new to Prefect. I’ve done simple mapped tasks but am curious whether I can chain subsequent tasks to mapped tasks to perform a fan-out/fan-in approach. Here is a diagram trying to visualize what I’d like to accomplish (see the sketch below). Any guidance on whether this is possible, and/or approaches, would be greatly appreciated:
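    The diagram referenced above did not survive the archive. As a hedged sketch of the fan-out/fan-in pattern described, assuming Prefect 1.x mapping:

    # Hedged sketch, assuming Prefect 1.x: map to fan out, then pass the
    # mapped result to an unmapped task to fan back in.
    from prefect import Flow, task

    @task
    def get_items():
        return [1, 2, 3]

    @task
    def transform(x):
        return x * 10

    @task
    def load_all(results):
        # receives the full list of mapped results (the fan-in)
        print(sum(results))

    with Flow("fan-out-fan-in") as flow:
        items = get_items()
        transformed = transform.map(items)  # fan-out: one task run per item
        load_all(transformed)               # fan-in: unmapped downstream task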

    John Muehlhausen

    10/27/2021, 7:27 PM
    @task(log_stdout=True)
    # ...
    with Flow(..., executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
    I see stdout when using the threads scheduler, but not with processes. The Prefect version is 0.14.19, and I'm running the flow locally in Jupyter at this point, with no agent. I should also mention that using the threads scheduler causes subsequent runs of the notebook cell to do nothing, but it is no surprise to me that threads would be less stable than subprocesses whose end state is cleaned up by the OS.

    Christopher Chong Tau Teng

    10/28/2021, 3:17 AM
    Hi, I am trying to register a simple flow on a server that I run locally in Docker. I first built a Docker image on top of the `prefecthq/prefect:latest` image, which also copied my Python script `/src/test.py`, and ran `prefect backend server`. Then, in the `docker-compose.yaml` generated by `prefect server config`, I added a service `client` whose command is `python /src/test.py`. After running `docker-compose up`, I got the following error from the `client` service:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f24823ea0d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    Meanwhile, I can access the UI, and the UI shows it is connected to the Apollo endpoint successfully. It seems like the `client` service fails to connect to the server at localhost:4200?

    Pierre Monico

    10/28/2021, 9:32 AM
    👂 Prefect VS Databricks, opinions? 👂 On one of my client’s projects I implemented all ETL pipelines using Prefect. I chose that technology because:
    • there are a lot of external dependencies (mostly self-developed API libs)
    • most of the business logic is complex and needs to be abstracted, which would be hard to achieve in a notebook/cloud environment
    My client now wants me to migrate everything to Databricks. I am trying to explain that the two do not serve the exact same purpose and that, in fact, both Airflow and Prefect have integrations with Databricks, but they don’t really get the point. They don’t have a tech background, and their main argument is “harmonizing” the stack because they have had 2-3 notebooks running in Databricks for a few months now (of very poor code quality, btw). I am trying to justify my stack choice, but it is difficult to explain in layman’s terms. What is your take on this? PS: also happy to hear any arguments that would go in favour of choosing Databricks over Prefect.

    Will List

    10/28/2021, 10:57 AM
    Viewing a flow of flows in the Cloud UI, how do you see the log output for the tasks in the child flows? In the UI, it appears that the parent flow counts the child flows as tasks and shows log output at the flow level (e.g. 'this flow completed successfully'), but I can't see how to get to the next level down and see the logs of the actual tasks executed.

    Ahmed Ezzat

    10/28/2021, 3:03 PM
    How do I access/transform the data returned by a task before sending it to another task, i.e.:
    with prefect.Flow('myflow') as flow:
        task_1 = my_task_1()
        task_2 = my_task_2(task_1['value'])
    Thanks in advance!
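    One common answer, sketched under Prefect 1.x assumptions: insert an intermediate task that does the transformation. (In 1.x, indexing like task_1['value'] inside the flow block also works; it compiles to a GetItem task.)

    # Sketch, assuming Prefect 1.x: transform an upstream result with an
    # explicit intermediate task before passing it on.
    from prefect import Flow, task

    @task
    def my_task_1():
        return {"value": 42}

    @task
    def extract_value(payload):
        # any transformation of the upstream result goes here
        return payload["value"]

    @task
    def my_task_2(value):
        print(value)

    with Flow("myflow") as flow:
        task_1 = my_task_1()
        task_2 = my_task_2(extract_value(task_1))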

    bral

    10/28/2021, 3:43 PM
    Hi. When I install the latest Prefect in a Python 3.9 environment, the command "prefect server start" downloads images with the tag core_0.15.7, but inside these images the Python environment is not 3.9. So for Python 3.9, should I download specific images?

    Constantino Schillebeeckx

    10/28/2021, 7:13 PM
    I have an on_failure callback set up for all my flows; is there any way to get at the `Exception` that caused the failure from this callback?
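    A hedged sketch of what the callbacks receive, assuming Prefect 1.x state semantics:

    # Hedged sketch, assuming Prefect 1.x: on_failure callbacks receive
    # (object, state). For a failed task, state.result usually holds the
    # raised exception; for a failed flow, state.result maps tasks to states.
    from prefect import Flow, task

    def task_failed(task_obj, state):
        print(f"{task_obj.name} failed with: {state.result!r}")

    def flow_failed(flow_obj, state):
        for t, task_state in (state.result or {}).items():
            if task_state.is_failed():
                print(f"{t.name} raised: {task_state.result!r}")

    @task(on_failure=task_failed)
    def flaky():
        raise ValueError("boom")

    with Flow("callback-demo", on_failure=flow_failed) as flow:
        flaky()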

    Kelvin Malyar

    10/28/2021, 8:20 PM
    Hi, does anyone know if there are any good examples of integrating with Great Expectations checkpoints, and also email, beyond what's in the documentation?

    Steve s

    10/28/2021, 8:51 PM
    is there a way for a flow to access its own `flow_id` while running?
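    A minimal sketch, assuming Prefect 1.x: runtime identifiers live in prefect.context during a backend run.

    # Minimal sketch, assuming Prefect 1.x: read identifiers from the
    # runtime context inside a task.
    import prefect
    from prefect import Flow, task

    @task
    def report_ids():
        logger = prefect.context.get("logger")
        logger.info("flow_id=%s", prefect.context.get("flow_id"))
        logger.info("flow_run_id=%s", prefect.context.get("flow_run_id"))

    with Flow("who-am-i") as flow:
        report_ids()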

    Harish

    10/28/2021, 9:09 PM
    Hi, Is it possible to access a flow run's logs through prefect python code?
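    A hedged sketch, assuming Prefect 1.x with Cloud or Server: FlowRunView can fetch a run's logs (the flow run id below is hypothetical).

    # Hedged sketch, assuming Prefect 1.x: fetch a flow run's logs via the
    # backend view objects.
    from prefect.backend import FlowRunView

    flow_run = FlowRunView.from_flow_run_id("your-flow-run-id")  # hypothetical id
    for log in flow_run.get_logs():
        print(log.timestamp, log.level, log.message)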

    Joe

    10/28/2021, 9:22 PM
    Hello everyone, I'm using `start_flow_run.map(...)` to use the same flow but iterate across some provided parameters. Unfortunately, when I set `flow_name=unmapped('webfs')` I get this error when attempting to run the flow: `ValueError: Received both flow_id and flow_name. Only one flow identifier can be passed.`

    Harish

    10/28/2021, 10:20 PM
    File "/home/prefect_user/venv/lib/python3.6/site-packages/prefect/cli/build_register.py", line 495, in build_and_register
        click.echo(f"  \u2514\u2500\u2500 ID: {flow_id}")
      File "/home/prefect_user/venv/lib/python3.6/site-packages/click/utils.py", line 272, in echo
        file.write(message)
    UnicodeEncodeError: 'latin-1' codec can't encode characters in position 2-4: ordinal not in range(256)
    Hi, when I do a register I get this weird error but it registers and all works fine. I'm on python 3.6.9. Has anyone encountered this before?

    davzucky

    10/29/2021, 5:12 AM
    Another Orion question: I got Orion working with docker-compose (will share later), and I have been able to deploy my workflow with a schedule. However, when the flow run is triggered from the UI or the schedule, this is the log I get:
    data_loader-orion_agent-1  | Starting agent...
    data_loader-orion_agent-1  | Agent started! Checking for flow runs...
    data_loader-orion_agent-1  | 05:02:24.615 | Submitting flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
    data_loader-orion_agent-1  | 05:02:24.616 | Completed submission of flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7'
    data_loader-orion_agent-1  | 05:02:28.397 | Flow run 'ddf9cb6b-231b-48e7-983a-fcaf1df850f7' exited with exception: Abort('This run has already terminated.')
    and the job doesn't do anything. Connecting to the container and running the command `prefect deployment execute load-data/schedulesLoadData` executes the workflow, which finishes with success:
    05:04:56.465 | Creating run for flow 'load-data'...
    05:04:56.570 | Beginning flow run 'illustrious-nuthatch' for flow 'load-data'...
    05:04:56.571 | Starting executor SequentialExecutor...
    05:04:56.571 | Flow run 'illustrious-nuthatch' received parameters {'source_path': '/source', 'destination_path': '<s3://credit-risk/raw_data>', 'fs_type_source': 'file', 'fs_type_destination': 'file', 'fs_kwarg_source': {}, 'fs_kwarg_destination': {'client_kwargs': {'endpoint_url': '<http://minio:9000>', 'aws_access_key_id': 'admin', 'aws_secret_access_key': 'welcome123'}}}
    05:04:56.628 | Executing flow 'load-data' for flow run 'illustrious-nuthatch'...
    05:04:56.629 | Calling loadData('/source', '<s3://credit-risk/raw_data>', 'file', 'file', {}, {'client_kwargs': {'endpoint_url': 'http...)
    05:04:56.768 | Submitting task run 'ls_files-c965bedb-0' to executor...
    05:04:56.814 | Task run 'ls_files-c965bedb-0' received parameters {'fs_type_source': 'file', 'fs_kwarg_source': {}, 'base_path_source': '/source'}
    05:04:56.900 | Executing task 'ls_files' for task run 'ls_files-c965bedb-0'
    05:04:56.901 | Calling ls_files('file', {}, '/source')
    05:04:57.041 | Task run 'ls_files-c965bedb-0' finished in state Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)
    05:05:00.005 | Shutting down executor SequentialExecutor...
    05:05:00.089 | Flow run 'illustrious-nuthatch' finished in state Completed(message='All states completed.', type=COMPLETED, result=[Completed(message=None, type=COMPLETED, result=[{'name': '/source/sample_data.csv', 'size': 7564965, 'type': 'file', 'created': 1635483409.8989005, 'islink': False, 'mode': 33188, 'uid': 1000, 'gid': 1000, 'mtime': 1635483409.8989005}], task_run_id=c2d8299a-1586-4310-ae09-2a3aace3fb47)], flow_run_id=a37ea827-4854-49fd-a4c7-e2e6913514b1)
    Trying to get more information by setting the env variable PREFECT_DEBUG_MODE to true doesn't change anything. Do you have any input on how to get more information about what is happening on the agent?

    William Grim

    10/29/2021, 8:14 AM
    Hey all! I've set `PREFECT__LOGGING__FORMAT` to be a JSON-style string, and I've set `PREFECT__LOGGING__EXTRA_LOGGERS` to capture logging from various libraries as well. The issue is that the `%(message)s` strings that get sent to loggers often come back from libraries in a format that is not JSON-compatible. For example, they will have double quotes in them that are not escaped, and I want to catch these so I can pass everything through `json.JSONEncoder().encode(msg)` first. Is there a way to do this? Even if I need to write a different "main" method that sets up an agent for me, I'm willing to do that. I just need to know how/where to set up the hooks. Much appreciated for any information in advance!!
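    A sketch of the escaping idea itself, using only the standard library; wiring it into the agent's logging configuration is an assumption to adapt to your setup:

    # Sketch, stdlib only: a Formatter that JSON-encodes the rendered
    # message so embedded quotes and newlines cannot break a JSON log line.
    import json
    import logging

    class JsonSafeFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps({
                "level": record.levelname,
                "name": record.name,
                "message": record.getMessage(),  # json.dumps escapes quotes
            })

    handler = logging.StreamHandler()
    handler.setFormatter(JsonSafeFormatter())
    logging.getLogger("prefect").addHandler(handler)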

    JulienKWM

    10/29/2021, 8:17 AM
    Hello! I need to implement a workflow where each task must execute one after the other and use the result of the previous task. The only way I have found to do this so far is to loop in the flow code, but this solution doesn't work with flow parameters:
    @task()
    def sample_task(some):
        return some
    
    
    @task()
    def process(current, previous):
        pass
    
    
    with Flow("test") as flow:
        previous = None
        for n in range(180):
            item = sample_task(n)
            process(item, previous)
            previous = item
    Is there another way to do this (inside a task, maybe, to handle flow parameters)? Thank you for your help!
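    One alternative, sketched assuming Prefect 1.x task looping: the LOOP signal re-runs a single task while carrying state between iterations, and the iteration count can come from a flow Parameter.

    # Hedged sketch, assuming Prefect 1.x: sequential iteration inside one
    # task via the LOOP signal, driven by a Parameter.
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.engine.signals import LOOP

    @task
    def process_all(n_iterations):
        loop_payload = prefect.context.get("task_loop_result", {})
        i = loop_payload.get("i", 0)
        previous = loop_payload.get("previous")
        current = i  # stand-in for real work that uses `previous`
        if i + 1 < n_iterations:
            raise LOOP(result={"i": i + 1, "previous": current})
        return current

    with Flow("looped") as flow:
        n = Parameter("n_iterations", default=180)
        process_all(n)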

    haf

    10/29/2021, 8:47 AM
    Here's another noob question from me; I can't get the Dask executor to pick up work from the Prefect k8s agent / Cloud. Here's some output:
    Using dask_endpoint=<tcp://dask-scheduler.flows.svc:8786>
    Registering flow with label=['prod'] image=example/data-pipelines:cb40d2797195791a3cf195fa1906a1722222222
    Registering... executor=DaskExecutor, run_config=KubernetesRun.
    From this code:
    if args.dask:
        print(f"Using dask_endpoint={args.dask_endpoint}")
        flow.executor = DaskExecutor(address=args.dask_endpoint)
    
    print(
        f"    executor={type(flow.executor).__name__}, run_config={type(flow.run_config).__name__}, result={type(flow.result).__name__}."
    )
    
    flow.register(
        project_name=args.project_name,
        build=args.build,
        idempotency_key=args.commit_ref,
        labels=args.labels,
        add_default_labels=False,
    )
    It just spawns `prefect-job` values...

    Kevin Kho

    10/29/2021, 1:33 PM
    Hello everyone, the Prefect team joins the Anywhere League for trivia once a week. For this season (6 weeks long starting next week), we want to have a Prefect Community team. The schedule is flexible depending on who joins. Let me know if you want to join our community team!

    Vamsi Reddy

    10/29/2021, 2:02 PM
    Hi everyone, I am trying to use a Parameter as an input to my flow. I want my parameter to be a dictionary. How can I access the dictionary? Do I have to use dict(Parameter) to convert back to the dict type?
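    A minimal sketch, assuming Prefect 1.x: a Parameter with a dict default arrives inside a task as a plain dict, so no conversion is needed.

    # Minimal sketch, assuming Prefect 1.x: dict Parameters resolve to
    # ordinary dicts by the time a task runs.
    from prefect import Flow, Parameter, task

    @task
    def use_config(config):
        # config is a plain dict at runtime
        print(config["bucket"], config.get("prefix", ""))

    with Flow("dict-param") as flow:
        config = Parameter("config", default={"bucket": "my-bucket", "prefix": "raw/"})
        use_config(config)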

    Arun Giridharan

    10/29/2021, 2:52 PM
    Running into this issue using child flows: `get_task_run_result` is giving a `FlowRunView` object as a result instead of the task result that I'm expecting.

    Pedro Machado

    10/29/2021, 2:57 PM
    Hi there. I have a `ShellTask` that runs a Python script. I am trying to get the logs to show in Prefect Cloud in real time. I am trying to use the PYTHONUNBUFFERED environment variable to force the script to print output as it happens, but it's not working. I also tried calling the script with the `-u` option. Any ideas?
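    A hedged sketch, assuming Prefect 1.x's ShellTask: its stream_output option logs command output as it is produced, which is what surfaces in Cloud (my_script.py below is hypothetical).

    # Hedged sketch, assuming Prefect 1.x: stream shell output to the
    # Prefect logger line by line; -u keeps Python unbuffered.
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    run_script = ShellTask(stream_output=True)

    with Flow("shell-logs") as flow:
        run_script(command="python -u my_script.py")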

    haf

    10/29/2021, 3:01 PM
    I have a problem with the single-node job I'm running; it's chewing up lots of memory because it does all mapping sequentially starting with top-most tasks. A depth-first traversal of the DAG would be much more memory efficient for us since the "vertical" at the "same index" of the mapping has a full set of data in memory. Is there any way to make Prefect do a depth-first search instead?

Sylvain Hazard

10/29/2021, 3:03 PM
IIRC both the `DaskExecutor` and the `LocalDaskExecutor` perform depth-first execution if you chain mapped tasks.

Kevin Kho

10/29/2021, 3:03 PM
Using the DaskExecutor will prefer depth-first execution. But the LocalExecutor can’t be forced to do this unless you rearchitect your flow somehow, like using subflows or task looping.

haf

10/29/2021, 3:04 PM
right, ok so back to https://prefect-community.slack.com/archives/CL09KU1K7/p1635497245106100 for me then

Sylvain Hazard

10/29/2021, 3:10 PM
You could try using a `LocalDaskExecutor`; I have found it slightly easier to set up.
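A minimal sketch of that suggestion, assuming Prefect 1.x:

# Minimal sketch, assuming Prefect 1.x: attach a LocalDaskExecutor so
# chained mapped tasks execute depth-first.
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("depth-first") as flow:
    ...  # chained mapped tasks here

flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)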