prefect-community
  • j

    Julien Allard

    08/20/2020, 1:51 PM
    Hello! I'm having some trouble running a flow on a single-node Dask cluster on Kubernetes. When I run the flow locally, it works as expected, but when I run it on Kubernetes, I get the following error:
    Unexpected error: TypeError("Cannot map over unsubscriptable object of type <class 'NoneType'>: None...")
    The problem seems to come from a mapped task that outputs a pandas DataFrame. Does anyone have ideas on how to fix this or how to debug it further?
    d
    • 2
    • 18
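    For what it's worth, that TypeError usually appears when the task being mapped over returned None at runtime, so the DataFrame-producing task may be returning nothing on the cluster. A minimal local reproduction (a sketch, not Julien's actual flow):
    from prefect import Flow, task

    @task
    def make_rows():
        return None  # an accidental implicit/None return upstream triggers the error

    @task
    def process(row):
        return row

    with Flow("repro") as flow:
        process.map(make_rows())

    flow.run()  # fails with: Cannot map over unsubscriptable object of type <class 'NoneType'>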
  • r

    Richard Hughes

    08/20/2020, 2:02 PM
    Good morning, what's the easiest way to start a flow from Python that is already registered?
    d
    • 2
    • 23
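    One low-friction option (a rough, untested sketch using the Prefect 0.13 Client; the flow id is a placeholder) is to trigger the registered flow through the GraphQL client:
    from prefect import Client

    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id="<your-flow-id>",            # placeholder: returned by flow.register() or visible in the UI
        parameters={"some_param": "value"},  # optional
    )
    print(flow_run_id)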
  • r

    Richard Hughes

    08/20/2020, 2:14 PM
    I have a flow that appears to be running and then fails with the message below; however, I don't think it is actually failing.
    No heartbeat detected from the remote task; marking the run as failed.
    d
    • 2
    • 16
  • m

    Marwan Sarieddine

    08/20/2020, 2:23 PM
    Hi folks, I think I am facing a prefect bug when I enable
    PREFECT_FLOWS_CHECKPOINTING
    and I use a
    pd.DataFrame
    as a parameter - I am using prefect
    v0.13.3
    export PREFECT_FLOWS_CHECKPOINTING=true
    
    
    In [1]: import pandas as pd
    In [2]: from prefect import Parameter, task, Flow
    In [3]: @task
       ...: def simple(df):
       ...:     return df
       ...: 
    In [4]: with Flow("test") as flow:
       ...:     df = Parameter(name="df")
       ...:     simple(df)
       ...: flow.run(df=pd.DataFrame())
    [2020-08-20 14:20:38] ERROR - prefect.TaskRunner | Unexpected error: TypeError('Object of type DataFrame is not JSON serializable')
    Traceback (most recent call last):
      File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 838, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs,)
      File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/results/prefect_result.py", line 60, in write
        new.location = self.serializer.serialize(new.value).decode("utf-8")
      File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/serializers.py", line 100, in serialize
        return json.dumps(value).encode()
      File "~/.pyenv/versions/3.7.7/lib/python3.7/json/__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type DataFrame is not JSON serializable
    Out[4]: <Failed: "Some reference tasks failed.">
    When I disable
    PREFECT_FLOWS_CHECKPOINTING
    the flow runs successfully:
    export PREFECT_FLOWS_CHECKPOINTING=false
    
    In [1]: import pandas as pd
    In [2]: from prefect import Parameter, task, Flow
    In [3]: @task
       ...: def simple(df):
       ...:     return df
       ...: 
    In [4]: with Flow("test") as flow:
       ...:     df = Parameter(name="df")
       ...:     simple(df)
       ...: flow.run(df=pd.DataFrame())
    Out[4]: <Success: "All reference tasks succeeded.">
    d
    • 2
    • 13
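    A possible workaround (untested sketch): the traceback shows the Parameter being checkpointed with the JSON-based PrefectResult, so one option is to pass a lightweight reference (here a CSV path, purely illustrative) as the Parameter and build the DataFrame inside a task:
    import pandas as pd
    from prefect import Flow, Parameter, task

    @task
    def load_df(path):
        # the path itself is small and JSON-serializable, so checkpointing is happy
        return pd.read_csv(path)

    @task
    def simple(df):
        return df

    with Flow("test") as flow:
        path = Parameter("path")
        simple(load_df(path))

    flow.run(path="data.csv")  # "data.csv" is a placeholder file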
  • w

    William Smith

    08/20/2020, 2:29 PM
    Hi all, are there any examples where a task can be paused awaiting some sort of manual approval?
    d
    • 2
    • 65
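    One pattern that may fit (a sketch, untested): give the gated task the manual_only trigger so the run pauses at that point until someone resumes it from the UI or the API:
    from prefect import Flow, task
    from prefect.triggers import manual_only

    @task
    def prepare_change():
        return "new config"

    @task(trigger=manual_only)  # the run pauses here until it is resumed
    def apply_change(change):
        print(f"applying {change}")

    with Flow("needs-approval") as flow:
        apply_change(prepare_change())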
  • w

    William Smith

    08/20/2020, 3:10 PM
    Using Prefect Cloud, I have a very simple flow running. Sometimes my flows just don't run and stay in a Scheduled state; the agent is running and waiting for flow runs. It seems to work fine the first couple of times, any ideas?
    d
    • 2
    • 10
  • b

    Brian Mesick

    08/20/2020, 3:20 PM
    Follow ups from yesterday’s question, since I still can’t seem to make it work. • Is there anything special needed to make a
    PrefectResult
    work for a task, presuming I’m using Docker storage for the flow? • Does adding a
    PrefectResult
    to a task somehow transparently change the labels of a Flow?
    d
    • 2
    • 1
  • a

    Adam

    08/20/2020, 5:24 PM
    Hi everyone, hope your Thursday is going well. I have a flow that is stuck in a “Scheduled” state on Prefect Cloud. There are no other flows scheduled or running currently, and the agent is alive and logging “Waiting for flow runs”. How can I debug why this flow isn’t being submitted? The flow run id is 80230641-deec-466f-b12c-cea1ef8704ae
    d
    • 2
    • 6
  • r

    Ruben Silva

    08/20/2020, 9:38 PM
    Hello :) I'm wondering if someone can explain to me how Prefect Core executes mapping with the DaskExecutor. The docs say that Prefect automatically creates a copy of the task for each element of its input data, but looking at the code I don't understand where that is done or when those new tasks are submitted to Dask.
    d
    • 2
    • 2
  • j

    Jimmy Le

    08/20/2020, 10:18 PM
    Hey folks! I'm working on a personal project and I'm hoping to use Prefect to automate some of the workflows 🙂
    👋 5
    j
    • 2
    • 2
  • e

    Eric

    08/21/2020, 3:01 AM
    Hi all, when I register a flow, flow runs are created automatically according to the given schedule. I wonder whether there is any way to mark a flow run "skipped" when a task finds that the connection to the RDB has temporarily dropped (in our use case we need "skipped" instead of "failed"). That is, I want to change the flow-run state from the task layer. Is that possible? I've found one way, "`client.set_flow_run_state(flow_run_id="ce7ce40b-a21b-4273-b39f-6ebea071a76e", state=skip_state)`", but the flow run is created by the scheduler, so I cannot find the flow-run id automatically. Thanks!!
    n
    • 2
    • 12
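    Building on the call Eric already found, the scheduled run's own id is available inside a task via prefect.context, so it doesn't have to be looked up by hand (a sketch, untested; rdb_is_reachable is a hypothetical helper):
    import prefect
    from prefect import task
    from prefect.client import Client
    from prefect.engine.state import Skipped

    @task
    def check_rdb():
        if not rdb_is_reachable():  # hypothetical connectivity check
            flow_run_id = prefect.context.get("flow_run_id")  # id of the current scheduled run
            Client().set_flow_run_state(
                flow_run_id=flow_run_id,
                state=Skipped("RDB connection temporarily dropped"),
            )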
  • h

    Howard Cornwell

    08/21/2020, 11:00 AM
    Hi all. Is it possible to migrate from 0.11.5 to 0.13.3? I get the following error, so I assume it’s not possible?
    graphql_1   |                                                                                  
    graphql_1   | Running Alembic migrations...                                                    
    graphql_1   | INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
    graphql_1   | INFO  [alembic.runtime.migration] Will assume transactional DDL.
    graphql_1   |                                                                                  
    graphql_1   | Could not upgrade the database!
    graphql_1   | Error: Can't locate revision identified by '7e65dadba625'
    n
    l
    • 3
    • 7
  • m

    Michael Ludwig

    08/21/2020, 11:58 AM
    We just got this error in our logs. Does anybody know what it means?
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 0c1f4327-19fe-4522-8f8c-ad4c2fb7dc45: provided a running state but associated flow run c4d445b2-8d4e-4863-9ba5-b89b5addeba3 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/opt/app/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 112, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/opt/app/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1295, in set_task_run_state
        result = self.graphql(
      File "/opt/app/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 287, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 0c1f4327-19fe-4522-8f8c-ad4c2fb7dc45: provided a running state but associated flow run c4d445b2-8d4e-4863-9ba5-b89b5addeba3 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    n
    • 2
    • 6
  • m

    Miecio

    08/21/2020, 1:13 PM
    Hello everyone! I'm completely new to Prefect (in fact I discovered it a couple of days ago) and I wonder if my use case fits what Prefect can deliver. I need to run a reporting pipeline that works like an ETL pipeline: extract ~1M records from PostgreSQL, then for each record query a Redis database, run some processing, and save the results to a DB. I managed to POC it as a Prefect flow, but I have some questions. Does Prefect support concurrency for tasks similar to MapReduce? I can imagine the data extracted from PostgreSQL by the extraction task being split and processed separately on multiple agents.
    i
    n
    • 3
    • 7
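    Yes to the map-style fan-out part: Prefect's .map() creates one task run per element, and a Dask executor spreads those children across workers (a sketch with hypothetical task names, untested):
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def extract_from_postgres():
        return [{"id": i} for i in range(1000)]  # stand-in for the real query

    @task
    def enrich_from_redis(record):
        record["extra"] = "..."  # query Redis and process here
        return record

    @task
    def save(record):
        ...  # write the result back to the database

    with Flow("reporting") as flow:
        records = extract_from_postgres()
        save.map(enrich_from_redis.map(records))

    flow.run(executor=DaskExecutor())  # pass address="tcp://..." to use an existing cluster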
  • m

    Michael Ludwig

    08/21/2020, 1:57 PM
    If we start to use Prefect
    Results
    like
    S3Result
    do I need to use them for all tasks to be able to restart failed flows? It's not totally clear to me
    n
    • 2
    • 3
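    For context, a flow-level result applies a default checkpoint location to every task without annotating each one individually (a sketch, untested; the bucket name is made up):
    from prefect import Flow, task
    from prefect.engine.results import S3Result

    @task
    def extract():
        return [1, 2, 3]

    @task
    def load(rows):
        print(rows)

    # every task in the flow inherits this result unless it sets its own result=...
    with Flow("example", result=S3Result(bucket="my-prefect-results")) as flow:
        load(extract())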
  • m

    Michael Reeves

    08/21/2020, 3:43 PM
    If I want my flow to run a series of tasks in an exact order ignoring data dependencies, is there an easier way to do that other than manually adding an edge between all the tasks?
    n
    • 2
    • 11
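    One shortcut that may help (a sketch, untested): Flow.chain wires the state-only edges between consecutive tasks for you:
    from prefect import Flow, task

    @task
    def step_one():
        print("one")

    @task
    def step_two():
        print("two")

    @task
    def step_three():
        print("three")

    with Flow("strict-order") as flow:
        # adds a dependency between each consecutive pair of tasks
        flow.chain(step_one, step_two, step_three)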
  • j

    Jared

    08/21/2020, 5:39 PM
    Does the
    late_work_seconds
    setting shown in the [prefect server config](https://github.com/PrefectHQ/server/blob/master/src/prefect_server/config.toml) actually do anything? As far as I can tell there's nothing referencing
    config.services.sla
    in the open-source server code, but this would be really useful to have access to
    c
    • 2
    • 1
  • b

    bral

    08/21/2020, 9:52 PM
    Good day! I noticed that while executing a flow, the delay in transitioning from an upstream task to a downstream task is about 10 seconds. Is there some way to lessen this delay? I deployed a local Prefect server, and the server's resources are not being used at all. A local agent is used, with an executor on the Dask cluster.
    n
    • 2
    • 1
  • m

    Marwan Sarieddine

    08/21/2020, 10:38 PM
    Hi folks, moving my issue post to the prefect-community channel. I am trying the S3 flow storage option - using a kubernetes agent and a DaskKubernetesEnvironment execution environment. I was facing aws s3 authentication issues but thanks to Josh - I got them resolved. I am now facing a new issue which is being raised by the prefect-job pod before the flow ever gets to a running state ... Here is the error I am getting:
    [5:57 PM] $ kubectl logs pod/prefect-job-e74114b4-26n6v 
    [2020-08-21 21:55:08] INFO - prefect.S3 | Downloading simple-flow-8/2020-08-21t21-54-33-484226-00-00 from infima-etl-flows
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 80, in cloud_flow
        raise exc
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 69, in cloud_flow
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 101, in get_flow
        return cloudpickle.loads(output)
    TypeError: an integer is required (got type bytes)
    I am wondering if anyone else has faced something similar
    a
    • 2
    • 3
  • m

    Maxwell Dylla

    08/21/2020, 11:38 PM
    Hi Community! Are there any established best practices for developing a library of Prefect flows? One of my struggles is instituting standards for a CI process for a library of flows. Part of the challenge is that flow code represents a delayed execution of tasks, so it is difficult to test flows without deploying them to a specific environment
    c
    • 2
    • 2
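    One piece that can run in CI without deploying anything (a sketch, untested against a real pipeline): flow.run() executes in-process, and the returned state exposes each task's result for assertions:
    from prefect import Flow, task

    @task
    def double(x):
        return x * 2

    with Flow("unit-testable") as flow:
        doubled = double(21)

    state = flow.run()  # runs everything locally, in-process
    assert state.is_successful()
    assert state.result[doubled].result == 42  # per-task results are available for assertions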
  • k

    Kyle McEntush

    08/22/2020, 12:07 AM
    Is there a way to run functions that are decorated with @task outside of prefect? I'm trying to rapidly develop individual task stages but I'd prefer to not have two functions defined. Currently, calling a function decorated with @task in a jupyter notebook throws a context error.
    c
    m
    • 3
    • 26
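    For the notebook case, the wrapped function should still be reachable as the task's .run method, so it can be called with no flow context (small sketch):
    from prefect import task

    @task
    def add(x, y):
        return x + y

    print(add.run(1, 2))  # -> 3, no flow context needed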
  • k

    Kyle McEntush

    08/22/2020, 1:41 AM
    Next newbie question: What's the best way to apply a task over a list? The task also takes additional static arguments that don't change for each element. I tried to use map but then it tried to access indices of the static args
    c
    n
    • 3
    • 5
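    The usual fix for the static arguments is to wrap them in unmapped() so only the list argument gets indexed per element (a sketch, untested):
    from prefect import Flow, task, unmapped

    @task
    def get_items():
        return [1, 2, 3]

    @task
    def scale(item, factor):
        return item * factor

    with Flow("map-with-static-args") as flow:
        # only `item` is mapped; `factor` is passed whole to every child task
        scaled = scale.map(get_items(), factor=unmapped(10))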
  • k

    Kyle McEntush

    08/22/2020, 4:38 PM
    Is it possible to use the imperative API to have child tasks that live in multiple files? Ex: File 1 makes a flow that calls a task in File 2. This works so long as the task in File 2 doesn't call another task within File 2. If I try to do that, I get a
    ValueError: Could not infer an active Flow context.
    c
    • 2
    • 9
  • j

    Jack Sundberg

    08/22/2020, 6:27 PM
    Hey everyone, I apologize in advance for how much I wrote here... I realize that I'm attempting to use Prefect in a manner that it's not originally designed for, so I have a longer explanation of why I'm doing this below. While most users are coming from AirFlow, I'm trying to transition from FireWorks (https://github.com/materialsproject/fireworks). So here's my main question: Can I setup Prefect with many Agents that make a single task request and then terminate? In addition, when an agent is started and no tasks are ready for execution, the Agent will terminate. If this is possible, what would you estimate for the overhead per task? I would expect the main overhead comes from Agent connection to the Prefect Cloud/Server. This may sound like an inefficient use of Prefect, but it is intentional. Fireworks is designed with this setup in mind and is thus limited to 6/tasks per second -- this is acceptable because the average task submitted via FireWorks is on the hour timescale. I'm a materials chemistry researcher at UNC, where I must submit tasks as individual SLURM jobs, each with their own time and memory restrictions. These tasks (DFT energy calculations) range drastically in their required resources (one could be <1GB while another >200GB memory), launch in parallel using mpirun, and require their own isolated directory. I could use a single Executor like Dask, which supports queues like SLURM, but this would cause a number of problems for me. Dask holds onto worker resources indefinitely, which research-cluster admins don't want. If there are no tasks ready to execute, the cluster's resources should be released. Dask (as far as I'm aware) does not allow for setting time/memory limits on a per-task basis. And I'm unsure how Dask will handle tasks that execute via mpirun and also implement isolated directories per task. Fireworks was made by the materials chemistry community specifically for this. In the setup, you constantly submit SLURM jobs. Once a SLURM job makes it through the queue, the job itself simply starts an Agent, runs a single task, and then closes. This submission architecture is something I would like to replicate with Prefect. Fireworks has a number of limitations that I think Prefect can fix - such as their WorkFlow classes and meta-database (MongoDB) - so I'm looking into switching over. Again, sorry for the long write! Thanks for reading through, and let me know if you think a multi-Agent approach is possible with Prefect. -Jack
    c
    • 2
    • 3
  • b

    bral

    08/22/2020, 8:57 PM
    Hi folks! How can I start Prefect Server? Only by using prefect start? I noticed that there is a docker-compose.yml in the package path. And what is the right way to stop the server? For example, with Airflow you store only Docker images and orchestrate them using Docker/k8s, so your system stays clean. But to start Prefect Server you need to install Python, Prefect, and some necessary packages that "pollute" the system.
    j
    • 2
    • 2
  • x

    x062Wyhdolq

    08/23/2020, 10:38 AM
    Hi guys, I have an issue with a flow. In the transform function, if certain data is not present, I want to retry the fetch function after some time. How can I do that?
    c
    • 2
    • 1
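    One way to get that behaviour (a sketch, untested; fetch_data and the readiness check are hypothetical) is to move the "is the data there yet?" check into the fetch task and lean on Prefect's retry settings:
    from datetime import timedelta
    from prefect import Flow, task

    @task(max_retries=5, retry_delay=timedelta(minutes=10))
    def fetch_data():
        data = fetch_from_source()   # hypothetical call to the upstream source
        if not data_is_ready(data):  # hypothetical readiness check
            raise ValueError("data not ready yet, retrying later")
        return data

    @task
    def transform(data):
        return data

    with Flow("retry-fetch") as flow:
        transform(fetch_data())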
  • a

    Avi A

    08/23/2020, 11:13 AM
    Hey, I’m using a
    LocalEnvironment
    /
    DaskCluster
    to run my flow on a local cluster. Where does Prefect report the LocalCluster provisioning logs? I'm interested in which port the UI is served on (port 8787 is already taken on the server)
    j
    j
    • 3
    • 11
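    If the port needs to be pinned rather than discovered from logs, one option (a sketch, untested) is to create the LocalCluster explicitly and point the DaskExecutor at it:
    from dask.distributed import LocalCluster
    from prefect.engine.executors import DaskExecutor

    cluster = LocalCluster(n_workers=4, dashboard_address=":8788")  # pick a free port explicitly
    print(cluster.dashboard_link)  # full URL of the Dask UI, including the port

    executor = DaskExecutor(address=cluster.scheduler_address)
    # flow.run(executor=executor)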
  • k

    Kyle McEntush

    08/24/2020, 1:14 AM
    How can I raise an error within a task that executes a new task?
    • 1
    • 1
  • j

    Jacob Blanco

    08/24/2020, 3:11 AM
    Is there any way to set up account-wide flow error alerting in Prefect Cloud? I know this can be done per flow, but it's a bit involved to do that for every flow.
    j
    • 2
    • 6
  • m

    Mikael

    08/24/2020, 7:38 AM
    Hi, I am having problems running the local DaskExecutor. When running the standard local executor everything works fine, but when running a flow with
    executor=DaskExecutor()
    I get
    RuntimeError:
    An attempt has been made to start a new process before the
    current process has finished its bootstrapping phase.
    Is anyone else having the same issue?
    k
    • 2
    • 2
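    A common cause of this particular RuntimeError (a guess; it may or may not match the GitHub issue mentioned in the reply below) is calling flow.run(executor=DaskExecutor()) at module import time on a platform that spawns worker processes; guarding the entry point usually clears it:
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def say_hi():
        print("hi")

    with Flow("dask-local") as flow:
        say_hi()

    if __name__ == "__main__":  # needed when the local Dask cluster spawns worker processes
        flow.run(executor=DaskExecutor())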
k

Kyle Moon-Wright

08/24/2020, 3:25 PM
Hello @Mikael, I haven't personally encountered this error, but there is a GitHub issue with a similar Dask-related error that was resolved and may be helpful. Happy to dive deeper with you on this.
m

Mikael

08/24/2020, 9:27 PM
Thanks for the reply. That actually solves it. Strange, but great I suppose :-)