Kieran
09/01/2021, 4:52 PM
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
(full trace in this thread)
From reading around it looks like a potential Docker issue, an inability to connect to a socket?!
Has anyone else been hit with this?
Abhas P
09/01/2021, 4:56 PM
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
The task blueprint looks like this:
def mongo_connect():
    db = get_db()                    # wrapper on top of the pymongo client to connect to the specified db
    collection = get_collection(db)  # wrapper on top of the pymongo client to get a specified collection from the db
    return collection
I understand that Prefect needs to pickle all the tasks, but this code works fine when run as an independent script (without Prefect decorators).
How can I make the connection thread-lock safe, or rather pickle safe?
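One common workaround (a minimal sketch, assuming pymongo and Prefect 1.x; all names below are illustrative) is to open the connection inside the task that uses it and return only plain, picklable data rather than the live client or collection object:

from prefect import task, Flow
from pymongo import MongoClient

@task(checkpoint=False)  # also skip result pickling for this task
def load_documents(mongo_uri, db_name, collection_name):
    # open the connection inside the task so the client itself never has to be pickled
    client = MongoClient(mongo_uri)
    collection = client[db_name][collection_name]
    docs = list(collection.find({}, limit=100))  # materialize plain dicts
    client.close()
    return docs

with Flow("mongo-example") as flow:
    docs = load_documents("mongodb://localhost:27017", "my_db", "my_collection")

prefect.resource_manager is another option along the same lines, giving you managed setup/teardown of the client at run time.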
Leon Kozlowski
09/01/2021, 5:35 PM
Gonzalo
09/01/2021, 6:01 PM
When I call set_schedule_active I always get "success": false.
When I try to manually toggle the flow through the GUI, the entire server hangs until I kill the GraphQL process on the server.
Can anyone help or provide some insight? I'm acting as an interpreter for the tech team, so I have to do some back and forth with my questions.
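For checking the call itself, here is a rough sketch using the Prefect Python client (the flow id is a placeholder, and the input shape is my assumption about the Server schema, so verify it against your server):

from prefect import Client

client = Client()
resp = client.graphql(
    """
    mutation {
      set_schedule_active(input: { flow_id: "<your-flow-id>" }) {
        success
      }
    }
    """
)
print(resp)  # raise_on_error defaults to True, so GraphQL errors surface as exceptions rather than a silent false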
Wilson Bilkovich
09/01/2021, 6:05 PM
~/.prefect/config.toml? Maybe an environment variable, because I don't see any CLI flags about it?
Martin
09/01/2021, 6:08 PM
Sean Talia
09/01/2021, 6:37 PM
We have a template flow (for our dbt projects) that multiple teams at my org use. The individual teams will set up an orchestrator flow that has one task (the StartFlowRun task), and they'll pass in the various parameters they need so that their specific dbt project will run. They also directly attach state handlers that they configure to their StartFlowRun task within their orchestrator flow. Of course, in order for the status of the underlying template flow to bubble up to the orchestrator flow, we have to set wait=True on that flow.
The issue with this setup is that it unnecessarily eats up our flow concurrency, since it always leads to at least 2 flows running at a given time. We don't really need the orchestrator flow to wait around; we're only keeping it waiting as a means to capture the template flow's status, which then gets passed along to the custom state handler that the respective team within our org has configured.
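For reference, a minimal sketch of the orchestrator pattern described above (flow name, project, parameter, and handler are made up):

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

def notify_team(task, old_state, new_state):
    # hypothetical team-specific state handler
    if new_state.is_failed():
        ...  # e.g. alert the team's channel
    return new_state

start_dbt_flow = StartFlowRun(
    flow_name="dbt-template-flow",   # assumed name of the shared template flow
    project_name="data-team",        # assumed project
    wait=True,                       # wait=True is what occupies the second concurrency slot
    state_handlers=[notify_team],
)

with Flow("team-a-orchestrator") as flow:
    start_dbt_flow(parameters={"dbt_project": "team_a"})  # assumed parameter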
Charles Leung
09/01/2021, 10:08 PM
Donny Flynn
09/01/2021, 10:10 PM
Dan Stoner
09/01/2021, 10:56 PM
Should prefect register -p my_flow.py be updating the version of the Flow? I have changed the code and ran register again, multiple changes and multiple registers, but the VERSION is still 1 in prefect get flows.
Omar Sultan
09/01/2021, 11:34 PM
Camila Solange
09/02/2021, 12:16 AM
Ben Muller
09/02/2021, 6:01 AM
Alireza Taghizadeh
09/02/2021, 6:29 AM
Abhishek
09/02/2021, 7:19 AM
flow.register()
I looked into the documentation but couldn’t find it.
Konstantin
09/02/2021, 8:46 AM
Didier Marin
09/02/2021, 1:38 PM
Say I have two mapped tasks f and g:
ys = f.map(xs)
zs = g.map(ys)
Is there a generic way to give g batches of y values, for example 10 at a time, rather than one at a time?
(And obviously, without having to wait for all the ys to be computed.)
Or is this something that is specific to the executor used?
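The nearest pattern I know of is a rebatching task between the two maps; a minimal sketch (all names illustrative), with the caveat that the reduce step does wait for all the ys:

from prefect import task, Flow

@task
def f(x):
    return x * 2  # placeholder work

@task
def make_batches(ys, size=10):
    # chunk the full list of upstream results into lists of `size` items
    return [ys[i:i + size] for i in range(0, len(ys), size)]

@task
def g(batch):
    return sum(batch)  # placeholder work on a whole batch

with Flow("batched-map") as flow:
    ys = f.map(list(range(100)))
    zs = g.map(make_batches(ys))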
Felipe Saldana
09/02/2021, 1:54 PM
Jacob Hayes
09/02/2021, 2:09 PM
Brad I
09/02/2021, 4:30 PM
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        name=f"dask-{flow_image_name}",
        pod_template=make_pod_spec(
            image=prefect.context.image,
            cpu_request=worker_cpu,
            memory_request=worker_mem,
            threads_per_worker=threads_per_worker,
        ),
    ),
    adapt_kwargs={
        "minimum": min_workers,
        "maximum": max_workers,
    },
)
Constantino Schillebeeckx
09/02/2021, 4:31 PM
if not args.dry_run:
    logger.log(SUCCESS, f"Registering flow {flow.name}")
    flow.register(project_name=args.project, idempotency_key=flow.serialized_hash())
Everything is working as expected, except I'm getting more versions of the flow (after multiple registrations) than I was expecting. It makes me wonder what I don't understand about the idempotency_key. Could someone explain what could cause the serialized_hash to change unexpectedly? For example, if a different VM registers flows, would the hash change?
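A sketch of one way to see why the hash differs: the idempotency key here is just a hash of flow.serialize(), so dumping that dict on each machine and diffing the files should point at what changed (for example, storage or run-config values that differ per VM):

import json

serialized = flow.serialize()
print(flow.serialized_hash())
with open("serialized_flow.json", "w") as fh:
    # sort keys so the output diffs cleanly across machines
    json.dump(serialized, fh, indent=2, sort_keys=True, default=str)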
Michael
09/02/2021, 4:40 PM
I'm building my flow's image with Docker storage, using the files and env_vars attributes as outlined in the docs. But it seems that my .dockerignore file (in the directory from which I am registering the flow) is ignored by this build process. I have massive data directories nested at different levels inside the codebase, so it's essential there be a way to exclude them somehow. Anyone come across this / have any ideas? I'm curious about playing around with a custom Dockerfile, but the code that copies flows to a tmp Dockerfile in create_docker_file is dynamic, so it seems tough to write a standalone Dockerfile to handle this case.
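One avenue worth checking (a sketch, not a confirmed fix): Docker storage accepts a dockerfile argument in recent 1.x releases, so you can hand it your own Dockerfile rather than the generated one; whether the temporary build context then honours .dockerignore is something to verify. Registry and image names below are placeholders.

from prefect.storage import Docker

storage = Docker(
    registry_url="my-registry.example.com",  # placeholder
    image_name="my-flow",                    # placeholder
    dockerfile="Dockerfile",                 # your own Dockerfile instead of the generated one
)
flow.storage = storage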
John Shearer
09/02/2021, 4:55 PM
condition = True
resultType = LocalResult if condition else S3Result

@task()
def create_list():
    return [1, 2, 3]

@task(result=resultType(serializer=PandasSerializer(file_type='parquet')))
def to_df(a_list):
    return pd.DataFrame({'col1': a_list})

with Flow(
    name="my-flow",
    result=resultType(serializer=PickleSerializer())
) as my_flow:
    my_list = create_list()
    my_df = to_df(my_list)
Charles Leung
09/02/2021, 5:47 PM
Alex Furrier
09/02/2021, 5:48 PM
@task
def gen_records():
    return [('item', 1), ('item', 2), ('item', 3)]

@task
def subscript_input(x, y):
    return x + str(y)

with Flow(
    name="Test Flow",
) as test_flow:
    records = gen_records()
    total = subscript_input.map(x=records[0], y=records[1])
Nadav
09/02/2021, 6:05 PM
Abhas P
09/02/2021, 6:21 PM
@task
def transform(df):
    return transformed_df

with Flow("transform-1") as flow:
    transform(df)  # current implementation
    for df in filtered_dfs:  # suggested way to utilize parallelization optimization on tasks
        transform(df)
1. Will breaking down the dataframe into small chunks and then calling the transformation task on each batch of the dataframe help me reap the benefits of Prefect parallelization using a Dask executor?
2. Given that the input to a transform task is a large dataframe, what other steps can I consider to optimize the turnaround time of the flow? (Suggest different run configs and Prefect paradigms for writing the code.)
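On point 1, a minimal sketch of the chunk-and-map pattern (the column name and chunk count are made up); each mapped child can then run in parallel on a DaskExecutor, at the cost of shipping each chunk to the workers:

from prefect import task, Flow
from prefect.executors import DaskExecutor
import numpy as np
import pandas as pd

@task
def split_df(df, n_chunks=8):
    # break the large dataframe into roughly equal chunks
    return np.array_split(df, n_chunks)

@task
def transform(chunk):
    # placeholder transformation on one chunk
    return chunk.assign(doubled=chunk["value"] * 2)

@task
def combine(chunks):
    return pd.concat(chunks)

with Flow("transform-1") as flow:
    df = pd.DataFrame({"value": range(1_000_000)})  # stand-in for the real dataframe
    transformed = transform.map(split_df(df))       # each chunk becomes its own task
    result = combine(transformed)

flow.executor = DaskExecutor()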
Jeremy Yeo
09/02/2021, 8:24 PM
dirs = ["dir1", "dir2", "dir3"]
commands = [f"rm -rf {d}" for d in dirs]
names = [{"name": f"removing dir {d}"} for d in dirs]
shell = ShellTask()

with Flow("remove-dirs") as f:
    shell.map(command=commands, task_args={"name": "removing dir"})
    # shell.map(command=commands, task_args={"name": dirs})  # not working
    # shell.map(command=commands, task_args=names)  # not working

f.run()
Possibly not getting some syntax right. Thanks.
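As far as I know, task_args in .map applies one set of overrides to the whole mapped task rather than one per child, so the per-element variants can't work as written. A sketch of one workaround is to call the task once per directory and override the name per call:

from prefect import Flow
from prefect.tasks.shell import ShellTask

dirs = ["dir1", "dir2", "dir3"]
shell = ShellTask()

with Flow("remove-dirs") as f:
    for d in dirs:
        # each call copies the task with its own display name
        shell(command=f"rm -rf {d}", task_args={"name": f"removing dir {d}"})

f.run()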
Brad
09/03/2021, 1:44 AM
Brad I
09/03/2021, 5:41 AM
Regarding the DaskExecutor not getting the env vars set in the run config: this has caused other issues in the way we have been deploying flows to different environments (test, dev, prod, etc.). Previously, using the LocalDaskExecutor, we could set the executor on the flow itself and it was generic enough to work across all of our environments. Now we also need to change the Dask config depending on our target environment, but it seems like setting the executor outside of the with Flow() definition just gets ignored. I believe this has something to do with the storage and run_config being serialized with the flow while the executor is not right now. An example flow and registration script is attached inside this thread. Does anyone know of a workaround for this issue, or have a better way to do this?
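One workaround I've seen suggested (a sketch; TARGET_ENV and the scheduler address variable are assumed names): since the executor is re-resolved from the flow script when it is loaded from storage at run time, you can choose it at module level based on an environment variable so a single script serves every environment:

import os
from prefect import Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

TARGET_ENV = os.environ.get("TARGET_ENV", "dev")  # assumed variable name

with Flow("env-aware-flow") as flow:
    ...  # tasks go here

if TARGET_ENV == "prod":
    # assumed env var for the scheduler; falls back to a local cluster if unset
    flow.executor = DaskExecutor(address=os.environ.get("DASK_SCHEDULER_ADDRESS"))
else:
    flow.executor = LocalDaskExecutor()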