Powered by Linen
prefect-community
  • t

    Tarek

    06/14/2022, 10:39 AM
    hi, i start to use prefect 2.0 and am struggling to solve an ssl-error when logging to cloud, am trying to login within a docker container..
    ✅ 1
    a
    s
    14 replies · 3 participants
  • s

    Sang Young Noh

    06/14/2022, 1:06 PM
    Hi all. I’m currently running a deployment with a flow like: .. (sorry, please follow the thread for the original message!)
    ✅ 1
    a
    k
    34 replies · 3 participants
  • t

    Toby Rahloff

    06/14/2022, 1:17 PM
    Anyone got an idea whether it is possible to view the average lateness of a flow in the new 2.0 Cloud UI? It was possible with the old UI (top right corner if I remember correctly)
    ✅ 1
    a
    j
    18 replies · 3 participants
  • a

    Ayush Bairagi

    06/14/2022, 1:33 PM
    Hi, how can I run prefect tasks in downstream order? For example, if I have three tasks a, b, c and the stream looks like a > b > c, and there is one worker, then I want to run a, then b, and then c. But it's running like a for the first element in the stream, then a again, then b, then b again for the next element, and then c
    k
    12 replies · 2 participants
  • i

    Ilhom Hayot o'g'li

    06/14/2022, 1:44 PM
    Hi All! With prefect1.0 my parent flow stuck at status. I have given labels to it but still not working . Any suggestions on how to debug the flow? Status: It has been 15 seconds and your flow run has not been submitted by an agent. Agent 57f7d146-3361-4aef-b0e7-9961bb1fa183 (agent) has matching labels and last queried a few seconds ago. It should deploy your flow run.
    k
    30 replies · 2 participants
  • j

    Joseph Mathes

    06/14/2022, 2:25 PM
    When running a flow from prefect cloud using a Coiled Dask executor, I'm noticing long waits (on the order of minutes) before the first task starts running. Is that expected? Are there any best practices (or even not-so-best practices) that can be used to shorten that loop? I'm hoping to make it part of a dev iteration cycle, which would make those two minutes pretty common, and thus expensive. I'm also asking the Coiled people if there are any options on their side
    k
    2 replies · 2 participants
  • p

    Paco Ibañez

    06/14/2022, 2:34 PM
    Hello! I have a flow in 2.0 with two tasks that I want to run sequentially. I am using a docker flow runner and my flow looks something like this:
    @flow
    def records_analysis_flow():
        ...
        future = ingest_csv(config)
        future.wait()
        future = analyze_records(ra_config)
        future.wait()
    I noticed somewhat inconsistent performance results. If I run the code in docker without prefect it takes around 154 seconds but when I run it in prefect it takes almost 900 seconds. However, if I explicitly set the task runner to sequential, then it takes ~ 180 seconds. Is my flow doing something wrong? Why does the concurrent task runner take so long? Thanks!
    m
    5 replies · 2 participants
  • j

    John-Craig Borman

    06/14/2022, 2:48 PM
    Hi all, are there any utilities to determine how many tasks were charged for a given flow run? (Using Prefect Cloud)
    k
    8 replies · 2 participants
  • j

    Jason Damiani

    06/14/2022, 4:40 PM
    Apologies if something like this has been asked before. I'm developing a flow in 1.x with Cloud that will need to adhere to API rate limiting (e.g. 1 request per second). Is there a first class executor agnostic way of doing this? I will be making the http requests in mapped tasks. The best I've come up with is to use https://pyratelimiter.readthedocs.io/en/latest/ to create a queue thats persisted to some sort of storage backend (like sqlite if only using a local dask executor). Each mapped task that needs to make an api request will wait to acquire a slot in the queue before proceeding
    k
    5 replies · 2 participants
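The shared-queue idea in the message above could be sketched roughly as follows, using only the Python standard library instead of pyratelimiter. It assumes every worker can open the same SQLite file; `reserve_slot`, `wait_for_slot`, and the `slots` table are hypothetical names for illustration, not part of Prefect or pyratelimiter.

```python
import sqlite3
import time


def reserve_slot(db_path, min_interval=1.0, now=None):
    """Reserve the next free request slot and return its timestamp.

    Slots are spaced at least `min_interval` seconds apart. The write lock
    taken by BEGIN IMMEDIATE keeps two processes from reserving the same slot.
    """
    now = time.time() if now is None else now
    # isolation_level=None puts the connection in autocommit mode so that
    # the transaction boundaries below are fully explicit.
    conn = sqlite3.connect(db_path, timeout=30, isolation_level=None)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS slots ("
            "id INTEGER PRIMARY KEY CHECK (id = 1), next_free REAL NOT NULL)"
        )
        conn.execute("BEGIN IMMEDIATE")
        row = conn.execute("SELECT next_free FROM slots WHERE id = 1").fetchone()
        slot = max(now, row[0]) if row else now
        conn.execute(
            "INSERT OR REPLACE INTO slots (id, next_free) VALUES (1, ?)",
            (slot + min_interval,),
        )
        conn.execute("COMMIT")
        return slot
    finally:
        conn.close()  # an uncommitted transaction is rolled back on close


def wait_for_slot(db_path, min_interval=1.0):
    """Block until this caller's reserved slot arrives."""
    delay = reserve_slot(db_path, min_interval) - time.time()
    if delay > 0:
        time.sleep(delay)
```

Each mapped task would call `wait_for_slot(...)` right before its HTTP request. A file-based database only works when all workers share a filesystem, which matches the local Dask executor case mentioned above.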
  • a

    Andreas

    06/14/2022, 4:41 PM
    Hi all! According to Orion docs creating and running deployments using the (Python?) API is a "coming soon" feature. Do we have a rough estimate on that? Is this planned with the GA on July? I do not want to put any pressure, I am just asking to have a timeline in my mind for my project that would benefit from this
    ✅ 1
    a
    4 replies · 2 participants
  • j

    Jehan Abduljabbar

    06/14/2022, 5:26 PM
    Hello, I am curious if anyone ran into the same issue I am facing with prefect server & prefect cloud. (when I use flow.run() directly in vs code it works fine), The issue happens when connecting to cosmos db, when using flow.register() then running the flow on the server or on the cloud. the task fails and I get the following error: "Unexpected error: TypeError("cannot pickle '_thread.RLock' object")" This is how I am connecting to cosmos db: URL = 'my cosmos db account uri' KEY = 'my cosmos db account primary key' client = CosmosClient(url=URL, credential=KEY) database = client.get_database_client('my database name') If anyone has any guidance as to how I can fix this, please let me know. I would highly appreciate it.
    k
    2 replies · 2 participants
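The error text in the message above can be reproduced without Cosmos DB, since pickle refuses any object that holds a thread lock. The sketch below uses a hypothetical `FakeClient` as a stand-in (that `CosmosClient` holds such a lock internally is an assumption) and shows one possible workaround: pass plain settings around and build the client inside the function that uses it.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a DB client that keeps a lock internally."""

    def __init__(self, url, key):
        self.url = url
        self.key = key
        self._lock = threading.RLock()  # this attribute is what pickle rejects


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False on a TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


def read_database_name(settings):
    """Build the client inside the function so only plain settings get pickled."""
    client = FakeClient(settings["url"], settings["key"])
    return f"{client.url}/{settings['database']}"


settings = {"url": "https://example.invalid", "key": "secret", "database": "mydb"}
```

Here `can_pickle(FakeClient(...))` is false while `can_pickle(settings)` is true, which is the difference between a client created at module level and one created inside the task body.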
  • r

    Rio McMahon

    06/14/2022, 5:42 PM
    Hello - I’d like to automatically deploy a flow from within a CI/CD pipeline. I run into import errors and am curious if there is some registration pattern that doesn’t require the environment prefect is installed into to also have the flow dependencies installed for serialization? Ideally I’d like to run some CI/CD pipeline using the prefect docker image (but not necessarily install `sklearn` or similar into it) then run `prefect register --project "Project Name" -l label -p flow.py` from within the pipeline. Realistically I could probably use the container that I am running the flow from within but am curious if there is an easier way. Thanks.
    k
    2 replies · 2 participants
  • t

    Tony Yun

    06/14/2022, 6:26 PM
    Hi, I think there is a ghost task running behind which we cannot see on UI. All of our running task is queued but no job is seen running on UI. Can someone help?
    Task 'RunNamespacedJob - DBT Run': Finished task run for task with final state: 'Queued'
    k
    22 replies · 2 participants
  • p

    Paco Ibañez

    06/14/2022, 8:24 PM
    Hello, in 2.0, is there a way to provide custom parameter values for a deployment from the UI?
    k
    2 replies · 2 participants
  • a

    aaron

    06/14/2022, 9:21 PM
    Hello, we’re using Prefect Server 1.2.2, Docker storage (with `stored_as_script=True`). I’m attempting to use the ExecuteNotebook task and I’m getting the following only with debug logs on the Docker agent:
    [2022-06-14 20:54:12+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Test Docker Notebook Flow'
    [2022-06-14 20:54:13+0000] INFO - prefect.CloudTaskRunner | Task 'notebook0': Starting task run...
    Executing:   0%|          | 0/2 [00:00<?, ?cell/s]Operation not permitted (src/thread.cpp:309)
    This is erroring out in the container when trying to access the notebook but not making it back to the Prefect API to change task state (the task run just hangs until it times out with a `No heartbeat detected from the remote task` error). Any ideas?
    k
    3 replies · 2 participants
  • a

    Ahmed Ezzat

    06/14/2022, 10:51 PM
    Orion's KV storage option is removed?
    2.0b6
    k
    1 reply · 2 participants
  • j

    Jovan Sakovic

    06/14/2022, 11:12 PM
    Hiya :octavia-hi: Quick question on triggering Flows, any thoughts or ideas are appreciated and welcome 🙌 We have a Jenkins job extracting data from Postgres :postg: and loading into BigQuery (yep, aware it could’ve been done with certain ETL tools, but it wasn’t an option). We also have a few Airbyte syncs on the same VM that Prefect is running on too. How would it make the most sense to trigger dbt jobs after the Jenkins and Airbyte syncs are finished? 🤔
    k
    3 replies · 2 participants
  • f

    Faheem Khan

    06/15/2022, 12:02 AM
    Hello everyone, is it possible with prefect to run a python script (e.g. abc.py) instead of refactoring my code into prefect tasks? I have already written my python script with multiprocessing functionality and would like to schedule it with prefect.
    ✅ 1
    a
    k
    9 replies · 3 participants
  • w

    Walter Cavinaw

    06/15/2022, 2:35 AM
    is it possible to specify where the prefect local agent should clone gitstorage to? Right now it clones gitstorage to AppData/Temp/[here] . Just wondering if it's possible to change that through a variable or config?
    k
    2 replies · 2 participants
  • w

    wonsun

    06/15/2022, 7:03 AM
    Hi all! Shouldn't the `config.toml` file be in the path where prefect is installed? There is only a `backend.toml` file in the path where my prefect is installed, are those two the same? Also I can't read `backend.toml`. I got the error message below:
    TomlDecodeError: Found invalid character in key name: ':'. Try quoting the key name. (line 1 column 2 char 1)
    What's the backend.toml file?
    ✅ 1
    a
    9 replies · 2 participants
  • j

    Jeff Kehler

    06/15/2022, 7:38 AM
    Setting the `apollo_url` under `[server.ui]` in the `.prefect/config.toml` file does not appear to be working. How do I change the host that the UI will call to access the API?
    a
    2 replies · 2 participants
  • a

    Andreas

    06/15/2022, 10:17 AM
    In Prefect 2.0b6 when using `DaskTaskRunner`, tasks can stay in a pending state forever if they have custom dependencies (using `wait_for`) on tasks that have failed. The failed state of the upstream tasks does not cause the downstream tasks to fail. This isn't the case when using the default `ConcurrentTaskRunner`, where the failed state correctly propagates downwards and causes the dependent tasks to fail by default
    ✅ 1
    a
    m
    4 replies · 3 participants
  • s

    Sheila

    06/15/2022, 10:31 AM
    Good morning! I don't know if this is the place for questions; if it is not, sorry in advance. We are implementing prefect in our company and I would like to ask a question. We have deployed prefect Server on a Kubernetes cluster. We have several flows using different custom Python packages and different versions of these. The question I have is: is it best practice to deploy one agent per flow with a specific environment, or to have a single agent with all the packages and consume it from all the flows? On the other hand, is there any defined process for the CI/CD of the flows?
    ✅ 1
    a
    5 replies · 2 participants
  • m

    Matt Delacour

    06/15/2022, 10:45 AM
    Morning all 👋 Any clue about the following error when using S3 as a storage option?
    [2022-06-15 11:08:52+0200] ERROR - prefect.S3 | Error uploading Flow to S3 bucket prefect-flows-proto: An error occurred (SignatureDoesNotMatch) when calling the PutObject operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.
    Error the error + snippet of code
    ✅ 1
    👀 1
    a
    2 replies · 2 participants
  • f

    frojo

    06/15/2022, 10:52 AM
    Hi everyone! I'm setting some state handlers for notifications in our "slack-compatible" (i.e. Zulip) streams. However I'm getting some odd behaviour with the updates. The Flow
    with Flow('slack-test',state_handlers=[post_to_slack]) as flow_slack:
        ini_message="Some Init Message"
        #    
        post_slack_init(ini_message)
        #
        count()
    We get the initial flow start message (`post_to_slack` notifies us on start/end/failure), but we don't get the "ini_message" until the flow ends. Even more, if the `count` task (which also carries a state handler) fails, we get the failure of the task first and then the `post_slack_init` (just a post request, no state handler on this task) message. Any clues? Thanks in advance!
    ✅ 1
    a
    4 replies · 2 participants
  • t

    Toby Rahloff

    06/15/2022, 10:59 AM
    Heyho 🐙 We tested out the Dask provider and think that Prefect version 2.0b6 maybe creates a Dask cluster that has a version mismatch between client and server when using the "aws.FargateCluster" cloudprovider. Steps to reproduce can be found in the thread 🙂
    ✅ 1
    a
    m
    13 replies · 3 participants
  • a

    Apostolos Papafragkakis

    06/15/2022, 12:20 PM
    Hi all, has anyone tried using pydantic models as objects exchanged between tasks? It seems that it is not supported (exceptions are thrown). I am using the latest Prefect 2 release.
    ✅ 1
    a
    10 replies · 2 participants
  • g

    Gintautas Jankus

    06/15/2022, 1:05 PM
    Hi there, I am a bit struggling to run two separate flows on ECS and thought to ask for some help 🙂 I would like to run one flow after another. Both flows are registered on prefect cloud, they have their own task_definition_arn and they both run fine separately. I have tried to register flow that would call these flows like in example ( code in the thread). But seems that ECS agent, doesn‘t take in consideration create_flow_run function run_config parameter and tries register new tasks definitions on ECS (or only one for „parent_flow“), although I would like to use existing task definitions plus our ECS agent doesn‘t have permission to register task definitions, therefore I am getting this error:
    An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User: is not authorized to perform: ecs:RegisterTaskDefinition on resource: * because no identity-based policy allows the ecs:RegisterTaskDefinition action
    I have also tried to pass flow_a task_definition_arn to parent_flow, then it runs the flow, but gets this error: `in create_flow_run ValueError: Received both flow_id and flow_name. Only one flow identifier can be passed.` Maybe someone can share the best practices for how I could create a flow that would run these two already registered flows in order? Big thanks in advance! 🙂
    k
    4 replies · 2 participants
  • a

    Adam

    06/15/2022, 1:54 PM
    Hello friends, we’re running `flow.register` and getting an error from the `requests` module, presumably when it’s trying to register the flow to prefect cloud. It’s getting a webpage in the response, with a lot of obfuscated js. Any ideas why this is happening? We’re using prefect 0.14 on python 3.10.4
    k
    6 replies · 2 participants
  • m

    Matteo Fiorillo

    06/15/2022, 2:33 PM
    hello, I am seeing a strange behaviour when invoking an airbyte task from prefect, is this the right place to ask? using prefect 1.2.1
    👀 1
    k
    b
    37 replies · 3 participants
m

Matteo Fiorillo

06/15/2022, 2:33 PM
hello, I am seeing a strange behaviour when invoking an airbyte task from prefect, is this the right place to ask? using prefect 1.2.1
👀 1
k

Kevin Kho

06/15/2022, 2:34 PM
Yes this is the right place to ask. What are you seeing?
m

Matteo Fiorillo

06/15/2022, 2:38 PM
Basically when the flow runs (it’s scheduled but it happens even if I do manually) some parameters in the Airbyte connection I am invoking get changed (namely the destination namespace and the normalization setting). I tried invoking using CURL and this does not happen.
k

Kevin Kho

06/15/2022, 2:39 PM
Can I see how you use it?
m

Matteo Fiorillo

06/15/2022, 2:40 PM
for sure, what do you want to see?
k

Kevin Kho

06/15/2022, 2:41 PM
The task usage in the Flow
m

Matteo Fiorillo

06/15/2022, 2:42 PM
give me a sec
from prefect import Flow
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

airbyte_conn = AirbyteConnectionTask(
        airbyte_server_host="XX.XX.XX.XX",
        airbyte_server_port=8000,
        airbyte_api_version="v1",
        connection_id="<uuid_of_the_connection>"
)

with Flow("material-scraper-data") as flow:
    flow.add_task(airbyte_conn) 

# Register the flow under the "laserhub" project
flow.register(project_name="laserhub")
scheduling is done through UI
k

Kevin Kho

06/15/2022, 2:44 PM
What ends up getting changed here and what does it get changed to?
m

Matteo Fiorillo

06/15/2022, 2:46 PM
destination namespace gets changed to “mirror source structure” (usually is “destination default”) and normalization gets changed from normalized to raw data
k

Kevin Kho

06/15/2022, 2:46 PM
Ok looking into it
m

Matteo Fiorillo

06/15/2022, 2:47 PM
thanks a lot
k

Kevin Kho

06/15/2022, 2:57 PM
Ok I am not sure on this but I think it might have to do with API limitations. I would need to ask our integrations lead but he’s currently out this week so feel free to ping me on Tuesday (Monday is a holiday too)
First time seeing this
m

Matteo Fiorillo

06/15/2022, 2:58 PM
for sure
thanks
hey ho, any news on this?
k

Kevin Kho

06/21/2022, 1:35 PM
Hey, thanks for following up. Yesterday was a holiday and the people with Airbyte knowledge just got back from vacation today. Will raise this
m

Matteo Fiorillo

06/21/2022, 1:36 PM
I found something interesting meanwhile
k

Kevin Kho

06/21/2022, 1:37 PM
Sure just add it to the thread
m

Matteo Fiorillo

06/21/2022, 1:37 PM
so unsurprisingly it’s the way prefect interacts with the airbyte API which causes the issues
first of all, prefect sets the schedule, whatever it is, to manual, but if the schedule is already set to manual the flow run throws an error. I suspect this is a bug.
this happens because the API response for the connection does not return any schedule if there isn’t any (!) but prefect expects it.
k

Kevin Kho

06/21/2022, 1:51 PM
So that I opened an issue for here. Not exactly but it’s worth adding that comment
m

Matteo Fiorillo

06/21/2022, 1:52 PM
furthermore, if a schedule is found then the connection gets updated, setting the schedule to manual (correct), but while doing this the connection loses some properties (destination namespace and normalization), which is the behaviour I was seeing
I added a comment to the issue
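The two observations above (no schedule key in the response when none is set, and properties lost during the update) suggest building the update from the full existing connection rather than from a handful of fields. A rough sketch of that idea, not the actual Prefect fix; the field names (`schedule`, `namespaceDefinition`, `operationIds`) are assumptions chosen for illustration, and `build_manual_update` is a hypothetical helper:

```python
import copy


def build_manual_update(connection):
    """Return an update payload that switches the schedule to manual, or None.

    `connection` is a dict describing the existing connection. A missing or
    empty schedule is treated as already manual, so no update is needed.
    """
    if not connection.get("schedule"):  # tolerate an absent "schedule" key
        return None
    payload = copy.deepcopy(connection)  # carry every other property over
    payload["schedule"] = None  # assumed here to mean "manual"
    return payload
```

Copying the whole connection first means settings such as the destination namespace stay untouched unless they are changed explicitly.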
k

Kevin Kho

06/21/2022, 1:56 PM
thanks! I forwarded this too to Airbyte users on the team
m

Matteo Fiorillo

06/21/2022, 1:57 PM
thanks a lot
to be fair I don’t know if this is an issue with airbyte API (if I update a connection I would expect all the other properties to remain the same unless I explicitly change them) or with prefect. I’ll check also with the airbyte guys to be on the safe side
sorry to bother, if the fix is in master does it mean I have to wait for the next release? also, why is the conda package not updated yet?
k

Kevin Kho

06/22/2022, 1:52 PM
yes to next release. or you might be able to install from master. is the conda version 1.1?
Ah I guess conda is 1.2.1 while the latest is 1.2.2. We probably just didn’t merge the conda recipe yet
b

Boggdan Barrientos

07/05/2022, 9:06 PM
Hi @Matteo Fiorillo did you solve this?
k

Kevin Kho

07/05/2022, 9:30 PM
The schedule fix will be released tomorrow by latest
m

Matteo Fiorillo

07/06/2022, 7:46 AM
@Boggdan Barrientos as a temporary fix you can change the script yourself (like this)
I am still seeing errors:
Task 'AirbyteConnectionTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 340, in run
    connection_status = self._get_connection_status(
  File "/home/m_fiorillo/miniconda3/lib/python3.9/site-packages/prefect/tasks/airbyte/airbyte.py", line 176, in _get_connection_status
    self.logger.log(level=self.stream_output, msg=response.json())
AttributeError: 'AirbyteConnectionTask' object has no attribute 'stream_output'
k

Kevin Kho

07/07/2022, 4:34 PM
How did you use it? That seems like it should be defined