Peter Roelants
05/21/2021, 1:20 PM
t minutes
◦ Calls flow_B x times using StartFlowRun
◦ Run with LocalDaskExecutor to limit parallelism to max y runs at a time. For one scheduled run, the dependent flow flow_B is limited to max y runs at a time.
Now, I noticed that for a single run of flow_A, flow_B is indeed limited to a parallelism of y runs at one time.
However, when a previous flow_A (and its dependent flow_B runs) is still running and a new flow_A with new dependent flow_B runs is scheduled, then more than y flow_B runs can run at the same time.
For example with parallelism `y=2`:
• At time 1 there will be 2 flow_B runs running.
Running flow_A-1:
|--dependent flow_B-1: running
|--dependent flow_B-2: running
|--dependent flow_B-3: waiting
• At time 2 there will be 4 flow_B runs running.
Running flow_A-1:
|--dependent flow_B-1: finished
|--dependent flow_B-2: running
|--dependent flow_B-3: running
Running flow_A-2:
|--dependent flow_B-4: running
|--dependent flow_B-5: running
|--dependent flow_B-6: waiting
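The behaviour in the example can be reproduced without Prefect at all. The following is a minimal pure-Python sketch, not Prefect code: `peak_concurrency` and its parameters are hypothetical names chosen for illustration. Each "parent" stands in for one flow_A run; with a limit owned by each parent the total can reach `num_parents * y`, while a single shared limit keeps it at `y`.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(num_parents, children_per_parent, y, shared_limit):
    """Return the highest number of child jobs observed running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0
    # One semaphore for everybody; only used when shared_limit is True.
    global_slots = threading.BoundedSemaphore(y)

    def child(slots):
        nonlocal running, peak
        with slots:  # wait for a free slot
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.05)  # pretend to do some work
            with lock:
                running -= 1

    def parent():
        # Without a shared limit, every parent brings its own y slots,
        # which mirrors one executor per flow_A run.
        slots = global_slots if shared_limit else threading.BoundedSemaphore(y)
        with ThreadPoolExecutor(max_workers=children_per_parent) as pool:
            for _ in range(children_per_parent):
                pool.submit(child, slots)

    parents = [threading.Thread(target=parent) for _ in range(num_parents)]
    for p in parents:
        p.start()
    for p in parents:
        p.join()
    return peak


if __name__ == "__main__":
    print("per-run limit:", peak_concurrency(2, 3, 2, shared_limit=False))
    print("shared limit: ", peak_concurrency(2, 3, 2, shared_limit=True))
```

The sketch only illustrates why a per-run limit cannot bound the total; whatever enforces the limit across runs has to live outside any single run.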
Is it possible to limit the parallelism of flow_B to max y over all scheduled runs?
Jeff Williams
05/21/2021, 3:11 PM
len(state.result.keys())
I get a non-zero value locally but get zero when run on the agent.
Any ideas as to why?
Mariusz Olszewski
05/21/2021, 7:13 PM
Julio Venegas
05/23/2021, 7:30 PM
username@server-name. In order to get hasura running I have to escape the @ in the username with %40, but that leads to the following graphql error:
graphql_1 | Could not upgrade the database!
graphql_1 | Error: invalid interpolation syntax in 'postgres://prefectpgadmin%40sql-dev-prefect:REDACTED_PSWD@sql-dev-prefect.postgres.database.azure.com:5432/sqldb-dev-prefect' at position 25
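The wording "invalid interpolation syntax in ... at position ..." matches the ValueError that Python's standard `configparser` raises when a value containing a bare `%` is set. Assuming that is where the error comes from (the thread does not confirm it), the sketch below shows the percent-encoding step and the `%%` doubling that `configparser` expects. The helper names, section name, and server values are hypothetical placeholders.

```python
import configparser
from urllib.parse import quote


def build_postgres_url(user, password, host, port, database):
    """Percent-encode the credentials so '@' in the username becomes '%40'."""
    return "postgres://{}:{}@{}:{}/{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port, database
    )


def store_url(parser, section, option, url):
    """Store a URL in a ConfigParser, doubling '%' so interpolation leaves it alone."""
    if not parser.has_section(section):
        parser.add_section(section)
    parser.set(section, option, url.replace("%", "%%"))


# Hypothetical placeholder values, not the real server names.
url = build_postgres_url(
    "admin@my-server", "secret", "my-server.example.com", 5432, "mydb"
)

parser = configparser.ConfigParser()
store_url(parser, "server.database", "connection_url", url)

# Reading the option back yields the single-'%' form again.
round_tripped = parser.get("server.database", "connection_url")
```

If the assumption holds, writing the escape as `%%40` wherever the connection string passes through such a config layer would be the thing to try.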
Any suggestions about what I can do to avoid the graphql interpolation issue?
jaehoon
05/24/2021, 9:40 AM
test_flow.run(parameters=dict(account_id=account.id), idempotency_key=str(account.id))
I wanna run flows with a new, unique idempotency_key every time.
What's the best practice? Is there a good method for this in the Prefect module?
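One way to get a fresh key per call is to append a random UUID from the standard library. This is a minimal sketch; `unique_idempotency_key` is a hypothetical helper, not a Prefect function, and the result would be passed wherever an idempotency_key argument is accepted.

```python
import uuid


def unique_idempotency_key(account_id) -> str:
    """Return a key that differs on every call, prefixed with the account id."""
    return "{}-{}".format(account_id, uuid.uuid4())


if __name__ == "__main__":
    print(unique_idempotency_key(46))
```

Keeping the account id as a prefix is only for readability when looking at runs later; the UUID alone is what makes the key unique.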
plz help me!
Charles Leung
05/24/2021, 3:40 PM
Noah Guilbault
05/24/2021, 6:00 PM
Charles Leung
05/24/2021, 8:28 PM
김응진
05/25/2021, 9:25 AM
def test_flow():
    client = prefect.Client(api_token="dddd")
    # register the flow, then start a run of it with one parameter
    flow_id = client.register(google_ads_flow, project_name='pipeline-test')
    client.create_flow_run(
        flow_id=flow_id,
        parameters=dict(account_id=46)
    )
got
Failed to load and execute Flow's environment: TypeError("missing a required argument: 'date'")
error.
There is no date parameter or anything like it in my flow.
Stéphan Taljaard
05/25/2021, 10:58 AM
(which prefect == /home/staljaard/.local/bin/prefect)
How would you recommend installing/configuring my prefect CLI and pip-installed prefect (agent) so that they are available for all users SSH'ing into the VM?
Raúl Mansilla
05/25/2021, 4:38 PM
Failed to retrieve task state with error: ClientError([{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}])
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
    task_run_info = self.client.get_task_run_info(
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 1399, in get_task_run_info
    result = self.graphql(mutation) # type: Any
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}
This only happens when I try to run the flow:
from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
from prefect.storage import CodeCommit

@task()
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("flow", run_config=LocalRun(), storage=CodeCommit(repo="prefect_flows", path="flows/hello_world.py", commit="master"), executor=LocalExecutor()) as flow:
    name = Parameter('name')
    say_hello(name)

flow.run(name='world')  # "Hello, world!"
flow.run(name='Marvin')  # "Hello, Marvin!"
Does anyone have any clue?
Matthew Neary
05/25/2021, 5:53 PM
Mariusz Olszewski
05/25/2021, 6:11 PM
Daniel Davee
05/25/2021, 7:04 PM
Diego Alonso Roque Montoya
05/25/2021, 11:51 PM
Arkady K.
05/26/2021, 9:41 PM
nick vazquez
05/27/2021, 1:08 AM
localhost:4200 although the prefect server/agent were running on a dedicated scheduler box.
Am I missing something for configuring the workers to work properly?
Do they need to point back at the scheduler's IP for logging?
Do I need to run an agent on each machine to pass jobs to the workers?
Lukáš Polák
05/27/2021, 6:24 AM
Tom Forbes
05/27/2021, 11:13 AM
flow.run(), but not having any caching is annoying.
Is there a way to work around this to enable caching for locally run tasks?
Tom Forbes
05/27/2021, 11:28 AM
@task()
def save_results(dataframe):
    dataframe.save_parquet(UNIQUE_TASK_LOCATION)
    return UNIQUE_TASK_LOCATION
or somesuch. But results seem to be centred around handling the writing/pickling of data for you? Ideally I’d like not to care whether it’s an S3 prefix (for production) or a local directory (for debugging).
Stéphan Taljaard
05/27/2021, 1:46 PM
1. prefect backend server
2. prefect server start
If I don't want "default" displayed on the UI (i.e. I just need one tenant, but don't want it named "default"), I need to add a new tenant. This is done through
3. prefect server create-tenant --name "Some Other Name"
This now creates a new, additional, tenant. I only need one...
Ideally, the steps would be, from above, 1 then 3 then 2. However, that order doesn't work because the server (database) has to be up for cmd 3 to work.
Would it be useful to have the default tenant name as an optional argument to prefect server start?
Or am I missing something w.r.t. the creation of my tenant?
Garret Cook
05/27/2021, 6:39 PM
Chohang Ng
05/27/2021, 10:15 PM
from prefect import Task
import pandas as pd
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
class ETL(Task):
    def __init__(self):
        self.df = self.extract()
    def extract(self):
        read_conn = db.read_conn
        query = """SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
        from hq.oproducts p
        JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
        WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query, read_conn)
        return df
    def load(self):
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv", index=False)
with Flow('flow_3', executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = ETL()
    df.load()
flow.register(project_name="tester")
Chohang Ng
05/28/2021, 3:39 PM
Chris Bowen
05/28/2021, 4:45 PM
prefect server start, the task submitted and didn't run interactively. I'm not sure if I inadvertently changed a setting, but for the past day when I start the server, it locks up the terminal and runs as an interactive job. I don't see a setting anywhere to modify it and run it as a daemon or comparable solution. Any ideas? I realize this might be a Linux issue more than a Prefect one.
Second thing: I can't get an agent to start. I'm getting a timeout every time I try to fire it up. I can access the UI from a browser on port 8081 outside of the RHL VM that the server is running on. I can also access the utility that runs on port 4200 from a browser. For reference, there is also an Airflow instance running on this RHL server, so I've been using this command to start prefect: prefect server start --postgres-port=5433 --ui-port=8081. I haven't found documentation, troubleshooting guides, or anybody asking online about the agent timing out. I tried running the start agent job as the root user and it still times out, so I don't think it's a permission issue, but I'm happy to check anything. I did run prefect backend server.
Here's the agent command:
prefect agent local start
I did try messing around with the various hostname flags for this command; I believe I tried every port displayed in the docker list below. I think the agent should be coming up on 4200?
Here's the docker info for the Prefect containers:
d343f6b8e7a5 prefecthq/ui:core-0.14.20 "/docker-entrypoint.…" 8 minutes ago Up 8 minutes (healthy) 80/tcp, 0.0.0.0:8081->8080/tcp, :::8081->8080/tcp tmp_ui_1
9c993c53daed prefecthq/apollo:core-0.14.20 "tini -g -- bash -c …" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:4200->4200/tcp, :::4200->4200/tcp tmp_apollo_1
f17c041a6ac8 prefecthq/server:core-0.14.20 "tini -g -- python s…" 8 minutes ago Up 8 minutes tmp_towel_1
65cd55a58004 prefecthq/server:core-0.14.20 "tini -g -- bash -c …" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:4201->4201/tcp, :::4201->4201/tcp tmp_graphql_1
9a251fd9322f hasura/graphql-engine:v1.3.3 "graphql-engine serve" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:3000->3000/tcp, :::3000->3000/tcp tmp_hasura_1
fd22ad83e1a5 postgres:11 "docker-entrypoint.s…" 8 minutes ago Up 8 minutes (healthy) 0.0.0.0:5433->5432/tcp, :::5433->5432/tcp tmp_postgres_1
Here's the command I'm executing for the agent:
$ prefect agent local start
/usr/local/lib/python3.8/site-packages/prefect/tasks/__init__.py:8: UserWarning: SQLite tasks require sqlite3 to be installed
import prefect.tasks.database
[2021-05-28 16:32:21,594] INFO - agent | Registering agent...
And here's the error:
  File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 200, in connect
    conn = self._new_conn()
  File "/usr/local/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
    raise ConnectTimeoutError(
urllib3.exceptions.ConnectTimeoutError: (<urllib3.connection.HTTPConnection object at 0x7f1a862b83d0>, 'Connection to localhost timed out. (connect timeout=15)')
...
requests.exceptions.ConnectTimeout: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f1a862b83d0>, 'Connection to localhost timed out. (connect timeout=15)'))
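Since the traceback is a plain connect timeout to localhost:4200, one quick check is whether a TCP connection to that port can be opened at all from the shell where the agent is started. The following is a minimal standard-library sketch; `can_connect` is a hypothetical helper, not part of Prefect.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and DNS failures
        return False


if __name__ == "__main__":
    # Port 4200 is the one from the error message above.
    for host in ("localhost", "127.0.0.1"):
        print(host, can_connect(host, 4200))
```

If `127.0.0.1` connects but `localhost` does not, name resolution or a proxy setting on the VM would be a reasonable next thing to look at; that is a guess, not something the log above establishes.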
Here's the installation guide I followed: https://docs.prefect.io/core/getting_started/installation.html
Any help or advice is much appreciated, thank you!
Aurélien Vallée
05/30/2021, 6:08 AM
Aurélien Vallée
05/30/2021, 6:08 AM
Marko Jamedzija
05/31/2021, 1:30 PM
KubernetesRun so that it runs jobs in a different namespace than the agent is in. I tried setting the namespace in the job template YAML, but it doesn’t work. Any suggestions?
Damien Ramunno-Johnson
06/01/2021, 4:36 PM
UnixHTTPConnectionPool(host='localhost', port=None): Read timed out. (read timeout=60)
Flow run is no longer in a running state; the current state is: <Failed: "UnixHTTPConnectionPool(host='localhost', port=None): Read timed out. (read timeout=60)">
Is this limit on the cloud api side?
Dana Merrick
06/01/2021, 8:47 PM
Dana Merrick
06/01/2021, 8:47 PM
nicholas
06/01/2021, 8:51 PM
Dana Merrick
06/01/2021, 8:52 PM
nicholas
06/01/2021, 8:54 PM
Dana Merrick
06/01/2021, 8:54 PM
nicholas
06/01/2021, 8:55 PM
Dana Merrick
06/01/2021, 8:55 PM
nicholas
06/02/2021, 3:45 PM