prefect-community
  • Naimesh Chaudhari
    04/12/2022, 3:56 PM
    What is the best method to split task resources up in a Flow? For example, I want one task to connect to a large Spark cluster to do preprocessing, while my ML task connects to a GPU instance.
    3 replies · 3 participants
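    One pattern worth knowing here (a hedged sketch, not from the thread): with a DaskExecutor, Prefect 1.0 maps task tags of the form dask-resource:KEY=N onto Dask worker resources, so a preprocessing task can be routed to workers near the Spark cluster and an ML task to GPU workers. The tag values and scheduler address below are illustrative:
    from prefect import task, Flow
    from prefect.executors import DaskExecutor

    # Assumes workers were started with e.g. `dask-worker ... --resources "GPU=1"`.
    @task(tags=["dask-resource:SPARK=1"])
    def preprocess():
        return "preprocessed data"

    @task(tags=["dask-resource:GPU=1"])
    def train(data):
        return f"model trained on {data}"

    with Flow("split-resources") as flow:
        data = preprocess()
        train(data)

    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")  # hypothetical address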
  • Anders Segerberg
    04/12/2022, 4:20 PM
    I have a workflow that writes a file to S3. I then have a flow-of-flows, which runs this workflow via create_flow_run, with the idempotency_key set to this file path. Running the flow the first time will write this key; running it a second time won't -- as expected, because of the idempotency behavior. What I don't understand is that the second flow run has every task state set to 'success' -- wouldn't I expect them to be 'cached' or something else, indicating that the flow isn't being re-run?
    12 replies · 3 participants
  • Prasanth Kothuri
    04/12/2022, 4:31 PM
    Hello, I am receiving the following pickle error while registering my flow:
    File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/usr/local/lib/python3.8/dist-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle '_thread.lock' object
    To make it simple, it has just one task, as below:
    @task(log_stdout=True)
    def get_file_names():
        files = s3.Bucket(s3_bucket).objects.all()
        file_names = []
        for my_bucket_object in files:
            file_name = my_bucket_object.key
            regex = re.search(r".ctl", str(file_name))
    
            if regex is not None:
                file_names.append(file_name)
    
        return file_names
    and the flow:
    # flow to chain the tasks
    with Flow("my_flow", storage=storage, schedule=schedule) as f:
        ctl_files = get_file_names()
    Any ideas why Prefect is unable to serialize / pickle this?
    2 replies · 2 participants
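    A likely culprit (an inference from the traceback, not confirmed in the thread): the module-level s3 resource gets captured when the flow is pickled for registration, and boto3 objects hold thread locks that cloudpickle cannot serialize. A minimal sketch of the usual fix - build the resource inside the task so it only exists at run time:
    import re

    import boto3
    from prefect import task

    @task(log_stdout=True)
    def get_file_names():
        # Created at run time, so it is never pickled with the flow.
        s3 = boto3.resource("s3")
        s3_bucket = "my-bucket"  # hypothetical; the original reads this from module scope
        file_names = []
        for my_bucket_object in s3.Bucket(s3_bucket).objects.all():
            if re.search(r"\.ctl", str(my_bucket_object.key)):  # escaped dot; original used ".ctl"
                file_names.append(my_bucket_object.key)
        return file_names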
  • Josh
    04/12/2022, 6:00 PM
    Is it possible to “force cancel” a flow? The flow was running on an agent that died and did not emit any further events. I manually clicked the “cancel” button in the UI, and the flow has now been stuck on a "Flow run is cancelling…" message for 2 weeks. Is there anything I can do to let Prefect Cloud know that the flow is actually dead and never succeeded?
    1 reply · 2 participants
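    One possible escape hatch (a hedged sketch; the run ID is a placeholder): Prefect 1.0's Python client can overwrite a flow run's state directly:
    from prefect import Client
    from prefect.engine.state import Failed

    client = Client()
    client.set_flow_run_state(
        flow_run_id="00000000-0000-0000-0000-000000000000",  # placeholder: the stuck run's ID
        state=Failed(message="Agent died; marking the run as failed manually."),
    )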
  • Atsushi Saito
    04/12/2022, 6:18 PM
    Is there a recommended way to debug a ShellTask's non-zero exit code in the Docker runner?
    7 replies · 2 participants
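    For reference (an illustrative sketch, not from the thread): ShellTask has flags that surface stderr and stream output into the Prefect logs, which makes non-zero exit codes easier to diagnose:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # return_all returns every line of output instead of only the last one;
    # log_stderr forwards stderr to the task logger when the command fails.
    shell = ShellTask(log_stderr=True, return_all=True, stream_output=True)

    with Flow("debug-shell") as flow:
        output = shell(command="./my_script.sh")  # hypothetical command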
  • kiran
    04/12/2022, 7:33 PM
    Best practices question: I’ve found that since using Prefect, I’ve used try/except/else/finally less often than before (in my main code), because I figure Prefect will catch (and log) things. I still find uses for it inside actual tasks/functions. Is this what other people have done, or am I thinking about it the wrong way? Thanks!
    2 replies · 2 participants
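    For what it's worth, an illustrative sketch: try/except still earns its keep inside tasks when you want to translate an expected error into an explicit Prefect state via signals:
    from prefect import task
    from prefect.engine import signals

    @task
    def load_file(path: str) -> str:
        try:
            with open(path) as f:
                return f.read()
        except FileNotFoundError as exc:
            # Convert an expected error into an explicit, well-logged failure.
            raise signals.FAIL(f"missing input file: {path}") from exc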
  • Patrick Tan
    04/12/2022, 7:54 PM
    My co-worker and I have different Prefect Cloud accounts. How can I grant my co-worker permission to view my flow runs, or execute flows I registered?
    5 replies · 3 participants
  • David Yang
    04/12/2022, 8:12 PM
    Hi, I tried to create a VM from the Prefect Agent in the Azure Marketplace. The API key generated in Prefect Cloud is 40 characters, but the Prefect Agent template requires the key to be 1-22 characters long. Can the Prefect team help me solve this issue?
    1 reply · 2 participants
  • Greg Kennedy
    04/13/2022, 12:08 AM
    Hi all. Some of my flows started failing overnight, and after investigating I found that the EC2 instance the flows run on has run out of disk space. After a bit of digging I realised that the Prefect flow results are filling up the C: drive. What is the best practice for persisting these results? I don't think we really even need them, except maybe in the event of a failure where we might rerun failed jobs.
    1 reply · 2 participants
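    If the results truly aren't needed, one option (a hedged sketch based on Prefect 1.0's checkpointing settings) is to disable checkpointing so task outputs are never persisted:
    from prefect import task

    # Per task: never persist this task's return value.
    @task(checkpoint=False)
    def transform(x):
        return x * 2

    # Or globally, via an environment variable on the agent / run config
    # (assumption: checkpointing was enabled by the default Cloud settings):
    #   PREFECT__FLOWS__CHECKPOINTING=false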
  • Carlos Cueto
    04/13/2022, 2:09 AM
    Hi. I'm new to Prefect and having some trouble understanding the concept of tasks and task runs. I'm trying to return data from a task, but attempting to print the result of the task only prints out <Task: fetch_data>. This is what I'm trying to do:
    @task
    def fetch_data():
        return {"data": "random data"}

    with Flow('Get-Data') as flow:
        flow.run_config = LocalRun()
        data = fetch_data()
        logger.info(data)

    flow.run()
    4 replies · 2 participants
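    For context, an illustrative rewrite (not from the thread): inside a with Flow(...) block, fetch_data() returns a Task placeholder rather than the dict - the data only exists at flow.run() time, so logging has to happen inside a task:
    import prefect
    from prefect import task, Flow
    from prefect.run_configs import LocalRun

    @task
    def fetch_data():
        return {"data": "random data"}

    @task
    def log_data(data):
        prefect.context.get("logger").info(data)  # the runtime task logger

    with Flow("Get-Data", run_config=LocalRun()) as flow:
        log_data(fetch_data())

    flow.run()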
  • Bihag Kashikar
    04/13/2022, 5:40 AM
    Hi guys, I was reading the documentation for running a Prefect agent on GCP Vertex AI and noticed the links on the page are broken (screenshot attached).
    2 replies · 3 participants
  • Leanna Morinishi
    04/13/2022, 6:27 AM
    Hi! Our team wants to use Parameters with default values to record DRIs for Flows (regardless of who created or last updated them), so that we can query for them. It seems like I have to use a Parameter in a task for it to show up in the Prefect UI? Is there a nicer/better way to have a Parameter show up in the UI or be queryable via GraphQL than writing a dummy task that just prints the Parameter?
    3 replies · 3 participants
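    One thing to try (a hedged sketch; how the UI surfaces it is unverified): a Parameter can be attached to the flow graph directly with flow.add_task, with no dummy task consuming it:
    from prefect import Flow, Parameter

    with Flow("dri-tagged-flow") as flow:
        dri = Parameter("dri", default="data-team@example.com")  # hypothetical default
        flow.add_task(dri)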
  • Jacob Blanco
    04/13/2022, 7:01 AM
    How can I revoke user API Keys?
    15 replies · 2 participants
  • Trung Đô Trần
    04/13/2022, 8:24 AM
    👋 Hello, team!
    👋 3
    2 replies · 3 participants
  • Olivér Atanaszov
    04/13/2022, 9:19 AM
    Hi, I have a scheduled flow that had a failed run during the night (probably due to transient network issues). Although Lazarus is enabled for this flow, I did not see any logs reflecting that (see https://docs.prefect.io/orchestration/concepts/services.html#lazarus). Am I missing something?
    3 replies · 2 participants
  • Stephen Lloyd
    04/13/2022, 11:04 AM
    I have a flow that works fine when I run it locally. Here’s my flow:
    with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
        creds = get_credentials()
        conn = get_connection(creds)
        tables = get_tables()
        save_data = load_to_s3.map(tables, conn=unmapped(conn))
        conn.set_upstream(creds)
        save_data.set_upstream(tables)
        save_data.set_upstream(conn)
    It’s failing on get_connection, and the relevant code is:
    @task
    def get_credentials():
        return PrefectSecret(PREFECT_FLOW_NAME).run()
    
    @task
    def get_connection(creds):
        return connectors.get_redshift_connector(creds)
    
    # from another file...
    import redshift_connector
    
    
    def get_redshift_connector(creds: dict) -> object:
        conn = redshift_connector.connect(
            host=creds['host'],
            database=creds['database'],
            user=creds['user'],
            password=creds['password']
        )
        return conn
    When I move to running in ECS, it fails with the following trace…
    Unexpected error: TypeError("cannot pickle 'Struct' object")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
        binary_data = new.serializer.serialize(new.value)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
        return cloudpickle.dumps(value)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: cannot pickle 'Struct' object
    13 replies · 2 participants
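    A plausible reading (an inference from the trace, not confirmed in-thread): with an S3Result configured, every task's return value is cloudpickled for checkpointing, and the Redshift connection holds an unpicklable Struct. A minimal sketch of one workaround - skip checkpointing for the connection task:
    from prefect import task

    import connectors  # the helper module from the original post

    @task(checkpoint=False)  # don't persist the raw connection object to S3Result
    def get_connection(creds):
        return connectors.get_redshift_connector(creds)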
  • Malthe Karbo
    04/13/2022, 12:21 PM
    Hi (Prefect) - running flows without the Cloud beta (Orion) works fine, but if I log in, everything works as expected until the end of the flow, where I then consistently receive an error:
    14:17:55.129 | ERROR   | Flow run 'arrogant-yak' - Crash detected! Execution was interrupted by an unexpected exception.
    followed by prefect.exceptions.Abort: This run has already terminated. This happens regardless of task success/failure. Again, running without Cloud this works fine.
    13 replies · 3 participants
  • Stephen Herron
    04/13/2022, 12:40 PM
    Hi - we’re experiencing a read_timeout issue with the invokeLambda task. I think we need to pass it some extra config through boto_kwargs. Has anyone come across needing to do this before?
    4 replies · 2 participants
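    A hedged sketch of the usual boto3 side of this (the exact kwarg plumbing for the task isn't confirmed in-thread): a botocore Config can raise the read timeout, and Prefect 1.0's AWS tasks generally accept client options via boto_kwargs:
    from botocore.config import Config

    # Values are illustrative; tune them to the Lambda's actual runtime.
    lambda_config = Config(
        read_timeout=900,
        connect_timeout=60,
        retries={"max_attempts": 0},  # avoid boto3 silently re-invoking a slow Lambda
    )

    # Then pass it when constructing the task, e.g.
    # LambdaInvoke(function_name="my-fn", boto_kwargs={"config": lambda_config})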
  • Tom Klein
    04/13/2022, 1:39 PM
    Hello, what’s the best idiom for marking a task that executes some k8s job via RunNamespacedJob as a failure (within the job itself)? Or is that impossible because the command is async?
    4 replies · 2 participants
  • Matthew Seligson
    04/13/2022, 2:07 PM
    What’s the best way (in Prefect 1.0) to schedule a flow to run at 8am US/Eastern on the second weekday of the month? It seems that CronClocks handle DST well but can’t express "weekday of the month", while RRule clocks can express it but it’s not obvious to me how good their DST handling is. Would appreciate any advice on this!
    6 replies · 2 participants
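    For reference, a sketch of the rrule in question (hedged: how RRuleClock resolves DST transitions with a tz-aware dtstart would still need verifying):
    from datetime import datetime

    import pytz
    from dateutil.rrule import FR, MO, MONTHLY, TH, TU, WE, rrule
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import RRuleClock

    eastern = pytz.timezone("US/Eastern")

    # Second weekday (Mon-Fri) of each month at 8am Eastern: bysetpos picks
    # the second occurrence from the set of weekday matches in each month.
    rule = rrule(
        MONTHLY,
        byweekday=(MO, TU, WE, TH, FR),
        bysetpos=2,
        byhour=8,
        byminute=0,
        dtstart=eastern.localize(datetime(2022, 5, 1)),
    )

    schedule = Schedule(clocks=[RRuleClock(rule)])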
  • Tom Klein
    04/13/2022, 3:31 PM
    Hello again 😄 Regarding the RunNamespacedJob example (from https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py) --- we implemented it and got it to work, but it's now failing on:
    VALIDATIONFAIL signal raised: VALIDATIONFAIL('More than one dummy pod')
    because there seem to be many pod "residues" from previous runs:
    ['prefect-agent-7745fb9694-6fwk4', 'prefect-job-47d072a8-4pbsf', 'seg-pred-test-cm54l', 'seg-pred-test-doron', 'seg-pred-test-l2j5l', 'seg-pred-test-zvwld']
    So wouldn't k8s keep the pods around, given that we set delete_job_after_completion=False? And even if the job is deleted successfully, wouldn't it keep the pods around? Or are the pods supposed to be deleted automatically when the job is deleted…?
    33 replies · 4 participants
  • David Yang
    04/13/2022, 3:49 PM
    I have a master flow that executes other flows. It has run well so far using Git storage, but it failed with this error message, tied to the first sub-flow called in the master flow: "No heartbeat detected from the remote task; marking the run as failed". Only one line of code changed in the master flow. Any suggestions?
    31 replies · 3 participants
  • Chris Reuter
    04/13/2022, 4:00 PM
    @Laura Lorenz's livestream starts in 30 minutes! She'll cover an end-to-end Prefect 2.0 and Prefect Cloud 2.0 experience. You can sign up to watch here.
    :party-parrot: 2
  • Kevin Mullins
    04/13/2022, 5:02 PM
    I’m starting to use AzureResult to store task results for my flows. I’ve already got separate storage accounts per environment (dev, qa, prod). I’m curious whether it would be recommended to use separate containers for different Prefect projects and/or flows, or if it is OK to just store all the results in the same blob container for the environment. My hesitation about further separation is that AzureResult appears to require the container to already exist, so I would need to orchestrate something to create containers for each Prefect project. Not a big deal, but just trying to get a feel for good practices. Any thoughts appreciated.
    8 replies · 2 participants
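    One alternative to per-project containers (a hedged sketch; names are illustrative and the templating keys come from Prefect's result location docs): keep one container per environment and separate flows with a templated location prefix:
    from prefect.engine.results import AzureResult

    result = AzureResult(
        container="prefect-results-dev",  # hypothetical container name
        location="{flow_name}/{task_name}/{task_run_id}.prefect",
    )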
  • Matt Alhonte
    04/13/2022, 6:39 PM
    How do I delete a bunch of Submitted runs at once? Deleting the Flow didn't seem to do it.
    17 replies · 2 participants
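    One possibility (a hedged sketch against the 1.0 GraphQL API; the run IDs are placeholders): delete the runs individually via the delete_flow_run mutation:
    from prefect import Client

    client = Client()
    for run_id in ["run-id-1", "run-id-2"]:  # placeholder IDs gathered from the UI or an API query
        client.graphql(
            """
            mutation($id: UUID!) {
              delete_flow_run(input: {flow_run_id: $id}) {
                success
              }
            }
            """,
            variables={"id": run_id},
        )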
  • Chris Reuter
    04/13/2022, 6:45 PM
    Starting in 15 minutes: our 2nd in a 🔥 day of livestreams. @Ben Welsh is the guest on PrefectLive, showing how he uses Prefect and GCP together.
    🎉 1
    📺 2
    🔥 2
  • Zach Munro
    04/13/2022, 7:03 PM
    What are the alternative ways to schedule Prefect flows besides running them within Kubernetes?
    2 replies · 3 participants
  • Zach Munro
    04/13/2022, 7:03 PM
    e.g. Is there a Fargate executor?
    1 reply · 2 participants
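    Not an executor, but for reference (a hedged sketch; image and sizes are illustrative): Prefect 1.0 has an ECS run config plus an ECS agent, which launches each flow run as its own ECS/Fargate task:
    from prefect import Flow
    from prefect.run_configs import ECSRun

    with Flow("fargate-example") as flow:
        ...  # tasks elided

    flow.run_config = ECSRun(
        image="my-registry/my-flow:latest",
        cpu="512",
        memory="1024",
    )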
  • Zach Munro
    04/13/2022, 7:13 PM
    One more question: is it possible to have different tasks in a pipeline run in separate containers? For instance, if one task needed to run on a Windows machine and the next task in the flow needed to run on a Linux machine.
    1 reply · 2 participants
  • Rajan Subramanian
    04/13/2022, 10:15 PM
    Hello, I have a Fargate/EKS question. I was on the verge of deploying my Prefect file to Fargate. I'm using Redis as a temporary store. I created a Redis ElastiCache on AWS and I'm having trouble connecting to it via Kubernetes - mainly getting a "temporary failure in name resolution" error. I'm not even sure whom to ask, but since I was using Prefect I figured someone here might know. I raised it here: https://stackoverflow.com/questions/71864208/unable-to-connect-to-redis-elasticcache-from-fargate - curious if someone has any suggestions? My Fargate profile has the same 4 subnets that my ElastiCache cluster has; they also have the same security group.
    7 replies · 2 participants
Anna Geller
04/14/2022, 12:36 AM
Why Redis? 😄 We discussed this together, and Timestream is so much better suited to your use case: you're dealing with time-series data. Redis has a plugin for time series, but it's not available in ElastiCache, because ElastiCache is an AWS "fork" of Redis (a plain version with no custom plugins), and afaik RedisLabs only supports the time-series plugin in their own product. I honestly think you would be doing yourself a huge favor by switching to Timestream. ElastiCache is a pain to manage - I can't help with the connection issues because it was set up by DevOps for me in the past, but nobody was happy with it 😂 and it seems you're having the same experience.
👍 1
Rajan Subramanian
04/14/2022, 5:41 AM
Ugh, yeah, you're right - it's literally a pain to integrate Redis ElastiCache with Fargate. I just switched to Timestream. Question: any idea how to write data into Timestream with multiple fields? Example:
timestamp | symbol | bids | asks
12:24:55 | BTC | [22.33, 44.55, ...] | [55.66, 77.28]
My bids and asks fields are lists of prices at that timestamp. I converted them to strings. Is this possible in Timestream? At the moment I have it as:
record = {
    "Time": str(int(round(time.time() * 1000))),
    "TimeUnit": "MILLISECONDS",
    "Dimensions": [{"Name": "symbol", "Value": "BTC"}],
    "MeasureName": "asks",
    "MeasureValue": str(records[0]['asks']),
    "MeasureValueType": "VARCHAR",
}
Anna Geller
04/14/2022, 9:09 AM
It's a matter of how you format the records. Here is how you can do that:
records = [
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "BTC"}],
        "MeasureName": "Price",
        "MeasureValue": str(btc),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "ETH"}],
        "MeasureName": "Price",
        "MeasureValue": str(eth),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "DASH"}],
        "MeasureName": "Price",
        "MeasureValue": str(dash),
        "MeasureValueType": "DOUBLE",
    },
    {
        "Time": now,
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "crypto", "Value": "REP"}],
        "MeasureName": "Price",
        "MeasureValue": str(rep),
        "MeasureValueType": "DOUBLE",
    },
]
and then to write the records:
import boto3

write_client = boto3.client("timestream-write")
rejected_records = write_client.write_records(
    DatabaseName="demo", TableName="data", Records=records, CommonAttributes={}
)
print(rejected_records)
This blog post shows more details, and this one has a full example, including a guide on how to build Grafana dashboards for it 🙂
❤️ 1
Rajan Subramanian
04/14/2022, 6:43 PM
Hi Anna, thanks - I managed to push everything to Timestream. But after 3 minutes I'm getting an error: "The dimension name and value exceeds the maximum supported length for dimension names and values. See Quotas in the Timestream developer guide for additional information."
Anna Geller
04/14/2022, 6:49 PM
I think you know the answer, right? 😄 Use shorter names for the dimensions. The AWS docs for Timestream are quite good; you can look up all the details there.
Rajan Subramanian
04/14/2022, 7:03 PM
Oh, I think it's my value - I'm storing a list of lists as a string in there. Is that not allowed?
Anna Geller
04/14/2022, 7:17 PM
I don't know - I'm AFK. Can you check the Timestream docs? :)