• Greg Roche

    2 years ago
    Hi folks, quick question about the Prefect agent and virtual environments. I have a machine with two registered flows; each flow runs inside its own virtual environment (I'm using `venv`). If I have two separate agents running on the machine, each started from within its own venv, they can both execute their own flows successfully. As soon as I try to cut it down to one agent listening for both flows, the agent fails to execute any flow which lives inside another venv (`Failed to load and execute Flow's environment: ModuleNotFoundError`). I'd like to keep the separate venvs if possible and have just one agent running, which would execute each flow within the context of its own venv. Is this possible, or would I need to bite the bullet and have one venv for both flows?
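    A hedged note: the local agent executes each flow run with its own interpreter, so a flow's dependencies must be importable from the agent's environment. One common workaround keeps one labeled agent per venv rather than one shared agent. A minimal sketch, assuming a Prefect 1.x-style API (the label and project names are illustrative, and the exact CLI varies by version):
    from prefect import Flow, task
    from prefect.run_configs import LocalRun

    @task
    def hello():
        print("hello from venv A")

    with Flow("flow-a") as flow_a:
        hello()

    # "venv-a" is an assumed label; the agent started inside venv A must be
    # launched with the same label, e.g.: prefect agent local start --label venv-a
    flow_a.run_config = LocalRun(labels=["venv-a"])
    flow_a.register(project_name="demo")  # "demo" is an assumed project name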
    2 replies
• Nuno

    2 years ago
    Hello everyone, I'm trying to take an object-oriented approach to Flows and Tasks. The idea is to inherit from specific flow types that already have some methods defined as tasks, overriding them only if necessary. I've found that the `task` decorator doesn't seem to work for class methods. Here is the error:
    File "/Users/nuno/Developer/Data-Framework/data-prefect/data_prefect/utils/flows.py", line 67, in factory_flow
        flow.fetch()
      File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
        new.bind(
      File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
        callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
      File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 3025, in bind
        return self._bind(args, kwargs)
      File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 2940, in _bind
        raise TypeError(msg) from None
    TypeError: missing a required argument: 'self'
    It seems that I cannot pass the method's `self` argument. Do you have any suggestions? Thank you in advance.
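    One workaround sketch (not necessarily the recommended pattern; the class and method names here are illustrative): wrap the *bound* method with `task()` at runtime instead of decorating it at class-definition time, so its signature no longer includes `self` and `inspect.Signature.bind` succeeds:
    from prefect import Flow, task

    class BaseFlow:
        def fetch(self):
            print("fetching with", type(self).__name__)

        def build(self):
            # task() wraps the bound method, whose signature omits `self`,
            # avoiding the "missing a required argument: 'self'" TypeError.
            fetch_task = task(self.fetch)
            with Flow(type(self).__name__) as flow:
                fetch_task()
            return flow

    BaseFlow().build().run()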
    6 replies
• sark

    2 years ago
    I am dynamically scheduling a flow run (to start not immediately but in the future) from within a flow, and am polling the Prefect API to wait for its completion:
    from time import sleep

    from prefect.client import Client
    from prefect.utilities.graphql import parse_graphql, with_args

    def get_flow_run_state(client, flow_run_id):
        # Query the GraphQL API for the state of a single flow run.
        q = parse_graphql(
            {
                "query": {
                    with_args("flow_run", {"where": {"id": {"_eq": flow_run_id}}}): {
                        "state"
                    }
                }
            }
        )
        return client.graphql(q).data.flow_run[0].state

    def wait_flow_complete(flow_run_id):
        client = Client()
        state = None
        while state != "Success":
            sleep(10)  # poll every 10 seconds
            state = get_flow_run_state(client, flow_run_id)
    Question: is it possible to avoid the polling and achieve the same thing?
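    A possible alternative sketch, assuming the built-in `StartFlowRun` task is available in your Prefect version ("child-flow" and "demo" are illustrative names): with `wait=True` it blocks until the child run finishes, so the loop is no longer hand-rolled (it still polls internally):
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # wait=True makes the task block until the child flow run reaches a
    # finished state.
    start_child = StartFlowRun(flow_name="child-flow", project_name="demo", wait=True)

    with Flow("parent-flow") as parent:
        start_child()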
    5 replies
• Nuno Silva

    2 years ago
    Hi. When using `DaskKubernetesEnvironment`, how do we set the k8s namespace in which the job runs in the cluster? I'm using `scheduler_spec_file`/`worker_spec_file`, and in the YAML files I set `metadata: namespace: <name>`, but the job itself starts in the `default` namespace.
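    A sketch of the setup being described (file names assumed). One thing worth checking: the spec files govern the Dask scheduler/worker pods, while the initial Prefect job is created by the Kubernetes agent, which typically places it in the agent's own namespace; that may be why it lands in `default`:
    from prefect.environments import DaskKubernetesEnvironment

    env = DaskKubernetesEnvironment(
        scheduler_spec_file="scheduler.yaml",  # YAML sets metadata: namespace: <name>
        worker_spec_file="worker.yaml",
    )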
    3 replies
• Nuno Silva

    2 years ago
    Another question about `DaskKubernetesEnvironment`: I'm storing the images in Azure Container Registry. I've created an `image_pull_secret` and tried setting it both in the custom scheduler/worker YAML files and as an argument to `DaskKubernetesEnvironment`. Both fail. The error the cluster gives is:
    Failed to pull image "<image_url>": rpc error: code = Unknown desc = Error response from daemon: Get <image_url>: unauthorized: authentication required, visit <https://aka.ms/acr/authorization> for more information.
    Looking into the YAML of the job submitted to the cluster (Pods -> Actions -> Edit), it is clear that there is no field as below:
    imagePullSecrets:
        - name: regcred
    Is it a bug, or am I doing something wrong? Thank you
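    For reference, a sketch of the argument-based variant being described (file names assumed; the secret name `regcred` comes from the message and must exist in the namespace where the pods are created):
    from prefect.environments import DaskKubernetesEnvironment

    env = DaskKubernetesEnvironment(
        scheduler_spec_file="scheduler.yaml",
        worker_spec_file="worker.yaml",
        image_pull_secret="regcred",  # k8s secret holding the ACR credentials
    )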
    3 replies
• Marek Nguyen

    2 years ago
    Hi everyone, I might be overthinking something, but after pulling the Docker image, how do I start the server? This is the response I get: `FileNotFoundError: [Errno 2] No such file or directory: 'docker-compose': 'docker-compose'`. Cheers, Marek
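    A hedged note: `prefect server start` shells out to the `docker-compose` binary, so this `FileNotFoundError` usually means docker-compose is not installed or not on the PATH. A quick check from Python:
    import shutil

    # shutil.which returns None when the binary cannot be found on PATH.
    if shutil.which("docker-compose") is None:
        print("docker-compose not found; install it, then rerun `prefect server start`")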
    2 replies
• Johnny

    2 years ago
    @Anna Geller (old account) Thanks for your setup guide on AWS EKS + prefect setup, very helpful. Others should check it out too: https://towardsdatascience.com/distributed-data-pipelines-made-easy-with-aws-eks-and-prefect-106984923b30
• Eric

    2 years ago
    Hi all, trying to test out Dask with Prefect; I'm getting this error when trying to run `flow.run(executor=DaskExecutor())`:
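    For context, a minimal sketch of the call being described, assuming a Prefect 0.x/1.x-style import (the executor spins up a temporary local Dask cluster when given no address; the import path varies by version):
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def say_hi():
        print("hi from a Dask worker")

    with Flow("dask-test") as flow:
        say_hi()

    flow.run(executor=DaskExecutor())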
    6 replies
• Pedro Machado

    2 years ago
    Hi there. I am working on a flow that depends on a `start_date` and an `end_date`. These can come from two `Parameter`s or, if the parameters are not provided, the dates are computed by applying some logic to `prefect.context.scheduled_start_time`. I'd like to use the computed start and end dates for templating the task result's location. If these dates are passed as task inputs, they are accessible (I can do `location="{start_date}_{end_date}.txt"`), but I'd also like to use these variables in some downstream tasks that don't list them as inputs. Is there another way for a downstream task to access them? Since they don't always come from a Parameter, they are not available in `prefect.context.parameters`. I tried adding them to the context at run time with `prefect.context["start_date"] = start_dt`, but modifying the context like this doesn't feel right. Any suggestions?
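    One conventional pattern, sketched with illustrative names: resolve the dates in a single upstream task and pass them to downstream tasks as explicit inputs, so they remain usable for result-location templating without mutating `prefect.context`:
    import prefect
    from prefect import Flow, Parameter, task

    @task(nout=2)
    def resolve_dates(start_date, end_date):
        # Fall back to the scheduled start time when parameters are omitted.
        if start_date is None or end_date is None:
            scheduled = prefect.context.scheduled_start_time
            start_date = scheduled.to_date_string()
            end_date = scheduled.add(days=1).to_date_string()
        return start_date, end_date

    @task
    def downstream(start_date, end_date):
        # Receives the dates as plain inputs, so no context mutation is needed.
        print(f"processing {start_date} .. {end_date}")

    with Flow("date-window") as flow:
        start = Parameter("start_date", default=None)
        end = Parameter("end_date", default=None)
        start_dt, end_dt = resolve_dates(start, end)
        downstream(start_dt, end_dt)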
    2 replies