prefect-community
  • c

    Christopher

    02/10/2022, 6:10 PM
    I'm also not totally clear on the relationship between ECSAgent and ECSRun. If I didn't use ECSRun, would the flow run directly on the ECS service that's running the agent?
    3 replies · 2 participants
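    A note on the relationship: the ECS agent only polls for scheduled runs; each flow run is launched as its own ECS task, and ECSRun is how that task is customized. A minimal sketch (image name and sizes are placeholder assumptions, Prefect 1.x):
    from prefect import Flow
    from prefect.run_configs import ECSRun

    with Flow("example") as flow:
        ...  # tasks

    # Without ECSRun the agent still launches a separate ECS task with its
    # defaults; the flow never runs inside the agent's own service.
    flow.run_config = ECSRun(image="my-registry/example:latest", cpu=512, memory=1024)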
  • j

    James Sutton

    02/10/2022, 8:46 PM
    Hi Prefect Community. I've been trialing Prefect to replace Airflow and have documented my thoughts to date, along with a brief setup guide; not sure if this would be useful for others: https://jamesutton.medium.com/my-experience-with-prefect-so-far-and-a-quick-getting-sta[…]bc?source=friends_link&sk=58e6e074ee852ffbef2e46999d43b1c7
    ❤️ 5
    1 reply · 2 participants
  • d

    Daniel Nilsen

    02/11/2022, 7:40 AM
    Hi! I'm trying to start a run through GraphQL, using this mutation where the input is a variable. Does anyone know what the type is supposed to be for the input?
    mutation MyMutation($input: any) {
      create_flow_run(input: $input) {
        id
      }
    }
    11 replies · 2 participants
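    For reference, Prefect Server's schema appears to name this input type create_flow_run_input, so the variable can be declared with it instead of the invalid any; a sketch via the Python client (the flow id is a placeholder):
    from prefect import Client

    client = Client()
    result = client.graphql(
        """
        mutation($input: create_flow_run_input!) {
          create_flow_run(input: $input) {
            id
          }
        }
        """,
        variables={"input": {"flow_id": "<flow-id>"}},
    )
    print(result)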
  • f

    Faisal k k

    02/11/2022, 8:07 AM
    Hi, I need to whitelist the Prefect Cloud API IPs in the security group of the EC2 instance where the agent is running, but I couldn't find the IP range. Does anyone know where I can find it?
    2 replies · 2 participants
  • m

    Michael Hadorn

    02/11/2022, 8:21 AM
    Question about Orion: generic tasks, and building a flow without running it. Details in thread.
    9 replies · 2 participants
  • w

    William Edwards

    02/11/2022, 12:45 PM
    I'm trying out Prefect. Two questions: • I added and registered a flow with a local server. It has caught my attention that when I re-register the exact same flow, a new 'version' is created. What is the best practice for adding a flow? Am I supposed to register it just once and again for updates, or always before creating a new flow run? • What is the typical separation of concerns? Should I just call create_flow_run in the client, or should the client be doing anything else?
    3 replies · 2 participants
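    For what it's worth, the usual pattern is to register once per code change (each registration bumps the version) and then create runs against the registered flow rather than re-registering; a sketch assuming the Prefect 1.x client:
    from prefect import Client

    client = Client()
    # Register only when the flow's code changes; day-to-day, just
    # create new runs of the already-registered flow.
    flow_run_id = client.create_flow_run(flow_id="<flow-id>")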
  • r

    Robert Kowalski

    02/11/2022, 2:28 PM
    Hi, is it possible to fix the MAC address when Docker storage is used? The docker run command has
    docker run --mac-address="70:ca:9b:ce:67:ae" IMAGE
    so I want to set a constant MAC address when defining Docker storage as:
    storage = Docker(
        env_vars={"PYTHONPATH": "$PYTHONPATH:/pipeline"},
        files={f'{parent_dir}': '/pipeline'},
        image_tag=os.environ.get('IMAGE_TAG'),
        image_name=flow_name,
        stored_as_script=True,
        path='/pipeline/flow.py',
        extra_dockerfile_commands=[]
    )
    Has someone tried to achieve something similar?
    3 replies · 2 participants
  • j

    Josh

    02/11/2022, 3:07 PM
    I'm trying to dynamically name a flow run following these instructions: https://github.com/PrefectHQ/prefect/discussions/3881. But is there some way to name the flow run with the labels used, or with the environment variables present in the flow run?
    8 replies · 2 participants
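    One possible route, sketched rather than documented: rename the run from inside a task, where environment variables and prefect.context are available at runtime (assumes Client.set_flow_run_name, present in recent 1.x releases):
    import os

    import prefect
    from prefect import Client, task

    @task
    def rename_current_run():
        # flow_run_id is only known at runtime, via prefect.context.
        run_id = prefect.context.get("flow_run_id")
        Client().set_flow_run_name(run_id, f"run-{os.environ.get('ENV', 'dev')}")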
  • f

    FuETL

    02/11/2022, 4:28 PM
    Hey guys, is this setup possible? I already have:
    with Flow("dummy_flow") as flow:
       ... # parameters, logic, etc
    I want to re-use this same flow (create another one) with the logic of the previous one, without having to copy-and-paste it. Is that possible? I want to create a second flow, e.g. "second_dummy_flow", that uses the same logic as dummy_flow, so in the Prefect UI I will see 2 flows that do the same thing.
    2 replies · 2 participants
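    One common pattern for this is a factory function, so the same graph can be registered under different names (a sketch):
    from prefect import Flow

    def build_flow(name: str) -> Flow:
        # Same tasks and wiring; only the flow name varies.
        with Flow(name) as flow:
            ...  # parameters, logic, etc
        return flow

    dummy_flow = build_flow("dummy_flow")
    second_dummy_flow = build_flow("second_dummy_flow")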
  • k

    Kevin Mullins

    02/11/2022, 4:55 PM
    I'm working on a flow that will fan-out/fan-in in a dynamic manner. For each path, there are linear steps that are chained together. I'm taking advantage of map and have been successful with it previously for things like create_flow_run and wait_for_flow_run; however, the wait_for_flow_run input arguments exactly match the output from create_flow_run. I'm trying to think of how tasks can be chained together when a task takes mapped arguments from upstream that are not included in its direct parent's output. For instance, for several prepare/process/finalize tasks I need the same configuration information, which was the result of a discover task; however, this configuration information is not returned all the way through. I'm curious what would be a good approach to something like this. If I have an original fan-out that, say, returns a list of 5 results that get mapped, and tasks keep returning different lists of 5 results, is it safe to pass these results downstream and match them up by index order?
    # pseudo code
    five_config_results = discover()
    
    five_state_results = prepare.map(five_config_results)
    
    five_process_results = process.map(
        five_state_results, config_results=five_config_results
    )
    
    five_finalize_results = finalize.map(
        five_process_results,
        five_state_results=five_state_results,
        config_results=five_config_results,
    )
    Or would another approach be needed to capture the matching results from each task to give to others?
    14 replies · 2 participants
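    For what it's worth, mapped results in Prefect 1.x preserve the order of their inputs, so index-aligned fan-outs like the pseudocode above do line up; a small runnable sketch of the same shape:
    from prefect import Flow, task

    @task
    def discover():
        return [f"config-{i}" for i in range(5)]

    @task
    def prepare(config):
        return f"state({config})"

    @task
    def process(state, config):
        # config arrives index-aligned with state because both mapped
        # lists preserve the ordering of discover()'s output.
        return f"processed({state}, {config})"

    with Flow("fan-out-fan-in") as flow:
        configs = discover()
        states = prepare.map(configs)
        processed = process.map(states, config=configs)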
  • l

    Leon Kozlowski

    02/11/2022, 5:48 PM
    Is there any way to perform a rollback of a flow (to a previous version) using the GraphQL API?
    1 reply · 2 participants
  • v

    Vipul

    02/11/2022, 5:52 PM
    Quick question on Orion: after the recent upgrade to 2.0a10, Orion flows work fine whenever I run them, but I don't see anything on the dashboard at 127.0.0.1:9090. Not sure if I am doing something wrong; anyone?
    17 replies · 2 participants
  • t

    Tom Shaffner

    02/11/2022, 6:04 PM
    Ever since one of the recent updates (either 0.15.12 or 0.15.13, I'm not sure which), the machine running the server seems to accumulate zombie processes at a stunning rate. It ends up with hundreds in a few hours, and thousands in a day or two. I thought I'd seen an issue related to this in the GitHub issues list last week saying it was being worked on, but I can't seem to find it anymore. Is this still a known issue being tracked somewhere? Or is there a fix for it?
    3 replies · 2 participants
  • a

    Adam Roderick

    02/11/2022, 6:56 PM
    We are using S3Result stores and configure our flow's result in the constructor like this:
    result=S3Result(bucket=s3_results_bucket)
    The flow's output looks like it is using a default template, something like
    YYYY/MM/DD/**.prefect_result
    I would like to reuse this bucket across flows and environments, with a template something like
    {environment}/{flow_name}/YYYY/MM/DD/**.prefect_result
    How can I accomplish this?
    2 replies · 2 participants
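    A possible sketch using the result's location template: the placeholders are filled from prefect.context at runtime, while anything context doesn't know (like the environment) has to be baked in up front. The env var name here is an assumption:
    import os

    from prefect.engine.results import S3Result

    result = S3Result(
        bucket=s3_results_bucket,
        # {flow_name}, {date}, {task_run_id} come from prefect.context.
        location=f"{os.environ.get('ENVIRONMENT', 'dev')}"
                 "/{flow_name}/{date:%Y/%m/%d}/{task_run_id}.prefect_result",
    )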
  • h

    Hugo Kitano

    02/11/2022, 7:42 PM
    What's the best way to get the most recent failed flow runs for a specific flow in Python?
    3 replies · 2 participants
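    A sketch of one way, querying the GraphQL endpoint from Python (field names follow the Hasura-style schema used elsewhere in this archive; the flow id is a placeholder):
    from prefect import Client

    client = Client()
    # Most recent failed runs for one flow, newest first.
    result = client.graphql(
        """
        query($flow_id: uuid) {
          flow_run(
            where: { flow_id: { _eq: $flow_id }, state: { _eq: "Failed" } }
            order_by: { start_time: desc }
            limit: 5
          ) {
            id
            name
            start_time
          }
        }
        """,
        variables={"flow_id": "<flow-id>"},
    )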
  • g

    Gabriel Gazola Milan

    02/11/2022, 8:44 PM
    Hi all! I've developed a flow that has multiple clocks on its schedule. Each clock has its own parameters for the flow, and its own labels. I was wondering what would be the best way of easily identifying runs for each of the clocks in the UI. I found this closed PR on the GitHub repo that would help a lot: https://github.com/PrefectHQ/prefect/pull/3664. A colleague has suggested refactoring the flow as a task and then declaring multiple flows with different names, but this doesn't sound right... Do you have any recommendations?
    6 replies · 2 participants
  • t

    Tamas Szuromi

    02/12/2022, 11:32 PM
    How can I use prefect.utilities.gcp.get_storage_client with Flow(..., storage=GCS(bucket=...))? I'd like to create the client myself, without using GOOGLE_APPLICATION_CREDENTIALS. Can someone point me in the right direction?
    2 replies · 2 participants
  • y

    Yongchan Hong

    02/13/2022, 1:38 PM
    Hi, I am totally new to Prefect and trying to figure things out. I have successfully deployed Prefect Server using Helm with my own ingress host. Now I am trying to register a flow from my own notebook and am wondering how to connect to my own server. Can someone guide me?
    38 replies · 3 participants
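    A sketch for Prefect 1.x: switch the local backend with the CLI command prefect backend server, then point the client at the ingress (the endpoint path here is an assumption; adjust to your setup):
    from prefect import Client

    client = Client(api_server="http://<ingress-host>:4200/graphql")
    # The server exposes a trivial hello query, handy as a connectivity check.
    print(client.graphql("query { hello }"))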
  • a

    akshay shenoy

    02/13/2022, 6:37 PM
    Hey all! I am looking to add persistence to Prefect Server. I went through the documentation and found information on using a local file mount volume or PostgreSQL; however, I wanted to know if Prefect Server supports MySQL as a backend DB for persistence. If yes, I would appreciate it if somebody could point me to the documentation for it. Thanks!
    3 replies · 2 participants
  • f

    Farid

    02/14/2022, 3:54 AM
    Hi, can anyone point me in the right direction on how to set up access for the Kubernetes agent to read/write Results to an S3 bucket using AWS IAM roles? I have used a Kube deployment and RBAC to set up the agent, and I am not using a service account atm.
    1 reply · 2 participants
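    One route, sketched with assumed names: on EKS, IAM Roles for Service Accounts (IRSA) lets an annotated service account carry the bucket permissions, and the run config puts flow-run pods under it:
    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    with Flow("s3-results-flow") as flow:
        ...  # tasks

    # "prefect-s3" is an assumed service account annotated with an IAM role
    # granting S3 read/write; pods for this flow's runs then inherit it.
    flow.run_config = KubernetesRun(service_account_name="prefect-s3")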
  • s

    Stéphan Taljaard

    02/14/2022, 5:22 AM
    Hi, why does this GQL query return "empty results"? I expect there to be only one result, since the query filters the flow runs by name:
    query {
      flow(where: { name: { _ilike: "Energy Insight" } }) {
        flow_runs(where: { name: { _eq: "stalwart-ostrich" } }) {
          name
          id
        }
      }
    }
    ->
    {
      "data": {
        "flow": [
          {
            "flow_runs": []
          },
          {
            "flow_runs": []
          },
          {
            "flow_runs": []
          },
          {
            "flow_runs": []
          },
          {
            "flow_runs": []
          },
          {
            "flow_runs": [
              {
                "name": "stalwart-ostrich",
                "id": "8bca3ab2-05cc-4223-93f0-071813528545"
              }
            ]
          }
        ]
      }
    }
    2 replies · 2 participants
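    What seems to be happening: the outer filter matches every flow named like "Energy Insight" (one per version), and each match returns its own, mostly empty, flow_runs list. Filtering the flow_run table directly avoids the empty parents; a sketch via the Python client:
    from prefect import Client

    result = Client().graphql(
        """
        query {
          flow_run(where: { name: { _eq: "stalwart-ostrich" } }) {
            id
            name
            flow { name version }
          }
        }
        """
    )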
  • n

    Noam polak

    02/14/2022, 8:07 AM
    Hey everyone, I have a flow that triggers other child flows, and I want to notify Slack when one of them fails. This is my (logic) code:
    child_flow = create_flow_run(
                flow_name=CHILD_FLOW,
                parameters={
                    "input_data": input_data,
                    "run_id": run_id,
                },
                project_name="default",
            )
    child_result = get_task_run_result(
                child_flow,
                task_slug="child_flow-copy",
                poll_time=3,
            )
    I tried to add a handler to get_task_run_result, but I get an error:
    child_result = get_task_run_result(
                hazarder_flow,
                task_slug="child_flow-copy",
                poll_time=3,
                state_handlers=[post_to_slack_task_handler],
            )
    
    TypeError: got an unexpected keyword argument 'state_handler'
    So how can I do it? Thanks!
    7 replies · 2 participants
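    The error comes from state_handlers being a Task-construction argument rather than a run() keyword; in the functional API it can usually be attached through task_args, which copies the task with those attributes (a sketch reusing the names above):
    child_result = get_task_run_result(
        child_flow,
        task_slug="child_flow-copy",
        poll_time=3,
        # Copies the task with the handler attached, instead of passing
        # state_handlers to run(), which raises the TypeError above.
        task_args=dict(state_handlers=[post_to_slack_task_handler]),
    )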
  • m

    massumo

    02/14/2022, 9:09 AM
    Hello fellows, I want to create child flows. For example, I have a task that returns a list, and I want to create a child flow to handle each single element. Can I do that?
    1 reply · 2 participants
  • m

    massumo

    02/14/2022, 9:10 AM
    I should have n+1 flows.
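    Yes; one sketch is to map create_flow_run over the list, one child run per element (assumes a registered child flow named "child-flow" that takes an element parameter):
    from prefect import Flow, task, unmapped
    from prefect.tasks.prefect import create_flow_run

    @task
    def get_items():
        return [1, 2, 3]

    @task
    def as_params(item):
        return {"element": item}

    with Flow("parent") as parent:
        items = get_items()
        # One child flow run per element; constant arguments are wrapped
        # in unmapped() so only parameters is fanned out.
        create_flow_run.map(
            parameters=as_params.map(items),
            flow_name=unmapped("child-flow"),
            project_name=unmapped("default"),
        )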
  • a

    Alexis Lucido

    02/14/2022, 10:58 AM
    Hi all! I am running a ShellTask to kill zombie processes generated by one of my other tasks. The flow was functioning properly until 2 weeks ago, but I cannot figure out why it stopped working. Here is the associated flow:
    kill_geckodriver_task = ShellTask(
        log_stderr=True, return_all=True, stream_output=True)

    with Flow('kill_geckodriver',
              schedule=schedules_prefect['kill_geckodriver']) as kill_geckodriver:
        kill_geckodriver_task(command='source {}'.format(os.path.join(
            os.environ.get('BASH_SCRIPTS_FOLDER'), 'kill_geckodriver.sh')))
    The bash script is below:
    pkill geckodriver
    pkill firefox
    I can run the flow when the bash script only echoes a string, so the bug is not due to the flow or to the path passed with an environment variable. I guess the problem lies in the sudo rights needed to run the "pkill" command. I have been trying to replace the current script with the following lines (replacing <password> with the user password), but with no success so far:
    # export HISTIGNORE='*sudo -S*' # to be added in production to avoid logging passwords
    echo "<password>" | sudo -S pkill geckodriver
    echo "<password>" | sudo -S pkill firefox
    Unfortunately, the flow still raises an error, and I cannot figure out why. I have been trying to log it with the log_stderr=True, return_all=True, stream_output=True kwargs of the ShellTask, but the only logs I have are attached as a screenshot. Any thoughts? The problem is probably password-linked, but I cannot seem to find an appropriate solution. Thanks a lot in advance!
    21 replies · 2 participants
  • f

    Frederick Thomas

    02/14/2022, 3:46 PM
    Greetings everyone, I'm attempting to use prefect.Client to query the GraphQL endpoint; however, I am getting errors that have me stumped. The relevant code:
    import prefect
    import pandas as pd
    
    client = prefect.Client()
    
    client.graphql("""
        query {
            agent{
              flow_runs(limit:5, where:{state:{_eq: "Success"}start_time:{_gt:"2022-02-14"}}){
                start_time
                end_time
                state
                flow{
                  name
                  id
                  tasks_aggregate{
                    aggregate{
                      count
                    }
                  }
                }
          		
              }
              
            }
    
        } """)
    The errors:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 465, in _request
        json_resp = response.json()
      File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 898, in json
        return complexjson.loads(self.text, **kwargs)
      File "/usr/local/lib/python3.8/json/__init__.py", line 357, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.8/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.8/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
      File "/workspaces/prefect/ETL/CDNY/GraphQL.py", line 7, in <module>
        client.graphql("""
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 298, in graphql
        result = self.post(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in post
        response = self._request(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 468, in _request
        raise ClientError(
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
    We're using Prefect Core as our backend at the moment, and the documentation for this is sparse when I Google. Thanks!
    24 replies · 3 participants
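    That ClientError usually means the client is still posting to Prefect Cloud (and getting a non-JSON response) instead of the local server; checking the backend is the usual first step. A sketch (the endpoint path is an assumption):
    # $ prefect backend server
    import prefect

    # Or aim the client at the server's GraphQL endpoint explicitly.
    client = prefect.Client(api_server="http://localhost:4200/graphql")
    print(client.graphql("query { hello }"))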
  • a

    Austin Vecchio

    02/14/2022, 4:28 PM
    I know that for an ECS task it is possible to specify the amount of RAM/CPU available to the task. Would it be possible to set these same constraints on individual Prefect tasks?
    5 replies · 2 participants
  • r

    Richard Hughes

    02/14/2022, 7:15 PM
    Hi community, I am using croniter to calculate cron schedules inside of the Prefect flow, registering on the Cloud instance. When I kick off my flow, I want to understand the last cron time that was scheduled to run. I have placed these cron values into a dictionary that holds the times for a handful of different jobs; then I want to use them as parameters to some code I am running. For example:
    croniter(SchemaNames[key], datetime.now()).get_prev(datetime).strftime("%m/%d/%Y %H:%M:%S")
    For whatever reason, datetime.now() is returning the datetime corresponding to when the flow was registered, not the actual runtime datetime.now(). Any thoughts on how to achieve the results I am looking for, or guidance from this point? Much appreciated. P.S. I assume I could use the GraphQL API and extract the last scheduled start times for these parameters, but I thought this approach was easier.
    4 replies · 2 participants
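    The likely culprit: datetime.now() evaluated at module level runs once, when the flow is serialized at registration. Wrapping the lookup in a task defers it to run time; a sketch with the same croniter call:
    from datetime import datetime

    from croniter import croniter
    from prefect import task

    @task
    def last_cron_time(cron_expr: str) -> str:
        # Evaluated when the task runs, not when the flow is registered.
        prev = croniter(cron_expr, datetime.now()).get_prev(datetime)
        return prev.strftime("%m/%d/%Y %H:%M:%S")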
  • a

    Aric Huang

    02/14/2022, 8:33 PM
    Is it expected that a Result using a specific serializer (e.g. PandasSerializer) would use the same serializer when loading the result using prefect.tasks.prefect.get_task_run_result? I have a task that uses the following task decorator:
    @task(slug="output", result=GCSResult("<path>", serializer=PandasSerializer(file_type="parquet")), checkpoint=True)
    When I try to get the result by doing:
    result = get_task_run_result.run(flow_id, "output-copy", poll_time=5)
    I get an error that seems to indicate it's trying to use cloudpickle instead of Pandas:
    File "/usr/local/lib/python3.8/dist-packages/prefect/engine/serializers.py", line 86, in deserialize
        return cloudpickle.loads(value)
    _pickle.UnpicklingError: A load persistent id instruction was encountered,
    but no persistent_load function was specified.
    Is there a way to have get_task_run_result use a specific serializer?
    7 replies · 2 participants
  • p

    Peter Peter

    02/14/2022, 9:42 PM
    Hello, do all mapped tasks have to be completed before dependent mapped tasks can be run? Let's say you have a flow that has tasks A -> B -> C, where B and C are mapped tasks. Is it possible for B to execute right after an individual run of A, before starting another execution of A? Say task A adds 10 to its input and prints the value, and task B adds 100 to its input and prints the value. The expected output for the input [1, 2, 3] to task A would be: 11, 111, 12, 112, 13, 113. I know that the actual output could be in any order.
    2 replies · 2 participants

Kevin Kho

02/14/2022, 9:43 PM
The DaskExecutor prefers depth-first execution, but it can't be forced that way. The LocalExecutor is single-threaded, so there really isn't a difference.

Peter Peter

02/15/2022, 11:03 AM
Thanks I will try to see if I can achieve those results with the DaskExecutor.
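For reference, attaching the executor is a one-liner; depth-first ordering is preferred by Dask, not guaranteed (a sketch):
from prefect.executors import DaskExecutor

# With A -> B -> C mapped, Dask tends to start B on an item as soon
# as that item's A finishes, instead of waiting for every A.
flow.executor = DaskExecutor()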