prefect-community

    Rodrigo Neves

    11/12/2020, 3:51 PM
    Are there any resources/examples about an ETL pipeline for live data ingestion that reads from a queue and, for each new event, creates a new flow that is executed by a central Dask cluster with auto-scaling? (Actually, is this a good use case for Prefect, or should we stick with batch jobs?)
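    A minimal sketch of what this could look like with a pre-registered flow, where each queue event triggers a flow run via the Client and execution is left to whatever agent/Dask cluster backs the flow. The queue consumer and the flow id are hypothetical placeholders, and the flow is assumed to declare an `event` Parameter:
    from prefect import Client

    def consume_events():
        """Stand-in for a real queue consumer (Kafka, SQS, Pub/Sub, ...)."""
        yield {"id": 1}
        yield {"id": 2}

    client = Client()
    FLOW_ID = "<id-of-registered-flow>"  # id returned by flow.register(...)

    for event in consume_events():
        # One flow run per event; the agent and Dask cluster handle execution.
        client.create_flow_run(flow_id=FLOW_ID, parameters={"event": event})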

    Mac Gréco Péralte Chéry

    11/12/2020, 3:54 PM
    Hi everyone, I have a flow with mapped tasks that uses DaskCloudProviderEnvironment, and I have set the number of workers to 4. For some reason only one worker is created. Here is the code:
    flow.environment = DaskCloudProviderEnvironment(
        provider_class=FargateCluster,
        # task_role_arn="arn:aws:iam::497427061914:role/ecsTaskRole",
        execution_role_arn="arn:aws:iam::497427061914:role/ecsTaskRole",
        n_workers=4,
        scheduler_cpu=256,
        scheduler_mem=512,
        worker_cpu=512,
        worker_mem=1024,
        labels=["fargate"]
    )

    Joseph Haaga

    11/12/2020, 4:02 PM
    I’m working on a Python utility to automate the registration/deployment of Flows for our Prefect users. I noticed that the Dockerfile generated by `DockerStorage.create_dockerfile_object(directory)` contains a COPY statement to add `healthcheck.py` to the image, but uses an absolute path (which `docker build` treats as a path relative to the build context):
    COPY /absolute/path/on/my/machine/healthcheck.py /opt/prefect/healthcheck.py
    This causes the following error:
    Step 6/16 : COPY /absolute/path/on/my/machine/healthcheck.py /opt/prefect/healthcheck.py
    COPY failed: stat /var/lib/docker/tmp/docker-builder852791584/absolute/path/on/my/machine/healthcheck.py: no such file or directory
    However, this works fine when I manually update the Dockerfile to use a relative path to `healthcheck.py`. Would this constitute an issue? Or am I circumventing an intentional design decision?

    bral

    11/12/2020, 4:03 PM
    Hi all! Is there a possibility of looping a flow? We have a long-running task.
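    For reference, one documented Prefect pattern for repeating or long-running work is task-level looping with the LOOP signal rather than looping the whole flow; a minimal sketch (the stop condition here is arbitrary):
    from prefect import Flow, context, task
    from prefect.engine.signals import LOOP

    @task
    def process_batch():
        # Each iteration reads the previous iteration's result from context.
        payload = context.get("task_loop_result", {"count": 0})
        count = payload["count"]
        if count < 5:  # arbitrary stop condition for the sketch
            raise LOOP(message=f"iteration {count}", result={"count": count + 1})
        return count

    with Flow("looping-example") as flow:
        process_batch()

    flow.run()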

    Zach

    11/12/2020, 4:33 PM
    Are there restrictions on what I can store in an environment variable? I am having a flow registration fail because of an environment variable. The environment variable looks like this:
    "RANDOM_TOOL_VERSION": ": Guppy Basecalling Software, (C) Oxford Nanopore Technologies, Limited. Version 4.2.2+effbaf84"
    Here is a sample flow registration that is similar to what I am doing: (The error I was getting is in a comment at the bottom of the file)
    sample_flow_registration.py

    ale

    11/12/2020, 4:35 PM
    Hey folks, we’re interested in using the brand new ECS Agent. However, it is not completely clear how we should set up the RunConfig. Is this something we have to set up at the flow level or the flow environment level?
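    For what it's worth, run configs are generally attached to the flow itself rather than to an environment, and are matched to the agent via labels. A hedged sketch, assuming a Prefect release that ships prefect.run_configs.ECSRun; the image, resource values and labels are placeholders, so check the ECSRun docs for the exact argument names in your version:
    from prefect import Flow, task
    from prefect.run_configs import ECSRun

    @task
    def say_hi():
        print("hi from ECS")

    with Flow("ecs-example") as flow:
        say_hi()

    # Set on the flow itself; the ECS agent picks it up by matching labels.
    flow.run_config = ECSRun(
        image="my-registry/my-image:latest",
        cpu="256",
        memory="512",
        labels=["ecs"],
    )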

    Nicolas Bigaouette

    11/12/2020, 6:48 PM
    Hi all! We'd like to use Prefect to handle some workflows in our webapp, which gets deployed in Kubernetes. For reliability, our webapp uses the k8s `replica` setting. In addition, the app is behind guvicorn. All this means that we have multiple of our backends running concurrently. As such, if we register a flow in each backend, Prefect will receive multiple register requests for the same flow, which is obviously wrong... How should I handle this use case? How can I have multiple instances of my application that use the same flow? We thought of performing a search for the flow and creating it if not present, but then the flow's name (or whatever is used to perform the search) would become the unique key to identify a flow, and from what I'm reading about Prefect, a flow name is not the unique key to identify a flow. Any ideas? Thanks!!
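    One approach that might help (see also the serialized_hash discussion further down this page): have every replica register with an idempotency key derived from the flow itself, so repeated registrations are no-ops unless the flow actually changed. A sketch, with a placeholder project name:
    from prefect import Flow, task

    @task
    def do_work():
        pass

    with Flow("webapp-flow") as flow:
        do_work()

    # Safe to call from every replica at startup: as long as the serialized
    # flow is unchanged, the same idempotency key creates no new version.
    flow.register(
        project_name="my-project",
        idempotency_key=flow.serialized_hash(),
    )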

    Pedro Machado

    11/12/2020, 9:14 PM
    Hi there. I am under the impression that prefect used to leverage the docker cache when building/registering a flow but it is now installing all the dependencies every time even if there has only been a small change in the flow unrelated to the dependencies. Did anything related to this change since ~ 0.11 or so? I think that's the first version I used and am now on 0.13.11
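    Not an answer on what changed between versions, but one common way to keep rebuilds fast is to bake the heavy dependencies into a prebuilt base image and point Docker storage at it, so registering a small flow change only adds a thin layer. A sketch; the registry and image names are placeholders, and the import path is prefect.storage.Docker on newer releases:
    from prefect import Flow
    from prefect.environments.storage import Docker

    flow = Flow("cached-deps-example")

    # Dependencies live in the prebuilt base image; only the flow layer rebuilds.
    flow.storage = Docker(
        registry_url="my-registry.example.com",
        image_name="my-flow",
        base_image="my-registry.example.com/flow-deps:2020.11",
    )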

    Luke Orland

    11/12/2020, 9:30 PM
    Hi, is there a `Serializer` that serializes `str`?
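    In case it helps, the Serializer interface is just serialize/deserialize to and from bytes, so a str serializer can be a few lines. A sketch of a custom one (not a built-in), which could then be passed to a result, e.g. LocalResult(serializer=StringSerializer()):
    from prefect.engine.serializers import Serializer

    class StringSerializer(Serializer):
        """Serialize str values as UTF-8 bytes."""

        def serialize(self, value: str) -> bytes:
            return value.encode("utf-8")

        def deserialize(self, value: bytes) -> str:
            return value.decode("utf-8")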

    Riley Hun

    11/13/2020, 12:30 AM
    Hi all, when I try to build my Dockerfile with the Prefect flow and push it to GCR, it fails at `storage.build()`. I get the following error:
    InterruptedError: unauthorized: You don't have the needed permissions to perform this operation, and you may have invalid credentials. To authenticate your request, follow the steps in: <https://cloud.google.com/container-registry/docs/advanced-authentication>
    I don't think this is a permissions issue with the GCR images, because I tried building the Docker image and pushing it using gcloud commands, and it worked successfully. The flow also failed on my Prefect Core server deployed on GCP with the same kind of authentication error:

    Pedro Machado

    11/13/2020, 2:49 AM
    Hi there. I need to schedule a flow to run the first Wednesday of every month. I tried the approach outlined here and then ran into the same issue described in the thread. Is it possible to do this with the existing filters? I don't understand the security implications of allowing arbitrary callbacks for functions but it seems like a good feature to have.
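    For reference, schedule filters are plain callables that take a datetime and return a bool, so a Wednesday cron clock plus a custom "first seven days of the month" filter gives first-Wednesday scheduling without arbitrary callbacks in the API. A sketch (the filter function below is not a built-in):
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    def first_week_of_month(dt) -> bool:
        """Keep only candidate run times in the first 7 days of the month."""
        return dt.day <= 7

    # CronClock fires every Wednesday at 09:00; the filter keeps only the first one.
    schedule = Schedule(clocks=[CronClock("0 9 * * 3")], filters=[first_week_of_month])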

    M Taufik

    11/13/2020, 4:03 AM
    Hi, I hope you are doing well. Has anyone successfully set up Prefect Server in a k8s cluster, together with a k8s agent? Could you share a link/GitHub repo for deploying it to a k8s cluster in a production-ready way? Thank you

    Pedro Machado

    11/13/2020, 6:21 AM
    I am trying to use `idempotency_key=flow.serialized_hash()` when registering a flow. The hash is changing even when nothing has changed in the flow. Any ideas?

    Joël Luijmes

    11/13/2020, 8:25 AM
    When starting a Prefect server locally (or in a cluster), it asks to create a tenant. What exactly is the concept of a tenant here? I can't find any docs on the subject. Does it matter that I create the tenant from my local machine, even though Prefect is running on Kubernetes?

    Steven Hamblin

    11/13/2020, 9:20 AM
    Hi all, apologies for the vagueness of this, but I’m getting a bit of a heisenbug in a flow I’m running on `DaskKubernetesEnvironment`:

    simone

    11/13/2020, 2:37 PM
    Hi, I have been successfully running a flow locally. I shut down the server in the evening, and when I deployed the same flow again in the morning I got a
    Failed to load and execute Flow's environment: ModuleNotFoundError(
    I followed the instructions from other Slack discussions of this error and restarted the agent pointing at the path to my package: `prefect agent local start --api http://172.19.0.6:4200 -p /home/simone/tmp_code/pysmFISH_auto -p /home/simone/tmp_code/pysmFISH_auto/pysmFISH --show-flow-logs`. This seems to solve the module error, but the flow doesn't run. Looking at the logs from the agent, it is because of
    ModuleNotFoundError: No module named 'numpy'
    , but numpy is installed in the env. If I restart the agent adding the path to the conda env I am using,
    prefect agent local start --api <http://172.19.0.6:4200> -p /home/simone/tmp_code/pysmFISH_auto -p /home/simone/tmp_code/pysmFISH_auto/pysmFISH -p /home/simone/miniconda3/envs/prefect_dev --show-flow-logs
    I get the same initial missing-module error again. I really cannot understand what is going on. Any help is really appreciated! Thanks! Here is the flow I have been running: Flow GIST ------UPDATE--------- It looks like the agent cannot load the flow from the local directory. The local directory is there and contains the flow. The error persists even if I change the directory where the flows are saved.

    Ben Fogelson

    11/13/2020, 7:55 PM
    Gauging interest in a feature: a pattern we tend to encounter is where we have a task that we call to add to a flow, and we'd like to make `Parameters` for some of its call method arguments. This can lead to a lot of boilerplate:
    from prefect import Flow, Parameter, task
    
    @task
    def task_with_many_args(a, b, c, d, e=None, f=5, g='foo'):
        pass
    
    with Flow('flow') as flow:
        a = some_other_task()
    
        task_with_many_args(
            a=a,
            b=Parameter('task_with_many_args.b'),
            c=Parameter('task_with_many_args.c'),
            d=Parameter('task_with_many_args.d'),
            e=Parameter('task_with_many_args.e', default=None),
            f=Parameter('task_with_many_args.f', default=5),
            g=Parameter('task_with_many_args.g', default='foo'),
        )
    This comes up often enough that we’ve written a helper function to populate these parameters by inspecting the task’s call signature.
    with Flow('flow') as flow:
        a = some_other_task()
    
        call_and_populate_parameters(task_with_many_args, a=a)
    These two examples produce identical flows. What I'm wondering from the Prefect team is whether there'd be interest in incorporating some sort of `populate_parameters` kwarg directly into the `Task.__call__` method. Happy to take a stab at implementing it myself, but wanted to check before I put in the effort.
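    A rough idea of what such a helper might look like, built on inspect.signature (a hypothetical sketch, not code from the Prefect library):
    import inspect

    from prefect import Parameter

    def call_and_populate_parameters(task_obj, **overrides):
        """Call `task_obj` inside an active Flow context, creating a Parameter
        named '<task name>.<arg>' for every run() argument not supplied explicitly."""
        kwargs = dict(overrides)
        for name, sig_param in inspect.signature(task_obj.run).parameters.items():
            if name in kwargs:
                continue
            if sig_param.default is inspect.Parameter.empty:
                kwargs[name] = Parameter(f"{task_obj.name}.{name}")
            else:
                kwargs[name] = Parameter(f"{task_obj.name}.{name}", default=sig_param.default)
        return task_obj(**kwargs)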

    Newskooler

    11/13/2020, 8:21 PM
    Hi 👋, I am running a very straightforward Flow and I got this error:
    ValueError: Could not infer an active Flow context.
    I find it quite cryptic. Can anyone please shed some light on what my issue may be? I have no clue right now : /
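    For anyone hitting the same message: it usually means a task was called outside of an active `with Flow(...)` block, since a task call needs a flow being built (or an explicit flow argument). A minimal illustration:
    from prefect import Flow, task

    @task
    def add(x, y):
        return x + y

    # add(1, 2)  # outside a Flow block this raises:
    #            # ValueError: Could not infer an active Flow context.

    with Flow("example") as flow:
        add(1, 2)  # inside the block the call adds the task to the flow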

    Joseph Solomon

    11/13/2020, 9:06 PM
    Hi. I was wondering if there is an example of running a flow with docker tasks on a fargate agent. I can get the docker tasks to run locally, and I can get non-docker tasks to run on fargate, but not both. (I can’t even get docker tasks to run on a local docker agent)

    DJ Erraballi

    11/14/2020, 9:57 AM
    Prefect's client being dependent on prefect.core is actually causing some problems for some of our use cases: places where we would like to trigger flows from outside the Prefect runtime may have dependency conflicts. For now we are just rewriting the needed client methods so they can execute with a minimal dependency tree.

    DJ Erraballi

    11/14/2020, 9:58 AM
    (e.g. as we migrate away from Airflow, triggering a Prefect flow from Airflow is impossible, since Prefect and Airflow cannot be installed together (pendulum conflicts))

    Marwan Sarieddine

    11/14/2020, 4:18 PM
    Hi folks, I have a general question about setting up "flow-to-flow" flows (https://docs.prefect.io/core/idioms/flow-to-flow.html) - how does one pass parameters and context to each individual flow when calling `client.create_flow_run`, which is how we programmatically run flows?
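    A sketch of what this might look like, assuming your version's Client.create_flow_run accepts parameters and context keyword arguments (worth double-checking against the API reference); the flow id and values are placeholders:
    from prefect import Client

    client = Client()

    flow_run_id = client.create_flow_run(
        flow_id="<child-flow-id>",
        parameters={"date": "2020-11-14"},
        context={"invoked_by": "parent-flow"},  # merged into prefect.context for the run
    )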

    bral

    11/14/2020, 6:51 PM
    Hello! Can `case` be used with a Parameter? I need: if the parameter was passed, then perform some tasks, otherwise others.
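    A sketch of one way this can be expressed with `case`, deriving a boolean task from the Parameter (the `was_provided` check task is an illustration, not a built-in):
    from prefect import Flow, Parameter, case, task

    @task
    def was_provided(value) -> bool:
        return value is not None

    @task
    def path_if_passed():
        print("parameter was passed")

    @task
    def path_if_missing():
        print("parameter was not passed")

    with Flow("conditional-example") as flow:
        my_param = Parameter("my_param", default=None)
        provided = was_provided(my_param)

        with case(provided, True):
            path_if_passed()
        with case(provided, False):
            path_if_missing()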

    Felix Vemmer

    11/14/2020, 10:56 PM
    Hi everyone, I am using the Prefect Cloud server and trying to run a local agent from there. I authenticated in the CLI and also switched the server backend to cloud. 1. I first execute my script by running `python3 get_medium_stats.py` in the terminal, which executes these parts in the script…
    flow.storage = Local(
        path=os.path.abspath(python_file),
        stored_as_script=True
    )
    flow.register(project_name="Social Media Automation")
    flow.run_agent()
    As expected, I can see that the agent is running in the terminal. Checking the cloud, I can also confirm that the local agent is running:
    AGENT ID: 1132a46f-8016-4880-b028-6616487a1785
    Finally, running `prefect get flows` also shows the flow that I want to run:
    fa7eb2ff-1220-44bd-b302-14fe7f8e3a22  Social Media Automation
    However, the flow is stuck in submission and doesn’t execute. I noticed that when I stop the script/local agent in the terminal with ctrl + c at timestamp
    [2020-11-14 22:49:50,984] INFO - agent | Keyboard Interrupt received: Agent is shutting down.
    I can see in the cloud logs that the process now kicks off:
    23:49:52 Beginning Flow run for 'Medium Stats Scraper'
    So it seems that something is blocking the execution 🧐 I would very much appreciate any help, thanks!

    Michael Hadorn

    11/16/2020, 8:31 AM
    Hi there, I have a question about using the Dask executor with my own Python modules. If I use my own Python modules, I get this exception:
    [2020-11-16 09:26:55+0100] ERROR - prefect.Load | Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'xxx'")
    I think I have to upload all my necessary files via `client.upload_file('xxx.py')` (see https://distributed.dask.org/en/latest/api.html#distributed.Client.upload_file). But how can I access the Dask client? Here the client would be available during the run: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/executors/dask.py#L257 But directly after, it's cleared again. Does anybody have a hint for me? Regards, Michael
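    One workaround that may apply here: open a separate dask.distributed Client against the same scheduler address before the flow run and upload the module files there (workers that join later won't get them, so installing the module in the worker image is the more robust fix). The address below is a placeholder:
    from dask.distributed import Client

    client = Client("tcp://dask-scheduler:8786")
    client.upload_file("xxx.py")  # pushed to all currently connected workers
    client.close()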

    Riley Hun

    11/17/2020, 12:56 AM
    Hello everyone, may I please request some assistance debugging a flow I've registered to the Prefect Core UI I have hosted on GCP? It seems to work fine when I run the flow on my local machine against the remote Dask cluster, but when I register the flow, it fails and the error in the logs isn't detailed enough. Thanks in advance!

    Pedro Machado

    11/17/2020, 1:55 AM
    Hi, is it possible to run a flow locally with the docker agent without first registering it with a backend? I'm looking for a way to test the Docker image locally.

    Michelle Wu

    11/17/2020, 7:59 AM
    Hi, I've been trying to build a local Dask cluster to run my flows. Everything worked fine when I ran the flows locally:
    executor = DaskExecutor(address="<tcp://xxx>")
    flow.run(executor = executor)
    But when I registered the flows to run them in the Prefect UI, they stopped running on my local Dask cluster. Neither the scheduler nor the workers showed any reaction in the terminal or in Bokeh. I wonder why this is happening?
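    For context, the executor passed to flow.run() is not stored with the registered flow, so agent-launched runs fall back to the default executor. At the time, the usual fix was to attach the executor to the flow's environment before registering (newer releases use flow.executor instead). A hedged sketch, with a placeholder scheduler address:
    from prefect import Flow
    from prefect.engine.executors import DaskExecutor
    from prefect.environments import LocalEnvironment

    flow = Flow("dask-cluster-example")

    # Stored with the flow at registration, so UI-triggered runs also use the cluster.
    flow.environment = LocalEnvironment(
        executor=DaskExecutor(address="tcp://my-dask-scheduler:8786")
    )
    flow.register(project_name="my-project")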

    Alexander

    11/17/2020, 3:29 PM
    Flow is in running state for 5 hours, but all tasks were finished 4 hours ago:
    Flow run RUNNING: terminal tasks are incomplete.
    But all tasks within the flow are `success`. How can I debug this?

Jenny

11/17/2020, 3:34 PM
Hi Alexander - is this just one flow run or every run for a particular flow? Some things I would check when trying to debug:
• Are there any clues in the logs?
• Is your agent still up and running?
• If you've just seen this once, can you re-create it with another flow run?

Alexander

11/17/2020, 3:49 PM
This is only one particular flow run. Nothing special about it; the only thing is that it was restarted to re-run failed flows. The agent is fine, and there are no flow runner containers (I use Docker storage) running. I checked the logs, and I see something strange there. First, I see a traceback from Prefect. Then I found that the flow was rescheduled by Lazarus. And then I found that for some reason it was scheduling a task which is not part of the current flow run's flow version! The task was added in the next flow version. I can't see it in the flow run task list or the flow run schematic. Attaching a piece of recent logs.
logs.txt

Jenny

11/17/2020, 4:05 PM
Ah yes - that restart would be the issue - You can only run the live version of a flow. That's true for restarting a flow run too. I'll add a ticket to update the UI to make that clearer.