prefect-community
    Michelle Wu

    11/30/2020, 2:08 AM
    Hi, I want to run a flow on a remote dask-worker, using a `dask-resource` entry in `tags` as an argument for `@task`:
    running_server = "name_of_remote_server"
    resource_tag = "dask-resource:{}=1".format(running_server)
    @task(log_stdout=True, state_handlers=[email_on_failure], tags=[resource_tag])
    def test_task():
        print(1/0)
    When I started the remote dask-worker, I used a command like this:
    dask-worker tcp://<address of local dask-scheduler>:8786 --nprocs 4 --nthreads 1 --worker-port xxx --resources "name_of_remote_server=1"
    This connected the local scheduler with the remote worker perfectly. However, when I actually started the flow on the local machine, it first failed on the remote worker with `ModuleNotFoundError: No module named 'prefect'`. After I installed prefect on the remote worker, another error occurred:
    [2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
    Traceback (most recent call last):
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
        (self._dns_host, self.port), self.timeout, **extra_kw
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
        raise err
      File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
        sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    ...
    [2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    [2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
    I tried opening port 4200 of the local scheduler for the remote worker, but the same error occurred anyway. What have I been doing wrong? 😶
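    For what it's worth, the traceback shows the remote worker's Prefect client defaulting to localhost:4200. A minimal sketch (the server address is a placeholder, and it assumes the `PREFECT__CLOUD__API` setting, the same one an agent's `--api` flag populates, also governs the task runner's client on a server backend):
    import os

    # Must be set in the environment of the remote dask-worker process
    # *before* prefect is imported, so the CloudTaskRunner does not fall
    # back to localhost:4200. The address below is a placeholder for the
    # machine running Prefect Server.
    os.environ["PREFECT__CLOUD__API"] = "http://<address-of-prefect-server>:4200"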
    Klemen Strojan

    11/30/2020, 9:31 AM
    Hey all - not sure if this is the right channel, but what the heck 🙂 I would love to contribute to the Prefect Task library. Specifically, I want to add a task that fetches data from Dremio. I haven't noticed anything related to Dremio on GitHub or here on Slack, so I went and implemented a first draft of a solution on my fork of prefect. This is my very first try at contributing to open source and, as you will be able to tell from the code, my programming experience is limited. Nevertheless, I am enthusiastic to contribute 🤓 Of course I did my best to follow the docs (Contributing). And here is my question: what should my next step be to move this forward? Should I open a PR and get feedback there, or are there other steps to be taken beforehand?
    Eric

    11/30/2020, 9:53 AM
    Hi folks, I have a question around the hybrid model. As mentioned in the hybrid-model Medium blog post, once I registered my flow on Prefect Cloud (step 2), I could run an agent on my infra (step 3). That is, Prefect Cloud and the agent (using the docker agent) are two different machines, and I set the same env as mentioned above. The question is: on the agent machine, using the command `prefect agent docker start --api http://<Prefect Cloud IP>:4200` (backend: server), the agent will query the prefect core instance on my cloud server. But when I change the backend to cloud, using `prefect agent docker start -t <MY_TOKEN> --api http://<Prefect Cloud IP>:4200` shows the following message:
    prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://<Prefect Cloud IP>:4200
    
    This is likely caused by a poorly formatted GraphQL query or mutation. GraphQL sent:
    
    query {
      query { auth_info { api_token_scope } }
    }
    variables {
      null
    }
    Is there any step I am missing when setting up the Prefect agent? Thank you very much!
    Jonas Hanfland

    11/30/2020, 10:37 AM
    Hi guys, I am having an issue with one of my flows not terminating due to a single mapped task ("mapped child 1") being stuck in `pending` indefinitely. Not even the timeout provided to `@task` gets triggered. When checking the logs for the mapped task in question, I found what seems to be an internal server error (see thread). Rerunning it multiple times in the past resulted in the same issue (with the same exception), except this Saturday when it miraculously succeeded, but then it failed again the next day. Is that exception the cause of the problem? Is the issue on Prefect's side or on mine? Thanks in advance
    emre

    11/30/2020, 10:55 AM
    Hi everyone, I am migrating one of our core-only runs to Prefect Server, but couldn't get the tasks to run in parallel. The core-only run uses `flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4))` and can execute tasks in parallel. The server registration is as follows:
    flow.storage = Docker(....)
    flow.run_config = DockerRun()
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
    flow.register(project_name=... , build=False)
    The flow can run and communicate with Prefect Server, but the tasks are executed sequentially. Both the Gantt chart and the logs show this. Am I missing something? (prefect==0.13.13)
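    One thing that may be worth checking on the 0.13.x line: run configs were still new there, and the executor was commonly attached to the flow's environment, so `flow.executor` may not be consulted yet on 0.13.13. A sketch of the environment-based setup (hedged; not verified against that exact release):
    from prefect.environments import LocalEnvironment
    from prefect.engine.executors import LocalDaskExecutor

    # Attach the executor to the environment; on 0.13.x this is what the
    # flow runner consults, so flow.executor alone may be ignored.
    flow.environment = LocalEnvironment(
        executor=LocalDaskExecutor(scheduler="threads", num_workers=4)
    )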
    Ben

    11/30/2020, 2:16 PM
    I think I want to run different tasks with slightly different Kubernetes templates in one run, e.g. if machine resources could vary between tasks (GPU or not) or each task has a different entrypoint in the Kubernetes template. Is using dependent flows the way to go there? I assume each dependent flow could have its own `run_config` then. Is that use case normal, or am I thinking about it wrong? Just to double check that I'm not attempting anything totally unidiomatic here.
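    A sketch of that flow-of-flows shape (flow and project names are hypothetical; `FlowRunTask` is the 0.13-era task for triggering an already-registered flow): each child flow is registered with its own `run_config`, and the parent flow just sequences them.
    from prefect import Flow
    from prefect.tasks.prefect import FlowRunTask

    # Each child flow is registered separately with its own KubernetesRun
    # run_config, e.g. a GPU pod template for the training flow.
    preprocess = FlowRunTask(flow_name="cpu-preprocess", project_name="Development", wait=True)
    train = FlowRunTask(flow_name="gpu-train", project_name="Development", wait=True)

    with Flow("orchestrator") as parent_flow:
        train(upstream_tasks=[preprocess()])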
    matta

    11/30/2020, 5:22 PM
    Heya! Question from a co-worker: let's say I wanted to run multiple dbt commands through the dbt task, but will not know the commands until runtime (they will be received as a parameter). What is the recommended way of looping tasks? Would something like this be ok?
    matta

    11/30/2020, 5:22 PM
    dbt_task = DbtShellTask(
        ...
    )
    with Flow("dbt-flow") as flow:
        dbt_command_string = Parameter('dbt_command_string', default='dbt run', required=True)
        dbt_commands = dbt_command_string.split(',')
        for command in dbt_commands:
            dbt_task(
                command=command,
                ...
            )
    matta

    11/30/2020, 5:23 PM
    It also needs to be done sequentially. Is there an argument to `map` that forces it to run serially?
    matta

    11/30/2020, 5:23 PM
    In the dbt Slack he was told not to just loop it, and to use mapping instead.
    matta

    11/30/2020, 5:23 PM
    Or would this be the way? https://docs.prefect.io/core/advanced_tutorials/task-looping.html
    matta

    11/30/2020, 5:23 PM
    Thanks!
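    A minimal sketch of the mapping approach (same `DbtShellTask` configuration as above; names are illustrative): the split has to happen inside a task, because the Parameter's value only exists at run time, and mapped children run one at a time under the default `LocalExecutor`.
    from prefect import Flow, Parameter, task
    from prefect.tasks.dbt import DbtShellTask

    dbt_task = DbtShellTask(
        # ... same configuration as above
    )

    @task
    def split_commands(command_string):
        # Runs at flow-run time, when the Parameter's value is available.
        return [c.strip() for c in command_string.split(",")]

    with Flow("dbt-flow") as flow:
        dbt_command_string = Parameter("dbt_command_string", default="dbt run")
        dbt_task.map(command=split_commands(dbt_command_string))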
    John Grubb

    11/30/2020, 7:25 PM
    Hey team, I have two pipelines that I'm running using the Cloud for orchestration. More often than I'd like, I wake up the next morning to find that one of them has failed with `Unexpected error while running flow: KeyError('Task slug foo-3 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')` It's always the same task in both pipelines: I've split out a common task that loads data to BigQuery into another file, and this is the task that fails. It will run fine for a few days and then randomly decide that it can't find this task, even though the exact same task is called two other times during each pipeline (foo-1 and foo-2). I'm wondering if I'm the first person to have their install randomly lose tasks..
    josh

    11/30/2020, 8:45 PM
    Hey team, Prefect version 0.13.18 has been released and here are a few notable changes:
    ⁉️ Formatted GraphQL errors are now raised by the Client
    👟 Refactored a few Client API calls for performance
    📚 Various enhancements and fixes to tasks in the Task Library
    A big thank you to our contributors who helped out with this release! Full changelog: https://github.com/PrefectHQ/prefect/releases/tag/0.13.18
    Sam Luen-English

    11/30/2020, 9:40 PM
    Hi there, I am trying to deploy a flow and I am running into some issues. I have some complex dependencies and so want to use a custom base docker image. I've tried to create a simple example illustrating what I want to do:
    - In this example, I have a base docker image which has sqlalchemy installed (this is a placeholder for my own library).
    - I always develop inside docker containers and don't want to install this library locally. This is also useful so I can easily deploy in CI.
    I've tried two approaches:
    - Running the main.py script on my host (but this obviously fails due to an import error).
    - Using the base image to run main.py, mounting the docker socket. This fails with the message "ValueError: Your docker image failed to build! Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included", even though in the output I can see that all the health checks pass.
    A few other thoughts I had:
    - Perhaps I could just move the imports inside the task functions, but this doesn't seem nice to me.
    - Perhaps I should use a docker task instead, but this seems overkill, and I believe this is the exact use case of the base image.
    Dockerfile:
    FROM python:3.8
    RUN python -m pip install sqlalchemy prefect
    main.py
    from prefect import Flow, task
    from prefect.environments.storage import Docker
    import sqlalchemy
    
    
    @task
    def some_task():
        print(sqlalchemy.__version__)
    
    
    with Flow("Some Flow") as flow:
        some_task()
    
    if __name__ == "__main__":
        flow.storage = Docker(
            registry_url="<http://docker.io|docker.io>",
            image_name="storage_image",
            image_tag="latest",
            base_image="test-base",
            local_image=True,
        )
        flow.register(project_name="Development")
    Test script:
    #!/usr/bin/env bash
    set -x
    
    docker build . -t test-base
    
    #: Approach 1
    python main.py
    
    #: Approach 2
    docker run -v /var/run/docker.sock:/var/run/docker.sock -v ~/.docker:/root/.docker -e PREFECT__CLOUD__AUTH_TOKEN="$PREFECT__CLOUD__AUTH_TOKEN" -v $(pwd)/main.py:/main.py test-base bash -c 'python /main.py'
    Please could you guide me with the best approach? Many thanks!
    Pedro Machado

    12/01/2020, 5:09 AM
    Hi there. We have a common pattern across flows where we have a `start_date` parameter that is not required. When the parameter is passed, we validate it and use it in the flow. When it's not passed, we use the flow's `scheduled_start_time` to derive the start date. This complicates the flow a little bit: I normally have to create a downstream task that takes the `Parameter` result and applies this logic. I was thinking that it would be nice to be able to pass a callable as the `default` argument of the `Parameter` class to provide a value if the parameter is missing. Similarly, it would be nice to have a `validator` callable that could be used to validate the parameter value. Is there another way to accomplish this? Thoughts?
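    For reference, the downstream-task pattern described above looks roughly like this (a sketch; the date handling is illustrative only):
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def resolve_start_date(start_date):
        # Fall back to the run's scheduled start time when no value is passed.
        if start_date is None:
            return prefect.context["scheduled_start_time"].strftime("%Y-%m-%d")
        # Validation of an explicitly passed value would go here.
        return start_date

    with Flow("example") as flow:
        start_date = Parameter("start_date", default=None, required=False)
        resolved = resolve_start_date(start_date)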
    Eric

    12/01/2020, 6:55 AM
    Hi folks, is there any way to make the Prefect Core page read-only for certain users, or can that only be achieved via Prefect Cloud? I just want users to view the Prefect Core page as read-only users. Thanks in advance 🙏🏽
    Sam Luen-English

    12/01/2020, 1:03 PM
    I'm trying to use docker storage with a private Docker Hub repository. I can see that the build succeeds, but it fails when it gets to https://github.com/PrefectHQ/prefect/blob/master/src/prefect/environments/storage/docker.py#L425 The name it queries using the docker client is "docker.io/atheon/prefect-test::2020-12-01t12-57-38-789517-00-00", but at this point the image has yet to be pushed, so I don't know how this would ever succeed. Can someone point out where I'm going wrong? Many thanks!
    from prefect import Flow, task
    from prefect.environments.storage import Docker
    import sqlalchemy
    
    
    @task
    def some_task():
        print(sqlalchemy.__version__)
    
    
    with Flow("test") as flow:
        some_task()
    
    if __name__ == "__main__":
        flow.storage = Docker(
            registry_url="<http://docker.io/atheon|docker.io/atheon>",
            image_name="prefect-test",
            base_image="atheon/prefect-test:base",
        )
        flow.register(project_name="Development")
    karteekaddanki

    12/01/2020, 3:05 PM
    Hi all, when I updated to the latest version of prefect (0.13.17), one of my old scripts that builds a docker storage fails mysteriously with the following message:
    ---> Running in 26ae91f84100
    Beginning health checks...
    System Version check: OK
    Cloudpickle serialization check: OK
    Result check: OK
    Environment dependency check: OK
    All health checks passed.
    Removing intermediate container 26ae91f84100
     ---> a63da9840884
    Traceback (most recent call last):
      File "register.py", line 60, in <module>
        storage.build()
      File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 359, in build
        self._build_image(push=push)
      File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 423, in _build_image
        self._parse_generator_output(output)
      File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 661, in _parse_generator_output
        or parsed.get("errorDetail", {}).get("message")
    AttributeError: 'NoneType' object has no attribute 'strip'
    Any thoughts on what might be happening here? I can confirm that the docker image builds without any issues, and the logs show that all the health checks pass. I am using Python 3.6.
    Charles Lariviere

    12/01/2020, 3:41 PM
    Hey folks 👋 I'm completely new to Prefect but it sounds amazing. I'm wondering what the best practices are for structuring flows to be run with Prefect Cloud? I have a hello-world example that I tried with our Snowflake warehouse; when ending the script with `flow.run()` (instead of `flow.register()`) and running with a local agent, everything works great. Though, when I run the following script to register it in our Prefect Cloud account and execute it through the UI, it works (as in, the query gets executed) but I get the following error: `Unexpected error: TypeError("can't pickle _thread.lock objects")`. My question is: what are the best practices for structuring these scripts? Once you've built the flow, should you always end it with `flow.run()` and `flow.register()`? Do you manually register flows when making changes but not include the `register()` method inside the actual script?
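    One common pattern (a sketch, not an official convention): keep `flow.run()` behind a small guard for local debugging, and trigger registration explicitly, so the registered flow never depends on local-only state.
    import sys
    from prefect import Flow

    with Flow("snowflake-hello") as flow:
        ...

    if __name__ == "__main__":
        if "--register" in sys.argv:
            flow.register(project_name="Development")
        else:
            # Local debugging only; orchestrated runs go through an agent.
            flow.run()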
    Sean Talia

    12/01/2020, 5:26 PM
    hi all, I'm also totally brand new to Prefect and am just going through the Cloud Tutorial; I think some of the sample code in here might be outdated? I'm at the "Flow with Docker" and "Register Flow with the Prefect API" sections, and this error is being raised (the check for which appears to have been introduced ~5 months ago)
    Andrey Tatarinov

    12/01/2020, 6:06 PM
    Hi all, a question about checkpointing and caching: there is a simple flow with two tasks, A -> B. Caching is set up with `cache_for=timedelta(days=1)` and `result=LocalResult('./tmp')`. I expect that when I re-run the flow from the console, the result would be computed once and then reused, but I see that each time I run the flow, new files are added to the result directory. Is this expected behaviour? How do I cache data between flow runs?
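    A possibly relevant detail for core-only runs: `cache_for` state lives in memory (in `prefect.context.caches`), so it does not survive separate `flow.run()` invocations from the console. File-based caching via `target` is the usual way to reuse results across local runs; a sketch:
    from prefect import task
    from prefect.engine.results import LocalResult

    @task(
        result=LocalResult(dir="./tmp"),
        # If the target file already exists, the task is skipped and the
        # stored result is loaded instead of being recomputed.
        target="{task_name}-{today}",
    )
    def a():
        ...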
    Andrew Hannigan

    12/01/2020, 8:24 PM
    Do all tasks within a flow need to be "connected"? https://en.wikipedia.org/wiki/Connectivity_(graph_theory)
    Zach

    12/01/2020, 9:38 PM
    Registering my flows takes a (relatively) really long time; is there anything I can do to speed it up?
    Pedro Machado

    12/02/2020, 1:29 AM
    Hi. Bumping this message in case it got missed last night. Any feedback?
    Alex Koay

    12/02/2020, 1:29 PM
    hi there, I'm having trouble building Docker images where I have multiple files with the same filename but in different paths. Does anybody know if this is a bug?
    Rodrigo Ceballos

    12/02/2020, 2:49 PM
    Hello prefect! I'm an ML engineer working on computer vision for cancer diagnosis. At our company we have an HPC cluster and use Slurm as our scheduler. We are trying to integrate prefect into our pipeline and are currently working on writing a Slurm task executor, hoping this is all we'll need to build to use prefect for our use case. If anyone has any resources or suggestions for us, they would be much appreciated, thanks!
    Iason Andriopoulos

    12/02/2020, 2:52 PM
    Hello everyone, I am a machine learning engineer; congrats on your amazing engine. I am in the process of migrating some workflows to prefect and I am facing the following problem: suppose I have 2 flows already registered and I want to create a flow that combines them according to https://docs.prefect.io/core/idioms/flow-to-flow.html, but I want to provide the output of a specific task of one flow as a Parameter of the next flow (see snippet). Is it possible to do so, and how?
    Untitled.txt
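    One possible shape (a sketch with hypothetical names; note it passes a value computed in the parent flow as a parameter, rather than extracting a task result out of an already-finished run of the first flow):
    from prefect import Flow, task
    from prefect.tasks.prefect import FlowRunTask

    @task
    def compute_value():
        # Stand-in for the task whose output should feed the second flow.
        return "2020-12-01"

    run_second = FlowRunTask(flow_name="second-flow", project_name="Development", wait=True)

    with Flow("combined") as flow:
        value = compute_value()
        run_second(parameters={"start_date": value})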
    Andrey Tatarinov

    12/02/2020, 3:49 PM
    What are the best practices for mixing local development and remote execution? I have a flow which operates on relatively large chunks of data: 100+ MB XML files. I would like to iterate on the solution locally with local caching enabled via LocalResult, and then submit the flow to a K8s environment which would use GCSResult to persist results in the cloud. What would be the best way to organize my code to achieve that behaviour?
Andrew Hannigan

12/02/2020, 5:14 PM
The agent is what determines which execution environment you run on, so you should be able to change execution environments without any change to your code. Just send the job to a Kubernetes agent instead of the default local agent: https://docs.prefect.io/orchestration/tutorial/k8s.html#running-a-kubernetes-agent
Andrey Tatarinov

12/02/2020, 6:07 PM
@Andrew Hannigan does that mean I can define my task as `@task(result=LocalResult(...)) def f(): ...` and later somehow redefine the way the results of this specific task are handled?
Andrew Hannigan

12/02/2020, 6:51 PM
I might suggest using cloud-native storage even when running a local agent for this reason
https://docs.prefect.io/core/concepts/results.html#templating-result-locations
Andrey Tatarinov

12/02/2020, 7:44 PM
I see your point. In my case, I would prefer to keep development and production separate. We ended up with a function `make_flow(result)` which accepts a Result constructor.
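A sketch of that factory shape (flow and bucket names are hypothetical):
from prefect import Flow
from prefect.engine.results import GCSResult, LocalResult

def make_flow(result):
    # The flow-level result is inherited by tasks that don't set their own.
    with Flow("pipeline", result=result) as flow:
        ...
    return flow

# Local iteration:
flow = make_flow(LocalResult(dir="./tmp"))
# Production registration:
flow = make_flow(GCSResult(bucket="my-bucket"))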
Andrew Hannigan

12/02/2020, 8:03 PM
Makes sense. The result location can be formatted dynamically based on the Prefect context. So if you wanted, you could also pass an EXECUTION_ENV environment variable into the Prefect context, which you could use to format your result location dynamically. But what you've described sounds like it will also work fine.
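A sketch of that templating idea (bucket name and variable are hypothetical): set `PREFECT__CONTEXT__EXECUTION_ENV` in the run environment, and reference it in the location template.
from prefect.engine.results import GCSResult

result = GCSResult(
    bucket="my-bucket",
    # "execution_env" is read from the Prefect context, e.g. populated
    # via the PREFECT__CONTEXT__EXECUTION_ENV environment variable.
    location="{execution_env}/{flow_name}/{task_name}/{today}.pkl",
)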