prefect-community
  • n

    Nabeel

    01/27/2021, 8:13 AM
Hi all. My team has been trying to execute a flow stored in Azure storage. The flow registers to Prefect Server and can push to Azure Blob Storage. When we start a local agent it picks up the flow and says deploying. However, it then crashes with the error `Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'rstrip'")`. The azure-storage version is within the range specified by setup.py. If anyone has any idea what the issue could be, it would be a huge help 🙌 Thanks so much! 😃
    a
    • 2
    • 3
  • v

    Vitaly Shulgin

    01/27/2021, 3:14 PM
Hello Team, I created a flow that schedules other flows using StartFlowRun. When I run it locally everything works fine, but when it runs in a k8s container on a schedule, it fails
    ✅ 1
    j
    • 2
    • 10
  • j

    Josh Greenhalgh

    01/27/2021, 3:40 PM
Does anyone know how to specify the prefect extras I need in Docker storage? I am getting the following build error:
    System Version check: OK
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/gcp/__init__.py", line 14, in <module>
        from prefect.tasks.gcp.bigquery import (
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/gcp/bigquery.py", line 4, in <module>
        from google.cloud import bigquery
    ImportError: cannot import name 'bigquery' from 'google.cloud' (unknown location)
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/gcp/__init__.py", line 24, in <module>
        ) from err
    ImportError: Using `prefect.tasks.gcp` requires Prefect to be installed with the "gcp" extra.
    I tried;
    storage = Docker(
    registry_url="gcr.io/blah/",
        image_name=name,
        image_tag="latest",
        build_kwargs={"buildargs":{"EXTRAS":"kubernetes,gcp"}}
    )
    With no luck 😞
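One approach that might work, a sketch assuming the base image's Dockerfile doesn't declare an EXTRAS build arg (registry and the name variable are carried over from the snippet above): install the extras as pip dependencies instead.
from prefect.storage import Docker

# a sketch: pip-install the extras directly instead of relying on
# build args, which the base image only uses if its Dockerfile
# declares the corresponding ARG
storage = Docker(
    registry_url="gcr.io/blah/",
    image_name=name,
    image_tag="latest",
    python_dependencies=["prefect[kubernetes,gcp]"],
)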
    j
    j
    • 3
    • 21
  • m

    Matthew Blau

    01/27/2021, 4:32 PM
    Hello all, I am looking for some guidance/examples on running flows on docker containers. Right now my company has software that runs on schedules in their own docker containers. My understanding currently of how to best utilize Prefect is to rebuild those containers to have
    FROM prefecthq/prefect:0.7.1-python3.6
or similar in the Dockerfile, which I have done. From there I need to create a flow that takes this Dockerfile and builds the container, and that is where my understanding is weaker and less clear. I have @task decorators added to various bits of the integration that I am currently converting over to having Prefect handle the execution of. I am stuck on how to write the flow so that it works with this docker container. Do I need a separate flow.py that takes the container's Dockerfile, builds it, and runs the tasks denoted by "@task" within the integration so it can be orchestrated by Prefect? If so, how would I write that flow, as an example? I feel like my understanding is flawed and would appreciate some help with this. For reference I am running 0.14.1. Thank you all in advance!
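For reference, a minimal sketch of the usual shape, with hypothetical names and paths: the tasks stay in ordinary Python, and Docker storage builds the image from the existing Dockerfile when the flow is registered (assuming a Prefect version whose Docker storage supports the dockerfile argument).
from prefect import Flow, task
from prefect.storage import Docker

@task
def run_integration_step():
    ...  # existing integration code decorated with @task

with Flow("integration-flow") as flow:
    run_integration_step()

# build on top of the existing Dockerfile; Prefect copies the flow
# into the image and runs a health check during the build
flow.storage = Docker(
    dockerfile="path/to/Dockerfile",
    registry_url="registry.example.com/team",
)
flow.register(project_name="test")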
    s
    • 2
    • 13
  • j

    jcozar

    01/27/2021, 5:01 PM
Hello all, I am running a prefect ECS agent as a service in AWS ECS. Now I am trying to register a flow run in Prefect Cloud using Docker storage. I created an ECSRun run configuration for the run_config argument of the Flow. If I use the env argument to provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY it works! However, I don't want to put my credentials in the source code. I am trying to use the task_definition_arn argument, but I am not sure it is the correct way, because the image of the task should be the Flow docker image. Can you give me any tip or advice? Thank you very much!
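One way to keep credentials out of the source, sketched under the assumption that your ECS setup can attach an IAM task role (the ARN and image below are hypothetical): let the role carry the AWS permissions so no keys appear in code.
from prefect.run_configs import ECSRun

# the task role carries the AWS permissions, so no
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in source
run_config = ECSRun(
    image="my-registry/my-flow:latest",
    task_role_arn="arn:aws:iam::123456789012:role/my-flow-task-role",
)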
    j
    • 2
    • 7
  • j

    Josh Greenhalgh

    01/27/2021, 5:24 PM
Do mapped tasks only behave in a parallel fashion on Dask? I am running on k8s using a job per flow, and all my mapped tasks log sequentially in order; there is also a huge gap between two mapped tasks. I was under the impression that if you have:
p1_res = extract_raw_from_api.map(
    asset_location=asset_locations,
    extract_date=unmapped(extract_date),
    extract_period_days=unmapped(extract_period_days),
)
p2_res = process_2.map(p1_res)
p3_res = process_3.map(p2_res)
then as soon as the first task from p1_res finishes, the corresponding process_2 task should start? As it stands, no process_2 tasks start until many of the extract_raw_from_api tasks are already complete...
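For context, a sketch of the usual explanation, assuming the flow's executor was never set: mapped children only fan out in parallel on a Dask-based executor; the default LocalExecutor runs them one at a time, and the depth-first behavior between p1/p2/p3 also depends on Dask.
from prefect.executors import LocalDaskExecutor

# mapped tasks run sequentially on the default LocalExecutor;
# a Dask-based executor enables parallel execution of mapped children
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)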
    j
    j
    • 3
    • 19
  • f

    Felipe Saldana

    01/27/2021, 5:31 PM
Hello all, I was wondering if someone can point me in the right direction. (Please let me know if I need to clarify 🙂) I have this working as a POC in Dagster and was wondering how this example would work in Prefect. This would be testing a custom task class: multiple instances of a smaller task, with configurable params/secrets. I have a legacy module that accepts 20+ parameters. Some of these parameters will be required (name), some will need to be secrets (tokens), and some vary (description). Depending on the combination of these parameters the module will do specific things: query a database and dump to a bucket, grab data from a bucket and load it into the db, and other tasks. Instead of accepting command line args, I want to "prefect-ify" that module and supply some of the configuration directly into it. Would this be a combination of parameters and secrets? Can these be loaded from a toml file? Next, I want to wrap that base module into a smaller/specific task, for example query_db_and_dump_to_bucket(). This smaller task will have required values (db host, db username, db pass, table_name). Just to point out, these values are not required in the base module. In my flow, I would want to call query_db_and_dump_to_bucket() again and again using different table names.
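A rough sketch of how this can look in Prefect, with hypothetical names: Parameters cover the varying values, PrefectSecret covers the tokens, and the smaller task is mapped over table names.
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.secrets import PrefectSecret

@task
def query_db_and_dump_to_bucket(host, user, password, table_name):
    ...  # wrap the legacy module here

with Flow("dump-tables") as flow:
    host = Parameter("db_host")
    user = Parameter("db_user")
    password = PrefectSecret("DB_PASSWORD")
    tables = Parameter("tables", default=["table_a", "table_b"])
    query_db_and_dump_to_bucket.map(
        host=unmapped(host),
        user=unmapped(user),
        password=unmapped(password),
        table_name=tables,
    )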
    e
    • 2
    • 22
  • s

    Sean Talia

    01/27/2021, 5:40 PM
I have a (I think simple) question around exposing more complete output to the prefect logger – I'm deliberately crashing a task that uses a DbtShellTask to execute a dbt project under the hood, and am not supplying the necessary ENV vars that dbt needs to run. If I have prefect run my flow that calls this shell task, the only failure message I see is the very last line of what dbt spits out, i.e.
    [2021-01-27 16:34:13+0000] ERROR - prefect.run_dbt | Command failed with exit code 2
    [2021-01-27 16:34:13+0000] ERROR - prefect.run_dbt |   Could not run dbt
    this is obviously not terribly informative, and I'd like for prefect to forward to me the entirety of the error message that dbt generates, which is:
    Encountered an error while reading profiles:
      ERROR Runtime Error
      Compilation Error
        Could not render {{ env_var('DB_USERNAME') }}: Env var required but not provided: 'DB_USERNAME'
    Defined profiles:
     - default
    
    For more information on configuring profiles, please consult the dbt docs:
    
    <https://docs.getdbt.com/docs/configure-your-profile>
    
    Encountered an error:
    Runtime Error
      Could not run dbt
What additional steps need to be taken for prefect to forward all the lines of output rather than just the final one? It's not clear to me where or how I would need to adjust the default prefect logger to set this up
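A sketch of constructor kwargs that may surface the full output, assuming a Prefect version where ShellTask (which DbtShellTask subclasses) supports them:
from prefect.tasks.dbt import DbtShellTask

dbt_task = DbtShellTask(
    return_all=True,     # return every line of output, not just the last
    log_stderr=True,     # forward stderr to the Prefect logger on failure
    stream_output=True,  # log each line as it is produced (if supported)
)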
    j
    • 2
    • 2
  • n

    Nikul

    01/27/2021, 5:51 PM
    Hi all, I'm running a flow on Kubernetes using DaskExecutor with 4 workers, but it doesn't create 4 worker pods. Ideally, I'd like to be able to scale to >1 node (not using an existing Dask cluster). (I tried using dask_kubernetes.KubeCluster, but I get an error: AttributeError("'NoneType' object has no attribute 'metadata'"). I'm not sure if this would solve my problem anyway.) Is there a way I can run a flow such that I can scale up to multiple nodes? Thank you
    j
    • 2
    • 1
  • d

    Daniel Black

    01/27/2021, 6:42 PM
    Hello! I am trying to test a few flows in our prod environment by running a local agent in a docker container that simulates the environment. For some reason the agent is not picking up the registered flows even though they share labels. Does anyone know why this would be happening? For reference, it runs successfully if I run a local agent in my dev environment
    j
    • 2
    • 10
  • m

    Marco Petrazzoli

    01/27/2021, 7:51 PM
Hello, is this the right place to ask about prefect errors?
    m
    • 2
    • 1
  • m

    Matthew Blau

    01/27/2021, 9:01 PM
    Hello all, I have made some progress with using docker storage in my flow, however, I am now getting this error:
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    The full traceback is
    Step 12/22 : WORKDIR /app
     ---> Running in b820b907ef61
    Removing intermediate container b820b907ef61
     ---> fee83981b109
    Step 13/22 : RUN groupadd --gid ${GID} ${USERNAME}     && useradd --uid ${UID} --gid ${GID} --shell /bin/bash -M ${USERNAME}     && chown -R ${USERNAME} /app     && chgrp -R ${USERNAME} /app
     ---> Running in 7f6c8b47c9e3
    groupadd: invalid group ID 'rainbow'
    
    Removing intermediate container 7f6c8b47c9e3
    The command '/bin/sh -c groupadd --gid ${GID} ${USERNAME}     && useradd --uid ${UID} --gid ${GID} --shell /bin/bash -M ${USERNAME}     && chown -R ${USERNAME} /app     && chgrp -R ${USERNAME} /app' returned a non-zero code: 3
    Traceback (most recent call last):
      File "integration.py", line 11, in <module>
        from change_password import *
      File "/home/mblau/projects/experian-integration/change_password.py", line 5, in <module>
        from integration import modify_config
      File "/home/mblau/projects/experian-integration/integration.py", line 338, in <module>
        flow.register(project_name="test")
      File "/home/mblau/.local/lib/python3.8/site-packages/prefect/core/flow.py", line 1665, in register
        registered_flow = client.register(
      File "/home/mblau/.local/lib/python3.8/site-packages/prefect/client/client.py", line 782, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/home/mblau/.local/lib/python3.8/site-packages/prefect/core/flow.py", line 1451, in serialize
        storage = self.storage.build()  # type: Optional[Storage]
      File "/home/mblau/.local/lib/python3.8/site-packages/prefect/storage/docker.py", line 360, in build
        self._build_image(push=push)
      File "/home/mblau/.local/lib/python3.8/site-packages/prefect/storage/docker.py", line 427, in _build_image
        raise ValueError(
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    what am I doing wrong?
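Not knowing the exact build setup, a sketch of the likely fix: groupadd exited with code 3 because a name ('rainbow') landed in the --gid slot, so the GID build arg needs a numeric value (all values below are hypothetical).
from prefect.storage import Docker

# numeric ids for UID/GID; the name goes in USERNAME only
storage = Docker(
    dockerfile="Dockerfile",
    build_kwargs={
        "buildargs": {"USERNAME": "rainbow", "UID": "1000", "GID": "1000"}
    },
)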
    j
    • 2
    • 25
  • m

    matta

    01/27/2021, 10:03 PM
    Is there a particular "Dask-ic" way to split a big CSV and make sure the compressed chunks are a specific size?
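One sketch of the usual Dask-ic approach (paths hypothetical): repartition by size, then write compressed parts. Note partition_size targets in-memory size, so the compressed size will come out smaller and needs calibrating against a sample.
import dask.dataframe as dd

# split by approximate in-memory partition size, then write
# one gzipped file per partition
df = dd.read_csv("big.csv")
df = df.repartition(partition_size="100MB")
df.to_csv("chunks/part-*.csv.gz", compression="gzip")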
    j
    • 2
    • 4
  • p

    Peyton Runyan

    01/27/2021, 10:36 PM
Hi all! I'm having a bit of an issue with an empty UI when running prefect server start. I'll post the logs in the thread.
    m
    s
    • 3
    • 29
  • m

    Marco Petrazzoli

    01/27/2021, 11:10 PM
I'm missing something. Is it possible to launch a flow of docker image tasks?
  • m

    Marco Petrazzoli

    01/27/2021, 11:12 PM
    I'm using something like:
    def say_hello():
        container = CreateContainer(
            image_name="nonexistentname",
        )
        StartContainer(container_id=container, docker_server_url="arandomtext")
        WaitOnContainer(container_id=container)
and it's always successful. Even with correct data I don't see my image executed
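For comparison, a minimal sketch of those docker tasks wired into a Flow context (flow name hypothetical); instantiating the tasks is not enough, they have to be called inside a Flow so the calls become part of the DAG:
from prefect import Flow
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    WaitOnContainer,
)

create = CreateContainer(image_name="nonexistentname")
start = StartContainer()
wait = WaitOnContainer()

with Flow("docker-containers") as flow:
    container_id = create()
    started = start(container_id=container_id)
    # wait only after the container has actually been started
    status = wait(container_id=container_id, upstream_tasks=[started])

flow.run()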
    m
    m
    • 3
    • 4
  • a

    Arnulfo Soriano

    01/27/2021, 11:26 PM
Hi, we would like to register flows from GitHub Actions when a new flow gets merged into master. To accomplish this, we have our graphql endpoint protected through SSL using a certificate, and we define it as an environment variable PREFECT__CLOUD__GRAPHQL=https://protected.grpahql.endpoint.com so that prefect has access to it from the GitHub Actions runner linux image. So this endpoint is only accessible through https using CA certificates. I've tested that this endpoint works in Chrome with the given certificate and it works fine. However, when we try to call it through GitHub Actions runners I keep getting: prefect.TaskRunner | FAIL signal raised: FAIL('Failed to register flow_test, error: 400 Client Error: Bad Request for url: https://protected.grpahql.endpoint.com/\nThis is likely caused by a poorly formatted GraphQL query or mutation but the response could not be parsed for more details') Here are some of the things I've tried: 1) configured the certificate by adding it to the GitHub Actions runner image at /usr/local/share/ca-certificates/graphCert.crt and then running 'dpkg-reconfigure ca-certificates' 2) declared a python env variable to tell it where to look for the CA certs: REQUESTS_CA_BUNDLE: /etc/ssl/certs/ca-certificates.crt Do any of you have a similar setup where your graphql endpoint is protected in this way? When it's not protected, our setup works just fine... Thank you in advance...
    m
    • 2
    • 1
  • f

    Fina Silva-Santisteban

    01/27/2021, 11:55 PM
Hi Prefect team! Can someone pls confirm or clarify the following understanding I have of how Prefect works with Docker:
• I can run an app inside a docker container; any dependencies needed, e.g. the pandas library, are installed inside that container. Our repo already has a Dockerfile and a docker-compose.yml which build an image and run the app inside a container successfully. The necessary commands are docker-compose build app, then docker-compose run app.
• Prefect Agent runs flows. The Local Agent runs flows with the local setup. This is not feasible for production since we'd need to manually make sure that whichever machine is hosting the flow has all dependencies installed. Better: run flows within a docker container. This way we don't have to worry about dependencies, since every machine will simply be running the container. To run flows in containers we need to use the Docker Agent.
• The Docker Agent DOES NOT REQUIRE DOCKER STORAGE, but can work with Docker Storage if desired. The Docker Agent can run with a locally available image (image="example/image-name:with-tag"). If Docker Storage is used, you can provide a url (registry_url) to where your image is hosted, e.g. Docker Hub, and that url is used to get the image.
• I should be able to run the Docker Agent with the image I had previously created with docker-compose. (It doesn't seem to be able to find the image, even though I specify the absolute path.)
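On the last bullet, a minimal sketch (image name taken from the bullet above): the Docker Agent resolves images by name:tag from the local docker daemon, not by a filesystem path, so the flow points at the tag rather than a location on disk.
from prefect.run_configs import DockerRun

# the agent looks this up as name:tag in the docker daemon,
# not as an absolute path
flow.run_config = DockerRun(image="example/image-name:with-tag")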
    m
    • 2
    • 18
  • m

    Martin Shin

    01/28/2021, 2:20 AM
    Hi, we’re using a paid plan on Prefect Cloud but we are now seeing this error when trying to register new flows: 
    This tenant already has the maximum number of flows
. I don't see any mention of an upper limit on flows anywhere? Any help will be appreciated!
    j
    • 2
    • 1
  • m

    Marco Petrazzoli

    01/28/2021, 10:03 AM
Hi, I would like to launch a flow where each task is a docker image (with a CMD already set, just needing some volume mounts). For me it's important that each task shows up in the UI with its function name and not the docker task name. (https://docs.prefect.io/api/latest/tasks/docker.html)
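One possible angle, sketched with a hypothetical name: every Prefect Task accepts a name kwarg, and that name is what the UI displays instead of the class name.
from prefect.tasks.docker import StartContainer

# the UI shows "start_my_service" rather than "StartContainer"
start = StartContainer(name="start_my_service")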
    e
    m
    • 3
    • 7
  • h

    Henrik Väisänen

    01/28/2021, 11:39 AM
Hey, I use Prefect + Dask distributed, and in the code the prefect flow is executed multiple times consecutively. The problem is that in the Dask profiler I see the memory usage keeps growing linearly, although each execution (prefect flow) can be considered a "clean run" and the amount of used memory should stay constant. Has anyone tackled a similar issue?
    m
    • 2
    • 12
  • n

    Nikul

    01/28/2021, 12:24 PM
    I'm trying to use KubeCluster with DaskExecutor in a simple flow on GCP:
    @task
    def get_mean():
        import dask.array as da
        array = da.ones((1000, 1000, 1000))
        return array.mean().compute()
    
    @task
    def output_value(value):
        print(value)
    
    with Flow("Static Dask Cluster Example") as flow:
        res = get_mean()
        output_value(res)
    
    flow.run_config = KubernetesRun(labels=["dev"], image="my-image")
    
    def get_cluster(n):
        pod_spec = make_pod_spec(image="my-image",
                                  memory_limit='1G', memory_request='1G',
                                  cpu_limit=1, cpu_request=1)
        return KubeCluster(pod_spec, n_workers=n)
    
    flow.executor = DaskExecutor(cluster_class=get_cluster, cluster_kwargs={'n': 2})
    flow.storage = GCS(bucket="my-bucket")
    flow.register(project_name="tutorial")
    However, I get a strange error:
    [2021-01-28 12:11:41+0000] ERROR - prefect.CloudFlowRunner | Unexpected error: AttributeError("can't set attribute")
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/opt/conda/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
        with self.check_for_cancellation(), executor.start():
      File "/opt/conda/lib/python3.8/contextlib.py", line 113, in __enter__
        return next(self.gen)
      File "/opt/conda/lib/python3.8/site-packages/prefect/executors/dask.py", line 211, in start
        with self.cluster_class(**self.cluster_kwargs) as cluster:  # type: ignore
      File "k8_flow.py", line 33, in get_cluster
      File "/opt/conda/lib/python3.8/site-packages/dask_kubernetes/core.py", line 414, in __init__
        super().__init__(**self.kwargs)
      File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 274, in __init__
        super().__init__(
      File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 69, in __init__
        self.name = str(uuid.uuid4())[:8]
    AttributeError: can't set attribute
    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/spec.py", line 651, in close_clusters
        if cluster.status != Status.closed:
    AttributeError: 'KubeCluster' object has no attribute 'status'
    Exception ignored in: <function Cluster.__del__ at 0x7f99f6b89e50>
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 113, in __del__
        if self.status != Status.closed:
    AttributeError: 'KubeCluster' object has no attribute 'status'
    This is running on a KubernetesAgent on GKE with the RBAC permissions mentioned in the documentation. The docker images I'm using are running dask-kubernetes==0.11.0, distributed==2020.12.0, prefect==0.14.5. Thank you in advance
    👍 1
    j
    • 2
    • 4
  • a

    ale

    01/28/2021, 2:07 PM
    Thanks for the swag Prefect Team! You rock! 🚀
    🙏 2
  • m

    Maurits de Ruiter

    01/28/2021, 2:09 PM
Hi everyone! Is there some way to show statistics for a task in the UI? Currently we have a task that replaces an entire table of our db with new data. Then we push a message to our Slack with the old and new row counts. If the difference is significant, the task is considered failed. Is there some way to display these counts in the Prefect UI?
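A possible sketch using the beta Artifacts API, assuming it is available in your Prefect version; it renders markdown on the task run page:
from prefect import task
from prefect.artifacts import create_markdown  # beta Artifacts API

@task
def report_counts(old_count, new_count):
    # shows up as a markdown artifact on the task run in the UI
    create_markdown(
        f"| old rows | new rows |\n|---|---|\n| {old_count} | {new_count} |"
    )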
    j
    m
    • 3
    • 4
  • m

    Matthew Blau

    01/28/2021, 2:42 PM
    Hello all, I have been successful in getting prefect to build and run a flow on a docker container. However, I am getting an error of
    Unexpected error: ValueError('Could not infer an active Flow context.')
    and I am struggling to find relevant information on how best to debug this. I thank you all in advance for helping!
    g
    • 2
    • 8
  • m

    Marc Lipoff

    01/28/2021, 2:54 PM
What is the best way to pass non-serializable objects into tasks? For example, I have a database connection or an sftp connection that I want to reuse. The use case is that I have many files I need to write to an sftp server and want to use the mapping functionality. But creating many connections is costly (and it seems this sftp server limits the number of connections).
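One pattern worth a look, sketched with hypothetical helper names: a resource_manager sets the connection up once, shares it across tasks, and cleans it up at the end.
from prefect import Flow, Parameter, resource_manager, task, unmapped

@resource_manager
class SFTPConnection:
    def __init__(self, host):
        self.host = host

    def setup(self):
        return open_sftp(self.host)  # hypothetical connection helper

    def cleanup(self, conn):
        conn.close()

@task
def upload_file(conn, path):
    ...  # write one file over the shared connection

with Flow("sftp-uploads") as flow:
    paths = Parameter("paths", default=["a.csv", "b.csv"])
    with SFTPConnection("sftp.example.com") as conn:
        upload_file.map(unmapped(conn), paths)
One caveat: the connection object still travels between tasks, so on a distributed executor it can be simpler to open the connection inside the task itself, e.g. one connection per mapped batch.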
    m
    • 2
    • 3
  • p

    Peter Roelants

    01/28/2021, 3:14 PM
Hi Prefect, what's the recommended way of dealing with Task failures? For example, I want to write to a db table or send a message to Kafka if a certain task has failed. From the documentation it seems that there are two ways of handling a task's failure:
• Implement a State Handler to deal with the failed state.
• Implement a Task to deal with the failed state and use a Trigger with any_failed.
I'm not sure which is the most idiomatic way of dealing with this failure, since updating a database or sending a message to Kafka is itself prone to failure (and "negative engineering" to deal with that failure). Does Prefect have an opinion/guidelines on how to deal with this?
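For reference, a sketch of the state-handler variant (the notification helper is hypothetical):
from prefect import task

def notify_on_failure(task_obj, old_state, new_state):
    # called on every state change; act only on failures
    if new_state.is_failed():
        send_failure_event(task_obj.name, new_state.message)  # hypothetical
    return new_state

@task(state_handlers=[notify_on_failure])
def my_task():
    ...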
    m
    • 2
    • 17
  • l

    Levi Leal

    01/28/2021, 3:17 PM
How do I run an agent on ECS? I've tried the docs at https://docs.prefect.io/orchestration/agents/ecs.html#agent-configuration but it doesn't work; I always get an error. The error stack is in the thread
    m
    • 2
    • 15
  • m

    Manuel Mourato

    01/28/2021, 5:04 PM
Hello all! Quick question: is it possible to pass a logger.xml file to the Prefect logger? I want to implement log rotation for each task in my pipeline. Thank you!
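Prefect's logger is a standard Python logger rather than an XML-configured one, so a sketch of log rotation without an XML file (path and sizes hypothetical):
from logging.handlers import RotatingFileHandler
from prefect.utilities.logging import get_logger

# rotate at ~10 MB, keeping 5 old files
handler = RotatingFileHandler(
    "/var/log/prefect/flow.log", maxBytes=10_000_000, backupCount=5
)
get_logger().addHandler(handler)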
    m
    • 2
    • 1
  • b

    BK Lau

    01/28/2021, 5:05 PM
Q: Is there a workflow model that one can export from Prefect? I intend to use it to auto-generate Python code.
    m
    • 2
    • 4
m

Michael Adkins

01/28/2021, 5:06 PM
Hi! The Flow object contains the full DAG of your tasks — is that what you’re looking for?
b

BK Lau

01/28/2021, 6:01 PM
hi @Michael Adkins What I'm looking for is some JSON representation of that DAG. Since flow.register() must have sent some of the DAG data to the Prefect server, minus the task code, for orchestration purposes, that might be the "model"?
m

Michael Adkins

01/28/2021, 6:22 PM
Yep you can just serialize the flow — that’s what’s sent to the server.
https://github.com/PrefectHQ/prefect/blob/46afb3a394da1d9a3ff4481b66666f487a312041/src/prefect/core/flow.py#L1410
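A quick sketch of pulling that representation out (trivial flow for illustration):
import json
from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("example") as flow:
    say_hello()

# serialize() returns a dict describing the DAG: tasks, edges,
# parameters, plus storage/run-config metadata
print(json.dumps(flow.serialize(), indent=2, default=str))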
👍 1