Lukas N.
12/16/2020, 9:55 AM
Will Milner
12/16/2020, 2:30 PM
@task(log_stdout=True)
def sample_print_task():
    print("hello")
When I declare a shell task like this
task_shell = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
with Flow("test") as flow:
    print_test = task_shell(command="echo hi", task_args={"name": "hi"})
I don't see anything printed after I register and run the flow. I have log_to_cloud set to True on the agent I am running.
Vitaly Shulgin
12/16/2020, 3:09 PM
[2020-12-16 14:48:37,015] ERROR - agent | 400 Client Error: Bad Request for url: http://prefect-apollo:4200/
The following error messages were provided by the GraphQL server:
    INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at "input.tenant_id"; Expected non-nullable type UUID! not to be null.
The GraphQL query was:
    mutation($input: get_runs_in_queue_input!) {
        get_runs_in_queue(input: $input) {
            flow_run_ids
        }
    }
The passed variables were:
{"input": {"before": "2020-12-16T14:48:36.911186+00:00", "labels": ["talynuc", "any"], "tenant_id": null}}
dh
12/16/2020, 4:04 PM
Marc Lipoff
12/16/2020, 7:45 PM
The repository with name 'staging-prefect-insurance_export' does not exist in the registry with id '524279393077'
I'm running something like this:
Docker(registry_url='<account_id>.dkr.ecr.us-west-2.amazonaws.com',
       image_name="dev-prefect-<flow-name>",
       python_dependencies=python_dependencies,
       files={f: os.path.join('/modules/', os.path.basename(f))
              for f in get_files_in_directory(current_module_directory)},
       env_vars={"PYTHONPATH": "$PYTHONPATH:modules/:modules/utils/"})
Christian
12/16/2020, 10:27 PM
None.
The idea is to send an email (EmailTask) on a task failure, and I want to parameterise the email address that will be alerted. Is there another way to pass a variable into a state_handler? Am I going in the wrong direction?
Thanks
Lech Głowiak
12/17/2020, 6:14 AM
Bob Primusanto
12/17/2020, 9:23 AM
Thomas Reynaud
12/17/2020, 2:02 PM
dh
12/17/2020, 2:39 PM
Joël Luijmes
12/17/2020, 2:42 PM
KubernetesRun but these are only strings. Is there currently an easy way to provide secrets as envs?
My intuition says I have to provide a custom job yaml then. Is that correct, or am I missing an easier way?
Joseph
12/17/2020, 5:23 PM
Pedro Martins
12/17/2020, 6:31 PM
Error: ErrImagePull
custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:aircraft-etl",
        image_pull_secrets=["regcred"],
    ),
    "storage": S3(bucket="dr-prefect"),
}
with Flow("Aircraft-ETL", **custom_confs) as flow:
    airport = Parameter("airport", default="IAD")
    radius = Parameter("radius", default=200)
    reference_data = extract_reference_data()
    live_data = extract_live_data(airport, radius, reference_data)
    transformed_live_data = transform(live_data, reference_data)
    load_reference_data(reference_data)
    load_live_data(transformed_live_data)
Richard Hughes
12/17/2020, 6:36 PM
mutation {
  delete_flow(input:
    query {
      flow
      (where: {name: {_eq: "MyFlowToDelete" }})
      {
        id
      }
    }
  ) {
    success,
    error
  }
}
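A GraphQL argument has to be an input value, so a `query { ... }` selection cannot be nested inside `input:` as in the mutation above. One possible workaround is to do it in two steps: look up the flow ids by name, then call `delete_flow` once per id. The sketch below is only an illustration under stated assumptions: `run_graphql` is a hypothetical callable that sends a query string and returns the `data` part of the response (for example, a thin wrapper around the Prefect `Client`), and `delete_flow` is assumed to accept an input of the form `{flow_id: "..."}`.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical type: a function that executes a GraphQL document and
# returns the "data" section of the response as a plain dict.
GraphQLRunner = Callable[[str], Dict[str, Any]]


def build_flow_id_query(flow_name: str) -> str:
    """Build a query that selects the ids of all flows with this name."""
    # json.dumps produces a quoted, escaped string literal.
    return "query { flow(where: {name: {_eq: %s}}) { id } }" % json.dumps(flow_name)


def build_delete_mutation(flow_id: str) -> str:
    """Build a delete_flow mutation for one flow id (assumed input shape)."""
    return "mutation { delete_flow(input: {flow_id: %s}) { success } }" % json.dumps(flow_id)


def delete_flows_by_name(run_graphql: GraphQLRunner, flow_name: str) -> List[str]:
    """Look up flows by name, delete each one, and return the deleted ids."""
    data = run_graphql(build_flow_id_query(flow_name))
    deleted = []
    for flow in data["flow"]:
        result = run_graphql(build_delete_mutation(flow["id"]))
        if result["delete_flow"]["success"]:
            deleted.append(flow["id"])
    return deleted
```

Because a flow name can match several registered versions, the helper loops over every returned id instead of assuming a single match.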
Vitaly Shulgin
12/17/2020, 7:25 PM
Phil Glazer
12/17/2020, 9:33 PM
Henrik Väisänen
12/17/2020, 9:53 PM
Marwan Sarieddine
12/17/2020, 9:59 PM
Diogo Munaro
12/17/2020, 11:38 PM
StartFlowRun, but I can't get results from that flow. Looking at the code, create_flow_run ignores the result arg: https://github.com/PrefectHQ/prefect/blob/96ef85470872593268c9498b57ac9f0b5a268e01/src/prefect/tasks/prefect/flow_run.py#L160
Do you know a way to get Flow results?
Here is some test code:
from prefect.tasks.prefect import StartFlowRun
from prefect import Flow, task
from prefect.engine.results.local_result import LocalResult
graph_building = StartFlowRun(
    flow_name="test_flow",
    project_name="test_project",
    wait=True,
    result=LocalResult(".")
)
with Flow("Call Flow") as flow:
    end_flow = graph_building()
state = flow.run()
state.result[end_flow].result  # nothing here
Marc Lipoff
12/18/2020, 12:43 AM
prefect agent ecs start --cluster arn:aws:ecs:us-west-2:xxx:cluster/staging-cluster --label staging --task-role-arn arn:aws:iam::xxx:role/staging-prefect-agent-flow --log-level DEBUG --launch-type FARGATE
So I would expect the compatibility to be FARGATE.
It seems this is the cause of the error I am getting when I submit my flow to the agent. The error is: [2020-12-18 00:34:44,559] ERROR - agent | Error while deploying flow: InvalidParameterException('An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.')
Has anyone seen something like this before?
jack
12/18/2020, 2:20 AM
{'_schema': 'Invalid data type: None'}
Does anyone know what this means?
Vitaly Shulgin
12/18/2020, 9:11 AM
Vitaly Shulgin
12/18/2020, 2:51 PM
Sergiy Krutsenko
12/18/2020, 5:32 PM
Flow_a: Created 3 chunks
Flow_b: Got result from previous flow None
import prefect as pf
from prefect import Client, task, Parameter, Flow
from prefect.engine.results import LocalResult
from prefect.environments.storage import Local
from prefect.utilities.configuration import set_temporary_config
from prefect.tasks.prefect import FlowRunTask
from prefect.environments import LocalEnvironment
import socket

def get_server_config(server, port):
    return {
        "cloud.api": "http://{}:{}".format(server, port),
        "cloud.graphql": "http://{}:{}/graphql".format(server, port),
        "backend": "server",
    }

def get_logger():
    return pf.context.get('logger')

@task()
def create_chunks(inputs):
    logger = get_logger()
    chunks = ['a', 'b', 'c']
    logger.info('Created %d chunks', len(chunks))
    return chunks

@task()
def accept_results(result):
    logger = get_logger()
    logger.info('Got result from previous flow %s', result)
    return result

def main():
    hostname = socket.gethostname()
    labels = [hostname]
    env = LocalEnvironment(labels=labels)
    with set_temporary_config(get_server_config('xxx', '4200')):
        with Flow('flow_a', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_a',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_a:
            inputs = Parameter('inputs', required=True)
            create_chunks(inputs)
        with Flow('flow_b', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_b',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_b:
            result = Parameter('result', required=True)
            accept_results(result)
        fa = FlowRunTask(flow_name="flow_a", project_name="Test", wait=True)
        fb = FlowRunTask(flow_name="flow_b", project_name="Test", wait=True)
        with Flow('flow_c', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_c',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_c:
            a = fa(parameters={'inputs': {}})
            b = fb(upstream_tasks=[a], parameters={'result': a.result})
        client = Client()
        id_a = client.register(flow_a, project_name="Test")
        id_b = client.register(flow_b, project_name="Test")
        id_c = client.register(flow_c, project_name="Test")
        client.create_flow_run(flow_id=id_c)

if __name__ == "__main__":
    main()
itay livni
12/18/2020, 6:31 PM
Did FlowRunTask change to StartFlowRun somewhere along the way? Thanks
Andrew Hannigan
12/18/2020, 7:05 PM
/opt/project/dwt/extract/extract.py:43: UserWarning: A Task was passed as an argument to BookmarkEnd, you likely want to first initialize BookmarkEnd with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
my_task = BookmarkEnd(...) # static (non-Task) args go here
res = my_task(...) # dynamic (Task) args go here
Chris White
DeprecationWarnings, or those who just want to learn more, please join @Laura Lorenz as she walks us through the changes and new functionality! The stream will begin at 1pm PT / 4pm ET at this link:
See you all there!
Severin Ryberg [sevberg]
12/18/2020, 9:35 PM
matta
12/19/2020, 5:21 AM