prefect-community
  • j

    Jonas Hanfland

    10/06/2020, 4:43 PM
    Hey guys, I am getting an exception when trying to run a
    BigQueryTask
    . This is the task I am trying to run:
    from datetime import timedelta
    from prefect.tasks.gcp import BigQueryTask

    BigQueryTask(
        name="Get all statement IDs",
        query="""
            SELECT * FROM temporary_tables.statements
        """,
        max_retries=3,
        retry_delay=timedelta(seconds=30),
        to_dataframe=True,
    )
    and it throws this exception: (see thread) Every time this task has run in the past, it passed just fine. Does anybody have an idea what the issue might be? Thanks very much in advance!
    c
    • 2
    • 6
  • a

    ale

    10/06/2020, 4:53 PM
    Hey folks, has anyone experienced issues when creating task definitions on Fargate? The task definitions are created, but at task setup I get the following error
    c
    s
    m
    • 4
    • 52
  • k

    Krzysztof Nawara

    10/06/2020, 5:39 PM
    Hey, nice to meet everyone 🙂 I've got a question about caching in Prefect - I'm trying to build an ML pipeline which can take advantage of caching, but can also selectively recompute some of the nodes (e.g. because logic changed, or a new node was added). For that I was planning to keep track of a version for each task and make it part of the cache key/target location. The bit I'm struggling with the most is invalidation of downstream tasks - if a task's version gets bumped, its downstreams cannot be read from cache. So far I've looked at _cache_key_ and _cache_validator_. If validators could access the states of the current execution of the upstream tasks, they could return False if the state of at least one of the upstreams is different from Cached. A really neat solution, but from what I'm seeing, cache_validator only gets access to the previous state of the current task and the values of its inputs, not their states. I also looked at Result and target, but they seem even more restrictive, at least until PIN 16 is fully implemented. The only workaround I can think of is making the version of each upstream task part of the cache key for the task. This is going to get tedious real fast unless there is a way to propagate that version through the pipeline and generate the cache keys/targets. But this feels a lot like a hack, so before attempting that I wanted to ask if the experts here can think of a better option. Thank you very much in advance, Chris
    j
    j
    • 3
    • 15
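    For the versioning workaround described above, a minimal sketch assuming Prefect 0.13's checkpointed results with target strings (the version constants, task names, and local cache directory are all hypothetical):

    from prefect import task, Flow
    from prefect.engine.results import LocalResult

    # Hypothetical per-task versions; bumping one changes its target,
    # which forces that task to recompute.
    EXTRACT_VERSION = 2
    TRAIN_VERSION = 5

    @task(
        checkpoint=True,
        result=LocalResult(dir="./cache"),
        target=f"extract-v{EXTRACT_VERSION}.prefect",
    )
    def extract():
        return [1, 2, 3]

    @task(
        checkpoint=True,
        result=LocalResult(dir="./cache"),
        # Concatenating upstream versions into the target propagates
        # invalidation downstream -- the bookkeeping the question mentions.
        target=f"train-v{TRAIN_VERSION}-extract-v{EXTRACT_VERSION}.prefect",
    )
    def train(data):
        return sum(data)

    with Flow("versioned-cache") as flow:
        train(extract())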
  • d

    Dolor Oculus

    10/06/2020, 6:05 PM
    I feel like I've seen this in the docs, but I can't find it now 😕 If I want to configure which server my code uses (i.e. dev vs uat vs prod), I am currently doing it by setting
    PREFECT__SERVER__ENDPOINT
    (in bash, before invoking Python). Are there other ways of configuring this (e.g. via constructor arguments at runtime)? ty!
    j
    • 2
    • 1
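    Two alternatives to the environment variable, sketched under the assumption of a Prefect 0.13 setup (the endpoint URL is a placeholder):

    from prefect import Client

    # Option 1: put the endpoint in ~/.prefect/config.toml, which is read
    # when prefect is imported:
    #   [server]
    #   endpoint = "http://dev-server:4200"

    # Option 2: construct a Client against an explicit endpoint at runtime.
    client = Client(api_server="http://dev-server:4200")  # placeholder URL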
  • m

    Marwan Sarieddine

    10/06/2020, 6:36 PM
    Hi folks, we occasionally face this error when running our flow on EKS with a static Dask cluster setup (i.e. using a LocalEnvironment with a DaskExecutor) and a Kubernetes agent: (please see the thread for more details)
    j
    j
    • 3
    • 6
  • m

    Mike Fransesco

    10/06/2020, 7:05 PM
    Hello, first message! Our company is looking at Prefect and I am inquiring whether there is any active support for Docker Swarm vs Kubernetes?
    👋 2
    j
    • 2
    • 13
  • c

    Chirag

    10/07/2020, 1:14 PM
    Hi everyone, I have a question regarding the metadata storage of task results. Is there a way to delete the task metadata once the flow is completed? Currently I am storing the task results in S3.
    j
    • 2
    • 2
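    One possible approach for the question above: a flow-level state handler that prunes the stored result objects once the flow finishes. A minimal sketch (the bucket name and key prefix are hypothetical, and it assumes the results live under a known prefix):

    import boto3

    def cleanup_results(flow, old_state, new_state):
        # Once the flow reaches a successful terminal state, delete the
        # result objects it wrote to S3.
        if new_state.is_successful():
            s3 = boto3.client("s3")
            # Hypothetical bucket/prefix where this flow's results are stored.
            listing = s3.list_objects_v2(Bucket="my-results-bucket", Prefix="my-flow/")
            for obj in listing.get("Contents", []):
                s3.delete_object(Bucket="my-results-bucket", Key=obj["Key"])
        return new_state

    # flow.state_handlers = [cleanup_results]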
  • s

    Sven Teresniak

    10/07/2020, 2:39 PM
    I'm using a small Dask cluster, server flavor, v0.13.9, local agent. After upgrading to v0.13.10 I cannot start a flow. The agent prints
    [2020-10-07 14:26:37,427] ERROR - agent1 | 400 Client Error: Bad Request for url: <http://localhost:4200/>
    But it is still possible to register flows as usual. I see them in the UI, too. I switched back to v0.13.9 and the error is gone. I only changed the Prefect version -- no configuration and no flow code. I'd like to provide you with more information about this bug, but the agent seems to be ignoring
    $PREFECT__LOGGING__LEVEL=DEBUG
    . Is there any change in the environment for the agent? Why can I register flows but not start them?
    j
    r
    • 3
    • 21
  • m

    Matias Godoy

    10/07/2020, 3:47 PM
    Hi guys! I wanted to ask a question about *flow labels*; Right now I have two environments (DEV and PROD) and I have 2 different Kubernetes Agents (one with label
    dev
    and the other with label
    prod
    ), which also means that I'm using Docker Storage for the flows. Inside my flow I set the labels like this:
    flow.environment = LocalEnvironment(labels=['dev'])
    The thing is that I'd like the flow label to be "dynamic" in the sense that if I register that flow from a development environment, the label is automatically set to
    dev
    , but if I want to register it for production, I'd like the flow label to be
    prod
    . For now I register the flows from my own laptop, which automatically generates the Docker image and uploads it to my ECR for the agents to use. Maybe I'm wrong about this, but I guess that if I set an environment variable on my local computer and make the flow code something like
    flow.environment = LocalEnvironment(labels=[my_env_var])
    it will not work, because that variable will not exist in the container that runs the flow. Is this correct? Another alternative would be to be able to set flow labels when registering the flow using the CLI. Something like
    prefect register flow --file my_flow.py --name My-Flow --label dev
    , but according to the CLI help,
    --label
    is not a valid parameter when registering flows. Do you have any recommendations for setting flow labels dynamically depending on the environment where they were registered? Thanks!
    j
    • 2
    • 3
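    Regarding the dynamic labels above: since flow.environment is serialized when the flow is registered, reading a variable on the registering machine should work. A minimal sketch (the DEPLOY_ENV variable name is hypothetical):

    import os
    from prefect import Flow
    from prefect.environments import LocalEnvironment

    # Read the target environment on the machine doing the registration; the
    # label is stored with the flow's metadata at register time, so the
    # variable does not need to exist in the container that runs the flow.
    deploy_env = os.environ.get("DEPLOY_ENV", "dev")  # hypothetical variable

    with Flow("my-flow") as flow:
        ...

    flow.environment = LocalEnvironment(labels=[deploy_env])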
  • t

    Thomas La Piana

    10/07/2020, 4:22 PM
    Does Prefect have the "sensor" concept from Airflow, i.e. triggering a flow based on a change in an external object's state?
    j
    • 2
    • 2
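    Prefect 0.x has no built-in sensor primitive; one common emulation is a task that raises the RETRY signal until a condition holds. A minimal sketch (condition_met is a hypothetical stand-in for polling the external system):

    from datetime import timedelta
    from prefect import task, Flow
    from prefect.engine.signals import RETRY

    def condition_met() -> bool:
        # Hypothetical check against the external system.
        return False

    # Poll roughly once a minute for up to an hour.
    @task(max_retries=60, retry_delay=timedelta(minutes=1))
    def wait_for_external_state():
        if not condition_met():
            raise RETRY("Condition not yet met; retrying.")

    with Flow("sensor-like") as flow:
        wait_for_external_state()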
  • j

    josh

    10/07/2020, 4:27 PM
    Hey team, Prefect version 
    0.13.10
     has been released and here are a few notable changes: 📝 Added utilities for dynamically naming flow and task runs 🕵️‍♀️ Enabled agent registration for Server 🌐 Allowed for specifying an import path for Flows in Local Storage A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled
    :marvin: 5
    🚀 5
  • k

    Konstantinos

    10/07/2020, 6:18 PM
    Hi guys, I need some help with notifications upon failures: I have added a custom function that sends an email, but it would be very cool if the function could know the arguments that the task was run with (following the instructions here). From what I have seen so far, the task does not store its run args at the level of the object, and the local task runner does not pass them down to the handler. The only solution I can see is storing the arguments of the run function when it is called, which introduces statefulness. Am I missing something?
    j
    • 2
    • 3
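    A minimal sketch of the stateful workaround mentioned above: a Task subclass that stashes its run arguments on the instance so the state handler can read them (send_email is a hypothetical helper):

    from prefect import Task

    def send_email(body: str) -> None:
        ...  # hypothetical email helper

    def notify_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            send_email(f"{task.name} failed with args: {task.last_run_kwargs}")
        return new_state

    class RecordingTask(Task):
        def __init__(self, **kwargs):
            super().__init__(state_handlers=[notify_on_failure], **kwargs)
            self.last_run_kwargs = {}

        def run(self, **kwargs):
            # Stash the run arguments before executing -- the statefulness
            # the question mentions.
            self.last_run_kwargs = kwargs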
  • n

    Nakul Gowdra

    10/08/2020, 1:58 AM
    Hi folks, I was wondering if anyone has used Prefect to schedule Spark jobs via EMR. This is the flow I am working on: but I want a way for step 1 (create EMR cluster) to start when I rerun a failed task (step 2). Does anyone have ideas other than invoking step 1 from step 2? Thanks in advance.
    m
    • 2
    • 2
  • r

    Rob Fowler

    10/08/2020, 4:41 AM
    is anyone in Sydney? I might present my devops automation use-case at sypy when it starts back up.
    b
    • 2
    • 1
  • n

    Nejc Vesel

    10/08/2020, 9:11 AM
    Hi everyone, I have a question regarding writing tests. I have a flow that uses LocalResult to store some data; by default it uses the ~/.prefect folder. I want to test the execution, but I don't want the results from the test run to be stored there - I'd rather use a dedicated test folder. How can I change config.home_dir during the test? I've tried it this way
    from prefect.utilities.configuration import set_temporary_config
    
    def test_prefect(input_folder, output_folder):
        with set_temporary_config({"home_dir": output_folder}):  
            exec_config = ExecutionConfig(dataset_location=os.path.join(get_path('parquet'), 'pq-dataset'),
                                          save_location=os.path.join(output_folder, 'mean_ndvi_results'))
    
            status = flow.run(executor=LocalExecutor(), flow_config=asdict(exec_config))
    
            assert status.is_successful()
    (I took a look at some tests in the Prefect source), but it doesn't work and still writes LocalResult results to ~/.prefect
    c
    • 2
    • 2
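    One likely cause for the test issue above: LocalResult resolves its directory when the result object is constructed (typically at flow-definition time), so changing the config afterwards has no effect. Passing the directory explicitly sidesteps this; a minimal sketch (flow and task names are placeholders, and it assumes checkpointing is enabled in the test environment):

    from prefect import task, Flow
    from prefect.engine.results import LocalResult

    def build_flow(result_dir: str) -> Flow:
        # Point the result at the test folder explicitly instead of relying
        # on config.home_dir at run time.
        result = LocalResult(dir=result_dir)

        @task(checkpoint=True, result=result)
        def compute():
            return 42

        with Flow("test-flow") as flow:
            compute()
        return flow

    def test_prefect(tmp_path):
        flow = build_flow(str(tmp_path))
        assert flow.run().is_successful()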
  • n

    Nuno Silva

    10/08/2020, 10:53 AM
    Hi @josh or anyone that knows. Great to see https://github.com/PrefectHQ/prefect/pull/3420 Question: I'm trying to use it, but at the moment in the Prefect UI (I'm using the server backend) I still see
    table_copy (Mapped Child 5)
    instead of e.g.
    table_copy_visits
    . I'm running something like
    task_run_name=lambda **kwargs: f"table_copy_{kwargs['name']}"
    , where
    name
    is a function parameter. The flow gets registered successfully with
    0.13.10
    . Can it be that the Prefect UI is not yet querying
    task_run_name
    ? Thanks
    j
    • 2
    • 3
  • a

    Anish Chhaparwal

    10/08/2020, 1:59 PM
    Hey, I'm trying to run multiprocessing inside a Prefect task function that makes a call to a function (not defined as a task) outside the task. It runs fine on my local machine but raises a PicklingError when running on a local agent (refer to image)
    🧐 1
    d
    • 2
    • 42
  • i

    Iain Dillingham

    10/08/2020, 2:18 PM
    Hello #prefect-community 👋 Could anyone point me to any continuous integration documentation/best practices for Prefect? We have a setup where we're using GitLab runners inside Kubernetes. We'd like to push to master and then have the runners register the flows/build the Docker storage. However, we don't want to run these in privileged mode. Thanks in advance!
    👋 2
    d
    a
    • 3
    • 6
  • a

    Alfie

    10/08/2020, 2:53 PM
    Hi team, I have a question about checkpointing. I run flows with a local environment and they generate a lot of result files like “prefect-result-2020-10-05t19-44-09-835173-00-00”, which I want to get rid of. If I disable checkpointing, Prefect will not write results to disk and my tasks can still succeed, right? Thanks
    d
    • 2
    • 17
  • t

    tom

    10/08/2020, 5:42 PM
    Hi! I have a question regarding processing a large amount of objects. I’d have a task that keeps yielding object IDs, and those objects should be processed by a per-object task (in parallel). Since there are many objects to be processed (potentially 1MM), object processing should start while object IDs are still being fetched: I wouldn’t want to have an in-memory list of all the object IDs. See below for pseudo code. Is it possible to model this with prefect? If so, how? Thanks for your help!
    def fetch_object_ids():
      # Long running task with many object_ids returning.
      cursor = None
      while True:
        # Keep fetching and yielding object IDs until we're exhausted.
        obj_ids, cursor = external_request_to_fetch_object_ids(cursor)
        for obj_id in obj_ids:
          yield obj_id
        if cursor is None:
          break
    
    def process_object(object_id):
      do_something_with_object(object_id)
    
    with Flow("Process Objects" as flow):
      object_ids = fetch_object_ids()  # I don't want to wait here / keep this list in memory
      process_object.map(object_ids)
      send_finished_email_to_user()  # Should depend on all objects processed.
    d
    • 2
    • 16
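    Prefect 0.x cannot consume a generator lazily into a mapped task, so one workaround for the question above is to collect the IDs into fixed-size batches and map over the batches. A rough sketch reusing the pseudocode's helpers (the batch size is arbitrary; note the batch list is still materialized, so this bounds the mapped-task count rather than achieving true streaming):

    from prefect import task, Flow

    @task
    def fetch_id_batches(batch_size=1000):
        # Drain the generator into fixed-size batches.
        batches, current = [], []
        for obj_id in fetch_object_ids():  # generator from the question
            current.append(obj_id)
            if len(current) == batch_size:
                batches.append(current)
                current = []
        if current:
            batches.append(current)
        return batches

    @task
    def process_batch(object_ids):
        for object_id in object_ids:
            do_something_with_object(object_id)

    @task
    def send_finished_email_to_user():
        ...

    with Flow("Process Objects") as flow:
        processed = process_batch.map(fetch_id_batches())
        send_finished_email_to_user(upstream_tasks=[processed])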
  • e

    Emma Willemsma

    10/08/2020, 8:40 PM
    Does anyone know if there's a way to set up workflow dependency imports so that they're only imported during flow runs? Say I have something like this:
    from prefect import task, Flow
    from prefect.environments.storage import Docker
    import boto3
    
    @task(log_stdout=True)
    def use_boto3():
        print('Using boto3 version {}'.format(boto3.__version__))
    
    with Flow('Sample Flow') as flow:
        use_boto3()
    
    storage = Docker(python_dependencies=['boto3'])
    storage.add_flow(flow)
    storage.build()
    flow.register(project_name='Dev', build=False)
    I would like to be able to run this to register my flow without having boto3 installed locally (since it will only be used during the flow run anyway)
    d
    • 2
    • 14
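    One way to get the deferred import above is to move the import inside the task body, so it only runs at flow-run time inside the container. A minimal sketch of that variation:

    from prefect import task, Flow
    from prefect.environments.storage import Docker

    @task(log_stdout=True)
    def use_boto3():
        # Imported at run time, inside the Docker image, so the registering
        # machine does not need boto3 installed.
        import boto3
        print('Using boto3 version {}'.format(boto3.__version__))

    with Flow('Sample Flow') as flow:
        use_boto3()

    flow.storage = Docker(python_dependencies=['boto3'])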
  • r

    Robin

    10/08/2020, 9:27 PM
    We are hitting some major roadblocks when using Prefect and Snowflake. We got the following error when running a flow with ~35,000 embarrassingly parallel tasks (one for each of ~35,000 systems):
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 128, in call_runner_target_handlers
        cache_for=self.task.cache_for,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1321, in set_task_run_state
        version=version,
      File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    Has anyone experienced this before? How would you debug it? 😕
    c
    d
    r
    • 4
    • 76
  • h

    Hui Zheng

    10/09/2020, 12:01 AM
    Hello Prefect, we have a scheduled flow that runs on a k8s agent. Sometimes the run starts 10 minutes later than its scheduled time. It seems related to some unresponsiveness of the tasks, and the Prefect server has attempted
    Rescheduled by a Lazarus process.
    For example, the run in the screenshot was scheduled for 11:10 but didn't actually start until 11:20. Could anyone help me understand why this happens and how to prevent it? We are building a new flow which needs to run every 10 minutes with a very strict SLA; a 10-minute delay would be fatal to the new flow. Thank you
    t
    n
    • 3
    • 3
  • a

    ale

    10/09/2020, 9:12 AM
    Hi folks, a question related to flow scheduling: is it possible to tell Prefect to start a scheduled run only if there is no other running instance of that flow? Basically I want to prevent concurrent runs of the same flow
    r
    n
    s
    • 4
    • 20
  • r

    Robin

    10/09/2020, 11:49 AM
    Hey folks, I am wondering how to set the logging level to DEBUG on Prefect Cloud when using the dask-kubernetes environment on AWS EKS. We successfully added the env variable to the Docker container using
    env_vars={"PREFECT__LOGGING__LEVEL": "DEBUG"},
    in
    flow.storage = Docker(...)
    . However, we still don't see the debug level log messages in cloud 🤔 Do we need to change anything else?
    ✔️ 1
    n
    j
    j
    • 4
    • 40
  • a

    ale

    10/09/2020, 2:05 PM
    Hi folks, how would you configure PostgresFetch when the connection details are available in environment variables that exist only at runtime, when the flow is executed on Fargate?
    n
    • 2
    • 3
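    For the runtime-credentials question above, one option is EnvVarSecret, which reads the variable when the flow runs rather than when it is defined. A sketch assuming Prefect 0.13 (the variable names and the query task are hypothetical):

    from prefect import task, Flow
    from prefect.tasks.secrets import EnvVarSecret

    # Resolved at flow-run time on Fargate, not at registration time.
    pg_host = EnvVarSecret("PG_HOST", raise_if_missing=True)          # placeholder
    pg_password = EnvVarSecret("PG_PASSWORD", raise_if_missing=True)  # placeholder

    @task
    def fetch_rows(host, password):
        # Hypothetical stand-in that receives the values as task inputs at
        # run time and performs the Postgres query.
        ...

    with Flow("fargate-postgres") as flow:
        fetch_rows(pg_host, pg_password)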
  • j

    James Phoenix

    10/09/2020, 2:46 PM
    Hey everyone, should I start with Prefect Cloud or Prefect Server?
    n
    • 2
    • 16
  • f

    flavienbwk

    10/09/2020, 3:02 PM
    Hi! I'm just starting out with Prefect and I was wondering if I can set up the Prefect server, Prefect UI, and one Prefect worker directly in my own docker-compose file. Also, where do I find the ports to open and the volumes to set up?
    n
    c
    • 3
    • 24
  • p

    Pedro Machado

    10/09/2020, 3:28 PM
    Hi. I am trying to get the docker agent running on an ubuntu machine that has been somewhat restricted by the administrator. I was trying to run
    prefect agent start docker
    (passing several env variables) with the recipe created by
    prefect agent install local
    but I get a permission denied error and the agent never starts. I am able to start the agent without
    supervisord
    and it works fine. Any ideas? Logs in thread.
    n
    • 2
    • 4
  • j

    John Song

    10/09/2020, 4:31 PM
    Does Prefect store the data passed between tasks somewhere? If not, when a task retries in a cluster of nodes, how is the data resent to the task? Does Prefect guarantee that the retried task goes to the same node? Sorry for so many questions
    n
    • 2
    • 2
n

nicholas

10/09/2020, 4:33 PM
Hi @John Song - no worries for the questions! That's what this community is for. For retries with distributed computing, you'll need to configure results handlers for your tasks. I'd recommend taking a look at the results docs for more info on that.
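A minimal sketch of what that configuration might look like, assuming an S3-backed result in Prefect 0.13 (the bucket name is a placeholder):

from prefect import task, Flow
from prefect.engine.results import S3Result

# Each task's return value is checkpointed to S3, so a retry on another
# node can re-hydrate its inputs from the stored result.
@task(checkpoint=True, result=S3Result(bucket="my-results-bucket"))
def produce():
    return [1, 2, 3]

@task(checkpoint=True, result=S3Result(bucket="my-results-bucket"))
def consume(data):
    return sum(data)

with Flow("with-results") as flow:
    consume(produce())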
j

John Song

10/09/2020, 4:42 PM
Thanks, I will take a look