Vladimir Bolshakov
03/18/2022, 8:23 AM
provide_task_policy / provide_flow_policy). So my question is about the orchestration settings that will be released in the future. Will custom orchestration policies be parameters of API requests that set the state of a task/flow? Or will the orchestration concepts and APIs change significantly in the near future? How will orchestration policies be serialized/deserialized between the server and the agents' engines?
Clovis
03/18/2022, 10:24 AM
Failed status but, I don't know why, Prefect considers the task as successful (cf. my screenshot in attachment).
It's a blocking point from my point of view, as it prevents me from relying on Prefect and therefore forces me to double-check each time with Airbyte.
Maybe this issue comes from my code, but I don't see why?
from datetime import timedelta

from prefect import Flow
from prefect.run_configs import UniversalRun
from prefect.tasks.airbyte import AirbyteConnectionTask

sync_airbyte_connection = AirbyteConnectionTask(
    max_retries=3, retry_delay=timedelta(seconds=10), timeout=timedelta(minutes=30),
)

with Flow("my flow", run_config=UniversalRun()) as flow:
    airbyte_sync = sync_airbyte_connection(
        <connection_infos>,
    )
    [...]

flow.set_reference_tasks([
    airbyte_sync
])
Malthe Karbo
03/18/2022, 1:28 PM
Vadym Dytyniak
03/18/2022, 2:46 PM
Adam Roderick
03/18/2022, 3:41 PM
prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Chris Reuter
03/18/2022, 4:32 PM
YouTube
Aram Panasenco
03/18/2022, 5:16 PM
Wei Mei
03/18/2022, 6:05 PM
Unexpected error while running flow: KeyError('Task slug connect_source-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
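For readers who hit the same error, a minimal sketch of re-registering a changed flow so the backend version matches what is in storage (Prefect 1.x; the project name is a placeholder):

from prefect import Flow, task

@task
def connect_source():
    ...

with Flow("airbyte-sync") as flow:
    connect_source()

# Re-register after changing the flow; this bumps the version stored in the backend
# so it matches the flow code loaded from storage.
flow.register(project_name="my-project")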
Chris Reuter
03/18/2022, 6:55 PM
https://www.youtube.com/watch?v=uIv1m3-2tjA
Jean-Michel Provencher
03/18/2022, 7:05 PM
Is it possible to use upstream_tasks with methods that actually require parameters?
The documentation is not really clear about how to chain multiple upstream_tasks to create dependencies between them, and I was wondering if some of you had more complex examples.
For example, I don't think I can do this:
with Flow(f"{environment_prefix}-test", storage=S3(bucket=storage_location_bucket_name)) as flow:
    dbt_run(
        organization_id_param, data_processing_start_date_param, data_processing_end_date_param,
        should_process_last_period, period, period_value,
        upstream_tasks=[pull_snowflake_secret(a, b), pull_repo(b, c)],
    )
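For what it's worth, a minimal sketch of one pattern that typically works in Prefect 1.x: call the parameterized tasks inside the Flow context first, then pass the returned task instances to upstream_tasks (task names and arguments below are illustrative):

from prefect import Flow, task

@task
def pull_snowflake_secret(account: str, role: str) -> str:
    return f"secret-for-{account}-{role}"

@task
def pull_repo(url: str, branch: str) -> str:
    return f"checked out {url}@{branch}"

@task
def dbt_run(organization_id: int) -> None:
    print(f"running dbt for org {organization_id}")

with Flow("upstream-example") as flow:
    # Calling a task inside the Flow context returns a task instance bound to the flow...
    secret = pull_snowflake_secret("my-account", "my-role")
    repo = pull_repo("git@github.com:org/repo.git", "main")
    # ...which can then be passed as state-only upstream dependencies.
    dbt_run(42, upstream_tasks=[secret, repo])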
Tony Liberato
03/18/2022, 7:44 PM
Michał Augoff
03/18/2022, 8:12 PM
Is there a way to set IMAGE_PULL_SECRETS for the Orion k8s agent/flow runner? I couldn't find anything in the docs or the code.
Darshan
03/18/2022, 9:06 PM
Tabari Brannon
03/18/2022, 9:35 PM
Milton
03/18/2022, 10:01 PM
Michał Augoff
03/18/2022, 10:28 PM
Does KubernetesRun restrict the flow only to Kubernetes agents, or can it still be picked up by any agent as long as the labels match? I was under this impression after reading the docs, but when I created 1 Docker agent and 1 k8s agent with the same set of labels, my Kubernetes flow got picked up by the Docker agent.
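If it helps, a minimal sketch of the label-based separation this implies, assuming each agent type is given its own label (the label name is a placeholder):

from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("k8s-only-flow") as flow:
    ...

# Give the flow a label that only the Kubernetes agent carries,
# e.g. an agent started with: prefect agent kubernetes start --label k8s-only
flow.run_config = KubernetesRun(labels=["k8s-only"])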
Royzac
03/18/2022, 11:40 PM
Serge Tarkovski
03/19/2022, 12:34 PM
(prefect==2.0b2, Python 3.9, Linux):
• created an S3 bucket and a storage (isn't it too much for a local run?)
• started a local API server and configured PREFECT_API_URL
• created a queue
• started an agent for that queue
• created a deployment with DockerFlowRunner and no parameters
Then I ran the deployment, and the agent output wasn't very informative, see below. In the UI the flow run "dag-sample/caped-antelope" is shown as failed, without any logs available. Can anyone explain?
$ prefect agent start 6aa0c2b2-e895-4b8c-aed3-8bef5b2c88ab
Starting agent connected to <http://127.0.0.1:4200/api>...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue '6aa0c2b2-e895-4b8c-aed3-8bef5b2c88ab'...
13:29:31.446 | INFO | prefect.agent - Submitting flow run '8eb8aa0b-82da-4081-9a42-aae1f22b0525'
/home/tarkovskyi/miniconda3/envs/prefect_exp39/lib/python3.9/site-packages/prefect/flow_runners.py:697: UserWarning: `host.docker.internal` could not be automatically resolved to your local ip address. This feature is not supported on Docker Engine v19.3.15, upgrade to v20.10.0+ if you encounter issues.
warnings.warn(
13:29:32.215 | INFO | prefect.flow_runner.docker - Flow run 'caped-antelope' has container settings = {'image': 'prefecthq/prefect:2.0b2-python3.9', 'network': None, 'network_mode': 'host', 'command': ['python', '-m', 'prefect.engine', '8eb8aa0b-82da-4081-9a42-aae1f22b0525'], 'environment': {'PREFECT_API_URL': '<http://127.0.0.1:4200/api>'}, 'auto_remove': False, 'labels': {'io.prefect.flow-run-id': '8eb8aa0b-82da-4081-9a42-aae1f22b0525'}, 'extra_hosts': {}, 'name': 'caped-antelope', 'volumes': []}
13:29:33.547 | INFO | prefect.agent - Completed submission of flow run '8eb8aa0b-82da-4081-9a42-aae1f22b0525'
13:29:33.584 | INFO | prefect.flow_runner.docker - Flow run container 'caped-antelope' has status 'running'
13:29:44.923 | INFO | prefect.flow_runner.docker - Flow run container 'caped-antelope' has status 'exited'
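For context, a minimal sketch of what such a deployment might look like in that beta (prefect==2.0b2), assuming the flow lives in a local file such as dag_sample.py; the path and name are placeholders:

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner

DeploymentSpec(
    flow_location="./dag_sample.py",  # placeholder path to the flow file
    name="dag-sample",
    flow_runner=DockerFlowRunner(),   # runs each flow run in a Docker container
)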
Taylor Harless
03/19/2022, 5:29 PM
from prefect import Flow
from prefect.tasks.gcp.storage import GCSUpload

with Flow("google-cloud-test") as flow:
    GCSUpload(bucket="test-upload", create_bucket=True)(
        data="test-file.csv", credentials="GCP_CREDENTIALS"
    )

flow.run()
and received an error: AttributeError: 'str' object has no attribute 'keys'. I've incorporated the feedback from a similar error discussion recently, but can't figure out what the issue is. Any help is much appreciated.
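For context, the Prefect 1.x GCP tasks expect credentials as a parsed service-account dictionary rather than a plain string, which is consistent with the AttributeError above. A minimal sketch of one way that could look, assuming the GCP_CREDENTIALS secret holds the service-account JSON rather than a literal string:

from prefect import Flow
from prefect.tasks.gcp.storage import GCSUpload
from prefect.tasks.secrets import PrefectSecret

with Flow("google-cloud-test") as flow:
    # PrefectSecret resolves the secret at runtime; here it is assumed to hold
    # the service-account credentials as a dict, not the string "GCP_CREDENTIALS".
    gcp_credentials = PrefectSecret("GCP_CREDENTIALS")
    GCSUpload(bucket="test-upload", create_bucket=True)(
        data="test-file.csv", credentials=gcp_credentials
    )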
03/20/2022, 9:27 AMFile "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 341, in _raise_timeout
self, url, "Read timed out. (read timeout=%s)" % timeout_value
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Read timed out. (read timeout=15)
This happens especially when we use the Task StartFlowRun it does not happen very often , but I was wondering if there was a way to force retry or if anyone knows why this would be happening? ThanksTomer Cagan
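For readers with the same question, one hedged option is that StartFlowRun is a regular task, so it can be given Prefect's standard task retries to absorb transient API timeouts; a minimal sketch (flow, project, and retry values are illustrative):

from datetime import timedelta

from prefect.tasks.prefect import StartFlowRun

# StartFlowRun accepts the usual Task keyword arguments, so a transient
# ReadTimeoutError can be retried like any other task failure.
start_child_flow = StartFlowRun(
    flow_name="child-flow",        # placeholder flow name
    project_name="my-project",     # placeholder project name
    max_retries=3,
    retry_delay=timedelta(seconds=30),
)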
Tomer Cagan
03/20/2022, 12:48 PM
When I use worker_client and then submit new tasks to the same cluster (code inside), I see that the printout from the first call of the function is logged in Prefect and shipped to the server, but consecutive calls are not (see log inside).
Is there some trick I can use?
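For context, a minimal sketch of the pattern being described, submitting extra Dask work from inside a Prefect task via worker_client; the task body and data are illustrative:

from distributed import worker_client
from prefect import Flow, task

@task
def fan_out(items):
    # Re-use the Dask cluster the flow is already running on to submit more work.
    # Note: print output from the submitted functions runs on Dask workers outside
    # Prefect's task context, so it may not be captured by Prefect's logger.
    def work(x):
        print(f"processing {x}")
        return x * 2

    with worker_client() as client:
        futures = client.map(work, items)
        return client.gather(futures)

with Flow("worker-client-example") as flow:
    fan_out([1, 2, 3])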
Trip Humphrey
03/21/2022, 3:15 AM
Madhup Sukoon
03/21/2022, 4:28 AM
1. Get a list of all registered flows (like prefect get flows), and
2. De-register some of these flows through Python.
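A minimal sketch of how this could be approached from Python with the Prefect 1.x Client's GraphQL interface; the delete_flow mutation name is an assumption worth verifying against your backend's schema:

from prefect import Client

client = Client()

# List registered flows (id + name) via a raw GraphQL query.
result = client.graphql("{ flow { id name } }")
for f in result["data"]["flow"]:
    print(f["id"], f["name"])

# Assumed mutation for removing a flow by id; verify against your server's schema.
# client.graphql(
#     'mutation($id: UUID!) { delete_flow(input: {flow_id: $id}) { success } }',
#     variables={"id": "<flow-id>"},
# )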
Ryan Sattler
03/21/2022, 5:51 AM
moti shakuri
03/21/2022, 11:04 AM
Andreas Nord
03/21/2022, 1:32 PM
Azer Rustamov
03/21/2022, 2:05 PM
KubernetesFlowRunner?
Jared Robbins
03/21/2022, 2:33 PM
Jared Robbins
03/21/2022, 2:44 PM
Jared Robbins
03/21/2022, 3:10 PM
Jared Robbins
03/21/2022, 3:10 PM
Kevin Kho
03/21/2022, 3:26 PM
Anna Geller
03/21/2022, 3:48 PM
1# With the prefect concurrency-limit command, you can set task-level concurrency limits.
2# The alternative to task-level concurrency limits is work-queue concurrency limits, which allow you to set limits on a deployment level - this seems to be exactly what you want! To create that, you can create a work queue for a specific deployment UUID:
prefect work-queue create -d 'uuid' test_queue
This will return a WORK_QUEUE_ID, which you can use to set the concurrency limit for that queue to 1:
prefect work-queue set-concurrency-limit WORK_QUEUE_ID 1
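To illustrate the task-level option from point 1, a minimal sketch of how it might look in Prefect 2.0, assuming tasks share a tag (the tag name is a placeholder):

from prefect import flow, task

# Tasks opt into a concurrency limit by sharing a tag...
@task(tags=["db"])
def write_to_db(record: str) -> None:
    print(f"writing {record}")

@flow
def etl():
    for record in ["a", "b", "c"]:
        write_to_db(record)

# ...and the limit itself is set on that tag from the CLI, e.g.:
#   prefect concurrency-limit create db 1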
Jared Robbins
03/21/2022, 3:51 PM
Kevin Kho
03/21/2022, 4:40 PM
Jared Robbins
03/21/2022, 5:20 PM