Dan Stoner
09/01/2021, 2:28 PM
`delete project`.
$ prefect delete --help
Usage: prefect delete [OPTIONS] COMMAND [ARGS]...
Delete commands that refer to mutations of Prefect API metadata.
Usage:
$ prefect delete [OBJECT]
Arguments:
project Delete projects
...
marios
09/01/2021, 2:41 PM
An Hoang
09/01/2021, 3:28 PM
`Parameter` as the default for another? Here's what I did
# task
@task(target="{output_folder_path}/result")
def example_task(output_folder_path):
    ...

# in flow
step1_folder_path = Parameter("step1_folder_path")
output_folder_path = Parameter("output_folder_path", default=step1_folder_path)
When I do this, I got a warning
<ipython-input-226-3b472991f1ef>:8: UserWarning: A Task was passed as an argument to Parameter, you likely want to first initialize Parameter with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
my_task = Parameter(...) # static (non-Task) args go here
res = my_task(...) # dynamic (Task) args go here
see <https://docs.prefect.io/core/concepts/flows.html#apis> for more info.
output_folder_path = Parameter("output_folder_path", default = step1_folder_path)
When I execute the flow, it runs but the target
doesn't work as intended, the task still runs every time even though the target files are there. When I provide a hardcoded string to the default of output_folder_path
, everything works fineSamuel Hinton
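One possible way around the warning above is to give the second Parameter a static default (such as None) and resolve the fallback at run time. The sketch below shows only that resolution logic in plain Python. `resolve_output_folder` and `render_target` are hypothetical helpers, not Prefect APIs; in a flow, the first would presumably be wrapped in `@task`, and the second only assumes the target template is filled in with `str.format` from the task's inputs.

```python
from typing import Optional


def resolve_output_folder(output_folder_path: Optional[str], step1_folder_path: str) -> str:
    """Return the explicit output folder, or fall back to the step 1 folder."""
    # A static default of None stands in for "not provided by the user".
    if output_folder_path is not None:
        return output_folder_path
    return step1_folder_path


def render_target(template: str, **inputs: str) -> str:
    """Show what a templated target string looks like once inputs are concrete."""
    return template.format(**inputs)


resolved = resolve_output_folder(None, "/data/step1")
target = render_target("{output_folder_path}/result", output_folder_path=resolved)
print(target)
```

Because the downstream task then receives a plain string rather than a Parameter object, the target template has a concrete value to format against.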
09/01/2021, 4:01 PM
Kieran
09/01/2021, 4:52 PM
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
(full trace in this thread)
From reading around, it looks like a potential Docker issue, possibly an inability to connect to a socket?
Has anyone else been hit with this?Abhas P
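A quick way to test the socket theory is to check whether the socket file the Docker client would look for actually exists. This is a minimal sketch that assumes the conventional Linux default of `/var/run/docker.sock` and a `DOCKER_HOST` override of the form `unix://...`; the helper names are hypothetical and it does not use the Docker SDK.

```python
import os
from typing import Mapping, Optional

DEFAULT_SOCKET = "/var/run/docker.sock"  # conventional default on Linux (assumption)


def docker_socket_path(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the Unix socket path implied by the environment, or None for non-socket hosts."""
    host = env.get("DOCKER_HOST", "")
    if not host:
        return DEFAULT_SOCKET
    if host.startswith("unix://"):
        return host[len("unix://"):]
    # Something like tcp://... means no local socket file is involved.
    return None


def docker_socket_present(env: Mapping[str, str] = os.environ) -> bool:
    """True if a local socket path is expected and the file exists."""
    path = docker_socket_path(env)
    return path is not None and os.path.exists(path)


print(docker_socket_path(), docker_socket_present())
```

If the path is reported as missing, the daemon is probably not running or not installed on the machine doing the build.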
09/01/2021, 4:56 PM
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
The task blueprint looks like this:
def mongo_connect():
    db = get_db()  # wrapper on top of pymongoclient to connect to the specified db
    collection = get_collection(db)  # wrapper on top of pymongoclient to get a specified collection from the db
    return collection
I understand that Prefect needs to pickle all the tasks, but this code works fine when run as an independent script (without Prefect decorators).
How can I make the connection threadlock safe or rather pickle safe?Leon Kozlowski
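A common pattern for this kind of error is to avoid returning the connection object at all: open it inside the function that needs it and return only plain data. The sketch below illustrates the idea with the standard library only. `FakeClient` is a hypothetical stand-in for a database client that holds a thread lock; it is not pymongo, and `load_records` is an assumed shape for what the task body might look like.

```python
import pickle
import threading


class FakeClient:
    """Stand-in for a database client that holds a thread lock internally."""

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def fetch(self) -> list:
        return [{"uri": self.uri, "value": 1}]


def can_pickle(obj: object) -> bool:
    """Report whether pickle can serialize the given object."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def load_records(uri: str) -> list:
    """Open the connection inside the function and return only plain data."""
    client = FakeClient(uri)  # created and discarded within a single call
    return client.fetch()


print(can_pickle(FakeClient("mongodb://example")))    # the client object itself
print(can_pickle(load_records("mongodb://example")))  # the returned records
```

The standalone script works because nothing there tries to serialize the client; the problem only appears once a task's return value has to be pickled.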
09/01/2021, 5:35 PM
Gonzalo
09/01/2021, 6:01 PM
`set_schedule_active` I always get `"success": false`).
When I try to manually toggle the flow through the GUI the entire server hangs until I manually kill the GraphQL process on the server.
Can anyone help or provide some insight? I'm acting as an interpreter for the tech team so I have to do some back and forth with my questions.
Wilson Bilkovich
09/01/2021, 6:05 PM
`~/.prefect/config.toml`? Maybe an environment variable, because I don't see any CLI flags about it?
Martin
09/01/2021, 6:08 PM
Sean Talia
09/01/2021, 6:37 PM
`dbt` projects) that multiple teams at my org use. The individual teams will set up an orchestrator flow that has one task (the `StartFlowRun` task), and they'll pass in the various parameters that they need to pass in so that their specific `dbt` project will run. They also will directly attach state handlers that they configure to their `StartFlowRun` task within their orchestrator flow. Of course, in order for the status of the underlying template flow to bubble up to the orchestrator flow, we have to set `wait=True` on that flow.
The issue with this setup is that it unnecessarily eats up our flow concurrency, since this setup always leads to at least 2 flows running at a given time. We don't really need the orchestrator flow to wait around; we're only keeping it waiting as a means to capture the template flow's status, which then gets passed along to the custom state handler that the respective team within our org has configured.
Charles Leung
09/01/2021, 10:08 PM
Donny Flynn
09/01/2021, 10:10 PM
Dan Stoner
09/01/2021, 10:56 PM
`prefect register -p my_flow.py` be updating the version of the Flow? I have changed the code and ran `register` again, multiple changes and multiple registers, but the VERSION is still `1` in `prefect get flows`
Omar Sultan
09/01/2021, 11:34 PM
Camila Solange
09/02/2021, 12:16 AM
Ben Muller
09/02/2021, 6:01 AM
Alireza Taghizadeh
09/02/2021, 6:29 AM
Abhishek
09/02/2021, 7:19 AM
`flow.register()`
I looked into the documentation but couldn’t find it.
Konstantin
09/02/2021, 8:46 AM
Didier Marin
09/02/2021, 1:38 PM
`f` and `g`:
ys = f.map(xs)
zs = g.map(ys)
Is there a generic way to give `g` batches of y values, for example 10, rather than one at a time?
(And obviously, without having to wait for all the ys to be computed.)
Or is this something that is specific to the executor used ?Felipe Saldana
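One executor-independent option is to batch the inputs before the first map, so that each mapped branch carries a whole batch through both steps and no branch has to wait for the others. The sketch below shows the idea in plain Python; `chunked`, `f_batch`, and the bodies of `f` and `g` are hypothetical, and the list comprehensions stand in for what would be `.map()` calls in a flow.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly shorter, batch
        yield batch


def f(x: int) -> int:
    return x * 2


def f_batch(xs: List[int]) -> List[int]:
    """Apply f to every element of one batch."""
    return [f(x) for x in xs]


def g(ys: List[int]) -> int:
    """Consume a whole batch of y values at once."""
    return sum(ys)


xs = list(range(25))
batches = list(chunked(xs, 10))
ys_batches = [f_batch(b) for b in batches]  # stands in for f_batch.map(batches)
zs = [g(b) for b in ys_batches]             # stands in for g.map(ys_batches)
print(zs)
```

The trade-off is that parallelism for `f` drops to one unit of work per batch instead of one per element.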
09/02/2021, 1:54 PM
Jacob Hayes
09/02/2021, 2:09 PM
Brad I
09/02/2021, 4:30 PM
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        name=f"dask-{flow_image_name}",
        pod_template=make_pod_spec(
            image=prefect.context.image,
            cpu_request=worker_cpu,
            memory_request=worker_mem,
            threads_per_worker=threads_per_worker
        )
    ),
    adapt_kwargs={
        "minimum": min_workers,
        "maximum": max_workers
    },
)
Constantino Schillebeeckx
09/02/2021, 4:31 PM
if not args.dry_run:
    logger.log(SUCCESS, f"Registering flow {flow.name}")
    flow.register(project_name=args.project, idempotency_key=flow.serialized_hash())
Everything is working as expected, except I'm getting more versions of the flow (after multiple registrations) than I was previously expecting. It makes me wonder what I don't understand about the idempotency_key
- could someone explain to me what could cause the serialized_hash to change unexpectedly? for example, if a different VM registers flows, would the hash change?Michael
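As a rough mental model, a hash over the serialized flow changes whenever any serialized field changes, even if the task code is identical. The sketch below is an illustration of that idea, not Prefect's actual implementation; the dictionary fields (including the timestamp-style `image_tag`) are hypothetical examples of metadata that could differ between registrations or machines.

```python
import hashlib
import json


def serialized_hash(serialized_flow: dict) -> str:
    """Hash a JSON-serializable description of a flow in a key-order-independent way."""
    payload = json.dumps(serialized_flow, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


base = {
    "name": "my-flow",
    "tasks": ["a", "b"],
    "storage": {"image_tag": "2021-09-02t16-31-00"},  # hypothetical field
}
# Same content, different key order: the hash is unchanged.
same = {
    "storage": {"image_tag": "2021-09-02t16-31-00"},
    "tasks": ["a", "b"],
    "name": "my-flow",
}
# Same tasks, but one piece of metadata differs: the hash changes.
rebuilt = {**base, "storage": {"image_tag": "2021-09-02t16-45-10"}}

print(serialized_hash(base) == serialized_hash(same))
print(serialized_hash(base) == serialized_hash(rebuilt))
```

Comparing the serialized output of the flow from two registrations is one way to find which field is drifting.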
09/02/2021, 4:40 PM
`files` and `env_vars` attributes as outlined in the Docs. But it seems that my `.dockerignore` file (in the directory from which I am registering the flow) is ignored by this build process. I have massive data directories nested at different levels inside the codebase so it’s essential there be a way to include this somehow. Anyone come across this / have any ideas? I’m curious about playing around with a custom Dockerfile, but the code that copies flows to a tmp Dockerfile in `create_docker_file` is dynamic so it seems tough that I could write a standalone Dockerfile to handle this case.
John Shearer
09/02/2021, 4:55 PM
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult, S3Result
from prefect.engine.serializers import PandasSerializer, PickleSerializer

condition = True
resultType = LocalResult if condition else S3Result

@task()
def create_list():
    return [1, 2, 3]

@task(result=resultType(serializer=PandasSerializer(file_type='parquet')))
def to_df(a_list):
    return pd.DataFrame({'col1': a_list})

with Flow(
    name="my-flow",
    result=resultType(serializer=PickleSerializer())
) as my_flow:
    my_list = create_list()
    my_df = to_df(my_list)
Charles Leung
09/02/2021, 5:47 PM
Alex Furrier
09/02/2021, 5:48 PM
@task
def gen_records():
    return [('item', 1), ('item', 2), ('item', 3)]

@task
def subscript_input(x, y):
    return x+str(y)

with Flow(
    name="Test Flow",
) as test_flow:
    records = gen_records()
    total = subscript_input.map(x=records[0], y=records[1])
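The text of the question did not survive here, so the following is only a guess at the intent: indexing a list of tuples with `[0]` and `[1]` selects the first and second records, not the first and second fields of every record. The plain-Python sketch below contrasts the two; in a flow, the unzip step would presumably live in its own task.

```python
records = [("item", 1), ("item", 2), ("item", 3)]

# Indexing picks whole records, not columns.
first, second = records[0], records[1]

# zip(*records) transposes the list of tuples into one sequence per field.
xs, ys = (list(column) for column in zip(*records))


def subscript_input(x: str, y: int) -> str:
    return x + str(y)


# Pairing the columns element by element mirrors what a map over x and y would do.
total = [subscript_input(x, y) for x, y in zip(xs, ys)]
print(first, second)
print(total)
```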
Nadav
09/02/2021, 6:05 PM
Nadav
09/02/2021, 6:05 PM
Kevin Kho
09/02/2021, 6:07 PM
Nadav
09/02/2021, 6:10 PM
Kevin Kho
09/02/2021, 6:21 PM
Brad I
09/03/2021, 5:23 AM
Kevin Kho
09/03/2021, 5:39 AM
Zach Angell
09/03/2021, 2:53 PM
Brad I
09/03/2021, 3:08 PM
`prefect agent kubernetes install`
• No custom job template
• `IMAGE_PULL_POLICY` is left blank (assume that means `IfNotPresent`)
• We are running the flow both from the UI and API
• In the UI, select the flow, click ‘Run’, setting our run inputs, and clicking ‘Run’
• API is similar, sending the input dictionary to the `create_flow_run` mutation
• The `KubernetesRun` config is set on registration and everything works except the `image_pull_policy` flag
Zach Angell
09/03/2021, 3:17 PM
`create_flow_run` mutation you're using? (Feel free to DM instead of post in the thread)
`run_config` in `create_flow_run`, I would expect it to respect the `image_pull_policy` argument
Brad I
09/03/2021, 7:48 PM
const query = gql`
  mutation ($input: create_flow_run_input!) {
    create_flow_run(input: $input) {
      id
    }
  }
`;
const variables = {
  input,
};
Where the input is defined like:
parameters = {
  some_var: 42,
  uuid: fileId,
}
Zach Angell
09/07/2021, 3:58 PM
"In general, we don’t really want our users to have to configure every run_config parameter and should just use the default that the flow was registered with"
Understood and 100% agree that is how the system should work. For the UI, we need to add a new field. I've opened an issue here https://github.com/PrefectHQ/ui/issues/1040. For the API, I can do some further testing. Could you DM me the ID of your Flow and Flow Group? Both can be found in the "Flow" page -> "Overview" tab -> "Details" option on the "Overview" tile.
`run_config` is provided as an input to `create_flow_run`, Prefect should use the Flow's run config. I've also confirmed the Flow Group does not have a `run_config` that would be overriding this behavior.
The UI definitely has a bug, but I'm a bit perplexed on the API behavior
Brad I
09/10/2021, 12:40 PM