prefect-community

    Carl

    02/23/2021, 2:41 AM
    Hi everyone. Very new to Prefect. I have some large files that I want to put through a pipeline, and after setting up the db tables, etc., I only want to process one row at a time so that I can validate errors, skip dodgy rows, and so on. Speed isn't a huge concern. Does anyone know of a good strategy to achieve this with Prefect?
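    A minimal sketch of one approach (not from the thread; the CSV layout and names like `load_rows` are assumptions): read the rows in one task, then map a row-level task over them, raising SKIP for bad rows so the rest keep flowing.
    import csv
    from prefect import task, Flow
    from prefect.engine.signals import SKIP

    @task
    def load_rows(path):
        # hypothetical loader: one dict per CSV row
        with open(path) as f:
            return list(csv.DictReader(f))

    @task
    def process_row(row):
        # validate and load a single row; SKIP dodgy rows instead of failing the flow
        if not row.get("id"):
            raise SKIP(f"skipping row with no id: {row}")
        return row

    with Flow("row-by-row") as flow:
        rows = load_rows("large_file.csv")
        process_row.map(rows)
    With the default LocalExecutor the mapped rows run one at a time, which fits the "speed isn't a huge concern" constraint.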

    Manik Singh

    02/23/2021, 3:14 AM
    Hi all, I'm having trouble finding any documentation around instantiating flow runs from a server app (FastAPI). At present, here's the setup: 1. Cloud as the backend; 2. local agent + executor; 3. web service (FastAPI) running inside a Docker container on my local machine. If I've already registered some flows by manually running a script, how am I supposed to instantiate flow runs from the web service? The Python scripts with the task and flow definitions are also available to the web app.
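    A minimal sketch of one way to do this (assuming the flow is already registered with Cloud; the endpoint path is made up): the web service only needs a `prefect.Client` to create the flow run, and the local agent then picks it up and executes it.
    from fastapi import FastAPI
    from prefect import Client

    app = FastAPI()
    client = Client()  # reads the Cloud API key/token from config or env vars

    @app.post("/flow-runs/{flow_id}")
    def create_run(flow_id: str):
        # the agent polling Cloud executes the run; the web service just schedules it
        flow_run_id = client.create_flow_run(flow_id=flow_id)
        return {"flow_run_id": flow_run_id}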

    André Bonatto

    02/23/2021, 4:36 AM
    Hi there! I'm trying to follow the advanced tutorial (https://docs.prefect.io/core/advanced_tutorials/advanced-mapping.html#extend-flow-to-write-to-a-database) but it appears it's not working. The insert episode tasks are raising "sqlite3.OperationalError: unable to open database file" but the file was correctly created by the create_db task. Can someone help me troubleshoot?

    Varun Joshi

    02/23/2021, 7:28 AM
    Hey Prefect experts, I'm using Prefect to push incremental data from SQL Server to a streaming messaging system, which further pushes to BigQuery. I have attached tasks to two functions: one that extracts metadata and another that loops through the function that pushes the incremental load.
    #!/usr/bin/env python
    # coding: utf-8
    
    import pyodbc
    from prefect.storage import GCS
    import json
    import time
    import os
    import datetime
    from google.cloud import pubsub_v1
    import pymysql
    from prefect import task, Flow, Parameter
    
    
    
    @task(log_stdout=True)
    def extract_metadata(source_system, source_database_id):
        # provides a list of metadata to loop through and extract delta
        metadata = []  # placeholder: build the list of metadata rows here
        return metadata


    def delta_push(metadata):
        # This function extracts data for the metadata detail provided and pushes it further
        pass  # placeholder body


    @task
    def delta_push_wrapper(metadata):
        # Looping through every metadata row and calling the push function
        for metadata_row in metadata:
            delta_push(metadata_row)
    
    
    
    with Flow("data_flow") as flow:
        flow.storage = GCS(bucket="bucketname")
        parameter1 = Parameter("paramater1",default="default")
        parameter2 = Parameter("parameter2",default=1)
        metadata = extract_metadata(parameter1,parameter2)
        delta_push_wrapper(metadata)
        
    
    flow.register(project_name="test_project")
    I'm getting an error at the flow.register(project_name="test_project") line where it says 'TypeError: Cannot serialize socket object'. Any help will be much appreciated 🙂
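    For reference, a common cause of `Cannot serialize socket object` at registration time (an assumption here, since the snippet above doesn't show it) is a database or Pub/Sub client created at module scope or otherwise captured by the flow, because registration pickles the flow with cloudpickle. A hedged sketch of the usual workaround, creating the client inside the task:
    from prefect import task

    @task
    def push_rows(rows):
        # create the client inside the task so it is never part of the pickled flow
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        topic = "projects/my-project/topics/my-topic"  # hypothetical topic path
        for row in rows:
            publisher.publish(topic, data=str(row).encode("utf-8"))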

    Varun Joshi

    02/23/2021, 12:14 PM
    Hello again, has anyone come across this kind of bug before while running a flow?

    Matheus Calvelli

    02/23/2021, 12:46 PM
    Hello, everyone. We've started working with Prefect on our team and some things about caching don't seem to add up for me. Could someone help me understand? According to this issue: https://github.com/PrefectHQ/prefect/issues/1221 it seems Prefect does not allow persistent caching of tasks to be used across different runs, but it should allow it for scheduled runs (there is also this PR which talks about it: https://github.com/PrefectHQ/prefect/pull/1226). However, this does not seem to be the case either. I tried caching the results of a couple of tasks and scheduled the flow to run again a few minutes later, and the cached tasks didn't work, which in turn made the whole flow not work. Could someone explain how exactly caching works? And is there a way through which I can "backup" results of tasks in order to iterate over models (which is what I wanted to do with cached tasks in the first place)?
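    For the "backup results between runs" part, a sketch of the file-based alternative to `cache_for` (the paths and task name are assumptions): a `target` plus a `Result` persists the output to disk, and the task is not re-run on later runs while the target file exists.
    from prefect import task, Flow
    from prefect.engine.results import LocalResult

    @task(
        checkpoint=True,
        result=LocalResult(dir="/tmp/prefect-results"),
        target="{flow_name}/{task_name}.prefect",  # templated cache file
    )
    def train_model(data):
        # expensive step whose output should survive between runs
        return {"model": "fitted", "n": len(data)}
    For local flow.run() calls, checkpointing also needs PREFECT__FLOWS__CHECKPOINTING=true in the environment.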

    Tobias Heintz

    02/23/2021, 12:50 PM
    Another conceptual question: how tightly is Prefect bound to Dask? It appears (in the docs) that some critical features are only available when using Dask: for example task parallelism and transparent data flow between tasks (which may be running on different machines). We are planning to run everything on ECS; will we still be able to use these features? Thanks a lot!
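    For what it's worth, a sketch of how the pieces can combine (the image name is a placeholder): a flow can run as a single ECS task via an `ECSRun` run config and still get in-process task parallelism from a `LocalDaskExecutor`, without standing up a separate Dask cluster.
    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import ECSRun

    @task
    def work(x):
        return x * 2

    with Flow("ecs-example") as flow:
        work.map(list(range(10)))  # mapped children can run in parallel threads

    flow.run_config = ECSRun(image="my-registry/my-flow:latest")  # hypothetical image
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)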

    Braun Reyes

    02/23/2021, 1:32 PM
    Hey there everyone. Curious if it is possible to map over a section of task dependencies. Like, I have an E->L set of tasks that I want to map over a list of tables. Is that possible? It seems with map and apply_map it will still map over all the E's and then move on to mapping all the L's. I suppose this would be like mapping over a nested flow, but not a flow that is actually registered; rather one that resides inside a registered flow. Was testing with this:
    from time import sleep
    
    from prefect import Flow, apply_map, task
    from prefect.executors import DaskExecutor
    
    
    @task()
    def test_1(x):
        sleep(2)
        print(f"test_1 with {x}")
        return x
    
    
    @task()
    def test_2(x):
        sleep(2)
        print(f"test_2 with {x}")
    
    
    def micro_flow(x):
        test_1_task = test_1(x)
        test_2(test_1_task)
    
    
    with Flow(
        "example",
        executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 2}),
    ) as flow:
        apply_map(micro_flow, range(10))
    
    
    if __name__ == "__main__":
        flow.run()
        # flow.visualize()

    rafaqat ali

    02/23/2021, 1:48 PM
    Hi, I've been searching for a Prefect deployment mechanism on Azure Web App for Containers, but couldn't find any help. I have concluded that Prefect is not suitable for me, as I have to deploy on my Azure infra. I have created my own docker-compose file, but the Apollo URL is still pointing to localhost. Can anyone kindly guide me on how I can achieve this? Or should I look at other options like Dagster?

    Daniel Black

    02/23/2021, 3:19 PM
    Hello. I am getting a weird error when my task runs on schedule. All the task is doing is executing a query in a postgres db. It is scheduled to run daily but fails because "the table does not exist". I know it exists and if I restart the same task from the Prefect Cloud UI it works just fine. Our deployment is using a fargate task to run a docker container. Any ideas as to why this error would occur?

    Pedro Machado

    02/23/2021, 3:55 PM
    Hi there. I am working on a project where we might use Kubernetes to execute flows. I don't have much experience with Kubernetes, but I've used the `KubernetesPodOperator` in Airflow, and one thing I noticed is that you can't see the logs until the task ends. How does logging work in Prefect when you use Kubernetes? Are logs streamed to Prefect Cloud in real time?
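    As a small reference sketch: inside a task, the logger pulled from `prefect.context` is the one whose records get forwarded to the backend, so lines written this way appear in the UI while the run is still in progress.
    import prefect
    from prefect import task

    @task
    def long_running_step():
        logger = prefect.context.get("logger")
        logger.info("progress update")  # forwarded to the backend by the cloud log handler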

    Asif Imran

    02/23/2021, 5:19 PM
    Good morning! 👋 I am curious what the Prefect equivalent of `Sensors` from Airflow is. I see this helpful link [1] from Jeremiah -- essentially do the poke-sleep-poke yourself, IIUC. Any changes since his post? Recently Airflow introduced the notion of SmartSensors [2], which removes a fair bit of work duplication (it's fairly typical for me to have several workflows all polling on the same S3 success file). I have similar worries that such polling will hog up resources (e.g. going over the resources in my ECS cluster). [1] https://prefect-community.slack.com/archives/CL09KU1K7/p1602088074097600?thread_ts=1602087747.097500&cid=CL09KU1K7 [2] https://airflow.apache.org/docs/apache-airflow/stable/smart-sensor.html

    Ajith Kumara Beragala Acharige Lal

    02/23/2021, 5:50 PM
    Hi Prefect Experts, I am facing an issue when invoking a script in a `GitLab repo` in my prefect-server. Can someone help me figure out the mistake in my code? The error: `Failed to load and execute Flow's environment: GitlabGetError('404 Project Not Found')`

    Diego Alonso Roque Montoya

    02/23/2021, 6:20 PM
    Hi! Is there a way to send logs from a distinct machine into prefect? I have tasks that spawn computers so it would be nice if those child computers can log back to the main task

    S K

    02/23/2021, 7:03 PM
    Need help here. Trying to do the following in Python. This is to check the flow state and cancel it if the flow is in a running state. How do I pass the value for "$flowRunId: UUID!"?
    import prefect
    from prefect import Client
    from prefect import task, Flow
    @task()
    def check_runs():
      c = Client()
      query = """
      query RunningFlowsName {
      flow(where: {name: {_eq: "flowstatechecktest"}}) {
        id
      } }  """
      print('======')
      print(c.graphql(query=query))
    
      query2 = """
      query RunningFlowsState {
      flow_run(where: {state: {_eq: "Running"}}) {
        state
      }  }  """
      print('======')
      print(c.graphql(query=query2))
    
      query3 = """
      mutation CancelFlowRun($flowRunId: UUID!) {
      cancel_flow_run(input: {flow_run_id: $flowRunId}) {
        state
      }  }  """
      c.graphql(query=query3)
    
    with Flow("flowstatechecktest") as flow:
        check_runs()
    flow.run()
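    For the `$flowRunId: UUID!` part, a sketch (assuming the `variables` argument that `Client.graphql` accepts; the flow name matches the one above): query the running flow runs for their ids, then pass each id through GraphQL variables rather than templating it into the string.
    from prefect import Client

    c = Client()
    running = c.graphql(
        """
        query {
          flow_run(where: {state: {_eq: "Running"}, flow: {name: {_eq: "flowstatechecktest"}}}) {
            id
          }
        }
        """
    )
    cancel = """
    mutation CancelFlowRun($flowRunId: UUID!) {
      cancel_flow_run(input: {flow_run_id: $flowRunId}) {
        state
      }
    }
    """
    for run in running.data.flow_run:
        c.graphql(query=cancel, variables={"flowRunId": run.id})
    Depending on the version, `Client.cancel_flow_run(flow_run_id=...)` may also be available and avoids the raw mutation.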

    matta

    02/23/2021, 7:04 PM
    Heya! So, I'm pulling from a database that goes down for ~75 minutes at random times. I set my tasks to have `@task(max_retries=3, retry_delay=timedelta(minutes=30))` but apparently Zombie Killer doesn't like that? Looking through the logs, I see `No heartbeat detected from the remote task; marking the run as failed.`, then `Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">`, then `Heartbeat process died with exit code -9`, then:
    Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.'}}}],)

    matta

    02/23/2021, 7:04 PM
    What's the best way to do a retry for something kinda long into the future?

    Alex Papanicolaou

    02/23/2021, 8:15 PM
    Hi folks, @Marwan Sarieddine and I have a general question about flow runs, the future without a flow concurrency limit, and imposing compute constraints. More detail in the thread.

    user

    02/24/2021, 12:17 AM
    When working with dataframes, what library is best for speed? I am doing a lot of standard .apply() and .groupby() functions.

    Danny Vilela

    02/24/2021, 12:57 AM
    Hi all, I'm trying to scope out the number of steps required to build an `HdfsDataFrameResult` for checkpointing PySpark DataFrames. Imagine a flow with just two tasks A -> B, where A produces a PySpark DataFrame and B uses that DataFrame. I'd like to make use of caching in case B fails, so that it can quickly read a check-pointed A from disk (here, HDFS). To make use of caching, am I correct that I'd need to: 1. Set environment variable `PREFECT__FLOWS__CHECKPOINTING=true`. 2. Implement an `HdfsDataFrameResult` (with interface `read`, `write`, `exists`). 3. Have Task A's `run` explicitly return a PySpark `DataFrame`. 4. Initialize Task A within the `Flow` context manager as `TaskA(checkpoint=True, result=HdfsDataFrameResult(...))`. Is that it? I guess I'm not 100% understanding the separation between the serializer and the result, and whether I need an `HdfsDataFrameSerializer`. It seems like the serializer is too low-level for PySpark DataFrames, but I'm happy to be proven wrong 🙂

    Alfie

    02/24/2021, 6:29 AM
    Hi team, I see that the scheduler checks for new runs to schedule every 300 seconds. Can I configure this interval via an ENV variable? Thanks.

    Maria

    02/24/2021, 6:49 AM
    Hi prefect community! I'm new to Prefect and cannot figure out why secrets don't work for me. I'm using Windows. I have the env var set: PREFECT__CONTEXT__SECRETS__AZURE_DEMO="DefaultEndpointsProtocol=https;AccountName=....", but when running this code
    from prefect.tasks.azure.blobstorage import (
        BlobStorageDownload
    )
    
    storage = BlobStorageDownload(azure_credentials_secret="AZURE_DEMO", 
                                  container="mycontainer")
    
    storage.run(blob_name="demo")
    I'm getting `ValueError: Local Secret "AZURE_DEMO" was not found.` I tried setting AZURE_DEMO in config.toml but get the same result. What am I doing wrong?
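    For comparison, the documented local-secrets layout in `~/.prefect/config.toml` is a `[context.secrets]` section (a sketch; the connection string is elided):
    # ~/.prefect/config.toml
    [context.secrets]
    AZURE_DEMO = "DefaultEndpointsProtocol=https;AccountName=..."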

    vish

    02/24/2021, 10:13 AM
    Hello, here's a quick one! Are `Results` objects meant to be instantiated within a task? Looks like it works either way.

    Carl

    02/24/2021, 12:20 PM
    Trying to use the pre-built `PostgresExecuteMany` task but I keep running into a "ValueError('Could not infer an active Flow context.')" error. Any ideas what I'm doing wrong here? Thx
    from prefect import task, Flow
    import prefect.tasks.postgres.postgres as pg
    
    DB_NAME = 'blah'
    DB_USER = 'blah'
    DB_HOST = 'localhost'
    
    sql_insert = pg.PostgresExecuteMany(db_name=DB_NAME, user=DB_USER, host=DB_HOST)
    
    @task()
    def extract_data():
        vals = ['aaa', 'bbb']
        return vals
    
    
    @task()
    def load(data):
        insert_stmt = """INSERT INTO "table_a" ("COL_A", "COL_B") VALUES (%s, %s)"""
        ret = sql_insert(query=insert_stmt, data=data, commit=True)
        return ret
    
    
    def build_flow():
        with Flow('Test ETL') as f:
            data = extract_data()
            load(data)
        return f
    
    
    flow = build_flow()
    flow.run()
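    A sketch of one way around this (same names as above): `PostgresExecuteMany` is itself a Task, so calling it inside another task's body invokes it outside any flow context, which is what raises the error. Calling it directly in the Flow block (or calling `sql_insert.run(...)` inside `load`) avoids that. Depending on the version, a `password` argument is also expected at runtime; it is elided here.
    from prefect import task, Flow
    import prefect.tasks.postgres.postgres as pg

    sql_insert = pg.PostgresExecuteMany(db_name="blah", user="blah", host="localhost")

    @task
    def extract_data():
        # executemany expects a sequence of parameter tuples, one per row
        return [("aaa", "bbb"), ("ccc", "ddd")]

    with Flow("Test ETL") as f:
        data = extract_data()
        sql_insert(
            query='INSERT INTO "table_a" ("COL_A", "COL_B") VALUES (%s, %s)',
            data=data,
            commit=True,
        )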

    Ajith Kumara Beragala Acharige Lal

    02/24/2021, 3:14 PM
    Hi Prefect Experts, my company needs me to migrate Spark batch jobs to Prefect.io (schedule them via Prefect.io). I have managed to set up Prefect Server on our Kubernetes cluster. Those Spark batch jobs are already configured to deploy via Helm to the Kubernetes cluster. What is the easiest way to schedule them via Prefect.io on the Kubernetes cluster?
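    One low-effort pattern (an assumption, not the only option): keep the existing Helm/Kubernetes deployment mechanism and have a scheduled Prefect flow shell out to it via `ShellTask`; the chart name and cron string below are placeholders.
    from prefect import Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.shell import ShellTask

    submit_spark_job = ShellTask(name="submit-spark-batch")

    with Flow("nightly-spark-batch", schedule=CronSchedule("0 2 * * *")) as flow:
        # reuse the existing Helm release that runs the Spark batch job
        submit_spark_job(command="helm upgrade --install spark-batch ./charts/spark-batch")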

    S K

    02/24/2021, 7:21 PM
    Need help here. Trying to cancel a flow that is in a running state, but getting an error. Step 1: running flows; Step 2: trying to cancel the flow. This is with the Prefect Server backend.

    Richard Hughes

    02/24/2021, 8:59 PM
    Hi Prefect, I want to add a new CronClock to the schedule's clocks array that has already been defined. Any method to achieve this?

    Richard Hughes

    02/24/2021, 9:05 PM
    nm... looks like clocks is a list; add the two lists and then create the schedule
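    A sketch of exactly that, assuming the flow already has a schedule (the cron string is a placeholder):
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # combine the existing clocks with a new one, then rebuild the schedule
    flow.schedule = Schedule(clocks=flow.schedule.clocks + [CronClock("0 9 * * *")])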

    Danny Vilela

    02/24/2021, 9:15 PM
    Hi all. I'm trying to use task result caching in a flow and I'm struggling to understand how Prefect actually does cache retrieval. I'm seeing the error: `[2021-02-24 12:14:55-0800] WARNING - prefect.TaskRunner | Task 'MyTask': Can't use cache because it is now invalid`. I am using a custom `Result` subclass I wrote for caching PySpark DataFrames, which could be the issue. Here's how I initialize and run the task:
    my_task: MyTask = MyTask(
        checkpoint=True,
        cache_for=dt.timedelta(hours=12),
        result=HdfsDataFrameResult(spark=spark, location=hdfs_path_for_this_task),
    )
    By my understanding, this should cache the output of `my_task(…)` for 12 hours. So even if I restart the Python process (say, if I'm developing a flow within a notebook) I can restart the kernel as much as I'd like and still have that task access the cache… right? Am I missing something? Do I need a `cache_key` here to share the same cache (here, HDFS) between different flows?

    Belal Aboabdo

    02/24/2021, 10:24 PM
    Hi, I'm trying to checkpoint with `S3Results` but am getting this error when mapping the function:
    [2021-02-24 14:11:49-0800] ERROR - prefect.TaskRunner | Task 'plot_map_counts_data[0]': Unexpected error while running task: TypeError("exists() got multiple values for argument 'location'",)
    The flow runs successfully without checkpointing; here's an example of the task:
    @task(
        checkpoint=True,
        target="{flow_name}/{today}/{task_name}_{map_index}.png",
        result=prefect.engine.results.S3Result(bucket=results_s3_bucket),
    )
    def plot_map_counts_data(df, location, resource):
        #some task
        return image
    
    with Flow("example") as flow:
    
        plot_map_counts_data.map(
            df,
            locations,
            unmapped("test_resource"),
        )

josh

02/24/2021, 10:36 PM
Hi @Belal Aboabdo I believe `location` may end up being a reserved word here when formatting the target. 🤔 If you change `location` in your task kwargs does it work?

Belal Aboabdo

02/24/2021, 10:41 PM
Awesome you're right it worked. Thank you!

josh

02/24/2021, 10:41 PM
@Marvin open “Location cannot be used as task kwarg when using Result”

Marvin

02/24/2021, 10:41 PM
https://github.com/PrefectHQ/prefect/issues/4173