prefect-community
  • s

    sark

    09/21/2020, 9:21 AM
    Traceback (most recent call last):
      File "daily.py", line 95, in <module>
        with Flow('start dataflow job 1', executor=LocalDaskExecutor(), storage=GCS('ghpr-dev-prefect-flows')) as startFlow1:
    TypeError: __init__() got an unexpected keyword argument 'executor'
    (prefect) bash-3.2$ prefect version
    0.13.7
    hmm i am unable to set the executor for a flow like this. https://docs.prefect.io/api/latest/core/flow.html#flow-2 says that it does have the `executor` parameter, though. i am trying to increase parallelism for my flows
    • 1
    • 2
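    For reference, a minimal sketch of a 0.13-compatible pattern (hedged: the flow name and bucket come from the traceback above, but this workaround is an assumption, not the thread's confirmed answer). In 0.13.7 `Flow()` takes no `executor` keyword; the executor can hang off the flow's environment instead, or be passed to `flow.run()`:
    from prefect import Flow
    from prefect.engine.executors import LocalDaskExecutor
    from prefect.environments import LocalEnvironment
    from prefect.environments.storage import GCS

    with Flow(
        'start dataflow job 1',
        environment=LocalEnvironment(executor=LocalDaskExecutor()),
        storage=GCS('ghpr-dev-prefect-flows'),
    ) as startFlow1:
        ...  # tasks elided

    # for local runs the executor can also be passed at call time:
    # startFlow1.run(executor=LocalDaskExecutor())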
  • g

    Greg Roche

    09/21/2020, 9:27 AM
    Hi folks, quick question about the Prefect agent and virtual environments. I have a machine with two registered flows, and each flow runs inside its own virtual environment (I'm using `venv`). If I have two separate agents running on the machine, each started from within its own venv, they can both execute their own flows successfully. As soon as I try to cut it down to one agent listening for both flows, the agent fails to execute any flow which lives inside another venv (`Failed to load and execute Flow's environment: ModuleNotFoundError`). I'd like to keep the separated venvs if possible and just have one agent running, which would execute each of the flows within the context of its own venv. Is this possible, or would I need to bite the bullet and have one venv for both flows?
    r
    • 2
    • 2
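    For what it's worth, a sketch of the two-agent compromise using labels (the label and project names are hypothetical; whether a single agent can activate per-flow venvs is exactly the open question here). Each flow is labeled so that only the agent started inside the matching venv picks it up:
    from prefect import Flow, task
    from prefect.environments import LocalEnvironment

    @task
    def hello():
        print("hello from the foo venv")

    with Flow("foo-flow") as flow:
        hello()

    # only an agent carrying this label will run the flow, e.g. one started
    # inside the matching venv with: prefect agent start local -l venv-foo
    flow.environment = LocalEnvironment(labels=["venv-foo"])
    flow.register(project_name="my-project")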
  • n

    Nuno

    09/21/2020, 9:44 AM
    Hello everyone, I’m trying to take an object-oriented approach to Flows and Tasks. The idea is to inherit from specific flow types that already provide some methods as tasks, overriding them only when necessary. I've found that the `task` decorator doesn’t seem to work on class methods. Here is the error:
    File "/Users/nuno/Developer/Data-Framework/data-prefect/data_prefect/utils/flows.py", line 67, in factory_flow
        flow.fetch()
      File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
        new.bind(
      File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 3025, in bind
        return self._bind(args, kwargs)
      File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 2940, in _bind
        raise TypeError(msg) from None
    TypeError: missing a required argument: 'self'
    It seems that I cannot pass the method's `self` argument. Do you have any suggestions? Thank you in advance.
    k
    • 2
    • 6
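    A minimal sketch of one workaround (the `FetchFlow` class here is hypothetical): `@task` wraps the unbound function, so the runner cannot supply `self`; wrapping the bound method at instance-construction time sidesteps that:
    from prefect import Flow, task

    class FetchFlow:
        def __init__(self):
            # wrap the *bound* method, whose signature no longer has `self`
            self.fetch = task(self.fetch)

        def fetch(self):
            return "fetched data"

    builder = FetchFlow()
    with Flow("factory-flow") as flow:
        data = builder.fetch()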
  • s

    sark

    09/21/2020, 10:55 AM
    i am dynamically scheduling a flow run (to start not immediately but in the future) from within a flow, and am polling the Prefect API to wait for its completion:
    from time import sleep

    from prefect.client import Client
    from prefect.utilities.graphql import parse_graphql, with_args

    def get_flow_run_state(client, flow_run_id):
        q = parse_graphql({
            'query': {
                with_args('flow_run', {'where': {'id': {'_eq': flow_run_id}}}): {
                    'state'
                }
            }
        })
        return client.graphql(q).data.flow_run[0].state

    def wait_flow_complete(flow_run_id):
        client = Client()
        state = None
        # note: this loops forever if the run finishes in a non-Success state
        while state != 'Success':
            sleep(10)
            state = get_flow_run_state(client, flow_run_id)
    question: is it possible to avoid the polling and achieve the same thing?
    j
    n
    • 3
    • 5
  • n

    Nuno Silva

    09/21/2020, 12:07 PM
    Hi. When using `DaskKubernetesEnvironment`, how do you set the k8s namespace in which the job runs in the cluster? I'm using `scheduler_spec_file`/`worker_spec_file`, and in those YAML files I set `metadata:namespace: <name>`, but the job itself starts in the `default` namespace.
    j
    • 2
    • 3
  • n

    Nuno Silva

    09/21/2020, 1:08 PM
    Another question about `DaskKubernetesEnvironment`: I'm storing the images in Azure Container Registry. I've created an `image_pull_secret` and tried setting it both in the custom scheduler/worker YAML files and as an argument to `DaskKubernetesEnvironment`. Both fail. Looking at the cluster, the error it gives is:
    Failed to pull image "<image_url>": rpc error: code = Unknown desc = Error response from daemon: Get <image_url>: unauthorized: authentication required, visit <https://aka.ms/acr/authorization> for more information.
    Looking into the YAML of the job submitted to the cluster (Pods -> Actions -> Edit), it is clear that the field below is missing:
    imagePullSecrets:
        - name: regcred
    Is this a bug, or am I doing something wrong? Thank you
    j
    • 2
    • 3
  • m

    Marek Nguyen

    09/21/2020, 2:02 PM
    Hi everyone, I might be overthinking something, but after pulling the Docker image, how do I start the server? This is the response I get: `FileNotFoundError: [Errno 2] No such file or directory: 'docker-compose': 'docker-compose'`. Cheers, Marek
    k
    • 2
    • 2
  • j

    Johnny

    09/21/2020, 6:25 PM
    @Anna Geller (old account) Thanks for your guide on setting up Prefect on AWS EKS, very helpful. Others should check it out too: https://towardsdatascience.com/distributed-data-pipelines-made-easy-with-aws-eks-and-prefect-106984923b30
    👍 2
    :upvote: 8
    ❤️ 1
  • e

    Eric

    09/21/2020, 11:33 PM
    Hi all, trying to test out Dask with Prefect; I'm getting this error when trying to run `flow.run(executor=DaskExecutor())`:
    k
    • 2
    • 6
  • p

    Pedro Machado

    09/22/2020, 3:29 AM
    Hi there. I am working on a flow that depends on a `start_date` and an `end_date`. These can come from two `Parameters` or, if the parameters are not provided, the dates are computed by applying some logic to `prefect.context.scheduled_start_time`. I'd like to use the computed start and end dates for templating the task result's location. If these dates are passed as task inputs, they are accessible (I can do `location="{start_date}_{end_date}.txt"`), but I'd also like to use these variables in some downstream tasks that don't list them as inputs. Is there another way for a downstream task to access these? Since they don't always come from a Parameter, they are not available in `prefect.context.parameters`. I tried adding them to the context at run time with `prefect.context["start_date"] = start_dt`, but modifying the context like this doesn't feel right. Any suggestions?
    n
    • 2
    • 2
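    A sketch of one pattern that avoids mutating the context (task names and fallback logic are hypothetical; it assumes, as the message notes, that the result template can see task inputs): resolve the dates once in a task and pass them explicitly to every task that needs them:
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def resolve_date(value, offset_days):
        # fall back to the scheduled start time when the Parameter is absent
        if value is not None:
            return value
        scheduled = prefect.context.scheduled_start_time
        return scheduled.add(days=offset_days).strftime("%Y-%m-%d")

    @task(target="{start_date}_{end_date}.txt")
    def extract(start_date, end_date):
        return f"rows for {start_date}..{end_date}"

    @task
    def downstream(start_date, end_date):
        # any task needing the dates simply lists them as inputs
        print(start_date, end_date)

    with Flow("date-window") as flow:
        start_p = Parameter("start_date", default=None)
        end_p = Parameter("end_date", default=None)
        start_date = resolve_date(start_p, 0)
        end_date = resolve_date(end_p, 1)
        extract(start_date, end_date)
        downstream(start_date, end_date)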
  • p

    Pedro Machado

    09/22/2020, 4:04 AM
    Is there a way to use the `StringFormatter` task with a template that is read at run time?
    c
    • 2
    • 5
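    An untested sketch resting on a stated assumption: if `StringFormatter` follows the task library's usual `defaults_from_attrs` pattern, the `template` set at init can be overridden per call, so a template produced by an upstream task at run time would work:
    from prefect import Flow, task
    from prefect.tasks.templates import StringFormatter

    formatter = StringFormatter()

    @task
    def load_template():
        # e.g. read the template from a file or database at run time
        return "Hello, {name}!"

    with Flow("runtime-template") as flow:
        template = load_template()
        message = formatter(template=template, name="Marvin")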
  • s

    sark

    09/22/2020, 8:53 AM
    hi guys, my prefect agent can run an already-manually-pulled image but cannot pull the image itself, so i am trying to mount my gcloud (Google Container Registry) credentials like this:
    $ prefect agent start docker -l gcs-flow-storage --volume /var/run/docker.sock:/var/run/docker.sock --volume ~/.config:/root/.config --volume ~/.docker:/root/.docker
    but i am still getting:
    Unexpected error: ImageNotFound(HTTPError('404 Client Error: Not Found for url: <http+docker://localhost/v1.40/containers/create'>))
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/docker/api/client.py", line 259, in _raise_for_status
        response.raise_for_status()
      File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 404 Client Error: Not Found for url: <http+docker://localhost/v1.40/containers/create>
    j
    • 2
    • 10
  • n

    Nejc Vesel

    09/22/2020, 9:00 AM
    Hi everybody. I want the results of my tasks to be saved based on either the `flow_run_id` or the `flow_run_name` (i.e. `target='{flow_run_id}/foo'`). What I'm having trouble with is designing a method that works both when using Prefect Server and when using Prefect through the core Python API. For example, I am able to set the `flow_run_id` when running through the Python API with `flow.run(flow_run_id=<my flow run id>)`, but setting `flow_run_id` seems to be impossible when running through Prefect Server. Inversely, one can set a `flow_run_name` when running through Prefect Server, but the concept of a `flow_run_name` doesn't seem to exist in the Python API. What would be the best way to resolve this issue? Thank you very much, Nejc
    j
    • 2
    • 1
  • s

    SImon Hawe

    09/22/2020, 9:18 AM
    Hi Guys. I have an issue with Prefect Cloud displaying the schematic of a massive flow with about 160 tasks (we do some parallel work where we currently cannot use the map logic, only single tasks). When I try to open the schematic, I see a spinner for a while before the page crashes with Error 5. Is it a known issue that big flows cannot be visualized?
    j
    • 2
    • 2
  • l

    Lewis Bails

    09/22/2020, 9:54 AM
    I'm trying to run a flow using `LocalEnvironment` with a `DaskExecutor`, but the flow run never starts. It just keeps logging that the flow has entered the Running state, and the Dask dashboard shows nothing running on the cluster. Has anyone run into this problem before?
    flow.environment = LocalEnvironment(
        executor=DaskExecutor(
            adapt_kwargs={
                "minimum": 1,
                "maximum": 5,
            },
        ),
    )
    FWIW, I'm using prefect 0.13.2, dask 2.26.0, distributed 2.26.0
    j
    • 2
    • 5
  • o

    orcaman

    09/22/2020, 10:32 AM
    Hi guys. Does anyone know if the DAG can be accessed from within a task during runtime?
    j
    • 2
    • 3
  • m

    Mikael

    09/22/2020, 10:49 AM
    I have a flow that inserts rows into a database. Sometimes a row or two are not inserted due to an error. Right now the flow terminates, but I want the flow to keep running and, at the end, set the flow run to Failed or maybe some kind of warning state. Is that possible?
    j
    • 2
    • 2
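    A sketch of one way to get this behavior (the task bodies are hypothetical): swallow per-row errors inside the insert task, then have a final task raise the `FAIL` signal, so the flow run only flips to Failed after every row has been attempted:
    from prefect import Flow, task
    from prefect.engine.signals import FAIL

    @task
    def insert_row(row):
        try:
            ...  # the real database insert goes here
            return None
        except Exception as exc:
            return f"row {row}: {exc}"  # report instead of raising

    @task
    def finalize(errors):
        failures = [e for e in errors if e is not None]
        if failures:
            # marks the flow run Failed after all inserts were attempted
            raise FAIL(f"{len(failures)} rows failed to insert: {failures}")

    with Flow("inserts") as flow:
        errors = insert_row.map(list(range(100)))
        finalize(errors)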
  • v

    Vincent

    09/22/2020, 12:52 PM
    I am wondering if there is a mechanism in Prefect for flow looping/mapping, i.e. I want to run a flow with N inputs and collect those outputs for a downstream task. I could write a for loop within the flow, but this creates a very wide DAG, which becomes unwieldy after 1000 iterations.
    d
    j
    • 3
    • 4
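    A sketch of the `LOOP` alternative (the per-item work is a placeholder): it keeps the DAG a single task wide by accumulating outputs across iterations, at the cost of processing the items serially:
    import prefect
    from prefect import Flow, task
    from prefect.engine.signals import LOOP

    @task
    def process_all(inputs):
        payload = prefect.context.get("task_loop_result", {"i": 0, "outputs": []})
        i, outputs = payload["i"], payload["outputs"]
        outputs = outputs + [inputs[i] * 2]  # placeholder per-item work
        if i + 1 < len(inputs):
            raise LOOP(result={"i": i + 1, "outputs": outputs})
        return outputs  # the final iteration's return value is the task result

    @task
    def collect(outputs):
        print(len(outputs))

    with Flow("narrow-loop") as flow:
        collect(process_all(list(range(1000))))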
  • g

    Greg Roche

    09/22/2020, 2:35 PM
    Hi folks, I'm having issues with ModuleNotFoundError when running a flow from a local agent. I have a project structure like this:
    |--project
       |-.venv
       |-etl
         |--__init__.py
         |--foo
         |  |--__init__.py
         |  |--foo_flow.py
         |--bar
         |  |--__init__.py
         |  |--bar_flow.py
         |--shared_utils
            |--__init__.py
            |--utils.py
    In `foo_flow.py` and `bar_flow.py` I import modules from `shared_utils` by appending the etl folder to `sys.path` before importing `utils.py`. When I run any of these flows with `flow.run()` they work fine. However, when I call `flow.register()` instead and then start a local agent, from any of the directories listed above, all flow runs initiated from the server to the agent fail instantly with `Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'shared_utils'")`. According to a Stack Overflow answer this is because the agent's Python path doesn't include `project/etl`, but it still fails when I run the agent with `--import-path "C:\project\etl"`. I've tried registering the flow from every directory listed above, tried starting the agent from every directory listed above, and tried passing every directory listed above as the `--import-path` argument, and I get exactly the same error every time. Can anybody please point out what I'm missing?
    c
    s
    • 3
    • 8
  • p

    Pedro Machado

    09/22/2020, 3:15 PM
    Hi everyone. I am developing a simple flow that has to query a database, save the result as a CSV, run some checks, and SFTP the file over to a remote server. The query takes a while to run, so I am using a templated `target` and a `PandasSerializer` to write the CSV. I am wondering what the recommended pattern is for the downstream SFTP task to locate the CSV. If I were not relying on the `LocalResult` to save the file, I'd simply return the path to the local file from the function that generates the CSV. However, since I am using the `Result` mechanism, this task just returns a dataframe. Do I just have to rely on the templated `target` location for downstream tasks to find the CSV file? In other words, do I have to use the same template and render it manually in the SFTP task?
    j
    • 2
    • 1
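    A sketch of the render-it-manually answer (the template string and task names are hypothetical): share one template between the result `target` and a small task that renders the concrete path for the SFTP step:
    import prefect
    from prefect import Flow, task

    CSV_TEMPLATE = "{flow_name}-{scheduled_start_time:%Y-%m-%d}.csv"

    @task
    def render_csv_path():
        # prefect.context carries the same keys the target template uses
        return CSV_TEMPLATE.format(**prefect.context)

    @task
    def sftp_put(local_path):
        ...  # open local_path and push it to the remote server

    with Flow("export") as flow:
        sftp_put(render_csv_path())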
  • e

    EmGarr

    09/22/2020, 4:19 PM
    Hi everyone, I am currently running a dynamic job after a call from a DB. I receive a configuration which tells me the number of steps to run. It looks like this:
    • build the config
    • for each part of the config, run a step
    I have two implementations. The first is mostly static while the second is more dynamic. Which would you recommend?
    1. Static implementation:
    import prefect
    from prefect import Flow, task
    
    @task
    def build_job():
        return {'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
    
    
    @task(log_stdout=True)
    def run_step(step):
        print(step)
    
    
    with Flow('test') as flow:
        config = build_job()
    
        tmp_task = None
        for step_name in ['a', 'b', 'c']:
            upstream_tasks = None if tmp_task is None else [tmp_task]
            tmp_task = run_step(
                config['steps'][step_name], upstream_tasks=upstream_tasks
            )
    
    flow.run()
    2. Using LOOP to be more dynamic:
    import prefect
    from prefect.engine.signals import LOOP
    from prefect import Flow, task
    
    @task
    def build_job():
        return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
    
    @task(log_stdout=True)
    def run_step(job_file):
        # we extract the accumulated task loop result from context
        loop_payload = prefect.context.get("task_loop_result", {})
        step_name = loop_payload.get("step_name", job_file['order'][0])
    
        step = job_file['steps'][step_name]
    
        print(step)
    
        pos = job_file['order'].index(step_name) + 1
    
        if pos < len(job_file['order']):
            next_step = job_file['order'][pos]
        raise LOOP(message=f"Finished step {step_name}", result=dict(step_name=next_step))
    
    with Flow('test') as flow:
        config = build_job()
        run_step(config)
    Is there a plan to support something like this?
    with Flow('test') as flow:
        config = build_job()

        for name in config['order']:
            run_step(config['steps'][name])
    Thanks!
    j
    • 2
    • 3
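    A third option worth noting, hedged: if the steps don't actually need to run in sequence (the static version chains them via `upstream_tasks`), mapping keeps the build-time code static while the config alone drives how many steps run:
    from prefect import Flow, task

    @task
    def build_job():
        return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}

    @task
    def list_steps(config):
        return [config['steps'][name] for name in config['order']]

    @task(log_stdout=True)
    def run_step(step):
        print(step)

    with Flow('test-mapped') as flow:
        run_step.map(list_steps(build_job()))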
  • a

    abhilash.kr

    09/22/2020, 4:39 PM
    👋 @Marvin 👋
  • m

    Marvin

    09/22/2020, 4:39 PM
    This contest will all end in tears, I just know it. You're entered anyway, @abhilash.kr.
    a
    • 2
    • 2
  • k

    Kevin Weiler

    09/22/2020, 8:27 PM
    hi there! Does anyone know how to set this config item via an environment variable for Prefect Server? (This is 0.12.6 - going to migrate soon, but not quite yet): https://github.com/PrefectHQ/prefect/blob/0.12.6/src/prefect/config.toml#L48
    j
    • 2
    • 4
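    The general pattern, for reference (the specific key behind that config.toml link is left alone here): any key in Prefect's config can be overridden with an environment variable named `PREFECT__<SECTION>__<KEY>`, with double underscores separating each level of the TOML. A sketch using the well-known logging key:
    import os

    # must be set before prefect is imported (e.g. in the server's environment)
    os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"

    import prefect
    assert prefect.config.logging.level == "DEBUG"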
  • e

    Eric

    09/22/2020, 9:12 PM
    hi! is it possible to configure the GitHub storage environment to use a container on ECR?
    j
    • 2
    • 5
  • b

    Berty

    09/22/2020, 10:49 PM
    👋 @Marvin
    👋 2
  • m

    Marvin

    09/22/2020, 10:49 PM
    This contest is the sort of thing you lifeforms enjoy, is it? I've entered you to win, @Berty.
  • s

    Spencer

    09/23/2020, 2:43 AM
    👋 @Marvin 🎉
    👋 1
  • m

    Marvin

    09/23/2020, 2:43 AM
    This contest will all end in tears, I just know it. You're entered anyway, @Spencer.
  • j

    josh

    09/23/2020, 11:23 AM
    Hey team, Prefect version `0.13.8` has been released! Here are a few notable changes:
    🐳 Made Docker storage more flexible
    ⏰ Added more schedule filters (start of month, day of week)
    👋 Deprecated `/contrib` and moved those tasks into the full task library
    🤔 Fixed some perplexing issues
    A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled
    👏 4