Robin
09/30/2020, 12:11 PM
write_task_result_to_table task that gets the result of other tasks and writes e.g. into a table whether the task was a success, failed, missing data or empty (= task was not yet executed).
We have a flow that runs several tasks for many systems.
So the table we would like to have as result would look like:
system id, task_1, task_2, task_3, ...
and then one row for each system.
Is there already a more elegant way to write all the results of a flow into a table or something like this?
Or is the above described way the way to go?
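One way to picture the table described above, as a minimal sketch in plain Python rather than Prefect's own API. The `build_status_table` and `to_csv` helpers and the sample data are hypothetical, and how the per-task outcomes are collected from a flow run is deliberately left out.

```python
import csv
import io


def build_status_table(outcomes, task_names):
    """Pivot {(system_id, task_name): status} into one row per system.

    Tasks with no recorded outcome get an empty cell (= not yet executed).
    """
    system_ids = sorted({system_id for system_id, _ in outcomes})
    rows = []
    for system_id in system_ids:
        row = {"system id": system_id}
        for name in task_names:
            row[name] = outcomes.get((system_id, name), "")
        rows.append(row)
    return rows


def to_csv(rows, task_names):
    """Render the rows as CSV text with the header: system id, task_1, ..."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=["system id", *task_names])
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Made-up sample data, for illustration only.
task_names = ["task_1", "task_2", "task_3"]
outcomes = {
    ("sys_a", "task_1"): "success",
    ("sys_a", "task_2"): "failed",
    ("sys_b", "task_1"): "missing data",
}

print(to_csv(build_status_table(outcomes, task_names), task_names))
```

Keeping the pivot separate from the output format means the same rows could be written to a database table instead of CSV.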
Cheers 🙂
Adam
09/30/2020, 12:35 PM
Vipul
09/30/2020, 12:55 PM
Mitchell Bregman
09/30/2020, 1:45 PM
Newskooler
09/30/2020, 2:13 PM
Slackbot
09/30/2020, 2:32 PM
Slackbot
09/30/2020, 4:01 PM
Josh Lowe
09/30/2020, 7:57 PM
ms16
09/30/2020, 9:05 PM
ms16
10/01/2020, 12:46 AM
Nakul Gowdra
10/01/2020, 12:59 AM
Jacob Blanco
10/01/2020, 1:13 AM
Simone Cittadini
10/01/2020, 9:50 AM
from prefect import Flow, Task, task

class Process(Task):
    def fu(self, rows):
        # generator: yields each row shifted by 100
        for row in rows:
            yield row + 100

    def run(self, data):
        output = {}
        output["meta"] = data["meta"]
        output["data"] = self.fu(data["rows"])
        return output

@task
def pull(data):
    for row in data["data"]:
        print(row)

with Flow("test") as flow:
    input_data = {"rows": [1, 2, 3, 4], "meta": "fixedvalue"}
    process = Process()
    data = process(input_data)
    drain = pull(data)
psimakis
10/01/2020, 11:28 AM
from prefect.tasks.shell import ShellTask
from prefect import Flow

a = ShellTask()
commands = [
    'ls',
    'ls -l',
    'invalidcommand'
]
with Flow('test') as flow:
    b = a.map(commands)
    # send a single Slack notification
    # that summarizes the states of mapped tasks
    send_slack_notification()
The purpose of this Slack notification is to summarize the states of the mapped tasks by displaying the percentage of mapped tasks that succeeded. In the case above, this percentage will be 66.6% (the last command will fail).
I tried to approach the problem using triggers and state handlers but I couldn't find a clean way to achieve the goal.
Have you been in this situation before? Any hint?
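A minimal sketch of the summary step, in plain Python. It assumes the notification task receives one entry per mapped child and that a failed child shows up as an exception object; as far as I know that is how Prefect passes failed upstream results to a task whose trigger lets it run after failures (for example `all_finished`), but treat that wiring as an assumption to verify. `success_rate` and `format_summary` are hypothetical helper names.

```python
def success_rate(results):
    """Return the percentage of entries that are not exceptions."""
    if not results:
        return 0.0
    succeeded = sum(1 for r in results if not isinstance(r, BaseException))
    return 100.0 * succeeded / len(results)


def format_summary(results):
    """Build the one-line message a single notification could carry."""
    return f"{success_rate(results):.1f}% of {len(results)} mapped tasks succeeded"


# Stand-in results for 'ls', 'ls -l' and 'invalidcommand'.
example = ["file_a", "total 0", RuntimeError("command failed")]
print(format_summary(example))
```

Computing the rate in one downstream task keeps it to a single notification instead of one per mapped child.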
Thanks in advance!
Newskooler
10/01/2020, 12:52 PM
prefect.Parameter?
My question in more detail: https://stackoverflow.com/questions/64155793/is-it-possible-to-loop-over-a-prefect-parameter
Kevin Weiler
10/01/2020, 1:26 PM
Robin
10/01/2020, 2:48 PM
Unexpected error: OSError("Timed out trying to connect to '<tcp://10.100.0.100:44991>' after 10 s: Timed out trying to connect to '<tcp://10.100.0.100:44991>' after 10 s: connect() didn't finish in time")
Is this a known issue?
The environment is a dask-kubernetes cluster running on AWS EKS...
Adam
10/01/2020, 3:06 PM
COPY instruction to put your package into the image, and then the RUN instruction to install it.”
Newskooler
10/01/2020, 3:15 PM
my_function.map(a, b) where len(a) is 2 and len(b) is 5, will this result in 10 runs? Currently I think it only runs twice : / 🤔
Kevin Weiler
10/01/2020, 3:37 PM
'413 Client Error: Request Entity Too Large for url: <http://prefect-api.aq.tc/graphql>'
Konstantinos
10/01/2020, 4:04 PM
Newskooler
10/01/2020, 4:42 PM
get_url_data to finish, until transform_url_data starts? Same goes for the next step.
The problem I have is that get_url_data will grow quite large and, since I guess the information is kept, I can run out of RAM and the whole thing can crash.
If the flow kept going iteratively, the information would be disposed of in the process and I would have no problem.
I must be missing something here, and there is probably a better way to design this. If anyone can point me in the right direction, it’s much appreciated : )
Andy Dyer
10/01/2020, 5:34 PM
With DOCKER_HOST = <tcp://docker:2376>
raise DockerException(
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='docker', port=2376): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fb67baf4760>: Failed to establish a new connection: [Errno 111] Connection refused'))
Without DOCKER_HOST set
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
Dolor Oculus
10/01/2020, 7:08 PM
from prefect import Client

client = Client()
client.create_project(project_name="Hello, World")
# client.get_project(project_name="Hello, World")
When registering a flow, it looks like a project is required in later versions of Prefect, but for automation purposes I'd like to see if the project already exists before trying to create it.
Pedro Machado
10/01/2020, 7:11 PM
[2020-10-01 19:07:39,119] ERROR - agent | Error while deploying flow: APIError(HTTPError('400 Client Error: Bad Request for url: <http+docker://localhost/v1.40/containers/create>'))
I think it's related to the tls_config argument, which defaults to False. Is it possible to set this on the command line?
Pedro Machado
10/01/2020, 8:35 PM
.env file but I don't see a way to pass this file in the CLI. Alternatively, how do I tell the agent to use my config.toml? I have one at ~/.prefect/config.toml that is used when using flow.run().
Can I tell the agent running on the same machine to use this config file?
Newskooler
10/01/2020, 9:04 PM
{
    "strings": ['string1', 'string2'],
    'start_date': '20200920',
    'end_date': '20200930',
}
Matt
10/01/2020, 11:12 PM
Pedro Machado
10/02/2020, 3:23 AM
dir for a LocalResult at run time? I am setting it based on a context variable but it seems to be getting the value of the variable at registration time.
import prefect
from prefect import task
from prefect.engine.results import LocalResult

@task(
    target="ftp_{start_date}_{end_date}.done",
    # evaluated once, when this module is loaded
    result=LocalResult(dir=prefect.context.local_results_dir),
)
def mytask(start_date, end_date):
    pass
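The behaviour described above is consistent with how Python evaluates decorator arguments: the `LocalResult(dir=...)` expression is built once, when the module is loaded, so it sees whatever the context held at that moment. A minimal plain-Python sketch of the difference between an eager value and one resolved at call time; `context`, `eager_task` and `lazy_task` are hypothetical stand-ins, not Prefect APIs.

```python
# Stand-in for a context whose values differ between definition and run time.
context = {"local_results_dir": "/registration/dir"}


def eager_task(result_dir):
    """Decorator factory: result_dir is evaluated when the decorator line runs."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            return result_dir, fn(*args, **kwargs)
        return wrapper
    return decorator


def lazy_task(get_result_dir):
    """Decorator factory: the callable is only invoked when the task is called."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            return get_result_dir(), fn(*args, **kwargs)
        return wrapper
    return decorator


@eager_task(context["local_results_dir"])
def eager():
    return "done"


@lazy_task(lambda: context["local_results_dir"])
def lazy():
    return "done"


# Simulate the context changing between registration and the actual run.
context["local_results_dir"] = "/runtime/dir"

print(eager())
print(lazy())
```

After the change, `eager()` still reports the registration-time directory, while `lazy()` picks up the run-time one.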
Newskooler
10/02/2020, 8:19 AM
Newskooler
10/02/2020, 8:19 AM
Dylan
10/02/2020, 2:21 PM
Newskooler
10/02/2020, 2:35 PM
Dylan
10/02/2020, 2:36 PM
Pedro Machado
10/02/2020, 10:08 PM
supervisord or is there a Docker-native way of running the agent so that it will restart if it crashes?
Dylan
10/02/2020, 10:31 PM
run on that container. If you were to install a Docker agent on a VM, I would recommend using the instructions above.
Pedro Machado
10/02/2020, 10:34 PM