prefect-community

    Adam Kelleher

    07/27/2020, 7:16 PM
    Hello Prefect Community! Are there any example repos of complete projects using Prefect server? I feel fairly confident in my understanding of the component pieces, but I'm having trouble seeing the whole thing as a finished product.
    👍 4

    Wing Rider

    07/27/2020, 9:06 PM
    Hi. Newbie here. I was checking whether Prefect is a fit for an ETL project. Can we trigger a Prefect flow based on a condition, say, when a file arrives (like a file polling service)?
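    A minimal sketch of one way to approach this with the Server/Cloud API: register the flow, then run a small watcher that polls for the file and creates a flow run once it appears. The path, flow id, and the "path" Parameter below are placeholders, and `Client.create_flow_run` assumes the flow is already registered with the backend.
        import os
        import time

        from prefect import Client

        def wait_and_trigger(path, flow_id, poll_seconds=30):
            # Poll the filesystem; once the file shows up, ask the backend to
            # create a run of the already-registered flow.
            client = Client()
            while not os.path.exists(path):
                time.sleep(poll_seconds)
            client.create_flow_run(flow_id=flow_id, parameters={"path": path})

        wait_and_trigger("/data/incoming/report.csv", flow_id="<registered-flow-id>")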

    Antony Southworth

    07/28/2020, 1:31 AM
    👋
    👋 5

    Antony Southworth

    07/28/2020, 2:15 AM
    Hi all, I've been reading through the docs for the last day or two and had a few questions. I'm coming from Airflow, so my mental model of things might just be out of whack.
    1. Are there any best-practice recommendations about task sizing? I know the docs say "anything you can write in Python can be a task", but what are the trade-offs there? You could say the same thing about Airflow tasks (via `PythonOperator`), but generally you don't want them too small or else things get sluggish due to scheduler overhead.
    2. How does Prefect handle data for mapped tasks? For example, if I have a flow with `t = lambda x: x + 1`, `r1 = t.map(range(10))`, `r2 = t.map(r1)`, my understanding is that Prefect would distribute the computation of `r1` across the workers, then collect the results into a list on the "central" machine (where `flow.run()` was invoked), then serialise each element again and send the elements to the workers in order to distribute the computation of `r2`. This seems a bit inefficient (we compute the results on a worker, serialise, send back to the central scheduler, deserialise, then serialise again, send back to a worker, deserialise, and then do more work).
    3. How do folks use `Task.map` in practice? For example, would it be weird to run `Task.map` for each row in a dataframe? I guess this is related to the first question; basically "how small is too small for tasks/`Task.map`?"
    4. Is there any way to "re-run" old instances of tasks? E.g. if I had a scheduled process running daily and I need to re-run the one from Wednesday last week, is there a convenient way of doing that in the UI? I guess I'm basically asking "can you do `airflow backfill`?"
    5. How do people handle returning large results from tasks? For example, if I have a task in my flow that returns a 1 GB dataframe, I don't really want that to be serialised and sent back to the "central" machine (where `flow.run()` was invoked), because that machine might have other work to do (and I don't want to waste time on ser/de, or on loading the entire thing into memory just to serialise it and send it to another worker). For Airflow, I usually store things on S3 and use XCom to tell other tasks where the result was stored. Would the "Prefect Way" of doing this be for my task to upload its results to S3 and return the path as the return value of the task?
    6. Is there any way to run flows in a "streaming" or asynchronous fashion? For example, if my task could `yield` rows from a dataframe and the downstream tasks accepted an `Iterator`. Again, just thinking in terms of memory usage, it would be nice to not require the entire dataframe to be loaded in memory.
    Apologies if these are all covered in the docs; I thought I got through them pretty thoroughly, but it's possible I may have missed some parts.
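    On question 5, a rough sketch of the store-and-pass-a-reference pattern being described (bucket name, key, and helper names are illustrative; assumes boto3/s3fs access from wherever the tasks run):
        import boto3
        import pandas as pd
        from prefect import task

        @task
        def build_large_frame() -> str:
            df = pd.DataFrame({"x": range(10)})        # stand-in for the 1 GB frame
            local_path = "/tmp/result.parquet"
            df.to_parquet(local_path)
            boto3.client("s3").upload_file(local_path, "my-bucket", "results/result.parquet")
            # Only this small string moves between tasks, not the dataframe itself.
            return "s3://my-bucket/results/result.parquet"

        @task
        def consume(s3_uri: str):
            # The downstream task re-reads the data wherever it happens to run.
            return pd.read_parquet(s3_uri)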

    Sven Teresniak

    07/28/2020, 9:09 AM
    Hi. I use Prefect Server and have a question regarding results. Checkpointing is active and I use a `LocalResult` as a flow's `result=mylocalresultinstance` to persist task results. The data is written to an NFS share which is available to the Dask worker and the agent (exact same mountpoint). With default settings (no `result` keyword in the flow definition) all worked well. Now I use some templating to better organize (and later clean up) results. Questions:
    1. The UI states that I cannot restart a task because `Warning: If this flow run does not have a result handler, restarting is unlikely to succeed`. Is that text aware of the deprecated `result_handler` keyword and maybe checking the wrong setting? Does the UI need the NFS share (the result location) as well? Or any Prefect-related service besides the agent and the Dask worker?
    2. Once my `LocalResult` is working, is it possible to access results from the UI? I know I can load results using the `Result`'s subclasses, but it would be easier for testing and debugging.
    3. Is there an elegant way to get rid of old results? Deleting old flow versions (and their runs) does not remove results.
    4. How do I find which keyword parameters I can use in the format string of a `Result`'s `location` string? Everything from `prefect.context` and all keywords a `Task.run()` got?
    5. Is Prefect using (persisted) results in any way? Let's assume 1. is a bug and gets fixed. Does a retry for a failed task read its input from (maybe persisted) result objects? Something else?
    6. The documentation does not say a word about uniqueness. If Prefect is using results (see 5.), then each persisted result must be unique, right? That is, overwriting a result by accident could lead to a complete fuckup?
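    On question 4, a minimal sketch of a templated `LocalResult`, assuming the template keys are filled from `prefect.context` at run time (directory and key names here are illustrative):
        from prefect import Flow
        from prefect.engine.results import LocalResult

        nfs_result = LocalResult(
            dir="/nfs/prefect-results",   # the shared mountpoint
            # Template keys such as flow_name, today and task_name come from prefect.context.
            location="{flow_name}/{today}/{task_name}.prefect",
        )

        with Flow("etl-with-results", result=nfs_result) as flow:
            ...  # tasks defined here inherit the flow-level result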

    Adam

    07/28/2020, 1:28 PM
    Hi all, hope you're having a lovely day. I'm having some trouble with Docker storage on CircleCI. Hoping someone can help me debug it. When calling `Docker(registry_url=...)` I get the following exception: `docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', BadStatusLine('\x15\x03\x01\x00\x02\x02\n'))` Any ideas why that is? FWIW, I'm using Google Container Registry and I've already authenticated with it (running `docker pull gcr.io/etc/etc/myprivateimage` works fine). CircleCI does use a 'remote docker' to run such commands, so perhaps this command isn't able to connect to it?

    mithalee mohapatra

    07/28/2020, 2:09 PM
    Hi, my flow run failed when started from the Prefect UI. The logs say: "AttributeError: 'Flow' object has no attribute 'slugs'". After finding the solution in the Slack channel I tried to check my current Prefect version; it is "0+unknown". I tried to force-reinstall Prefect but am still getting the same unknown version. What does that mean?

    karteekaddanki

    07/28/2020, 7:20 PM
    Hi guys, I posted a version of this question yesterday. My mapped task, when run through the cloud API, is not running concurrently when using `LocalEnvironment(executor=DaskExecutor())`. Is this a limitation of the dev version of the cloud API? Using `LocalDaskExecutor` and running the flow with `flow.run()` results in concurrent execution of the tasks. I am using Docker storage for the flows. How can I debug what's going on in this case?
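    For reference, a minimal sketch of attaching a Dask executor to the flow's environment before registering; the scheduler address is a placeholder, and without an address `DaskExecutor` spins up a temporary local cluster inside the flow-run container:
        from prefect import Flow, task
        from prefect.engine.executors import DaskExecutor
        from prefect.environments import LocalEnvironment

        @task
        def double(x):
            return x * 2

        with Flow("mapped-on-dask") as flow:
            doubled = double.map(list(range(10)))

        # The environment's executor is what backend-triggered runs use;
        # a plain flow.run() only parallelises if you pass an executor to it directly.
        flow.environment = LocalEnvironment(
            executor=DaskExecutor(address="tcp://dask-scheduler:8786")  # placeholder address
        )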

    Skip Breidbach

    07/28/2020, 9:28 PM
    For resource managers, is there some way for the `cleanup()` method to detect whether the tasks in the context succeeded or failed? I see the comment `A Task is automatically added to call the cleanup method (closing the Dask cluster) after all tasks under the context have completed.` in the documentation, but I can't see a way to access that created `Task`. (I have a situation in which it would be convenient to execute only part of my `cleanup` task when something in that context fails.)
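    For context, roughly the documented resource-manager shape (a sketch only; as far as I can tell, `cleanup` receives the resource returned by `setup`, not the states of the tasks that ran inside the block):
        from prefect import Flow, resource_manager, task

        @resource_manager
        class TempCluster:
            def __init__(self, n_workers):
                self.n_workers = n_workers

            def setup(self):
                # Stand-in for creating a real Dask cluster; whatever is returned
                # here is what cleanup() later receives.
                return {"n_workers": self.n_workers}

            def cleanup(self, cluster):
                # Runs after the tasks in the block finish, whether they succeeded or failed.
                print(f"tearing down {cluster}")

        @task
        def use(cluster):
            return cluster["n_workers"]

        with Flow("resource-manager-example") as flow:
            with TempCluster(n_workers=4) as cluster:
                use(cluster)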

    Adam Roderick

    07/28/2020, 11:34 PM
    I have a developer on a Windows machine running into an error when trying to deploy a pipeline. It looks like a path issue on Windows.

    Daniel

    07/28/2020, 11:46 PM
    I have a kind of general ETL question; I think I'm doing something wrong with my flow. If I comment out the second part, everything runs in ~3 minutes. Once I add the second part back in, everything takes 20+ minutes, the ORM starts rolling back transactions, and then the flow kills the worker at some point. Is it related to how I'm passing the results from extract_messages to the "extract_message_entity_mapping" function maybe?
    # ETL Messages
            extract_messages = etl_msg.extract_messages(company_ids)
            process_messages = etl_msg.process_messages(extract_messages)
            truncate_messages = etl_msg.truncate_messages(upstream_tasks=[process_messages])
            load_messages = etl_msg.load_messages(
                process_messages, upstream_tasks=[truncate_messages]
            )
    
            # ETL Message Entity Mapping
            extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
                extract_messages
            )
            process_message_entity_mapping = etl_msg.process_message_entity_mapping(
                extract_message_entity_mapping
            )
            truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
                upstream_tasks=[process_message_entity_mapping]
            )
            load_message_entity_mapping = etl_msg.load_messages(
                process_message_entity_mapping,
                upstream_tasks=[truncate_message_entity_mapping],
            )

    delphi

    07/29/2020, 9:35 AM
    I have a beginner's question. Is it possible to share registered flows among team members? The scenario is as follows:
    1. There are two or more members on the team; they are all either flow developers or flow runners.
    2. One member develops a flow against the agent environment on his desktop PC and registers the flow to Prefect Server.
    3. Another member can run this registered flow in his own desktop PC environment.
    4. Due to different environment parameters on different desktop PCs (e.g. a different logged-in account on each desktop), we cannot centralize the agent even though the flow itself is the same.
    5. Even though the agents are distributed, we still want the launch of flows to be centered in the Prefect Server UI.
    The question: from the documentation I can see there is no actual execution-information exchange between the agent and the Server, which I take to mean the Python scripts are kept local. How can I achieve the effects above? The question can be divided into two:
    1. How do we share scripts between different members? Maybe Git or another version control system is the right solution, which I would like to confirm.
    2. The flow registration process: is it a must for different agents on different PCs to register their own flows with the Server?
    More background: this environment is a Prefect Server installed on an ECS server which can be accessed by each team member. I don't know whether it's feasible for us to do it like this. The documentation only describes the local scenario for Prefect Server. Thank you for your kind help!

    Adam

    07/29/2020, 2:00 PM
    Hello friends! A quick Prefect Cloud question. The Task Concurrency page is coming up blank. The other pages work great though. Any ideas?

    Adam

    07/29/2020, 3:12 PM
    Another question for all you lovely people. Part of my flow requires that I make ~10k HTTP requests (one per user) and then take each result and store it in the same database table. My plan is to have something like:
    • Task: get the list of users
    • Task: make an API request to the third party (map over this task with the list of users)
    • Task: store the results of the API requests with a single SQL insert
    My question for you: is it wise to have 10k tasks running? I would assume so from the previous threads and docs. I also plan on limiting the task concurrency so as not to exceed my rate limit with the external party. How does this look from an infra point of view? I'm currently running a KubernetesAgent. Will the single Kubernetes Job be running those 10k API requests?
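    A minimal sketch of the layout described above (function bodies, the tag, and the retry settings are illustrative; the tag would be what a Cloud task-concurrency limit is attached to):
        from datetime import timedelta
        from prefect import Flow, task

        @task
        def get_users():
            return ["user-1", "user-2"]              # stand-in for ~10k users

        @task(tags=["third-party-api"], max_retries=3, retry_delay=timedelta(seconds=10))
        def call_api(user):
            return {"user": user, "status": "ok"}    # stand-in for the HTTP request

        @task
        def store_results(results):
            print(f"inserting {len(results)} rows")  # stand-in for the single SQL insert

        with Flow("user-api-sync") as flow:
            users = get_users()
            responses = call_api.map(users)
            store_results(responses)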

    Scott Zelenka

    07/29/2020, 3:31 PM
    Has anyone had success registering the Prefect Agent on a private subnet on AWS EKS? We can get the Pod running, and it will register with Prefect Cloud and pick up pending flows just fine. The challenge we're running into is that the `agent` container is failing liveness probes, which kills the Pod: `Warning Unhealthy 43s (x2 over 83s) kubelet, .internal Liveness probe failed: Get http://10.0.28.4:8080/api/health : dial tcp 10.0.28.4:8080: connect: connection refused` I'm chatting with AWS right now, and they're asserting that because kubelet is attempting to connect on port 8080, the Deployment YAML for the `agent` should expose that port. But I don't have this problem when deploying the same YAML on GCP, OpenShift, or bare-metal K8s.

    David Elliott

    07/29/2020, 5:27 PM
    Hey all, I'm new to Prefect but loving it so far! I'm encountering an issue on Prefect Cloud: when I try to run my flow (I've tried Run, Quick Run via the UI, and also `prefect run cloud xxxx` from the CLI), it gives me `There was a problem running your flow`. I've tried logging out/in as it suggests, but still the same issue. The CLI returns `prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]` I had this issue with a big flow (770 tasks) this morning and assumed it was a size problem, so I switched to a smaller flow of 368 tasks which ran a couple of times OK, but I've just tried to run that same flow (368) again and it's giving me the same error. Any thoughts / suggestions? Many thanks in advance!

    Matthew Maldonado

    07/29/2020, 5:28 PM
    Does prefect create logs?

    Michael Ludwig

    07/29/2020, 6:39 PM
    Does retrieving task results locally still work as described here? https://docs.prefect.io/core/idioms/task-results.html Wondering what I might be doing wrong (I'm only getting `None`):
    end_state = flow.run(executor=executor)
    for result in end_state.result:
        logger.info(f"Result: {result} -> {result.result}")
    which prints:
    Flow run SUCCESS: all reference tasks succeeded
    Result: <Task: user-to-cluster-snowflake-loader-free> -> None
    Result: <Task: asp_ssm_parameter_writer> -> None
    Result: <Task: mood_scorer> -> None
    Result: <Task: rfy_predictor_FREE> -> None
    Result: <Task: rfy_bucket_prefix_updater_Variants.main> -> None
    Any ideas? I'm not using the newer `Result` construct, but it would be great to grab the return values at the end of the flow run.
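    For comparison, the retrieval idiom from that docs page looks roughly like this. Iterating over `end_state.result` yields the task objects (the dict keys), and `Task.result` on the task itself is its configured `Result` object rather than the return value, which may explain the `None`s above (the task name below is taken from the output):
        state = flow.run()

        # state.result maps each task object to its end state; the return value
        # lives on that end state, not on the task itself.
        task_ref = flow.get_tasks(name="mood_scorer")[0]
        print(state.result[task_ref].result)

        # Or loop over every task's end state:
        for t, task_state in state.result.items():
            print(f"{t}: {task_state.result}")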

    Amit

    07/29/2020, 9:06 PM
    Why would a flow be stuck in Scheduled (late run) and not running? I can see the Prefect agent on Kubernetes is able to connect to the cloud (as per the dashboard). I don't have any other flow running. I am on the free plan.

    Doran

    07/29/2020, 9:43 PM
    I'm trying to run a flow, but it's getting stuck in Pending. My flow is getting registered with a hostname as a label, which I presume is my issue, but I can't figure out how to not have that label.

    delphi

    07/30/2020, 7:43 AM
    Just another beginner's question. Is there any place that describes the detailed configurable items for config.toml and backend.toml? I feel most questions could be solved with these two config files, but I can only find some of the configurable items by reading the answers to other questions, which is not an efficient way.

    Adam

    07/30/2020, 8:49 AM
    Hi all! Anyone online to help debug why my flows aren't starting on Prefect Cloud? They're scheduled and the agent is responding but nothing is starting

    Adam

    07/30/2020, 1:25 PM
    Friends, another question from me. We're running Prefect Cloud + KubernetesAgent + DaskKubernetesEnvironment. How should we go about accessing existing Kubernetes Secrets and ConfigMaps from within the tasks? I'm not clear on how to make those available to the Dask jobs as env vars. Would I have to supply a custom `worker_spec_file` that includes env vars with `configMapKeyRef`?
    👀 1
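    If it helps, a rough sketch of pointing the environment at a custom worker spec (file name and worker counts are placeholders); the referenced YAML is where the `configMapKeyRef`/`secretKeyRef` env entries would live, so the worker pods see those values as environment variables:
        from prefect import Flow
        from prefect.environments import DaskKubernetesEnvironment

        with Flow("k8s-dask-flow") as flow:
            ...  # tasks go here

        # worker_spec.yaml would be a copy of the default Dask worker pod spec,
        # extended with env entries that reference existing ConfigMaps / Secrets.
        flow.environment = DaskKubernetesEnvironment(
            min_workers=1,
            max_workers=5,
            worker_spec_file="worker_spec.yaml",   # placeholder file name
        )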

    Chris Martin

    07/30/2020, 3:24 PM
    Hello, we're looking at Prefect for some of our more ETL-type workflows, and an important feature here is that a Prefect task may wish to submit a job to a delegate system, for example EMR or AWS Batch. One consideration here is what we should do if a flow is kicked off and we subsequently realise that it needs to be stopped (e.g. because there is some bug in our processing code and the flow is therefore going to yield incorrect results). In this case, the ideal situation would be to have both an easy way of stopping the flow and a mechanism such that any external tasks spawned by the flow (e.g. an AWS Batch run) are also terminated. So far, my understanding of Prefect is that you can't really cancel a running flow. The best you can do is mark a flow run as failed, but I think all this does is stop any pending tasks from being scheduled; it doesn't actually interrupt any running tasks. Moreover, it doesn't allow any cleanup to be done; for example, any Prefect resource managers in play won't have their cleanup() methods invoked. This being the case, I think the best that you can do is to manually locate and cancel the external task and then go back and mark the Prefect flow as failed. It does, however, look like there's some active development in this area, specifically https://github.com/PrefectHQ/prefect/pull/2942. I therefore wondered what the long-term goal is here and whether it might support a use case similar to the one I have outlined. In our case, what would be ideal is if the task class itself supported some sort of callback which would alert you to the fact that the task was being cancelled.
    👀 1

    Amit

    07/30/2020, 4:26 PM
    Hi, is there a way to access the Prefect Agent's environment variables (K8s agent) in the flow's Job Pod?
    👀 1

    Mike Nerone

    07/30/2020, 4:39 PM
    Greetings! I have an existing app that is in essence an ETL app, but it currently runs in one process. I now wish to scale it out, and I’m hoping to use Prefect to do that. My question is this, and please forgive me if this is a n00b question: the result set that ultimately comes out of the Extract is potentially far more than will fit in memory, so it’s currently implemented as an (async) generator that makes paginated requests so it can yield chunks to the Transform. It doesn’t seem as if this pattern is currently supported by Prefect, so can you give me any advice on “The Prefect Way” to handle a large-data problem like this? My first thought was that instead of being a generator, the Extract task could itself kick off a parametrized flow for each chunk of data, but that seems like a lot of overhead for potentially thousands (or even tens of thousands) of chunks (and I’m not positive a task can do that anyway). Is there perhaps some other mechanism I might use to stream data between longer-running tasks?
    👀 1
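    One common shape for this, sketched under the assumption that the extract step can return lightweight chunk references (page numbers, object-store keys) rather than the data itself, so no single task ever holds the full result set (all names and sizes below are illustrative):
        from prefect import Flow, task

        @task
        def list_pages():
            return list(range(1000))        # stand-in for paginated request descriptors

        @task
        def extract_page(page):
            return [f"record-{page}-{i}" for i in range(10)]   # fetch one chunk

        @task
        def transform(records):
            return [r.upper() for r in records]

        @task
        def load(records):
            print(f"loaded {len(records)} records")

        with Flow("chunked-etl") as flow:
            pages = list_pages()
            raw = extract_page.map(pages)
            clean = transform.map(raw)
            load.map(clean)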

    Mac Gréco Péralte Chéry

    07/30/2020, 4:43 PM
    Hello everyone! Can you modify a [mapped] task name at runtime? I am doing a project where I do web scraping to retrieve data for each institution (hospital) on a web site. Instead of
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
    I would like to have
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0] [hospital name]': Starting task run...
    I think this would be great when you visualize the flow run in the UI, so you can tell from the task name which hospital the web scraping failed for, without going to the logs. P.S. Currently I am logging the hospital name to know which hospital's data is currently being scraped:
    @task(name="Scraping Institution Data")
    def scrape_institution_data(instActionIdTuple):
        logger = prefect.context.get("logger")
        logger.info(f"Scraping for site: {instActionIdTuple[0]}")
    So at runtime I get:
    [2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
    [2020-07-30 16:24:11] INFO - prefect.Scraping Institution Data[0] | Scraping for site: Hôpital Claire Heureuse de Marchand Dessalines
    👀 1

    Amit

    07/30/2020, 7:15 PM
    Has anyone had timezone issues while running a flow with Docker storage built from a custom Dockerfile? (This is run on Kubernetes, via the Kubernetes Agent.) I get the following error while running a flow:
    Traceback (most recent call last):
      File "/opt/conda/envs/my_project/bin/prefect", line 10, in <module>
        sys.exit(cli())
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/cli/execute.py", line 50, in cloud_flow
        client = Client()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/client/client.py", line 82, in __init__
        self._access_token_expires_at = pendulum.now()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/__init__.py", line 211, in now
        dt = _datetime.datetime.now(local_timezone())
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/__init__.py", line 60, in local_timezone
        return get_local_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 35, in get_local_timezone
        tz = _get_system_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 63, in _get_system_timezone
        return _get_unix_timezone()
      File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 242, in _get_unix_timezone
        raise RuntimeError("Unable to find any timezone configuration")
    RuntimeError: Unable to find any timezone configuration

    Skip Breidbach

    07/30/2020, 10:34 PM
    Small oddity here. I've been working with a Docker agent and "Non-Docker Storage for Containerized Environments". I was having some issues getting the container to be able to pull from S3, but added an env var to my container and all was well. Then I read further and saw the "Authentication for using Cloud Storage with Containerized Environments" section and thought I'd try that, essentially adding `secrets` to the flow storage. But when I do, I get a surprising (to me) error when the flow is executed: `Failed to load and execute Flow's environment: HTTPError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql')` It looks to me like the container thinks it's using the cloud backend or something? Any ideas?

    Michael Ludwig

    07/31/2020, 7:07 AM
    We are doing our first experiments with mappings. In theory we like them, but in practice we saw some unexplainable behaviour and would be curious whether we are doing something wrong or something is broken. Our flows have integrated skip functionality (from before cached states and Results existed): when I re-run the “same” flow (a new flow run on the same input data as before), already-computed tasks send a SKIP signal. That works great. When adding a mapped function, we suddenly see 1) old tasks which were already computed being re-executed and 2) mapped tasks failing instead of being skipped when the parent was skipped. For 1) we still need to find out how we produced this, but for 2) I have this screenshot:
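    For reference, the skip pattern being described looks roughly like this (a sketch; `already_done` stands in for the real "result already exists" check). Since tasks default to `skip_on_upstream_skip=True`, one would expect mapped children of a skipped parent to skip as well, which is what makes the failures described above surprising:
        from prefect import Flow, task
        from prefect.engine.signals import SKIP

        def already_done(item):
            return False                      # placeholder for the real "result exists" check

        @task
        def compute(item):
            if already_done(item):
                raise SKIP("already computed for this input")
            return item * 2

        @task
        def summarise(values):
            print(sum(values))

        with Flow("skip-example") as flow:
            results = compute.map([1, 2, 3])
            summarise(results)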