Karthikeyan Rasipalayam Durairaj
09/04/2020, 4:04 PM
Chris Martin
09/04/2020, 5:38 PM
@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        client.close()
In this case how does the Client get passed to tasks? Is it pickled and sent around? Moreover, if the client held some mutable state that could be updated by the tasks, what would happen?
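For reference, a minimal sketch of how such a resource manager is typically consumed inside a flow (the use_client task and the n_workers value are hypothetical; the object yielded by the with-block is whatever setup returns):

from dask.distributed import Client
from prefect import Flow, resource_manager, task

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        # Whatever setup() returns is what downstream tasks receive.
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        client.close()

@task
def use_client(client):
    # Hypothetical task consuming the client produced by setup().
    return client.submit(sum, [1, 2, 3]).result()

with Flow("resource-manager-example") as flow:
    with DaskCluster(n_workers=4) as client:
        use_client(client)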
Shaun Cutts
09/04/2020, 5:44 PM
executor = DaskExecutor(
    address="...",
    cluster_class="dask_gateway.GatewayCluster",
    cluster_kwargs={
        "image": "my/image/...",
        "image_pull_secret": "...",  # ???
    },
    adapt_kwargs={"minimum": 2, "maximum": 10},
)
Berty
09/04/2020, 8:05 PM
prefect server start?
Shawn Marhanka
09/04/2020, 9:27 PM
UserWarning: Tasks were created but not added to the flow: {<Task: Flo Run Task Flow>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task.
Thank you
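As the warning text says, this usually means a Task class was instantiated inside the with-block without being called. A sketch of the distinction, assuming the task involved is something like Prefect's FlowRunTask (the flow and project names are placeholders):

from prefect import Flow
from prefect.tasks.prefect import FlowRunTask  # class name assumed from the warning

# Instantiating a Task only configures it; it does not add it to a flow.
flow_run = FlowRunTask(flow_name="child-flow", project_name="my-project")

with Flow("parent-flow") as flow:
    # Calling the instance inside the with-block is what adds it to the flow.
    flow_run()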
dherincx
09/06/2020, 4:01 AM
John Ramirez
09/06/2020, 5:35 PM
Sachit Shivam
09/06/2020, 6:29 PM
Thomas La Piana
09/07/2020, 3:26 AM
Nelson
09/07/2020, 9:33 AM
upload_dataframe... for various tables).
Opening the tasks also doesn't provide info on their input to distinguish the different calls. Is there something I am missing that can help here?
Sven Teresniak
09/07/2020, 11:10 AM
/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py:822: UserWarning: This task is running in a daemonic subprocess; consequently Prefect can only enforce a soft timeout limit, i.e., if your Task reaches its timeout limit it will enter a TimedOut state but continue running in the background.
😞
Robin
09/07/2020, 12:50 PM
Unexpected error: NoRegionError('You must specify a region.')
, although I added AWS_DEFAULT_REGION (along with AWS_CREDENTIALS) to the prefect secrets.
Am I missing anything?
as
09/07/2020, 1:38 PM
target parameter of tasks for checkpointing.
If I'm correct, it will not run the task if it can find a file with the name specified by the target parameter, correct?
I'm wondering what happens if an upstream task has been triggered. Does it still look for the target file, or does the task run nonetheless because the input has changed?
If not, is there a way to achieve this behavior?
Thanks
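A minimal sketch of target-based checkpointing for context, assuming a LocalResult store and an example filename template (when a file matching the rendered target already exists, the task is loaded from that file instead of being re-run):

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(
    target="{flow_name}/{task_name}.txt",  # example template
    checkpoint=True,
    result=LocalResult(dir="/tmp/prefect-results"),  # example location
)
def make_data():
    return [1, 2, 3]

with Flow("target-example") as flow:
    make_data()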
Nuno Silva
09/07/2020, 3:31 PM
prefect agent start --api http://<url>:4200
but in code it doesn't:
from prefect.agent.local import LocalAgent
agent = LocalAgent(agent_address="http://<url>:4200")
agent.start()
fails with:
/tornado/netutil.py", line 174, in bind_sockets
sock.bind(sockaddr)
OSError: [Errno 99] Cannot assign requested address
Any idea? Thanks
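A sketch of one possible explanation, assuming the CLI's --api flag maps to the cloud.api config value (which can be set through an environment variable before prefect is imported), while agent_address is the local address the agent's own HTTP server binds to, which would explain the bind error:

import os

# Assumption: --api corresponds to the config value cloud.api.
os.environ["PREFECT__CLOUD__API"] = "http://<url>:4200"

from prefect.agent.local import LocalAgent

# agent_address, if used at all, should be a local bind address,
# not the server URL.
agent = LocalAgent()
agent.start()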
itay livni
09/07/2020, 9:31 PM
Is apply_map the only way to implement a mapped case statement? Thanks
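For context, a sketch of the apply_map-with-case pattern being referred to (is_even, handle_even, and the input list are hypothetical):

from prefect import Flow, apply_map, case, task

@task
def is_even(x):
    return x % 2 == 0

@task
def handle_even(x):
    return x * 2

def check_and_handle(x):
    # apply_map calls this function once per mapped element,
    # so `case` can be used with a per-element condition.
    with case(is_even(x), True):
        handle_even(x)

with Flow("mapped-case") as flow:
    apply_map(check_and_handle, [1, 2, 3])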
Thomas Hoeck
09/08/2020, 7:17 AM
HTTPSConnectionPool(host='10.0.0.1', port=443): Read timed out. (read timeout=None)
. I'm running a k8s agent. What is weird is that it is not happening every time but at a regular frequency (the picture shows how the same flow fails only every second time).
Sven Teresniak
09/08/2020, 7:37 AM
Nate Joselson
09/08/2020, 7:47 AM
DbtShellTask with a dynamic set of dbt_kwargs that are defined at run time. The task works when I define the dbt_kwargs in the task initialization, but not when they are provided at runtime.
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask

@task
def generate_dbt_kwargs_dict(project_id):
    dbt_kwargs_dict = {
        'type': 'bigquery',
        'method': 'service-account',
        'keyfile': 'key.json',
        'project': project_id,
        'dataset': 'derived_data',
        'location': 'EU',
    }
    return dbt_kwargs_dict

dbt_run_task = DbtShellTask(
    name='dbt_run_task',
    command='dbt run',
    profile_name='default',
    environment='test',
    overwrite_profiles=True,
    profiles_dir=profiles_path,
)

with Flow(name="dbt_flow") as f:
    project_id = Parameter("project_id")
    dbt_kwargs_dict = generate_dbt_kwargs_dict(project_id)
    run_task = dbt_run_task(dbt_kwargs=dbt_kwargs_dict)

out = f.run(project_id='bq-project')
This returns a prefect error:
ERROR - prefect.TaskRunner | Unexpected error: TypeError("'NoneType' object is not a mapping",)
I assume this means from here that the dbt_kwargs dictionary is None, but I can't see why, since I am passing the dictionary from the previous task…
Does anyone have an idea about this?
Thanks!
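One unverified guess at a workaround: if the runtime dbt_kwargs are merged over whatever was set at init, constructing the task with an explicit empty dict may give that merge a mapping to start from:

# Sketch of the workaround described above; all other arguments as in the
# original snippet. The only change is the explicit empty dbt_kwargs at init.
dbt_run_task = DbtShellTask(
    name='dbt_run_task',
    command='dbt run',
    profile_name='default',
    environment='test',
    overwrite_profiles=True,
    profiles_dir=profiles_path,
    dbt_kwargs={},  # assumption: avoids merging runtime kwargs into None
)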
Georg Zangl
09/08/2020, 5:17 PM
Richard Hughes
09/08/2020, 7:47 PM
version_group_id
Where is the documentation for this?
bral
09/08/2020, 7:50 PM
ARun
09/09/2020, 2:17 PM
files = []
for file in os.listdir(path):
    files.append(path + file)

@task
def run_file(file):
    with open(file) as f:
        line = f.readline()
        name = f.name
        sql = ""
        for _, line in enumerate(f):
            sql += line
            if line.rstrip().endswith(";"):
                query.run(query=sql)
                sql = ""

with Flow("parallelfiles") as flow:
    apply_map(run_file, files)

flow.register(project_name="test")
Matt Wong-Kemp
09/09/2020, 2:57 PM
Richard Hughes
09/09/2020, 3:00 PM
Richard Hughes
09/09/2020, 3:01 PM
Jeremiah
Eoghan Martin
09/09/2020, 4:29 PM
Marvin
09/09/2020, 4:29 PM