prefect-server
  • s

    Saoirse Amarteifio

    06/20/2020, 9:07 PM
    hi there. I've just started playing around with prefect and having fun. I have set up various working examples, e.g. connecting to static and dynamic dask execution environments. I am a little confused about how I would combine dask/prefect-server + ui on kubernetes and submit flows from a client (e.g. connecting to a dask scheduler running on kubernetes and being able to monitor them in a UI). The UI could be running anywhere I guess, I'm just not sure how this fits together. prefect-server needs docker compose etc. so I guess we don't run this on kubernetes? anyway, a little lost on this monitoring part and not even sure what the right question is. would love to hear folks' thoughts on this.
    c
    2 replies · 2 participants
  • v

    Vikram Iyer

    06/29/2020, 5:09 PM
    Hey Guys, I have recently started evaluating prefect for my ML work at my current company. I have a few questions around "how to" use prefect for my local deployment. Setup:- I have a backend service that runs inside a docker container on a host machine. Prefect server (prefect server start) and agent (prefect agent start) run on the same host. Question:- How do I register flows from within my container to the server on the host and make them run from the UI? So, when the service is running on the host I can simply do the below steps
    1. flow.register() # this will register the flow and hence I can see it on the UI
    2. From the UI, I can go to the particular flow and run it. # easy
    How do I do something similar, with the only exception of the service and code residing inside a docker container? End Notes:- I might be looking at it from a completely wrong perspective, please correct my understanding if necessary. Cheers! Vikram
    k
    2 replies · 2 participants
  • r

    Rob Harrop

    06/30/2020, 11:11 AM
    Hello all. I'm trying to connect the CLI to our Prefect Cloud. I have version 0.12.1 for the CLI and I'm getting this odd error:
    Traceback (most recent call last):
      File "flow.py", line 38, in <module>
        main()
      File "flow.py", line 36, in main
        flow.register(**register_params)
      File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/core/flow.py", line 1479, in register
        registered_flow = client.register(
      File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/client/client.py", line 687, in register
        res = self.graphql(
      File "/Users/rdh/Library/Caches/pypoetry/virtualenvs/pipelines-3RvCBXzu-py3.8/lib/python3.8/site-packages/prefect/client/client.py", line 226, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'flow_group_insert_input\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    z
    n
    4 replies · 3 participants
  • b

    bruno.corucho

    07/01/2020, 2:30 PM
    Hello everyone! TL;DR (and probably newbish question) I'm having some issues using Docker Storage. I did a mockup flow, and it runs fine locally, but after setting flow.storage = Docker() and trying to register it, it cannot find the modules I'm trying to import in my script. I presumed it was something regarding the current sys.path not being set to the project's root directory (as opposed to running it locally)? What I'm doing:
    1. prefect backend server #switch to local server
    2. prefect server start
    3. python register_flow.py (imports * + flow.storage = Docker(...) + flow.register(name)) #in the future would like to set up registry_url = my_aws_ECR
    4. prefect agent start docker #Start a docker agent
    Currently following this documentation.
    j
    1 reply · 2 participants
  • m

    Mac Gréco Péralte Chéry

    07/03/2020, 8:00 PM
    Hello everyone, I have deployed Prefect Core on an AWS EC2 instance and I have a problem running the flow on it.
    Unexpected error while running flow: AttributeError("'Flow' object has no attribute 'slugs'",)
    Traceback (most recent call last):
      File "/home/ubuntu/.local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 240, in run
        parameters=parameters,
      File "/home/ubuntu/.local/lib/python3.6/site-packages/prefect/engine/cloud/flow_runner.py", line 277, in initialize_run
        tasks = {slug: t for t, slug in self.flow.slugs.items()}
    AttributeError: 'Flow' object has no attribute 'slugs'
    c
    5 replies · 2 participants
  • m

    Mac Gréco Péralte Chéry

    07/04/2020, 1:38 AM
    Hello everyone! I have a flow that I want to schedule with cron strings, with different parameters for each cron. The problem is, I am building a UI for non-programmer users to be able to schedule some reports, so the cron string is not predefined. I was thinking about creating a non-scheduled masterflow that will have as a parameter a string containing the cron schedule, and within that flow I would create and register another scheduled childflow with the correct schedule. I would run the masterflow with a graphql query each time a user creates a new report schedule. What do you think? Is that possible?
    j
    9 replies · 2 participants
  • j

    Julian

    07/07/2020, 7:45 AM
    Hey guys, we are currently evaluating prefect as a possibility to manage some of our workflows and it looks very promising. 🙂 We use prometheus to monitor all our services, so I want to monitor our flows as well. These run asynchronously, scheduled on local agents with dask executors, and are stored to s3 (as well as the flow results). Prometheus has a python_client https://github.com/prometheus/client_python, but I can't manage to integrate these metrics into our flows. I tried to include them in a task (say prometheus_monitor_task) as well as in a callback when a flow changes its state, but then I get the error that the flow can't be pickled. How would you monitor your flows in addition to custom loggers? Is there any support/will there be support in the near future?
    z
    d
    8 replies · 3 participants
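A note on the pickling error described above. One common cause (an assumption here, the thread does not confirm it) is that the flow or task carries an object holding a thread lock, which `pickle` refuses to serialize. The sketch below uses only the standard library; the payload dictionaries and `run_with_metric` are hypothetical illustrations, not the Prometheus client or Prefect API. It contrasts a payload that carries a live lock with one that carries plain data and creates the lock only where the work runs.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


# Eager: the payload carries a live lock (assumed to resemble a metrics object).
eager_payload = {"name": "flow_runs", "lock": threading.Lock()}

# Lazy: the payload carries only plain data; the lock is created at run time.
lazy_payload = {"name": "flow_runs"}


def run_with_metric(payload):
    """Create the lock-holding state on the worker side and use it there."""
    lock = payload.get("lock") or threading.Lock()
    with lock:
        return f"{payload['name']}: incremented"


print(can_pickle(eager_payload), can_pickle(lazy_payload))
```

Under that assumption, building the metrics object inside the task body, rather than at flow-definition time, keeps the lock out of whatever gets serialized.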
  • a

    Austin Sharp

    07/07/2020, 9:31 PM
    Hi everyone. I'm evaluating Prefect for my team today and running into an issue. I am able to get prefect server to run and can access the UI just fine. However, I am unable to get a flow to register on the UI. When I run flow.register() in my python script, I get:
    Result Handler check: OK
    Flow: http://localhost:8080/flow/724c4570-3a00-4362-9df5-27ea72975538
    However, the flow doesn't appear on the UI I already have up. If I follow the link, it takes me to a UI but I don't see the flow. I think the issue is that I am using a linux machine that I am SSH'd into. Does anyone have any tips? I have tried changing some of my user config settings in .prefect/backend.toml. I suspect I need to change some of the 'localhost' to '0.0.0.0' but I'm not sure which ones and when I try doing them all the server throws errors when I start the server. Any ideas?
    a
    j
    +1
    31 replies · 4 participants
  • b

    bruno.corucho

    07/08/2020, 9:57 AM
    Hello guys, I'm having issues deploying my flow because it seems like it's trying to append the project_name to the registry_url, when I want the registry_url to be appended with the strdata.name . How can I change this behaviour (it works as expected running it locally) Here's my code:
    strdata.storage = Docker(registry_url=os.environ["REGISTRY_URL"], #I want the flow name appended in here
                             dockerfile="deploy/Dockerfile",
                             image_tag=os.environ["CI_COMMIT_TAG"],
                             image_name=strdata.name)
    # registers the flow to the server/prod
    strdata.register(project_name=os.environ["PROJECT_NAME"]) #but the log tells me there is no repository name = project_name in our registry_url
    j
    3 replies · 2 participants
  • r

    Robin

    07/12/2020, 7:05 PM
    Hi everybody, we are new to prefect and just started experimenting a little bit. Do we understand it correctly, that both plans,
    Developer
    and
    Team
    only allow 1 concurrent flow to run? Is that also the case when self-managing prefect (local API)? This would be a dealbreaker for us, I guess. Cheers Robin
    j
    3 replies · 2 participants
  • r

    Robin

    07/12/2020, 10:56 PM
    We just got our first
    DaskKubernetesEnvironment
    up and running, but it does not look very parallelized 🤔 Here is the code:
    import prefect
    from prefect import task, Flow
    from prefect.environments.storage import Docker
    from prefect.environments import DaskKubernetesEnvironment, LocalEnvironment
    import time
    import requests
    # from prefect.tasks import kubernetes
    from prefect.engine.executors import DaskExecutor
    @task
    def hello_task():
        time.sleep(10)
        logger = prefect.context.get("logger")
        logger.info("Hello, Kubernetes!")
    @task
    def get_value():
        time.sleep(10)
        return 5
    @task
    def output_value(value):
        print(value)
    @task
    def fetch_google():
        r = requests.get("http://www.google.com")
        print(r.content)
    with Flow("hello-k8s", environment=DaskKubernetesEnvironment(min_workers=1, max_workers=5, labels=["yogasev"])) as flow:
        value = get_value()
        output_value(value)
        fetch_google()
        hello_task()
        hello_task()
        hello_task()
        hello_task()
        hello_task()
    flow.storage = Docker(registry_url="<my-repo>")
    flow.register(project_name="eks_test_01")
    Did we miss anything? On kubectl it looked parallelized (5 dask pods created in parallel), but prefects visualization suggests otherwise… 🤷‍♂️
    k
    m
    +3
    9 replies · 6 participants
  • s

    Severin Ryberg [sevberg]

    07/13/2020, 1:10 AM
    Hello. Here is a question regarding the Self-hosted server. I am able to register & execute flows from a UI (and others, ofc) server hosted on AWS. But as far as I can tell, so can everyone else on the planet. What is the intended way to protect your self-hosted server against misuse? Is there something akin to the cloud API tokens for the OpenSource UI?
    g
    5 replies · 2 participants
  • b

    Bivu Raj

    07/14/2020, 6:10 PM
    First of all, hello to everyone. Secondly, how do i start prefect server on a remote server like 192.168.4.67:8080 rather than localhost? Also, how do i register a flow to that particular server rather than locally? thanks
    z
    8 replies · 2 participants
  • e

    EdCode

    07/16/2020, 5:06 PM
    to be clear, the server and UI needs docker on the machine to run correct?
    👀 1
    l
    3 replies · 2 participants
  • b

    Bivu Raj

    07/16/2020, 6:21 PM
    It might be a strange question, but what is the relationship between docker storage and a dask cluster? What I mean is, even if I have all the dependencies satisfied via docker storage, how will the workers in dask resolve that? Does cloudpickle handle that?
    👀 1
    l
    r
    2 replies · 3 participants
  • m

    Mac Gréco Péralte Chéry

    07/16/2020, 8:09 PM
    Hello Everyone, how do I access a task return value inside a flow state handler? I am using Prefect Core 0.12.1 installed on a single EC2 instance
    from prefect import Flow
    from prefect.engine.state import State

    def notify_on_success(flow: Flow, old_state: State, new_state: State) -> State:
        if new_state.is_successful():
            # Read the SMS text back from the location stored on the task's result
            getSmsTextTask = flow.get_tasks(name="Retrieve SMS Text")[0]
            smsMessageLocation = new_state.result[getSmsTextTask]._result.location
            smsMessages3Result = flow.result.read(smsMessageLocation)
            smsMessage = smsMessages3Result.value
            # scheduleSmsId is a Parameter, so its value is available directly
            scheduledSmsIdTask = flow.get_tasks(name="scheduleSmsId")[0]
            scheduleSmsId = new_state.result[scheduledSmsIdTask]._result.value
            link = "https://myurl/" + scheduleSmsId
            mailText = build_sms_success_email_notification_text(sms_text=smsMessage, call_to_action_link=link)
            send_email(["myemail"], message=mailText, subject="SMS SENDING COMPLETE")
        # Return the state explicitly to match the declared return type
        return new_state
    (I have to mention that scheduleSmsId is a Parameter) This state handler works fine on my local machine, but once I test it on my EC2 instance I get this error:
    Exception raised while calling state handlers: KeyError(<Task: Retrieve SMS Text>,)
    c
    5 replies · 2 participants
  • g

    Greg Desmarais

    07/17/2020, 1:56 PM
    Just noticed this: https://aws.amazon.com/about-aws/whats-new/2020/07/docker-and-aws-collaborate-to-help-deploy-applications-to-amazon-ecs-on-aws-fargate/ and was wondering about giving a shot to deploying the core compose yaml that gets invoked through
    prefect server start
    . Any thoughts on that, @prefect-people?
    Now, customers can use Docker Desktop and Docker Compose to deploy containers on Amazon Elastic Container Service (ECS) using the AWS Fargate launch type. After customers build and test containers locally using Docker Desktop and Docker Compose, they can now deploy them to Fargate on ECS through the same CLI.
    n
    1 reply · 2 participants
  • g

    Greg Desmarais

    07/17/2020, 9:50 PM
    I'm feeling a bit lost and embarrassed to be lost, but have a question about what seems like a simple deployment. Here is what I'm trying to do:
    j
    r
    19 replies · 3 participants
  • g

    Greg Desmarais

    07/18/2020, 3:24 AM
    How do you stop a prefect server? Looking at the
    prefect.server.cli.server
    , there is a click option for start, but none for stop. Do I have to
    docker kill
    all the processes?
    n
    4 replies · 2 participants
  • g

    Greg Desmarais

    07/18/2020, 3:44 PM
    How do I stop all these log messages on prefect server:
    hasura_1     | {"type":"http-log","timestamp":"2020-07-18T15:43:04.599+0000","level":"info","detail":{"operation":{"query_execution_time":3.814376e-3,"user_vars":{"x-hasura-role":"admin"},"request_id":"c6784bfe-20f8-4863-99da-1fd1a69c9eb2","response_size":98,"request_read_time":4.539e-6},"http_info":{"status":200,"http_version":"HTTP/1.1","url":"/v1alpha1/graphql","ip":"172.29.0.6","method":"POST","content_encoding":"gzip"}}}
    I have my log level at the default (info) in my config.toml, and these seem to be debug level. I could raise the log level higher, but I want to see info level. These messages appear over and over and pollute my logs...
    j
    2 replies · 2 participants
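One workaround for the noise described above, sketched under stated assumptions: rather than changing any server setting (none is named in this message), the combined output can be post-filtered. The helper names `is_hasura_http_log` and `drop_hasura_http_logs` are hypothetical, and the line format is assumed to match the sample above: a service prefix, a `|`, then a JSON record whose `type` is `http-log`.

```python
import json


def is_hasura_http_log(line):
    """Return True for lines like 'hasura_1 | {"type":"http-log", ...}'."""
    service, sep, payload = line.partition("|")
    if not sep or not service.strip().startswith("hasura"):
        return False
    try:
        record = json.loads(payload.strip())
    except json.JSONDecodeError:
        return False
    return isinstance(record, dict) and record.get("type") == "http-log"


def drop_hasura_http_logs(lines):
    """Yield every line except Hasura HTTP access-log records."""
    for line in lines:
        if not is_hasura_http_log(line):
            yield line
```

Applied to `sys.stdin`, this would let the remaining info-level lines through unchanged while dropping only the Hasura access records.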
  • g

    Greg Desmarais

    07/20/2020, 5:59 AM
    I am trying to register a flow in server with a FargateTaskEnvironment and DaskExecutor, and I can successfully register the flow with my server instance. I can also, through the UI, run the flow with a parameter. The agent attached to the prefect server picks up the run and flow, and starts to process it. However it appears that the task definition arguments aren't being passed through the flow serialization process during the registration. Code follows and agent output. What am I doing wrong?
    m
    m
    10 replies · 3 participants
  • g

    Greg Desmarais

    07/20/2020, 6:19 AM
    Once I register a flow with my server from a client program, how can I invoke the flow in a synchronous way? I want to pass in a parameter and get the results from the flow.
  • c

    CJ

    07/20/2020, 2:08 PM
    Hey, I've got a question for y'all. I recently upgraded from prefect 0.8.x to 0.12.4, and noticed a behavior change related to the docker agent. Previously, on my local machine, I would start prefect server and then start the docker agent with
    prefect agent start --volume /path/to/local/.config.toml:/root/.prefect/config.toml docker
    . Then, when I registered and ran my flow, it would have access to variables I had set in my local
    .config.toml
    , especially secrets. After upgrading, I noticed this was no longer the case. I would start the agent in the same way, but
    context.secrets
    would be empty, and the values returned from
    prefect config
    look like the factory defaults (eg
    use_local_secrets=false
    ).
  • c

    CJ

    07/20/2020, 2:09 PM
    I'm on OSX Catalina, docker 19.03.8, python 3.7.6
  • c

    CJ

    07/20/2020, 2:10 PM
    And prefect 0.12.4, as mentioned.
  • c

    CJ

    07/20/2020, 2:33 PM
    After some investigation, it looks like I solved my own problem. When registering my flow, this was getting set `ENV PREFECT__USER_CONFIG_PATH=/opt/prefect/config.toml`; after updating the
    --volume
    argument prefect finds my config file.
    :upvote: 1
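The fix described above amounts to mounting the local config file at the path the container actually reads. A minimal sketch, assuming the container path is the `/opt/prefect/config.toml` value quoted in the message and reusing the command shape from the original post; `agent_start_command` is a hypothetical helper, not part of the Prefect CLI.

```python
import shlex

# Path reported above as the value of PREFECT__USER_CONFIG_PATH in the flow image.
CONTAINER_CONFIG_PATH = "/opt/prefect/config.toml"


def agent_start_command(local_config, container_config=CONTAINER_CONFIG_PATH):
    """Build the agent command, mounting local_config where the container looks."""
    volume = f"{local_config}:{container_config}"
    return ["prefect", "agent", "start", "--volume", volume, "docker"]


command = agent_start_command("/path/to/local/.config.toml")
print(shlex.join(command))
```

The only change from the earlier invocation is the right-hand side of the `--volume` mapping.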
  • g

    Greg Desmarais

    07/20/2020, 3:40 PM
    Hey all - I've been struggling a bit, and have some decently long threads. I'll try a question that is hopefully to the point. I have a local bit of code that creates a flow for a fargate environment. I can submit that flow to the prefect server (on an EC2), and I have an agent that picks up that flow (after a manual run in the UI). The agent ends up hanging after
    Completed flow run submission
    . No Fargate cluster is ever created. What needs to happen on the agent (or prefect server?) to see that the next steps are taken (e.g. cluster created, work started)?
    k
    c
    30 replies · 3 participants
  • g

    Greg Desmarais

    07/21/2020, 2:08 PM
    It looks like the docker image
    prefecthq/prefect:all_extras
    is running python 3.8 - this seems to require all clients to move to 3.8, as cloudpickle might be sensitive to the version difference. Am I right on that? Is the position of the prefect team that everything needs to be on py 3.8? I might not have rtfm enough...
    j
    56 replies · 2 participants
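On the version concern above: cloudpickle is only intended for moving objects between processes running the same Python version, so a mismatch between the client and the image is worth catching early. A minimal sketch of such a guard, using only the standard library; `same_minor_version` and `check_runtime` are hypothetical helper names, and where the expected version comes from is left to the caller.

```python
import sys


def same_minor_version(a, b):
    """Compare two version tuples on major and minor components only."""
    return tuple(a[:2]) == tuple(b[:2])


def check_runtime(expected):
    """Raise if the running interpreter's major.minor differs from expected."""
    current = tuple(sys.version_info[:2])
    if not same_minor_version(current, expected):
        raise RuntimeError(
            f"Python {current[0]}.{current[1]} is running, "
            f"but the flow was built with {expected[0]}.{expected[1]}"
        )
    return current
```

Calling `check_runtime((3, 8))` at the top of a flow module would fail fast on a 3.7 worker instead of surfacing later as an unpickling error.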
  • b

    Burkhard Hoeckendorf

    07/21/2020, 8:13 PM
    Hi everyone, After installation (step by step below), the dashboard shows up, but under "API Status" notes "Network error: Failed to fetch" and asks whether the graphql_url field in the config file was set. After going through the docs, I'm not sure under which TOML section this field should be, let alone the correct value. Furthermore, not sure this is really the underlying issue, because there are some "connection refused" errors about Postgres in the server logs upon startup. First time user, grateful for any leads.
    z
    n
    57 replies · 3 participants
  • g

    Greg Desmarais

    07/23/2020, 3:31 PM
    Has anyone seen an issue with Dask workers having errors around serialization? I'm submitting a flow from a client (simple hello world) -> prefect server on an ec2 instance -> prefect agent -> Dask scheduler -> Dask workers. Everything is flowing all the way to the worker, but then I am seeing a
    TypeError: can not serialize 'Variable' object
    d
    l
    21 replies · 3 participants
g

Greg Desmarais

07/23/2020, 3:31 PM
Has anyone seen an issue with Dask workers having errors around serialization? I'm submitting a flow from a client (simple hello world) -> prefect server on an ec2 instance -> prefect agent -> Dask scheduler -> Dask workers. Everything is flowing all the way to the worker, but then I am seeing a
TypeError: can not serialize 'Variable' object
==> prefect_logs/dask_worker.log <==
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 25, in dumps
    small_header, small_payload = dumps_msgpack(msg)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 163, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/opt/conda/lib/python3.7/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 283, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Variable' object
distributed.comm.utils - INFO - Unserializable Message: {'op': 'variable_get', 'timeout': 0, 'name': <distributed.variable.Variable object at 0x7fec9eafaef0>, 'client': 'Client-worker-365168fe-ccf9-11ea-800b-0242ac110004', 'reply': True}
distributed.comm.utils - ERROR - can not serialize 'Variable' object
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 35, in _to_frames
    msg, serializers=serializers, on_error=on_error, context=context
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 25, in dumps
    small_header, small_payload = dumps_msgpack(msg)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 163, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/opt/conda/lib/python3.7/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 286, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 225, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 283, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Variable' object
d

Dylan

07/23/2020, 5:12 PM
Hey @Greg Desmarais! Would you mind sharing your flow?
I will see if I can help or direct you to our resident Dask expert who almost certainly can 😄
l

Luke

07/23/2020, 7:35 PM
Unserializable Message: {'op': 'variable_get', 'timeout': 0, 'name': <distributed.variable.Variable object at 0x7fec9eafaef0>, 'client': 'Client-worker-365168fe-ccf9-11ea-800b-0242ac110004', 'reply': True}
That part reveals that you are trying to send a non-serializable object, distributed.variable.Variable, as JSON
maybe just extract a string representation?
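The point above can be illustrated with the standard library. In the traceback the serializer is msgpack; `json` is swapped in here as a stand-in because it ships with Python and rejects arbitrary objects in the same way. The `Variable` class below is a hypothetical placeholder, not Dask's `distributed.Variable`, and `try_dumps` is an illustrative helper.

```python
import json


class Variable:
    """Hypothetical stand-in for an object that carries a name."""

    def __init__(self, name):
        self.name = name


def try_dumps(message):
    """Return the encoded message, or the error text if encoding fails."""
    try:
        return json.dumps(message)
    except TypeError as exc:
        return f"error: {exc}"


var = Variable("prefect-flow-run")

# Passing the object itself fails, much like the message in the traceback.
bad = try_dumps({"op": "variable_get", "name": var})

# Passing its string name encodes cleanly.
good = try_dumps({"op": "variable_get", "name": var.name})

print(bad)
print(good)
```

This only shows why the message is rejected; in the thread, the message is built inside Dask and Prefect rather than in user code.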
g

Greg Desmarais

07/23/2020, 7:42 PM
Flow on the way...
from prefect import task, Flow

@task
def say_hello():
    return 'done'

with Flow("Dask ECS Test 2") as flow:
    say_hello()
Simple enough?
😉
Then,
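from prefect import Client
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3

bucket = 'celsius-temp-data'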
key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
flow.storage = S3(bucket, key=key)
labels = ['size-small']
executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786') 
flow.environment = LocalEnvironment(executor=executor, labels=labels)
flow_id = flow.register(labels=labels)
p_client = Client()
ret = p_client.create_flow_run(flow_id=flow_id)
print(f'Created flow run: {ret}')
👀 1
I had some fargate cruft from previous efforts...
Also - the stack trace above isn't from my code - it is inside the dask stuff. I don't call any of those ops directly.
d

Dylan

07/23/2020, 7:51 PM
https://github.com/PrefectHQ/prefect/pull/3009
Looks like this was fixed recently! What version of Prefect are you using?
g

Greg Desmarais

07/23/2020, 7:55 PM
You are probably going to laugh a bit. In between the writing of this original posting and now, my pycharm told me there was an update to prefect - I was on 0.12.4. I went through the change logs and found that exact issue. I am about to kick off a re-build of all my images and the like to go to 0.12.5.
In reading through the fix, I'm not sure how it addresses the issue, but there is a lot of magic going on, and it certainly is in the right arena. I'll upgrade then post back here.
d

Dylan

07/23/2020, 7:56 PM
Great! 😄
g

Greg Desmarais

07/23/2020, 7:58 PM
I was pretty sure I had searched the issues on github, but maybe not. It also looks like this pr might not be tied to an issue?
Oh - and the pr refers to an issue reported on slack...which was me 😉 :snake eating tail:
😂 2
Ok - after upgrading all over the place, you can consider the case closed! Got a nice setup working now...Thanks!
d

Dylan

07/23/2020, 8:38 PM
You’re welcome! Glad it all worked out 😄