prefect-community
  • d

    Daniel Manson

    03/20/2020, 7:37 PM
    Hi all, I came across Prefect for the first time yesterday and have read most of the documentation (I think?) and had a little bit of a play with it. We are looking at migrating a bunch of ETL stuff from ad-hoc bash scripts to a “proper” workflow tool like Prefect (it certainly seems nicer than Airflow). Before I dive in and start doing everything wrong, are there any examples/talks I could refer to in terms of real-world usage? As an example of the sorts of things I plan on implementing, the first thing I want to try is downloading an 8GB file over SFTP, unzipping it and importing the resulting files into Postgres (to be clear, this is one piece of a much larger hypothetical DAG). How should I architect this, e.g. should I try to do this whole thing in a single task, or split it up? If I split it up, should I pass 8GB of data (+ more once unzipped) through the return values? If I am to use Docker/k8s, how best to draw lines around things? etc. Thanks!
    c
    • 2
    • 18
  • a

    afausse

    03/22/2020, 5:54 PM
    Hi all, I need to trigger a workflow when a message arrives (on an IMAP server). Is it possible to trigger a workflow based on message arrival?
    j
    • 2
    • 4
  • v

    Venkat

    03/23/2020, 6:19 AM
    @here - Any advice on how to pass results across flow runs? I have a flow that runs on a schedule, and a task in the flow that returns a particular value (in this case, a
    pandas.DataFrame
    ). For every (second and subsequent) run of the flow, I need to retrieve the previous DataFrame, in order to do a merge/diff operation with the one generated in this run. Is this possible? I read all the docs on States and ResultHandlers, but I don't see anything that persists across flow runs.
    j
    m
    • 3
    • 11
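The cross-run merge/diff described above can be pictured independently of any Prefect feature: each run loads whatever the previous run saved, compares, then overwrites the saved copy. The following is only a minimal sketch in plain Python, assuming the rows can be serialized as JSON. The `diff_with_previous` helper and the state file are hypothetical, and lists of dicts stand in for the DataFrame.

```python
import json
from pathlib import Path


def diff_with_previous(current_rows, state_path):
    """Return the rows that were not present in the previous run,
    then save current_rows so the next run can compare against them.
    (Hypothetical helper, not part of Prefect.)"""
    path = Path(state_path)
    # First run: no state file exists yet, so the previous result is empty.
    previous_rows = json.loads(path.read_text()) if path.exists() else []
    # Serialize each row with sorted keys so equal dicts compare equal.
    seen = {json.dumps(row, sort_keys=True) for row in previous_rows}
    new_rows = [
        row for row in current_rows
        if json.dumps(row, sort_keys=True) not in seen
    ]
    # Overwrite the saved state with this run's result.
    path.write_text(json.dumps(current_rows))
    return new_rows
```

In a scheduled flow, the state file would need to live somewhere every run can reach (shared disk or object storage); that choice is outside this sketch.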
  • v

    Venkat

    03/23/2020, 6:19 AM
    BTW, I love this thing - super simple, and I'm only scratching the surface!
    🙂 2
    :marvin: 2
  • b

    bardovv

    03/23/2020, 7:07 AM
    How can I have a workflow within a workflow in Prefect?
    j
    • 2
    • 6
  • b

    bardovv

    03/23/2020, 7:34 AM
    Which API format do you all use: GraphQL or REST?
    j
    • 2
    • 1
  • b

    bardovv

    03/23/2020, 7:56 AM
    I'd like to see an example of a parallel workflow in Prefect. Can anyone share a link to one?
    j
    • 2
    • 11
  • a

    Arlo Bryer

    03/23/2020, 10:12 AM
    Hi all - is anyone else having issues with loading cloud.prefect.io at the moment?
    j
    n
    • 3
    • 17
  • c

    Cab Maddux

    03/23/2020, 2:59 PM
    Hi, looking to clarify a few details about persistence and caching. Given Flow
    A
    containing task
    X
    which has input
    arg1
    , takes a long time to run and returns a string. It uses
    GCSResultHandler
    , with
    cache_for=datetime.timedelta(hours=1)
    and
    cache_validator=prefect.engine.cache_validators.all_inputs
    : 1. A run of flow
    A
    is triggered and the input for task
    X
    is
    arg1='abc'
    and the output is
    applebananacarrot
    2. 5 minutes later another run of flow
    A
    is triggered and the input for task
    X
    is
    arg1='def'
    and the output is
    danceearflight
    3. 5 minutes later another run of flow
    A
    is triggered and the input for task
    X
    is
    arg1='abc'
    After #1 we'll have a cached value for task
    X
    and input
    abc
    pointing to GCS URI storing pickled
    applebananacarrot
    . After #2 we'll have an additional cached value for task
    X
    and input
    def
    pointing to GCS URI storing pickled
    danceearflight
    . Then #3 will pull GCS URI for pickled
    applebananacarrot
    from cache. Is that correct?
    j
    • 2
    • 26
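The three-run scenario in the question can be traced with a toy model. This is only a sketch of the behaviour the question assumes (one cached entry per distinct input, each valid for `cache_for`), not Prefect's implementation; `InputKeyedCache` and its `run` method are hypothetical names, and results are kept in memory rather than in GCS.

```python
import datetime


class InputKeyedCache:
    """Toy cache: one entry per distinct input value, each with its own expiry."""

    def __init__(self, cache_for):
        self.cache_for = cache_for
        self._entries = {}  # input value -> (expires_at, result)

    def run(self, func, arg1, now):
        """Return (result, was_cache_hit) for func(arg1) at time `now`."""
        entry = self._entries.get(arg1)
        if entry is not None and now < entry[0]:
            return entry[1], True  # unexpired entry for this exact input
        result = func(arg1)
        self._entries[arg1] = (now + self.cache_for, result)
        return result, False  # computed fresh and stored
```

Under this model, run #1 (`'abc'`) and run #2 (`'def'`) each compute and store a result, and run #3 (`'abc'` again, ten minutes after #1) is served from the first entry. Whether Prefect keeps more than one cached entry per task is exactly what the question asks, so the model should not be read as an answer.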
  • a

    Amit Singh

    03/23/2020, 3:27 PM
    @here I'm getting the following error when calling a task from a test case in pytest. Any clues? Thanks
    flow = flow or prefect.context.get("flow", None)
            if not flow:
    >           raise ValueError("Could not infer an active Flow context.")
    E           ValueError: Could not infer an active Flow context.
    j
    c
    • 3
    • 14
  • j

    John Ramirez

    03/23/2020, 3:27 PM
    I upgraded to version
    0.9.8
    and saw this warning
    UserWarning: DEPRECATED: the result_handler Task option will be deprecated in 0.11.0, and removed in 0.12.0, in favor of the `result` option instead.
    Is there a timeline for these versions?
    👀 1
    z
    • 2
    • 4
  • j

    John Ramirez

    03/23/2020, 4:17 PM
    On version 0.9.8 and received this error during testing:
    Unexpected error: AttributeError("'S3ResultHandler' object has no attribute '_client'")
    any thoughts?
    j
    j
    • 3
    • 17
  • m

    Matt Juszczak

    03/23/2020, 4:32 PM
    Hi all. Does Prefect support isolated workers? Meaning, a single master for all pipelines, but then separate physical workers (EC2 instances that are in different security groups, etc.) that only have access to the databases, etc. that they need access to? Part of our business requirements is single tenancy for data pipelines.
    d
    • 2
    • 10
  • b

    Brad

    03/23/2020, 9:32 PM
    Hi team - is there a way to do an
    install
    with the
    DockerAgent
    ? Or, what is the easiest way to run the agent in the background?
    c
    m
    • 3
    • 5
  • m

    Mark Williams

    03/23/2020, 11:00 PM
    Is there an example of running a flow using watchdog? I guess maybe make the first task the file watcher?
    j
    • 2
    • 2
  • a

    Adam Roderick

    03/23/2020, 11:12 PM
    Hi, I'm running into an error with flow.deploy(), and trying to figure out how to troubleshoot. Any ideas? On one development machine, we get the expected result:
    \prefect\core\flow.py:1331: UserWarning: flow.deploy() will be deprecated in an upcoming release. Please use flow.register()
      UserWarning,
    [2020-03-23 22:36:15,573] INFO - prefect.Docker | Building the flow's Docker storage...
    On a second development machine, we get an error with not much context:
    /prefect/core/flow.py:1331: UserWarning: flow.deploy() will be deprecated in an upcoming release. Please use flow.register()
      UserWarning,
    Expecting value: line 1 column 1 (char 0)
    c
    j
    a
    • 4
    • 16
  • a

    Aiden Price

    03/24/2020, 2:30 AM
    Hi Prefecters! I have an odd problem where I just need a bit of clarification. I’m trying to merge the results of two or more branches of execution, each of which should return a pandas DataFrame. So I made a task that looks like this:
    @task(skip_on_upstream_skip=True)
    def concat(histories: List[pd.DataFrame]):
        return pd.concat(histories)
    But when I call it like
    concatted = concat([task1, task2])
    (note the manually built list from two branches of execution) then nothing happens, the
    concat()
    function is just not called. So trying to be clever I tried this instead;
    @task(skip_on_upstream_skip=True)
    def concat(*args: pd.DataFrame):
        return pd.concat(args)
    But it seems I’m fighting the framework there because I get this error when I try `concatted = concat(task1, task2)`;
    ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
    So is it best to go with the advice in the warning message or try a different way? Say something like;
    @task(skip_on_upstream_skip=True)
    def concat(**kwargs: pd.DataFrame):
        return pd.concat(x for x in kwargs.values())
    Then call with
    concatted = concat(one=task1, two=task2)
    c
    • 2
    • 7
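The workaround quoted in the error message (accept `**kwargs` and feed the values on as positional arguments) can be sketched in plain Python without Prefect. `combine` is a hypothetical stand-in for `pd.concat`, and plain lists stand in for DataFrames, so this shows only the argument-passing pattern, not how Prefect binds task inputs.

```python
def combine(*parts):
    """Stand-in for pd.concat: join any number of lists into one."""
    merged = []
    for part in parts:
        merged.extend(part)
    return merged


def concat(**kwargs):
    """Accept inputs by keyword only, then forward the values positionally."""
    # Keyword arguments keep their call order (Python 3.7+).
    return combine(*kwargs.values())
```

With this signature the inputs are supplied as separate keywords, for example `concat(one=[1, 2], two=[3])`, rather than as a single dict argument.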
  • b

    bardovv

    03/24/2020, 10:48 AM
    Do dotted lines indicate parallel processing?
    e
    • 2
    • 19
  • b

    bardovv

    03/24/2020, 1:05 PM
    How do I do parallel branching in Prefect?
  • b

    bardovv

    03/24/2020, 1:29 PM
    What does map do in Prefect?
    z
    • 2
    • 1
  • k

    Kyle Foreman (Convoy)

    03/25/2020, 1:19 AM
    Hi all, I'm an epidemiologist by training and my former colleagues at the University of Washington (healthdata.org) [and collaborators around the world] are currently doing a lot of work on Covid-19 data collation and analysis, which is being documented here: Github repo: https://github.com/beoutbreakprepared/nCoV2019 Initial visualization: healthmap.org/covid-19/ Nature article: https://www.nature.com/articles/s41597-020-0448-0.pdf Lancet article: https://www.thelancet.com/journals/laninf/article/PIIS1473-3099(20)30119-5/fulltext They're having a hard time keeping up with all of the new sources, which they're largely manually checking and copying data from into a spreadsheet a few times a day. They're looking for help automating that process so that they can focus more on building models and delivering results to stakeholders in governments, health systems, and non-profits around the world. They are working right now on compiling a list of all of the websites they regularly gather data from and then I'd like to help them create Prefect Flows to run BeautifulSoup tasks a few times a day to check the sites for updates, parse the results, and add them to their data sources. @Chris White and @David Abraham have generously offered up a free Prefect Cloud account for us to use, so should be easy for us to get started. I expect there to be about 100 different sites to write parsers for, so I'd love to get some help crowdsourcing that effort. If you'd like to be involved, please respond here or email me at kyleforeman@gmail.com. I'll aim to have a kickoff meeting this Saturday so that we can figure out how best to tackle the problem.
    🚀 2
    👨‍⚕️ 6
    👍 6
    :upvote: 9
    😍 5
    s
    d
    +6
    • 9
    • 13
  • v

    Viktor Svekolkin

    03/25/2020, 9:21 AM
    I have a question about the following pipeline. The first task checks an S3 bucket for user files; there are two kinds of files, which must be processed differently. The first task generates a list of dictionaries with metadata, e.g. {'file_type': 'a', 'file_uri':'s3://something'}, over which I plan to map a branch selector task, whose output will be fed to a switch operator that selects the appropriate downstream postprocessing task. But I'm stuck on how to pass the metadata through the switch operator to the selected downstream task. Can you help me with this use case?
    n
    • 2
    • 9
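Setting the Prefect switch operator aside, the routing described above amounts to looking up a handler by `file_type` and handing it the whole metadata dict. This is only a plain-Python sketch of that idea; `process_a`, `process_b`, `HANDLERS`, and `dispatch` are hypothetical names, and it makes no claim about how a Prefect switch passes data downstream.

```python
def process_a(meta):
    """Hypothetical postprocessing for files of type 'a'."""
    return f"A-processed {meta['file_uri']}"


def process_b(meta):
    """Hypothetical postprocessing for files of type 'b'."""
    return f"B-processed {meta['file_uri']}"


# Map each file type to the function that should handle it.
HANDLERS = {"a": process_a, "b": process_b}


def dispatch(meta):
    """Pick the handler for meta['file_type'] and pass it the full dict."""
    handler = HANDLERS[meta["file_type"]]
    return handler(meta)
```

Applying `dispatch` to each dictionary in the list (for instance with `[dispatch(m) for m in metadata]`) keeps the metadata and the branch choice together in a single step.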
  • b

    bardovv

    03/25/2020, 10:21 AM
    prefect.context.get - What is this used for?
    a
    k
    • 3
    • 2
  • a

    Arlo Bryer

    03/25/2020, 12:02 PM
    Hi - when we cancel a flow from Prefect Cloud it appears the jobs/tasks within continue to run. Are we misunderstanding something here?
    c
    • 2
    • 8
  • a

    Adam Roderick

    03/25/2020, 2:10 PM
    I'm getting an error when trying to invite a new user to our team in Prefect Cloud. Where should I go from here?
    n
    c
    • 3
    • 17
  • p

    Preston Marshall

    03/25/2020, 2:40 PM
    Question about how I'd go about something like this with Prefect. I have a bucket on Google Cloud Storage where files are dropped by third parties. The problem is there's no way to know how many, but they do come in batches. What I'd like to do is collect these files incrementally until one doesn't arrive for, say, 60 seconds. Is this something that Prefect can help with? It's certainly a difficult problem with Airflow.
    j
    e
    • 3
    • 8
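The "collect until nothing arrives for 60 seconds" rule above is a quiet-period loop, which can be sketched in plain Python independently of Prefect or Google Cloud Storage. Everything here is an assumption for illustration: `collect_until_quiet` is a hypothetical helper, and `list_new_files` is a hypothetical callable that returns only files not seen on earlier polls. The clock and sleep functions are injectable so the loop can be exercised without waiting.

```python
import time


def collect_until_quiet(list_new_files, quiet_seconds=60.0, poll_interval=5.0,
                        clock=time.monotonic, sleep=time.sleep):
    """Gather files until none has arrived for quiet_seconds."""
    batch = []
    last_arrival = clock()
    while clock() - last_arrival < quiet_seconds:
        new_files = list_new_files()
        if new_files:
            batch.extend(new_files)
            last_arrival = clock()  # a file arrived, so restart the quiet timer
        sleep(poll_interval)
    return batch
```

The returned batch could then be handed to whatever processes the files; how the bucket is actually polled is left to `list_new_files`.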
  • e

    Eric Hauser

    03/25/2020, 5:08 PM
    @Kyle Foreman (Convoy) Please include me - ewhauser@gmail.com - on the distribution. I'm looking for hackathons for my team to participate in so we might be able to help
    👍 1
    k
    • 2
    • 1
  • i

    itay livni

    03/26/2020, 4:06 AM
    https://prefect-community.slack.com/archives/CL09KTZPX/p1585195541005600
  • b

    Bob Colner

    03/26/2020, 5:20 AM
    Question about migrating to Prefect Cloud. How can I get multiple agents to 'listen' to my cloud flow for task parallelism? This https://docs.prefect.io/cloud/tutorial/multiple.html#run-an-agent-manually talks about agents running multiple flows but what about multiple tasks?
    c
    j
    • 3
    • 17
  • l

    Luke Orland

    03/26/2020, 3:00 PM
    After building Docker storage using a custom local Dockerfile for multiple flows a la https://docs.prefect.io/cloud/recipes/multi_flow_storage.html#adding-flows-to-storage is it possible to execute a flow in situ locally, running in the container, to closely replicate remote execution?
    j
    • 2
    • 7
j

josh

03/26/2020, 3:01 PM
Do you want to run the flow in the context of a cloud run or a local run inside the container to see if it works?
More clearly: do you want the flow to communicate state with Prefect Cloud for this run or no?
l

Luke Orland

03/26/2020, 7:56 PM
This would be a local run but executing the
/root/.prefect/my_flow.prefect
within the container to see if it works. In that context, would it still need to connect to Prefect Cloud to get secrets, or would the
[cloud]
use_local_secrets = true
and all the configs in my
~/.prefect/config.toml
be honored during this local run?
j

josh

03/26/2020, 7:58 PM
If the
config.toml
inside the container contains your secrets then setting
use_local_secrets
to True would use them; otherwise it should be fine grabbing the secrets from Cloud if it is set to false. To run your flow inside that container, do something like:
from prefect.environments.storage import Docker

flow = Docker().get_flow(flow_location="/root/.prefect/my_flow.prefect")

flow.run()
l

Luke Orland

03/27/2020, 12:28 AM
Traceback (most recent call last):
  File "./workflows.py", line 221, in <module>
    main()
  File "./workflows.py", line 207, in main
    flow_location=f'/root/.prefect/{flow.name}.prefect'
  File "/home/luke/.pyenv/versions/kepler/lib/python3.7/site-packages/prefect/environments/storage/docker.py", line 214, in get_flow
    with open(flow_location, "rb") as f:
PermissionError: [Errno 13] Permission denied: '/root/.prefect/my_flow.prefect'
j

josh

03/27/2020, 12:43 AM
How are you running the docker container?
docker run -it my_image
? It looks like you don’t have root access in the image you built.
l

Luke Orland

03/27/2020, 12:58 AM
Ah, I was assuming that since the flow came from Docker().get_flow,
flow.run()
would run
docker run -it ...
for you. I ran that and stopped getting the PermissionError. Now it seems to be failing to find a variable I added to my local Prefect config. I'm not copying a
config.toml
into the container, so it looks like I'll need to figure that out. Thanks for the guidance!