Powered by Linen
prefect-community
  • s

    Shivam Bhatia

    01/11/2022, 11:23 AM
    Is it possible to make it such that one task runs after all the other tasks are completed?
    a
    10 replies · 2 participants
  • v

    Vamsi Reddy

    01/11/2022, 1:36 PM
    Hi everyone, we are building a flow where we submit steps to an EMR cluster. There are a lot of modules to run, and each module is a step. We submit a step and then check its status. If the step completes without errors, we proceed to submit the next step. All of the modules/steps are in a huge list:
    [module_name1, module_name2,....]
    We are using a for loop to iterate over the list, submit the steps, and check the status. However, Prefect does not submit these steps in the order of the list. How do I make sure that the order is maintained? Below is a screenshot of the code:
    a
    k
    26 replies · 3 participants
  • h

    Henrietta Salonen

    01/11/2022, 2:18 PM
    Hi, I’m trying to estimate the cost of running cloud resources for our ETL pipelines in Prefect Cloud. To benchmark my own estimates, I’d be curious to hear (just at a high level) what type of workloads your organization is dealing with and your monthly infrastructure costs. If you are using AWS EC2 instances (my plan is to use these for Docker Agent + DockerRun), I’d particularly love to hear the costs in this area in relation to your task workloads. I’m trying to find a cost-efficient solution but want to start simple, although in the future we may move towards the setup described by @Anna Geller in this nice article: https://towardsdatascience.com/how-to-cut-your-aws-ecs-costs-with-fargate-spot-and-prefect-1a1ba5d2e2df, especially if scaling and maintaining the on-demand EC2 setup creates too much overhead.
    a
    4 replies · 2 participants
  • b

    Ben Collier

    01/11/2022, 2:23 PM
    Just a quick one, we received an email with a summary of our throughput statistics and a champagne emoji - very nice, but I can imagine circumstances in which a client (not us on this occasion) may have to treat that as a security breach. I’m interested in opinions - anyone out there treating their number of tasks executed as confidential information?
    k
    a
    3 replies · 3 participants
  • p

    Prudhvi Kalakota

    01/11/2022, 2:33 PM
    Hi everyone, how do I name these tasks so that visually they appear with proper names?
    k
    5 replies · 2 participants
  • t

    Tom Shaffner

    01/11/2022, 3:48 PM
    I have a case where mapped runs seem to read the cache from other mapped runs. I'm using an approach like the Complex Mapped Pipelines and have it set up to write a data pull process to a local result, cached for 10 hours, using the following:
    result=LocalResult(location='{plan}_plan_data.prefect')
    In my case my mappings are "plans". The problem is that in one case a plan/map pulls data and caches it, and then a different plan/map subsequently reads that same data from the cache and uses it! Any idea what might cause this kind of behavior? It causes a bunch of my plan/maps to just not work.
    k
    23 replies · 2 participants
  • g

    Gabriel Milan

    01/11/2022, 4:06 PM
    Hi there, has anyone faced something like this before?
    Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
    Traceback (most recent call last):
      File "/opt/venv/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/opt/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 926, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/opt/venv/lib/python3.9/site-packages/prefect/engine/results/gcs_result.py", line 75, in write
        binary_data = new.serializer.serialize(new.value)
      File "/opt/venv/lib/python3.9/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/opt/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/opt/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
      File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
    TypeError: no default __reduce__ due to non-trivial __cinit__
    I'm using Prefect 0.15.9 and pymssql 2.2.3. The task that raised it is the following
    @task
    def sql_server_get_connection(server: str, user: str, password: str, database: str):
        """
        Returns a connection to the SQL Server.
        """
        log(f"Connecting to SQL Server: {server}")
        # pylint: disable=E1101
        return pymssql.connect(
            server=server, user=user, password=password, database=database
        )
    k
    2 replies · 2 participants
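The traceback above ends in `cloudpickle.dumps(value)`: the task's return value, a live database connection, is being serialized when the result is written, and connection objects generally cannot be pickled. Below is a minimal sketch of the problem and of one common workaround (open the connection inside the function that uses it and return plain data). It uses the standard library's `sqlite3` as a stand-in for `pymssql`, and `fetch_rows` is a hypothetical helper rather than part of either library.

```python
import pickle
import sqlite3


def connection_is_picklable() -> bool:
    """Try to pickle a live connection, as a result serializer would."""
    conn = sqlite3.connect(":memory:")
    try:
        pickle.dumps(conn)
        return True
    except TypeError:
        # Live connections wrap low-level handles and refuse to be pickled.
        return False
    finally:
        conn.close()


def fetch_rows(query: str) -> list:
    """Hypothetical helper: connect, query, and return plain rows."""
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(query).fetchall()
    finally:
        conn.close()


rows = fetch_rows("SELECT 1, 'a'")
restored = pickle.loads(pickle.dumps(rows))  # plain rows round-trip fine
```

The same idea would apply to the task above: rather than returning the connection from one task and passing it downstream, each task that needs the database could open and close its own connection.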
  • h

    Hwi Moon

    01/11/2022, 6:09 PM
    What would be the cleanest way to write a brief description about what a flow does in the UI?
    k
    c
    12 replies · 3 participants
  • d

    Daniel Kornhauser

    01/11/2022, 6:13 PM
    Hi, I'm trying to get only the error logs from GraphQL. Does anybody know how I can restrict the query below so I don’t get all the log levels?
    query {
      flow_run {
        logs {
          message
          level
          info
          created
        }
      }
    }
    Of course, a totally different query to get error logs from GraphQL would also be welcome.
    k
    2 replies · 2 participants
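One possible shape for such a query, sketched under the assumption that the API accepts a Hasura-style `where` filter on the nested `logs` field; that filter syntax is an assumption and is not confirmed anywhere in this thread. The hypothetical helper `error_logs_query` only builds the query text and does not call any API.

```python
def error_logs_query(level: str = "ERROR") -> str:
    """Build the query text; the `where` filter syntax is an assumption."""
    return f"""
query {{
  flow_run {{
    logs(where: {{level: {{_eq: "{level}"}}}}) {{
      message
      level
      info
      created
    }}
  }}
}}
""".strip()


query_text = error_logs_query()
```

If the filter is accepted, the rest of the query stays exactly as in the original message.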
  • m

    Matthew Seligson

    01/11/2022, 7:18 PM
    What’s the difference/relationship between a flow group id and a version group id? I’ve noticed that as I republish a flow, the flow ID changes while the flow group id and version group id stay the same.
    k
    28 replies · 2 participants
  • h

    Heeje Cho

    01/11/2022, 9:51 PM
    Hi, how do I toggle between local and cloud retrieval of PrefectSecrets? I set a few keys in the Cloud UI, but I am currently unable to retrieve them with PrefectSecret.
    k
    a
    6 replies · 3 participants
  • l

    Leon Kozlowski

    01/11/2022, 10:31 PM
    Is there a process for suggesting that a field be exposed in the GraphQL API? I wasn’t able to see duration for flow runs as an available field. I understand it can be derived from start_date and end_date, but just wanted to put this out there.
    k
    2 replies · 2 participants
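Deriving the duration client-side, as the message suggests, can be sketched as below. The field names `start_date` and `end_date` are taken from the message as written, the timestamps are made-up sample values, and `run_duration` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def run_duration(start_date: str, end_date: str) -> timedelta:
    """Return end - start for two ISO 8601 timestamps."""
    return datetime.fromisoformat(end_date) - datetime.fromisoformat(start_date)


# Made-up sample timestamps, 3 minutes 30 seconds apart.
duration = run_duration("2022-01-11T22:00:00+00:00", "2022-01-11T22:03:30+00:00")
```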
  • s

    Sidney Fox

    01/11/2022, 11:13 PM
    Hi - looking to pass AWS credentials to a Kubernetes agent at runtime so as to interface with the pynamoDB python library. I’m getting the following error when I attempt to scan a table in DynamoDB:
    Failed to scan table: Unable to locate credentials
    I have
    AWS_CREDENTIALS
    stored in Prefect Cloud as a Secret, and I’ve tried passing the credentials as envs to KubernetesRun:
    env={
            "ACCESS_KEY": Secret("AWS_CREDENTIALS").get().get("ACCESS_KEY"),
            "SECRET_ACCESS_KEY": Secret("AWS_CREDENTIALS").get().get("SECRET_ACCESS_KEY")
        }
    Returns the same error. What’s the best / preferred approach to authenticate a Kubernetes agent against AWS?
    a
    2 replies · 2 participants
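One thing that may be worth checking: botocore, which PynamoDB builds on, looks for credentials in the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, so variables named `ACCESS_KEY` and `SECRET_ACCESS_KEY` would not be picked up. A minimal sketch of the renaming, assuming the secret is a dict with the two keys shown in the message; `aws_env` is a hypothetical helper and the values are placeholders.

```python
def aws_env(credentials: dict) -> dict:
    """Map the secret's keys onto the variable names botocore reads."""
    return {
        "AWS_ACCESS_KEY_ID": credentials["ACCESS_KEY"],
        "AWS_SECRET_ACCESS_KEY": credentials["SECRET_ACCESS_KEY"],
    }


# Placeholder values only; in the flow these would come from the secret.
env = aws_env({"ACCESS_KEY": "AKIA-EXAMPLE", "SECRET_ACCESS_KEY": "example-secret"})
```

The resulting dict could then be passed as the `env` argument in place of the one shown above.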
  • j

    Jeff Wiens

    01/12/2022, 12:42 AM
    What options are available for storing and running flows that span multiple files (e.g., flow.py, flow_tasks.py)?
    k
    1 reply · 2 participants
  • m

    M. Siddiqui

    01/12/2022, 8:33 AM
    Hello everyone, I am currently looking to try out a self-hosted Prefect Server and UI instance on AWS, based on the services I saw in the docker-compose file over here: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/docker-compose.yml Has anyone attempted to deploy this on an ECS Fargate cluster? Apart from the Postgres instance, I would like to deploy each service into its own Fargate instance. I am just wondering what could be some sane CPU and memory specs to kick things off, assuming I have a few flows with an overall 1000ish tasks running twice every hour. I would also like to know if all these services are stateless, meaning I can scale them horizontally via a load balancer. Or are there any services here which can only be scaled vertically in case of performance bottlenecks? Any help would be appreciated! 🙏
    a
    4 replies · 2 participants
  • d

    Dekel R

    01/12/2022, 10:07 AM
    Hey all, I'm running a flow using VertexRun. It has multiple steps and it runs perfectly. Once a week at a specific time I’d like this flow to invoke another flow that will train a model. I used this code:
    if pendulum.today('America/New_York').weekday() == 2:  # Monday is 0 so Wednesday is 2
        x_flow = create_flow_run(flow_name=PREFECT_TRAIN_FLOW_NAME, project_name=PREFECT_TRAIN_PROJECT_NAME)
        wait_for_flow_a = wait_for_flow_run(x_flow, raise_final_state=True)
    This code is of course inside my “with Flow….” code block. When I run this code alone (in a dummy flow), it works and x_flow gets invoked. But when I run it in my real flow, after some other tasks, nothing happens. I cannot even see the “wait_for_flow” task in Prefect Cloud (flow -> tasks tab); it seems like it's getting ignored. What am I missing here? Thanks
    a
    3 replies · 2 participants
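A likely explanation is that a plain Python `if` inside the `with Flow(...)` block is evaluated once, when the flow object is built, not on each run, so the tasks under it are only added to the flow if it happened to be built on a Wednesday. A sketch of moving the date check into a function that could itself run as a task and feed a conditional; `is_training_day` is a hypothetical name, and the snippet uses only the standard library rather than Prefect or pendulum.

```python
from datetime import date
from typing import Optional


def is_training_day(today: Optional[date] = None, training_weekday: int = 2) -> bool:
    """True when `today` falls on the training weekday (Monday is 0, so 2 is Wednesday)."""
    # Note: date.today() uses the machine's local time zone, unlike the
    # explicit 'America/New_York' zone in the original snippet.
    today = today or date.today()
    return today.weekday() == training_weekday


on_wednesday = is_training_day(date(2022, 1, 12))  # 12 Jan 2022 was a Wednesday
on_thursday = is_training_day(date(2022, 1, 13))
```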
  • r

    Ram Vuppaladadiyam

    01/12/2022, 12:44 PM
    Hello! Is there a way to run flows solely on EC2 agents without using ECS? I've seen some past messages about running flows on EC2 with ECS, but my team is looking to only use EC2 and spin up/down instances based on a custom image on our end.
    a
    5 replies · 2 participants
  • t

    Tony Liberato

    01/12/2022, 2:50 PM
    Just want to say I have been focusing on getting Prefect examples ready to show my team, and after a few lessons learned it all worked like a charm. I am excited to be on the cusp of using Prefect to manage our data pipelines. The reporting and graphs are already way more than I expected. Thanks.
    :thank-you: 1
    🚀 4
  • j

    Justin Green

    01/12/2022, 3:00 PM
    Hello, I have a situation where I am generating tasks at run time based on some configuration (so the number of tasks may be different depending on config). I would also like these tasks to be able to be run in parallel using the local dask executor. The difficulty I'm running into is that I need the tasks to be defined individually in the flow in order for the local dask executor to be able to run them in parallel. I do not know how many tasks there will be until runtime, so I cannot define the flow with individual tasks. I cannot simply loop through the task functions in the flow because the executor will not execute the tasks in parallel. Is there a recommended way to accomplish this?
    a
    2 replies · 2 participants
  • r

    Rouven

    01/12/2022, 4:11 PM
    Dear Prefect community, my senior sent me here to politely ask whether it is possible to map Prefect tasks to individual Kubernetes pods instead of parallel Python tasks. :)
    k
    6 replies · 2 participants
  • q

    Qwame

    01/12/2022, 4:21 PM
    I've been playing with Prefect Orion and I'm wondering how I can view the logs associated with a run in the UI?
    k
    m
    3 replies · 3 participants
  • s

    Suresh R

    01/12/2022, 4:57 PM
    Hi! How can I import my custom library into flows run through the Kubernetes agent?
    k
    2 replies · 2 participants
  • j

    Jason May

    01/12/2022, 5:30 PM
    Is there an equivalent to task.map that runs the mapped parameter elements in serial instead? Basically identical to map but forced to run in order. I looked at the loop signal as an option, but I don't think that will lay out the schematic in the UI as nicely as map does.
    a
    k
    11 replies · 3 participants
  • d

    Didier Marin

    01/12/2022, 5:32 PM
    Hello all ! I'm looking for a simple way to modify a PrefectResult, in order to re-run a task with the correct input without re-running the whole flow. Is there a way to do it in Python, or otherwise with a mutation in GraphQL? I explored the schema but could not pinpoint where to find the result given its "location" id 🤔
    k
    3 replies · 2 participants
  • j

    Jawaad Mahmood

    01/12/2022, 6:46 PM
    Hi all! This may be more of a Docker SDK question, but I was hoping a friendly prefecter might have encountered this issue. I'm trying to bind a Windows network path for my flow's run_config. I read the Docker SDK docs on create_host_config, but I'm not sure what I am supposed to change. For example: 1) I can use the code below to successfully bind a local file path (for read/write) to a Docker container. 2) However, if I use a UNC path to a network drive, I receive a "Server Error" that the path is not a valid Windows path. Any help is welcome, thank you!
    ### THIS CODE ALLOWS ME TO BIND A LOCAL FILE PATH
    from prefect.run_configs import DockerRun
    import docker

    flow.run_config = DockerRun(
        labels=['my-label'],
        host_config={
            'mounts': [
                docker.types.Mount(
                    target='/public',
                    source=r'//c/some/local/path',
                    type='bind',
                ),
            ]
        },
    )

    ### THIS CODE THROWS ERROR
    flow.run_config = DockerRun(
        labels=['my-label'],
        host_config={
            'mounts': [
                docker.types.Mount(
                    target='/public',
                    source=r'\\path\to\windows\network\shared\drive',
                    type='bind',
                ),
            ]
        },
    )
    k
    4 replies · 2 participants
  • c

    Chris Reuter

    01/12/2022, 7:57 PM
    👋 hey all, if there's anyone new to the community hanging out here, @Kalise Richmond and I are going to go live on Twitch in 5 minutes and go from 0 to first flow. See you at twitch.tv/prefectlive!
    🔥 4
  • f

    Filipe Reis

    01/12/2022, 8:32 PM
    Not wanting to disrupt the stream, but do you guys have any timeline for Prefect Orion? I looked at it and I want to start moving legacy code into Prefect, but I love how Orion looks and don't want to start moving and then have to deep dive again to change it once more ^^
    k
    2 replies · 2 participants
  • c

    Christoph Deil

    01/12/2022, 9:39 PM
    How do I create and run flows on a schedule in Prefect Orion? Let’s say I have this:
    schedule = IntervalSchedule(interval=datetime.timedelta(seconds=10))
    deployment_spec = DeploymentSpec(name="hola", flow=greetings_flow, schedule=schedule)
    Do I now use OrionClient and some methods to deploy? We currently use Prefect core in a pod and simply do flow.run() with a schedule attached, and I’m looking for a working example to do the equivalent in Orion (even if I gather behind the scenes it will do something else via a server and DB). Basically I’m looking for this: https://orion-docs.prefect.io/concepts/deployments/#running-deployments-with-the-api 🙂
    m
    9 replies · 2 participants
  • d

    Daniel Komisar

    01/12/2022, 10:10 PM
    Hi everyone, are results returned from the API sorted by
    created
    (or any other field) guaranteed to be returned in the same order? Thanks!
    k
    5 replies · 2 participants
  • j

    Jason Motley

    01/12/2022, 10:54 PM
    What's the easiest way to specify to run N simultaneous extracts with Dask? E.g., each extract is like this:
    df_2015 = extract_past(connection, start_date="2015-01-01", end_date="2015-12-31", task_args={"name": "Extract 2015"})
    k
    6 replies · 2 participants
k

Kevin Kho

01/12/2022, 10:57 PM
You might be able to map? The connection just needs to be created inside the task rather than passed in. Are these CSV files or Parquet?
j

Jason Motley

01/12/2022, 10:58 PM
Reading out of a data warehouse.
The full extract is too slow, so I'm splitting it apart, running some transforms in Python, then running a loop for the load to incrementally load 1/10th of the total data.
But I'd like to speed up that extract component if possible.
k

Kevin Kho

01/12/2022, 11:03 PM
Ah then the best bet is just to map and create the connection inside the task. Map over a list of start dates and end dates. The limit would be the concurrent connections your warehouse allows
j

Jason Motley

01/12/2022, 11:04 PM
Gotcha, that makes sense, thank you!
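The suggestion in this thread (map over a list of start and end dates, and create the connection inside each extract) can be sketched roughly as below. This uses a standard-library thread pool as a stand-in for Prefect mapping on a Dask executor, not the real thing; `year_ranges` and `extract_range` are hypothetical names, and the extract body is a placeholder that does not touch any warehouse.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date


def year_ranges(first_year: int, last_year: int) -> list:
    """Build (start_date, end_date) ISO strings, one pair per calendar year."""
    return [
        (date(y, 1, 1).isoformat(), date(y, 12, 31).isoformat())
        for y in range(first_year, last_year + 1)
    ]


def extract_range(bounds: tuple) -> str:
    """Hypothetical stand-in for the extract: a real version would open its
    own warehouse connection here, query the range, and close it."""
    start_date, end_date = bounds
    return f"extracted {start_date}..{end_date}"


ranges = year_ranges(2015, 2017)
# max_workers plays the role of the warehouse's concurrent-connection limit.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(extract_range, ranges))
```

Keeping the connection inside `extract_range` means nothing unpicklable has to travel between workers, and the pool size caps how many connections are open at once.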