Kevin
05/26/2021, 4:07 PM
Pedro Henrique
05/26/2021, 7:43 PM
Claire Herdeman
05/26/2021, 9:02 PM
Bruno Murino
05/26/2021, 10:48 PM
nick vazquez
05/27/2021, 1:12 AM
Fabrice Toussaint
05/27/2021, 9:51 AM
Jacob Baruch
05/27/2021, 12:30 PM
Chohang Ng
05/27/2021, 4:55 PM
Nikola Lusic
05/27/2021, 5:17 PM
cpu and memory parameters of the ECSRun don't seem to have any effect. It seems that the agent always creates a task with 1 CPU and 2 GB RAM.
Do you maybe have some pointers for this issue?
Irvin Tang
05/27/2021, 5:28 PM
def test_register_flow_docker(self):
    register_flow(
        flow=flow_docker,
        flow_name="test",
        project_name="test-project",
        labels=["test-label"],
        host_type="docker"
    )
    config = get_config("docker")
    with set_temporary_config(config):
        runner = CliRunner()
        result = runner.invoke(get, ["flows"])
        assert result.exit_code == 0
        assert (
            "test-docker" in result.output
            and "test-project" in result.output
        )
I was wondering if there is another way to test flow registration. I want to be able to register the flow and have the test confirm that the flow registered successfully.
Felipe Saldana
05/27/2021, 5:40 PM
post_runner.set_upstream(all_pushes)
post_runner.bind(mapped_run_name, mapped_data_root, mapped_collection_name)
post_runner(mapped_run_name, mapped_data_root, mapped_collection_name)
Using bind() works as I would expect: all_pushes completes entirely, and then post_runner starts and completes.
Without bind(), post_runner does not wait for all_pushes to complete. Then mayhem ensues, and I get "run() missing X required positional arguments" messages.
Thoughts?
Kathryn Klarich
05/27/2021, 6:19 PM
Andrew Hannigan
05/27/2021, 7:58 PM
Pietro Immordino
05/28/2021, 4:46 AM
Christian
05/28/2021, 7:11 AM
Thomas Hoeck
05/28/2021, 7:42 AM
John Ramirez
05/28/2021, 1:17 PM
Nikola Lusic
05/28/2021, 2:18 PM
ECSRun configuration be run in a threaded mode, similar to how LocalDaskExecutor has the scheduler="threads" configuration?
ciaran
05/28/2021, 2:23 PM
ciaran
05/28/2021, 2:32 PM
Charles Leung
05/28/2021, 3:01 PM
Sean Harkins
05/28/2021, 4:25 PM
OutOfMemoryError.
Jacob Goldberg
05/28/2021, 5:07 PM
[2021-05-28 09:59:27-0700] INFO - prefect.Docker | Pushing image to the registry...
Traceback (most recent call last):
  File "flow_registry.py", line 44, in <module>
    build_and_register_all_flows(all_flows)
  File "flow_registry.py", line 35, in build_and_register_all_flows
    storage = storage.build()
  File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 303, in build
    self._build_image(push=push)
  File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 378, in _build_image
    self.push_image(full_name, self.image_tag)
  File "/XXX/XXX/XXX/XXX/XXX/XXX/XXX/new_etl/venv/lib/python3.8/site-packages/prefect/storage/docker.py", line 586, in push_image
    raise InterruptedError(line.get("error"))
InterruptedError: denied: Your authorization token has expired. Reauthenticate and try again.
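Here is one hedged reading of this error. The InterruptedError above is raised while Prefect's Docker storage pushes the image. A Docker push authenticates with the credentials that docker login saved for the registry (or a configured credential helper), not with the AWS credentials in the environment. ECR login tokens are only valid for 12 hours, so an earlier docker login may simply have expired. Below is a minimal sketch of refreshing that login from Python before calling storage.build(). It assumes boto3 and the docker CLI are installed, and the function names are hypothetical.

```python
# Hypothetical helpers; assumes boto3 and the docker CLI are installed.
import base64
import subprocess
from typing import Tuple


def decode_ecr_token(token: str) -> Tuple[str, str]:
    # ECR returns base64("AWS:<password>"); split only on the first colon.
    decoded = base64.b64decode(token).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password


def docker_login_to_ecr(region_name: str) -> str:
    # Roughly equivalent to:
    #   aws ecr get-login-password --region <region> | \
    #     docker login --username AWS --password-stdin <registry>
    import boto3

    ecr = boto3.client("ecr", region_name=region_name)
    auth = ecr.get_authorization_token()["authorizationData"][0]
    username, password = decode_ecr_token(auth["authorizationToken"])
    registry = auth["proxyEndpoint"]
    # docker login stores the credentials that later pushes read.
    subprocess.run(
        ["docker", "login", "--username", username, "--password-stdin", registry],
        input=password,
        text=True,
        check=True,
    )
    return registry
```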
This is confusing to me, because I have double-checked that my local AWS authentication is all set up properly in the environment. Is this referring to something else? I'm relatively new to Docker, ECR, and Prefect. Any help or ideas appreciated!
Tom Baldwin
05/28/2021, 8:26 PM
requests library. Is there a way to trigger a flow run directly from ADF (without Databricks as an intermediary)?
itay livni
05/29/2021, 12:56 AM
flow.result = LocalResult(
    serializer=PandasSerializer(file_type="csv")
)
Task to pickle:
num_clusters = value_count(df)
Thanks in advance
Daniel Bast
05/29/2021, 6:38 AM
Aurélien Vallée
05/30/2021, 7:07 AM
with Flow("myflow") as flow:
    multiprocess = Parameter("multiprocess")
    with case(multiprocess, True):
        flow.executor = DaskExecutor()
PKay
05/30/2021, 9:57 PM
Unexpected error: TypeError('no default __reduce__ due to non-trivial __cinit__')
Traceback (most recent call last):
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/results/local_result.py", line 116, in write
    value = self.serializer.serialize(new.value)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/home/USERNAME/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in pymssql._mssql.MSSQLConnection.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
I tried running on both Ubuntu 20.04.02 and Windows 10, and both can run the flow locally. I even tried using Docker and got the same error.
I am using the Python package pymssql to extract data from a database and do some basic transformations with petl. It works fine when I run the flow without any agent/Cloud: it's able to extract the data and do the transformations. I figure it's something to do with how it's passing data between functions.
matta
05/30/2021, 10:02 PM
Fabrice Toussaint
05/31/2021, 8:46 AM
Fabrice Toussaint
05/31/2021, 8:46 AM
Kevin Kho
05/31/2021, 3:29 PM
Fabrice Toussaint
05/31/2021, 4:08 PM
import requests
from prefect import Flow, Parameter, task, apply_map, flatten, unmapped
@task
def offset_for_number_of_pages(pages: int = 1):
    if pages < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(pages)]
@task
def get_items(offset, url):
    """Retrieves the orders response of the API request."""
    # requests.get() has no offset= argument; send it as a query parameter
    # (assumes the API reads ?offset=...)
    request = requests.get(url, params={"offset": offset})
    return request.json()
@task
def do_something(item):
    return item + 1
def execute_for_item(item):
    return do_something(item)
with Flow("paginated-items") as flow:
    url = Parameter('url')  # hypothetical: url was defined elsewhere in the original
    pages = Parameter('pages', default=1)
    offsets = offset_for_number_of_pages(pages=pages)  # e.g. pages=4 -> [0, 100, 200, 300]
    items = get_items.map(offset=offsets, url=unmapped(url))  # [[0..100], [100..200], [200..300], [300..400]]
    results = apply_map(execute_for_item, item=flatten(items))  # [0..400]
from prefect import task, Flow, apply_map, flatten, unmapped, Parameter
@task
def generate_list(amount: int = 1):
    if amount < 1:
        raise ValueError('Cannot be lower than 1!')
    else:
        return [100 * n for n in range(amount)]
@task
def get_items(item):
    """Retrieves the orders response of the API request."""
    return [0, 1, 2, 3, item]
@task
def add_one(item):
    print(item)
    return item + 1
@task
def add_two(item):
    return item + 2
def execute_for_item(item):
    item_plus_one = add_one(item)
    return add_two(item_plus_one)
with Flow("Test") as flow:
    amount = Parameter('amount', default=1)
    l = generate_list(amount=amount)  # e.g. amount=4 -> [0, 100, 200, 300]
    items = get_items.map(item=l)  # one list per element of l, e.g. [[0, 1, 2, 3, 0], ...]
    results = apply_map(execute_for_item, item=flatten(items))  # one result per flattened item
flow.run()
Kevin Kho
05/31/2021, 6:16 PM
Fabrice Toussaint
05/31/2021, 6:17 PM
Kevin Kho
06/01/2021, 1:28 PM
from prefect import Flow, task, flatten, case, apply_map
import prefect
@task
def generate_numbers():
    return [[1], [2, 3], [4, 5, 6]]
@task
def up(x):
    return x + 1
@task
def is_even(x):
    return x % 2 == 0
def up_or_down(x):
    cond = is_even(x)
    with case(cond, True):
        res = up(x)
    return res
@task
def simple_flatten(x):
    return [num for elem in x for num in elem]
with Flow('test') as flow:
    x = generate_numbers()
    y = simple_flatten(x)
    apply_map(up_or_down, y)
flow.run()
Fabrice Toussaint
06/01/2021, 2:01 PM
Kevin Kho
06/01/2021, 2:09 PM